wip zin 3 psd python

This commit is contained in:
psd 2025-04-24 15:56:19 +02:00
parent 10dd006e58
commit 2c3c0efe8b
8 changed files with 473 additions and 0 deletions

View File

@ -0,0 +1,66 @@
import json
import random
import time
from confluent_kafka import Producer
import sys
import os
# Add parent directory to path to import model
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from model.temperature_reading import TemperatureReading
def delivery_report(err, msg):
    """Report the outcome of one asynchronous produce request.

    Used as the ``callback`` of ``producer.produce``. A failed delivery is
    printed to stdout; a successful delivery is deliberately silent.

    Args:
        err: Delivery error, or ``None`` when the message was delivered.
        msg: The message the report refers to (unused).
    """
    if err is None:
        # Delivered successfully: nothing to report.
        return
    print(f'Message delivery failed: {err}')
def main():
    """Publish simulated temperature readings to Kafka until interrupted.

    Once per second, one reading is produced for each of five fixed
    thermometer ids. A reading carries a uniformly random temperature
    between -10 and 30 degrees and the current wall-clock time in
    milliseconds. It is serialized as UTF-8 JSON and sent to the topic
    "Temperatura", keyed by the thermometer id.

    Ctrl+C stops the loop; messages still queued are flushed before exit.
    """
    # Set up Kafka producer with confluent-kafka
    producer_config = {
        'bootstrap.servers': 'localhost:9092'
    }
    producer = Producer(producer_config)

    # Fixed set of simulated thermometers
    thermometer_ids = ["Therm-1", "Therm-2", "Therm-3", "Therm-4", "Therm-5"]

    try:
        while True:
            for thermometer_id in thermometer_ids:
                reading = TemperatureReading(
                    thermometer_id=thermometer_id,
                    timestamp=int(time.time() * 1000),
                    temperature=random.uniform(-10, 30)
                )
                payload = json.dumps(reading.to_dict()).encode('utf-8')

                # Send to Kafka topic "Temperatura"
                producer.produce(
                    "Temperatura",
                    key=thermometer_id.encode('utf-8'),
                    value=payload,
                    callback=delivery_report
                )
                # Serve pending delivery callbacks without blocking. A
                # flush() per message would wait for the broker each time
                # and defeat the producer's batching.
                producer.poll(0)

            # One round of readings per second
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping temperature generator")
    finally:
        # Wait for any outstanding messages to be delivered
        producer.flush()


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,20 @@
class TemperatureAlarm:
    """Value object describing one temperature alarm.

    Attributes:
        thermometer_id: Identifier of the thermometer the alarm is about.
        timestamp: Time of the reading; the visualizer in this project
            divides it by 1000 before formatting, i.e. treats it as epoch
            milliseconds.
        temperature: Temperature value that was reported.

    The wire format (see ``to_dict`` / ``from_dict``) uses the camelCase key
    ``thermometerId`` together with ``timestamp`` and ``temperature``.
    """

    def __init__(self, thermometer_id=None, timestamp=None, temperature=None):
        self.thermometer_id = thermometer_id
        self.timestamp = timestamp
        self.temperature = temperature

    def __repr__(self):
        """Return a constructor-like representation for logs and debugging."""
        return (
            f"{type(self).__name__}("
            f"thermometer_id={self.thermometer_id!r}, "
            f"timestamp={self.timestamp!r}, "
            f"temperature={self.temperature!r})"
        )

    def to_dict(self):
        """Return the alarm as a JSON-serializable dict with wire-format keys."""
        return {
            "thermometerId": self.thermometer_id,
            "timestamp": self.timestamp,
            "temperature": self.temperature
        }

    @classmethod
    def from_dict(cls, data):
        """Build an alarm from a wire-format dict.

        Keys that are absent from ``data`` leave the matching attribute as
        ``None``.
        """
        return cls(
            thermometer_id=data.get("thermometerId"),
            timestamp=data.get("timestamp"),
            temperature=data.get("temperature")
        )

View File

