Add 'Programming/psd_project/' from commit '458850e355e44401ee450b65161ac276a383524d'

git-subtree-dir: Programming/psd_project
git-subtree-mainline: 38501bf024
git-subtree-split: 458850e355
This commit is contained in:
Krzysztof kuhy Rudnicki 2026-02-06 22:14:27 +01:00
commit 17121f45b9
24 changed files with 2292 additions and 0 deletions

129
Programming/psd_project/.gitignore vendored Normal file
View File

@ -0,0 +1,129 @@
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar
# Eclipse m2e generated files
# Eclipse Core
.project
# JDT-specific (Eclipse Java Development Tools)
.classpath
# Docker project generated files to ignore
# if you want to ignore files created by your editor/tools,
# please consider a global .gitignore https://help.github.com/articles/ignoring-files
.vagrant*
bin
docker/docker
.*.swp
a.out
*.orig
build_src
.flymake*
.idea
.DS_Store
docs/_build
docs/_static
docs/_templates
.gopath/
.dotcloud
*.test
bundles/
.hg/
.git/
vendor/pkg/
pyenv
Vagrantfile
dist
*classes
*.class
target/
build/
build_eclipse/
out/
.gradle/
.vscode/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
patch-process/*
.idea
.svn
.classpath
/.metadata
/.recommenders
*~
*#
.#*
rat.out
TAGS
*.iml
.project
.settings
*.ipr
*.iws
.vagrant
Vagrantfile.local
/logs
.DS_Store
config/server-*
config/zookeeper-*
gradle/wrapper/*.jar
gradlew.bat
results
tests/results
.ducktape
tests/.ducktape
tests/venv
.cache
docs/generated/
.release-settings.json
kafkatest.egg-info/
systest/
*.swp
jmh-benchmarks/generated
jmh-benchmarks/src/main/generated
**/.jqwik-database
**/src/generated
**/src/generated-test
storage/kafka-tiered-storage/
docker/test/report_*.html
kafka.Kafka
__pycache__
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
replay_pid*

