feat: psd report 3 code

This commit is contained in:
Krzysztof Rudnicki 2025-04-24 13:34:07 +02:00
parent 4db98e3c93
commit 10dd006e58
7 changed files with 429 additions and 0 deletions

View File

@ -0,0 +1,53 @@
package generator;
import model.TemperatureReading;
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import java.util.Random;
public class TemperatureGenerator {
public static void main(String[] args) throws Exception {
// Set up Kafka producer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ObjectMapper objectMapper = new ObjectMapper();
Random random = new Random();
// Generate a fixed number of thermometer IDs
String[] thermometerIds = {"Therm-1", "Therm-2", "Therm-3", "Therm-4", "Therm-5"};
try {
while (true) {
for (String thermometerId : thermometerIds) {
// Generate a random temperature between -10 and 30 degrees
double temperature = random.nextDouble() * 40 - 10;
TemperatureReading reading = new TemperatureReading();
reading.setThermometerId(thermometerId);
reading.setTimestamp(System.currentTimeMillis());
reading.setTemperature(temperature);
// Convert to JSON
String json = objectMapper.writeValueAsString(reading);
// Send to Kafka topic "Temperatura"
ProducerRecord<String, String> record = new ProducerRecord<>("Temperatura", thermometerId, json);
producer.send(record);
System.out.println("Sent: " + json);
}
// Sleep for a second
Thread.sleep(1000);
}
} finally {
producer.close();
}
}
}

View File

@ -0,0 +1,40 @@
package model;
public class TemperatureAlarm {
private String thermometerId;
private long timestamp;
private double temperature;
public TemperatureAlarm() {
}
public TemperatureAlarm(String thermometerId, long timestamp, double temperature) {
this.thermometerId = thermometerId;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getThermometerId() {
return thermometerId;
}
public void setThermometerId(String thermometerId) {
this.thermometerId = thermometerId;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public double getTemperature() {
return temperature;
}
public void setTemperature(double temperature) {
this.temperature = temperature;
}
}

View File

@ -0,0 +1,40 @@
package model;
public class TemperatureReading {
private String thermometerId;
private long timestamp;
private double temperature;
public TemperatureReading() {
}
public TemperatureReading(String thermometerId, long timestamp, double temperature) {
this.thermometerId = thermometerId;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getThermometerId() {
return thermometerId;
}
public void setThermometerId(String thermometerId) {
this.thermometerId = thermometerId;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public double getTemperature() {
return temperature;
}
public void setTemperature(double temperature) {
this.temperature = temperature;
}
}

View File

@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>temperature-monitoring</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.0</flink.version>
<kafka.version>2.8.0</kafka.version>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Apache Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,97 @@
package processor;
import model.TemperatureAlarm;
import model.TemperatureReading;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.*;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
public class TemperatureAnomalyDetector {
public static void main(String[] args) throws Exception {
// Set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Configure Kafka consumer
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "temperature-anomaly-detector");
// Create Kafka consumer
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"Temperatura", new SimpleStringSchema(), consumerProps);
// Create Kafka producer for alarms
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
"Alarm", new SimpleStringSchema(), producerProps);
// Object mapper for JSON conversion
final ObjectMapper objectMapper = new ObjectMapper();
// Create a DataStream from Kafka
DataStream<String> inputStream = env.addSource(consumer);
// Parse JSON, assign timestamps, and convert to tuples
DataStream<Tuple3<String, Long, Double>> temperatureStream = inputStream
.map(json -> {
TemperatureReading reading = objectMapper.readValue(json, TemperatureReading.class);
return new Tuple3<>(reading.getThermometerId(), reading.getTimestamp(), reading.getTemperature());
})
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple3<String, Long, Double>>() {
@Override
public long extractAscendingTimestamp(Tuple3<String, Long, Double> element) {
return element.f1; // timestamp field
}
}
);
// Group by thermometer ID, apply time window, and detect anomalies
DataStream<String> alarmStream = temperatureStream
.keyBy(0) // key by thermometer ID
.timeWindow(Time.seconds(10)) // 10-second windows
.apply(new WindowFunction<Tuple3<String, Long, Double>, TemperatureAlarm, Tuple, TimeWindow>() {
@Override
public void apply(Tuple key, TimeWindow window,
Iterable<Tuple3<String, Long, Double>> input,
Collector<TemperatureAlarm> out) {
// Check each reading in the window for below-zero temperature
for (Tuple3<String, Long, Double> reading : input) {
if (reading.f2 < 0.0) {
TemperatureAlarm alarm = new TemperatureAlarm();
alarm.setThermometerId(reading.f0);
alarm.setTimestamp(reading.f1);
alarm.setTemperature(reading.f2);
out.collect(alarm);
}
}
}
})
.map(alarm -> objectMapper.writeValueAsString(alarm));
// Send alarms to Kafka
alarmStream.addSink(producer);
// Print alarms to console for debugging
alarmStream.print();
// Execute the Flink job
env.execute("Temperature Anomaly Detector with Time Windows");
}
}