@ -0,0 +1,20 @@
class TemperatureReading:
    """Value object describing one thermometer reading.

    Attributes:
        thermometer_id: Identifier of the thermometer that produced the reading.
        timestamp: Time of the reading; the generator in this project fills
            it with ``int(time.time() * 1000)``, i.e. epoch milliseconds.
        temperature: Measured temperature value.

    The wire format (see ``to_dict`` / ``from_dict``) uses the camelCase key
    ``thermometerId`` together with ``timestamp`` and ``temperature``.
    """

    def __init__(self, thermometer_id=None, timestamp=None, temperature=None):
        self.thermometer_id = thermometer_id
        self.timestamp = timestamp
        self.temperature = temperature

    def __repr__(self):
        """Return a constructor-like representation for logs and debugging."""
        return (
            f"{type(self).__name__}("
            f"thermometer_id={self.thermometer_id!r}, "
            f"timestamp={self.timestamp!r}, "
            f"temperature={self.temperature!r})"
        )

    def to_dict(self):
        """Return the reading as a JSON-serializable dict with wire-format keys."""
        return {
            "thermometerId": self.thermometer_id,
            "timestamp": self.timestamp,
            "temperature": self.temperature
        }

    @classmethod
    def from_dict(cls, data):
        """Build a reading from a wire-format dict.

        Keys that are absent from ``data`` leave the matching attribute as
        ``None``.
        """
        return cls(
            thermometer_id=data.get("thermometerId"),
            timestamp=data.get("timestamp"),
            temperature=data.get("temperature")
        )

View File

@ -0,0 +1,129 @@
import os
import json
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.functions import MapFunction, KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor
# Add parent directory to path to import model
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class ReadingMapFunction(MapFunction):
    """Turn a JSON-encoded reading into a flat tuple."""

    def map(self, value):
        """Parse ``value`` and return ``(thermometerId, timestamp, temperature)``.

        Raises whatever ``json.loads`` raises for invalid JSON, and
        ``KeyError`` when one of the three keys is missing.
        """
        record = json.loads(value)
        wanted = ("thermometerId", "timestamp", "temperature")
        return tuple(record[key] for key in wanted)
class AnomalyDetectionFunction(KeyedProcessFunction):
    """Emit a JSON alarm for every reading colder than a threshold.

    Args:
        threshold: Readings with a temperature strictly below this value
            produce an alarm. Defaults to 0.0.
        window_size: Window length in milliseconds (default 10 seconds).
            Stored on the instance only; ``process_element`` does not use it.
    """

    def __init__(self, threshold=0.0, window_size=10 * 1000):
        super().__init__()
        self.threshold = threshold
        self.window_size = window_size

    def process_element(self, value, ctx):
        """Yield one JSON alarm string when the reading is below the threshold.

        Args:
            value: ``(thermometer_id, timestamp, temperature)`` tuple.
            ctx: Process-function context (unused).
        """
        thermometer_id, timestamp, temperature = value
        if temperature < self.threshold:
            # The alarm repeats the offending reading in wire format.
            alarm = {
                "thermometerId": thermometer_id,
                "timestamp": timestamp,
                "temperature": temperature
            }
            yield json.dumps(alarm)
def add_kafka_dependencies(env, kafka_path=None, flink_path=None):
    """Add Kafka connector dependencies to the environment.

    Looks for ``kafka-clients-3.4.0.jar`` in ``kafka_path`` and for the first
    (alphabetically) ``flink-connector-kafka*.jar`` in ``flink_path``. Every
    JAR found is registered through the ``pipeline.jars`` setting as a
    ``file://`` URL, joined with ``;``. Missing JARs or a missing Flink
    directory only print a warning.

    Args:
        env: Execution environment whose configuration receives ``pipeline.jars``.
        kafka_path: Directory with the Kafka client libraries. Defaults to
            the developer-specific install location.
        flink_path: Flink ``lib`` directory. Defaults to the
            developer-specific install location.

    Returns:
        The list of ``file://`` URLs that were registered (possibly empty).
    """
    # User-specific paths where JAR files are located
    if kafka_path is None:
        kafka_path = "/home/psd/Downloads/kafka_2.13-3.4.0/libs"
    if flink_path is None:
        flink_path = "/home/psd/Downloads/flink-1.17.0/lib"

    jar_paths = []

    # Add kafka-clients JAR
    clients_jar = os.path.join(kafka_path, "kafka-clients-3.4.0.jar")
    if os.path.exists(clients_jar):
        jar_paths.append(f"file://{clients_jar}")
    else:
        print(f"Warning: Could not find Kafka clients JAR at {clients_jar}")

    # Try to find Flink Kafka connector JAR. A missing or unreadable directory
    # is treated like "no connector found" instead of crashing, and the listing
    # is sorted so the choice does not depend on directory order.
    try:
        candidates = sorted(os.listdir(flink_path))
    except OSError as exc:
        print(f"Warning: Could not list Flink lib directory {flink_path}: {exc}")
        candidates = []

    connector = next(
        (
            name for name in candidates
            if name.startswith("flink-connector-kafka") and name.endswith(".jar")
        ),
        None,
    )
    if connector is not None:
        jar_paths.append(f"file://{os.path.join(flink_path, connector)}")
    else:
        print("Warning: No Flink Kafka connector JAR found! You need to download it.")
        print("You can download it from Maven Central or the Apache Flink website.")
        print("The JAR should be named like 'flink-connector-kafka-1.17.0.jar'")
        print(f"Place it in: {flink_path}")

    if jar_paths:
        env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_paths))
        print(f"Added JAR files: {jar_paths}")
    return jar_paths
