From be94f2dd00a1e2f3e03198d86b958224f0b639c6 Mon Sep 17 00:00:00 2001 From: Krzysztof Rudnicki Date: Sun, 20 Apr 2025 14:26:50 +0200 Subject: [PATCH] fix: anomaly detector breaking --- .../com/anomaly/detector/AnomalyDetector.java | 28 ++++++++++++++++++- run_all.sh | 8 +++--- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java index 04f73f73..04058140 100644 --- a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -3,6 +3,10 @@ package com.anomaly.detector; import com.anomaly.model.Transaction; import com.anomaly.model.TransactionAlert; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.base.DeliveryGuarantee; @@ -20,13 +24,35 @@ import org.apache.flink.util.Collector; import java.util.*; import java.time.Instant; +import java.io.IOException; public class AnomalyDetector { private static final String INPUT_TOPIC = "transactions"; private static final String OUTPUT_TOPIC = "alerts"; private static final String BOOTSTRAP_SERVERS = "localhost:9092"; - private static final Gson gson = new Gson(); + + // Replace the simple Gson initialization with a configured one + private static final Gson gson = new GsonBuilder() + .registerTypeAdapter(Instant.class, new InstantTypeAdapter()) + .create(); + + // Add a custom TypeAdapter for Instant + private static class InstantTypeAdapter extends TypeAdapter { + @Override + public void write(JsonWriter out, Instant value) throws IOException { + if (value == null) { + out.nullValue(); + } else { + out.value(value.toString()); + } + } + + @Override + public Instant read(JsonReader in) throws IOException { + return Instant.parse(in.nextString()); + } + } public static void main(String[] args) throws Exception { // Set up the execution environment diff --git a/run_all.sh b/run_all.sh index 0ce2570b..3f04ca72 100755 --- a/run_all.sh +++ b/run_all.sh @@ -34,28 +34,28 @@ echo "Starting all applications..." # Start Flink job (Anomaly Detector) echo "Starting Anomaly Detector..." cd anomaly-detector -java -jar target/anomaly-detector-1.0-SNAPSHOT.jar & +java --add-opens java.base/java.time=ALL-UNNAMED -jar target/anomaly-detector-1.0-SNAPSHOT.jar & ANOMALY_PID=$! cd .. # Start Alert Visualizer echo "Starting Alert Visualizer..." cd alarm-visualizer -java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar & +java --add-opens java.base/java.time=ALL-UNNAMED -jar target/alarm-visualizer-1.0-SNAPSHOT.jar & ALARM_PID=$! cd .. # Start Transaction Consumer/Visualizer echo "Starting Transaction Consumer..." cd kafka-consumer-visualizer -java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar & +java --add-opens java.base/java.time=ALL-UNNAMED -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar & CONSUMER_PID=$! cd .. # Start Transaction Producer last echo "Starting Transaction Producer..." cd transaction-simulator -java -jar target/transaction-simulator-1.0-SNAPSHOT.jar & +java --add-opens java.base/java.time=ALL-UNNAMED -jar target/transaction-simulator-1.0-SNAPSHOT.jar & PRODUCER_PID=$! cd ..