diff --git a/Programming/psd_project/.gitignore b/Programming/psd_project/.gitignore
new file mode 100644
index 00000000..749d0d2a
--- /dev/null
+++ b/Programming/psd_project/.gitignore
@@ -0,0 +1,129 @@
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+# https://github.com/takari/maven-wrapper#usage-without-binary-jar
+.mvn/wrapper/maven-wrapper.jar
+
+# Eclipse m2e generated files
+# Eclipse Core
+.project
+# JDT-specific (Eclipse Java Development Tools)
+.classpath
+# Docker project generated files to ignore
+# if you want to ignore files created by your editor/tools,
+# please consider a global .gitignore https://help.github.com/articles/ignoring-files
+.vagrant*
+bin
+docker/docker
+.*.swp
+a.out
+*.orig
+build_src
+.flymake*
+.idea
+.DS_Store
+docs/_build
+docs/_static
+docs/_templates
+.gopath/
+.dotcloud
+*.test
+bundles/
+.hg/
+.git/
+vendor/pkg/
+pyenv
+Vagrantfile
+dist
+*classes
+*.class
+target/
+build/
+build_eclipse/
+out/
+.gradle/
+.vscode/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+patch-process/*
+.idea
+.svn
+.classpath
+/.metadata
+/.recommenders
+*~
+*#
+.#*
+rat.out
+TAGS
+*.iml
+.project
+.settings
+*.ipr
+*.iws
+.vagrant
+Vagrantfile.local
+/logs
+.DS_Store
+
+config/server-*
+config/zookeeper-*
+gradle/wrapper/*.jar
+gradlew.bat
+
+results
+tests/results
+.ducktape
+tests/.ducktape
+tests/venv
+.cache
+
+docs/generated/
+
+.release-settings.json
+
+kafkatest.egg-info/
+systest/
+*.swp
+jmh-benchmarks/generated
+jmh-benchmarks/src/main/generated
+**/.jqwik-database
+**/src/generated
+**/src/generated-test
+storage/kafka-tiered-storage/
+
+docker/test/report_*.html
+kafka.Kafka
+__pycache__
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+replay_pid*
\ No newline at end of file
diff --git a/Programming/psd_project/alarm-visualizer/pom.xml b/Programming/psd_project/alarm-visualizer/pom.xml
new file mode 100644
index 00000000..41f49433
--- /dev/null
+++ b/Programming/psd_project/alarm-visualizer/pom.xml
@@ -0,0 +1,65 @@
+
+
+ 4.0.0
+
+ com.anomaly
+ alarm-visualizer
+ 1.0-SNAPSHOT
+
+
+ 11
+
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 2.8.1
+
+
+
+ com.google.code.gson
+ gson
+ 2.8.9
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.32
+
+
+ ch.qos.logback
+ logback-classic
+ 1.2.9
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+ package
+
+ shade
+
+
+
+
+ com.anomaly.visualizer.AlertVisualizer
+
+
+
+
+
+
+
+
+
diff --git a/Programming/psd_project/alarm-visualizer/src/main/java/com/anomaly/model/TransactionAlert.java b/Programming/psd_project/alarm-visualizer/src/main/java/com/anomaly/model/TransactionAlert.java
new file mode 100644
index 00000000..d115154f
--- /dev/null
+++ b/Programming/psd_project/alarm-visualizer/src/main/java/com/anomaly/model/TransactionAlert.java
@@ -0,0 +1,106 @@
+package com.anomaly.model;
+
+import java.time.Instant;
+
+public class TransactionAlert {
+ private String alertType;
+ private Instant alertTime;
+ private Instant timestamp;
+ private String cardId;
+ private String userId;
+ private double amount;
+ private double latitude;
+ private double longitude;
+ private String message;
+
+ // Default constructor for Gson deserialization
+ public TransactionAlert() {
+ }
+
+ public TransactionAlert(String alertType, Instant alertTime, Instant timestamp,
+ String cardId, String userId, double amount,
+ double latitude, double longitude, String message) {
+ this.alertType = alertType;
+ this.alertTime = alertTime;
+ this.timestamp = timestamp;
+ this.cardId = cardId;
+ this.userId = userId;
+ this.amount = amount;
+ this.latitude = latitude;
+ this.longitude = longitude;
+ this.message = message;
+ }
+
+ // Getters and setters
+ public String getAlertType() {
+ return alertType;
+ }
+
+ public void setAlertType(String alertType) {
+ this.alertType = alertType;
+ }
+
+ public Instant getAlertTime() {
+ return alertTime;
+ }
+
+ public void setAlertTime(Instant alertTime) {
+ this.alertTime = alertTime;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getCardId() {
+ return cardId;
+ }
+
+ public void setCardId(String cardId) {
+ this.cardId = cardId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public double getAmount() {
+ return amount;
+ }
+
+ public void setAmount(double amount) {
+ this.amount = amount;
+ }
+
+ public double getLatitude() {
+ return latitude;
+ }
+
+ public void setLatitude(double latitude) {
+ this.latitude = latitude;
+ }
+
+ public double getLongitude() {
+ return longitude;
+ }
+
+ public void setLongitude(double longitude) {
+ this.longitude = longitude;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+}
diff --git a/Programming/psd_project/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java b/Programming/psd_project/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java
new file mode 100644
index 00000000..c27be437
--- /dev/null
+++ b/Programming/psd_project/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java
@@ -0,0 +1,304 @@
+package com.anomaly.visualizer;
+
+import com.anomaly.model.TransactionAlert;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.serialization.StringDeserializer;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import javax.swing.*;
+import javax.swing.table.DefaultTableModel;
+import java.awt.*;
+import java.lang.reflect.Type;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.List;
+
+public class AlertVisualizer {
+ //private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class);
+ private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+ private static final String GROUP_ID = "alert-visualizer-group";
+ private static final String TOPIC = "alerts";
+
+ // Custom Gson instance with Instant type adapter
+ private static final Gson gson = new GsonBuilder()
+ .registerTypeAdapter(Instant.class, new InstantDeserializer())
+ .create();
+
+ // UI Components
+ private static JFrame frame;
+ private static JTable alertTable;
+ private static DefaultTableModel tableModel;
+ private static JTextArea detailArea;
+ private static final List allAlerts = new ArrayList<>();
+ private static final DateTimeFormatter formatter =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
+
+ // Custom deserializer for Instant
+ private static class InstantDeserializer implements JsonDeserializer {
+ @Override
+ public Instant deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
+ throws JsonParseException {
+ try {
+ // Try parsing as long (epoch milliseconds)
+ return Instant.ofEpochMilli(json.getAsLong());
+ } catch (NumberFormatException e) {
+ // Try parsing as ISO-8601 string
+ return Instant.parse(json.getAsString());
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ setupUI();
+
+ // Create consumer properties
+ Properties properties = new Properties();
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ // Create consumer
+ KafkaConsumer consumer = new KafkaConsumer<>(properties);
+
+ // Subscribe to topic
+ consumer.subscribe(Collections.singletonList(TOPIC));
+
+ // Add shutdown hook
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ //logger.info("Shutting down alert visualizer...");
+ consumer.close();
+ //logger.info("Alert visualizer closed");
+ }));
+
+ // Poll for new alerts
+ try {
+ while (true) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+
+ boolean newAlerts = false;
+ for (ConsumerRecord record : records) {
+ // Parse the alert
+ TransactionAlert alert = gson.fromJson(record.value(), TransactionAlert.class);
+ addAlert(alert);
+ newAlerts = true;
+
+ // Display notification for new alert
+ displayNotification(alert);
+ }
+
+ // Update the UI if new alerts arrived
+ if (newAlerts) {
+ updateAlertTable();
+ }
+ }
+ } finally {
+ consumer.close();
+ }
+ }
+
+ private static void setupUI() {
+ frame = new JFrame("Transaction Alert Visualizer");
+ frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
+ frame.setSize(1024, 768);
+
+ // Create table with columns
+ String[] columnNames = {"Time", "Alert Type", "Card ID", "User ID", "Amount", "Message"};
+ tableModel = new DefaultTableModel(columnNames, 0);
+ alertTable = new JTable(tableModel);
+ alertTable.setSelectionMode(ListSelectionModel.SINGLE_SELECTION);
+
+ // Add selection listener to show details when an alert is selected
+ alertTable.getSelectionModel().addListSelectionListener(e -> {
+ if (!e.getValueIsAdjusting()) {
+ int selectedRow = alertTable.getSelectedRow();
+ if (selectedRow >= 0 && selectedRow < allAlerts.size()) {
+ showAlertDetails(allAlerts.get(selectedRow));
+ }
+ }
+ });
+
+ JScrollPane tableScrollPane = new JScrollPane(alertTable);
+
+ // Create detail panel
+ detailArea = new JTextArea();
+ detailArea.setEditable(false);
+ JScrollPane detailScrollPane = new JScrollPane(detailArea);
+
+ // Create map visualization panel
+ JPanel mapPanel = new JPanel() {
+ @Override
+ protected void paintComponent(Graphics g) {
+ super.paintComponent(g);
+ drawMap(g);
+ }
+ };
+
+ // Create split panes for layout
+ JSplitPane mainSplitPane = new JSplitPane(JSplitPane.VERTICAL_SPLIT,
+ tableScrollPane, new JSplitPane(JSplitPane.HORIZONTAL_SPLIT, detailScrollPane, mapPanel));
+ mainSplitPane.setDividerLocation(300);
+ ((JSplitPane)mainSplitPane.getBottomComponent()).setDividerLocation(500);
+
+ frame.add(mainSplitPane);
+ frame.setVisible(true);
+ }
+
+ private static void addAlert(TransactionAlert alert) {
+ synchronized (allAlerts) {
+ allAlerts.add(0, alert); // Add at the beginning for newest first
+ }
+ }
+
+ private static void updateAlertTable() {
+ SwingUtilities.invokeLater(() -> {
+ tableModel.setRowCount(0); // Clear table
+
+ synchronized (allAlerts) {
+ for (TransactionAlert alert : allAlerts) {
+ // Add null check for alert time
+ String formattedTime = alert.getAlertTime() != null ?
+ formatter.format(alert.getAlertTime()) : "N/A";
+ tableModel.addRow(new Object[]{
+ formattedTime,
+ alert.getAlertType(),
+ alert.getCardId(),
+ alert.getUserId(),
+ String.format("$%.2f", alert.getAmount()),
+ alert.getMessage()
+ });
+ }
+ }
+ });
+ }
+
+ private static void showAlertDetails(TransactionAlert alert) {
+ SwingUtilities.invokeLater(() -> {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ALERT DETAILS\n");
+ sb.append("============================================\n\n");
+ sb.append("Alert Type: ").append(alert.getAlertType()).append("\n");
+ sb.append("Alert Time: ").append(alert.getAlertTime() != null ?
+ formatter.format(alert.getAlertTime()) : "N/A").append("\n");
+ sb.append("Transaction Time: ").append(alert.getTimestamp() != null ?
+ formatter.format(alert.getTimestamp()) : "N/A").append("\n\n");
+ sb.append("Card ID: ").append(alert.getCardId()).append("\n");
+ sb.append("User ID: ").append(alert.getUserId()).append("\n");
+ sb.append("Transaction Amount: $").append(String.format("%.2f", alert.getAmount())).append("\n\n");
+ sb.append("Location: ").append(alert.getLatitude()).append(", ").append(alert.getLongitude()).append("\n\n");
+ sb.append("Alert Message: ").append(alert.getMessage()).append("\n");
+
+ detailArea.setText(sb.toString());
+ frame.repaint(); // Trigger map redraw to highlight selected alert
+ });
+ }
+
+ private static void drawMap(Graphics g) {
+ Graphics2D g2d = (Graphics2D) g;
+ int width = g2d.getClipBounds().width;
+ int height = g2d.getClipBounds().height;
+
+ // Draw world map (simplified)
+ g2d.setColor(Color.LIGHT_GRAY);
+ g2d.fillRect(0, 0, width, height);
+ g2d.setColor(Color.DARK_GRAY);
+ g2d.drawRect(0, 0, width - 1, height - 1);
+
+ // Draw grid lines
+ g2d.setColor(Color.GRAY);
+ for (int i = 0; i < width; i += 50) {
+ g2d.drawLine(i, 0, i, height);
+ }
+ for (int i = 0; i < height; i += 50) {
+ g2d.drawLine(0, i, width, i);
+ }
+
+ // Get selected alert
+ int selectedRow = alertTable.getSelectedRow();
+ TransactionAlert selectedAlert = null;
+ if (selectedRow >= 0 && selectedRow < allAlerts.size()) {
+ selectedAlert = allAlerts.get(selectedRow);
+ }
+
+ // Draw alert points
+ synchronized (allAlerts) {
+ for (TransactionAlert alert : allAlerts) {
+ // Map lat/lon to screen coordinates
+ int x = (int) ((alert.getLongitude() + 180) / 360 * width);
+ int y = (int) ((90 - alert.getLatitude()) / 180 * height);
+
+ // Color by alert type
+ if (alert.getAlertType().equals("AMOUNT_ANOMALY")) {
+ g2d.setColor(Color.RED);
+ } else if (alert.getAlertType().equals("LOCATION_ANOMALY")) {
+ g2d.setColor(Color.BLUE);
+ } else {
+ g2d.setColor(Color.ORANGE);
+ }
+
+ // Make selected alert larger
+ if (alert == selectedAlert) {
+ g2d.fillOval(x - 8, y - 8, 16, 16);
+ } else {
+ g2d.fillOval(x - 4, y - 4, 8, 8);
+ }
+ }
+ }
+
+ // Draw legend
+ g2d.setColor(Color.BLACK);
+ g2d.drawString("Legend:", width - 150, 20);
+ g2d.setColor(Color.RED);
+ g2d.fillOval(width - 140, 30, 10, 10);
+ g2d.setColor(Color.BLACK);
+ g2d.drawString("Amount Anomaly", width - 125, 40);
+ g2d.setColor(Color.BLUE);
+ g2d.fillOval(width - 140, 50, 10, 10);
+ g2d.setColor(Color.BLACK);
+ g2d.drawString("Location Anomaly", width - 125, 60);
+ g2d.setColor(Color.ORANGE);
+ g2d.fillOval(width - 140, 70, 10, 10);
+ g2d.setColor(Color.BLACK);
+ g2d.drawString("Frequency Anomaly", width - 125, 80);
+ }
+
+ private static void displayNotification(TransactionAlert alert) {
+ SwingUtilities.invokeLater(() -> {
+ // Make a simple notification window
+ JDialog dialog = new JDialog(frame, "New Alert!", false);
+ dialog.setSize(400, 150);
+ dialog.setLocationRelativeTo(frame);
+
+ JPanel panel = new JPanel(new BorderLayout());
+ JLabel label = new JLabel("" + alert.getAlertType() + "
" +
+ alert.getMessage() + "
Card: " + alert.getCardId() + "");
+ label.setBorder(BorderFactory.createEmptyBorder(10, 10, 10, 10));
+ panel.add(label, BorderLayout.CENTER);
+
+ JButton closeButton = new JButton("Dismiss");
+ closeButton.addActionListener(e -> dialog.dispose());
+ JPanel buttonPanel = new JPanel();
+ buttonPanel.add(closeButton);
+ panel.add(buttonPanel, BorderLayout.SOUTH);
+
+ dialog.add(panel);
+ dialog.setVisible(true);
+
+ // Auto-dismiss after 5 seconds
+ javax.swing.Timer timer = new javax.swing.Timer(5000, e -> dialog.dispose());
+ timer.setRepeats(false);
+ timer.start();
+ });
+ }
+}
diff --git a/Programming/psd_project/alarm-visualizer/src/main/resources/logback.xml b/Programming/psd_project/alarm-visualizer/src/main/resources/logback.xml
new file mode 100644
index 00000000..573bb1db
--- /dev/null
+++ b/Programming/psd_project/alarm-visualizer/src/main/resources/logback.xml
@@ -0,0 +1,17 @@
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Programming/psd_project/anomaly-detector/pom.xml b/Programming/psd_project/anomaly-detector/pom.xml
new file mode 100644
index 00000000..d113bad1
--- /dev/null
+++ b/Programming/psd_project/anomaly-detector/pom.xml
@@ -0,0 +1,80 @@
+
+
+ 4.0.0
+
+ com.anomaly
+ anomaly-detector
+ 1.0-SNAPSHOT
+
+
+ 11
+ 1.14.2
+ 2.12
+
+
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-connector-kafka_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${flink.version}
+
+
+
+ com.google.code.gson
+ gson
+ 2.8.9
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.32
+
+
+ ch.qos.logback
+ logback-classic
+ 1.2.9
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+ package
+
+ shade
+
+
+
+
+ com.anomaly.detector.AnomalyDetector
+
+
+
+
+
+
+
+
+
+
diff --git a/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java
new file mode 100644
index 00000000..be2df2ce
--- /dev/null
+++ b/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java
@@ -0,0 +1,381 @@
+package com.anomaly.detector;
+
+import com.anomaly.model.Transaction;
+import com.anomaly.model.TransactionAlert;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.*;
+import java.time.Instant;
+import java.io.IOException;
+import java.io.Serializable;
+
+public class AnomalyDetector {
+
+ private static final String INPUT_TOPIC = "transactions";
+ private static final String OUTPUT_TOPIC = "alerts";
+ private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+
+ // Replace the simple Gson initialization with a configured one
+ private static final Gson gson = new GsonBuilder()
+ .registerTypeAdapter(Instant.class, new InstantTypeAdapter())
+ .create();
+
+ // Add a custom TypeAdapter for Instant
+ private static class InstantTypeAdapter extends TypeAdapter {
+ @Override
+ public void write(JsonWriter out, Instant value) throws IOException {
+ if (value == null) {
+ out.nullValue();
+ } else {
+ out.value(value.toString());
+ }
+ }
+
+ @Override
+ public Instant read(JsonReader in) throws IOException {
+ return Instant.parse(in.nextString());
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ // Set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // Create Kafka source to replace deprecated FlinkKafkaConsumer
+ KafkaSource kafkaSource = KafkaSource.builder()
+ .setBootstrapServers(BOOTSTRAP_SERVERS)
+ .setTopics(INPUT_TOPIC)
+ .setGroupId("anomaly-detector")
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+
+ // Parse JSON transactions
+ DataStream transactionStream = env
+ .fromSource(kafkaSource, org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), "Kafka Source")
+ .map(new MapFunction() {
+ @Override
+ public Transaction map(String value) throws Exception {
+ return gson.fromJson(value, Transaction.class);
+ }
+ });
+
+ // Detect anomalies based on different metrics
+ // 1. Amount anomaly - sudden high-value transactions
+ DataStream amountAlerts = transactionStream
+ .keyBy(Transaction::getCardId)
+ .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
+ .process(new AmountAnomalyDetector());
+
+ // 2. Location anomaly - sudden change in location
+ DataStream locationAlerts = transactionStream
+ .keyBy(Transaction::getCardId)
+ .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
+ .process(new LocationAnomalyDetector());
+
+ // 3. Frequency anomaly - unusual number of transactions in short time
+ DataStream frequencyAlerts = transactionStream
+ .keyBy(Transaction::getCardId)
+ .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
+ .process(new FrequencyAnomalyDetector());
+
+ // Union all alert streams
+ DataStream allAlerts = amountAlerts
+ .union(locationAlerts, frequencyAlerts);
+
+ // Create KafkaSink to replace deprecated FlinkKafkaProducer
+ KafkaSink kafkaSink = KafkaSink.builder()
+ .setBootstrapServers(BOOTSTRAP_SERVERS)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic(OUTPUT_TOPIC)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .build())
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+ // Convert alerts to JSON and send to Kafka
+ allAlerts
+ .map(alert -> gson.toJson(alert))
+ .sinkTo(kafkaSink);
+
+ // Execute the Flink job
+ env.execute("Credit Card Transaction Anomaly Detection");
+ }
+
+ // Detector for unusual transaction amounts
+ public static class AmountAnomalyDetector
+ extends ProcessWindowFunction {
+
+ @Override
+ public void process(String cardId, Context context, Iterable transactions,
+ Collector out) {
+ List transactionList = new ArrayList<>();
+ transactions.forEach(transactionList::add);
+
+ if (transactionList.isEmpty()) return;
+
+ // Calculate statistics
+ double averageAmount = transactionList.stream()
+ .mapToDouble(Transaction::getAmount)
+ .average()
+ .orElse(0);
+
+ double stdDeviation = calculateStdDeviation(transactionList, averageAmount);
+
+ // Check for anomalies (transactions that are more than 1.7 standard deviations from mean)
+ for (Transaction transaction : transactionList) {
+ if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 1.5 * stdDeviation && transaction.getAmount() > averageAmount && transaction.getAmount() > 1000) {
+ out.collect(new TransactionAlert(
+ "AMOUNT_ANOMALY",
+ transaction.getCardId(),
+ transaction.getUserId(),
+ transaction.getAmount(),
+ transaction.getLatitude(),
+ transaction.getLongitude(),
+ transaction.getTimestamp(),
+ "Unusual transaction amount detected: $" + transaction.getAmount() +
+ " (Average: $" + String.format("%.2f", averageAmount) + ")"
+ ));
+ }
+ }
+ }
+
+ private double calculateStdDeviation(List transactions, double mean) {
+ return Math.sqrt(transactions.stream()
+ .mapToDouble(t -> Math.pow(t.getAmount() - mean, 2))
+ .average()
+ .orElse(0));
+ }
+ }
+
+ // Detector for unusual transaction locations
+ public static class LocationAnomalyDetector
+ extends ProcessWindowFunction {
+
+ private transient MapState> knownLocations;
+ private static final int MAX_KNOWN_LOCATIONS = 5; // Limit known locations to avoid memory issues
+ private static final double ANOMALY_DISTANCE_THRESHOLD = 10.0; // Threshold in km
+ private static final int MIN_LOCATIONS_FOR_DETECTION = 2; // Minimum known locations before detecting anomalies
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ MapStateDescriptor> descriptor =
+ new MapStateDescriptor<>(
+ "knownLocations",
+ TypeInformation.of(String.class),
+ TypeInformation.of(new TypeHint>() {})
+ );
+ knownLocations = getRuntimeContext().getMapState(descriptor);
+ }
+
+ @Override
+ public void process(String cardId, Context context, Iterable transactions,
+ Collector out) throws Exception {
+ List transactionList = new ArrayList<>();
+ transactions.forEach(transactionList::add);
+
+ if (transactionList.isEmpty()) return;
+
+ // Get or create location set for this card
+ Set cardKnownLocations;
+ if (knownLocations.contains(cardId)) {
+ cardKnownLocations = knownLocations.get(cardId);
+ System.out.println("Card " + cardId + " has " + cardKnownLocations.size() + " known locations");
+ } else {
+ cardKnownLocations = new HashSet<>();
+ System.out.println("New card detected: " + cardId + ", initializing known locations");
+ }
+
+ // Process each transaction
+ for (Transaction transaction : transactionList) {
+ LocationPoint currentPoint = new LocationPoint(transaction.getLatitude(), transaction.getLongitude());
+
+ // First few transactions establish the baseline locations
+ if (cardKnownLocations.size() < MIN_LOCATIONS_FOR_DETECTION) {
+ System.out.println("Building baseline for card " + cardId + ", adding location #" +
+ (cardKnownLocations.size() + 1) + " to known locations");
+
+ // Check if this location is already very close to a known location before adding
+ boolean isVeryCloseToKnown = false;
+ for (LocationPoint knownPoint : cardKnownLocations) {
+ if (calculateDistance(currentPoint, knownPoint) < 2.0) { // Within 2km = same area
+ isVeryCloseToKnown = true;
+ System.out.println("Location is very close to existing baseline location, not adding duplicate");
+ break;
+ }
+ }
+
+ // Only add distinct baseline locations
+ if (!isVeryCloseToKnown) {
+ cardKnownLocations.add(currentPoint);
+ }
+
+ // We're still building the baseline, don't check for anomalies yet
+ continue;
+ }
+
+ // Check distance to known locations
+ double closestDistance = Double.MAX_VALUE;
+ LocationPoint closestPoint = null;
+
+ for (LocationPoint knownPoint : cardKnownLocations) {
+ double distance = calculateDistance(currentPoint, knownPoint);
+ if (distance < closestDistance) {
+ closestDistance = distance;
+ closestPoint = knownPoint;
+ }
+ }
+
+ System.out.println("CARD " + cardId + ": Transaction at " + currentPoint + ", closest known location: " +
+ closestPoint + " (" + String.format("%.2f", closestDistance) + " km)");
+
+ // Detect anomaly if transaction is far from all known locations
+ if (closestDistance > ANOMALY_DISTANCE_THRESHOLD) {
+ System.out.println("⚠️ LOCATION ANOMALY DETECTED: Distance " +
+ String.format("%.2f", closestDistance) + "km exceeds threshold of " +
+ ANOMALY_DISTANCE_THRESHOLD + "km");
+
+ out.collect(new TransactionAlert(
+ "LOCATION_ANOMALY",
+ transaction.getCardId(),
+ transaction.getUserId(),
+ transaction.getAmount(),
+ transaction.getLatitude(),
+ transaction.getLongitude(),
+ transaction.getTimestamp(),
+ "Unusual transaction location: " + String.format("%.2f", closestDistance) +
+ "km from nearest known location"
+ ));
+
+ // Don't automatically add anomalous locations to known locations
+ } else {
+ // Check if this location is already very close to a known location
+ boolean isVeryCloseToKnown = false;
+ for (LocationPoint knownPoint : cardKnownLocations) {
+ if (calculateDistance(currentPoint, knownPoint) < 2.0) { // Within 2km = same area
+ isVeryCloseToKnown = true;
+ break;
+ }
+ }
+
+ // Only add distinct new locations, up to our maximum
+ if (!isVeryCloseToKnown && cardKnownLocations.size() < MAX_KNOWN_LOCATIONS) {
+ cardKnownLocations.add(currentPoint);
+ System.out.println("Added new location to known locations: " + currentPoint);
+ }
+ }
+ }
+
+ // Update the state
+ knownLocations.put(cardId, cardKnownLocations);
+ }
+
+ // Calculate distance between two points using Haversine formula (in km)
+ private double calculateDistance(LocationPoint p1, LocationPoint p2) {
+ final int R = 6371; // Earth radius in km
+
+ double latDistance = Math.toRadians(p2.latitude - p1.latitude);
+ double lonDistance = Math.toRadians(p2.longitude - p1.longitude);
+
+ double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
+ + Math.cos(Math.toRadians(p1.latitude)) * Math.cos(Math.toRadians(p2.latitude))
+ * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
+
+ double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
+
+ return R * c;
+ }
+
+ private static class LocationPoint implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final double latitude;
+ private final double longitude;
+
+ public LocationPoint(double latitude, double longitude) {
+ this.latitude = latitude;
+ this.longitude = longitude;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ LocationPoint that = (LocationPoint) o;
+ return Double.compare(that.latitude, latitude) == 0 &&
+ Double.compare(that.longitude, longitude) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(latitude, longitude);
+ }
+
+ @Override
+ public String toString() {
+ return "LocationPoint{" +
+ "lat=" + latitude +
+ ", lon=" + longitude +
+ '}';
+ }
+ }
+ }
+
+ // Detector for unusual transaction frequency
+ public static class FrequencyAnomalyDetector
+ extends ProcessWindowFunction {
+
+ @Override
+ public void process(String cardId, Context context, Iterable transactions,
+ Collector out) {
+ List transactionList = new ArrayList<>();
+ transactions.forEach(transactionList::add);
+
+ // Get window info
+ long windowStart = context.window().getStart();
+ long windowEnd = context.window().getEnd();
+ long windowSizeMinutes = (windowEnd - windowStart) / (1000 * 60);
+
+ // If there are more than 5 transactions in 5 minutes for the same card, flag it
+ if (transactionList.size() > 7) {
+ Transaction latestTransaction = transactionList.stream()
+ .max(Comparator.comparing(Transaction::getTimestamp))
+ .orElse(transactionList.get(0));
+
+ out.collect(new TransactionAlert(
+ "FREQUENCY_ANOMALY",
+ latestTransaction.getCardId(),
+ latestTransaction.getUserId(),
+ latestTransaction.getAmount(),
+ latestTransaction.getLatitude(),
+ latestTransaction.getLongitude(),
+ latestTransaction.getTimestamp(),
+ "Unusual transaction frequency detected: " + transactionList.size() +
+ " transactions in " + windowSizeMinutes + " minutes"
+ ));
+ }
+ }
+ }
+}
diff --git a/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/model/Transaction.java b/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/model/Transaction.java
new file mode 100644
index 00000000..5ced0318
--- /dev/null
+++ b/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/model/Transaction.java
@@ -0,0 +1,90 @@
+package com.anomaly.model;
+
+import java.time.Instant;
+
+/**
+ * Represents a financial transaction with location data.
+ */
+public class Transaction {
+ private String cardId;
+ private String userId;
+ private double amount;
+ private double latitude;
+ private double longitude;
+ private Instant timestamp;
+
+ // Default constructor for deserialization
+ public Transaction() {
+ }
+
+ public Transaction(String cardId, String userId, double amount,
+ double latitude, double longitude, Instant timestamp) {
+ this.cardId = cardId;
+ this.userId = userId;
+ this.amount = amount;
+ this.latitude = latitude;
+ this.longitude = longitude;
+ this.timestamp = timestamp;
+ }
+
+ // Getters and setters
+ public String getCardId() {
+ return cardId;
+ }
+
+ public void setCardId(String cardId) {
+ this.cardId = cardId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public double getAmount() {
+ return amount;
+ }
+
+ public void setAmount(double amount) {
+ this.amount = amount;
+ }
+
+ public double getLatitude() {
+ return latitude;
+ }
+
+ public void setLatitude(double latitude) {
+ this.latitude = latitude;
+ }
+
+ public double getLongitude() {
+ return longitude;
+ }
+
+ public void setLongitude(double longitude) {
+ this.longitude = longitude;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "Transaction{" +
+ "cardId='" + cardId + '\'' +
+ ", userId='" + userId + '\'' +
+ ", amount=" + amount +
+ ", latitude=" + latitude +
+ ", longitude=" + longitude +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
diff --git a/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java b/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java
new file mode 100644
index 00000000..8c8131cd
--- /dev/null
+++ b/Programming/psd_project/anomaly-detector/src/main/java/com/anomaly/model/TransactionAlert.java
@@ -0,0 +1,112 @@
+package com.anomaly.model;
+
+import java.time.Instant;
+
+/**
+ * Represents an alert generated when an anomaly is detected in a transaction.
+ */
+public class TransactionAlert {
+ private String alertType;
+ private String cardId;
+ private String userId;
+ private double amount;
+ private double latitude;
+ private double longitude;
+ private Instant timestamp;
+ private String message;
+
+ public TransactionAlert() {
+ }
+
+ public TransactionAlert(String alertType, String cardId, String userId,
+ double amount, double latitude, double longitude,
+ Instant timestamp, String message) {
+ this.alertType = alertType;
+ this.cardId = cardId;
+ this.userId = userId;
+ this.amount = amount;
+ this.latitude = latitude;
+ this.longitude = longitude;
+ this.timestamp = timestamp;
+ this.message = message;
+ }
+
+ // Getters and setters
+ public String getAlertType() {
+ return alertType;
+ }
+
+ public void setAlertType(String alertType) {
+ this.alertType = alertType;
+ }
+
+ public String getCardId() {
+ return cardId;
+ }
+
+ public void setCardId(String cardId) {
+ this.cardId = cardId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public double getAmount() {
+ return amount;
+ }
+
+ public void setAmount(double amount) {
+ this.amount = amount;
+ }
+
+ public double getLatitude() {
+ return latitude;
+ }
+
+ public void setLatitude(double latitude) {
+ this.latitude = latitude;
+ }
+
+ public double getLongitude() {
+ return longitude;
+ }
+
+ public void setLongitude(double longitude) {
+ this.longitude = longitude;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return "TransactionAlert{" +
+ "alertType='" + alertType + '\'' +
+ ", cardId='" + cardId + '\'' +
+ ", userId='" + userId + '\'' +
+ ", amount=" + amount +
+ ", latitude=" + latitude +
+ ", longitude=" + longitude +
+ ", timestamp=" + timestamp +
+ ", message='" + message + '\'' +
+ '}';
+ }
+}
diff --git a/Programming/psd_project/anomaly-detector/src/main/resources/logback.xml b/Programming/psd_project/anomaly-detector/src/main/resources/logback.xml
new file mode 100644
index 00000000..85b380d9
--- /dev/null
+++ b/Programming/psd_project/anomaly-detector/src/main/resources/logback.xml
@@ -0,0 +1,20 @@
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Programming/psd_project/docker-compose.yml b/Programming/psd_project/docker-compose.yml
new file mode 100644
index 00000000..3a3dce99
--- /dev/null
+++ b/Programming/psd_project/docker-compose.yml
@@ -0,0 +1,39 @@
+version: '3'
+
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:latest
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ports:
+ - "2181:2181"
+
+ kafka:
+ image: confluentinc/cp-kafka:latest
+ depends_on:
+ - zookeeper
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+ jobmanager:
+ image: flink:latest
+ ports:
+ - "8081:8081"
+ command: jobmanager
+ environment:
+ - JOB_MANAGER_RPC_ADDRESS=jobmanager
+
+ taskmanager:
+ image: flink:latest
+ depends_on:
+ - jobmanager
+ command: taskmanager
+ environment:
+ - JOB_MANAGER_RPC_ADDRESS=jobmanager
\ No newline at end of file
diff --git a/Programming/psd_project/kafka-consumer-visualizer/pom.xml b/Programming/psd_project/kafka-consumer-visualizer/pom.xml
new file mode 100644
index 00000000..f4098780
--- /dev/null
+++ b/Programming/psd_project/kafka-consumer-visualizer/pom.xml
@@ -0,0 +1,65 @@
+
+
+ 4.0.0
+
+ com.anomaly
+ kafka-consumer-visualizer
+ 1.0-SNAPSHOT
+
+
+ 11
+
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 2.8.1
+
+
+
+ com.google.code.gson
+ gson
+ 2.8.9
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.32
+
+
+ ch.qos.logback
+ logback-classic
+ 1.2.9
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+ package
+
+ shade
+
+
+
+
+ com.anomaly.consumer.TransactionConsumer
+
+
+
+
+
+
+
+
+
diff --git a/Programming/psd_project/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java b/Programming/psd_project/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java
new file mode 100644
index 00000000..9e0e46fc
--- /dev/null
+++ b/Programming/psd_project/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java
@@ -0,0 +1,176 @@
+package com.anomaly.consumer;
+
+import com.anomaly.model.Transaction;
+import com.google.gson.Gson;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import javax.swing.*;
+import java.awt.*;
+import java.time.Duration;
+import java.util.*;
+import java.util.List;
+
+public class TransactionConsumer {
+ //private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class);
+ private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+ private static final String GROUP_ID = "transaction-consumer-group";
+ private static final String TOPIC = "transactions";
+ private static final Gson gson = new Gson();
+
+ // UI Components
+ private static JFrame frame;
+ private static JTextArea logArea;
+ private static JPanel chartPanel;
+ private static final int MAX_DISPLAYED_TRANSACTIONS = 100;
+ private static final List recentTransactions = new ArrayList<>();
+
+ public static void main(String[] args) {
+ setupUI();
+
+ // Create consumer properties
+ Properties properties = new Properties();
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ // Create consumer
+ KafkaConsumer consumer = new KafkaConsumer<>(properties);
+
+ // Subscribe to topic
+ consumer.subscribe(Collections.singletonList(TOPIC));
+
+ // Add shutdown hook
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ //logger.info("Shutting down consumer...");
+ consumer.close();
+ //logger.info("Consumer closed");
+ }));
+
+ // Poll for new data
+ try {
+ while (true) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+
+ for (ConsumerRecord record : records) {
+ logMessage("Received: Key: " + record.key() + ", Value: " + record.value());
+
+ // Parse the transaction
+ Transaction transaction = gson.fromJson(record.value(), Transaction.class);
+ addTransaction(transaction);
+ }
+
+ // Update the visualization
+ if (!records.isEmpty()) {
+ updateChart();
+ }
+ }
+ } finally {
+ consumer.close();
+ }
+ }
+
+ private static void setupUI() {
+ frame = new JFrame("Transaction Visualizer");
+ frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
+ frame.setSize(800, 600);
+
+ // Create log area
+ logArea = new JTextArea();
+ logArea.setEditable(false);
+ JScrollPane scrollPane = new JScrollPane(logArea);
+ scrollPane.setPreferredSize(new Dimension(800, 200));
+
+ // Create chart panel
+ chartPanel = new JPanel() {
+ @Override
+ protected void paintComponent(Graphics g) {
+ super.paintComponent(g);
+ drawChart(g);
+ }
+ };
+ chartPanel.setPreferredSize(new Dimension(800, 400));
+
+ // Add components to frame
+ frame.setLayout(new BorderLayout());
+ frame.add(scrollPane, BorderLayout.SOUTH);
+ frame.add(chartPanel, BorderLayout.CENTER);
+
+ frame.setVisible(true);
+ }
+
+ private static void logMessage(String message) {
+ SwingUtilities.invokeLater(() -> {
+ logArea.append(message + "\n");
+ logArea.setCaretPosition(logArea.getDocument().getLength());
+ });
+ }
+
+ private static void addTransaction(Transaction transaction) {
+ synchronized (recentTransactions) {
+ recentTransactions.add(transaction);
+ if (recentTransactions.size() > MAX_DISPLAYED_TRANSACTIONS) {
+ recentTransactions.remove(0);
+ }
+ }
+ }
+
+ private static void updateChart() {
+ SwingUtilities.invokeLater(() -> chartPanel.repaint());
+ }
+
+ private static void drawChart(Graphics g) {
+ Graphics2D g2d = (Graphics2D) g;
+ int width = chartPanel.getWidth();
+ int height = chartPanel.getHeight();
+
+ // Clear the background
+ g2d.setColor(Color.WHITE);
+ g2d.fillRect(0, 0, width, height);
+
+ synchronized (recentTransactions) {
+ if (recentTransactions.isEmpty()) return;
+
+ // Find max amount for scaling
+ double maxAmount = recentTransactions.stream()
+ .mapToDouble(Transaction::getAmount)
+ .max()
+ .orElse(1.0);
+
+ // Draw axes
+ g2d.setColor(Color.BLACK);
+ g2d.drawLine(50, height - 50, width - 50, height - 50); // X-axis
+ g2d.drawLine(50, 50, 50, height - 50); // Y-axis
+
+ // Draw labels
+ g2d.drawString("Time →", width - 70, height - 20);
+ g2d.drawString("Amount", 10, 40);
+ g2d.drawString("$" + Math.round(maxAmount), 10, 60);
+
+ // Draw transactions as points
+ int xStep = (width - 100) / Math.max(1, recentTransactions.size() - 1);
+ int x = 50;
+
+ for (Transaction transaction : recentTransactions) {
+ int y = height - 50 - (int) ((transaction.getAmount() / maxAmount) * (height - 100));
+
+ // Color based on available limit percentage
+ double limitPercentage = transaction.getAvailableLimit() /
+ (transaction.getAmount() + transaction.getAvailableLimit());
+
+ if (limitPercentage < 0.3) {
+ g2d.setColor(Color.RED);
+ } else if (limitPercentage < 0.6) {
+ g2d.setColor(Color.ORANGE);
+ } else {
+ g2d.setColor(Color.GREEN);
+ }
+
+ g2d.fillOval(x - 3, y - 3, 6, 6);
+ x += xStep;
+ }
+ }
+ }
+}
diff --git a/Programming/psd_project/kafka-consumer-visualizer/src/main/java/com/anomaly/model/Transaction.java b/Programming/psd_project/kafka-consumer-visualizer/src/main/java/com/anomaly/model/Transaction.java
new file mode 100644
index 00000000..9beddc47
--- /dev/null
+++ b/Programming/psd_project/kafka-consumer-visualizer/src/main/java/com/anomaly/model/Transaction.java
@@ -0,0 +1,65 @@
+package com.anomaly.model;
+
+/**
+ * Represents a financial transaction with amount and available credit limit information.
+ */
+public class Transaction {
+ private double amount;
+ private double availableLimit;
+ private String cardNumber;
+ private String timestamp;
+
+ // Default constructor for deserialization
+ public Transaction() {
+ }
+
+ // Full constructor
+ public Transaction(double amount, double availableLimit, String cardNumber, String timestamp) {
+ this.amount = amount;
+ this.availableLimit = availableLimit;
+ this.cardNumber = cardNumber;
+ this.timestamp = timestamp;
+ }
+
+ public double getAmount() {
+ return amount;
+ }
+
+ public void setAmount(double amount) {
+ this.amount = amount;
+ }
+
+ public double getAvailableLimit() {
+ return availableLimit;
+ }
+
+ public void setAvailableLimit(double availableLimit) {
+ this.availableLimit = availableLimit;
+ }
+
+ public String getCardNumber() {
+ return cardNumber;
+ }
+
+ public void setCardNumber(String cardNumber) {
+ this.cardNumber = cardNumber;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(String timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "Transaction{" +
+ "amount=" + amount +
+ ", availableLimit=" + availableLimit +
+ ", cardNumber='" + cardNumber + '\'' +
+ ", timestamp='" + timestamp + '\'' +
+ '}';
+ }
+}
diff --git a/Programming/psd_project/kafka-consumer-visualizer/src/main/resources/logback.xml b/Programming/psd_project/kafka-consumer-visualizer/src/main/resources/logback.xml
new file mode 100644
index 00000000..573bb1db
--- /dev/null
+++ b/Programming/psd_project/kafka-consumer-visualizer/src/main/resources/logback.xml
@@ -0,0 +1,17 @@
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Programming/psd_project/run_all.sh b/Programming/psd_project/run_all.sh
new file mode 100755
index 00000000..3f04ca72
--- /dev/null
+++ b/Programming/psd_project/run_all.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+
+# Set working directory to script location
+cd "$(dirname "$0")"
+
+# Check if Docker daemon is running
+if ! docker info &>/dev/null; then
+ echo "ERROR: Docker daemon is not running."
+ echo "Please start Docker with: 'sudo systemctl start docker'"
+ echo "If you want Docker to start automatically at boot: 'sudo systemctl enable docker'"
+ echo "To run Docker without sudo, add your user to the docker group: 'sudo usermod -aG docker $USER'"
+ echo "Then log out and log back in for the changes to take effect."
+ exit 1
+fi
+
+# Note about docker-compose.yml
+echo "Note: Your docker-compose.yml contains an obsolete 'version' attribute that should be removed."
+
+echo "Starting Docker containers..."
+docker-compose up -d
+
+echo "Building Maven projects..."
+cd transaction-simulator && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
+cd anomaly-detector && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
+cd kafka-consumer-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
+cd alarm-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
+
+echo "Creating Kafka topics..."
+docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions
+docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alerts
+
+echo "Starting all applications..."
+
+# Start Flink job (Anomaly Detector)
+echo "Starting Anomaly Detector..."
+cd anomaly-detector
+java --add-opens java.base/java.time=ALL-UNNAMED -jar target/anomaly-detector-1.0-SNAPSHOT.jar &
+ANOMALY_PID=$!
+cd ..
+
+# Start Alert Visualizer
+echo "Starting Alert Visualizer..."
+cd alarm-visualizer
+java --add-opens java.base/java.time=ALL-UNNAMED -jar target/alarm-visualizer-1.0-SNAPSHOT.jar &
+ALARM_PID=$!
+cd ..
+
+# Start Transaction Consumer/Visualizer
+echo "Starting Transaction Consumer..."
+cd kafka-consumer-visualizer
+java --add-opens java.base/java.time=ALL-UNNAMED -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar &
+CONSUMER_PID=$!
+cd ..
+
+# Start Transaction Producer last
+echo "Starting Transaction Producer..."
+cd transaction-simulator
+java --add-opens java.base/java.time=ALL-UNNAMED -jar target/transaction-simulator-1.0-SNAPSHOT.jar &
+PRODUCER_PID=$!
+cd ..
+
+echo "All applications are running!"
+echo "Press Ctrl+C to stop all applications"
+
+# Function to handle shutdown
+function cleanup {
+ echo "Shutting down applications..."
+ kill $PRODUCER_PID $CONSUMER_PID $ALARM_PID $ANOMALY_PID
+ echo "Stopping Docker containers..."
+ docker-compose down
+ echo "All done!"
+ exit 0
+}
+
+# Catch shutdown signal
+trap cleanup SIGINT SIGTERM
+
+# Keep script running
+while true; do
+ sleep 1
+done
diff --git a/Programming/psd_project/run_all_windows.bat b/Programming/psd_project/run_all_windows.bat
new file mode 100644
index 00000000..cd73e3f6
--- /dev/null
+++ b/Programming/psd_project/run_all_windows.bat
@@ -0,0 +1,38 @@
+@echo off
+setlocal enabledelayedexpansion
+
+REM Set working directory to script location
+cd /d "%~dp0"
+set "PROJECT_ROOT=%cd%"
+
+REM Check if Docker is running
+docker info >nul 2>&1
+if errorlevel 1 (
+ echo ERROR: Docker daemon is not running.
+ echo Please start Docker Desktop and try again.
+ pause
+ exit /b 1
+)
+
+echo Starting Docker containers...
+docker-compose up -d
+
+echo Creating Kafka topics...
+docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions
+docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alerts
+
+echo Starting all applications in new windows...
+
+echo Starting Anomaly Detector...
+start "Anomaly Detector" cmd /k "cd /d %PROJECT_ROOT%\anomaly-detector && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\anomaly-detector-1.0-SNAPSHOT.jar"
+echo Starting Alert Visualizer...
+start "Alert Visualizer" cmd /k "cd /d %PROJECT_ROOT%\alarm-visualizer && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\alarm-visualizer-1.0-SNAPSHOT.jar"
+echo Starting Transaction Consumer...
+start "Transaction Consumer" cmd /k "cd /d %PROJECT_ROOT%\kafka-consumer-visualizer && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\kafka-consumer-visualizer-1.0-SNAPSHOT.jar"
+echo Starting Transaction Producer...
+start "Transaction Producer" cmd /k "cd /d %PROJECT_ROOT%\transaction-simulator && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\transaction-simulator-1.0-SNAPSHOT.jar"
+
+echo All applications are running!
+echo To stop everything, close all opened windows and run:
+echo docker-compose down
+pause
\ No newline at end of file
diff --git a/Programming/psd_project/stop_all.bat b/Programming/psd_project/stop_all.bat
new file mode 100644
index 00000000..07ae49c6
--- /dev/null
+++ b/Programming/psd_project/stop_all.bat
@@ -0,0 +1,30 @@
+@echo off
+REM filepath: d:\studia\semestr3\psd\projekt\psd_project\stop_all_windows.bat
+
+echo Stopping all Java applications...
+
+REM Stop Transaction Simulator
+for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "transaction-simulator"') do (
+ taskkill /PID %%a /F
+)
+
+REM Stop Anomaly Detector
+for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "anomaly-detector"') do (
+ taskkill /PID %%a /F
+)
+
+REM Stop Kafka Consumer Visualizer
+for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "kafka-consumer-visualizer"') do (
+ taskkill /PID %%a /F
+)
+
+REM Stop Alarm Visualizer
+for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "alarm-visualizer"') do (
+ taskkill /PID %%a /F
+)
+
+echo Stopping Docker containers...
+docker-compose down
+
+echo All applications have been stopped!
+pause
\ No newline at end of file
diff --git a/Programming/psd_project/stop_all.sh b/Programming/psd_project/stop_all.sh
new file mode 100755
index 00000000..e07a9bb9
--- /dev/null
+++ b/Programming/psd_project/stop_all.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+echo "Stopping all Java applications..."
+pkill -f "java -jar target/transaction-simulator"
+pkill -f "java -jar target/anomaly-detector"
+pkill -f "java -jar target/kafka-consumer-visualizer"
+pkill -f "java -jar target/alarm-visualizer"
+
+echo "Stopping Docker containers..."
+docker-compose down
+
+echo "All applications have been stopped!"
diff --git a/Programming/psd_project/transaction-simulator/pom.xml b/Programming/psd_project/transaction-simulator/pom.xml
new file mode 100644
index 00000000..3a18a857
--- /dev/null
+++ b/Programming/psd_project/transaction-simulator/pom.xml
@@ -0,0 +1,65 @@
+
+
+ 4.0.0
+
+ com.anomaly
+ transaction-simulator
+ 1.0-SNAPSHOT
+
+
+ 11
+
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 2.8.1
+
+
+
+ com.google.code.gson
+ gson
+ 2.8.9
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.32
+
+
+ ch.qos.logback
+ logback-classic
+ 1.2.9
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+ package
+
+ shade
+
+
+
+
+ com.anomaly.producer.TransactionProducer
+
+
+
+
+
+
+
+
+
diff --git a/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java b/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java
new file mode 100644
index 00000000..fa2950bd
--- /dev/null
+++ b/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/generator/TransactionGenerator.java
@@ -0,0 +1,195 @@
+package com.anomaly.generator;
+
+import com.anomaly.model.Transaction;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TransactionGenerator {
+ private static final int NUM_CARDS = 10000;
+ private static final int NUM_USERS = 5000; // Assuming each user has ~2 cards on average
+ private static final Map> cardLocations = new HashMap<>();
+ private static final Map cardLimits = new HashMap<>();
+ private static final Map cardToUser = new HashMap<>();
+ private static final List cardIds = new ArrayList<>();
+ private static final List userIds = new ArrayList<>();
+
+ // Anomaly types
+ private static final int ANOMALY_NONE = 0;
+ private static final int ANOMALY_AMOUNT = 1;
+ private static final int ANOMALY_LOCATION = 2;
+ private static final int ANOMALY_FREQUENCY = 3;
+
+ // Probability of generating an anomaly (1%)
+ private static final double ANOMALY_PROBABILITY = 0.05;
+
+ // Initialize card and user data
+ static {
+ // Generate user IDs
+ for (int i = 0; i < NUM_USERS; i++) {
+ userIds.add("USER_" + String.format("%05d", i));
+ }
+
+ // Generate card IDs and assign to users
+ for (int i = 0; i < NUM_CARDS; i++) {
+ String cardId = "CARD_" + String.format("%05d", i);
+ cardIds.add(cardId);
+ String userId = userIds.get(ThreadLocalRandom.current().nextInt(NUM_USERS));
+ cardToUser.put(cardId, userId);
+
+ // Initialize empty set of locations for this card
+ cardLocations.put(cardId, new HashSet<>());
+
+ // Assign random limit between $1,000 and $20,000
+ cardLimits.put(cardId, 1000.0 + ThreadLocalRandom.current().nextDouble() * 19000.0);
+ }
+ }
+
+ public Transaction generateTransaction(boolean forceAnomaly, int anomalyType) {
+ // Select a random card
+ String cardId = cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS));
+ String userId = cardToUser.get(cardId);
+ double availableLimit = cardLimits.get(cardId);
+
+ // Determine if this should be an anomaly
+ int actualAnomalyType = ANOMALY_NONE;
+ if (forceAnomaly) {
+ actualAnomalyType = (anomalyType >= 0 && anomalyType <= 3) ?
+ anomalyType : ThreadLocalRandom.current().nextInt(1, 4);
+ } else if (ThreadLocalRandom.current().nextDouble() < ANOMALY_PROBABILITY) {
+ double roll = ThreadLocalRandom.current().nextDouble();
+ if (roll < 0.4) {
+ actualAnomalyType = ANOMALY_LOCATION;
+ } else if (roll < 0.7) {
+ actualAnomalyType = ANOMALY_AMOUNT;
+ } else {
+ actualAnomalyType = ANOMALY_FREQUENCY;
+ }
+ }
+
+ // Get or generate location
+ LocationPoint location = getLocationForCard(cardId, actualAnomalyType == ANOMALY_LOCATION);
+ double latitude = location.latitude;
+ double longitude = location.longitude;
+
+ // Generate transaction amount
+ double amount;
+ if (actualAnomalyType == ANOMALY_AMOUNT) {
+ // Generate anomalously high amount (50-90% of available limit)
+ amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4);
+ } else {
+ // Normal amount (1-10% of available limit)
+ amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09);
+ }
+
+ // Update available limit
+ double newLimit = availableLimit - amount;
+ cardLimits.put(cardId, newLimit > 0 ? newLimit : 0);
+
+ // Create transaction
+ Transaction transaction = new Transaction(
+ cardId,
+ userId,
+ latitude,
+ longitude,
+ amount,
+ newLimit,
+ Instant.now()
+ );
+
+ return transaction;
+ }
+
+ private LocationPoint getLocationForCard(String cardId, boolean generateAnomaly) {
+ Set locations = cardLocations.get(cardId);
+
+ if (locations.isEmpty() || generateAnomaly) {
+ // Generate a random worldwide location
+ double latitude = ThreadLocalRandom.current().nextDouble(-90, 90);
+ double longitude = ThreadLocalRandom.current().nextDouble(-180, 180);
+ LocationPoint newLocation = new LocationPoint(latitude, longitude);
+
+ // Store this location for future use unless it's an anomaly
+ if (!generateAnomaly) {
+ locations.add(newLocation);
+ }
+
+ return newLocation;
+ } else {
+ // Pick a random location from the card's history
+ LocationPoint[] locArray = locations.toArray(new LocationPoint[0]);
+ return locArray[ThreadLocalRandom.current().nextInt(locArray.length)];
+ }
+ }
+
+ // Method to simulate frequency anomaly by generating multiple transactions in short succession
+ public List generateFrequencyAnomaly(String specificCardId) {
+ List transactions = new ArrayList<>();
+ String cardId = specificCardId != null ?
+ specificCardId : cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS));
+
+ // Generate 5-10 transactions in quick succession
+ int numTransactions = ThreadLocalRandom.current().nextInt(5, 11);
+ for (int i = 0; i < numTransactions; i++) {
+ transactions.add(generateTransactionForCard(cardId, false, ANOMALY_NONE));
+ }
+
+ return transactions;
+ }
+
+ private Transaction generateTransactionForCard(String cardId, boolean forceAnomaly, int anomalyType) {
+ String userId = cardToUser.get(cardId);
+ double availableLimit = cardLimits.get(cardId);
+
+ // Get location
+ LocationPoint location = getLocationForCard(cardId, forceAnomaly && anomalyType == ANOMALY_LOCATION);
+ double latitude = location.latitude;
+ double longitude = location.longitude;
+
+ // Generate amount
+ double amount;
+ if (forceAnomaly && anomalyType == ANOMALY_AMOUNT) {
+ amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4);
+ } else {
+ amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09);
+ }
+
+ // Update available limit
+ double newLimit = availableLimit - amount;
+ cardLimits.put(cardId, newLimit > 0 ? newLimit : 0);
+
+ return new Transaction(
+ cardId,
+ userId,
+ latitude,
+ longitude,
+ amount,
+ newLimit,
+ Instant.now()
+ );
+ }
+
+ private static class LocationPoint {
+ final double latitude;
+ final double longitude;
+
+ LocationPoint(double latitude, double longitude) {
+ this.latitude = latitude;
+ this.longitude = longitude;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ LocationPoint that = (LocationPoint) o;
+ return Double.compare(that.latitude, latitude) == 0 &&
+ Double.compare(that.longitude, longitude) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(latitude, longitude);
+ }
+ }
+}
diff --git a/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java b/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java
new file mode 100644
index 00000000..b45b3a31
--- /dev/null
+++ b/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/model/Transaction.java
@@ -0,0 +1,85 @@
+package com.anomaly.model;
+
+import java.time.Instant;
+import java.util.Objects;
+
+public class Transaction {
+ private final String cardId;
+ private final String userId;
+ private final double latitude;
+ private final double longitude;
+ private final double amount;
+ private final double availableLimit;
+ private final Instant timestamp;
+
+ public Transaction(String cardId, String userId, double latitude, double longitude,
+ double amount, double availableLimit, Instant timestamp) {
+ this.cardId = cardId;
+ this.userId = userId;
+ this.latitude = latitude;
+ this.longitude = longitude;
+ this.amount = amount;
+ this.availableLimit = availableLimit;
+ this.timestamp = timestamp;
+ }
+
+ public String getCardId() {
+ return cardId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public double getLatitude() {
+ return latitude;
+ }
+
+ public double getLongitude() {
+ return longitude;
+ }
+
+ public double getAmount() {
+ return amount;
+ }
+
+ public double getAvailableLimit() {
+ return availableLimit;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "Transaction{" +
+ "cardId='" + cardId + '\'' +
+ ", userId='" + userId + '\'' +
+ ", latitude=" + latitude +
+ ", longitude=" + longitude +
+ ", amount=" + amount +
+ ", availableLimit=" + availableLimit +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Transaction that = (Transaction) o;
+ return Double.compare(that.latitude, latitude) == 0 &&
+ Double.compare(that.longitude, longitude) == 0 &&
+ Double.compare(that.amount, amount) == 0 &&
+ Double.compare(that.availableLimit, availableLimit) == 0 &&
+ Objects.equals(cardId, that.cardId) &&
+ Objects.equals(userId, that.userId) &&
+ Objects.equals(timestamp, that.timestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cardId, userId, latitude, longitude, amount, availableLimit, timestamp);
+ }
+}
diff --git a/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java b/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java
new file mode 100644
index 00000000..e0129fa4
--- /dev/null
+++ b/Programming/psd_project/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java
@@ -0,0 +1,104 @@
+package com.anomaly.producer;
+
+import com.anomaly.generator.TransactionGenerator;
+import com.anomaly.model.Transaction;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TransactionProducer {
+ //private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class);
+ private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+ private static final String TOPIC_NAME = "transactions";
+ private static final TransactionGenerator generator = new TransactionGenerator();
+
+ // Replace simple Gson with a properly configured instance
+ private static final Gson gson = new GsonBuilder()
+ .registerTypeAdapter(Instant.class, new TypeAdapter() {
+ @Override
+ public void write(JsonWriter out, Instant value) throws IOException {
+ out.value(value != null ? value.toString() : null);
+ }
+
+ @Override
+ public Instant read(JsonReader in) throws IOException {
+ return Instant.parse(in.nextString());
+ }
+ })
+ .create();
+
+ public static void main(String[] args) {
+ // Create producer properties
+ Properties properties = new Properties();
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ // Create producer
+ KafkaProducer producer = new KafkaProducer<>(properties);
+
+ // Add shutdown hook
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ //logger.info("Shutting down producer...");
+ producer.flush();
+ producer.close();
+ //logger.info("Producer closed");
+ }));
+
+ // Generate and send transactions
+ try {
+ while (true) {
+ // Normal transaction generation
+ if (ThreadLocalRandom.current().nextDouble() < 0.05) {
+ // 5% chance to generate a frequency anomaly
+ List anomalousTransactions = generator.generateFrequencyAnomaly(null);
+ for (Transaction transaction : anomalousTransactions) {
+ sendTransaction(producer, transaction);
+ }
+ } else {
+ // Regular transaction
+ boolean forceAnomaly = ThreadLocalRandom.current().nextDouble() < 0.02; // 2% chance
+ Transaction transaction = generator.generateTransaction(forceAnomaly, -1);
+ sendTransaction(producer, transaction);
+ }
+
+ // Sleep between 100ms and 1s before generating next transaction
+ Thread.sleep(ThreadLocalRandom.current().nextLong(100, 1000));
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ //logger.error("Error in transaction producer", e);
+ } finally {
+ producer.flush();
+ producer.close();
+ }
+ }
+
+ private static void sendTransaction(KafkaProducer producer, Transaction transaction)
+ throws ExecutionException, InterruptedException {
+ String jsonTransaction = gson.toJson(transaction);
+ ProducerRecord record = new ProducerRecord<>(
+ TOPIC_NAME,
+ transaction.getCardId(),
+ jsonTransaction
+ );
+
+ producer.send(record, (metadata, exception) -> {
+ if (exception == null) {
+ //logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}",
+ // metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
+ } else {
+ //logger.error("Error sending message", exception);
+ }
+ }).get(); // Making it synchronous for demonstration
+ }
+}
diff --git a/Programming/psd_project/transaction-simulator/src/main/resources/logback.xml b/Programming/psd_project/transaction-simulator/src/main/resources/logback.xml
new file mode 100644
index 00000000..e9231f59
--- /dev/null
+++ b/Programming/psd_project/transaction-simulator/src/main/resources/logback.xml
@@ -0,0 +1,16 @@
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+