View File

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.anomaly</groupId>
<artifactId>alarm-visualizer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.release>11</maven.compiler.release>
</properties>
<dependencies>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.anomaly.visualizer.AlertVisualizer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,106 @@
package com.anomaly.model;
import java.time.Instant;
public class TransactionAlert {
private String alertType;
private Instant alertTime;
private Instant timestamp;
private String cardId;
private String userId;
private double amount;
private double latitude;
private double longitude;
private String message;
// Default constructor for Gson deserialization
public TransactionAlert() {
}
public TransactionAlert(String alertType, Instant alertTime, Instant timestamp,
String cardId, String userId, double amount,
double latitude, double longitude, String message) {
this.alertType = alertType;
this.alertTime = alertTime;
this.timestamp = timestamp;
this.cardId = cardId;
this.userId = userId;
this.amount = amount;
this.latitude = latitude;
this.longitude = longitude;
this.message = message;
}
// Getters and setters
public String getAlertType() {
return alertType;
}
public void setAlertType(String alertType) {
this.alertType = alertType;
}
public Instant getAlertTime() {
return alertTime;
}
public void setAlertTime(Instant alertTime) {
this.alertTime = alertTime;
}
public Instant getTimestamp() {
return timestamp;
}
public void setTimestamp(Instant timestamp) {
this.timestamp = timestamp;
}
public String getCardId() {
return cardId;
}
public void setCardId(String cardId) {
this.cardId = cardId;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
public double getLatitude() {
return latitude;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public double getLongitude() {
return longitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -0,0 +1,304 @@
package com.anomaly.visualizer;
import com.anomaly.model.TransactionAlert;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import javax.swing.*;
import javax.swing.table.DefaultTableModel;
import java.awt.*;
import java.lang.reflect.Type;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.List;
public class AlertVisualizer {
//private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class);
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "alert-visualizer-group";
private static final String TOPIC = "alerts";
// Custom Gson instance with Instant type adapter
private static final Gson gson = new GsonBuilder()
.registerTypeAdapter(Instant.class, new InstantDeserializer())
.create();
// UI Components
private static JFrame frame;
private static JTable alertTable;
private static DefaultTableModel tableModel;
private static JTextArea detailArea;
private static final List<TransactionAlert> allAlerts = new ArrayList<>();
private static final DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
// Custom deserializer for Instant
private static class InstantDeserializer implements JsonDeserializer<Instant> {
@Override
public Instant deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
try {
// Try parsing as long (epoch milliseconds)
return Instant.ofEpochMilli(json.getAsLong());
} catch (NumberFormatException e) {
// Try parsing as ISO-8601 string
return Instant.parse(json.getAsString());
}
}
}
public static void main(String[] args) {
setupUI();
// Create consumer properties
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe to topic
consumer.subscribe(Collections.singletonList(TOPIC));
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
//logger.info("Shutting down alert visualizer...");
consumer.close();
//logger.info("Alert visualizer closed");
}));
// Poll for new alerts
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
boolean newAlerts = false;
for (ConsumerRecord<String, String> record : records) {
// Parse the alert
TransactionAlert alert = gson.fromJson(record.value(), TransactionAlert.class);
addAlert(alert);
newAlerts = true;
// Display notification for new alert
displayNotification(alert);
}
// Update the UI if new alerts arrived
if (newAlerts) {
updateAlertTable();
}
}
} finally {
consumer.close();
}
}
private static void setupUI() {
frame = new JFrame("Transaction Alert Visualizer");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setSize(1024, 768);
// Create table with columns
String[] columnNames = {"Time", "Alert Type", "Card ID", "User ID", "Amount", "Message"};
tableModel = new DefaultTableModel(columnNames, 0);
alertTable = new JTable(tableModel);
alertTable.setSelectionMode(ListSelectionModel.SINGLE_SELECTION);
// Add selection listener to show details when an alert is selected
alertTable.getSelectionModel().addListSelectionListener(e -> {
if (!e.getValueIsAdjusting()) {
int selectedRow = alertTable.getSelectedRow();
if (selectedRow >= 0 && selectedRow < allAlerts.size()) {
showAlertDetails(allAlerts.get(selectedRow));
}
}
});
JScrollPane tableScrollPane = new JScrollPane(alertTable);
// Create detail panel
detailArea = new JTextArea();
detailArea.setEditable(false);
JScrollPane detailScrollPane = new JScrollPane(detailArea);
// Create map visualization panel
JPanel mapPanel = new JPanel() {
@Override
protected void paintComponent(Graphics g) {
super.paintComponent(g);
drawMap(g);
}
};
// Create split panes for layout
JSplitPane mainSplitPane = new JSplitPane(JSplitPane.VERTICAL_SPLIT,
tableScrollPane, new JSplitPane(JSplitPane.HORIZONTAL_SPLIT, detailScrollPane, mapPanel));
mainSplitPane.setDividerLocation(300);
((JSplitPane)mainSplitPane.getBottomComponent()).setDividerLocation(500);
frame.add(mainSplitPane);
frame.setVisible(true);
}
private static void addAlert(TransactionAlert alert) {
synchronized (allAlerts) {
allAlerts.add(0, alert); // Add at the beginning for newest first
}
}
private static void updateAlertTable() {
SwingUtilities.invokeLater(() -> {
tableModel.setRowCount(0); // Clear table
synchronized (allAlerts) {
for (TransactionAlert alert : allAlerts) {
// Add null check for alert time
String formattedTime = alert.getAlertTime() != null ?
formatter.format(alert.getAlertTime()) : "N/A";
tableModel.addRow(new Object[]{
formattedTime,
alert.getAlertType(),
alert.getCardId(),
alert.getUserId(),
String.format("$%.2f", alert.getAmount()),
alert.getMessage()
});
}
}
});
}
private static void showAlertDetails(TransactionAlert alert) {
SwingUtilities.invokeLater(() -> {
StringBuilder sb = new StringBuilder();
sb.append("ALERT DETAILS\n");
sb.append("============================================\n\n");
sb.append("Alert Type: ").append(alert.getAlertType()).append("\n");
sb.append("Alert Time: ").append(alert.getAlertTime() != null ?
formatter.format(alert.getAlertTime()) : "N/A").append("\n");
sb.append("Transaction Time: ").append(alert.getTimestamp() != null ?
formatter.format(alert.getTimestamp()) : "N/A").append("\n\n");
sb.append("Card ID: ").append(alert.getCardId()).append("\n");
sb.append("User ID: ").append(alert.getUserId()).append("\n");
sb.append("Transaction Amount: $").append(String.format("%.2f", alert.getAmount())).append("\n\n");
sb.append("Location: ").append(alert.getLatitude()).append(", ").append(alert.getLongitude()).append("\n\n");
sb.append("Alert Message: ").append(alert.getMessage()).append("\n");
detailArea.setText(sb.toString());
frame.repaint(); // Trigger map redraw to highlight selected alert
});
}
private static void drawMap(Graphics g) {
Graphics2D g2d = (Graphics2D) g;
int width = g2d.getClipBounds().width;
int height = g2d.getClipBounds().height;
// Draw world map (simplified)
g2d.setColor(Color.LIGHT_GRAY);
g2d.fillRect(0, 0, width, height);
g2d.setColor(Color.DARK_GRAY);
g2d.drawRect(0, 0, width - 1, height - 1);
// Draw grid lines
g2d.setColor(Color.GRAY);
for (int i = 0; i < width; i += 50) {
g2d.drawLine(i, 0, i, height);
}
for (int i = 0; i < height; i += 50) {
g2d.drawLine(0, i, width, i);
}
// Get selected alert
int selectedRow = alertTable.getSelectedRow();
TransactionAlert selectedAlert = null;
if (selectedRow >= 0 && selectedRow < allAlerts.size()) {
selectedAlert = allAlerts.get(selectedRow);
}
// Draw alert points
synchronized (allAlerts) {
for (TransactionAlert alert : allAlerts) {
// Map lat/lon to screen coordinates
int x = (int) ((alert.getLongitude() + 180) / 360 * width);
int y = (int) ((90 - alert.getLatitude()) / 180 * height);
// Color by alert type
if (alert.getAlertType().equals("AMOUNT_ANOMALY")) {
g2d.setColor(Color.RED);
} else if (alert.getAlertType().equals("LOCATION_ANOMALY")) {
g2d.setColor(Color.BLUE);
} else {
g2d.setColor(Color.ORANGE);
}
// Make selected alert larger
if (alert == selectedAlert) {
g2d.fillOval(x - 8, y - 8, 16, 16);
} else {
g2d.fillOval(x - 4, y - 4, 8, 8);
}
}
}
// Draw legend
g2d.setColor(Color.BLACK);
g2d.drawString("Legend:", width - 150, 20);
g2d.setColor(Color.RED);
g2d.fillOval(width - 140, 30, 10, 10);
g2d.setColor(Color.BLACK);
g2d.drawString("Amount Anomaly", width - 125, 40);
g2d.setColor(Color.BLUE);
g2d.fillOval(width - 140, 50, 10, 10);
g2d.setColor(Color.BLACK);
g2d.drawString("Location Anomaly", width - 125, 60);
g2d.setColor(Color.ORANGE);
g2d.fillOval(width - 140, 70, 10, 10);
g2d.setColor(Color.BLACK);
g2d.drawString("Frequency Anomaly", width - 125, 80);
}
private static void displayNotification(TransactionAlert alert) {
SwingUtilities.invokeLater(() -> {
// Make a simple notification window
JDialog dialog = new JDialog(frame, "New Alert!", false);
dialog.setSize(400, 150);
dialog.setLocationRelativeTo(frame);
JPanel panel = new JPanel(new BorderLayout());
JLabel label = new JLabel("<html><b>" + alert.getAlertType() + "</b><br>" +
alert.getMessage() + "<br>Card: " + alert.getCardId() + "</html>");
label.setBorder(BorderFactory.createEmptyBorder(10, 10, 10, 10));
panel.add(label, BorderLayout.CENTER);
JButton closeButton = new JButton("Dismiss");
closeButton.addActionListener(e -> dialog.dispose());
JPanel buttonPanel = new JPanel();
buttonPanel.add(closeButton);
panel.add(buttonPanel, BorderLayout.SOUTH);
dialog.add(panel);
dialog.setVisible(true);
// Auto-dismiss after 5 seconds
javax.swing.Timer timer = new javax.swing.Timer(5000, e -> dialog.dispose());
timer.setRepeats(false);
timer.start();
});
}
}

