mirror of
https://github.com/kuhyx/WUT_Computer_Science.git
synced 2026-07-04 16:03:11 +02:00
chore: vibe coding psd project
This commit is contained in:
commit
c6b4e8fd3a
66
alarm-visualizer/pom.xml
Normal file
66
alarm-visualizer/pom.xml
Normal file
@ -0,0 +1,66 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.anomaly</groupId>
|
||||
<artifactId>alarm-visualizer</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>11</maven.compiler.source>
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<!-- Kafka -->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.8.1</version>
|
||||
</dependency>
|
||||
<!-- JSON processing -->
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>2.8.9</version>
|
||||
</dependency>
|
||||
<!-- Logging -->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.32</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.9</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<transformers>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>com.anomaly.visualizer.AlertVisualizer</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@ -0,0 +1,275 @@
|
||||
package com.anomaly.visualizer;
|
||||
|
||||
import com.anomaly.model.TransactionAlert;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.swing.*;
|
||||
import javax.swing.table.DefaultTableModel;
|
||||
import java.awt.*;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.List;
|
||||
|
||||
public class AlertVisualizer {
|
||||
private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class);
|
||||
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
|
||||
private static final String GROUP_ID = "alert-visualizer-group";
|
||||
private static final String TOPIC = "alerts";
|
||||
private static final Gson gson = new Gson();
|
||||
|
||||
// UI Components
|
||||
private static JFrame frame;
|
||||
private static JTable alertTable;
|
||||
private static DefaultTableModel tableModel;
|
||||
private static JTextArea detailArea;
|
||||
private static final List<TransactionAlert> allAlerts = new ArrayList<>();
|
||||
private static final DateTimeFormatter formatter =
|
||||
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
|
||||
|
||||
public static void main(String[] args) {
|
||||
setupUI();
|
||||
|
||||
// Create consumer properties
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
|
||||
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
|
||||
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
||||
// Create consumer
|
||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
|
||||
|
||||
// Subscribe to topic
|
||||
consumer.subscribe(Collections.singletonList(TOPIC));
|
||||
|
||||
// Add shutdown hook
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
logger.info("Shutting down alert visualizer...");
|
||||
consumer.close();
|
||||
logger.info("Alert visualizer closed");
|
||||
}));
|
||||
|
||||
// Poll for new alerts
|
||||
try {
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||||
|
||||
boolean newAlerts = false;
|
||||
for (ConsumerRecord<String, String> record : records) {
|
||||
// Parse the alert
|
||||
TransactionAlert alert = gson.fromJson(record.value(), TransactionAlert.class);
|
||||
addAlert(alert);
|
||||
newAlerts = true;
|
||||
|
||||
// Display notification for new alert
|
||||
displayNotification(alert);
|
||||
}
|
||||
|
||||
// Update the UI if new alerts arrived
|
||||
if (newAlerts) {
|
||||
updateAlertTable();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static void setupUI() {
|
||||
frame = new JFrame("Transaction Alert Visualizer");
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.setSize(1024, 768);
|
||||
|
||||
// Create table with columns
|
||||
String[] columnNames = {"Time", "Alert Type", "Card ID", "User ID", "Amount", "Message"};
|
||||
tableModel = new DefaultTableModel(columnNames, 0);
|
||||
alertTable = new JTable(tableModel);
|
||||
alertTable.setSelectionMode(ListSelectionModel.SINGLE_SELECTION);
|
||||
|
||||
// Add selection listener to show details when an alert is selected
|
||||
alertTable.getSelectionModel().addListSelectionListener(e -> {
|
||||
if (!e.getValueIsAdjusting()) {
|
||||
int selectedRow = alertTable.getSelectedRow();
|
||||
if (selectedRow >= 0 && selectedRow < allAlerts.size()) {
|
||||
showAlertDetails(allAlerts.get(selectedRow));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
JScrollPane tableScrollPane = new JScrollPane(alertTable);
|
||||
|
||||
// Create detail panel
|
||||
detailArea = new JTextArea();
|
||||
detailArea.setEditable(false);
|
||||
JScrollPane detailScrollPane = new JScrollPane(detailArea);
|
||||
|
||||
// Create map visualization panel
|
||||
JPanel mapPanel = new JPanel() {
|
||||
@Override
|
||||
protected void paintComponent(Graphics g) {
|
||||
super.paintComponent(g);
|
||||
drawMap(g);
|
||||
}
|
||||
};
|
||||
|
||||
// Create split panes for layout
|
||||
JSplitPane mainSplitPane = new JSplitPane(JSplitPane.VERTICAL_SPLIT,
|
||||
tableScrollPane, new JSplitPane(JSplitPane.HORIZONTAL_SPLIT, detailScrollPane, mapPanel));
|
||||
mainSplitPane.setDividerLocation(300);
|
||||
((JSplitPane)mainSplitPane.getBottomComponent()).setDividerLocation(500);
|
||||
|
||||
frame.add(mainSplitPane);
|
||||
frame.setVisible(true);
|
||||
}
|
||||
|
||||
private static void addAlert(TransactionAlert alert) {
|
||||
synchronized (allAlerts) {
|
||||
allAlerts.add(0, alert); // Add at the beginning for newest first
|
||||
}
|
||||
}
|
||||
|
||||
private static void updateAlertTable() {
|
||||
SwingUtilities.invokeLater(() -> {
|
||||
tableModel.setRowCount(0); // Clear table
|
||||
|
||||
synchronized (allAlerts) {
|
||||
for (TransactionAlert alert : allAlerts) {
|
||||
String formattedTime = formatter.format(alert.getAlertTime());
|
||||
tableModel.addRow(new Object[]{
|
||||
formattedTime,
|
||||
alert.getAlertType(),
|
||||
alert.getCardId(),
|
||||
alert.getUserId(),
|
||||
String.format("$%.2f", alert.getAmount()),
|
||||
alert.getMessage()
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void showAlertDetails(TransactionAlert alert) {
|
||||
SwingUtilities.invokeLater(() -> {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("ALERT DETAILS\n");
|
||||
sb.append("============================================\n\n");
|
||||
sb.append("Alert Type: ").append(alert.getAlertType()).append("\n");
|
||||
sb.append("Alert Time: ").append(formatter.format(alert.getAlertTime())).append("\n");
|
||||
sb.append("Transaction Time: ").append(formatter.format(alert.getTimestamp())).append("\n\n");
|
||||
sb.append("Card ID: ").append(alert.getCardId()).append("\n");
|
||||
sb.append("User ID: ").append(alert.getUserId()).append("\n");
|
||||
sb.append("Transaction Amount: $").append(String.format("%.2f", alert.getAmount())).append("\n\n");
|
||||
sb.append("Location: ").append(alert.getLatitude()).append(", ").append(alert.getLongitude()).append("\n\n");
|
||||
sb.append("Alert Message: ").append(alert.getMessage()).append("\n");
|
||||
|
||||
detailArea.setText(sb.toString());
|
||||
frame.repaint(); // Trigger map redraw to highlight selected alert
|
||||
});
|
||||
}
|
||||
|
||||
private static void drawMap(Graphics g) {
|
||||
Graphics2D g2d = (Graphics2D) g;
|
||||
int width = g2d.getClipBounds().width;
|
||||
int height = g2d.getClipBounds().height;
|
||||
|
||||
// Draw world map (simplified)
|
||||
g2d.setColor(Color.LIGHT_GRAY);
|
||||
g2d.fillRect(0, 0, width, height);
|
||||
g2d.setColor(Color.DARK_GRAY);
|
||||
g2d.drawRect(0, 0, width - 1, height - 1);
|
||||
|
||||
// Draw grid lines
|
||||
g2d.setColor(Color.GRAY);
|
||||
for (int i = 0; i < width; i += 50) {
|
||||
g2d.drawLine(i, 0, i, height);
|
||||
}
|
||||
for (int i = 0; i < height; i += 50) {
|
||||
g2d.drawLine(0, i, width, i);
|
||||
}
|
||||
|
||||
// Get selected alert
|
||||
int selectedRow = alertTable.getSelectedRow();
|
||||
TransactionAlert selectedAlert = null;
|
||||
if (selectedRow >= 0 && selectedRow < allAlerts.size()) {
|
||||
selectedAlert = allAlerts.get(selectedRow);
|
||||
}
|
||||
|
||||
// Draw alert points
|
||||
synchronized (allAlerts) {
|
||||
for (TransactionAlert alert : allAlerts) {
|
||||
// Map lat/lon to screen coordinates
|
||||
int x = (int) ((alert.getLongitude() + 180) / 360 * width);
|
||||
int y = (int) ((90 - alert.getLatitude()) / 180 * height);
|
||||
|
||||
// Color by alert type
|
||||
if (alert.getAlertType().equals("AMOUNT_ANOMALY")) {
|
||||
g2d.setColor(Color.RED);
|
||||
} else if (alert.getAlertType().equals("LOCATION_ANOMALY")) {
|
||||
g2d.setColor(Color.BLUE);
|
||||
} else {
|
||||
g2d.setColor(Color.ORANGE);
|
||||
}
|
||||
|
||||
// Make selected alert larger
|
||||
if (alert == selectedAlert) {
|
||||
g2d.fillOval(x - 8, y - 8, 16, 16);
|
||||
} else {
|
||||
g2d.fillOval(x - 4, y - 4, 8, 8);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Draw legend
|
||||
g2d.setColor(Color.BLACK);
|
||||
g2d.drawString("Legend:", width - 150, 20);
|
||||
g2d.setColor(Color.RED);
|
||||
g2d.fillOval(width - 140, 30, 10, 10);
|
||||
g2d.setColor(Color.BLACK);
|
||||
g2d.drawString("Amount Anomaly", width - 125, 40);
|
||||
g2d.setColor(Color.BLUE);
|
||||
g2d.fillOval(width - 140, 50, 10, 10);
|
||||
g2d.setColor(Color.BLACK);
|
||||
g2d.drawString("Location Anomaly", width - 125, 60);
|
||||
g2d.setColor(Color.ORANGE);
|
||||
g2d.fillOval(width - 140, 70, 10, 10);
|
||||
g2d.setColor(Color.BLACK);
|
||||
g2d.drawString("Frequency Anomaly", width - 125, 80);
|
||||
}
|
||||
|
||||
private static void displayNotification(TransactionAlert alert) {
|
||||
SwingUtilities.invokeLater(() -> {
|
||||
// Make a simple notification window
|
||||
JDialog dialog = new JDialog(frame, "New Alert!", false);
|
||||
dialog.setSize(400, 150);
|
||||
dialog.setLocationRelativeTo(frame);
|
||||
|
||||
JPanel panel = new JPanel(new BorderLayout());
|
||||
JLabel label = new JLabel("<html><b>" + alert.getAlertType() + "</b><br>" +
|
||||
alert.getMessage() + "<br>Card: " + alert.getCardId() + "</html>");
|
||||
label.setBorder(BorderFactory.createEmptyBorder(10, 10, 10, 10));
|
||||
panel.add(label, BorderLayout.CENTER);
|
||||
|
||||
JButton closeButton = new JButton("Dismiss");
|
||||
closeButton.addActionListener(e -> dialog.dispose());
|
||||
JPanel buttonPanel = new JPanel();
|
||||
buttonPanel.add(closeButton);
|
||||
panel.add(buttonPanel, BorderLayout.SOUTH);
|
||||
|
||||
dialog.add(panel);
|
||||
dialog.setVisible(true);
|
||||
|
||||
// Auto-dismiss after 5 seconds
|
||||
Timer timer = new Timer(5000, e -> dialog.dispose());
|
||||
timer.setRepeats(false);
|
||||
timer.start();
|
||||
});
|
||||
}
|
||||
}
|
||||
81
anomaly-detector/pom.xml
Normal file
81
anomaly-detector/pom.xml
Normal file
@ -0,0 +1,81 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.anomaly</groupId>
|
||||
<artifactId>anomaly-detector</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>11</maven.compiler.source>
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
<flink.version>1.14.2</flink.version>
|
||||
<scala.binary.version>2.12</scala.binary.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<!-- Apache Flink core -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
<!-- Flink Kafka connector -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
<!-- Flink clients -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-clients_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
<!-- JSON processing -->
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>2.8.9</version>
|
||||
</dependency>
|
||||
<!-- Logging -->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.32</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.9</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<transformers>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>com.anomaly.detector.AnomalyDetector</mainClass>
|
||||
</transformer>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@ -0,0 +1,269 @@
|
||||
package com.anomaly.detector;
|
||||
|
||||
import com.anomaly.model.Transaction;
|
||||
import com.anomaly.model.TransactionAlert;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.flink.api.common.functions.MapFunction;
|
||||
import org.apache.flink.api.common.serialization.SimpleStringSchema;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
|
||||
import org.apache.flink.streaming.api.windowing.time.Time;
|
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import java.util.*;
|
||||
import java.time.Instant;
|
||||
|
||||
public class AnomalyDetector {
|
||||
|
||||
private static final String INPUT_TOPIC = "transactions";
|
||||
private static final String OUTPUT_TOPIC = "alerts";
|
||||
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
|
||||
private static final Gson gson = new Gson();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Set up the execution environment
|
||||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
// Configure Kafka consumer
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
|
||||
properties.setProperty("group.id", "anomaly-detector");
|
||||
|
||||
// Create Kafka consumer
|
||||
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
|
||||
INPUT_TOPIC,
|
||||
new SimpleStringSchema(),
|
||||
properties
|
||||
);
|
||||
|
||||
// Parse JSON transactions
|
||||
DataStream<Transaction> transactionStream = env
|
||||
.addSource(consumer)
|
||||
.map(new MapFunction<String, Transaction>() {
|
||||
@Override
|
||||
public Transaction map(String value) throws Exception {
|
||||
return gson.fromJson(value, Transaction.class);
|
||||
}
|
||||
});
|
||||
|
||||
// Detect anomalies based on different metrics
|
||||
// 1. Amount anomaly - sudden high-value transactions
|
||||
DataStream<TransactionAlert> amountAlerts = transactionStream
|
||||
.keyBy(Transaction::getCardId)
|
||||
.window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1)))
|
||||
.process(new AmountAnomalyDetector());
|
||||
|
||||
// 2. Location anomaly - sudden change in location
|
||||
DataStream<TransactionAlert> locationAlerts = transactionStream
|
||||
.keyBy(Transaction::getCardId)
|
||||
.window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1)))
|
||||
.process(new LocationAnomalyDetector());
|
||||
|
||||
// 3. Frequency anomaly - unusual number of transactions in short time
|
||||
DataStream<TransactionAlert> frequencyAlerts = transactionStream
|
||||
.keyBy(Transaction::getCardId)
|
||||
.window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
|
||||
.process(new FrequencyAnomalyDetector());
|
||||
|
||||
// Union all alert streams
|
||||
DataStream<TransactionAlert> allAlerts = amountAlerts
|
||||
.union(locationAlerts, frequencyAlerts);
|
||||
|
||||
// Convert alerts to JSON and send to Kafka
|
||||
allAlerts
|
||||
.map(alert -> gson.toJson(alert))
|
||||
.addSink(new FlinkKafkaProducer<>(
|
||||
OUTPUT_TOPIC,
|
||||
new SimpleStringSchema(),
|
||||
properties
|
||||
));
|
||||
|
||||
// Execute the Flink job
|
||||
env.execute("Credit Card Transaction Anomaly Detection");
|
||||
}
|
||||
|
||||
// Detector for unusual transaction amounts
|
||||
public static class AmountAnomalyDetector
|
||||
extends ProcessWindowFunction<Transaction, TransactionAlert, String, TimeWindow> {
|
||||
|
||||
@Override
|
||||
public void process(String cardId, Context context, Iterable<Transaction> transactions,
|
||||
Collector<TransactionAlert> out) {
|
||||
List<Transaction> transactionList = new ArrayList<>();
|
||||
transactions.forEach(transactionList::add);
|
||||
|
||||
if (transactionList.isEmpty()) return;
|
||||
|
||||
// Calculate statistics
|
||||
double averageAmount = transactionList.stream()
|
||||
.mapToDouble(Transaction::getAmount)
|
||||
.average()
|
||||
.orElse(0);
|
||||
|
||||
double stdDeviation = calculateStdDeviation(transactionList, averageAmount);
|
||||
|
||||
// Check for anomalies (transactions that are more than 3 standard deviations from mean)
|
||||
for (Transaction transaction : transactionList) {
|
||||
if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 3 * stdDeviation) {
|
||||
out.collect(new TransactionAlert(
|
||||
"AMOUNT_ANOMALY",
|
||||
transaction.getCardId(),
|
||||
transaction.getUserId(),
|
||||
transaction.getAmount(),
|
||||
transaction.getLatitude(),
|
||||
transaction.getLongitude(),
|
||||
transaction.getTimestamp(),
|
||||
"Unusual transaction amount detected: $" + transaction.getAmount() +
|
||||
" (Average: $" + String.format("%.2f", averageAmount) + ")"
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private double calculateStdDeviation(List<Transaction> transactions, double mean) {
|
||||
return Math.sqrt(transactions.stream()
|
||||
.mapToDouble(t -> Math.pow(t.getAmount() - mean, 2))
|
||||
.average()
|
||||
.orElse(0));
|
||||
}
|
||||
}
|
||||
|
||||
// Detector for unusual transaction locations
|
||||
public static class LocationAnomalyDetector
|
||||
extends ProcessWindowFunction<Transaction, TransactionAlert, String, TimeWindow> {
|
||||
|
||||
// Map to store frequent locations for each card
|
||||
private final Map<String, Set<LocationPoint>> cardLocations = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void process(String cardId, Context context, Iterable<Transaction> transactions,
|
||||
Collector<TransactionAlert> out) {
|
||||
List<Transaction> transactionList = new ArrayList<>();
|
||||
transactions.forEach(transactionList::add);
|
||||
|
||||
if (transactionList.isEmpty()) return;
|
||||
|
||||
// Get or create location set for this card
|
||||
Set<LocationPoint> frequentLocations = cardLocations.computeIfAbsent(cardId, k -> new HashSet<>());
|
||||
|
||||
// Process each transaction
|
||||
for (Transaction transaction : transactionList) {
|
||||
LocationPoint currentPoint = new LocationPoint(transaction.getLatitude(), transaction.getLongitude());
|
||||
|
||||
// If we have at least 3 frequent locations for this card
|
||||
if (frequentLocations.size() >= 3) {
|
||||
boolean isNearKnownLocation = false;
|
||||
|
||||
// Check if current location is near any known frequent location
|
||||
for (LocationPoint knownPoint : frequentLocations) {
|
||||
if (calculateDistance(currentPoint, knownPoint) < 50) { // Less than 50km
|
||||
isNearKnownLocation = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If not near any known location, it might be an anomaly
|
||||
if (!isNearKnownLocation) {
|
||||
out.collect(new TransactionAlert(
|
||||
"LOCATION_ANOMALY",
|
||||
transaction.getCardId(),
|
||||
transaction.getUserId(),
|
||||
transaction.getAmount(),
|
||||
transaction.getLatitude(),
|
||||
transaction.getLongitude(),
|
||||
transaction.getTimestamp(),
|
||||
"Unusual transaction location detected at: " +
|
||||
transaction.getLatitude() + ", " + transaction.getLongitude()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Add current location to frequent locations (max 10 locations per card)
|
||||
if (frequentLocations.size() < 10) {
|
||||
frequentLocations.add(currentPoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate distance between two points using Haversine formula (in km)
|
||||
private double calculateDistance(LocationPoint p1, LocationPoint p2) {
|
||||
final int R = 6371; // Earth radius in km
|
||||
|
||||
double latDistance = Math.toRadians(p2.latitude - p1.latitude);
|
||||
double lonDistance = Math.toRadians(p2.longitude - p1.longitude);
|
||||
|
||||
double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
|
||||
+ Math.cos(Math.toRadians(p1.latitude)) * Math.cos(Math.toRadians(p2.latitude))
|
||||
* Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
|
||||
|
||||
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
|
||||
|
||||
return R * c;
|
||||
}
|
||||
|
||||
private static class LocationPoint {
|
||||
private final double latitude;
|
||||
private final double longitude;
|
||||
|
||||
public LocationPoint(double latitude, double longitude) {
|
||||
this.latitude = latitude;
|
||||
this.longitude = longitude;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
LocationPoint that = (LocationPoint) o;
|
||||
return Double.compare(that.latitude, latitude) == 0 &&
|
||||
Double.compare(that.longitude, longitude) == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(latitude, longitude);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Detector for unusual transaction frequency
|
||||
public static class FrequencyAnomalyDetector
|
||||
extends ProcessWindowFunction<Transaction, TransactionAlert, String, TimeWindow> {
|
||||
|
||||
@Override
|
||||
public void process(String cardId, Context context, Iterable<Transaction> transactions,
|
||||
Collector<TransactionAlert> out) {
|
||||
List<Transaction> transactionList = new ArrayList<>();
|
||||
transactions.forEach(transactionList::add);
|
||||
|
||||
// Get window info
|
||||
long windowStart = context.window().getStart();
|
||||
long windowEnd = context.window().getEnd();
|
||||
long windowSizeMinutes = (windowEnd - windowStart) / (1000 * 60);
|
||||
|
||||
// If there are more than 5 transactions in 5 minutes for the same card, flag it
|
||||
if (transactionList.size() > 5) {
|
||||
Transaction latestTransaction = transactionList.stream()
|
||||
.max(Comparator.comparing(Transaction::getTimestamp))
|
||||
.orElse(transactionList.get(0));
|
||||
|
||||
out.collect(new TransactionAlert(
|
||||
"FREQUENCY_ANOMALY",
|
||||
latestTransaction.getCardId(),
|
||||
latestTransaction.getUserId(),
|
||||
latestTransaction.getAmount(),
|
||||
latestTransaction.getLatitude(),
|
||||
latestTransaction.getLongitude(),
|
||||
latestTransaction.getTimestamp(),
|
||||
"Unusual transaction frequency detected: " + transactionList.size() +
|
||||
" transactions in " + windowSizeMinutes + " minutes"
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,66 @@
|
||||
package com.anomaly.model;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public class TransactionAlert {
|
||||
private String alertType;
|
||||
private String cardId;
|
||||
private String userId;
|
||||
private double amount;
|
||||
private double latitude;
|
||||
private double longitude;
|
||||
private Instant timestamp;
|
||||
private String message;
|
||||
private Instant alertTime;
|
||||
|
||||
public TransactionAlert() {
|
||||
this.alertTime = Instant.now();
|
||||
}
|
||||
|
||||
public TransactionAlert(String alertType, String cardId, String userId, double amount,
|
||||
double latitude, double longitude, Instant timestamp, String message) {
|
||||
this.alertType = alertType;
|
||||
this.cardId = cardId;
|
||||
this.userId = userId;
|
||||
this.amount = amount;
|
||||
this.latitude = latitude;
|
||||
this.longitude = longitude;
|
||||
this.timestamp = timestamp;
|
||||
this.message = message;
|
||||
this.alertTime = Instant.now();
|
||||
}
|
||||
|
||||
// Getters and setters
|
||||
public String getAlertType() { return alertType; }
|
||||
public void setAlertType(String alertType) { this.alertType = alertType; }
|
||||
public String getCardId() { return cardId; }
|
||||
public void setCardId(String cardId) { this.cardId = cardId; }
|
||||
public String getUserId() { return userId; }
|
||||
public void setUserId(String userId) { this.userId = userId; }
|
||||
public double getAmount() { return amount; }
|
||||
public void setAmount(double amount) { this.amount = amount; }
|
||||
public double getLatitude() { return latitude; }
|
||||
public void setLatitude(double latitude) { this.latitude = latitude; }
|
||||
public double getLongitude() { return longitude; }
|
||||
public void setLongitude(double longitude) { this.longitude = longitude; }
|
||||
public Instant getTimestamp() { return timestamp; }
|
||||
public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }
|
||||
public String getMessage() { return message; }
|
||||
public void setMessage(String message) { this.message = message; }
|
||||
public Instant getAlertTime() { return alertTime; }
|
||||
public void setAlertTime(Instant alertTime) { this.alertTime = alertTime; }
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TransactionAlert{" +
|
||||
"alertType='" + alertType + '\'' +
|
||||
", cardId='" + cardId + '\'' +
|
||||
", userId='" + userId + '\'' +
|
||||
", amount=" + amount +
|
||||
", latitude=" + latitude +
|
||||
", longitude=" + longitude +
|
||||
", message='" + message + '\'' +
|
||||
", alertTime=" + alertTime +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
0
docker-compose.yml
Normal file
0
docker-compose.yml
Normal file
66
kafka-consumer-visualizer/pom.xml
Normal file
66
kafka-consumer-visualizer/pom.xml
Normal file
@ -0,0 +1,66 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.anomaly</groupId>
|
||||
<artifactId>kafka-consumer-visualizer</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>11</maven.compiler.source>
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<!-- Kafka -->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.8.1</version>
|
||||
</dependency>
|
||||
<!-- JSON processing -->
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>2.8.9</version>
|
||||
</dependency>
|
||||
<!-- Logging -->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.32</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.9</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<transformers>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>com.anomaly.consumer.TransactionConsumer</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@ -0,0 +1,178 @@
|
||||
package com.anomaly.consumer;
|
||||
|
||||
import com.anomaly.model.Transaction;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.swing.*;
|
||||
import java.awt.*;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.List;
|
||||
|
||||
public class TransactionConsumer {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class);
|
||||
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
|
||||
private static final String GROUP_ID = "transaction-consumer-group";
|
||||
private static final String TOPIC = "transactions";
|
||||
private static final Gson gson = new Gson();
|
||||
|
||||
// UI Components
|
||||
private static JFrame frame;
|
||||
private static JTextArea logArea;
|
||||
private static JPanel chartPanel;
|
||||
private static final int MAX_DISPLAYED_TRANSACTIONS = 100;
|
||||
private static final List<Transaction> recentTransactions = new ArrayList<>();
|
||||
|
||||
public static void main(String[] args) {
|
||||
setupUI();
|
||||
|
||||
// Create consumer properties
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
|
||||
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
|
||||
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
||||
// Create consumer
|
||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
|
||||
|
||||
// Subscribe to topic
|
||||
consumer.subscribe(Collections.singletonList(TOPIC));
|
||||
|
||||
// Add shutdown hook
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
logger.info("Shutting down consumer...");
|
||||
consumer.close();
|
||||
logger.info("Consumer closed");
|
||||
}));
|
||||
|
||||
// Poll for new data
|
||||
try {
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||||
|
||||
for (ConsumerRecord<String, String> record : records) {
|
||||
logMessage("Received: Key: " + record.key() + ", Value: " + record.value());
|
||||
|
||||
// Parse the transaction
|
||||
Transaction transaction = gson.fromJson(record.value(), Transaction.class);
|
||||
addTransaction(transaction);
|
||||
}
|
||||
|
||||
// Update the visualization
|
||||
if (!records.isEmpty()) {
|
||||
updateChart();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static void setupUI() {
|
||||
frame = new JFrame("Transaction Visualizer");
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.setSize(800, 600);
|
||||
|
||||
// Create log area
|
||||
logArea = new JTextArea();
|
||||
logArea.setEditable(false);
|
||||
JScrollPane scrollPane = new JScrollPane(logArea);
|
||||
scrollPane.setPreferredSize(new Dimension(800, 200));
|
||||
|
||||
// Create chart panel
|
||||
chartPanel = new JPanel() {
|
||||
@Override
|
||||
protected void paintComponent(Graphics g) {
|
||||
super.paintComponent(g);
|
||||
drawChart(g);
|
||||
}
|
||||
};
|
||||
chartPanel.setPreferredSize(new Dimension(800, 400));
|
||||
|
||||
// Add components to frame
|
||||
frame.setLayout(new BorderLayout());
|
||||
frame.add(scrollPane, BorderLayout.SOUTH);
|
||||
frame.add(chartPanel, BorderLayout.CENTER);
|
||||
|
||||
frame.setVisible(true);
|
||||
}
|
||||
|
||||
private static void logMessage(String message) {
|
||||
SwingUtilities.invokeLater(() -> {
|
||||
logArea.append(message + "\n");
|
||||
logArea.setCaretPosition(logArea.getDocument().getLength());
|
||||
});
|
||||
}
|
||||
|
||||
private static void addTransaction(Transaction transaction) {
|
||||
synchronized (recentTransactions) {
|
||||
recentTransactions.add(transaction);
|
||||
if (recentTransactions.size() > MAX_DISPLAYED_TRANSACTIONS) {
|
||||
recentTransactions.remove(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void updateChart() {
|
||||
SwingUtilities.invokeLater(() -> chartPanel.repaint());
|
||||
}
|
||||
|
||||
private static void drawChart(Graphics g) {
|
||||
Graphics2D g2d = (Graphics2D) g;
|
||||
int width = chartPanel.getWidth();
|
||||
int height = chartPanel.getHeight();
|
||||
|
||||
// Clear the background
|
||||
g2d.setColor(Color.WHITE);
|
||||
g2d.fillRect(0, 0, width, height);
|
||||
|
||||
synchronized (recentTransactions) {
|
||||
if (recentTransactions.isEmpty()) return;
|
||||
|
||||
// Find max amount for scaling
|
||||
double maxAmount = recentTransactions.stream()
|
||||
.mapToDouble(Transaction::getAmount)
|
||||
.max()
|
||||
.orElse(1.0);
|
||||
|
||||
// Draw axes
|
||||
g2d.setColor(Color.BLACK);
|
||||
g2d.drawLine(50, height - 50, width - 50, height - 50); // X-axis
|
||||
g2d.drawLine(50, 50, 50, height - 50); // Y-axis
|
||||
|
||||
// Draw labels
|
||||
g2d.drawString("Time →", width - 70, height - 20);
|
||||
g2d.drawString("Amount", 10, 40);
|
||||
g2d.drawString("$" + Math.round(maxAmount), 10, 60);
|
||||
|
||||
// Draw transactions as points
|
||||
int xStep = (width - 100) / Math.max(1, recentTransactions.size() - 1);
|
||||
int x = 50;
|
||||
|
||||
for (Transaction transaction : recentTransactions) {
|
||||
int y = height - 50 - (int) ((transaction.getAmount() / maxAmount) * (height - 100));
|
||||
|
||||
// Color based on available limit percentage
|
||||
double limitPercentage = transaction.getAvailableLimit() /
|
||||
(transaction.getAmount() + transaction.getAvailableLimit());
|
||||
|
||||
if (limitPercentage < 0.3) {
|
||||
g2d.setColor(Color.RED);
|
||||
} else if (limitPercentage < 0.6) {
|
||||
g2d.setColor(Color.ORANGE);
|
||||
} else {
|
||||
g2d.setColor(Color.GREEN);
|
||||
}
|
||||
|
||||
g2d.fillOval(x - 3, y - 3, 6, 6);
|
||||
x += xStep;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
66
transaction-simulator/pom.xml
Normal file
66
transaction-simulator/pom.xml
Normal file
@ -0,0 +1,66 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.anomaly</groupId>
|
||||
<artifactId>transaction-simulator</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>11</maven.compiler.source>
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<!-- Kafka -->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.8.1</version>
|
||||
</dependency>
|
||||
<!-- JSON processing -->
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>2.8.9</version>
|
||||
</dependency>
|
||||
<!-- Logging -->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.32</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.9</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<transformers>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>com.anomaly.producer.TransactionProducer</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@ -0,0 +1,164 @@
|
||||
package com.anomaly.generator;
|
||||
|
||||
import com.anomaly.model.Transaction;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
public class TransactionGenerator {
|
||||
private static final int NUM_CARDS = 10000;
|
||||
private static final int NUM_USERS = 5000; // Assuming each user has ~2 cards on average
|
||||
private static final Map<String, Set<Double[]>> cardLocations = new HashMap<>();
|
||||
private static final Map<String, Double> cardLimits = new HashMap<>();
|
||||
private static final Map<String, String> cardToUser = new HashMap<>();
|
||||
private static final List<String> cardIds = new ArrayList<>();
|
||||
private static final List<String> userIds = new ArrayList<>();
|
||||
|
||||
// Anomaly types
|
||||
private static final int ANOMALY_NONE = 0;
|
||||
private static final int ANOMALY_AMOUNT = 1;
|
||||
private static final int ANOMALY_LOCATION = 2;
|
||||
private static final int ANOMALY_FREQUENCY = 3;
|
||||
|
||||
// Probability of generating an anomaly (1%)
|
||||
private static final double ANOMALY_PROBABILITY = 0.01;
|
||||
|
||||
// Initialize card and user data
|
||||
static {
|
||||
// Generate user IDs
|
||||
for (int i = 0; i < NUM_USERS; i++) {
|
||||
userIds.add("USER_" + String.format("%05d", i));
|
||||
}
|
||||
|
||||
// Generate card IDs and assign to users
|
||||
for (int i = 0; i < NUM_CARDS; i++) {
|
||||
String cardId = "CARD_" + String.format("%05d", i);
|
||||
cardIds.add(cardId);
|
||||
String userId = userIds.get(ThreadLocalRandom.current().nextInt(NUM_USERS));
|
||||
cardToUser.put(cardId, userId);
|
||||
|
||||
// Initialize empty set of locations for this card
|
||||
cardLocations.put(cardId, new HashSet<>());
|
||||
|
||||
// Assign random limit between $1,000 and $20,000
|
||||
cardLimits.put(cardId, 1000.0 + ThreadLocalRandom.current().nextDouble() * 19000.0);
|
||||
}
|
||||
}
|
||||
|
||||
public Transaction generateTransaction(boolean forceAnomaly, int anomalyType) {
|
||||
// Select a random card
|
||||
String cardId = cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS));
|
||||
String userId = cardToUser.get(cardId);
|
||||
double availableLimit = cardLimits.get(cardId);
|
||||
|
||||
// Determine if this should be an anomaly
|
||||
int actualAnomalyType = ANOMALY_NONE;
|
||||
if (forceAnomaly) {
|
||||
actualAnomalyType = (anomalyType >= 0 && anomalyType <= 3) ?
|
||||
anomalyType : ThreadLocalRandom.current().nextInt(1, 4);
|
||||
} else if (ThreadLocalRandom.current().nextDouble() < ANOMALY_PROBABILITY) {
|
||||
actualAnomalyType = ThreadLocalRandom.current().nextInt(1, 4);
|
||||
}
|
||||
|
||||
// Get or generate location
|
||||
Double[] location = getLocationForCard(cardId, actualAnomalyType == ANOMALY_LOCATION);
|
||||
double latitude = location[0];
|
||||
double longitude = location[1];
|
||||
|
||||
// Generate transaction amount
|
||||
double amount;
|
||||
if (actualAnomalyType == ANOMALY_AMOUNT) {
|
||||
// Generate anomalously high amount (50-90% of available limit)
|
||||
amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4);
|
||||
} else {
|
||||
// Normal amount (1-10% of available limit)
|
||||
amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09);
|
||||
}
|
||||
|
||||
// Update available limit
|
||||
double newLimit = availableLimit - amount;
|
||||
cardLimits.put(cardId, newLimit > 0 ? newLimit : 0);
|
||||
|
||||
// Create transaction
|
||||
Transaction transaction = new Transaction(
|
||||
cardId,
|
||||
userId,
|
||||
latitude,
|
||||
longitude,
|
||||
amount,
|
||||
newLimit,
|
||||
Instant.now()
|
||||
);
|
||||
|
||||
return transaction;
|
||||
}
|
||||
|
||||
private Double[] getLocationForCard(String cardId, boolean generateAnomaly) {
|
||||
Set<Double[]> locations = cardLocations.get(cardId);
|
||||
|
||||
if (locations.isEmpty() || generateAnomaly) {
|
||||
// Generate a random worldwide location
|
||||
double latitude = ThreadLocalRandom.current().nextDouble(-90, 90);
|
||||
double longitude = ThreadLocalRandom.current().nextDouble(-180, 180);
|
||||
Double[] newLocation = {latitude, longitude};
|
||||
|
||||
// Store this location for future use unless it's an anomaly
|
||||
if (!generateAnomaly) {
|
||||
locations.add(newLocation);
|
||||
}
|
||||
|
||||
return newLocation;
|
||||
} else {
|
||||
// Pick a random location from the card's history
|
||||
Double[][] locArray = locations.toArray(new Double[0][]);
|
||||
return locArray[ThreadLocalRandom.current().nextInt(locArray.length)];
|
||||
}
|
||||
}
|
||||
|
||||
// Method to simulate frequency anomaly by generating multiple transactions in short succession
|
||||
public List<Transaction> generateFrequencyAnomaly(String specificCardId) {
|
||||
List<Transaction> transactions = new ArrayList<>();
|
||||
String cardId = specificCardId != null ?
|
||||
specificCardId : cardIds.get(ThreadLocalRandom.current().nextInt(NUM_CARDS));
|
||||
|
||||
// Generate 5-10 transactions in quick succession
|
||||
int numTransactions = ThreadLocalRandom.current().nextInt(5, 11);
|
||||
for (int i = 0; i < numTransactions; i++) {
|
||||
transactions.add(generateTransactionForCard(cardId, false, ANOMALY_NONE));
|
||||
}
|
||||
|
||||
return transactions;
|
||||
}
|
||||
|
||||
private Transaction generateTransactionForCard(String cardId, boolean forceAnomaly, int anomalyType) {
|
||||
String userId = cardToUser.get(cardId);
|
||||
double availableLimit = cardLimits.get(cardId);
|
||||
|
||||
// Get location
|
||||
Double[] location = getLocationForCard(cardId, forceAnomaly && anomalyType == ANOMALY_LOCATION);
|
||||
double latitude = location[0];
|
||||
double longitude = location[1];
|
||||
|
||||
// Generate amount
|
||||
double amount;
|
||||
if (forceAnomaly && anomalyType == ANOMALY_AMOUNT) {
|
||||
amount = availableLimit * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.4);
|
||||
} else {
|
||||
amount = availableLimit * (0.01 + ThreadLocalRandom.current().nextDouble() * 0.09);
|
||||
}
|
||||
|
||||
// Update available limit
|
||||
double newLimit = availableLimit - amount;
|
||||
cardLimits.put(cardId, newLimit > 0 ? newLimit : 0);
|
||||
|
||||
return new Transaction(
|
||||
cardId,
|
||||
userId,
|
||||
latitude,
|
||||
longitude,
|
||||
amount,
|
||||
newLimit,
|
||||
Instant.now()
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,86 @@
|
||||
package com.anomaly.producer;
|
||||
|
||||
import com.anomaly.generator.TransactionGenerator;
|
||||
import com.anomaly.model.Transaction;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.kafka.clients.producer.*;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
public class TransactionProducer {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class);
|
||||
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
|
||||
private static final String TOPIC_NAME = "transactions";
|
||||
private static final TransactionGenerator generator = new TransactionGenerator();
|
||||
private static final Gson gson = new Gson();
|
||||
|
||||
public static void main(String[] args) {
|
||||
// Create producer properties
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
|
||||
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
|
||||
// Create producer
|
||||
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
|
||||
|
||||
// Add shutdown hook
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
logger.info("Shutting down producer...");
|
||||
producer.flush();
|
||||
producer.close();
|
||||
logger.info("Producer closed");
|
||||
}));
|
||||
|
||||
// Generate and send transactions
|
||||
try {
|
||||
while (true) {
|
||||
// Normal transaction generation
|
||||
if (ThreadLocalRandom.current().nextDouble() < 0.05) {
|
||||
// 5% chance to generate a frequency anomaly
|
||||
List<Transaction> anomalousTransactions = generator.generateFrequencyAnomaly(null);
|
||||
for (Transaction transaction : anomalousTransactions) {
|
||||
sendTransaction(producer, transaction);
|
||||
}
|
||||
} else {
|
||||
// Regular transaction
|
||||
boolean forceAnomaly = ThreadLocalRandom.current().nextDouble() < 0.02; // 2% chance
|
||||
Transaction transaction = generator.generateTransaction(forceAnomaly, -1);
|
||||
sendTransaction(producer, transaction);
|
||||
}
|
||||
|
||||
// Sleep between 100ms and 1s before generating next transaction
|
||||
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 1000));
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
logger.error("Error in transaction producer", e);
|
||||
} finally {
|
||||
producer.flush();
|
||||
producer.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static void sendTransaction(KafkaProducer<String, String> producer, Transaction transaction)
|
||||
throws ExecutionException, InterruptedException {
|
||||
String jsonTransaction = gson.toJson(transaction);
|
||||
ProducerRecord<String, String> record = new ProducerRecord<>(
|
||||
TOPIC_NAME,
|
||||
transaction.getCardId(),
|
||||
jsonTransaction
|
||||
);
|
||||
|
||||
producer.send(record, (metadata, exception) -> {
|
||||
if (exception == null) {
|
||||
logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}",
|
||||
metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
|
||||
} else {
|
||||
logger.error("Error sending message", exception);
|
||||
}
|
||||
}).get(); // Making it synchronous for demonstration
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user