From 2c3c0efe8b7a638185e18ec3c7ec8a8ece8ef7d7 Mon Sep 17 00:00:00 2001 From: psd Date: Thu, 24 Apr 2025 15:56:19 +0200 Subject: [PATCH] wip zin 3 psd python --- .../code/generator/temperature_generator.py | 66 +++++++++ .../temperature_alarm.cpython-310.pyc | Bin 0 -> 934 bytes .../temperature_reading.cpython-310.pyc | Bin 0 -> 944 bytes .../python/code/model/temperature_alarm.py | 20 +++ .../python/code/model/temperature_reading.py | 20 +++ .../processor/temperature_anomaly_detector.py | 129 +++++++++++++++++ .../code/visualizer/alarm_visualizer.py | 106 ++++++++++++++ .../PSD/zin3/python/run_temperature_system.sh | 132 ++++++++++++++++++ 8 files changed, 473 insertions(+) create mode 100644 Programming/PSD/zin3/python/code/generator/temperature_generator.py create mode 100644 Programming/PSD/zin3/python/code/model/__pycache__/temperature_alarm.cpython-310.pyc create mode 100644 Programming/PSD/zin3/python/code/model/__pycache__/temperature_reading.cpython-310.pyc create mode 100644 Programming/PSD/zin3/python/code/model/temperature_alarm.py create mode 100644 Programming/PSD/zin3/python/code/model/temperature_reading.py create mode 100644 Programming/PSD/zin3/python/code/processor/temperature_anomaly_detector.py create mode 100644 Programming/PSD/zin3/python/code/visualizer/alarm_visualizer.py create mode 100755 Programming/PSD/zin3/python/run_temperature_system.sh diff --git a/Programming/PSD/zin3/python/code/generator/temperature_generator.py b/Programming/PSD/zin3/python/code/generator/temperature_generator.py new file mode 100644 index 00000000..250e9b41 --- /dev/null +++ b/Programming/PSD/zin3/python/code/generator/temperature_generator.py @@ -0,0 +1,66 @@ +import json +import random +import time +from confluent_kafka import Producer +import sys +import os + +# Add parent directory to path to import model +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from model.temperature_reading import TemperatureReading + +def delivery_report(err, msg): + """Callback for message delivery reports""" + if err is not None: + print(f'Message delivery failed: {err}') + else: + pass # Successfully delivered + +def main(): + # Set up Kafka producer with confluent-kafka + producer_config = { + 'bootstrap.servers': 'localhost:9092' + } + producer = Producer(producer_config) + + # Generate a fixed number of thermometer IDs + thermometer_ids = ["Therm-1", "Therm-2", "Therm-3", "Therm-4", "Therm-5"] + + try: + while True: + for thermometer_id in thermometer_ids: + # Generate a random temperature between -10 and 30 degrees + temperature = random.uniform(-10, 30) + + # Create reading object + reading = TemperatureReading( + thermometer_id=thermometer_id, + timestamp=int(time.time() * 1000), + temperature=temperature + ) + + # Convert to dictionary for JSON serialization + reading_dict = reading.to_dict() + payload = json.dumps(reading_dict).encode('utf-8') + + # Send to Kafka topic "Temperatura" + producer.produce( + "Temperatura", + key=thermometer_id.encode('utf-8'), + value=payload, + callback=delivery_report + ) + producer.flush(timeout=1) + # print(f"Sent: {json.dumps(reading_dict)}") + + # Sleep for a second + time.sleep(1) + + except KeyboardInterrupt: + print("Stopping temperature generator") + finally: + # Wait for any outstanding messages to be delivered + producer.flush() + +if __name__ == "__main__": + main() diff --git a/Programming/PSD/zin3/python/code/model/__pycache__/temperature_alarm.cpython-310.pyc b/Programming/PSD/zin3/python/code/model/__pycache__/temperature_alarm.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a053ed43feb1faff6b73a4398f9f85cd3c6c2b1a GIT binary patch literal 934 zcmZWnJ#X7E5aov~%XaMs8H%=3S9OSnP6gVaMG6#MgaA%aKno#=jIBVFREi#AXer_~ z|DZ!N<}Y#WzbH_kcXFJl@d-R0PvpDDcdVwFeRzHwN5fYBU+`%EN} zbOYmxB}B4033&$j|Lp$6Ye*}SP)QOd1IZ-c(1c4*4mKq5)Ij=jxFJ_G@g)VMkvlk3 zx>hE&E2BOZsnK!f^`J2ZisS@F^9)=|TOvryz|)q4XOaLAjc>V|*`+eN(#k3$a_L4k z*Q&9puHD2wlnFiOHL5r_$l729GXQr>%n+tONwfrrsBUER?dzF1sdT+Uzp1 zsTL;HIxiQ|>HJf)&dWDZeQB3f8D*7Jkp@>py@DdeiyhUME)XIwb1MYisqw-2)Td** ze(_j_qutJo!mgcRjD$NmusbcezyYFMFiC%Mkb=b_chjB}U*y9j4Hgcm!P4Dzw=RIC z8`?_9JhP}mgBg!`)T932_47S;cUv=r+Zo1)+mU+(K?~jb1rO(jb~C%1`3;rcHBueK z24CVHJ;xWyhRpG-Xq+cgn?B|`IyaT>*vdqwkj}wBE Zd<>%FzuuV+qEkM2w`FkLokkrG$v@NK&msT- literal 0 HcmV?d00001 diff --git a/Programming/PSD/zin3/python/code/model/__pycache__/temperature_reading.cpython-310.pyc b/Programming/PSD/zin3/python/code/model/__pycache__/temperature_reading.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e8683c6484b51f98d3b50db47c3bd40a3386a36c GIT binary patch literal 944 zcmZuvO^?$s5cNlzrY$W}4;+94*KpVzI3WRAX^{|@XhjzxMMzc@d)utyBslJ2tK8Zv z|9}H~+`r^2{{jh#nRL4w*qzAZ@x*@f{KnOEIwoMR-;P&XM#wK*^o_&f7)CP(_kluzB$j#4` zu9Zpc+Ne(|m3g_0Gp`4ZF_0uDFq$XeTG|ppS_YoB96XZ*plE!}-OR3((Un$K8Iemj zvbk1`O?B-i_O4LqIj>R0xk26rE0_VeYhs2l{r(!Q03xaz8GZSDCQd3{uMu#O<*Lk7 zbZV-lNwo&9(dpuSw8_g?QGIDwRT*WKRFMW(M7@H-?5}oIU%Ehuyv(f-c&o+-=To1K z>E_veDUJ?%GYb0-hB1=vB*7lFbHOD2!9fldhulqja(t9`7d2Qqqy~$3SKhk- zmTqV(A@j_l5)EcN=24IOe>P7a;CHh*L%@S!jL03m7Z9}2v!C#Qu4p&2o0(rx>3v7C zgV^AUyhqUSrLrM&JS!UK$<(I*vmKqAN_Tu`0MH-`V*$kv7(ai2@$G?-$8j7Eu^(ls zl@M+$1axj)VEjmkZ|k(!|1l$EHv#7+S&=pkJg`+I&0|cXP(6o|8SD^_j!;0xG#Ieq bjCT1LgvWoqGaW>ye(+|`;9Ykbm7J5mh6B?t literal 0 HcmV?d00001 diff --git a/Programming/PSD/zin3/python/code/model/temperature_alarm.py b/Programming/PSD/zin3/python/code/model/temperature_alarm.py new file mode 100644 index 00000000..d37da56d --- /dev/null +++ b/Programming/PSD/zin3/python/code/model/temperature_alarm.py @@ -0,0 +1,20 @@ +class TemperatureAlarm: + def __init__(self, thermometer_id=None, timestamp=None, temperature=None): + self.thermometer_id = thermometer_id + self.timestamp = timestamp + self.temperature = temperature + + def to_dict(self): + return { + "thermometerId": self.thermometer_id, + "timestamp": self.timestamp, + "temperature": self.temperature + } + + @classmethod + def from_dict(cls, data): + return cls( + thermometer_id=data.get("thermometerId"), + timestamp=data.get("timestamp"), + temperature=data.get("temperature") + ) diff --git a/Programming/PSD/zin3/python/code/model/temperature_reading.py b/Programming/PSD/zin3/python/code/model/temperature_reading.py new file mode 100644 index 00000000..01c15f24 --- /dev/null +++ b/Programming/PSD/zin3/python/code/model/temperature_reading.py @@ -0,0 +1,20 @@ +class TemperatureReading: + def __init__(self, thermometer_id=None, timestamp=None, temperature=None): + self.thermometer_id = thermometer_id + self.timestamp = timestamp + self.temperature = temperature + + def to_dict(self): + return { + "thermometerId": self.thermometer_id, + "timestamp": self.timestamp, + "temperature": self.temperature + } + + @classmethod + def from_dict(cls, data): + return cls( + thermometer_id=data.get("thermometerId"), + timestamp=data.get("timestamp"), + temperature=data.get("temperature") + ) diff --git a/Programming/PSD/zin3/python/code/processor/temperature_anomaly_detector.py b/Programming/PSD/zin3/python/code/processor/temperature_anomaly_detector.py new file mode 100644 index 00000000..89e2e569 --- /dev/null +++ b/Programming/PSD/zin3/python/code/processor/temperature_anomaly_detector.py @@ -0,0 +1,129 @@ +import os +import json +import sys +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.common.serialization import SimpleStringSchema +from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer +from pyflink.common.time import Time +from pyflink.common.typeinfo import Types +from pyflink.common.watermark_strategy import WatermarkStrategy +from pyflink.datastream.functions import MapFunction, KeyedProcessFunction +from pyflink.datastream.state import ValueStateDescriptor + +# Add parent directory to path to import model +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +class ReadingMapFunction(MapFunction): + def map(self, value): + data = json.loads(value) + return (data["thermometerId"], data["timestamp"], data["temperature"]) + +class AnomalyDetectionFunction(KeyedProcessFunction): + def __init__(self): + self.window_size = 10 * 1000 # 10 seconds in milliseconds + + def process_element(self, value, ctx): + thermometer_id, timestamp, temperature = value + + # Check if temperature is below zero (anomaly) + if temperature < 0.0: + # Create alarm + alarm = { + "thermometerId": thermometer_id, + "timestamp": timestamp, + "temperature": temperature + } + yield json.dumps(alarm) + +def add_kafka_dependencies(env): + """Add Kafka connector dependencies to the environment""" + + # User-specific paths where JAR files are located + kafka_path = "/home/psd/Downloads/kafka_2.13-3.4.0/libs" + flink_path = "/home/psd/Downloads/flink-1.17.0/lib" + + # Required JAR files + kafka_jars = { + "kafka-clients": "kafka-clients-3.4.0.jar", + "flink-connector-kafka": None # Will be detected or reported as missing + } + + jar_paths = [] + + # Add kafka-clients JAR + clients_jar = os.path.join(kafka_path, kafka_jars["kafka-clients"]) + if os.path.exists(clients_jar): + jar_paths.append(f"file://{clients_jar}") + else: + print(f"Warning: Could not find Kafka clients JAR at {clients_jar}") + + # Try to find Flink Kafka connector JAR + connector_found = False + for file in os.listdir(flink_path): + if file.startswith("flink-connector-kafka"): + kafka_jars["flink-connector-kafka"] = file + jar_paths.append(f"file://{os.path.join(flink_path, file)}") + connector_found = True + break + + if not connector_found: + print("Warning: No Flink Kafka connector JAR found! You need to download it.") + print("You can download it from Maven Central or the Apache Flink website.") + print("The JAR should be named like 'flink-connector-kafka-1.17.0.jar'") + print(f"Place it in: {flink_path}") + + if jar_paths: + env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_paths)) + print(f"Added JAR files: {jar_paths}") + +def main(): + # Create execution environment + env = StreamExecutionEnvironment.get_execution_environment() + + # Add Kafka connector dependencies + add_kafka_dependencies(env) + + # Configure Kafka consumer + consumer_props = { + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'temperature-anomaly-detector' + } + + # Create Kafka consumer + consumer = FlinkKafkaConsumer( + topics=['Temperatura'], + deserialization_schema=SimpleStringSchema(), + properties=consumer_props + ) + + # Configure Kafka producer + producer_props = { + 'bootstrap.servers': 'localhost:9092' + } + + # Create Kafka producer + producer = FlinkKafkaProducer( + topic='Alarm', + serialization_schema=SimpleStringSchema(), + producer_config=producer_props + ) + + # Add source and transformations + env.add_source(consumer) \ + .map(ReadingMapFunction(), output_type=Types.TUPLE([Types.STRING(), Types.LONG(), Types.DOUBLE()])) \ + .assign_timestamps_and_watermarks( + WatermarkStrategy.for_monotonous_timestamps() + .with_timestamp_assigner(lambda event, timestamp: event[1]) + ) \ + .key_by(lambda x: x[0]) \ + .process(AnomalyDetectionFunction()) \ + .add_sink(producer) + + # Print the execution plan + print(env.get_execution_plan()) + + # Execute + env.execute("Temperature Anomaly Detector with Time Windows") + +if __name__ == "__main__": + main() diff --git a/Programming/PSD/zin3/python/code/visualizer/alarm_visualizer.py b/Programming/PSD/zin3/python/code/visualizer/alarm_visualizer.py new file mode 100644 index 00000000..3f9175f3 --- /dev/null +++ b/Programming/PSD/zin3/python/code/visualizer/alarm_visualizer.py @@ -0,0 +1,106 @@ +import json +import threading +import tkinter as tk +from tkinter import ttk +from confluent_kafka import Consumer +import sys +import os +from datetime import datetime +from collections import Counter + +# Add parent directory to path to import model +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from model.temperature_alarm import TemperatureAlarm + +class AlarmVisualizer: + def __init__(self, root): + self.root = root + self.root.title("Temperature Alarm Visualizer") + self.root.geometry("600x400") + + # Create the list model for alarms + self.alarm_list = tk.Listbox(root, font=("Courier", 12)) + self.alarm_list.pack(fill=tk.BOTH, expand=True) + + # Create scrollbar + scrollbar = ttk.Scrollbar(self.alarm_list, orient="vertical", command=self.alarm_list.yview) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + self.alarm_list.config(yscrollcommand=scrollbar.set) + + # Create stats panel + self.stats_frame = tk.Frame(root) + self.stats_frame.pack(fill=tk.X, side=tk.BOTTOM) + + self.stats_label = tk.Label(self.stats_frame, text="No alarms yet") + self.stats_label.pack(pady=5) + + # Stats tracking + self.thermometer_alarm_count = Counter() + self.max_alarms = 100 + + # Start Kafka consumer in a separate thread + self.consumer_thread = threading.Thread(target=self.consume_alarms, daemon=True) + self.consumer_thread.start() + + def consume_alarms(self): + # Set up Kafka consumer with confluent-kafka + consumer_config = { + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'alarm-visualizer', + 'auto.offset.reset': 'latest' + } + consumer = Consumer(consumer_config) + consumer.subscribe(['Alarm']) + + try: + while True: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + print(f"Consumer error: {msg.error()}") + continue + + # Parse the alarm message + alarm_data = json.loads(msg.value().decode('utf-8')) + alarm = TemperatureAlarm.from_dict(alarm_data) + + # Format the timestamp + timestamp_dt = datetime.fromtimestamp(alarm.timestamp / 1000) + formatted_date = timestamp_dt.strftime('%Y-%m-%d %H:%M:%S') + + # Create alarm message + alarm_message = f"⚠️ ALARM: Thermometer {alarm.thermometer_id} reported {alarm.temperature:.2f}°C at {formatted_date}" + + # Update the UI + self.root.after(0, self.update_ui, alarm_message, alarm.thermometer_id) + + except Exception as e: + print(f"Error in Kafka consumer: {e}") + finally: + consumer.close() + + def update_ui(self, alarm_message, thermometer_id): + # Add new alarm to the top of the list + self.alarm_list.insert(0, alarm_message) + + # Keep list at a reasonable size + if self.alarm_list.size() > self.max_alarms: + self.alarm_list.delete(self.max_alarms) + + # Update statistics + self.thermometer_alarm_count[thermometer_id] += 1 + + # Format stats string + if self.thermometer_alarm_count: + stats_parts = [f"{id}={count}" for id, count in self.thermometer_alarm_count.items()] + stats_text = "Alarm Counts: " + ", ".join(stats_parts) + self.stats_label.config(text=stats_text) + +def main(): + root = tk.Tk() + app = AlarmVisualizer(root) + root.mainloop() + +if __name__ == "__main__": + main() diff --git a/Programming/PSD/zin3/python/run_temperature_system.sh b/Programming/PSD/zin3/python/run_temperature_system.sh new file mode 100755 index 00000000..db8315fc --- /dev/null +++ b/Programming/PSD/zin3/python/run_temperature_system.sh @@ -0,0 +1,132 @@ +#!/bin/bash + +# Directory where the code is located +CODE_DIR="$(dirname "$(readlink -f "$0")")/code" +KAFKA_DIR=${KAFKA_HOME:-"/opt/kafka"} +KAFKA_BIN="$KAFKA_DIR/bin" + +# Colors for console output +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +RED='\033[0;31m' +NC='\033[0m' # No Color + +# Function to check if a command exists +command_exists() { + command -v "$1" >/dev/null 2>&1 +} + +# Function to check if Kafka is running +kafka_running() { + if command_exists netstat; then + netstat -tuln | grep -q 9092 + else + echo -e "${YELLOW}Warning: netstat not available, assuming Kafka is running${NC}" + return 0 + fi +} + +# Cleanup function to kill all processes on exit +cleanup() { + echo -e "${YELLOW}Shutting down the temperature monitoring system...${NC}" + + # Kill all background processes + if [[ ! -z $GENERATOR_PID ]]; then + echo "Stopping temperature generator..." + kill $GENERATOR_PID 2>/dev/null || true + fi + + if [[ ! -z $DETECTOR_PID ]]; then + echo "Stopping anomaly detector..." + kill $DETECTOR_PID 2>/dev/null || true + fi + + if [[ ! -z $VISUALIZER_PID ]]; then + echo "Stopping alarm visualizer..." + kill $VISUALIZER_PID 2>/dev/null || true + fi + + echo -e "${GREEN}All components stopped successfully.${NC}" + exit 0 +} + +# Register the cleanup function for script termination +trap cleanup EXIT INT TERM + +# Check Python environment +if ! command_exists python3; then + echo -e "${RED}Error: Python3 is not installed. Please install Python3 and try again.${NC}" + exit 1 +fi + +KAFKA_BIN="/home/psd/Downloads/kafka_2.13-3.4.0/bin" +# Check if Kafka is running +echo "Checking Kafka status..." +if ! kafka_running; then + echo -e "${YELLOW}Kafka is not running. Attempting to start Kafka...${NC}" + + # Check Zookeeper first + if ! netstat -tuln | grep -q 2181; then + echo "Starting Zookeeper..." + if [[ -f "$KAFKA_BIN/zookeeper-server-start.sh" ]]; then + "$KAFKA_BIN/zookeeper-server-start.sh" "$KAFKA_DIR/config/zookeeper.properties" > /dev/null 2>&1 & + sleep 5 + else + echo -e "${RED}Error: Zookeeper startup script not found.${NC}" + exit 1 + fi + fi + + # Start Kafka + if [[ -f "$KAFKA_BIN/kafka-server-start.sh" ]]; then + echo "Starting Kafka server..." + "$KAFKA_BIN/kafka-server-start.sh" "$KAFKA_DIR/config/server.properties" > /dev/null 2>&1 & + sleep 10 + + # Check if Kafka started successfully + if ! kafka_running; then + echo -e "${RED}Error: Failed to start Kafka. Please check Kafka installation.${NC}" + exit 1 + fi + else + echo -e "${RED}Error: Kafka startup script not found.${NC}" + exit 1 + fi +fi + +echo -e "${GREEN}Kafka is running.${NC}" + +# Create Kafka topics if they don't exist +echo "Creating Kafka topics if they don't exist..." +if [[ -f "$KAFKA_BIN/kafka-topics.sh" ]]; then + "$KAFKA_BIN/kafka-topics.sh" --create --bootstrap-server localhost:9092 --topic Temperatura --partitions 3 --replication-factor 1 --if-not-exists + "$KAFKA_BIN/kafka-topics.sh" --create --bootstrap-server localhost:9092 --topic Alarm --partitions 3 --replication-factor 1 --if-not-exists +else + echo -e "${YELLOW}Warning: Kafka topics script not found. Assuming topics already exist.${NC}" +fi + +# Start components in the correct order +echo -e "${GREEN}Starting temperature monitoring system...${NC}" + +# 1. Start the alarm visualizer +echo "Starting alarm visualizer..." +python3 "$CODE_DIR/visualizer/alarm_visualizer.py" & +VISUALIZER_PID=$! +sleep 2 + +# 2. Start the anomaly detector +echo "Starting temperature anomaly detector..." +python3 "$CODE_DIR/processor/temperature_anomaly_detector.py" & +DETECTOR_PID=$! +sleep 2 + +# 3. Start the temperature generator +echo "Starting temperature generator..." +python3 "$CODE_DIR/generator/temperature_generator.py" & +GENERATOR_PID=$! + +echo -e "${GREEN}All components started successfully!${NC}" +echo "Press Ctrl+C to stop the system." + +# Wait for any process to exit +wait