def main():
    """Build and run the Flink job that turns cold readings into alarms.

    The job reads JSON readings from the Kafka topic "Temperatura", maps
    them to ``(thermometerId, timestamp, temperature)`` tuples, uses the
    reading's own timestamp as event time, keys the stream by thermometer id,
    runs ``AnomalyDetectionFunction`` and writes the resulting JSON alarms to
    the Kafka topic "Alarm". The execution plan is printed before the job is
    submitted.
    """
    # Imported here because the module-level imports only bring in
    # WatermarkStrategy from this module.
    from pyflink.common.watermark_strategy import TimestampAssigner

    class ReadingTimestampAssigner(TimestampAssigner):
        """Use the timestamp field of the reading tuple as event time."""

        def extract_timestamp(self, value, record_timestamp):
            return value[1]

    # Create execution environment
    env = StreamExecutionEnvironment.get_execution_environment()

    # Add Kafka connector dependencies
    add_kafka_dependencies(env)

    # Configure Kafka consumer
    consumer_props = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'temperature-anomaly-detector'
    }

    # Create Kafka consumer
    consumer = FlinkKafkaConsumer(
        topics=['Temperatura'],
        deserialization_schema=SimpleStringSchema(),
        properties=consumer_props
    )

    # Configure Kafka producer
    producer_props = {
        'bootstrap.servers': 'localhost:9092'
    }

    # Create Kafka producer
    producer = FlinkKafkaProducer(
        topic='Alarm',
        serialization_schema=SimpleStringSchema(),
        producer_config=producer_props
    )

    # with_timestamp_assigner expects a TimestampAssigner object: PyFlink calls
    # its extract_timestamp method, which a bare lambda does not provide.
    watermark_strategy = (
        WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(ReadingTimestampAssigner())
    )

    # Add source and transformations
    env.add_source(consumer) \
        .map(ReadingMapFunction(), output_type=Types.TUPLE([Types.STRING(), Types.LONG(), Types.DOUBLE()])) \
        .assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0]) \
        .process(AnomalyDetectionFunction()) \
        .add_sink(producer)

    # Print the execution plan
    print(env.get_execution_plan())

    # Execute
    env.execute("Temperature Anomaly Detector with Time Windows")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,106 @@
