From c6b4e8fd3a79811c5bd797ceae5e34d9920d3c01 Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Tue, 15 Apr 2025 17:08:40 +0200 Subject: [PATCH 01/12] chore: vibe coding psd project --- alarm-visualizer/pom.xml | 66 +++++ .../anomaly/visualizer/AlertVisualizer.java | 275 ++++++++++++++++++ anomaly-detector/pom.xml | 81 ++++++ .../com/anomaly/detector/AnomalyDetector.java | 269 +++++++++++++++++ .../com/anomaly/model/TransactionAlert.java | 66 +++++ docker-compose.yml | 0 kafka-consumer-visualizer/pom.xml | 66 +++++ .../anomaly/consumer/TransactionConsumer.java | 178 ++++++++++++ transaction-simulator/pom.xml | 66 +++++ .../generator/TransactionGenerator.java | 164 +++++++++++ .../java/com/anomaly/model/Transaction.java | 0 .../anomaly/producer/TransactionProducer.java | 86 ++++++ 12 files changed, 1317 insertions(+) create mode 100644 alarm-visualizer/pom.xml create mode 100644 alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java create mode 100644 anomaly-detector/pom.xml create mode 100644 anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java create mode 100644 anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java create mode 100644 docker-compose.yml create mode 100644 kafka-consumer-visualizer/pom.xml create mode 100644 kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java create mode 100644 transaction-simulator/pom.xml create mode 100644 transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java create mode 100644 transaction-simulator/src/main/java/com/anomaly/model/Transaction.java create mode 100644 transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java diff --git a/alarm-visualizer/pom.xml b/alarm-visualizer/pom.xml new file mode 100644 index 00000000..85650d32 --- /dev/null +++ b/alarm-visualizer/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + com.anomaly + alarm-visualizer + 1.0-SNAPSHOT + + + 11 + 11 + + + + + + org.apache.kafka + kafka-clients + 2.8.1 + + + + com.google.code.gson + gson + 2.8.9 + + + + org.slf4j + slf4j-api + 1.7.32 + + + ch.qos.logback + logback-classic + 1.2.9 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.anomaly.visualizer.AlertVisualizer + + + + + + + + + diff --git a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java new file mode 100644 index 00000000..9a81205c --- /dev/null +++ b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java @@ -0,0 +1,275 @@ +package com.anomaly.visualizer; + +import com.anomaly.model.TransactionAlert; +import com.google.gson.Gson; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.swing.*; +import javax.swing.table.DefaultTableModel; +import java.awt.*; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.List; + +public class AlertVisualizer { + private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class); + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final String GROUP_ID = "alert-visualizer-group"; + private static final String TOPIC = "alerts"; + private static final Gson gson = new Gson(); + + // UI Components + private static JFrame frame; + private static JTable alertTable; + private static DefaultTableModel tableModel; + private static JTextArea detailArea; + private static final List allAlerts = new ArrayList<>(); + private static final DateTimeFormatter formatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); + + public static void main(String[] args) { + setupUI(); + + // Create consumer properties + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + // Create consumer + KafkaConsumer consumer = new KafkaConsumer<>(properties); + + // Subscribe to topic + consumer.subscribe(Collections.singletonList(TOPIC)); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("Shutting down alert visualizer..."); + consumer.close(); + logger.info("Alert visualizer closed"); + })); + + // Poll for new alerts + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + boolean newAlerts = false; + for (ConsumerRecord record : records) { + // Parse the alert + TransactionAlert alert = gson.fromJson(record.value(), TransactionAlert.class); + addAlert(alert); + newAlerts = true; + + // Display notification for new alert + displayNotification(alert); + } + + // Update the UI if new alerts arrived + if (newAlerts) { + updateAlertTable(); + } + } + } finally { + consumer.close(); + } + } + + private static void setupUI() { + frame = new JFrame("Transaction Alert Visualizer"); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + frame.setSize(1024, 768); + + // Create table with columns + String[] columnNames = {"Time", "Alert Type", "Card ID", "User ID", "Amount", "Message"}; + tableModel = new DefaultTableModel(columnNames, 0); + alertTable = new JTable(tableModel); + alertTable.setSelectionMode(ListSelectionModel.SINGLE_SELECTION); + + // Add selection listener to show details when an alert is selected + alertTable.getSelectionModel().addListSelectionListener(e -> { + if (!e.getValueIsAdjusting()) { + int selectedRow = alertTable.getSelectedRow(); + if (selectedRow >= 0 && selectedRow < allAlerts.size()) { + showAlertDetails(allAlerts.get(selectedRow)); + } + } + }); + + JScrollPane tableScrollPane = new JScrollPane(alertTable); + + // Create detail panel + detailArea = new JTextArea(); + detailArea.setEditable(false); + JScrollPane detailScrollPane = new JScrollPane(detailArea); + + // Create map visualization panel + JPanel mapPanel = new JPanel() { + @Override + protected void paintComponent(Graphics g) { + super.paintComponent(g); + drawMap(g); + } + }; + + // Create split panes for layout + JSplitPane mainSplitPane = new JSplitPane(JSplitPane.VERTICAL_SPLIT, + tableScrollPane, new JSplitPane(JSplitPane.HORIZONTAL_SPLIT, detailScrollPane, mapPanel)); + mainSplitPane.setDividerLocation(300); + ((JSplitPane)mainSplitPane.getBottomComponent()).setDividerLocation(500); + + frame.add(mainSplitPane); + frame.setVisible(true); + } + + private static void addAlert(TransactionAlert alert) { + synchronized (allAlerts) { + allAlerts.add(0, alert); // Add at the beginning for newest first + } + } + + private static void updateAlertTable() { + SwingUtilities.invokeLater(() -> { + tableModel.setRowCount(0); // Clear table + + synchronized (allAlerts) { + for (TransactionAlert alert : allAlerts) { + String formattedTime = formatter.format(alert.getAlertTime()); + tableModel.addRow(new Object[]{ + formattedTime, + alert.getAlertType(), + alert.getCardId(), + alert.getUserId(), + String.format("$%.2f", alert.getAmount()), + alert.getMessage() + }); + } + } + }); + } + + private static void showAlertDetails(TransactionAlert alert) { + SwingUtilities.invokeLater(() -> { + StringBuilder sb = new StringBuilder(); + sb.append("ALERT DETAILS\n"); + sb.append("============================================\n\n"); + sb.append("Alert Type: ").append(alert.getAlertType()).append("\n"); + sb.append("Alert Time: ").append(formatter.format(alert.getAlertTime())).append("\n"); + sb.append("Transaction Time: ").append(formatter.format(alert.getTimestamp())).append("\n\n"); + sb.append("Card ID: ").append(alert.getCardId()).append("\n"); + sb.append("User ID: ").append(alert.getUserId()).append("\n"); + sb.append("Transaction Amount: $").append(String.format("%.2f", alert.getAmount())).append("\n\n"); + sb.append("Location: ").append(alert.getLatitude()).append(", ").append(alert.getLongitude()).append("\n\n"); + sb.append("Alert Message: ").append(alert.getMessage()).append("\n"); + + detailArea.setText(sb.toString()); + frame.repaint(); // Trigger map redraw to highlight selected alert + }); + } + + private static void drawMap(Graphics g) { + Graphics2D g2d = (Graphics2D) g; + int width = g2d.getClipBounds().width; + int height = g2d.getClipBounds().height; + + // Draw world map (simplified) + g2d.setColor(Color.LIGHT_GRAY); + g2d.fillRect(0, 0, width, height); + g2d.setColor(Color.DARK_GRAY); + g2d.drawRect(0, 0, width - 1, height - 1); + + // Draw grid lines + g2d.setColor(Color.GRAY); + for (int i = 0; i < width; i += 50) { + g2d.drawLine(i, 0, i, height); + } + for (int i = 0; i < height; i += 50) { + g2d.drawLine(0, i, width, i); + } + + // Get selected alert + int selectedRow = alertTable.getSelectedRow(); + TransactionAlert selectedAlert = null; + if (selectedRow >= 0 && selectedRow < allAlerts.size()) { + selectedAlert = allAlerts.get(selectedRow); + } + + // Draw alert points + synchronized (allAlerts) { + for (TransactionAlert alert : allAlerts) { + // Map lat/lon to screen coordinates + int x = (int) ((alert.getLongitude() + 180) / 360 * width); + int y = (int) ((90 - alert.getLatitude()) / 180 * height); + + // Color by alert type + if (alert.getAlertType().equals("AMOUNT_ANOMALY")) { + g2d.setColor(Color.RED); + } else if (alert.getAlertType().equals("LOCATION_ANOMALY")) { + g2d.setColor(Color.BLUE); + } else { + g2d.setColor(Color.ORANGE); + } + + // Make selected alert larger + if (alert == selectedAlert) { + g2d.fillOval(x - 8, y - 8, 16, 16); + } else { + g2d.fillOval(x - 4, y - 4, 8, 8); + } + } + } + + // Draw legend + g2d.setColor(Color.BLACK); + g2d.drawString("Legend:", width - 150, 20); + g2d.setColor(Color.RED); + g2d.fillOval(width - 140, 30, 10, 10); + g2d.setColor(Color.BLACK); + g2d.drawString("Amount Anomaly", width - 125, 40); + g2d.setColor(Color.BLUE); + g2d.fillOval(width - 140, 50, 10, 10); + g2d.setColor(Color.BLACK); + g2d.drawString("Location Anomaly", width - 125, 60); + g2d.setColor(Color.ORANGE); + g2d.fillOval(width - 140, 70, 10, 10); + g2d.setColor(Color.BLACK); + g2d.drawString("Frequency Anomaly", width - 125, 80); + } + + private static void displayNotification(TransactionAlert alert) { + SwingUtilities.invokeLater(() -> { + // Make a simple notification window + JDialog dialog = new JDialog(frame, "New Alert!", false); + dialog.setSize(400, 150); + dialog.setLocationRelativeTo(frame); + + JPanel panel = new JPanel(new BorderLayout()); + JLabel label = new JLabel("" + alert.getAlertType() + "
" + + alert.getMessage() + "
Card: " + alert.getCardId() + ""); + label.setBorder(BorderFactory.createEmptyBorder(10, 10, 10, 10)); + panel.add(label, BorderLayout.CENTER); + + JButton closeButton = new JButton("Dismiss"); + closeButton.addActionListener(e -> dialog.dispose()); + JPanel buttonPanel = new JPanel(); + buttonPanel.add(closeButton); + panel.add(buttonPanel, BorderLayout.SOUTH); + + dialog.add(panel); + dialog.setVisible(true); + + // Auto-dismiss after 5 seconds + Timer timer = new Timer(5000, e -> dialog.dispose()); + timer.setRepeats(false); + timer.start(); + }); + } +} diff --git a/anomaly-detector/pom.xml b/anomaly-detector/pom.xml new file mode 100644 index 00000000..1a18c827 --- /dev/null +++ b/anomaly-detector/pom.xml @@ -0,0 +1,81 @@ + + + 4.0.0 + + com.anomaly + anomaly-detector + 1.0-SNAPSHOT + + + 11 + 11 + 1.14.2 + 2.12 + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + + + + org.apache.flink + flink-connector-kafka_${scala.binary.version} + ${flink.version} + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + + + + com.google.code.gson + gson + 2.8.9 + + + + org.slf4j + slf4j-api + 1.7.32 + + + ch.qos.logback + logback-classic + 1.2.9 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.anomaly.detector.AnomalyDetector + + + + + + + + + + diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java new file mode 100644 index 00000000..b7ebe65a --- /dev/null +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -0,0 +1,269 @@ +package com.anomaly.detector; + +import com.anomaly.model.Transaction; +import com.anomaly.model.TransactionAlert; +import com.google.gson.Gson; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.util.Collector; + +import java.util.*; +import java.time.Instant; + +public class AnomalyDetector { + + private static final String INPUT_TOPIC = "transactions"; + private static final String OUTPUT_TOPIC = "alerts"; + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final Gson gson = new Gson(); + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Configure Kafka consumer + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS); + properties.setProperty("group.id", "anomaly-detector"); + + // Create Kafka consumer + FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>( + INPUT_TOPIC, + new SimpleStringSchema(), + properties + ); + + // Parse JSON transactions + DataStream transactionStream = env + .addSource(consumer) + .map(new MapFunction() { + @Override + public Transaction map(String value) throws Exception { + return gson.fromJson(value, Transaction.class); + } + }); + + // Detect anomalies based on different metrics + // 1. Amount anomaly - sudden high-value transactions + DataStream amountAlerts = transactionStream + .keyBy(Transaction::getCardId) + .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1))) + .process(new AmountAnomalyDetector()); + + // 2. Location anomaly - sudden change in location + DataStream locationAlerts = transactionStream + .keyBy(Transaction::getCardId) + .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1))) + .process(new LocationAnomalyDetector()); + + // 3. Frequency anomaly - unusual number of transactions in short time + DataStream frequencyAlerts = transactionStream + .keyBy(Transaction::getCardId) + .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1))) + .process(new FrequencyAnomalyDetector()); + + // Union all alert streams + DataStream allAlerts = amountAlerts + .union(locationAlerts, frequencyAlerts); + + // Convert alerts to JSON and send to Kafka + allAlerts + .map(alert -> gson.toJson(alert)) + .addSink(new FlinkKafkaProducer<>( + OUTPUT_TOPIC, + new SimpleStringSchema(), + properties + )); + + // Execute the Flink job + env.execute("Credit Card Transaction Anomaly Detection"); + } + + // Detector for unusual transaction amounts + public static class AmountAnomalyDetector + extends ProcessWindowFunction { + + @Override + public void process(String cardId, Context context, Iterable transactions, + Collector out) { + List transactionList = new ArrayList<>(); + transactions.forEach(transactionList::add); + + if (transactionList.isEmpty()) return; + + // Calculate statistics + double averageAmount = transactionList.stream() + .mapToDouble(Transaction::getAmount) + .average() + .orElse(0); + + double stdDeviation = calculateStdDeviation(transactionList, averageAmount); + + // Check for anomalies (transactions that are more than 3 standard deviations from mean) + for (Transaction transaction : transactionList) { + if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 3 * stdDeviation) { + out.collect(new TransactionAlert( + "AMOUNT_ANOMALY", + transaction.getCardId(), + transaction.getUserId(), + transaction.getAmount(), + transaction.getLatitude(), + transaction.getLongitude(), + transaction.getTimestamp(), + "Unusual transaction amount detected: $" + transaction.getAmount() + + " (Average: $" + String.format("%.2f", averageAmount) + ")" + )); + } + } + } + + private double calculateStdDeviation(List transactions, double mean) { + return Math.sqrt(transactions.stream() + .mapToDouble(t -> Math.pow(t.getAmount() - mean, 2)) + .average() + .orElse(0)); + } + } + + // Detector for unusual transaction locations + public static class LocationAnomalyDetector + extends ProcessWindowFunction { + + // Map to store frequent locations for each card + private final Map> cardLocations = new HashMap<>(); + + @Override + public void process(String cardId, Context context, Iterable transactions, + Collector out) { + List transactionList = new ArrayList<>(); + transactions.forEach(transactionList::add); + + if (transactionList.isEmpty()) return; + + // Get or create location set for this card + Set frequentLocations = cardLocations.computeIfAbsent(cardId, k -> new HashSet<>()); + + // Process each transaction + for (Transaction transaction : transactionList) { + LocationPoint currentPoint = new LocationPoint(transaction.getLatitude(), transaction.getLongitude()); + + // If we have at least 3 frequent locations for this card + if (frequentLocations.size() >= 3) { + boolean isNearKnownLocation = false; + + // Check if current location is near any known frequent location + for (LocationPoint knownPoint : frequentLocations) { + if (calculateDistance(currentPoint, knownPoint) < 50) { // Less than 50km + isNearKnownLocation = true; + break; + } + } + + // If not near any known location, it might be an anomaly + if (!isNearKnownLocation) { + out.collect(new TransactionAlert( + "LOCATION_ANOMALY", + transaction.getCardId(), + transaction.getUserId(), + transaction.getAmount(), + transaction.getLatitude(), + transaction.getLongitude(), + transaction.getTimestamp(), + "Unusual transaction location detected at: " + + transaction.getLatitude() + ", " + transaction.getLongitude() + )); + } + } + + // Add current location to frequent locations (max 10 locations per card) + if (frequentLocations.size() < 10) { + frequentLocations.add(currentPoint); + } + } + } + + // Calculate distance between two points using Haversine formula (in km) + private double calculateDistance(LocationPoint p1, LocationPoint p2) { + final int R = 6371; // Earth radius in km + + double latDistance = Math.toRadians(p2.latitude - p1.latitude); + double lonDistance = Math.toRadians(p2.longitude - p1.longitude); + + double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2) + + Math.cos(Math.toRadians(p1.latitude)) * Math.cos(Math.toRadians(p2.latitude)) + * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2); + + double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); + + return R * c; + } + + private static class LocationPoint { + private final double latitude; + private final double longitude; + + public LocationPoint(double latitude, double longitude) { + this.latitude = latitude; + this.longitude = longitude; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LocationPoint that = (LocationPoint) o; + return Double.compare(that.latitude, latitude) == 0 && + Double.compare(that.longitude, longitude) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(latitude, longitude); + } + } + } + + // Detector for unusual transaction frequency + public static class FrequencyAnomalyDetector + extends ProcessWindowFunction { + + @Override + public void process(String cardId, Context context, Iterable transactions, + Collector out) { + List transactionList = new ArrayList<>(); + transactions.forEach(transactionList::add); + + // Get window info + long windowStart = context.window().getStart(); + long windowEnd = context.window().getEnd(); + long windowSizeMinutes = (windowEnd - windowStart) / (1000 * 60); + + // If there are more than 5 transactions in 5 minutes for the same card, flag it + if (transactionList.size() > 5) { + Transaction latestTransaction = transactionList.stream() + .max(Comparator.comparing(Transaction::getTimestamp)) + .orElse(transactionList.get(0)); + + out.collect(new TransactionAlert( + "FREQUENCY_ANOMALY", + latestTransaction.getCardId(), + latestTransaction.getUserId(), + latestTransaction.getAmount(), + latestTransaction.getLatitude(), + latestTransaction.getLongitude(), + latestTransaction.getTimestamp(), + "Unusual transaction frequency detected: " + transactionList.size() + + " transactions in " + windowSizeMinutes + " minutes" + )); + } + } + } +} diff --git a/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java b/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java new file mode 100644 index 00000000..89fddf4e --- /dev/null +++ b/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java @@ -0,0 +1,66 @@ +package com.anomaly.model; + +import java.time.Instant; + +public class TransactionAlert { + private String alertType; + private String cardId; + private String userId; + private double amount; + private double latitude; + private double longitude; + private Instant timestamp; + private String message; + private Instant alertTime; + + public TransactionAlert() { + this.alertTime = Instant.now(); + } + + public TransactionAlert(String alertType, String cardId, String userId, double amount, + double latitude, double longitude, Instant timestamp, String message) { + this.alertType = alertType; + this.cardId = cardId; + this.userId = userId; + this.amount = amount; + this.latitude = latitude; + this.longitude = longitude; + this.timestamp = timestamp; + this.message = message; + this.alertTime = Instant.now(); + } + + // Getters and setters + public String getAlertType() { return alertType; } + public void setAlertType(String alertType) { this.alertType = alertType; } + public String getCardId() { return cardId; } + public void setCardId(String cardId) { this.cardId = cardId; } + public String getUserId() { return userId; } + public void setUserId(String userId) { this.userId = userId; } + public double getAmount() { return amount; } + public void setAmount(double amount) { this.amount = amount; } + public double getLatitude() { return latitude; } + public void setLatitude(double latitude) { this.latitude = latitude; } + public double getLongitude() { return longitude; } + public void setLongitude(double longitude) { this.longitude = longitude; } + public Instant getTimestamp() { return timestamp; } + public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; } + public String getMessage() { return message; } + public void setMessage(String message) { this.message = message; } + public Instant getAlertTime() { return alertTime; } + public void setAlertTime(Instant alertTime) { this.alertTime = alertTime; } + + @Override + public String toString() { + return "TransactionAlert{" + + "alertType='" + alertType + '\'' + + ", cardId='" + cardId + '\'' + + ", userId='" + userId + '\'' + + ", amount=" + amount + + ", latitude=" + latitude + + ", longitude=" + longitude + + ", message='" + message + '\'' + + ", alertTime=" + alertTime + + '}'; + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..e69de29b diff --git a/kafka-consumer-visualizer/pom.xml b/kafka-consumer-visualizer/pom.xml new file mode 100644 index 00000000..fba8f6d7 --- /dev/null +++ b/kafka-consumer-visualizer/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + com.anomaly + kafka-consumer-visualizer + 1.0-SNAPSHOT + + + 11 + 11 + + + + + + org.apache.kafka + kafka-clients + 2.8.1 + + + + com.google.code.gson + gson + 2.8.9 + + + + org.slf4j + slf4j-api + 1.7.32 + + + ch.qos.logback + logback-classic + 1.2.9 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.anomaly.consumer.TransactionConsumer + + + + + + + + + diff --git a/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java b/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java new file mode 100644 index 00000000..493d9ead --- /dev/null +++ b/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java @@ -0,0 +1,178 @@ +package com.anomaly.consumer; + +import com.anomaly.model.Transaction; +import com.google.gson.Gson; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.swing.*; +import java.awt.*; +import java.time.Duration; +import java.util.*; +import java.util.List; + +public class TransactionConsumer { + private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class); + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final String GROUP_ID = "transaction-consumer-group"; + private static final String TOPIC = "transactions"; + private static final Gson gson = new Gson(); + + // UI Components + private static JFrame frame; + private static JTextArea logArea; + private static JPanel chartPanel; + private static final int MAX_DISPLAYED_TRANSACTIONS = 100; + private static final List recentTransactions = new ArrayList<>(); + + public static void main(String[] args) { + setupUI(); + + // Create consumer properties + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + // Create consumer + KafkaConsumer consumer = new KafkaConsumer<>(properties); + + // Subscribe to topic + consumer.subscribe(Collections.singletonList(TOPIC)); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("Shutting down consumer..."); + consumer.close(); + logger.info("Consumer closed"); + })); + + // Poll for new data + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + for (ConsumerRecord record : records) { + logMessage("Received: Key: " + record.key() + ", Value: " + record.value()); + + // Parse the transaction + Transaction transaction = gson.fromJson(record.value(), Transaction.class); + addTransaction(transaction); + } + + // Update the visualization + if (!records.isEmpty()) { + updateChart(); + } + } + } finally { + consumer.close(); + } + } + + private static void setupUI() { + frame = new JFrame("Transaction Visualizer"); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + frame.setSize(800, 600); + + // Create log area + logArea = new JTextArea(); + logArea.setEditable(false); + JScrollPane scrollPane = new JScrollPane(logArea); + scrollPane.setPreferredSize(new Dimension(800, 200)); + + // Create chart panel + chartPanel = new JPanel() { + @Override + protected void paintComponent(Graphics g) { + super.paintComponent(g); + drawChart(g); + } + }; + chartPanel.setPreferredSize(new Dimension(800, 400)); + + // Add components to frame + frame.setLayout(new BorderLayout()); + frame.add(scrollPane, BorderLayout.SOUTH); + frame.add(chartPanel, BorderLayout.CENTER); + + frame.setVisible(true); + } + + private static void logMessage(String message) { + SwingUtilities.invokeLater(() -> { + logArea.append(message + "\n"); + logArea.setCaretPosition(logArea.getDocument().getLength()); + }); + } + + private static void addTransaction(Transaction transaction) { + synchronized (recentTransactions) { + recentTransactions.add(transaction); + if (recentTransactions.size() > MAX_DISPLAYED_TRANSACTIONS) { + recentTransactions.remove(0); + } + } + } + + private static void updateChart() { + SwingUtilities.invokeLater(() -> chartPanel.repaint()); + } + + private static void drawChart(Graphics g) { + Graphics2D g2d = (Graphics2D) g; + int width = chartPanel.getWidth(); + int height = chartPanel.getHeight(); + + // Clear the background + g2d.setColor(Color.WHITE); + g2d.fillRect(0, 0, width, height); + + synchronized (recentTransactions) { + if (recentTransactions.isEmpty()) return; + + // Find max amount for scaling + double maxAmount = recentTransactions.stream() + .mapToDouble(Transaction::getAmount) + .max() + .orElse(1.0); + + // Draw axes + g2d.setColor(Color.BLACK); + g2d.drawLine(50, height - 50, width - 50, height - 50); // X-axis + g2d.drawLine(50, 50, 50, height - 50); // Y-axis + + // Draw labels + g2d.drawString("Time →", width - 70, height - 20); + g2d.drawString("Amount", 10, 40); + g2d.drawString("$" + Math.round(maxAmount), 10, 60); + + // Draw transactions as points + int xStep = (width - 100) / Math.max(1, recentTransactions.size() - 1); + int x = 50; + + for (Transaction transaction : recentTransactions) { + int y = height - 50 - (int) ((transaction.getAmount() / maxAmount) * (height - 100)); + + // Color based on available limit percentage + double limitPercentage = transaction.getAvailableLimit() / + (transaction.getAmount() + transaction.getAvailableLimit()); + + if (limitPercentage < 0.3) { + g2d.setColor(Color.RED); + } else if (limitPercentage < 0.6) { + g2d.setColor(Color.ORANGE); + } else { + g2d.setColor(Color.GREEN); + } + + g2d.fillOval(x - 3, y - 3, 6, 6); + x += xStep; + } + } + } +} diff --git a/transaction-simulator/pom.xml b/transaction-simulator/pom.xml new file mode 100644 index 00000000..99123c8f --- /dev/null +++ b/transaction-simulator/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + com.anomaly + transaction-simulator + 1.0-SNAPSHOT + + + 11 + 11 + + + + + + org.apache.kafka + kafka-clients + 2.8.1 + + + + com.google.code.gson + gson + 2.8.9 + + + + org.slf4j + slf4j-api + 1.7.32 + + + ch.qos.logback + logback-classic + 1.2.9 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.anomaly.producer.TransactionProducer + + + + + + + + + diff --git a/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java new file mode 100644 index 00000000..d9a52a07 --- /dev/null +++ b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java @@ -0,0 +1,164 @@ +package com.anomaly.generator; + +import com.anomaly.model.Transaction; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; + +public class TransactionGenerator { + private static final int NUM_CARDS = 10000; + private static final int NUM_USERS = 5000; // Assuming each user has ~2 cards on average + private static final Map> cardLocations = new HashMap<>(); + private static final Map cardLimits = new HashMap<>(); + private static final Map cardToUser = new HashMap<>(); + private static final List cardIds = new ArrayList<>(); + private static final List userIds = new ArrayList<>(); + + // Anomaly types + private static final int ANOMALY_NONE = 0; + private static final int ANOMALY_AMOUNT = 1; + private static final int ANOMALY_LOCATION = 2; + private static final int ANOMALY_FREQUENCY = 3; + + // Probability of generating an anomaly (1%) + private static final double ANOMALY_PROBABILITY = 0.01; + + // Initialize card and user data + static { + // Generate user IDs + for (int i = 0; i < NUM_USERS; i++) { + userIds.add("USER_" + String.format("%05d", i)); + } + + // Generate card IDs and assign to users + for (int i = 0; i < NUM_CARDS; i++) { + String cardId = "CARD_" + String.format("%05d", i); + cardIds.add(cardId); + String userId = userIds.get(ThreadLocalRandom.current().nextInt(NUM_USERS)); + cardToUser.put(cardId, userId); + + // Initialize empty set of locations for this card + cardLocations.put(cardId, new HashSet<>()); + + // Assign random limit between $1,000 and $20,000 + cardLimits.put(cardId, 1000.0 + ThreadLocalRandom.current().nextDouble() * 19000.0); + } + } + + public Transaction generateTransaction(boolean forceAnomaly, int anomalyType) { + // Select a random card + String cardId = cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS)); + String userId = cardToUser.get(cardId); + double availableLimit = cardLimits.get(cardId); + + // Determine if this should be an anomaly + int actualAnomalyType = ANOMALY_NONE; + if (forceAnomaly) { + actualAnomalyType = (anomalyType >= 0 && anomalyType <= 3) ? + anomalyType : ThreadLocalRandom.current().nextInt(1, 4); + } else if (ThreadLocalRandom.current().nextDouble() < ANOMALY_PROBABILITY) { + actualAnomalyType = ThreadLocalRandom.current().nextInt(1, 4); + } + + // Get or generate location + Double[] location = getLocationForCard(cardId, actualAnomalyType == ANOMALY_LOCATION); + double latitude = location[0]; + double longitude = location[1]; + + // Generate transaction amount + double amount; + if (actualAnomalyType == ANOMALY_AMOUNT) { + // Generate anomalously high amount (50-90% of available limit) + amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4); + } else { + // Normal amount (1-10% of available limit) + amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09); + } + + // Update available limit + double newLimit = availableLimit - amount; + cardLimits.put(cardId, newLimit > 0 ? newLimit : 0); + + // Create transaction + Transaction transaction = new Transaction( + cardId, + userId, + latitude, + longitude, + amount, + newLimit, + Instant.now() + ); + + return transaction; + } + + private Double[] getLocationForCard(String cardId, boolean generateAnomaly) { + Set locations = cardLocations.get(cardId); + + if (locations.isEmpty() || generateAnomaly) { + // Generate a random worldwide location + double latitude = ThreadLocalRandom.current().nextDouble(-90, 90); + double longitude = ThreadLocalRandom.current().nextDouble(-180, 180); + Double[] newLocation = {latitude, longitude}; + + // Store this location for future use unless it's an anomaly + if (!generateAnomaly) { + locations.add(newLocation); + } + + return newLocation; + } else { + // Pick a random location from the card's history + Double[][] locArray = locations.toArray(new Double[0][]); + return locArray[ThreadLocalRandom.current().nextInt(locArray.length)]; + } + } + + // Method to simulate frequency anomaly by generating multiple transactions in short succession + public List generateFrequencyAnomaly(String specificCardId) { + List transactions = new ArrayList<>(); + String cardId = specificCardId != null ? + specificCardId : cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS)); + + // Generate 5-10 transactions in quick succession + int numTransactions = ThreadLocalRandom.current().nextInt(5, 11); + for (int i = 0; i < numTransactions; i++) { + transactions.add(generateTransactionForCard(cardId, false, ANOMALY_NONE)); + } + + return transactions; + } + + private Transaction generateTransactionForCard(String cardId, boolean forceAnomaly, int anomalyType) { + String userId = cardToUser.get(cardId); + double availableLimit = cardLimits.get(cardId); + + // Get location + Double[] location = getLocationForCard(cardId, forceAnomaly && anomalyType == ANOMALY_LOCATION); + double latitude = location[0]; + double longitude = location[1]; + + // Generate amount + double amount; + if (forceAnomaly && anomalyType == ANOMALY_AMOUNT) { + amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4); + } else { + amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09); + } + + // Update available limit + double newLimit = availableLimit - amount; + cardLimits.put(cardId, newLimit > 0 ? newLimit : 0); + + return new Transaction( + cardId, + userId, + latitude, + longitude, + amount, + newLimit, + Instant.now() + ); + } +} diff --git a/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java b/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java new file mode 100644 index 00000000..e69de29b diff --git a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java new file mode 100644 index 00000000..ad67b774 --- /dev/null +++ b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java @@ -0,0 +1,86 @@ +package com.anomaly.producer; + +import com.anomaly.generator.TransactionGenerator; +import com.anomaly.model.Transaction; +import com.google.gson.Gson; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; + +public class TransactionProducer { + private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class); + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final String TOPIC_NAME = "transactions"; + private static final TransactionGenerator generator = new TransactionGenerator(); + private static final Gson gson = new Gson(); + + public static void main(String[] args) { + // Create producer properties + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + // Create producer + KafkaProducer producer = new KafkaProducer<>(properties); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("Shutting down producer..."); + producer.flush(); + producer.close(); + logger.info("Producer closed"); + })); + + // Generate and send transactions + try { + while (true) { + // Normal transaction generation + if (ThreadLocalRandom.current().nextDouble() < 0.05) { + // 5% chance to generate a frequency anomaly + List anomalousTransactions = generator.generateFrequencyAnomaly(null); + for (Transaction transaction : anomalousTransactions) { + sendTransaction(producer, transaction); + } + } else { + // Regular transaction + boolean forceAnomaly = ThreadLocalRandom.current().nextDouble() < 0.02; // 2% chance + Transaction transaction = generator.generateTransaction(forceAnomaly, -1); + sendTransaction(producer, transaction); + } + + // Sleep between 100ms and 1s before generating next transaction + Thread.sleep(ThreadLocalRandom.current().nextLong(100, 1000)); + } + } catch (InterruptedException | ExecutionException e) { + logger.error("Error in transaction producer", e); + } finally { + producer.flush(); + producer.close(); + } + } + + private static void sendTransaction(KafkaProducer producer, Transaction transaction) + throws ExecutionException, InterruptedException { + String jsonTransaction = gson.toJson(transaction); + ProducerRecord record = new ProducerRecord<>( + TOPIC_NAME, + transaction.getCardId(), + jsonTransaction + ); + + producer.send(record, (metadata, exception) -> { + if (exception == null) { + logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}", + metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp()); + } else { + logger.error("Error sending message", exception); + } + }).get(); // Making it synchronous for demonstration + } +} From 3d03d755f6728562752a22eb6c0a16870eae71ad Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Tue, 15 Apr 2025 17:22:50 +0200 Subject: [PATCH 02/12] chore: vibe debuging untill it started working --- .gitignore | 129 ++++++++++ .../com/anomaly/model/TransactionAlert.java | 106 ++++++++ .../anomaly/visualizer/AlertVisualizer.java | 2 +- .../java/com/anomaly/model/Transaction.java | 90 +++++++ .../com/anomaly/model/TransactionAlert.java | 94 +++++-- .../java/com/anomaly/model/Transaction.java | 65 +++++ run_anomaly_detection.sh | 233 ++++++++++++++++++ .../java/com/anomaly/model/Transaction.java | 85 +++++++ 8 files changed, 779 insertions(+), 25 deletions(-) create mode 100644 .gitignore create mode 100644 alarm-visualizer/src/main/java/com/anomaly/model/TransactionAlert.java create mode 100644 anomaly-detector/src/main/java/com/anomaly/model/Transaction.java create mode 100644 kafka-consumer-visualizer/src/main/java/com/anomaly/model/Transaction.java create mode 100755 run_anomaly_detection.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..749d0d2a --- /dev/null +++ b/.gitignore @@ -0,0 +1,129 @@ +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +# https://github.com/takari/maven-wrapper#usage-without-binary-jar +.mvn/wrapper/maven-wrapper.jar + +# Eclipse m2e generated files +# Eclipse Core +.project +# JDT-specific (Eclipse Java Development Tools) +.classpath +# Docker project generated files to ignore +# if you want to ignore files created by your editor/tools, +# please consider a global .gitignore https://help.github.com/articles/ignoring-files +.vagrant* +bin +docker/docker +.*.swp +a.out +*.orig +build_src +.flymake* +.idea +.DS_Store +docs/_build +docs/_static +docs/_templates +.gopath/ +.dotcloud +*.test +bundles/ +.hg/ +.git/ +vendor/pkg/ +pyenv +Vagrantfile +dist +*classes +*.class +target/ +build/ +build_eclipse/ +out/ +.gradle/ +.vscode/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ +patch-process/* +.idea +.svn +.classpath +/.metadata +/.recommenders +*~ +*# +.#* +rat.out +TAGS +*.iml +.project +.settings +*.ipr +*.iws +.vagrant +Vagrantfile.local +/logs +.DS_Store + +config/server-* +config/zookeeper-* +gradle/wrapper/*.jar +gradlew.bat + +results +tests/results +.ducktape +tests/.ducktape +tests/venv +.cache + +docs/generated/ + +.release-settings.json + +kafkatest.egg-info/ +systest/ +*.swp +jmh-benchmarks/generated +jmh-benchmarks/src/main/generated +**/.jqwik-database +**/src/generated +**/src/generated-test +storage/kafka-tiered-storage/ + +docker/test/report_*.html +kafka.Kafka +__pycache__ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* +replay_pid* \ No newline at end of file diff --git a/alarm-visualizer/src/main/java/com/anomaly/model/TransactionAlert.java b/alarm-visualizer/src/main/java/com/anomaly/model/TransactionAlert.java new file mode 100644 index 00000000..d115154f --- /dev/null +++ b/alarm-visualizer/src/main/java/com/anomaly/model/TransactionAlert.java @@ -0,0 +1,106 @@ +package com.anomaly.model; + +import java.time.Instant; + +public class TransactionAlert { + private String alertType; + private Instant alertTime; + private Instant timestamp; + private String cardId; + private String userId; + private double amount; + private double latitude; + private double longitude; + private String message; + + // Default constructor for Gson deserialization + public TransactionAlert() { + } + + public TransactionAlert(String alertType, Instant alertTime, Instant timestamp, + String cardId, String userId, double amount, + double latitude, double longitude, String message) { + this.alertType = alertType; + this.alertTime = alertTime; + this.timestamp = timestamp; + this.cardId = cardId; + this.userId = userId; + this.amount = amount; + this.latitude = latitude; + this.longitude = longitude; + this.message = message; + } + + // Getters and setters + public String getAlertType() { + return alertType; + } + + public void setAlertType(String alertType) { + this.alertType = alertType; + } + + public Instant getAlertTime() { + return alertTime; + } + + public void setAlertTime(Instant alertTime) { + this.alertTime = alertTime; + } + + public Instant getTimestamp() { + return timestamp; + } + + public void setTimestamp(Instant timestamp) { + this.timestamp = timestamp; + } + + public String getCardId() { + return cardId; + } + + public void setCardId(String cardId) { + this.cardId = cardId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public double getAmount() { + return amount; + } + + public void setAmount(double amount) { + this.amount = amount; + } + + public double getLatitude() { + return latitude; + } + + public void setLatitude(double latitude) { + this.latitude = latitude; + } + + public double getLongitude() { + return longitude; + } + + public void setLongitude(double longitude) { + this.longitude = longitude; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java index 9a81205c..4c058171 100644 --- a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java +++ b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java @@ -267,7 +267,7 @@ public class AlertVisualizer { dialog.setVisible(true); // Auto-dismiss after 5 seconds - Timer timer = new Timer(5000, e -> dialog.dispose()); + javax.swing.Timer timer = new javax.swing.Timer(5000, e -> dialog.dispose()); timer.setRepeats(false); timer.start(); }); diff --git a/anomaly-detector/src/main/java/com/anomaly/model/Transaction.java b/anomaly-detector/src/main/java/com/anomaly/model/Transaction.java new file mode 100644 index 00000000..5ced0318 --- /dev/null +++ b/anomaly-detector/src/main/java/com/anomaly/model/Transaction.java @@ -0,0 +1,90 @@ +package com.anomaly.model; + +import java.time.Instant; + +/** + * Represents a financial transaction with location data. + */ +public class Transaction { + private String cardId; + private String userId; + private double amount; + private double latitude; + private double longitude; + private Instant timestamp; + + // Default constructor for deserialization + public Transaction() { + } + + public Transaction(String cardId, String userId, double amount, + double latitude, double longitude, Instant timestamp) { + this.cardId = cardId; + this.userId = userId; + this.amount = amount; + this.latitude = latitude; + this.longitude = longitude; + this.timestamp = timestamp; + } + + // Getters and setters + public String getCardId() { + return cardId; + } + + public void setCardId(String cardId) { + this.cardId = cardId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public double getAmount() { + return amount; + } + + public void setAmount(double amount) { + this.amount = amount; + } + + public double getLatitude() { + return latitude; + } + + public void setLatitude(double latitude) { + this.latitude = latitude; + } + + public double getLongitude() { + return longitude; + } + + public void setLongitude(double longitude) { + this.longitude = longitude; + } + + public Instant getTimestamp() { + return timestamp; + } + + public void setTimestamp(Instant timestamp) { + this.timestamp = timestamp; + } + + @Override + public String toString() { + return "Transaction{" + + "cardId='" + cardId + '\'' + + ", userId='" + userId + '\'' + + ", amount=" + amount + + ", latitude=" + latitude + + ", longitude=" + longitude + + ", timestamp=" + timestamp + + '}'; + } +} diff --git a/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java b/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java index 89fddf4e..8c8131cd 100644 --- a/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java +++ b/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java @@ -2,6 +2,9 @@ package com.anomaly.model; import java.time.Instant; +/** + * Represents an alert generated when an anomaly is detected in a transaction. + */ public class TransactionAlert { private String alertType; private String cardId; @@ -11,14 +14,13 @@ public class TransactionAlert { private double longitude; private Instant timestamp; private String message; - private Instant alertTime; public TransactionAlert() { - this.alertTime = Instant.now(); } - public TransactionAlert(String alertType, String cardId, String userId, double amount, - double latitude, double longitude, Instant timestamp, String message) { + public TransactionAlert(String alertType, String cardId, String userId, + double amount, double latitude, double longitude, + Instant timestamp, String message) { this.alertType = alertType; this.cardId = cardId; this.userId = userId; @@ -27,28 +29,72 @@ public class TransactionAlert { this.longitude = longitude; this.timestamp = timestamp; this.message = message; - this.alertTime = Instant.now(); } // Getters and setters - public String getAlertType() { return alertType; } - public void setAlertType(String alertType) { this.alertType = alertType; } - public String getCardId() { return cardId; } - public void setCardId(String cardId) { this.cardId = cardId; } - public String getUserId() { return userId; } - public void setUserId(String userId) { this.userId = userId; } - public double getAmount() { return amount; } - public void setAmount(double amount) { this.amount = amount; } - public double getLatitude() { return latitude; } - public void setLatitude(double latitude) { this.latitude = latitude; } - public double getLongitude() { return longitude; } - public void setLongitude(double longitude) { this.longitude = longitude; } - public Instant getTimestamp() { return timestamp; } - public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; } - public String getMessage() { return message; } - public void setMessage(String message) { this.message = message; } - public Instant getAlertTime() { return alertTime; } - public void setAlertTime(Instant alertTime) { this.alertTime = alertTime; } + public String getAlertType() { + return alertType; + } + + public void setAlertType(String alertType) { + this.alertType = alertType; + } + + public String getCardId() { + return cardId; + } + + public void setCardId(String cardId) { + this.cardId = cardId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public double getAmount() { + return amount; + } + + public void setAmount(double amount) { + this.amount = amount; + } + + public double getLatitude() { + return latitude; + } + + public void setLatitude(double latitude) { + this.latitude = latitude; + } + + public double getLongitude() { + return longitude; + } + + public void setLongitude(double longitude) { + this.longitude = longitude; + } + + public Instant getTimestamp() { + return timestamp; + } + + public void setTimestamp(Instant timestamp) { + this.timestamp = timestamp; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } @Override public String toString() { @@ -59,8 +105,8 @@ public class TransactionAlert { ", amount=" + amount + ", latitude=" + latitude + ", longitude=" + longitude + + ", timestamp=" + timestamp + ", message='" + message + '\'' + - ", alertTime=" + alertTime + '}'; } } diff --git a/kafka-consumer-visualizer/src/main/java/com/anomaly/model/Transaction.java b/kafka-consumer-visualizer/src/main/java/com/anomaly/model/Transaction.java new file mode 100644 index 00000000..9beddc47 --- /dev/null +++ b/kafka-consumer-visualizer/src/main/java/com/anomaly/model/Transaction.java @@ -0,0 +1,65 @@ +package com.anomaly.model; + +/** + * Represents a financial transaction with amount and available credit limit information. + */ +public class Transaction { + private double amount; + private double availableLimit; + private String cardNumber; + private String timestamp; + + // Default constructor for deserialization + public Transaction() { + } + + // Full constructor + public Transaction(double amount, double availableLimit, String cardNumber, String timestamp) { + this.amount = amount; + this.availableLimit = availableLimit; + this.cardNumber = cardNumber; + this.timestamp = timestamp; + } + + public double getAmount() { + return amount; + } + + public void setAmount(double amount) { + this.amount = amount; + } + + public double getAvailableLimit() { + return availableLimit; + } + + public void setAvailableLimit(double availableLimit) { + this.availableLimit = availableLimit; + } + + public String getCardNumber() { + return cardNumber; + } + + public void setCardNumber(String cardNumber) { + this.cardNumber = cardNumber; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + @Override + public String toString() { + return "Transaction{" + + "amount=" + amount + + ", availableLimit=" + availableLimit + + ", cardNumber='" + cardNumber + '\'' + + ", timestamp='" + timestamp + '\'' + + '}'; + } +} diff --git a/run_anomaly_detection.sh b/run_anomaly_detection.sh new file mode 100755 index 00000000..1c25c8ae --- /dev/null +++ b/run_anomaly_detection.sh @@ -0,0 +1,233 @@ +#!/bin/bash + +# Colors for output +GREEN='\033[0;32m' +BLUE='\033[0;34m' +RED='\033[0;31m' +YELLOW='\033[0;33m' +NC='\033[0m' # No Color + +PROJECT_ROOT=$(pwd) +TOPICS=("transactions" "alerts") + +function check_prerequisites { + echo -e "${BLUE}Checking prerequisites...${NC}" + + # Check for Java + if ! command -v java &> /dev/null; then + echo -e "${RED}Java is not installed. Please install JDK 11 or higher.${NC}" + exit 1 + fi + + # Check for Maven + if ! command -v mvn &> /dev/null; then + echo -e "${RED}Maven is not installed. Please install Maven.${NC}" + exit 1 + fi + + # Check for Docker + if ! command -v docker &> /dev/null; then + echo -e "${RED}Docker is not installed. Please install Docker.${NC}" + exit 1 + fi + + # Check for docker-compose + if ! command -v docker-compose &> /dev/null; then + echo -e "${RED}Docker Compose is not installed. Please install Docker Compose.${NC}" + exit 1 + fi + + echo -e "${GREEN}All prerequisites are met.${NC}" +} + +function build_projects { + echo -e "${BLUE}Building all projects...${NC}" + + # Build each project + for project in "transaction-simulator" "kafka-consumer-visualizer" "anomaly-detector" "alarm-visualizer"; do + echo -e "${YELLOW}Building $project...${NC}" + cd "$PROJECT_ROOT/$project" + mvn clean package -DskipTests + + if [ $? -ne 0 ]; then + echo -e "${RED}Failed to build $project.${NC}" + exit 1 + fi + done + + cd "$PROJECT_ROOT" + echo -e "${GREEN}All projects built successfully.${NC}" +} + +function start_infrastructure { + echo -e "${BLUE}Starting Kafka and Flink containers...${NC}" + + docker-compose up -d + + # Wait for Kafka to be ready + echo -e "${YELLOW}Waiting for Kafka to be ready...${NC}" + sleep 10 + + # Create Kafka topics + for topic in "${TOPICS[@]}"; do + echo -e "${YELLOW}Creating Kafka topic: $topic${NC}" + docker-compose exec kafka kafka-topics --create \ + --topic "$topic" \ + --bootstrap-server localhost:9092 \ + --partitions 3 \ + --replication-factor 1 \ + --if-not-exists + done + + echo -e "${GREEN}Infrastructure is up and running.${NC}" +} + +function start_applications { + echo -e "${BLUE}Starting applications...${NC}" + + # Start anomaly detector (Flink app) + echo -e "${YELLOW}Starting Anomaly Detector (Flink App)...${NC}" + cd "$PROJECT_ROOT/anomaly-detector" + java -jar target/anomaly-detector-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/anomaly-detector.log" 2>&1 & + ANOMALY_DETECTOR_PID=$! + echo $ANOMALY_DETECTOR_PID > "$PROJECT_ROOT/logs/anomaly-detector.pid" + + sleep 5 + + # Start alarm visualizer + echo -e "${YELLOW}Starting Alarm Visualizer...${NC}" + cd "$PROJECT_ROOT/alarm-visualizer" + java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/alarm-visualizer.log" 2>&1 & + ALARM_VISUALIZER_PID=$! + echo $ALARM_VISUALIZER_PID > "$PROJECT_ROOT/logs/alarm-visualizer.pid" + + # Start test consumer/visualizer + echo -e "${YELLOW}Starting Test Consumer Visualizer...${NC}" + cd "$PROJECT_ROOT/kafka-consumer-visualizer" + java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/consumer-visualizer.log" 2>&1 & + CONSUMER_PID=$! + echo $CONSUMER_PID > "$PROJECT_ROOT/logs/consumer-visualizer.pid" + + sleep 5 + + # Start transaction simulator + echo -e "${YELLOW}Starting Transaction Simulator...${NC}" + cd "$PROJECT_ROOT/transaction-simulator" + java -jar target/transaction-simulator-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/transaction-simulator.log" 2>&1 & + SIMULATOR_PID=$! + echo $SIMULATOR_PID > "$PROJECT_ROOT/logs/transaction-simulator.pid" + + cd "$PROJECT_ROOT" + echo -e "${GREEN}All applications are running.${NC}" + echo -e "${GREEN}Log files are available in the logs directory.${NC}" +} + +function stop_applications { + echo -e "${BLUE}Stopping applications...${NC}" + + # Stop all Java applications + if [ -f "$PROJECT_ROOT/logs/transaction-simulator.pid" ]; then + kill $(cat "$PROJECT_ROOT/logs/transaction-simulator.pid") 2>/dev/null + rm "$PROJECT_ROOT/logs/transaction-simulator.pid" + fi + + if [ -f "$PROJECT_ROOT/logs/consumer-visualizer.pid" ]; then + kill $(cat "$PROJECT_ROOT/logs/consumer-visualizer.pid") 2>/dev/null + rm "$PROJECT_ROOT/logs/consumer-visualizer.pid" + fi + + if [ -f "$PROJECT_ROOT/logs/anomaly-detector.pid" ]; then + kill $(cat "$PROJECT_ROOT/logs/anomaly-detector.pid") 2>/dev/null + rm "$PROJECT_ROOT/logs/anomaly-detector.pid" + fi + + if [ -f "$PROJECT_ROOT/logs/alarm-visualizer.pid" ]; then + kill $(cat "$PROJECT_ROOT/logs/alarm-visualizer.pid") 2>/dev/null + rm "$PROJECT_ROOT/logs/alarm-visualizer.pid" + fi + + echo -e "${GREEN}All applications stopped.${NC}" +} + +function stop_infrastructure { + echo -e "${BLUE}Stopping infrastructure...${NC}" + + docker-compose down + + echo -e "${GREEN}Infrastructure stopped.${NC}" +} + +function show_logs { + echo -e "${BLUE}Available logs:${NC}" + ls -l "$PROJECT_ROOT/logs" + + echo -e "${YELLOW}Use 'tail -f logs/[filename]' to view a specific log.${NC}" +} + +function print_usage { + echo -e "${BLUE}Credit Card Transaction Anomaly Detection System${NC}" + echo -e "Usage: $0 [options]" + echo -e "Options:" + echo -e " ${GREEN}start${NC} Build and start the entire system" + echo -e " ${GREEN}stop${NC} Stop all components" + echo -e " ${GREEN}restart${NC} Restart the entire system" + echo -e " ${GREEN}status${NC} Check if components are running" + echo -e " ${GREEN}logs${NC} Show log files" +} + +function check_status { + echo -e "${BLUE}Checking system status...${NC}" + + # Check Docker containers + echo -e "${YELLOW}Docker containers:${NC}" + docker-compose ps + + # Check Java processes + echo -e "\n${YELLOW}Java applications:${NC}" + for app in "transaction-simulator" "consumer-visualizer" "anomaly-detector" "alarm-visualizer"; do + if [ -f "$PROJECT_ROOT/logs/$app.pid" ]; then + pid=$(cat "$PROJECT_ROOT/logs/$app.pid") + if ps -p $pid > /dev/null; then + echo -e "${GREEN}$app is running (PID: $pid)${NC}" + else + echo -e "${RED}$app is not running (stale PID file)${NC}" + fi + else + echo -e "${RED}$app is not running${NC}" + fi + done +} + +# Create logs directory +mkdir -p "$PROJECT_ROOT/logs" + +# Parse command-line arguments +case "$1" in + start) + check_prerequisites + build_projects + start_infrastructure + start_applications + ;; + stop) + stop_applications + stop_infrastructure + ;; + restart) + stop_applications + stop_infrastructure + sleep 5 + start_infrastructure + sleep 5 + start_applications + ;; + status) + check_status + ;; + logs) + show_logs + ;; + *) + print_usage + ;; +esac diff --git a/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java b/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java index e69de29b..b45b3a31 100644 --- a/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java +++ b/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java @@ -0,0 +1,85 @@ +package com.anomaly.model; + +import java.time.Instant; +import java.util.Objects; + +public class Transaction { + private final String cardId; + private final String userId; + private final double latitude; + private final double longitude; + private final double amount; + private final double availableLimit; + private final Instant timestamp; + + public Transaction(String cardId, String userId, double latitude, double longitude, + double amount, double availableLimit, Instant timestamp) { + this.cardId = cardId; + this.userId = userId; + this.latitude = latitude; + this.longitude = longitude; + this.amount = amount; + this.availableLimit = availableLimit; + this.timestamp = timestamp; + } + + public String getCardId() { + return cardId; + } + + public String getUserId() { + return userId; + } + + public double getLatitude() { + return latitude; + } + + public double getLongitude() { + return longitude; + } + + public double getAmount() { + return amount; + } + + public double getAvailableLimit() { + return availableLimit; + } + + public Instant getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return "Transaction{" + + "cardId='" + cardId + '\'' + + ", userId='" + userId + '\'' + + ", latitude=" + latitude + + ", longitude=" + longitude + + ", amount=" + amount + + ", availableLimit=" + availableLimit + + ", timestamp=" + timestamp + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Transaction that = (Transaction) o; + return Double.compare(that.latitude, latitude) == 0 && + Double.compare(that.longitude, longitude) == 0 && + Double.compare(that.amount, amount) == 0 && + Double.compare(that.availableLimit, availableLimit) == 0 && + Objects.equals(cardId, that.cardId) && + Objects.equals(userId, that.userId) && + Objects.equals(timestamp, that.timestamp); + } + + @Override + public int hashCode() { + return Objects.hash(cardId, userId, latitude, longitude, amount, availableLimit, timestamp); + } +} From fe9c99dac10a5bf3088914341c85db3ec8c1a5b4 Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Tue, 15 Apr 2025 18:54:58 +0200 Subject: [PATCH 03/12] fix: run script --- .../anomaly/visualizer/AlertVisualizer.java | 10 +- .../src/main/resources/logback.xml | 17 ++ .../src/main/resources/logback.xml | 20 ++ docker-compose.yml | 39 +++ .../anomaly/consumer/TransactionConsumer.java | 8 +- .../src/main/resources/logback.xml | 17 ++ run_all.sh | 81 ++++++ run_anomaly_detection.sh | 233 ------------------ stop_all.sh | 12 + .../anomaly/producer/TransactionProducer.java | 16 +- .../src/main/resources/logback.xml | 16 ++ 11 files changed, 217 insertions(+), 252 deletions(-) create mode 100644 alarm-visualizer/src/main/resources/logback.xml create mode 100644 anomaly-detector/src/main/resources/logback.xml create mode 100644 kafka-consumer-visualizer/src/main/resources/logback.xml create mode 100755 run_all.sh delete mode 100755 run_anomaly_detection.sh create mode 100755 stop_all.sh create mode 100644 transaction-simulator/src/main/resources/logback.xml diff --git a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java index 4c058171..29c436b1 100644 --- a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java +++ b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java @@ -4,8 +4,8 @@ import com.anomaly.model.TransactionAlert; import com.google.gson.Gson; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; import javax.swing.*; import javax.swing.table.DefaultTableModel; @@ -18,7 +18,7 @@ import java.util.*; import java.util.List; public class AlertVisualizer { - private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class); + //private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String GROUP_ID = "alert-visualizer-group"; private static final String TOPIC = "alerts"; @@ -52,9 +52,9 @@ public class AlertVisualizer { // Add shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("Shutting down alert visualizer..."); + //logger.info("Shutting down alert visualizer..."); consumer.close(); - logger.info("Alert visualizer closed"); + //logger.info("Alert visualizer closed"); })); // Poll for new alerts diff --git a/alarm-visualizer/src/main/resources/logback.xml b/alarm-visualizer/src/main/resources/logback.xml new file mode 100644 index 00000000..573bb1db --- /dev/null +++ b/alarm-visualizer/src/main/resources/logback.xml @@ -0,0 +1,17 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + diff --git a/anomaly-detector/src/main/resources/logback.xml b/anomaly-detector/src/main/resources/logback.xml new file mode 100644 index 00000000..85b380d9 --- /dev/null +++ b/anomaly-detector/src/main/resources/logback.xml @@ -0,0 +1,20 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + diff --git a/docker-compose.yml b/docker-compose.yml index e69de29b..3a3dce99 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -0,0 +1,39 @@ +version: '3' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + jobmanager: + image: flink:latest + ports: + - "8081:8081" + command: jobmanager + environment: + - JOB_MANAGER_RPC_ADDRESS=jobmanager + + taskmanager: + image: flink:latest + depends_on: + - jobmanager + command: taskmanager + environment: + - JOB_MANAGER_RPC_ADDRESS=jobmanager \ No newline at end of file diff --git a/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java b/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java index 493d9ead..9e0e46fc 100644 --- a/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java +++ b/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java @@ -4,8 +4,6 @@ import com.anomaly.model.Transaction; import com.google.gson.Gson; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.swing.*; import java.awt.*; @@ -14,7 +12,7 @@ import java.util.*; import java.util.List; public class TransactionConsumer { - private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class); + //private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String GROUP_ID = "transaction-consumer-group"; private static final String TOPIC = "transactions"; @@ -46,9 +44,9 @@ public class TransactionConsumer { // Add shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("Shutting down consumer..."); + //logger.info("Shutting down consumer..."); consumer.close(); - logger.info("Consumer closed"); + //logger.info("Consumer closed"); })); // Poll for new data diff --git a/kafka-consumer-visualizer/src/main/resources/logback.xml b/kafka-consumer-visualizer/src/main/resources/logback.xml new file mode 100644 index 00000000..573bb1db --- /dev/null +++ b/kafka-consumer-visualizer/src/main/resources/logback.xml @@ -0,0 +1,17 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + diff --git a/run_all.sh b/run_all.sh new file mode 100755 index 00000000..0ce2570b --- /dev/null +++ b/run_all.sh @@ -0,0 +1,81 @@ +#!/bin/bash + +# Set working directory to script location +cd "$(dirname "$0")" + +# Check if Docker daemon is running +if ! docker info &>/dev/null; then + echo "ERROR: Docker daemon is not running." + echo "Please start Docker with: 'sudo systemctl start docker'" + echo "If you want Docker to start automatically at boot: 'sudo systemctl enable docker'" + echo "To run Docker without sudo, add your user to the docker group: 'sudo usermod -aG docker $USER'" + echo "Then log out and log back in for the changes to take effect." + exit 1 +fi + +# Note about docker-compose.yml +echo "Note: Your docker-compose.yml contains an obsolete 'version' attribute that should be removed." + +echo "Starting Docker containers..." +docker-compose up -d + +echo "Building Maven projects..." +cd transaction-simulator && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. +cd anomaly-detector && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. +cd kafka-consumer-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. +cd alarm-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. + +echo "Creating Kafka topics..." +docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions +docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alerts + +echo "Starting all applications..." + +# Start Flink job (Anomaly Detector) +echo "Starting Anomaly Detector..." +cd anomaly-detector +java -jar target/anomaly-detector-1.0-SNAPSHOT.jar & +ANOMALY_PID=$! +cd .. + +# Start Alert Visualizer +echo "Starting Alert Visualizer..." +cd alarm-visualizer +java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar & +ALARM_PID=$! +cd .. + +# Start Transaction Consumer/Visualizer +echo "Starting Transaction Consumer..." +cd kafka-consumer-visualizer +java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar & +CONSUMER_PID=$! +cd .. + +# Start Transaction Producer last +echo "Starting Transaction Producer..." +cd transaction-simulator +java -jar target/transaction-simulator-1.0-SNAPSHOT.jar & +PRODUCER_PID=$! +cd .. + +echo "All applications are running!" +echo "Press Ctrl+C to stop all applications" + +# Function to handle shutdown +function cleanup { + echo "Shutting down applications..." + kill $PRODUCER_PID $CONSUMER_PID $ALARM_PID $ANOMALY_PID + echo "Stopping Docker containers..." + docker-compose down + echo "All done!" + exit 0 +} + +# Catch shutdown signal +trap cleanup SIGINT SIGTERM + +# Keep script running +while true; do + sleep 1 +done diff --git a/run_anomaly_detection.sh b/run_anomaly_detection.sh deleted file mode 100755 index 1c25c8ae..00000000 --- a/run_anomaly_detection.sh +++ /dev/null @@ -1,233 +0,0 @@ -#!/bin/bash - -# Colors for output -GREEN='\033[0;32m' -BLUE='\033[0;34m' -RED='\033[0;31m' -YELLOW='\033[0;33m' -NC='\033[0m' # No Color - -PROJECT_ROOT=$(pwd) -TOPICS=("transactions" "alerts") - -function check_prerequisites { - echo -e "${BLUE}Checking prerequisites...${NC}" - - # Check for Java - if ! command -v java &> /dev/null; then - echo -e "${RED}Java is not installed. Please install JDK 11 or higher.${NC}" - exit 1 - fi - - # Check for Maven - if ! command -v mvn &> /dev/null; then - echo -e "${RED}Maven is not installed. Please install Maven.${NC}" - exit 1 - fi - - # Check for Docker - if ! command -v docker &> /dev/null; then - echo -e "${RED}Docker is not installed. Please install Docker.${NC}" - exit 1 - fi - - # Check for docker-compose - if ! command -v docker-compose &> /dev/null; then - echo -e "${RED}Docker Compose is not installed. Please install Docker Compose.${NC}" - exit 1 - fi - - echo -e "${GREEN}All prerequisites are met.${NC}" -} - -function build_projects { - echo -e "${BLUE}Building all projects...${NC}" - - # Build each project - for project in "transaction-simulator" "kafka-consumer-visualizer" "anomaly-detector" "alarm-visualizer"; do - echo -e "${YELLOW}Building $project...${NC}" - cd "$PROJECT_ROOT/$project" - mvn clean package -DskipTests - - if [ $? -ne 0 ]; then - echo -e "${RED}Failed to build $project.${NC}" - exit 1 - fi - done - - cd "$PROJECT_ROOT" - echo -e "${GREEN}All projects built successfully.${NC}" -} - -function start_infrastructure { - echo -e "${BLUE}Starting Kafka and Flink containers...${NC}" - - docker-compose up -d - - # Wait for Kafka to be ready - echo -e "${YELLOW}Waiting for Kafka to be ready...${NC}" - sleep 10 - - # Create Kafka topics - for topic in "${TOPICS[@]}"; do - echo -e "${YELLOW}Creating Kafka topic: $topic${NC}" - docker-compose exec kafka kafka-topics --create \ - --topic "$topic" \ - --bootstrap-server localhost:9092 \ - --partitions 3 \ - --replication-factor 1 \ - --if-not-exists - done - - echo -e "${GREEN}Infrastructure is up and running.${NC}" -} - -function start_applications { - echo -e "${BLUE}Starting applications...${NC}" - - # Start anomaly detector (Flink app) - echo -e "${YELLOW}Starting Anomaly Detector (Flink App)...${NC}" - cd "$PROJECT_ROOT/anomaly-detector" - java -jar target/anomaly-detector-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/anomaly-detector.log" 2>&1 & - ANOMALY_DETECTOR_PID=$! - echo $ANOMALY_DETECTOR_PID > "$PROJECT_ROOT/logs/anomaly-detector.pid" - - sleep 5 - - # Start alarm visualizer - echo -e "${YELLOW}Starting Alarm Visualizer...${NC}" - cd "$PROJECT_ROOT/alarm-visualizer" - java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/alarm-visualizer.log" 2>&1 & - ALARM_VISUALIZER_PID=$! - echo $ALARM_VISUALIZER_PID > "$PROJECT_ROOT/logs/alarm-visualizer.pid" - - # Start test consumer/visualizer - echo -e "${YELLOW}Starting Test Consumer Visualizer...${NC}" - cd "$PROJECT_ROOT/kafka-consumer-visualizer" - java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/consumer-visualizer.log" 2>&1 & - CONSUMER_PID=$! - echo $CONSUMER_PID > "$PROJECT_ROOT/logs/consumer-visualizer.pid" - - sleep 5 - - # Start transaction simulator - echo -e "${YELLOW}Starting Transaction Simulator...${NC}" - cd "$PROJECT_ROOT/transaction-simulator" - java -jar target/transaction-simulator-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/transaction-simulator.log" 2>&1 & - SIMULATOR_PID=$! - echo $SIMULATOR_PID > "$PROJECT_ROOT/logs/transaction-simulator.pid" - - cd "$PROJECT_ROOT" - echo -e "${GREEN}All applications are running.${NC}" - echo -e "${GREEN}Log files are available in the logs directory.${NC}" -} - -function stop_applications { - echo -e "${BLUE}Stopping applications...${NC}" - - # Stop all Java applications - if [ -f "$PROJECT_ROOT/logs/transaction-simulator.pid" ]; then - kill $(cat "$PROJECT_ROOT/logs/transaction-simulator.pid") 2>/dev/null - rm "$PROJECT_ROOT/logs/transaction-simulator.pid" - fi - - if [ -f "$PROJECT_ROOT/logs/consumer-visualizer.pid" ]; then - kill $(cat "$PROJECT_ROOT/logs/consumer-visualizer.pid") 2>/dev/null - rm "$PROJECT_ROOT/logs/consumer-visualizer.pid" - fi - - if [ -f "$PROJECT_ROOT/logs/anomaly-detector.pid" ]; then - kill $(cat "$PROJECT_ROOT/logs/anomaly-detector.pid") 2>/dev/null - rm "$PROJECT_ROOT/logs/anomaly-detector.pid" - fi - - if [ -f "$PROJECT_ROOT/logs/alarm-visualizer.pid" ]; then - kill $(cat "$PROJECT_ROOT/logs/alarm-visualizer.pid") 2>/dev/null - rm "$PROJECT_ROOT/logs/alarm-visualizer.pid" - fi - - echo -e "${GREEN}All applications stopped.${NC}" -} - -function stop_infrastructure { - echo -e "${BLUE}Stopping infrastructure...${NC}" - - docker-compose down - - echo -e "${GREEN}Infrastructure stopped.${NC}" -} - -function show_logs { - echo -e "${BLUE}Available logs:${NC}" - ls -l "$PROJECT_ROOT/logs" - - echo -e "${YELLOW}Use 'tail -f logs/[filename]' to view a specific log.${NC}" -} - -function print_usage { - echo -e "${BLUE}Credit Card Transaction Anomaly Detection System${NC}" - echo -e "Usage: $0 [options]" - echo -e "Options:" - echo -e " ${GREEN}start${NC} Build and start the entire system" - echo -e " ${GREEN}stop${NC} Stop all components" - echo -e " ${GREEN}restart${NC} Restart the entire system" - echo -e " ${GREEN}status${NC} Check if components are running" - echo -e " ${GREEN}logs${NC} Show log files" -} - -function check_status { - echo -e "${BLUE}Checking system status...${NC}" - - # Check Docker containers - echo -e "${YELLOW}Docker containers:${NC}" - docker-compose ps - - # Check Java processes - echo -e "\n${YELLOW}Java applications:${NC}" - for app in "transaction-simulator" "consumer-visualizer" "anomaly-detector" "alarm-visualizer"; do - if [ -f "$PROJECT_ROOT/logs/$app.pid" ]; then - pid=$(cat "$PROJECT_ROOT/logs/$app.pid") - if ps -p $pid > /dev/null; then - echo -e "${GREEN}$app is running (PID: $pid)${NC}" - else - echo -e "${RED}$app is not running (stale PID file)${NC}" - fi - else - echo -e "${RED}$app is not running${NC}" - fi - done -} - -# Create logs directory -mkdir -p "$PROJECT_ROOT/logs" - -# Parse command-line arguments -case "$1" in - start) - check_prerequisites - build_projects - start_infrastructure - start_applications - ;; - stop) - stop_applications - stop_infrastructure - ;; - restart) - stop_applications - stop_infrastructure - sleep 5 - start_infrastructure - sleep 5 - start_applications - ;; - status) - check_status - ;; - logs) - show_logs - ;; - *) - print_usage - ;; -esac diff --git a/stop_all.sh b/stop_all.sh new file mode 100755 index 00000000..e07a9bb9 --- /dev/null +++ b/stop_all.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +echo "Stopping all Java applications..." +pkill -f "java -jar target/transaction-simulator" +pkill -f "java -jar target/anomaly-detector" +pkill -f "java -jar target/kafka-consumer-visualizer" +pkill -f "java -jar target/alarm-visualizer" + +echo "Stopping Docker containers..." +docker-compose down + +echo "All applications have been stopped!" diff --git a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java index ad67b774..d24d9283 100644 --- a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java +++ b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java @@ -5,15 +5,13 @@ import com.anomaly.model.Transaction; import com.google.gson.Gson; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; public class TransactionProducer { - private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class); + //private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String TOPIC_NAME = "transactions"; private static final TransactionGenerator generator = new TransactionGenerator(); @@ -31,10 +29,10 @@ public class TransactionProducer { // Add shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("Shutting down producer..."); + //logger.info("Shutting down producer..."); producer.flush(); producer.close(); - logger.info("Producer closed"); + //logger.info("Producer closed"); })); // Generate and send transactions @@ -58,7 +56,7 @@ public class TransactionProducer { Thread.sleep(ThreadLocalRandom.current().nextLong(100, 1000)); } } catch (InterruptedException | ExecutionException e) { - logger.error("Error in transaction producer", e); + //logger.error("Error in transaction producer", e); } finally { producer.flush(); producer.close(); @@ -76,10 +74,10 @@ public class TransactionProducer { producer.send(record, (metadata, exception) -> { if (exception == null) { - logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}", - metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp()); + //logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}", + // metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp()); } else { - logger.error("Error sending message", exception); + //logger.error("Error sending message", exception); } }).get(); // Making it synchronous for demonstration } diff --git a/transaction-simulator/src/main/resources/logback.xml b/transaction-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..e9231f59 --- /dev/null +++ b/transaction-simulator/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + From cb1bd7dae3df5b9316dcde4218f1623826bac38c Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Tue, 15 Apr 2025 19:09:46 +0200 Subject: [PATCH 04/12] fix: transaction visualizer displays something --- .../com/anomaly/detector/AnomalyDetector.java | 44 +++++++++++-------- .../anomaly/producer/TransactionProducer.java | 22 +++++++++- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java index b7ebe65a..04f73f73 100644 --- a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -5,14 +5,17 @@ import com.anomaly.model.TransactionAlert; import com.google.gson.Gson; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.util.Collector; import java.util.*; @@ -29,21 +32,18 @@ public class AnomalyDetector { // Set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // Configure Kafka consumer - Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS); - properties.setProperty("group.id", "anomaly-detector"); - - // Create Kafka consumer - FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>( - INPUT_TOPIC, - new SimpleStringSchema(), - properties - ); + // Create Kafka source to replace deprecated FlinkKafkaConsumer + KafkaSource kafkaSource = KafkaSource.builder() + .setBootstrapServers(BOOTSTRAP_SERVERS) + .setTopics(INPUT_TOPIC) + .setGroupId("anomaly-detector") + .setStartingOffsets(OffsetsInitializer.earliest()) + .setValueOnlyDeserializer(new SimpleStringSchema()) + .build(); // Parse JSON transactions DataStream transactionStream = env - .addSource(consumer) + .fromSource(kafkaSource, org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), "Kafka Source") .map(new MapFunction() { @Override public Transaction map(String value) throws Exception { @@ -74,14 +74,20 @@ public class AnomalyDetector { DataStream allAlerts = amountAlerts .union(locationAlerts, frequencyAlerts); + // Create KafkaSink to replace deprecated FlinkKafkaProducer + KafkaSink kafkaSink = KafkaSink.builder() + .setBootstrapServers(BOOTSTRAP_SERVERS) + .setRecordSerializer(KafkaRecordSerializationSchema.builder() + .setTopic(OUTPUT_TOPIC) + .setValueSerializationSchema(new SimpleStringSchema()) + .build()) + .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build(); + // Convert alerts to JSON and send to Kafka allAlerts .map(alert -> gson.toJson(alert)) - .addSink(new FlinkKafkaProducer<>( - OUTPUT_TOPIC, - new SimpleStringSchema(), - properties - )); + .sinkTo(kafkaSink); // Execute the Flink job env.execute("Credit Card Transaction Anomaly Detection"); diff --git a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java index d24d9283..e0129fa4 100644 --- a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java +++ b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java @@ -3,9 +3,15 @@ package com.anomaly.producer; import com.anomaly.generator.TransactionGenerator; import com.anomaly.model.Transaction; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; +import java.io.IOException; +import java.time.Instant; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -15,7 +21,21 @@ public class TransactionProducer { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String TOPIC_NAME = "transactions"; private static final TransactionGenerator generator = new TransactionGenerator(); - private static final Gson gson = new Gson(); + + // Replace simple Gson with a properly configured instance + private static final Gson gson = new GsonBuilder() + .registerTypeAdapter(Instant.class, new TypeAdapter() { + @Override + public void write(JsonWriter out, Instant value) throws IOException { + out.value(value != null ? value.toString() : null); + } + + @Override + public Instant read(JsonReader in) throws IOException { + return Instant.parse(in.nextString()); + } + }) + .create(); public static void main(String[] args) { // Create producer properties From 4b3f12627d55049b0109a329ae11e3ca12a6c0fc Mon Sep 17 00:00:00 2001 From: Krzysztof Rudnicki Date: Sun, 20 Apr 2025 13:44:56 +0200 Subject: [PATCH 05/12] fix: source and target prop replaced with release --- alarm-visualizer/pom.xml | 3 +-- anomaly-detector/pom.xml | 3 +-- kafka-consumer-visualizer/pom.xml | 3 +-- transaction-simulator/pom.xml | 3 +-- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/alarm-visualizer/pom.xml b/alarm-visualizer/pom.xml index 85650d32..41f49433 100644 --- a/alarm-visualizer/pom.xml +++ b/alarm-visualizer/pom.xml @@ -9,8 +9,7 @@ 1.0-SNAPSHOT - 11 - 11 + 11 diff --git a/anomaly-detector/pom.xml b/anomaly-detector/pom.xml index 1a18c827..d113bad1 100644 --- a/anomaly-detector/pom.xml +++ b/anomaly-detector/pom.xml @@ -9,8 +9,7 @@ 1.0-SNAPSHOT - 11 - 11 + 11 1.14.2 2.12 diff --git a/kafka-consumer-visualizer/pom.xml b/kafka-consumer-visualizer/pom.xml index fba8f6d7..f4098780 100644 --- a/kafka-consumer-visualizer/pom.xml +++ b/kafka-consumer-visualizer/pom.xml @@ -9,8 +9,7 @@ 1.0-SNAPSHOT - 11 - 11 + 11 diff --git a/transaction-simulator/pom.xml b/transaction-simulator/pom.xml index 99123c8f..3a18a857 100644 --- a/transaction-simulator/pom.xml +++ b/transaction-simulator/pom.xml @@ -9,8 +9,7 @@ 1.0-SNAPSHOT - 11 - 11 + 11 From be94f2dd00a1e2f3e03198d86b958224f0b639c6 Mon Sep 17 00:00:00 2001 From: Krzysztof Rudnicki Date: Sun, 20 Apr 2025 14:26:50 +0200 Subject: [PATCH 06/12] fix: anomaly detector breaking --- .../com/anomaly/detector/AnomalyDetector.java | 28 ++++++++++++++++++- run_all.sh | 8 +++--- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java index 04f73f73..04058140 100644 --- a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -3,6 +3,10 @@ package com.anomaly.detector; import com.anomaly.model.Transaction; import com.anomaly.model.TransactionAlert; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.base.DeliveryGuarantee; @@ -20,13 +24,35 @@ import org.apache.flink.util.Collector; import java.util.*; import java.time.Instant; +import java.io.IOException; public class AnomalyDetector { private static final String INPUT_TOPIC = "transactions"; private static final String OUTPUT_TOPIC = "alerts"; private static final String BOOTSTRAP_SERVERS = "localhost:9092"; - private static final Gson gson = new Gson(); + + // Replace the simple Gson initialization with a configured one + private static final Gson gson = new GsonBuilder() + .registerTypeAdapter(Instant.class, new InstantTypeAdapter()) + .create(); + + // Add a custom TypeAdapter for Instant + private static class InstantTypeAdapter extends TypeAdapter { + @Override + public void write(JsonWriter out, Instant value) throws IOException { + if (value == null) { + out.nullValue(); + } else { + out.value(value.toString()); + } + } + + @Override + public Instant read(JsonReader in) throws IOException { + return Instant.parse(in.nextString()); + } + } public static void main(String[] args) throws Exception { // Set up the execution environment diff --git a/run_all.sh b/run_all.sh index 0ce2570b..3f04ca72 100755 --- a/run_all.sh +++ b/run_all.sh @@ -34,28 +34,28 @@ echo "Starting all applications..." # Start Flink job (Anomaly Detector) echo "Starting Anomaly Detector..." cd anomaly-detector -java -jar target/anomaly-detector-1.0-SNAPSHOT.jar & +java --add-opens java.base/java.time=ALL-UNNAMED -jar target/anomaly-detector-1.0-SNAPSHOT.jar & ANOMALY_PID=$! cd .. # Start Alert Visualizer echo "Starting Alert Visualizer..." cd alarm-visualizer -java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar & +java --add-opens java.base/java.time=ALL-UNNAMED -jar target/alarm-visualizer-1.0-SNAPSHOT.jar & ALARM_PID=$! cd .. # Start Transaction Consumer/Visualizer echo "Starting Transaction Consumer..." cd kafka-consumer-visualizer -java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar & +java --add-opens java.base/java.time=ALL-UNNAMED -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar & CONSUMER_PID=$! cd .. # Start Transaction Producer last echo "Starting Transaction Producer..." cd transaction-simulator -java -jar target/transaction-simulator-1.0-SNAPSHOT.jar & +java --add-opens java.base/java.time=ALL-UNNAMED -jar target/transaction-simulator-1.0-SNAPSHOT.jar & PRODUCER_PID=$! cd .. From 81532221d447aff1ef33286a9869be44fe0e153d Mon Sep 17 00:00:00 2001 From: Krzysztof Rudnicki Date: Sun, 20 Apr 2025 14:28:21 +0200 Subject: [PATCH 07/12] fix: alert visualizer does sometihing --- .../anomaly/visualizer/AlertVisualizer.java | 27 ++++++++++++++++++- .../generator/TransactionGenerator.java | 2 +- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java index 29c436b1..209f6405 100644 --- a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java +++ b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java @@ -2,6 +2,11 @@ package com.anomaly.visualizer; import com.anomaly.model.TransactionAlert; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; //import org.slf4j.Logger; @@ -10,6 +15,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import javax.swing.*; import javax.swing.table.DefaultTableModel; import java.awt.*; +import java.lang.reflect.Type; import java.time.Duration; import java.time.Instant; import java.time.ZoneId; @@ -22,7 +28,11 @@ public class AlertVisualizer { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String GROUP_ID = "alert-visualizer-group"; private static final String TOPIC = "alerts"; - private static final Gson gson = new Gson(); + + // Custom Gson instance with Instant type adapter + private static final Gson gson = new GsonBuilder() + .registerTypeAdapter(Instant.class, new InstantDeserializer()) + .create(); // UI Components private static JFrame frame; @@ -33,6 +43,21 @@ public class AlertVisualizer { private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); + // Custom deserializer for Instant + private static class InstantDeserializer implements JsonDeserializer { + @Override + public Instant deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + try { + // Try parsing as long (epoch milliseconds) + return Instant.ofEpochMilli(json.getAsLong()); + } catch (NumberFormatException e) { + // Try parsing as ISO-8601 string + return Instant.parse(json.getAsString()); + } + } + } + public static void main(String[] args) { setupUI(); diff --git a/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java index d9a52a07..1c05b7b4 100644 --- a/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java +++ b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java @@ -21,7 +21,7 @@ public class TransactionGenerator { private static final int ANOMALY_FREQUENCY = 3; // Probability of generating an anomaly (1%) - private static final double ANOMALY_PROBABILITY = 0.01; + private static final double ANOMALY_PROBABILITY = 0.5; // Initialize card and user data static { From 767de2e64381511c9e546bc87efffaffa3237556 Mon Sep 17 00:00:00 2001 From: Krzysztof Rudnicki Date: Sun, 20 Apr 2025 14:30:45 +0200 Subject: [PATCH 08/12] fix: alert visualoizer adds alerst to table --- .../java/com/anomaly/visualizer/AlertVisualizer.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java index 209f6405..c27be437 100644 --- a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java +++ b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java @@ -167,7 +167,9 @@ public class AlertVisualizer { synchronized (allAlerts) { for (TransactionAlert alert : allAlerts) { - String formattedTime = formatter.format(alert.getAlertTime()); + // Add null check for alert time + String formattedTime = alert.getAlertTime() != null ? + formatter.format(alert.getAlertTime()) : "N/A"; tableModel.addRow(new Object[]{ formattedTime, alert.getAlertType(), @@ -187,8 +189,10 @@ public class AlertVisualizer { sb.append("ALERT DETAILS\n"); sb.append("============================================\n\n"); sb.append("Alert Type: ").append(alert.getAlertType()).append("\n"); - sb.append("Alert Time: ").append(formatter.format(alert.getAlertTime())).append("\n"); - sb.append("Transaction Time: ").append(formatter.format(alert.getTimestamp())).append("\n\n"); + sb.append("Alert Time: ").append(alert.getAlertTime() != null ? + formatter.format(alert.getAlertTime()) : "N/A").append("\n"); + sb.append("Transaction Time: ").append(alert.getTimestamp() != null ? + formatter.format(alert.getTimestamp()) : "N/A").append("\n\n"); sb.append("Card ID: ").append(alert.getCardId()).append("\n"); sb.append("User ID: ").append(alert.getUserId()).append("\n"); sb.append("Transaction Amount: $").append(String.format("%.2f", alert.getAmount())).append("\n\n"); From 06f79923bbbf5330210140212405a8805e4d1776 Mon Sep 17 00:00:00 2001 From: kacperlo Date: Tue, 10 Jun 2025 15:49:31 +0200 Subject: [PATCH 09/12] Fix anomaly detectors --- .../com/anomaly/detector/AnomalyDetector.java | 216 ++++++++++++------ 1 file changed, 148 insertions(+), 68 deletions(-) diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java index 04058140..c7128331 100644 --- a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -21,10 +21,16 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; import java.util.*; import java.time.Instant; import java.io.IOException; +import java.io.Serializable; public class AnomalyDetector { @@ -81,13 +87,13 @@ public class AnomalyDetector { // 1. Amount anomaly - sudden high-value transactions DataStream amountAlerts = transactionStream .keyBy(Transaction::getCardId) - .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1))) + .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1))) .process(new AmountAnomalyDetector()); // 2. Location anomaly - sudden change in location DataStream locationAlerts = transactionStream .keyBy(Transaction::getCardId) - .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1))) + .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1))) .process(new LocationAnomalyDetector()); // 3. Frequency anomaly - unusual number of transactions in short time @@ -120,28 +126,28 @@ public class AnomalyDetector { } // Detector for unusual transaction amounts - public static class AmountAnomalyDetector + public static class AmountAnomalyDetector extends ProcessWindowFunction { - + @Override - public void process(String cardId, Context context, Iterable transactions, + public void process(String cardId, Context context, Iterable transactions, Collector out) { List transactionList = new ArrayList<>(); transactions.forEach(transactionList::add); - + if (transactionList.isEmpty()) return; - + // Calculate statistics double averageAmount = transactionList.stream() .mapToDouble(Transaction::getAmount) .average() .orElse(0); - + double stdDeviation = calculateStdDeviation(transactionList, averageAmount); - - // Check for anomalies (transactions that are more than 3 standard deviations from mean) + + // Check for anomalies (transactions that are more than 1.7 standard deviations from mean) for (Transaction transaction : transactionList) { - if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 3 * stdDeviation) { + if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 2 * stdDeviation && transaction.getAmount() > averageAmount && transaction.getAmount() > 1000) { out.collect(new TransactionAlert( "AMOUNT_ANOMALY", transaction.getCardId(), @@ -150,13 +156,13 @@ public class AnomalyDetector { transaction.getLatitude(), transaction.getLongitude(), transaction.getTimestamp(), - "Unusual transaction amount detected: $" + transaction.getAmount() + + "Unusual transaction amount detected: $" + transaction.getAmount() + " (Average: $" + String.format("%.2f", averageAmount) + ")" )); } } } - + private double calculateStdDeviation(List transactions, double mean) { return Math.sqrt(transactions.stream() .mapToDouble(t -> Math.pow(t.getAmount() - mean, 2)) @@ -166,87 +172,153 @@ public class AnomalyDetector { } // Detector for unusual transaction locations - public static class LocationAnomalyDetector + public static class LocationAnomalyDetector extends ProcessWindowFunction { - - // Map to store frequent locations for each card - private final Map> cardLocations = new HashMap<>(); - + + private transient MapState> knownLocations; + private static final int MAX_KNOWN_LOCATIONS = 5; // Limit known locations to avoid memory issues + private static final double ANOMALY_DISTANCE_THRESHOLD = 50.0; // Threshold in km + private static final int MIN_LOCATIONS_FOR_DETECTION = 3; // Minimum known locations before detecting anomalies + @Override - public void process(String cardId, Context context, Iterable transactions, - Collector out) { + public void open(Configuration parameters) throws Exception { + MapStateDescriptor> descriptor = + new MapStateDescriptor<>( + "knownLocations", + TypeInformation.of(String.class), + TypeInformation.of(new TypeHint>() {}) + ); + knownLocations = getRuntimeContext().getMapState(descriptor); + } + + @Override + public void process(String cardId, Context context, Iterable transactions, + Collector out) throws Exception { List transactionList = new ArrayList<>(); transactions.forEach(transactionList::add); - + if (transactionList.isEmpty()) return; - + // Get or create location set for this card - Set frequentLocations = cardLocations.computeIfAbsent(cardId, k -> new HashSet<>()); - + Set cardKnownLocations; + if (knownLocations.contains(cardId)) { + cardKnownLocations = knownLocations.get(cardId); + System.out.println("Card " + cardId + " has " + cardKnownLocations.size() + " known locations"); + } else { + cardKnownLocations = new HashSet<>(); + System.out.println("New card detected: " + cardId + ", initializing known locations"); + } + // Process each transaction for (Transaction transaction : transactionList) { LocationPoint currentPoint = new LocationPoint(transaction.getLatitude(), transaction.getLongitude()); - - // If we have at least 3 frequent locations for this card - if (frequentLocations.size() >= 3) { - boolean isNearKnownLocation = false; - - // Check if current location is near any known frequent location - for (LocationPoint knownPoint : frequentLocations) { - if (calculateDistance(currentPoint, knownPoint) < 50) { // Less than 50km - isNearKnownLocation = true; + + // First few transactions establish the baseline locations + if (cardKnownLocations.size() < MIN_LOCATIONS_FOR_DETECTION) { + System.out.println("Building baseline for card " + cardId + ", adding location #" + + (cardKnownLocations.size() + 1) + " to known locations"); + + // Check if this location is already very close to a known location before adding + boolean isVeryCloseToKnown = false; + for (LocationPoint knownPoint : cardKnownLocations) { + if (calculateDistance(currentPoint, knownPoint) < 2.0) { // Within 2km = same area + isVeryCloseToKnown = true; + System.out.println("Location is very close to existing baseline location, not adding duplicate"); break; } } - - // If not near any known location, it might be an anomaly - if (!isNearKnownLocation) { - out.collect(new TransactionAlert( - "LOCATION_ANOMALY", - transaction.getCardId(), - transaction.getUserId(), - transaction.getAmount(), - transaction.getLatitude(), - transaction.getLongitude(), - transaction.getTimestamp(), - "Unusual transaction location detected at: " + - transaction.getLatitude() + ", " + transaction.getLongitude() - )); + + // Only add distinct baseline locations + if (!isVeryCloseToKnown) { + cardKnownLocations.add(currentPoint); + } + + // We're still building the baseline, don't check for anomalies yet + continue; + } + + // Check distance to known locations + double closestDistance = Double.MAX_VALUE; + LocationPoint closestPoint = null; + + for (LocationPoint knownPoint : cardKnownLocations) { + double distance = calculateDistance(currentPoint, knownPoint); + if (distance < closestDistance) { + closestDistance = distance; + closestPoint = knownPoint; } } - - // Add current location to frequent locations (max 10 locations per card) - if (frequentLocations.size() < 10) { - frequentLocations.add(currentPoint); + + System.out.println("CARD " + cardId + ": Transaction at " + currentPoint + ", closest known location: " + + closestPoint + " (" + String.format("%.2f", closestDistance) + " km)"); + + // Detect anomaly if transaction is far from all known locations + if (closestDistance > ANOMALY_DISTANCE_THRESHOLD) { + System.out.println("⚠️ LOCATION ANOMALY DETECTED: Distance " + + String.format("%.2f", closestDistance) + "km exceeds threshold of " + + ANOMALY_DISTANCE_THRESHOLD + "km"); + + out.collect(new TransactionAlert( + "LOCATION_ANOMALY", + transaction.getCardId(), + transaction.getUserId(), + transaction.getAmount(), + transaction.getLatitude(), + transaction.getLongitude(), + transaction.getTimestamp(), + "Unusual transaction location: " + String.format("%.2f", closestDistance) + + "km from nearest known location" + )); + + // Don't automatically add anomalous locations to known locations + } else { + // Check if this location is already very close to a known location + boolean isVeryCloseToKnown = false; + for (LocationPoint knownPoint : cardKnownLocations) { + if (calculateDistance(currentPoint, knownPoint) < 2.0) { // Within 2km = same area + isVeryCloseToKnown = true; + break; + } + } + + // Only add distinct new locations, up to our maximum + if (!isVeryCloseToKnown && cardKnownLocations.size() < MAX_KNOWN_LOCATIONS) { + cardKnownLocations.add(currentPoint); + System.out.println("Added new location to known locations: " + currentPoint); + } } } + + // Update the state + knownLocations.put(cardId, cardKnownLocations); } - + // Calculate distance between two points using Haversine formula (in km) private double calculateDistance(LocationPoint p1, LocationPoint p2) { final int R = 6371; // Earth radius in km - + double latDistance = Math.toRadians(p2.latitude - p1.latitude); double lonDistance = Math.toRadians(p2.longitude - p1.longitude); - + double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2) + Math.cos(Math.toRadians(p1.latitude)) * Math.cos(Math.toRadians(p2.latitude)) * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2); - + double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); - + return R * c; } - - private static class LocationPoint { + + private static class LocationPoint implements Serializable { + private static final long serialVersionUID = 1L; private final double latitude; private final double longitude; - + public LocationPoint(double latitude, double longitude) { this.latitude = latitude; this.longitude = longitude; } - + @Override public boolean equals(Object o) { if (this == o) return true; @@ -255,35 +327,43 @@ public class AnomalyDetector { return Double.compare(that.latitude, latitude) == 0 && Double.compare(that.longitude, longitude) == 0; } - + @Override public int hashCode() { return Objects.hash(latitude, longitude); } + + @Override + public String toString() { + return "LocationPoint{" + + "lat=" + latitude + + ", lon=" + longitude + + '}'; + } } } // Detector for unusual transaction frequency - public static class FrequencyAnomalyDetector + public static class FrequencyAnomalyDetector extends ProcessWindowFunction { - + @Override - public void process(String cardId, Context context, Iterable transactions, + public void process(String cardId, Context context, Iterable transactions, Collector out) { List transactionList = new ArrayList<>(); transactions.forEach(transactionList::add); - + // Get window info long windowStart = context.window().getStart(); long windowEnd = context.window().getEnd(); long windowSizeMinutes = (windowEnd - windowStart) / (1000 * 60); - + // If there are more than 5 transactions in 5 minutes for the same card, flag it if (transactionList.size() > 5) { Transaction latestTransaction = transactionList.stream() .max(Comparator.comparing(Transaction::getTimestamp)) .orElse(transactionList.get(0)); - + out.collect(new TransactionAlert( "FREQUENCY_ANOMALY", latestTransaction.getCardId(), @@ -292,7 +372,7 @@ public class AnomalyDetector { latestTransaction.getLatitude(), latestTransaction.getLongitude(), latestTransaction.getTimestamp(), - "Unusual transaction frequency detected: " + transactionList.size() + + "Unusual transaction frequency detected: " + transactionList.size() + " transactions in " + windowSizeMinutes + " minutes" )); } From 41abb200947e3ef59071bf573da720985a549ed6 Mon Sep 17 00:00:00 2001 From: kacperlo Date: Tue, 10 Jun 2025 15:49:43 +0200 Subject: [PATCH 10/12] tune up transaction generator --- .../generator/TransactionGenerator.java | 109 +++++++++++------- 1 file changed, 70 insertions(+), 39 deletions(-) diff --git a/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java index 1c05b7b4..5cdab5d7 100644 --- a/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java +++ b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java @@ -8,63 +8,70 @@ import java.util.concurrent.ThreadLocalRandom; public class TransactionGenerator { private static final int NUM_CARDS = 10000; private static final int NUM_USERS = 5000; // Assuming each user has ~2 cards on average - private static final Map> cardLocations = new HashMap<>(); + private static final Map> cardLocations = new HashMap<>(); private static final Map cardLimits = new HashMap<>(); private static final Map cardToUser = new HashMap<>(); private static final List cardIds = new ArrayList<>(); private static final List userIds = new ArrayList<>(); - + // Anomaly types private static final int ANOMALY_NONE = 0; private static final int ANOMALY_AMOUNT = 1; private static final int ANOMALY_LOCATION = 2; private static final int ANOMALY_FREQUENCY = 3; - + // Probability of generating an anomaly (1%) private static final double ANOMALY_PROBABILITY = 0.5; - + // Initialize card and user data static { // Generate user IDs for (int i = 0; i < NUM_USERS; i++) { userIds.add("USER_" + String.format("%05d", i)); } - + // Generate card IDs and assign to users for (int i = 0; i < NUM_CARDS; i++) { String cardId = "CARD_" + String.format("%05d", i); cardIds.add(cardId); String userId = userIds.get(ThreadLocalRandom.current().nextInt(NUM_USERS)); cardToUser.put(cardId, userId); - + // Initialize empty set of locations for this card cardLocations.put(cardId, new HashSet<>()); - + // Assign random limit between $1,000 and $20,000 cardLimits.put(cardId, 1000.0 + ThreadLocalRandom.current().nextDouble() * 19000.0); } } - + public Transaction generateTransaction(boolean forceAnomaly, int anomalyType) { // Select a random card String cardId = cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS)); String userId = cardToUser.get(cardId); double availableLimit = cardLimits.get(cardId); - + // Determine if this should be an anomaly int actualAnomalyType = ANOMALY_NONE; if (forceAnomaly) { - actualAnomalyType = (anomalyType >= 0 && anomalyType <= 3) ? + actualAnomalyType = (anomalyType >= 0 && anomalyType <= 3) ? anomalyType : ThreadLocalRandom.current().nextInt(1, 4); } else if (ThreadLocalRandom.current().nextDouble() < ANOMALY_PROBABILITY) { - actualAnomalyType = ThreadLocalRandom.current().nextInt(1, 4); + double roll = ThreadLocalRandom.current().nextDouble(); + if (roll < 0.3) { + actualAnomalyType = ANOMALY_LOCATION; + } else if (roll < 0.8) { + actualAnomalyType = ANOMALY_AMOUNT; + } else { + actualAnomalyType = ANOMALY_FREQUENCY; + } } - + // Get or generate location - Double[] location = getLocationForCard(cardId, actualAnomalyType == ANOMALY_LOCATION); - double latitude = location[0]; - double longitude = location[1]; - + LocationPoint location = getLocationForCard(cardId, actualAnomalyType == ANOMALY_LOCATION); + double latitude = location.latitude; + double longitude = location.longitude; + // Generate transaction amount double amount; if (actualAnomalyType == ANOMALY_AMOUNT) { @@ -74,11 +81,11 @@ public class TransactionGenerator { // Normal amount (1-10% of available limit) amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09); } - + // Update available limit double newLimit = availableLimit - amount; cardLimits.put(cardId, newLimit > 0 ? newLimit : 0); - + // Create transaction Transaction transaction = new Transaction( cardId, @@ -89,56 +96,56 @@ public class TransactionGenerator { newLimit, Instant.now() ); - + return transaction; } - - private Double[] getLocationForCard(String cardId, boolean generateAnomaly) { - Set locations = cardLocations.get(cardId); - + + private LocationPoint getLocationForCard(String cardId, boolean generateAnomaly) { + Set locations = cardLocations.get(cardId); + if (locations.isEmpty() || generateAnomaly) { // Generate a random worldwide location double latitude = ThreadLocalRandom.current().nextDouble(-90, 90); double longitude = ThreadLocalRandom.current().nextDouble(-180, 180); - Double[] newLocation = {latitude, longitude}; - + LocationPoint newLocation = new LocationPoint(latitude, longitude); + // Store this location for future use unless it's an anomaly if (!generateAnomaly) { locations.add(newLocation); } - + return newLocation; } else { // Pick a random location from the card's history - Double[][] locArray = locations.toArray(new Double[0][]); + LocationPoint[] locArray = locations.toArray(new LocationPoint[0]); return locArray[ThreadLocalRandom.current().nextInt(locArray.length)]; } } - + // Method to simulate frequency anomaly by generating multiple transactions in short succession public List generateFrequencyAnomaly(String specificCardId) { List transactions = new ArrayList<>(); - String cardId = specificCardId != null ? + String cardId = specificCardId != null ? specificCardId : cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS)); - + // Generate 5-10 transactions in quick succession int numTransactions = ThreadLocalRandom.current().nextInt(5, 11); for (int i = 0; i < numTransactions; i++) { transactions.add(generateTransactionForCard(cardId, false, ANOMALY_NONE)); } - + return transactions; } - + private Transaction generateTransactionForCard(String cardId, boolean forceAnomaly, int anomalyType) { String userId = cardToUser.get(cardId); double availableLimit = cardLimits.get(cardId); - + // Get location - Double[] location = getLocationForCard(cardId, forceAnomaly && anomalyType == ANOMALY_LOCATION); - double latitude = location[0]; - double longitude = location[1]; - + LocationPoint location = getLocationForCard(cardId, forceAnomaly && anomalyType == ANOMALY_LOCATION); + double latitude = location.latitude; + double longitude = location.longitude; + // Generate amount double amount; if (forceAnomaly && anomalyType == ANOMALY_AMOUNT) { @@ -146,11 +153,11 @@ public class TransactionGenerator { } else { amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09); } - + // Update available limit double newLimit = availableLimit - amount; cardLimits.put(cardId, newLimit > 0 ? newLimit : 0); - + return new Transaction( cardId, userId, @@ -161,4 +168,28 @@ public class TransactionGenerator { Instant.now() ); } + + private static class LocationPoint { + final double latitude; + final double longitude; + + LocationPoint(double latitude, double longitude) { + this.latitude = latitude; + this.longitude = longitude; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LocationPoint that = (LocationPoint) o; + return Double.compare(that.latitude, latitude) == 0 && + Double.compare(that.longitude, longitude) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(latitude, longitude); + } + } } From ff642d68f078c3b47c4e906cfa4859a4b2d3a851 Mon Sep 17 00:00:00 2001 From: kacperlo Date: Tue, 10 Jun 2025 15:49:50 +0200 Subject: [PATCH 11/12] Add windows scripts --- run_all_windows.bat | 38 ++++++++++++++++++++++++++++++++++++++ stop_all.bat | 30 ++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 run_all_windows.bat create mode 100644 stop_all.bat diff --git a/run_all_windows.bat b/run_all_windows.bat new file mode 100644 index 00000000..cd73e3f6 --- /dev/null +++ b/run_all_windows.bat @@ -0,0 +1,38 @@ +@echo off +setlocal enabledelayedexpansion + +REM Set working directory to script location +cd /d "%~dp0" +set "PROJECT_ROOT=%cd%" + +REM Check if Docker is running +docker info >nul 2>&1 +if errorlevel 1 ( + echo ERROR: Docker daemon is not running. + echo Please start Docker Desktop and try again. + pause + exit /b 1 +) + +echo Starting Docker containers... +docker-compose up -d + +echo Creating Kafka topics... +docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions +docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alerts + +echo Starting all applications in new windows... + +echo Starting Anomaly Detector... +start "Anomaly Detector" cmd /k "cd /d %PROJECT_ROOT%\anomaly-detector && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\anomaly-detector-1.0-SNAPSHOT.jar" +echo Starting Alert Visualizer... +start "Alert Visualizer" cmd /k "cd /d %PROJECT_ROOT%\alarm-visualizer && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\alarm-visualizer-1.0-SNAPSHOT.jar" +echo Starting Transaction Consumer... +start "Transaction Consumer" cmd /k "cd /d %PROJECT_ROOT%\kafka-consumer-visualizer && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\kafka-consumer-visualizer-1.0-SNAPSHOT.jar" +echo Starting Transaction Producer... +start "Transaction Producer" cmd /k "cd /d %PROJECT_ROOT%\transaction-simulator && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\transaction-simulator-1.0-SNAPSHOT.jar" + +echo All applications are running! +echo To stop everything, close all opened windows and run: +echo docker-compose down +pause \ No newline at end of file diff --git a/stop_all.bat b/stop_all.bat new file mode 100644 index 00000000..07ae49c6 --- /dev/null +++ b/stop_all.bat @@ -0,0 +1,30 @@ +@echo off +REM filepath: d:\studia\semestr3\psd\projekt\psd_project\stop_all_windows.bat + +echo Stopping all Java applications... + +REM Stop Transaction Simulator +for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "transaction-simulator"') do ( + taskkill /PID %%a /F +) + +REM Stop Anomaly Detector +for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "anomaly-detector"') do ( + taskkill /PID %%a /F +) + +REM Stop Kafka Consumer Visualizer +for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "kafka-consumer-visualizer"') do ( + taskkill /PID %%a /F +) + +REM Stop Alarm Visualizer +for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "alarm-visualizer"') do ( + taskkill /PID %%a /F +) + +echo Stopping Docker containers... +docker-compose down + +echo All applications have been stopped! +pause \ No newline at end of file From 458850e355e44401ee450b65161ac276a383524d Mon Sep 17 00:00:00 2001 From: kacperlo Date: Thu, 12 Jun 2025 16:30:52 +0200 Subject: [PATCH 12/12] change probabilities and parameters --- .../main/java/com/anomaly/detector/AnomalyDetector.java | 8 ++++---- .../java/com/anomaly/generator/TransactionGenerator.java | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java index c7128331..be2df2ce 100644 --- a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -147,7 +147,7 @@ public class AnomalyDetector { // Check for anomalies (transactions that are more than 1.7 standard deviations from mean) for (Transaction transaction : transactionList) { - if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 2 * stdDeviation && transaction.getAmount() > averageAmount && transaction.getAmount() > 1000) { + if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 1.5 * stdDeviation && transaction.getAmount() > averageAmount && transaction.getAmount() > 1000) { out.collect(new TransactionAlert( "AMOUNT_ANOMALY", transaction.getCardId(), @@ -177,8 +177,8 @@ public class AnomalyDetector { private transient MapState> knownLocations; private static final int MAX_KNOWN_LOCATIONS = 5; // Limit known locations to avoid memory issues - private static final double ANOMALY_DISTANCE_THRESHOLD = 50.0; // Threshold in km - private static final int MIN_LOCATIONS_FOR_DETECTION = 3; // Minimum known locations before detecting anomalies + private static final double ANOMALY_DISTANCE_THRESHOLD = 10.0; // Threshold in km + private static final int MIN_LOCATIONS_FOR_DETECTION = 2; // Minimum known locations before detecting anomalies @Override public void open(Configuration parameters) throws Exception { @@ -359,7 +359,7 @@ public class AnomalyDetector { long windowSizeMinutes = (windowEnd - windowStart) / (1000 * 60); // If there are more than 5 transactions in 5 minutes for the same card, flag it - if (transactionList.size() > 5) { + if (transactionList.size() > 7) { Transaction latestTransaction = transactionList.stream() .max(Comparator.comparing(Transaction::getTimestamp)) .orElse(transactionList.get(0)); diff --git a/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java index 5cdab5d7..fa2950bd 100644 --- a/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java +++ b/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java @@ -21,7 +21,7 @@ public class TransactionGenerator { private static final int ANOMALY_FREQUENCY = 3; // Probability of generating an anomaly (1%) - private static final double ANOMALY_PROBABILITY = 0.5; + private static final double ANOMALY_PROBABILITY = 0.05; // Initialize card and user data static { @@ -58,9 +58,9 @@ public class TransactionGenerator { anomalyType : ThreadLocalRandom.current().nextInt(1, 4); } else if (ThreadLocalRandom.current().nextDouble() < ANOMALY_PROBABILITY) { double roll = ThreadLocalRandom.current().nextDouble(); - if (roll < 0.3) { + if (roll < 0.4) { actualAnomalyType = ANOMALY_LOCATION; - } else if (roll < 0.8) { + } else if (roll < 0.7) { actualAnomalyType = ANOMALY_AMOUNT; } else { actualAnomalyType = ANOMALY_FREQUENCY;