WUT_Computer_Science/Programming/PSD/flink/anomaly_detector.py

21 lines
733 B
Python
Raw Normal View History

2024-06-16 21:47:34 +02:00
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
import json
def detect_anomalies(transaction):
transaction_data = json.loads(transaction)
# Add anomaly detection logic here
return transaction_data
env = StreamExecutionEnvironment.get_execution_environment()
kafka_consumer = FlinkKafkaConsumer(
topics='transactions',
deserialization_schema=SimpleStringSchema(),
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'anomaly_detection'}
)
data_stream = env.add_source(kafka_consumer).map(detect_anomalies)
data_stream.print()
env.execute("Anomaly Detection")