View File

@ -0,0 +1,17 @@
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Set Kafka related loggers to ERROR level -->
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="kafka" level="ERROR"/>
<logger name="org.apache.zookeeper" level="ERROR"/>
<!-- Set root logger to WARN -->
<root level="WARN">
<appender-ref ref="CONSOLE" />
</root>
</configuration>

View File

@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.anomaly</groupId>
<artifactId>anomaly-detector</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.release>11</maven.compiler.release>
<flink.version>1.14.2</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Apache Flink core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.anomaly.detector.AnomalyDetector</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,381 @@
package com.anomaly.detector;
import com.anomaly.model.Transaction;
import com.anomaly.model.TransactionAlert;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import java.util.*;
import java.time.Instant;
import java.io.IOException;
import java.io.Serializable;
public class AnomalyDetector {
private static final String INPUT_TOPIC = "transactions";
private static final String OUTPUT_TOPIC = "alerts";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
// Replace the simple Gson initialization with a configured one
private static final Gson gson = new GsonBuilder()
.registerTypeAdapter(Instant.class, new InstantTypeAdapter())
.create();
// Add a custom TypeAdapter for Instant
private static class InstantTypeAdapter extends TypeAdapter<Instant> {
@Override
public void write(JsonWriter out, Instant value) throws IOException {
if (value == null) {
out.nullValue();
} else {
out.value(value.toString());
}
}
@Override
public Instant read(JsonReader in) throws IOException {
return Instant.parse(in.nextString());
}
}
public static void main(String[] args) throws Exception {
// Set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create Kafka source to replace deprecated FlinkKafkaConsumer
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(INPUT_TOPIC)
.setGroupId("anomaly-detector")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// Parse JSON transactions
DataStream<Transaction> transactionStream = env
.fromSource(kafkaSource, org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), "Kafka Source")
.map(new MapFunction<String, Transaction>() {
@Override
public Transaction map(String value) throws Exception {
return gson.fromJson(value, Transaction.class);
}
});
// Detect anomalies based on different metrics
// 1. Amount anomaly - sudden high-value transactions
DataStream<TransactionAlert> amountAlerts = transactionStream
.keyBy(Transaction::getCardId)
.window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.process(new AmountAnomalyDetector());
// 2. Location anomaly - sudden change in location
DataStream<TransactionAlert> locationAlerts = transactionStream
.keyBy(Transaction::getCardId)
.window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.process(new LocationAnomalyDetector());
// 3. Frequency anomaly - unusual number of transactions in short time
DataStream<TransactionAlert> frequencyAlerts = transactionStream
.keyBy(Transaction::getCardId)
.window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.process(new FrequencyAnomalyDetector());
// Union all alert streams
DataStream<TransactionAlert> allAlerts = amountAlerts
.union(locationAlerts, frequencyAlerts);
// Create KafkaSink to replace deprecated FlinkKafkaProducer
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(OUTPUT_TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// Convert alerts to JSON and send to Kafka
allAlerts
.map(alert -> gson.toJson(alert))
.sinkTo(kafkaSink);
// Execute the Flink job
env.execute("Credit Card Transaction Anomaly Detection");
}
// Detector for unusual transaction amounts
public static class AmountAnomalyDetector
extends ProcessWindowFunction<Transaction, TransactionAlert, String, TimeWindow> {
@Override
public void process(String cardId, Context context, Iterable<Transaction> transactions,
Collector<TransactionAlert> out) {
List<Transaction> transactionList = new ArrayList<>();
transactions.forEach(transactionList::add);
if (transactionList.isEmpty()) return;
// Calculate statistics
double averageAmount = transactionList.stream()
.mapToDouble(Transaction::getAmount)
.average()
.orElse(0);
double stdDeviation = calculateStdDeviation(transactionList, averageAmount);
// Check for anomalies (transactions that are more than 1.7 standard deviations from mean)
for (Transaction transaction : transactionList) {
if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 1.5 * stdDeviation && transaction.getAmount() > averageAmount && transaction.getAmount() > 1000) {
out.collect(new TransactionAlert(
"AMOUNT_ANOMALY",
transaction.getCardId(),
transaction.getUserId(),
transaction.getAmount(),
transaction.getLatitude(),
transaction.getLongitude(),
transaction.getTimestamp(),
"Unusual transaction amount detected: $" + transaction.getAmount() +
" (Average: $" + String.format("%.2f", averageAmount) + ")"
));
}
}
}
private double calculateStdDeviation(List<Transaction> transactions, double mean) {
return Math.sqrt(transactions.stream()
.mapToDouble(t -> Math.pow(t.getAmount() - mean, 2))
.average()
.orElse(0));
}
}
// Detector for unusual transaction locations
public static class LocationAnomalyDetector
extends ProcessWindowFunction<Transaction, TransactionAlert, String, TimeWindow> {
private transient MapState<String, Set<LocationPoint>> knownLocations;
private static final int MAX_KNOWN_LOCATIONS = 5; // Limit known locations to avoid memory issues
private static final double ANOMALY_DISTANCE_THRESHOLD = 10.0; // Threshold in km
private static final int MIN_LOCATIONS_FOR_DETECTION = 2; // Minimum known locations before detecting anomalies
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Set<LocationPoint>> descriptor =
new MapStateDescriptor<>(
"knownLocations",
TypeInformation.of(String.class),
TypeInformation.of(new TypeHint<Set<LocationPoint>>() {})
);
knownLocations = getRuntimeContext().getMapState(descriptor);
}
@Override
public void process(String cardId, Context context, Iterable<Transaction> transactions,
Collector<TransactionAlert> out) throws Exception {
List<Transaction> transactionList = new ArrayList<>();
transactions.forEach(transactionList::add);
if (transactionList.isEmpty()) return;
// Get or create location set for this card
Set<LocationPoint> cardKnownLocations;
if (knownLocations.contains(cardId)) {
cardKnownLocations = knownLocations.get(cardId);
System.out.println("Card " + cardId + " has " + cardKnownLocations.size() + " known locations");
} else {
cardKnownLocations = new HashSet<>();
System.out.println("New card detected: " + cardId + ", initializing known locations");
}
// Process each transaction
for (Transaction transaction : transactionList) {
LocationPoint currentPoint = new LocationPoint(transaction.getLatitude(), transaction.getLongitude());
// First few transactions establish the baseline locations
if (cardKnownLocations.size() < MIN_LOCATIONS_FOR_DETECTION) {
System.out.println("Building baseline for card " + cardId + ", adding location #" +
(cardKnownLocations.size() + 1) + " to known locations");
// Check if this location is already very close to a known location before adding
boolean isVeryCloseToKnown = false;
for (LocationPoint knownPoint : cardKnownLocations) {
if (calculateDistance(currentPoint, knownPoint) < 2.0) { // Within 2km = same area
isVeryCloseToKnown = true;
System.out.println("Location is very close to existing baseline location, not adding duplicate");
break;
}
}
// Only add distinct baseline locations
if (!isVeryCloseToKnown) {
cardKnownLocations.add(currentPoint);
}
// We're still building the baseline, don't check for anomalies yet
continue;
}
// Check distance to known locations
double closestDistance = Double.MAX_VALUE;
LocationPoint closestPoint = null;
for (LocationPoint knownPoint : cardKnownLocations) {
double distance = calculateDistance(currentPoint, knownPoint);
if (distance < closestDistance) {
closestDistance = distance;
closestPoint = knownPoint;
}
}
System.out.println("CARD " + cardId + ": Transaction at " + currentPoint + ", closest known location: " +
closestPoint + " (" + String.format("%.2f", closestDistance) + " km)");
// Detect anomaly if transaction is far from all known locations
if (closestDistance > ANOMALY_DISTANCE_THRESHOLD) {
System.out.println("⚠️ LOCATION ANOMALY DETECTED: Distance " +
String.format("%.2f", closestDistance) + "km exceeds threshold of " +
ANOMALY_DISTANCE_THRESHOLD + "km");
out.collect(new TransactionAlert(
"LOCATION_ANOMALY",
transaction.getCardId(),
transaction.getUserId(),
transaction.getAmount(),
transaction.getLatitude(),
transaction.getLongitude(),
transaction.getTimestamp(),
"Unusual transaction location: " + String.format("%.2f", closestDistance) +
"km from nearest known location"
));
// Don't automatically add anomalous locations to known locations
} else {
// Check if this location is already very close to a known location
boolean isVeryCloseToKnown = false;
for (LocationPoint knownPoint : cardKnownLocations) {
if (calculateDistance(currentPoint, knownPoint) < 2.0) { // Within 2km = same area
isVeryCloseToKnown = true;
break;
}
}
// Only add distinct new locations, up to our maximum
if (!isVeryCloseToKnown && cardKnownLocations.size() < MAX_KNOWN_LOCATIONS) {
cardKnownLocations.add(currentPoint);
System.out.println("Added new location to known locations: " + currentPoint);
}
}
}
// Update the state
knownLocations.put(cardId, cardKnownLocations);
}
// Calculate distance between two points using Haversine formula (in km)
private double calculateDistance(LocationPoint p1, LocationPoint p2) {
final int R = 6371; // Earth radius in km
double latDistance = Math.toRadians(p2.latitude - p1.latitude);
double lonDistance = Math.toRadians(p2.longitude - p1.longitude);
double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
+ Math.cos(Math.toRadians(p1.latitude)) * Math.cos(Math.toRadians(p2.latitude))
* Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
return R * c;
}
private static class LocationPoint implements Serializable {
private static final long serialVersionUID = 1L;
private final double latitude;
private final double longitude;
public LocationPoint(double latitude, double longitude) {
this.latitude = latitude;
this.longitude = longitude;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LocationPoint that = (LocationPoint) o;
return Double.compare(that.latitude, latitude) == 0 &&
Double.compare(that.longitude, longitude) == 0;
}
@Override
public int hashCode() {
return Objects.hash(latitude, longitude);
}
@Override
public String toString() {
return "LocationPoint{" +
"lat=" + latitude +
", lon=" + longitude +
'}';
}
}
}
// Detector for unusual transaction frequency
public static class FrequencyAnomalyDetector
extends ProcessWindowFunction<Transaction, TransactionAlert, String, TimeWindow> {
@Override
public void process(String cardId, Context context, Iterable<Transaction> transactions,
Collector<TransactionAlert> out) {
List<Transaction> transactionList = new ArrayList<>();
transactions.forEach(transactionList::add);
// Get window info
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
long windowSizeMinutes = (windowEnd - windowStart) / (1000 * 60);
// If there are more than 5 transactions in 5 minutes for the same card, flag it
if (transactionList.size() > 7) {
Transaction latestTransaction = transactionList.stream()
.max(Comparator.comparing(Transaction::getTimestamp))
.orElse(transactionList.get(0));
out.collect(new TransactionAlert(
"FREQUENCY_ANOMALY",
latestTransaction.getCardId(),
latestTransaction.getUserId(),
latestTransaction.getAmount(),
latestTransaction.getLatitude(),
latestTransaction.getLongitude(),
latestTransaction.getTimestamp(),
"Unusual transaction frequency detected: " + transactionList.size() +
" transactions in " + windowSizeMinutes + " minutes"
));
}
}
}
}