import json
import threading
import tkinter as tk
from tkinter import ttk
from confluent_kafka import Consumer
import sys
import os
from datetime import datetime
from collections import Counter
# Add parent directory to path to import model
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from model.temperature_alarm import TemperatureAlarm
class AlarmVisualizer:
    """Tk window that lists temperature alarms read from the Kafka topic "Alarm".

    The window shows a newest-first list of alarm lines (capped at
    ``max_alarms`` entries) and a label with the number of alarms seen per
    thermometer. Alarms are consumed on a daemon thread; list and label
    updates are scheduled onto the Tk event loop with ``root.after``.
    """

    def __init__(self, root):
        """Build the widgets on ``root`` and start the consumer thread."""
        self.root = root
        self.root.title("Temperature Alarm Visualizer")
        self.root.geometry("600x400")

        # Create the list model for alarms
        self.alarm_list = tk.Listbox(root, font=("Courier", 12))
        self.alarm_list.pack(fill=tk.BOTH, expand=True)

        # Create scrollbar
        scrollbar = ttk.Scrollbar(self.alarm_list, orient="vertical", command=self.alarm_list.yview)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.alarm_list.config(yscrollcommand=scrollbar.set)

        # Create stats panel
        self.stats_frame = tk.Frame(root)
        self.stats_frame.pack(fill=tk.X, side=tk.BOTTOM)
        self.stats_label = tk.Label(self.stats_frame, text="No alarms yet")
        self.stats_label.pack(pady=5)

        # Stats tracking
        self.thermometer_alarm_count = Counter()
        self.max_alarms = 100

        # Start Kafka consumer in a separate thread
        self.consumer_thread = threading.Thread(target=self.consume_alarms, daemon=True)
        self.consumer_thread.start()

    def consume_alarms(self):
        """Poll the "Alarm" topic forever and hand each message to the UI.

        Runs on the consumer thread. Consumer-level errors are printed and
        skipped. Any other unexpected exception ends the loop and is printed.
        The consumer is always closed on exit.
        """
        # Set up Kafka consumer with confluent-kafka
        consumer_config = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'alarm-visualizer',
            'auto.offset.reset': 'latest'
        }
        consumer = Consumer(consumer_config)
        consumer.subscribe(['Alarm'])

        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    print(f"Consumer error: {msg.error()}")
                    continue
                self._handle_payload(msg.value())
        except Exception as e:
            print(f"Error in Kafka consumer: {e}")
        finally:
            consumer.close()

    def _handle_payload(self, payload):
        """Decode one alarm payload and schedule the matching UI update.

        A payload that cannot be decoded or formatted (invalid JSON, missing
        timestamp or temperature, not an object) is reported and skipped so
        that a single bad message does not stop the consumer thread.

        Returns:
            ``True`` when a UI update was scheduled, ``False`` when the
            payload was skipped.
        """
        try:
            # Parse the alarm message
            alarm_data = json.loads(payload.decode('utf-8'))
            alarm = TemperatureAlarm.from_dict(alarm_data)

            # Format the timestamp (milliseconds -> local date/time)
            timestamp_dt = datetime.fromtimestamp(alarm.timestamp / 1000)
            formatted_date = timestamp_dt.strftime('%Y-%m-%d %H:%M:%S')

            # Create alarm message
            alarm_message = f"⚠️ ALARM: Thermometer {alarm.thermometer_id} reported {alarm.temperature:.2f}°C at {formatted_date}"
        except (ValueError, TypeError, AttributeError, OverflowError, OSError) as exc:
            print(f"Skipping malformed alarm message: {exc}")
            return False

        # Update the UI
        self.root.after(0, self.update_ui, alarm_message, alarm.thermometer_id)
        return True

    def update_ui(self, alarm_message, thermometer_id):
        """Show a new alarm line and refresh the per-thermometer counts."""
        # Add new alarm to the top of the list
        self.alarm_list.insert(0, alarm_message)

        # Keep list at a reasonable size: drop the oldest (last) entry
        if self.alarm_list.size() > self.max_alarms:
            self.alarm_list.delete(self.max_alarms)

        # Update statistics
        self.thermometer_alarm_count[thermometer_id] += 1

        # Format stats string
        stats_parts = [
            f"{name}={count}"
            for name, count in self.thermometer_alarm_count.items()
        ]
        self.stats_label.config(text="Alarm Counts: " + ", ".join(stats_parts))
def main():
    """Create the Tk root window, attach the visualizer and run the event loop."""
    window = tk.Tk()
    # Constructing the visualizer builds the widgets and starts its consumer thread.
    visualizer = AlarmVisualizer(window)
    window.mainloop()


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,132 @@
#!/bin/bash
# Launcher configuration: where the Python components and the Kafka
# installation live, plus the ANSI color codes used for console messages.

# The Python components are expected in "code/" next to this script
# (symlinks to the script are resolved first).
SCRIPT_PATH="$(readlink -f "$0")"
CODE_DIR="$(dirname "$SCRIPT_PATH")/code"

# Kafka installation root: $KAFKA_HOME when set and non-empty, else /opt/kafka.
KAFKA_DIR="${KAFKA_HOME:-/opt/kafka}"
KAFKA_BIN="${KAFKA_DIR}/bin"

# Escape sequences for colored output; interpreted by `echo -e`.
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color (reset)
# command_exists NAME
# Exit status 0 when the shell can resolve NAME as a command, non-zero
# otherwise. Prints nothing.
command_exists() {
    command -v "$1" &>/dev/null
}
# kafka_running [PORT]
# Exit status 0 when something is listening on PORT (default 9092, the Kafka
# broker port used throughout this script), according to `netstat -tuln`.
# When netstat is not installed a warning is printed and Kafka is assumed to
# be running.
kafka_running() {
    local port="${1:-9092}"
    if command_exists netstat; then
        # Anchor the port between the address separator and the following
        # whitespace so that e.g. 19092 or 90920 is not mistaken for it.
        netstat -tuln | grep -Eq "[:.]${port}[[:space:]]"
    else
        echo -e "${YELLOW}Warning: netstat not available, assuming Kafka is running${NC}"
        return 0
    fi
}
# Cleanup function to kill all processes on exit.
# Installed as the EXIT/INT/TERM trap handler. Stops whichever of the three
# Python components were started, then exits with the status the script was
# terminating with, so a failed start-up is not reported as success.
cleanup() {
    # Must be first: $? still holds the status that triggered the trap.
    local status=$?
    # Disarm the traps so the `exit` below does not run cleanup a second time.
    trap - EXIT INT TERM

    echo -e "${YELLOW}Shutting down the temperature monitoring system...${NC}"
    # Kill all background processes
    if [[ -n $GENERATOR_PID ]]; then
        echo "Stopping temperature generator..."
        kill "$GENERATOR_PID" 2>/dev/null || true
    fi
    if [[ -n $DETECTOR_PID ]]; then
        echo "Stopping anomaly detector..."
        kill "$DETECTOR_PID" 2>/dev/null || true
    fi
    if [[ -n $VISUALIZER_PID ]]; then
        echo "Stopping alarm visualizer..."
        kill "$VISUALIZER_PID" 2>/dev/null || true
    fi
    echo -e "${GREEN}All components stopped successfully.${NC}"
    exit "$status"
}
# Register the cleanup function for script termination
trap cleanup EXIT INT TERM

