fix: anomaly detector breaking

This commit is contained in:
Krzysztof Rudnicki 2025-04-20 14:26:50 +02:00
parent 4b3f12627d
commit be94f2dd00
2 changed files with 31 additions and 5 deletions

View File

@ -3,6 +3,10 @@ package com.anomaly.detector;
import com.anomaly.model.Transaction;
import com.anomaly.model.TransactionAlert;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
@ -20,13 +24,35 @@ import org.apache.flink.util.Collector;
import java.util.*;
import java.time.Instant;
import java.io.IOException;
public class AnomalyDetector {
private static final String INPUT_TOPIC = "transactions";
private static final String OUTPUT_TOPIC = "alerts";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final Gson gson = new Gson();
// Replace the simple Gson initialization with a configured one
private static final Gson gson = new GsonBuilder()
.registerTypeAdapter(Instant.class, new InstantTypeAdapter())
.create();
// Add a custom TypeAdapter for Instant
private static class InstantTypeAdapter extends TypeAdapter<Instant> {
@Override
public void write(JsonWriter out, Instant value) throws IOException {
if (value == null) {
out.nullValue();
} else {
out.value(value.toString());
}
}
@Override
public Instant read(JsonReader in) throws IOException {
return Instant.parse(in.nextString());
}
}
public static void main(String[] args) throws Exception {
// Set up the execution environment

View File

@ -34,28 +34,28 @@ echo "Starting all applications..."
# Start Flink job (Anomaly Detector)
echo "Starting Anomaly Detector..."
cd anomaly-detector
java -jar target/anomaly-detector-1.0-SNAPSHOT.jar &
java --add-opens java.base/java.time=ALL-UNNAMED -jar target/anomaly-detector-1.0-SNAPSHOT.jar &
ANOMALY_PID=$!
cd ..
# Start Alert Visualizer
echo "Starting Alert Visualizer..."
cd alarm-visualizer
java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar &
java --add-opens java.base/java.time=ALL-UNNAMED -jar target/alarm-visualizer-1.0-SNAPSHOT.jar &
ALARM_PID=$!
cd ..
# Start Transaction Consumer/Visualizer
echo "Starting Transaction Consumer..."
cd kafka-consumer-visualizer
java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar &
java --add-opens java.base/java.time=ALL-UNNAMED -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar &
CONSUMER_PID=$!
cd ..
# Start Transaction Producer last
echo "Starting Transaction Producer..."
cd transaction-simulator
java -jar target/transaction-simulator-1.0-SNAPSHOT.jar &
java --add-opens java.base/java.time=ALL-UNNAMED -jar target/transaction-simulator-1.0-SNAPSHOT.jar &
PRODUCER_PID=$!
cd ..