View File

@ -0,0 +1,90 @@
package com.anomaly.model;
import java.time.Instant;
/**
* Represents a financial transaction with location data.
*/
public class Transaction {
private String cardId;
private String userId;
private double amount;
private double latitude;
private double longitude;
private Instant timestamp;
// Default constructor for deserialization
public Transaction() {
}
public Transaction(String cardId, String userId, double amount,
double latitude, double longitude, Instant timestamp) {
this.cardId = cardId;
this.userId = userId;
this.amount = amount;
this.latitude = latitude;
this.longitude = longitude;
this.timestamp = timestamp;
}
// Getters and setters
public String getCardId() {
return cardId;
}
public void setCardId(String cardId) {
this.cardId = cardId;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
public double getLatitude() {
return latitude;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public double getLongitude() {
return longitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}
public Instant getTimestamp() {
return timestamp;
}
public void setTimestamp(Instant timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Transaction{" +
"cardId='" + cardId + '\'' +
", userId='" + userId + '\'' +
", amount=" + amount +
", latitude=" + latitude +
", longitude=" + longitude +
", timestamp=" + timestamp +
'}';
}
}

View File

@ -0,0 +1,112 @@
package com.anomaly.model;
import java.time.Instant;
/**
* Represents an alert generated when an anomaly is detected in a transaction.
*/
public class TransactionAlert {
private String alertType;
private String cardId;
private String userId;
private double amount;
private double latitude;
private double longitude;
private Instant timestamp;
private String message;
public TransactionAlert() {
}
public TransactionAlert(String alertType, String cardId, String userId,
double amount, double latitude, double longitude,
Instant timestamp, String message) {
this.alertType = alertType;
this.cardId = cardId;
this.userId = userId;
this.amount = amount;
this.latitude = latitude;
this.longitude = longitude;
this.timestamp = timestamp;
this.message = message;
}
// Getters and setters
public String getAlertType() {
return alertType;
}
public void setAlertType(String alertType) {
this.alertType = alertType;
}
public String getCardId() {
return cardId;
}
public void setCardId(String cardId) {
this.cardId = cardId;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
public double getLatitude() {
return latitude;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public double getLongitude() {
return longitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}
public Instant getTimestamp() {
return timestamp;
}
public void setTimestamp(Instant timestamp) {
this.timestamp = timestamp;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "TransactionAlert{" +
"alertType='" + alertType + '\'' +
", cardId='" + cardId + '\'' +
", userId='" + userId + '\'' +
", amount=" + amount +
", latitude=" + latitude +
", longitude=" + longitude +
", timestamp=" + timestamp +
", message='" + message + '\'' +
'}';
}
}

View File

@ -0,0 +1,20 @@
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Set Kafka related loggers to ERROR level -->
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="kafka" level="ERROR"/>
<!-- Set Flink related loggers to ERROR level -->
<logger name="org.apache.flink" level="ERROR"/>
<logger name="org.apache.zookeeper" level="ERROR"/>
<!-- Set root logger to WARN -->
<root level="WARN">
<appender-ref ref="CONSOLE" />
</root>
</configuration>

View File

@ -0,0 +1,39 @@
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
jobmanager:
image: flink:latest
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:latest
depends_on:
- jobmanager
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

View File

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.anomaly</groupId>
<artifactId>kafka-consumer-visualizer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.release>11</maven.compiler.release>
</properties>
<dependencies>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.anomaly.consumer.TransactionConsumer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,176 @@
package com.anomaly.consumer;
import com.anomaly.model.Transaction;
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import javax.swing.*;
import java.awt.*;
import java.time.Duration;
import java.util.*;
import java.util.List;
public class TransactionConsumer {
//private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class);
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "transaction-consumer-group";
private static final String TOPIC = "transactions";
private static final Gson gson = new Gson();
// UI Components
private static JFrame frame;
private static JTextArea logArea;
private static JPanel chartPanel;
private static final int MAX_DISPLAYED_TRANSACTIONS = 100;
private static final List<Transaction> recentTransactions = new ArrayList<>();
public static void main(String[] args) {
setupUI();
// Create consumer properties
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe to topic
consumer.subscribe(Collections.singletonList(TOPIC));
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
//logger.info("Shutting down consumer...");
consumer.close();
//logger.info("Consumer closed");
}));
// Poll for new data
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logMessage("Received: Key: " + record.key() + ", Value: " + record.value());
// Parse the transaction
Transaction transaction = gson.fromJson(record.value(), Transaction.class);
addTransaction(transaction);
}
// Update the visualization
if (!records.isEmpty()) {
updateChart();
}
}
} finally {
consumer.close();
}
}
private static void setupUI() {
frame = new JFrame("Transaction Visualizer");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setSize(800, 600);
// Create log area
logArea = new JTextArea();
logArea.setEditable(false);
JScrollPane scrollPane = new JScrollPane(logArea);
scrollPane.setPreferredSize(new Dimension(800, 200));
// Create chart panel
chartPanel = new JPanel() {
@Override
protected void paintComponent(Graphics g) {
super.paintComponent(g);
drawChart(g);
}
};
chartPanel.setPreferredSize(new Dimension(800, 400));
// Add components to frame
frame.setLayout(new BorderLayout());
frame.add(scrollPane, BorderLayout.SOUTH);
frame.add(chartPanel, BorderLayout.CENTER);
frame.setVisible(true);
}
private static void logMessage(String message) {
SwingUtilities.invokeLater(() -> {
logArea.append(message + "\n");
logArea.setCaretPosition(logArea.getDocument().getLength());
});
}
private static void addTransaction(Transaction transaction) {
synchronized (recentTransactions) {
recentTransactions.add(transaction);
if (recentTransactions.size() > MAX_DISPLAYED_TRANSACTIONS) {
recentTransactions.remove(0);
}
}
}
private static void updateChart() {
SwingUtilities.invokeLater(() -> chartPanel.repaint());
}
private static void drawChart(Graphics g) {
Graphics2D g2d = (Graphics2D) g;
int width = chartPanel.getWidth();
int height = chartPanel.getHeight();
// Clear the background
g2d.setColor(Color.WHITE);
g2d.fillRect(0, 0, width, height);
synchronized (recentTransactions) {
if (recentTransactions.isEmpty()) return;
// Find max amount for scaling
double maxAmount = recentTransactions.stream()
.mapToDouble(Transaction::getAmount)
.max()
.orElse(1.0);
// Draw axes
g2d.setColor(Color.BLACK);
g2d.drawLine(50, height - 50, width - 50, height - 50); // X-axis
g2d.drawLine(50, 50, 50, height - 50); // Y-axis
// Draw labels
g2d.drawString("Time →", width - 70, height - 20);
g2d.drawString("Amount", 10, 40);
g2d.drawString("$" + Math.round(maxAmount), 10, 60);
// Draw transactions as points
int xStep = (width - 100) / Math.max(1, recentTransactions.size() - 1);
int x = 50;
for (Transaction transaction : recentTransactions) {
int y = height - 50 - (int) ((transaction.getAmount() / maxAmount) * (height - 100));
// Color based on available limit percentage
double limitPercentage = transaction.getAvailableLimit() /
(transaction.getAmount() + transaction.getAvailableLimit());
if (limitPercentage < 0.3) {
g2d.setColor(Color.RED);
} else if (limitPercentage < 0.6) {
g2d.setColor(Color.ORANGE);
} else {
g2d.setColor(Color.GREEN);
}
g2d.fillOval(x - 3, y - 3, 6, 6);
x += xStep;
}
}
}
}