# Check Python environment
if ! command_exists python3; then
    echo -e "${RED}Error: Python3 is not installed. Please install Python3 and try again.${NC}"
    exit 1
fi

# Developer-local Kafka install. Used only when the configured location
# (KAFKA_HOME or /opt/kafka) has no bin directory, and always switched
# together with KAFKA_DIR so that the start scripts and the config files
# come from the same installation.
LOCAL_KAFKA_DIR="/home/psd/Downloads/kafka_2.13-3.4.0"
if [[ ! -d "$KAFKA_BIN" && -d "$LOCAL_KAFKA_DIR/bin" ]]; then
    KAFKA_DIR="$LOCAL_KAFKA_DIR"
    KAFKA_BIN="$KAFKA_DIR/bin"
fi

# Check if Kafka is running
echo "Checking Kafka status..."
if ! kafka_running; then
    echo -e "${YELLOW}Kafka is not running. Attempting to start Kafka...${NC}"

    # Check Zookeeper first (port 2181, matched as a whole port number)
    if ! netstat -tuln | grep -Eq '[:.]2181[[:space:]]'; then
        echo "Starting Zookeeper..."
        if [[ -f "$KAFKA_BIN/zookeeper-server-start.sh" ]]; then
            "$KAFKA_BIN/zookeeper-server-start.sh" "$KAFKA_DIR/config/zookeeper.properties" > /dev/null 2>&1 &
            sleep 5
        else
            echo -e "${RED}Error: Zookeeper startup script not found.${NC}"
            exit 1
        fi
    fi

    # Start Kafka
    if [[ -f "$KAFKA_BIN/kafka-server-start.sh" ]]; then
        echo "Starting Kafka server..."
        "$KAFKA_BIN/kafka-server-start.sh" "$KAFKA_DIR/config/server.properties" > /dev/null 2>&1 &
        sleep 10

        # Check if Kafka started successfully
        if ! kafka_running; then
            echo -e "${RED}Error: Failed to start Kafka. Please check Kafka installation.${NC}"
            exit 1
        fi
    else
        echo -e "${RED}Error: Kafka startup script not found.${NC}"
        exit 1
    fi
fi
echo -e "${GREEN}Kafka is running.${NC}"

# Create Kafka topics if they don't exist
echo "Creating Kafka topics if they don't exist..."
if [[ -f "$KAFKA_BIN/kafka-topics.sh" ]]; then
    "$KAFKA_BIN/kafka-topics.sh" --create --bootstrap-server localhost:9092 --topic Temperatura --partitions 3 --replication-factor 1 --if-not-exists
    "$KAFKA_BIN/kafka-topics.sh" --create --bootstrap-server localhost:9092 --topic Alarm --partitions 3 --replication-factor 1 --if-not-exists
else
    echo -e "${YELLOW}Warning: Kafka topics script not found. Assuming topics already exist.${NC}"
fi

# Start components in the correct order: consumers of a topic before its
# producer, so the visualizer and detector are up before data flows.
echo -e "${GREEN}Starting temperature monitoring system...${NC}"

# 1. Start the alarm visualizer
echo "Starting alarm visualizer..."
python3 "$CODE_DIR/visualizer/alarm_visualizer.py" &
VISUALIZER_PID=$!
sleep 2

# 2. Start the anomaly detector
echo "Starting temperature anomaly detector..."
python3 "$CODE_DIR/processor/temperature_anomaly_detector.py" &
DETECTOR_PID=$!
sleep 2

# 3. Start the temperature generator
echo "Starting temperature generator..."
python3 "$CODE_DIR/generator/temperature_generator.py" &
GENERATOR_PID=$!

echo -e "${GREEN}All components started successfully!${NC}"
echo "Press Ctrl+C to stop the system."

# Block until every background process has exited; a signal (e.g. Ctrl+C)
# interrupts the wait and runs the cleanup trap.
wait