fix: transaction visualizer displays something

This commit is contained in:
Krzysztof kuhy Rudnicki 2025-04-15 19:09:46 +02:00
parent fe9c99dac1
commit cb1bd7dae3
2 changed files with 46 additions and 20 deletions

View File

@ -5,14 +5,17 @@ import com.anomaly.model.TransactionAlert;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import java.util.*;
@ -29,21 +32,18 @@ public class AnomalyDetector {
// Set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure Kafka consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.setProperty("group.id", "anomaly-detector");
// Create Kafka consumer
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
INPUT_TOPIC,
new SimpleStringSchema(),
properties
);
// Create Kafka source to replace deprecated FlinkKafkaConsumer
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(INPUT_TOPIC)
.setGroupId("anomaly-detector")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// Parse JSON transactions
DataStream<Transaction> transactionStream = env
.addSource(consumer)
.fromSource(kafkaSource, org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), "Kafka Source")
.map(new MapFunction<String, Transaction>() {
@Override
public Transaction map(String value) throws Exception {
@ -74,14 +74,20 @@ public class AnomalyDetector {
DataStream<TransactionAlert> allAlerts = amountAlerts
.union(locationAlerts, frequencyAlerts);
// Create KafkaSink to replace deprecated FlinkKafkaProducer
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(OUTPUT_TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// Convert alerts to JSON and send to Kafka
allAlerts
.map(alert -> gson.toJson(alert))
.addSink(new FlinkKafkaProducer<>(
OUTPUT_TOPIC,
new SimpleStringSchema(),
properties
));
.sinkTo(kafkaSink);
// Execute the Flink job
env.execute("Credit Card Transaction Anomaly Detection");

View File

@ -3,9 +3,15 @@ package com.anomaly.producer;
import com.anomaly.generator.TransactionGenerator;
import com.anomaly.model.Transaction;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
@ -15,7 +21,21 @@ public class TransactionProducer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC_NAME = "transactions";
private static final TransactionGenerator generator = new TransactionGenerator();
private static final Gson gson = new Gson();
// Replace simple Gson with a properly configured instance
private static final Gson gson = new GsonBuilder()
.registerTypeAdapter(Instant.class, new TypeAdapter<Instant>() {
@Override
public void write(JsonWriter out, Instant value) throws IOException {
out.value(value != null ? value.toString() : null);
}
@Override
public Instant read(JsonReader in) throws IOException {
return Instant.parse(in.nextString());
}
})
.create();
public static void main(String[] args) {
// Create producer properties