View File

@ -0,0 +1,65 @@
package com.anomaly.model;
/**
* Represents a financial transaction with amount and available credit limit information.
*/
public class Transaction {
private double amount;
private double availableLimit;
private String cardNumber;
private String timestamp;
// Default constructor for deserialization
public Transaction() {
}
// Full constructor
public Transaction(double amount, double availableLimit, String cardNumber, String timestamp) {
this.amount = amount;
this.availableLimit = availableLimit;
this.cardNumber = cardNumber;
this.timestamp = timestamp;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
public double getAvailableLimit() {
return availableLimit;
}
public void setAvailableLimit(double availableLimit) {
this.availableLimit = availableLimit;
}
public String getCardNumber() {
return cardNumber;
}
public void setCardNumber(String cardNumber) {
this.cardNumber = cardNumber;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Transaction{" +
"amount=" + amount +
", availableLimit=" + availableLimit +
", cardNumber='" + cardNumber + '\'' +
", timestamp='" + timestamp + '\'' +
'}';
}
}

View File

@ -0,0 +1,17 @@
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Set Kafka related loggers to ERROR level -->
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="kafka" level="ERROR"/>
<logger name="org.apache.zookeeper" level="ERROR"/>
<!-- Set root logger to WARN -->
<root level="WARN">
<appender-ref ref="CONSOLE" />
</root>
</configuration>

View File

@ -0,0 +1,81 @@
#!/bin/bash
# Set working directory to script location
cd "$(dirname "$0")"
# Check if Docker daemon is running
if ! docker info &>/dev/null; then
echo "ERROR: Docker daemon is not running."
echo "Please start Docker with: 'sudo systemctl start docker'"
echo "If you want Docker to start automatically at boot: 'sudo systemctl enable docker'"
echo "To run Docker without sudo, add your user to the docker group: 'sudo usermod -aG docker $USER'"
echo "Then log out and log back in for the changes to take effect."
exit 1
fi
# Note about docker-compose.yml
echo "Note: Your docker-compose.yml contains an obsolete 'version' attribute that should be removed."
echo "Starting Docker containers..."
docker-compose up -d
echo "Building Maven projects..."
cd transaction-simulator && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
cd anomaly-detector && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
cd kafka-consumer-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
cd alarm-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
echo "Creating Kafka topics..."
docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions
docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alerts
echo "Starting all applications..."
# Start Flink job (Anomaly Detector)
echo "Starting Anomaly Detector..."
cd anomaly-detector
java --add-opens java.base/java.time=ALL-UNNAMED -jar target/anomaly-detector-1.0-SNAPSHOT.jar &
ANOMALY_PID=$!
cd ..
# Start Alert Visualizer
echo "Starting Alert Visualizer..."
cd alarm-visualizer
java --add-opens java.base/java.time=ALL-UNNAMED -jar target/alarm-visualizer-1.0-SNAPSHOT.jar &
ALARM_PID=$!
cd ..
# Start Transaction Consumer/Visualizer
echo "Starting Transaction Consumer..."
cd kafka-consumer-visualizer
java --add-opens java.base/java.time=ALL-UNNAMED -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar &
CONSUMER_PID=$!
cd ..
# Start Transaction Producer last
echo "Starting Transaction Producer..."
cd transaction-simulator
java --add-opens java.base/java.time=ALL-UNNAMED -jar target/transaction-simulator-1.0-SNAPSHOT.jar &
PRODUCER_PID=$!
cd ..
echo "All applications are running!"
echo "Press Ctrl+C to stop all applications"
# Function to handle shutdown
function cleanup {
echo "Shutting down applications..."
kill $PRODUCER_PID $CONSUMER_PID $ALARM_PID $ANOMALY_PID
echo "Stopping Docker containers..."
docker-compose down
echo "All done!"
exit 0
}
# Catch shutdown signal
trap cleanup SIGINT SIGTERM
# Keep script running
while true; do
sleep 1
done

View File

@ -0,0 +1,38 @@
@echo off
setlocal enabledelayedexpansion
REM Set working directory to script location
cd /d "%~dp0"
set "PROJECT_ROOT=%cd%"
REM Check if Docker is running
docker info >nul 2>&1
if errorlevel 1 (
echo ERROR: Docker daemon is not running.
echo Please start Docker Desktop and try again.
pause
exit /b 1
)
echo Starting Docker containers...
docker-compose up -d
echo Creating Kafka topics...
docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions
docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alerts
echo Starting all applications in new windows...
echo Starting Anomaly Detector...
start "Anomaly Detector" cmd /k "cd /d %PROJECT_ROOT%\anomaly-detector && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\anomaly-detector-1.0-SNAPSHOT.jar"
echo Starting Alert Visualizer...
start "Alert Visualizer" cmd /k "cd /d %PROJECT_ROOT%\alarm-visualizer && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\alarm-visualizer-1.0-SNAPSHOT.jar"
echo Starting Transaction Consumer...
start "Transaction Consumer" cmd /k "cd /d %PROJECT_ROOT%\kafka-consumer-visualizer && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\kafka-consumer-visualizer-1.0-SNAPSHOT.jar"
echo Starting Transaction Producer...
start "Transaction Producer" cmd /k "cd /d %PROJECT_ROOT%\transaction-simulator && java --add-opens java.base/java.time=ALL-UNNAMED -jar target\transaction-simulator-1.0-SNAPSHOT.jar"
echo All applications are running!
echo To stop everything, close all opened windows and run:
echo docker-compose down
pause

View File

@ -0,0 +1,30 @@
@echo off
REM filepath: d:\studia\semestr3\psd\projekt\psd_project\stop_all_windows.bat
echo Stopping all Java applications...
REM Stop Transaction Simulator
for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "transaction-simulator"') do (
taskkill /PID %%a /F
)
REM Stop Anomaly Detector
for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "anomaly-detector"') do (
taskkill /PID %%a /F
)
REM Stop Kafka Consumer Visualizer
for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "kafka-consumer-visualizer"') do (
taskkill /PID %%a /F
)
REM Stop Alarm Visualizer
for /f "tokens=2" %%a in ('tasklist /FI "IMAGENAME eq java.exe" /v /fo list ^| findstr /I "alarm-visualizer"') do (
taskkill /PID %%a /F
)
echo Stopping Docker containers...
docker-compose down
echo All applications have been stopped!
pause

