commit c6b4e8fd3a79811c5bd797ceae5e34d9920d3c01 Author: Krzysztof kuhy Rudnicki Date: Tue Apr 15 17:08:40 2025 +0200 chore: vibe coding psd project diff --git a/alarm-visualizer/pom.xml b/alarm-visualizer/pom.xml new file mode 100644 index 00000000..85650d32 --- /dev/null +++ b/alarm-visualizer/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + com.anomaly + alarm-visualizer + 1.0-SNAPSHOT + + + 11 + 11 + + + + + + org.apache.kafka + kafka-clients + 2.8.1 + + + + com.google.code.gson + gson + 2.8.9 + + + + org.slf4j + slf4j-api + 1.7.32 + + + ch.qos.logback + logback-classic + 1.2.9 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.anomaly.visualizer.AlertVisualizer + + + + + + + + + diff --git a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java new file mode 100644 index 00000000..9a81205c --- /dev/null +++ b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java @@ -0,0 +1,275 @@ +package com.anomaly.visualizer; + +import com.anomaly.model.TransactionAlert; +import com.google.gson.Gson; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.swing.*; +import javax.swing.table.DefaultTableModel; +import java.awt.*; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.List; + +public class AlertVisualizer { + private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class); + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final String GROUP_ID = "alert-visualizer-group"; + private static final String TOPIC = "alerts"; + private static final Gson gson = new Gson(); + + // UI Components + private static JFrame frame; + private static JTable alertTable; + private static DefaultTableModel tableModel; + private static JTextArea detailArea; + private static final List allAlerts = new ArrayList<>(); + private static final DateTimeFormatter formatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); + + public static void main(String[] args) { + setupUI(); + + // Create consumer properties + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + // Create consumer + KafkaConsumer consumer = new KafkaConsumer<>(properties); + + // Subscribe to topic + consumer.subscribe(Collections.singletonList(TOPIC)); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("Shutting down alert visualizer..."); + consumer.close(); + logger.info("Alert visualizer closed"); + })); + + // Poll for new alerts + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + boolean newAlerts = false; + for (ConsumerRecord record : records) { + // Parse the alert + TransactionAlert alert = gson.fromJson(record.value(), TransactionAlert.class); + addAlert(alert); + newAlerts = true; + + // Display notification for new alert + displayNotification(alert); + } + + // Update the UI if new alerts arrived + if (newAlerts) { + updateAlertTable(); + } + } + } finally { + consumer.close(); + } + } + + private static void setupUI() { + frame = new JFrame("Transaction Alert Visualizer"); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + frame.setSize(1024, 768); + + // Create table with columns + String[] columnNames = {"Time", "Alert Type", "Card ID", "User ID", "Amount", "Message"}; + tableModel = new DefaultTableModel(columnNames, 0); + alertTable = new JTable(tableModel); + alertTable.setSelectionMode(ListSelectionModel.SINGLE_SELECTION); + + // Add selection listener to show details when an alert is selected + alertTable.getSelectionModel().addListSelectionListener(e -> { + if (!e.getValueIsAdjusting()) { + int selectedRow = alertTable.getSelectedRow(); + if (selectedRow >= 0 && selectedRow < allAlerts.size()) { + showAlertDetails(allAlerts.get(selectedRow)); + } + } + }); + + JScrollPane tableScrollPane = new JScrollPane(alertTable); + + // Create detail panel + detailArea = new JTextArea(); + detailArea.setEditable(false); + JScrollPane detailScrollPane = new JScrollPane(detailArea); + + // Create map visualization panel + JPanel mapPanel = new JPanel() { + @Override + protected void paintComponent(Graphics g) { + super.paintComponent(g); + drawMap(g); + } + }; + + // Create split panes for layout + JSplitPane mainSplitPane = new JSplitPane(JSplitPane.VERTICAL_SPLIT, + tableScrollPane, new JSplitPane(JSplitPane.HORIZONTAL_SPLIT, detailScrollPane, mapPanel)); + mainSplitPane.setDividerLocation(300); + ((JSplitPane)mainSplitPane.getBottomComponent()).setDividerLocation(500); + + frame.add(mainSplitPane); + frame.setVisible(true); + } + + private static void addAlert(TransactionAlert alert) { + synchronized (allAlerts) { + allAlerts.add(0, alert); // Add at the beginning for newest first + } + } + + private static void updateAlertTable() { + SwingUtilities.invokeLater(() -> { + tableModel.setRowCount(0); // Clear table + + synchronized (allAlerts) { + for (TransactionAlert alert : allAlerts) { + String formattedTime = formatter.format(alert.getAlertTime()); + tableModel.addRow(new Object[]{ + formattedTime, + alert.getAlertType(), + alert.getCardId(), + alert.getUserId(), + String.format("$%.2f", alert.getAmount()), + alert.getMessage() + }); + } + } + }); + } + + private static void showAlertDetails(TransactionAlert alert) { + SwingUtilities.invokeLater(() -> { + StringBuilder sb = new StringBuilder(); + sb.append("ALERT DETAILS\n"); + sb.append("============================================\n\n"); + sb.append("Alert Type: ").append(alert.getAlertType()).append("\n"); + sb.append("Alert Time: ").append(formatter.format(alert.getAlertTime())).append("\n"); + sb.append("Transaction Time: ").append(formatter.format(alert.getTimestamp())).append("\n\n"); + sb.append("Card ID: ").append(alert.getCardId()).append("\n"); + sb.append("User ID: ").append(alert.getUserId()).append("\n"); + sb.append("Transaction Amount: $").append(String.format("%.2f", alert.getAmount())).append("\n\n"); + sb.append("Location: ").append(alert.getLatitude()).append(", ").append(alert.getLongitude()).append("\n\n"); + sb.append("Alert Message: ").append(alert.getMessage()).append("\n"); + + detailArea.setText(sb.toString()); + frame.repaint(); // Trigger map redraw to highlight selected alert + }); + } + + private static void drawMap(Graphics g) { + Graphics2D g2d = (Graphics2D) g; + int width = g2d.getClipBounds().width; + int height = g2d.getClipBounds().height; + + // Draw world map (simplified) + g2d.setColor(Color.LIGHT_GRAY); + g2d.fillRect(0, 0, width, height); + g2d.setColor(Color.DARK_GRAY); + g2d.drawRect(0, 0, width - 1, height - 1); + + // Draw grid lines + g2d.setColor(Color.GRAY); + for (int i = 0; i < width; i += 50) { + g2d.drawLine(i, 0, i, height); + } + for (int i = 0; i < height; i += 50) { + g2d.drawLine(0, i, width, i); + } + + // Get selected alert + int selectedRow = alertTable.getSelectedRow(); + TransactionAlert selectedAlert = null; + if (selectedRow >= 0 && selectedRow < allAlerts.size()) { + selectedAlert = allAlerts.get(selectedRow); + } + + // Draw alert points + synchronized (allAlerts) { + for (TransactionAlert alert : allAlerts) { + // Map lat/lon to screen coordinates + int x = (int) ((alert.getLongitude() + 180) / 360 * width); + int y = (int) ((90 - alert.getLatitude()) / 180 * height); + + // Color by alert type + if (alert.getAlertType().equals("AMOUNT_ANOMALY")) { + g2d.setColor(Color.RED); + } else if (alert.getAlertType().equals("LOCATION_ANOMALY")) { + g2d.setColor(Color.BLUE); + } else { + g2d.setColor(Color.ORANGE); + } + + // Make selected alert larger + if (alert == selectedAlert) { + g2d.fillOval(x - 8, y - 8, 16, 16); + } else { + g2d.fillOval(x - 4, y - 4, 8, 8); + } + } + } + + // Draw legend + g2d.setColor(Color.BLACK); + g2d.drawString("Legend:", width - 150, 20); + g2d.setColor(Color.RED); + g2d.fillOval(width - 140, 30, 10, 10); + g2d.setColor(Color.BLACK); + g2d.drawString("Amount Anomaly", width - 125, 40); + g2d.setColor(Color.BLUE); + g2d.fillOval(width - 140, 50, 10, 10); + g2d.setColor(Color.BLACK); + g2d.drawString("Location Anomaly", width - 125, 60); + g2d.setColor(Color.ORANGE); + g2d.fillOval(width - 140, 70, 10, 10); + g2d.setColor(Color.BLACK); + g2d.drawString("Frequency Anomaly", width - 125, 80); + } + + private static void displayNotification(TransactionAlert alert) { + SwingUtilities.invokeLater(() -> { + // Make a simple notification window + JDialog dialog = new JDialog(frame, "New Alert!", false); + dialog.setSize(400, 150); + dialog.setLocationRelativeTo(frame); + + JPanel panel = new JPanel(new BorderLayout()); + JLabel label = new JLabel("" + alert.getAlertType() + "
" + + alert.getMessage() + "
Card: " + alert.getCardId() + ""); + label.setBorder(BorderFactory.createEmptyBorder(10, 10, 10, 10)); + panel.add(label, BorderLayout.CENTER); + + JButton closeButton = new JButton("Dismiss"); + closeButton.addActionListener(e -> dialog.dispose()); + JPanel buttonPanel = new JPanel(); + buttonPanel.add(closeButton); + panel.add(buttonPanel, BorderLayout.SOUTH); + + dialog.add(panel); + dialog.setVisible(true); + + // Auto-dismiss after 5 seconds + Timer timer = new Timer(5000, e -> dialog.dispose()); + timer.setRepeats(false); + timer.start(); + }); + } +} diff --git a/anomaly-detector/pom.xml b/anomaly-detector/pom.xml new file mode 100644 index 00000000..1a18c827 --- /dev/null +++ b/anomaly-detector/pom.xml @@ -0,0 +1,81 @@ + + + 4.0.0 + + com.anomaly + anomaly-detector + 1.0-SNAPSHOT + + + 11 + 11 + 1.14.2 + 2.12 + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + + + + org.apache.flink + flink-connector-kafka_${scala.binary.version} + ${flink.version} + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + + + + com.google.code.gson + gson + 2.8.9 + + + + org.slf4j + slf4j-api + 1.7.32 + + + ch.qos.logback + logback-classic + 1.2.9 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.anomaly.detector.AnomalyDetector + + + + + + + + + + diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java new file mode 100644 index 00000000..b7ebe65a --- /dev/null +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -0,0 +1,269 @@ +package com.anomaly.detector; + +import com.anomaly.model.Transaction; +import com.anomaly.model.TransactionAlert; +import com.google.gson.Gson; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.util.Collector; + +import java.util.*; +import java.time.Instant; + +public class AnomalyDetector { + + private static final String INPUT_TOPIC = "transactions"; + private static final String OUTPUT_TOPIC = "alerts"; + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final Gson gson = new Gson(); + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Configure Kafka consumer + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS); + properties.setProperty("group.id", "anomaly-detector"); + + // Create Kafka consumer + FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>( + INPUT_TOPIC, + new SimpleStringSchema(), + properties + ); + + // Parse JSON transactions + DataStream transactionStream = env + .addSource(consumer) + .map(new MapFunction() { + @Override + public Transaction map(String value) throws Exception { + return gson.fromJson(value, Transaction.class); + } + }); + + // Detect anomalies based on different metrics + // 1. Amount anomaly - sudden high-value transactions + DataStream amountAlerts = transactionStream + .keyBy(Transaction::getCardId) + .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1))) + .process(new AmountAnomalyDetector()); + + // 2. Location anomaly - sudden change in location + DataStream locationAlerts = transactionStream + .keyBy(Transaction::getCardId) + .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1))) + .process(new LocationAnomalyDetector()); + + // 3. Frequency anomaly - unusual number of transactions in short time + DataStream frequencyAlerts = transactionStream + .keyBy(Transaction::getCardId) + .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1))) + .process(new FrequencyAnomalyDetector()); + + // Union all alert streams + DataStream allAlerts = amountAlerts + .union(locationAlerts, frequencyAlerts); + + // Convert alerts to JSON and send to Kafka + allAlerts + .map(alert -> gson.toJson(alert)) + .addSink(new FlinkKafkaProducer<>( + OUTPUT_TOPIC, + new SimpleStringSchema(), + properties + )); + + // Execute the Flink job + env.execute("Credit Card Transaction Anomaly Detection"); + } + + // Detector for unusual transaction amounts + public static class AmountAnomalyDetector + extends ProcessWindowFunction { + + @Override + public void process(String cardId, Context context, Iterable transactions, + Collector out) { + List transactionList = new ArrayList<>(); + transactions.forEach(transactionList::add); + + if (transactionList.isEmpty()) return; + + // Calculate statistics + double averageAmount = transactionList.stream() + .mapToDouble(Transaction::getAmount) + .average() + .orElse(0); + + double stdDeviation = calculateStdDeviation(transactionList, averageAmount); + + // Check for anomalies (transactions that are more than 3 standard deviations from mean) + for (Transaction transaction : transactionList) { + if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 3 * stdDeviation) { + out.collect(new TransactionAlert( + "AMOUNT_ANOMALY", + transaction.getCardId(), + transaction.getUserId(), + transaction.getAmount(), + transaction.getLatitude(), + transaction.getLongitude(), + transaction.getTimestamp(), + "Unusual transaction amount detected: $" + transaction.getAmount() + + " (Average: $" + String.format("%.2f", averageAmount) + ")" + )); + } + } + } + + private double calculateStdDeviation(List transactions, double mean) { + return Math.sqrt(transactions.stream() + .mapToDouble(t -> Math.pow(t.getAmount() - mean, 2)) + .average() + .orElse(0)); + } + } + + // Detector for unusual transaction locations + public static class LocationAnomalyDetector + extends ProcessWindowFunction { + + // Map to store frequent locations for each card + private final Map> cardLocations = new HashMap<>(); + + @Override + public void process(String cardId, Context context, Iterable transactions, + Collector out) { + List transactionList = new ArrayList<>(); + transactions.forEach(transactionList::add); + + if (transactionList.isEmpty()) return; + + // Get or create location set for this card + Set frequentLocations = cardLocations.computeIfAbsent(cardId, k -> new HashSet<>()); + + // Process each transaction + for (Transaction transaction : transactionList) { + LocationPoint currentPoint = new LocationPoint(transaction.getLatitude(), transaction.getLongitude()); + + // If we have at least 3 frequent locations for this card + if (frequentLocations.size() >= 3) { + boolean isNearKnownLocation = false; + + // Check if current location is near any known frequent location + for (LocationPoint knownPoint : frequentLocations) { + if (calculateDistance(currentPoint, knownPoint) < 50) { // Less than 50km + isNearKnownLocation = true; + break; + } + } + + // If not near any known location, it might be an anomaly + if (!isNearKnownLocation) { + out.collect(new TransactionAlert( + "LOCATION_ANOMALY", + transaction.getCardId(), + transaction.getUserId(), + transaction.getAmount(), + transaction.getLatitude(), + transaction.getLongitude(), + transaction.getTimestamp(), + "Unusual transaction location detected at: " + + transaction.getLatitude() + ", " + transaction.getLongitude() + )); + } + } + + // Add current location to frequent locations (max 10 locations per card) + if (frequentLocations.size() < 10) { + frequentLocations.add(currentPoint); + } + } + } + + // Calculate distance between two points using Haversine formula (in km) + private double calculateDistance(LocationPoint p1, LocationPoint p2) { + final int R = 6371; // Earth radius in km + + double latDistance = Math.toRadians(p2.latitude - p1.latitude); + double lonDistance = Math.toRadians(p2.longitude - p1.longitude); + + double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2) + + Math.cos(Math.toRadians(p1.latitude)) * Math.cos(Math.toRadians(p2.latitude)) + * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2); + + double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); + + return R * c; + } + + private static class LocationPoint { + private final double latitude; + private final double longitude; + + public LocationPoint(double latitude, double longitude) { + this.latitude = latitude; + this.longitude = longitude; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LocationPoint that = (LocationPoint) o; + return Double.compare(that.latitude, latitude) == 0 && + Double.compare(that.longitude, longitude) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(latitude, longitude); + } + } + } + + // Detector for unusual transaction frequency + public static class FrequencyAnomalyDetector + extends ProcessWindowFunction { + + @Override + public void process(String cardId, Context context, Iterable transactions, + Collector out) { + List transactionList = new ArrayList<>(); + transactions.forEach(transactionList::add); + + // Get window info + long windowStart = context.window().getStart(); + long windowEnd = context.window().getEnd(); + long windowSizeMinutes = (windowEnd - windowStart) / (1000 * 60); + + // If there are more than 5 transactions in 5 minutes for the same card, flag it + if (transactionList.size() > 5) { + Transaction latestTransaction = transactionList.stream() + .max(Comparator.comparing(Transaction::getTimestamp)) + .orElse(transactionList.get(0)); + + out.collect(new TransactionAlert( + "FREQUENCY_ANOMALY", + latestTransaction.getCardId(), + latestTransaction.getUserId(), + latestTransaction.getAmount(), + latestTransaction.getLatitude(), + latestTransaction.getLongitude(), + latestTransaction.getTimestamp(), + "Unusual transaction frequency detected: " + transactionList.size() + + " transactions in " + windowSizeMinutes + " minutes" + )); + } + } + } +} diff --git a/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java b/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java new file mode 100644 index 00000000..89fddf4e --- /dev/null +++ b/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java @@ -0,0 +1,66 @@ +package com.anomaly.model; + +import java.time.Instant; + +public class TransactionAlert { + private String alertType; + private String cardId; + private String userId; + private double amount; + private double latitude; + private double longitude; + private Instant timestamp; + private String message; + private Instant alertTime; + + public TransactionAlert() { + this.alertTime = Instant.now(); + } + + public TransactionAlert(String alertType, String cardId, String userId, double amount, + double latitude, double longitude, Instant timestamp, String message) { + this.alertType = alertType; + this.cardId = cardId; + this.userId = userId; + this.amount = amount; + this.latitude = latitude; + this.longitude = longitude; + this.timestamp = timestamp; + this.message = message; + this.alertTime = Instant.now(); + } + + // Getters and setters + public String getAlertType() { return alertType; } + public void setAlertType(String alertType) { this.alertType = alertType; } + public String getCardId() { return cardId; } + public void setCardId(String cardId) { this.cardId = cardId; } + public String getUserId() { return userId; } + public void setUserId(String userId) { this.userId = userId; } + public double getAmount() { return amount; } + public void setAmount(double amount) { this.amount = amount; } + public double getLatitude() { return latitude; } + public void setLatitude(double latitude) { this.latitude = latitude; } + public double getLongitude() { return longitude; } + public void setLongitude(double longitude) { this.longitude = longitude; } + public Instant getTimestamp() { return timestamp; } + public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; } + public String getMessage() { return message; } + public void setMessage(String message) { this.message = message; } + public Instant getAlertTime() { return alertTime; } + public void setAlertTime(Instant alertTime) { this.alertTime = alertTime; } + + @Override + public String toString() { + return "TransactionAlert{" + + "alertType='" + alertType + '\'' + + ", cardId='" + cardId + '\'' + + ", userId='" + userId + '\'' + + ", amount=" + amount + + ", latitude=" + latitude + + ", longitude=" + longitude + + ", message='" + message + '\'' + + ", alertTime=" + alertTime + + '}'; + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..e69de29b diff --git a/kafka-consumer-visualizer/pom.xml b/kafka-consumer-visualizer/pom.xml new file mode 100644 index 00000000..fba8f6d7 --- /dev/null +++ b/kafka-consumer-visualizer/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + com.anomaly + kafka-consumer-visualizer + 1.0-SNAPSHOT + + + 11 + 11 + + + + + + org.apache.kafka + kafka-clients + 2.8.1 + + + + com.google.code.gson + gson + 2.8.9 + + + + org.slf4j + slf4j-api + 1.7.32 + + + ch.qos.logback + logback-classic + 1.2.9 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.anomaly.consumer.TransactionConsumer + + + + + + + + + diff --git a/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java b/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java new file mode 100644 index 00000000..493d9ead --- /dev/null +++ b/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java @@ -0,0 +1,178 @@ +package com.anomaly.consumer; + +import com.anomaly.model.Transaction; +import com.google.gson.Gson; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.swing.*; +import java.awt.*; +import java.time.Duration; +import java.util.*; +import java.util.List; + +public class TransactionConsumer { + private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class); + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final String GROUP_ID = "transaction-consumer-group"; + private static final String TOPIC = "transactions"; + private static final Gson gson = new Gson(); + + // UI Components + private static JFrame frame; + private static JTextArea logArea; + private static JPanel chartPanel; + private static final int MAX_DISPLAYED_TRANSACTIONS = 100; + private static final List recentTransactions = new ArrayList<>(); + + public static void main(String[] args) { + setupUI(); + + // Create consumer properties + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + // Create consumer + KafkaConsumer consumer = new KafkaConsumer<>(properties); + + // Subscribe to topic + consumer.subscribe(Collections.singletonList(TOPIC)); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("Shutting down consumer..."); + consumer.close(); + logger.info("Consumer closed"); + })); + + // Poll for new data + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + for (ConsumerRecord record : records) { + logMessage("Received: Key: " + record.key() + ", Value: " + record.value()); + + // Parse the transaction + Transaction transaction = gson.fromJson(record.value(), Transaction.class); + addTransaction(transaction); + } + + // Update the visualization + if (!records.isEmpty()) { + updateChart(); + } + } + } finally { + consumer.close(); + } + } + + private static void setupUI() { + frame = new JFrame("Transaction Visualizer"); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + frame.setSize(800, 600); + + // Create log area + logArea = new JTextArea(); + logArea.setEditable(false); + JScrollPane scrollPane = new JScrollPane(logArea); + scrollPane.setPreferredSize(new Dimension(800, 200)); + + // Create chart panel + chartPanel = new JPanel() { + @Override + protected void paintComponent(Graphics g) { + super.paintComponent(g); + drawChart(g); + } + }; + chartPanel.setPreferredSize(new Dimension(800, 400)); + + // Add components to frame + frame.setLayout(new BorderLayout()); + frame.add(scrollPane, BorderLayout.SOUTH); + frame.add(chartPanel, BorderLayout.CENTER); + + frame.setVisible(true); + } + + private static void logMessage(String message) { + SwingUtilities.invokeLater(() -> { + logArea.append(message + "\n"); + logArea.setCaretPosition(logArea.getDocument().getLength()); + }); + } + + private static void addTransaction(Transaction transaction) { + synchronized (recentTransactions) { + recentTransactions.add(transaction); + if (recentTransactions.size() > MAX_DISPLAYED_TRANSACTIONS) { + recentTransactions.remove(0); + } + } + } + + private static void updateChart() { + SwingUtilities.invokeLater(() -> chartPanel.repaint()); + } + + private static void drawChart(Graphics g) { + Graphics2D g2d = (Graphics2D) g; + int width = chartPanel.getWidth(); + int height = chartPanel.getHeight(); + + // Clear the background + g2d.setColor(Color.WHITE); + g2d.fillRect(0, 0, width, height); + + synchronized (recentTransactions) { + if (recentTransactions.isEmpty()) return; + + // Find max amount for scaling + double maxAmount = recentTransactions.stream() + .mapToDouble(Transaction::getAmount) + .max() + .orElse(1.0); + + // Draw axes + g2d.setColor(Color.BLACK); + g2d.drawLine(50, height - 50, width - 50, height - 50); // X-axis + g2d.drawLine(50, 50, 50, height - 50); // Y-axis + + // Draw labels + g2d.drawString("Time →", width - 70, height - 20); + g2d.drawString("Amount", 10, 40); + g2d.drawString("$" + Math.round(maxAmount), 10, 60); + + // Draw transactions as points + int xStep = (width - 100) / Math.max(1, recentTransactions.size() - 1); + int x = 50; + + for (Transaction transaction : recentTransactions) { + int y = height - 50 - (int) ((transaction.getAmount() / maxAmount) * (height - 100)); + + // Color based on available limit percentage + double limitPercentage = transaction.getAvailableLimit() / + (transaction.getAmount() + transaction.getAvailableLimit()); + + if (limitPercentage < 0.3) { + g2d.setColor(Color.RED); + } else if (limitPercentage < 0.6) { + g2d.setColor(Color.ORANGE); + } else { + g2d.setColor(Color.GREEN); + } + + g2d.fillOval(x - 3, y - 3, 6, 6); + x += xStep; + } + } + } +} diff --git a/transaction-simulator/pom.xml b/transaction-simulator/pom.xml new file mode 100644 index 00000000..99123c8f --- /dev/null +++ b/transaction-simulator/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + com.anomaly + transaction-simulator + 1.0-SNAPSHOT + + + 11 + 11 + + + + + + org.apache.kafka + kafka-clients + 2.8.1 + + + + com.google.code.gson + gson + 2.8.9 + + + + org.slf4j + slf4j-api + 1.7.32 + + + ch.qos.logback + logback-classic + 1.2.9 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.anomaly.producer.TransactionProducer + + + + + + + + + diff --git a/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java new file mode 100644 index 00000000..d9a52a07 --- /dev/null +++ b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java @@ -0,0 +1,164 @@ +package com.anomaly.generator; + +import com.anomaly.model.Transaction; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; + +public class TransactionGenerator { + private static final int NUM_CARDS = 10000; + private static final int NUM_USERS = 5000; // Assuming each user has ~2 cards on average + private static final Map> cardLocations = new HashMap<>(); + private static final Map cardLimits = new HashMap<>(); + private static final Map cardToUser = new HashMap<>(); + private static final List cardIds = new ArrayList<>(); + private static final List userIds = new ArrayList<>(); + + // Anomaly types + private static final int ANOMALY_NONE = 0; + private static final int ANOMALY_AMOUNT = 1; + private static final int ANOMALY_LOCATION = 2; + private static final int ANOMALY_FREQUENCY = 3; + + // Probability of generating an anomaly (1%) + private static final double ANOMALY_PROBABILITY = 0.01; + + // Initialize card and user data + static { + // Generate user IDs + for (int i = 0; i < NUM_USERS; i++) { + userIds.add("USER_" + String.format("%05d", i)); + } + + // Generate card IDs and assign to users + for (int i = 0; i < NUM_CARDS; i++) { + String cardId = "CARD_" + String.format("%05d", i); + cardIds.add(cardId); + String userId = userIds.get(ThreadLocalRandom.current().nextInt(NUM_USERS)); + cardToUser.put(cardId, userId); + + // Initialize empty set of locations for this card + cardLocations.put(cardId, new HashSet<>()); + + // Assign random limit between $1,000 and $20,000 + cardLimits.put(cardId, 1000.0 + ThreadLocalRandom.current().nextDouble() * 19000.0); + } + } + + public Transaction generateTransaction(boolean forceAnomaly, int anomalyType) { + // Select a random card + String cardId = cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS)); + String userId = cardToUser.get(cardId); + double availableLimit = cardLimits.get(cardId); + + // Determine if this should be an anomaly + int actualAnomalyType = ANOMALY_NONE; + if (forceAnomaly) { + actualAnomalyType = (anomalyType >= 0 && anomalyType <= 3) ? + anomalyType : ThreadLocalRandom.current().nextInt(1, 4); + } else if (ThreadLocalRandom.current().nextDouble() < ANOMALY_PROBABILITY) { + actualAnomalyType = ThreadLocalRandom.current().nextInt(1, 4); + } + + // Get or generate location + Double[] location = getLocationForCard(cardId, actualAnomalyType == ANOMALY_LOCATION); + double latitude = location[0]; + double longitude = location[1]; + + // Generate transaction amount + double amount; + if (actualAnomalyType == ANOMALY_AMOUNT) { + // Generate anomalously high amount (50-90% of available limit) + amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4); + } else { + // Normal amount (1-10% of available limit) + amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09); + } + + // Update available limit + double newLimit = availableLimit - amount; + cardLimits.put(cardId, newLimit > 0 ? newLimit : 0); + + // Create transaction + Transaction transaction = new Transaction( + cardId, + userId, + latitude, + longitude, + amount, + newLimit, + Instant.now() + ); + + return transaction; + } + + private Double[] getLocationForCard(String cardId, boolean generateAnomaly) { + Set locations = cardLocations.get(cardId); + + if (locations.isEmpty() || generateAnomaly) { + // Generate a random worldwide location + double latitude = ThreadLocalRandom.current().nextDouble(-90, 90); + double longitude = ThreadLocalRandom.current().nextDouble(-180, 180); + Double[] newLocation = {latitude, longitude}; + + // Store this location for future use unless it's an anomaly + if (!generateAnomaly) { + locations.add(newLocation); + } + + return newLocation; + } else { + // Pick a random location from the card's history + Double[][] locArray = locations.toArray(new Double[0][]); + return locArray[ThreadLocalRandom.current().nextInt(locArray.length)]; + } + } + + // Method to simulate frequency anomaly by generating multiple transactions in short succession + public List generateFrequencyAnomaly(String specificCardId) { + List transactions = new ArrayList<>(); + String cardId = specificCardId != null ? + specificCardId : cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS)); + + // Generate 5-10 transactions in quick succession + int numTransactions = ThreadLocalRandom.current().nextInt(5, 11); + for (int i = 0; i < numTransactions; i++) { + transactions.add(generateTransactionForCard(cardId, false, ANOMALY_NONE)); + } + + return transactions; + } + + private Transaction generateTransactionForCard(String cardId, boolean forceAnomaly, int anomalyType) { + String userId = cardToUser.get(cardId); + double availableLimit = cardLimits.get(cardId); + + // Get location + Double[] location = getLocationForCard(cardId, forceAnomaly && anomalyType == ANOMALY_LOCATION); + double latitude = location[0]; + double longitude = location[1]; + + // Generate amount + double amount; + if (forceAnomaly && anomalyType == ANOMALY_AMOUNT) { + amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4); + } else { + amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09); + } + + // Update available limit + double newLimit = availableLimit - amount; + cardLimits.put(cardId, newLimit > 0 ? newLimit : 0); + + return new Transaction( + cardId, + userId, + latitude, + longitude, + amount, + newLimit, + Instant.now() + ); + } +} diff --git a/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java b/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java new file mode 100644 index 00000000..e69de29b diff --git a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java new file mode 100644 index 00000000..ad67b774 --- /dev/null +++ b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java @@ -0,0 +1,86 @@ +package com.anomaly.producer; + +import com.anomaly.generator.TransactionGenerator; +import com.anomaly.model.Transaction; +import com.google.gson.Gson; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; + +public class TransactionProducer { + private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class); + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final String TOPIC_NAME = "transactions"; + private static final TransactionGenerator generator = new TransactionGenerator(); + private static final Gson gson = new Gson(); + + public static void main(String[] args) { + // Create producer properties + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + // Create producer + KafkaProducer producer = new KafkaProducer<>(properties); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("Shutting down producer..."); + producer.flush(); + producer.close(); + logger.info("Producer closed"); + })); + + // Generate and send transactions + try { + while (true) { + // Normal transaction generation + if (ThreadLocalRandom.current().nextDouble() < 0.05) { + // 5% chance to generate a frequency anomaly + List anomalousTransactions = generator.generateFrequencyAnomaly(null); + for (Transaction transaction : anomalousTransactions) { + sendTransaction(producer, transaction); + } + } else { + // Regular transaction + boolean forceAnomaly = ThreadLocalRandom.current().nextDouble() < 0.02; // 2% chance + Transaction transaction = generator.generateTransaction(forceAnomaly, -1); + sendTransaction(producer, transaction); + } + + // Sleep between 100ms and 1s before generating next transaction + Thread.sleep(ThreadLocalRandom.current().nextLong(100, 1000)); + } + } catch (InterruptedException | ExecutionException e) { + logger.error("Error in transaction producer", e); + } finally { + producer.flush(); + producer.close(); + } + } + + private static void sendTransaction(KafkaProducer producer, Transaction transaction) + throws ExecutionException, InterruptedException { + String jsonTransaction = gson.toJson(transaction); + ProducerRecord record = new ProducerRecord<>( + TOPIC_NAME, + transaction.getCardId(), + jsonTransaction + ); + + producer.send(record, (metadata, exception) -> { + if (exception == null) { + logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}", + metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp()); + } else { + logger.error("Error sending message", exception); + } + }).get(); // Making it synchronous for demonstration + } +}