diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java index b7ebe65a..04f73f73 100644 --- a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -5,14 +5,17 @@ import com.anomaly.model.TransactionAlert; import com.google.gson.Gson; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.util.Collector; import java.util.*; @@ -29,21 +32,18 @@ public class AnomalyDetector { // Set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // Configure Kafka consumer - Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS); - properties.setProperty("group.id", "anomaly-detector"); - - // Create Kafka consumer - FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>( - INPUT_TOPIC, - new SimpleStringSchema(), - properties - ); + // Create Kafka source to replace deprecated FlinkKafkaConsumer + KafkaSource kafkaSource = KafkaSource.builder() + .setBootstrapServers(BOOTSTRAP_SERVERS) + .setTopics(INPUT_TOPIC) + .setGroupId("anomaly-detector") + .setStartingOffsets(OffsetsInitializer.earliest()) + .setValueOnlyDeserializer(new SimpleStringSchema()) + .build(); // Parse JSON transactions DataStream transactionStream = env - .addSource(consumer) + .fromSource(kafkaSource, org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), "Kafka Source") .map(new MapFunction() { @Override public Transaction map(String value) throws Exception { @@ -74,14 +74,20 @@ public class AnomalyDetector { DataStream allAlerts = amountAlerts .union(locationAlerts, frequencyAlerts); + // Create KafkaSink to replace deprecated FlinkKafkaProducer + KafkaSink kafkaSink = KafkaSink.builder() + .setBootstrapServers(BOOTSTRAP_SERVERS) + .setRecordSerializer(KafkaRecordSerializationSchema.builder() + .setTopic(OUTPUT_TOPIC) + .setValueSerializationSchema(new SimpleStringSchema()) + .build()) + .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build(); + // Convert alerts to JSON and send to Kafka allAlerts .map(alert -> gson.toJson(alert)) - .addSink(new FlinkKafkaProducer<>( - OUTPUT_TOPIC, - new SimpleStringSchema(), - properties - )); + .sinkTo(kafkaSink); // Execute the Flink job env.execute("Credit Card Transaction Anomaly Detection"); diff --git a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java index d24d9283..e0129fa4 100644 --- a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java +++ b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java @@ -3,9 +3,15 @@ package com.anomaly.producer; import com.anomaly.generator.TransactionGenerator; import com.anomaly.model.Transaction; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; +import java.io.IOException; +import java.time.Instant; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -15,7 +21,21 @@ public class TransactionProducer { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String TOPIC_NAME = "transactions"; private static final TransactionGenerator generator = new TransactionGenerator(); - private static final Gson gson = new Gson(); + + // Replace simple Gson with a properly configured instance + private static final Gson gson = new GsonBuilder() + .registerTypeAdapter(Instant.class, new TypeAdapter() { + @Override + public void write(JsonWriter out, Instant value) throws IOException { + out.value(value != null ? value.toString() : null); + } + + @Override + public Instant read(JsonReader in) throws IOException { + return Instant.parse(in.nextString()); + } + }) + .create(); public static void main(String[] args) { // Create producer properties