View File

@ -0,0 +1,12 @@
#!/bin/bash
echo "Stopping all Java applications..."
pkill -f "java -jar target/transaction-simulator"
pkill -f "java -jar target/anomaly-detector"
pkill -f "java -jar target/kafka-consumer-visualizer"
pkill -f "java -jar target/alarm-visualizer"
echo "Stopping Docker containers..."
docker-compose down
echo "All applications have been stopped!"

View File

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.anomaly</groupId>
<artifactId>transaction-simulator</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.release>11</maven.compiler.release>
</properties>
<dependencies>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.anomaly.producer.TransactionProducer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,195 @@
package com.anomaly.generator;
import com.anomaly.model.Transaction;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
public class TransactionGenerator {
private static final int NUM_CARDS = 10000;
private static final int NUM_USERS = 5000; // Assuming each user has ~2 cards on average
private static final Map<String, Set<LocationPoint>> cardLocations = new HashMap<>();
private static final Map<String, Double> cardLimits = new HashMap<>();
private static final Map<String, String> cardToUser = new HashMap<>();
private static final List<String> cardIds = new ArrayList<>();
private static final List<String> userIds = new ArrayList<>();
// Anomaly types
private static final int ANOMALY_NONE = 0;
private static final int ANOMALY_AMOUNT = 1;
private static final int ANOMALY_LOCATION = 2;
private static final int ANOMALY_FREQUENCY = 3;
// Probability of generating an anomaly (1%)
private static final double ANOMALY_PROBABILITY = 0.05;
// Initialize card and user data
static {
// Generate user IDs
for (int i = 0; i < NUM_USERS; i++) {
userIds.add("USER_" + String.format("%05d", i));
}
// Generate card IDs and assign to users
for (int i = 0; i < NUM_CARDS; i++) {
String cardId = "CARD_" + String.format("%05d", i);
cardIds.add(cardId);
String userId = userIds.get(ThreadLocalRandom.current().nextInt(NUM_USERS));
cardToUser.put(cardId, userId);
// Initialize empty set of locations for this card
cardLocations.put(cardId, new HashSet<>());
// Assign random limit between $1,000 and $20,000
cardLimits.put(cardId, 1000.0 + ThreadLocalRandom.current().nextDouble() * 19000.0);
}
}
public Transaction generateTransaction(boolean forceAnomaly, int anomalyType) {
// Select a random card
String cardId = cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS));
String userId = cardToUser.get(cardId);
double availableLimit = cardLimits.get(cardId);
// Determine if this should be an anomaly
int actualAnomalyType = ANOMALY_NONE;
if (forceAnomaly) {
actualAnomalyType = (anomalyType >= 0 && anomalyType <= 3) ?
anomalyType : ThreadLocalRandom.current().nextInt(1, 4);
} else if (ThreadLocalRandom.current().nextDouble() < ANOMALY_PROBABILITY) {
double roll = ThreadLocalRandom.current().nextDouble();
if (roll < 0.4) {
actualAnomalyType = ANOMALY_LOCATION;
} else if (roll < 0.7) {
actualAnomalyType = ANOMALY_AMOUNT;
} else {
actualAnomalyType = ANOMALY_FREQUENCY;
}
}
// Get or generate location
LocationPoint location = getLocationForCard(cardId, actualAnomalyType == ANOMALY_LOCATION);
double latitude = location.latitude;
double longitude = location.longitude;
// Generate transaction amount
double amount;
if (actualAnomalyType == ANOMALY_AMOUNT) {
// Generate anomalously high amount (50-90% of available limit)
amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4);
} else {
// Normal amount (1-10% of available limit)
amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09);
}
// Update available limit
double newLimit = availableLimit - amount;
cardLimits.put(cardId, newLimit > 0 ? newLimit : 0);
// Create transaction
Transaction transaction = new Transaction(
cardId,
userId,
latitude,
longitude,
amount,
newLimit,
Instant.now()
);
return transaction;
}
private LocationPoint getLocationForCard(String cardId, boolean generateAnomaly) {
Set<LocationPoint> locations = cardLocations.get(cardId);
if (locations.isEmpty() || generateAnomaly) {
// Generate a random worldwide location
double latitude = ThreadLocalRandom.current().nextDouble(-90, 90);
double longitude = ThreadLocalRandom.current().nextDouble(-180, 180);
LocationPoint newLocation = new LocationPoint(latitude, longitude);
// Store this location for future use unless it's an anomaly
if (!generateAnomaly) {
locations.add(newLocation);
}
return newLocation;
} else {
// Pick a random location from the card's history
LocationPoint[] locArray = locations.toArray(new LocationPoint[0]);
return locArray[ThreadLocalRandom.current().nextInt(locArray.length)];
}
}
// Method to simulate frequency anomaly by generating multiple transactions in short succession
public List<Transaction> generateFrequencyAnomaly(String specificCardId) {
List<Transaction> transactions = new ArrayList<>();
String cardId = specificCardId != null ?
specificCardId : cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS));
// Generate 5-10 transactions in quick succession
int numTransactions = ThreadLocalRandom.current().nextInt(5, 11);
for (int i = 0; i < numTransactions; i++) {
transactions.add(generateTransactionForCard(cardId, false, ANOMALY_NONE));
}
return transactions;
}
private Transaction generateTransactionForCard(String cardId, boolean forceAnomaly, int anomalyType) {
String userId = cardToUser.get(cardId);
double availableLimit = cardLimits.get(cardId);
// Get location
LocationPoint location = getLocationForCard(cardId, forceAnomaly && anomalyType == ANOMALY_LOCATION);
double latitude = location.latitude;
double longitude = location.longitude;
// Generate amount
double amount;
if (forceAnomaly && anomalyType == ANOMALY_AMOUNT) {
amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4);
} else {
amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09);
}
// Update available limit
double newLimit = availableLimit - amount;
cardLimits.put(cardId, newLimit > 0 ? newLimit : 0);
return new Transaction(
cardId,
userId,
latitude,
longitude,
amount,
newLimit,
Instant.now()
);
}
private static class LocationPoint {
final double latitude;
final double longitude;
LocationPoint(double latitude, double longitude) {
this.latitude = latitude;
this.longitude = longitude;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LocationPoint that = (LocationPoint) o;
return Double.compare(that.latitude, latitude) == 0 &&
Double.compare(that.longitude, longitude) == 0;
}
@Override
public int hashCode() {
return Objects.hash(latitude, longitude);
}
}
}

