WUT_Computer_Science/Programming/PSD/flink/anomaly_detector.py
2026-02-06 22:15:31 +01:00

21 lines
733 B
Python

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
import json
def detect_anomalies(transaction):
transaction_data = json.loads(transaction)
# Add anomaly detection logic here
return transaction_data
env = StreamExecutionEnvironment.get_execution_environment()
kafka_consumer = FlinkKafkaConsumer(
topics='transactions',
deserialization_schema=SimpleStringSchema(),
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'anomaly_detection'}
)
data_stream = env.add_source(kafka_consumer).map(detect_anomalies)
data_stream.print()
env.execute("Anomaly Detection")