diff --git a/Programming/PSD/zin3/code/generator/TemperatureGenerator.java b/Programming/PSD/zin3/code/generator/TemperatureGenerator.java new file mode 100644 index 00000000..28912d68 --- /dev/null +++ b/Programming/PSD/zin3/code/generator/TemperatureGenerator.java @@ -0,0 +1,53 @@ +package generator; + +import model.TemperatureReading; +import org.apache.kafka.clients.producer.*; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.Properties; +import java.util.Random; + +public class TemperatureGenerator { + public static void main(String[] args) throws Exception { + // Set up Kafka producer properties + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + Producer producer = new KafkaProducer<>(props); + ObjectMapper objectMapper = new ObjectMapper(); + Random random = new Random(); + + // Generate a fixed number of thermometer IDs + String[] thermometerIds = {"Therm-1", "Therm-2", "Therm-3", "Therm-4", "Therm-5"}; + + try { + while (true) { + for (String thermometerId : thermometerIds) { + // Generate a random temperature between -10 and 30 degrees + double temperature = random.nextDouble() * 40 - 10; + + TemperatureReading reading = new TemperatureReading(); + reading.setThermometerId(thermometerId); + reading.setTimestamp(System.currentTimeMillis()); + reading.setTemperature(temperature); + + // Convert to JSON + String json = objectMapper.writeValueAsString(reading); + + // Send to Kafka topic "Temperatura" + ProducerRecord record = new ProducerRecord<>("Temperatura", thermometerId, json); + producer.send(record); + + System.out.println("Sent: " + json); + } + + // Sleep for a second + Thread.sleep(1000); + } + } finally { + producer.close(); + } + } +} diff --git a/Programming/PSD/zin3/code/model/TemperatureAlarm.java b/Programming/PSD/zin3/code/model/TemperatureAlarm.java new file mode 100644 index 00000000..6d0ab7ce --- /dev/null +++ b/Programming/PSD/zin3/code/model/TemperatureAlarm.java @@ -0,0 +1,40 @@ +package model; + +public class TemperatureAlarm { + private String thermometerId; + private long timestamp; + private double temperature; + + public TemperatureAlarm() { + } + + public TemperatureAlarm(String thermometerId, long timestamp, double temperature) { + this.thermometerId = thermometerId; + this.timestamp = timestamp; + this.temperature = temperature; + } + + public String getThermometerId() { + return thermometerId; + } + + public void setThermometerId(String thermometerId) { + this.thermometerId = thermometerId; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public double getTemperature() { + return temperature; + } + + public void setTemperature(double temperature) { + this.temperature = temperature; + } +} diff --git a/Programming/PSD/zin3/code/model/TemperatureReading.java b/Programming/PSD/zin3/code/model/TemperatureReading.java new file mode 100644 index 00000000..1bc0ba92 --- /dev/null +++ b/Programming/PSD/zin3/code/model/TemperatureReading.java @@ -0,0 +1,40 @@ +package model; + +public class TemperatureReading { + private String thermometerId; + private long timestamp; + private double temperature; + + public TemperatureReading() { + } + + public TemperatureReading(String thermometerId, long timestamp, double temperature) { + this.thermometerId = thermometerId; + this.timestamp = timestamp; + this.temperature = temperature; + } + + public String getThermometerId() { + return thermometerId; + } + + public void setThermometerId(String thermometerId) { + this.thermometerId = thermometerId; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public double getTemperature() { + return temperature; + } + + public void setTemperature(double temperature) { + this.temperature = temperature; + } +} diff --git a/Programming/PSD/zin3/code/pom.xml b/Programming/PSD/zin3/code/pom.xml new file mode 100644 index 00000000..492d4ae0 --- /dev/null +++ b/Programming/PSD/zin3/code/pom.xml @@ -0,0 +1,91 @@ + + + 4.0.0 + + com.example + temperature-monitoring + 1.0-SNAPSHOT + jar + + + UTF-8 + 1.14.0 + 2.8.0 + 1.8 + ${java.version} + ${java.version} + + + + + + org.apache.flink + flink-java + ${flink.version} + + + org.apache.flink + flink-streaming-java_2.12 + ${flink.version} + + + org.apache.flink + flink-clients_2.12 + ${flink.version} + + + org.apache.flink + flink-connector-kafka_2.12 + ${flink.version} + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + + com.fasterxml.jackson.core + jackson-databind + 2.12.5 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${java.version} + ${java.version} + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.3.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + diff --git a/Programming/PSD/zin3/code/processor/TemperatureAnomalyDetector.java b/Programming/PSD/zin3/code/processor/TemperatureAnomalyDetector.java new file mode 100644 index 00000000..227ea57a --- /dev/null +++ b/Programming/PSD/zin3/code/processor/TemperatureAnomalyDetector.java @@ -0,0 +1,97 @@ +package processor; + +import model.TemperatureAlarm; +import model.TemperatureReading; +import org.apache.flink.api.common.functions.*; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.connectors.kafka.*; +import org.apache.flink.util.Collector; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.Properties; + +public class TemperatureAnomalyDetector { + public static void main(String[] args) throws Exception { + // Set up the streaming execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // Configure Kafka consumer + Properties consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", "localhost:9092"); + consumerProps.setProperty("group.id", "temperature-anomaly-detector"); + + // Create Kafka consumer + FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>( + "Temperatura", new SimpleStringSchema(), consumerProps); + + // Create Kafka producer for alarms + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", "localhost:9092"); + + FlinkKafkaProducer producer = new FlinkKafkaProducer<>( + "Alarm", new SimpleStringSchema(), producerProps); + + // Object mapper for JSON conversion + final ObjectMapper objectMapper = new ObjectMapper(); + + // Create a DataStream from Kafka + DataStream inputStream = env.addSource(consumer); + + // Parse JSON, assign timestamps, and convert to tuples + DataStream> temperatureStream = inputStream + .map(json -> { + TemperatureReading reading = objectMapper.readValue(json, TemperatureReading.class); + return new Tuple3<>(reading.getThermometerId(), reading.getTimestamp(), reading.getTemperature()); + }) + .assignTimestampsAndWatermarks( + new AscendingTimestampExtractor>() { + @Override + public long extractAscendingTimestamp(Tuple3 element) { + return element.f1; // timestamp field + } + } + ); + + // Group by thermometer ID, apply time window, and detect anomalies + DataStream alarmStream = temperatureStream + .keyBy(0) // key by thermometer ID + .timeWindow(Time.seconds(10)) // 10-second windows + .apply(new WindowFunction, TemperatureAlarm, Tuple, TimeWindow>() { + @Override + public void apply(Tuple key, TimeWindow window, + Iterable> input, + Collector out) { + + // Check each reading in the window for below-zero temperature + for (Tuple3 reading : input) { + if (reading.f2 < 0.0) { + TemperatureAlarm alarm = new TemperatureAlarm(); + alarm.setThermometerId(reading.f0); + alarm.setTimestamp(reading.f1); + alarm.setTemperature(reading.f2); + out.collect(alarm); + } + } + } + }) + .map(alarm -> objectMapper.writeValueAsString(alarm)); + + // Send alarms to Kafka + alarmStream.addSink(producer); + + // Print alarms to console for debugging + alarmStream.print(); + + // Execute the Flink job + env.execute("Temperature Anomaly Detector with Time Windows"); + } +} diff --git a/Programming/PSD/zin3/code/visualizer/AlarmVisualizer.java b/Programming/PSD/zin3/code/visualizer/AlarmVisualizer.java new file mode 100644 index 00000000..0595eac2 --- /dev/null +++ b/Programming/PSD/zin3/code/visualizer/AlarmVisualizer.java @@ -0,0 +1,108 @@ +package visualizer; + +import model.TemperatureAlarm; +import org.apache.kafka.clients.consumer.*; +import javax.swing.*; +import java.awt.*; +import java.time.Duration; +import java.text.SimpleDateFormat; +import java.util.*; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class AlarmVisualizer { + private static DefaultListModel listModel = new DefaultListModel<>(); + private static final int MAX_ALARMS = 100; + + public static void main(String[] args) { + // Set up the GUI + JFrame frame = new JFrame("Temperature Alarm Visualizer"); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + frame.setSize(600, 400); + + JList alarmList = new JList<>(listModel); + alarmList.setFont(new Font(Font.MONOSPACED, Font.PLAIN, 12)); + + JScrollPane scrollPane = new JScrollPane(alarmList); + frame.getContentPane().add(scrollPane, BorderLayout.CENTER); + + JPanel statsPanel = new JPanel(); + JLabel statsLabel = new JLabel("No alarms yet"); + statsPanel.add(statsLabel); + frame.getContentPane().add(statsPanel, BorderLayout.SOUTH); + + frame.setVisible(true); + + // Set up Kafka consumer in a separate thread + Thread consumerThread = new Thread(() -> { + // Set up Kafka consumer properties + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("group.id", "alarm-visualizer"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("auto.offset.reset", "latest"); + + Consumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList("Alarm")); + + ObjectMapper objectMapper = new ObjectMapper(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + Map thermometerAlarmCount = new HashMap<>(); + + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + try { + TemperatureAlarm alarm = objectMapper.readValue(record.value(), TemperatureAlarm.class); + + // Format the timestamp + Date date = new Date(alarm.getTimestamp()); + String formattedDate = sdf.format(date); + + // Create alarm message + String alarmMessage = String.format( + "⚠️ ALARM: Thermometer %s reported %.2f°C at %s", + alarm.getThermometerId(), alarm.getTemperature(), formattedDate); + + // Update UI on the Event Dispatch Thread + SwingUtilities.invokeLater(() -> { + // Add new alarm to the top of the list + listModel.add(0, alarmMessage); + + // Keep list at a reasonable size + if (listModel.size() > MAX_ALARMS) { + listModel.remove(MAX_ALARMS); + } + + // Update statistics + thermometerAlarmCount.put( + alarm.getThermometerId(), + thermometerAlarmCount.getOrDefault(alarm.getThermometerId(), 0) + 1 + ); + + StringBuilder stats = new StringBuilder("Alarm Counts: "); + for (Map.Entry entry : thermometerAlarmCount.entrySet()) { + stats.append(entry.getKey()).append("=").append(entry.getValue()).append(", "); + } + // Remove trailing comma + if (stats.length() > 2) { + stats.setLength(stats.length() - 2); + } + statsLabel.setText(stats.toString()); + }); + } catch (Exception e) { + System.err.println("Error processing alarm: " + e.getMessage()); + } + } + } + } finally { + consumer.close(); + } + }); + + consumerThread.setDaemon(true); + consumerThread.start(); + } +} diff --git a/Programming/PSD/zin3/z03_Zajęcia zintegrowane_2025L_(001).pdf b/Programming/PSD/zin3/z03_Zajęcia zintegrowane_2025L_(001).pdf new file mode 100644 index 00000000..cbbc17a5 Binary files /dev/null and b/Programming/PSD/zin3/z03_Zajęcia zintegrowane_2025L_(001).pdf differ