View File

@ -0,0 +1,85 @@
package com.anomaly.model;
import java.time.Instant;
import java.util.Objects;
public class Transaction {
private final String cardId;
private final String userId;
private final double latitude;
private final double longitude;
private final double amount;
private final double availableLimit;
private final Instant timestamp;
public Transaction(String cardId, String userId, double latitude, double longitude,
double amount, double availableLimit, Instant timestamp) {
this.cardId = cardId;
this.userId = userId;
this.latitude = latitude;
this.longitude = longitude;
this.amount = amount;
this.availableLimit = availableLimit;
this.timestamp = timestamp;
}
public String getCardId() {
return cardId;
}
public String getUserId() {
return userId;
}
public double getLatitude() {
return latitude;
}
public double getLongitude() {
return longitude;
}
public double getAmount() {
return amount;
}
public double getAvailableLimit() {
return availableLimit;
}
public Instant getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return "Transaction{" +
"cardId='" + cardId + '\'' +
", userId='" + userId + '\'' +
", latitude=" + latitude +
", longitude=" + longitude +
", amount=" + amount +
", availableLimit=" + availableLimit +
", timestamp=" + timestamp +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Transaction that = (Transaction) o;
return Double.compare(that.latitude, latitude) == 0 &&
Double.compare(that.longitude, longitude) == 0 &&
Double.compare(that.amount, amount) == 0 &&
Double.compare(that.availableLimit, availableLimit) == 0 &&
Objects.equals(cardId, that.cardId) &&
Objects.equals(userId, that.userId) &&
Objects.equals(timestamp, that.timestamp);
}
@Override
public int hashCode() {
return Objects.hash(cardId, userId, latitude, longitude, amount, availableLimit, timestamp);
}
}