View File

@ -0,0 +1,108 @@
package visualizer;
import model.TemperatureAlarm;
import org.apache.kafka.clients.consumer.*;
import javax.swing.*;
import java.awt.*;
import java.time.Duration;
import java.text.SimpleDateFormat;
import java.util.*;
import com.fasterxml.jackson.databind.ObjectMapper;
public class AlarmVisualizer {
private static DefaultListModel<String> listModel = new DefaultListModel<>();
private static final int MAX_ALARMS = 100;
public static void main(String[] args) {
// Set up the GUI
JFrame frame = new JFrame("Temperature Alarm Visualizer");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setSize(600, 400);
JList<String> alarmList = new JList<>(listModel);
alarmList.setFont(new Font(Font.MONOSPACED, Font.PLAIN, 12));
JScrollPane scrollPane = new JScrollPane(alarmList);
frame.getContentPane().add(scrollPane, BorderLayout.CENTER);
JPanel statsPanel = new JPanel();
JLabel statsLabel = new JLabel("No alarms yet");
statsPanel.add(statsLabel);
frame.getContentPane().add(statsPanel, BorderLayout.SOUTH);
frame.setVisible(true);
// Set up Kafka consumer in a separate thread
Thread consumerThread = new Thread(() -> {
// Set up Kafka consumer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "alarm-visualizer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("Alarm"));
ObjectMapper objectMapper = new ObjectMapper();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Map<String, Integer> thermometerAlarmCount = new HashMap<>();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
TemperatureAlarm alarm = objectMapper.readValue(record.value(), TemperatureAlarm.class);
// Format the timestamp
Date date = new Date(alarm.getTimestamp());
String formattedDate = sdf.format(date);
// Create alarm message
String alarmMessage = String.format(
"⚠️ ALARM: Thermometer %s reported %.2f°C at %s",
alarm.getThermometerId(), alarm.getTemperature(), formattedDate);
// Update UI on the Event Dispatch Thread
SwingUtilities.invokeLater(() -> {
// Add new alarm to the top of the list
listModel.add(0, alarmMessage);
// Keep list at a reasonable size
if (listModel.size() > MAX_ALARMS) {
listModel.remove(MAX_ALARMS);
}
// Update statistics
thermometerAlarmCount.put(
alarm.getThermometerId(),
thermometerAlarmCount.getOrDefault(alarm.getThermometerId(), 0) + 1
);
StringBuilder stats = new StringBuilder("Alarm Counts: ");
for (Map.Entry<String, Integer> entry : thermometerAlarmCount.entrySet()) {
stats.append(entry.getKey()).append("=").append(entry.getValue()).append(", ");
}
// Remove trailing comma
if (stats.length() > 2) {
stats.setLength(stats.length() - 2);
}
statsLabel.setText(stats.toString());
});
} catch (Exception e) {
System.err.println("Error processing alarm: " + e.getMessage());
}
}
}
} finally {
consumer.close();
}
});
consumerThread.setDaemon(true);
consumerThread.start();
}
}