View File

@ -0,0 +1,104 @@
package com.anomaly.producer;
import com.anomaly.generator.TransactionGenerator;
import com.anomaly.model.Transaction;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
public class TransactionProducer {
//private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class);
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC_NAME = "transactions";
private static final TransactionGenerator generator = new TransactionGenerator();
// Replace simple Gson with a properly configured instance
private static final Gson gson = new GsonBuilder()
.registerTypeAdapter(Instant.class, new TypeAdapter<Instant>() {
@Override
public void write(JsonWriter out, Instant value) throws IOException {
out.value(value != null ? value.toString() : null);
}
@Override
public Instant read(JsonReader in) throws IOException {
return Instant.parse(in.nextString());
}
})
.create();
public static void main(String[] args) {
// Create producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
//logger.info("Shutting down producer...");
producer.flush();
producer.close();
//logger.info("Producer closed");
}));
// Generate and send transactions
try {
while (true) {
// Normal transaction generation
if (ThreadLocalRandom.current().nextDouble() < 0.05) {
// 5% chance to generate a frequency anomaly
List<Transaction> anomalousTransactions = generator.generateFrequencyAnomaly(null);
for (Transaction transaction : anomalousTransactions) {
sendTransaction(producer, transaction);
}
} else {
// Regular transaction
boolean forceAnomaly = ThreadLocalRandom.current().nextDouble() < 0.02; // 2% chance
Transaction transaction = generator.generateTransaction(forceAnomaly, -1);
sendTransaction(producer, transaction);
}
// Sleep between 100ms and 1s before generating next transaction
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 1000));
}
} catch (InterruptedException | ExecutionException e) {
//logger.error("Error in transaction producer", e);
} finally {
producer.flush();
producer.close();
}
}
private static void sendTransaction(KafkaProducer<String, String> producer, Transaction transaction)
throws ExecutionException, InterruptedException {
String jsonTransaction = gson.toJson(transaction);
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC_NAME,
transaction.getCardId(),
jsonTransaction
);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
//logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}",
// metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
} else {
//logger.error("Error sending message", exception);
}
}).get(); // Making it synchronous for demonstration
}
}

View File

@ -0,0 +1,16 @@
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Set Kafka related loggers to ERROR level -->
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="kafka" level="ERROR"/>
<!-- Set root logger to WARN -->
<root level="WARN">
<appender-ref ref="CONSOLE" />
</root>
</configuration>