wip: report 3

This commit is contained in:
Krzysztof kuhy Rudnicki 2025-05-05 15:59:12 +02:00
parent ade11e2225
commit 41057763b3
38 changed files with 1026 additions and 0 deletions

View File

@ -0,0 +1,37 @@
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
jobmanager:
image: flink:latest
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:latest
depends_on:
- jobmanager
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

View File

@ -0,0 +1,74 @@
#!/bin/bash
# Set working directory to script location
cd "$(dirname "$0")"
# Check if Docker daemon is running
if ! docker info &>/dev/null; then
echo "ERROR: Docker daemon is not running."
echo "Please start Docker with: 'sudo systemctl start docker'"
echo "If you want Docker to start automatically at boot: 'sudo systemctl enable docker'"
echo "To run Docker without sudo, add your user to the docker group: 'sudo usermod -aG docker $USER'"
echo "Then log out and log back in for the changes to take effect."
exit 1
fi
echo "Starting Docker containers..."
docker-compose up -d
# Wait for services to start
echo "Waiting for Kafka and Flink to start..."
sleep 10
echo "Building Maven projects..."
cd temperature-generator && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
cd temperature-anomaly-detector && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
cd temperature-alert-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
echo "Creating Kafka topics..."
docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Temperatura
docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Alarm
echo "Starting all applications..."
# Start temperature anomaly detector - submit to Flink
echo "Starting Temperature Anomaly Detector..."
cd temperature-anomaly-detector
java -jar target/temperature-anomaly-detector-1.0-SNAPSHOT.jar &
ANOMALY_PID=$!
cd ..
# Start Alert Visualizer
echo "Starting Temperature Alert Visualizer..."
cd temperature-alert-visualizer
java -jar target/temperature-alert-visualizer-1.0-SNAPSHOT.jar &
VISUALIZER_PID=$!
cd ..
# Start Temperature Generator last
echo "Starting Temperature Generator..."
cd temperature-generator
java -jar target/temperature-generator-1.0-SNAPSHOT.jar &
GENERATOR_PID=$!
cd ..
echo "All applications are running!"
echo "Press Ctrl+C to stop all applications"
# Function to handle shutdown
function cleanup {
echo "Shutting down applications..."
kill $GENERATOR_PID $VISUALIZER_PID $ANOMALY_PID
echo "Stopping Docker containers..."
docker-compose down
echo "All done!"
exit 0
}
# Catch shutdown signal
trap cleanup SIGINT SIGTERM
# Keep script running
while true; do
sleep 1
done

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>temperature-alert-visualizer</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<mainClass>org.example.TemperatureAlertVisualizer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.target>11</maven.compiler.target>
<jackson.version>2.14.2</jackson.version>
<maven.compiler.source>11</maven.compiler.source>
<kafka.version>3.4.0</kafka.version>
</properties>
</project>

View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>temperature-alert-visualizer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<kafka.version>3.4.0</kafka.version>
<jackson.version>2.14.2</jackson.version>
</properties>
<dependencies>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Jackson for JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- For simple UI -->
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jfreechart</artifactId>
<version>1.5.3</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.example.TemperatureAlertVisualizer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,188 @@
package org.example;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.NumberAxis;
import org.jfree.chart.plot.CategoryPlot;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.data.category.DefaultCategoryDataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.swing.*;
import java.awt.*;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TemperatureAlertVisualizer {
private static final Logger logger = LoggerFactory.getLogger(TemperatureAlertVisualizer.class);
private static final String TOPIC = "Alarm";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final ConcurrentHashMap<String, TemperatureAlert> latestAlerts = new ConcurrentHashMap<>();
private static final DefaultCategoryDataset dataset = new DefaultCategoryDataset();
private static final JTextArea alertTextArea = new JTextArea(20, 50);
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
// Create and configure UI
setupUI();
// Start Kafka consumer in a separate thread
Thread consumerThread = new Thread(() -> consumeAlerts());
consumerThread.setDaemon(true);
consumerThread.start();
// Schedule regular UI updates
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(TemperatureAlertVisualizer::updateUI, 0, 2, TimeUnit.SECONDS);
}
private static void setupUI() {
JFrame frame = new JFrame("Temperature Alert Visualizer");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setSize(800, 600);
JPanel mainPanel = new JPanel(new BorderLayout());
// Create chart for temperatures
JFreeChart chart = ChartFactory.createBarChart(
"Temperature Anomalies",
"Thermometer",
"Temperature (°C)",
dataset,
PlotOrientation.VERTICAL,
true,
true,
false
);
// Customize chart
CategoryPlot plot = chart.getCategoryPlot();
NumberAxis rangeAxis = (NumberAxis) plot.getRangeAxis();
rangeAxis.setRange(-15.0, 5.0); // Set range for negative temperatures
ChartPanel chartPanel = new ChartPanel(chart);
chartPanel.setPreferredSize(new Dimension(800, 300));
// Set up text area for alerts
alertTextArea.setEditable(false);
JScrollPane scrollPane = new JScrollPane(alertTextArea);
// Add components to main panel
mainPanel.add(chartPanel, BorderLayout.CENTER);
mainPanel.add(scrollPane, BorderLayout.SOUTH);
frame.add(mainPanel);
frame.setVisible(true);
}
private static void consumeAlerts() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "temperature-alert-visualizer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
JsonNode alertJson = objectMapper.readTree(record.value());
TemperatureAlert alert = new TemperatureAlert(
alertJson.get("thermometerId").asText(),
alertJson.get("temperature").asDouble(),
alertJson.get("alertTime").asText(),
alertJson.get("message").asText()
);
latestAlerts.put(alert.thermometerId, alert);
logger.info("Received alert: " + alert);
} catch (Exception e) {
logger.error("Error processing alert record", e);
}
}
}
}
}
private static void updateUI() {
SwingUtilities.invokeLater(() -> {
// Update chart data
dataset.clear();
StringBuilder alertText = new StringBuilder();
for (TemperatureAlert alert : latestAlerts.values()) {
// Add to chart
dataset.addValue(alert.temperature, "Temperature", alert.thermometerId);
// Format timestamp
LocalDateTime dateTime = LocalDateTime.ofInstant(
Instant.parse(alert.alertTime),
ZoneId.systemDefault()
);
String formattedTime = formatter.format(dateTime);
// Add to text area
alertText.append(formattedTime)
.append(" | ")
.append(alert.thermometerId)
.append(" | ")
.append(String.format("%.2f°C", alert.temperature))
.append(" | ")
.append(alert.message)
.append("\n");
}
alertTextArea.setText(alertText.toString());
});
}
static class TemperatureAlert {
String thermometerId;
double temperature;
String alertTime;
String message;
public TemperatureAlert(String thermometerId, double temperature, String alertTime, String message) {
this.thermometerId = thermometerId;
this.temperature = temperature;
this.alertTime = alertTime;
this.message = message;
}
@Override
public String toString() {
return "Alert{" +
"thermometerId='" + thermometerId + '\'' +
", temperature=" + temperature +
", alertTime='" + alertTime + '\'' +
", message='" + message + '\'' +
'}';
}
}
}

View File

@ -0,0 +1,3 @@
artifactId=temperature-alert-visualizer
groupId=org.example
version=1.0-SNAPSHOT

View File

@ -0,0 +1,2 @@
org/example/TemperatureAlertVisualizer$TemperatureAlert.class
org/example/TemperatureAlertVisualizer.class

View File

@ -0,0 +1 @@
/home/kuhy/WUT_Computer_Science/Programming/PSD/zin3/third/temperature-alert-visualizer/src/main/java/org/example/TemperatureAlertVisualizer.java

View File

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>temperature-anomaly-detector</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<mainClass>org.example.TemperatureAnomalyDetector</mainClass>
</transformer>
<transformer />
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<properties>
<maven.compiler.target>11</maven.compiler.target>
<jackson.version>2.14.2</jackson.version>
<flink.version>1.17.0</flink.version>
<maven.compiler.source>11</maven.compiler.source>
</properties>
</project>

View File

@ -0,0 +1,150 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>temperature-anomaly-detector</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<jackson.version>2.14.2</jackson.version>
</properties>
<dependencies>
<!-- Flink Core - Remove the 'provided' scope to include in the JAR -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<!-- Remove the provided scope so it's included in the JAR -->
</dependency>
<!-- Flink Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Core - Required for execution -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Jackson for JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Add maven-compiler-plugin to configure JVM arguments -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerArgs>
<arg>--add-opens</arg>
<arg>java.base/java.util=ALL-UNNAMED</arg>
<arg>--add-opens</arg>
<arg>java.base/java.lang=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.example.TemperatureAnomalyDetector</mainClass>
</transformer>
<!-- Include this transformer to handle service files properly -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!-- Configure the same JVM args for runtime execution -->
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>
<!-- Add exec plugin to pass JVM arguments when running with mvn exec -->
<profiles>
<profile>
<id>add-opens</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<executable>java</executable>
<arguments>
<argument>--add-opens</argument>
<argument>java.base/java.util=ALL-UNNAMED</argument>
<argument>--add-opens</argument>
<argument>java.base/java.lang=ALL-UNNAMED</argument>
<argument>-classpath</argument>
<classpath/>
<argument>org.example.TemperatureAnomalyDetector</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,149 @@
package org.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.Properties;
public class TemperatureAnomalyDetector {
private static final Logger logger = LoggerFactory.getLogger(TemperatureAnomalyDetector.class);
private static final String INPUT_TOPIC = "Temperatura";
private static final String OUTPUT_TOPIC = "Alarm";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
logger.info("Starting Temperature Anomaly Detector application");
// Create and configure the Flink execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism to 1 for simplified processing
env.setParallelism(1);
logger.info("Configuring Kafka source for topic: {}", INPUT_TOPIC);
// Configure Kafka source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(INPUT_TOPIC)
.setGroupId("temperature-anomaly-detector")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
logger.info("Configuring Kafka sink for topic: {}", OUTPUT_TOPIC);
// Configure Kafka sink
Properties producerProps = new Properties();
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
.setTopic(OUTPUT_TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setKafkaProducerConfig(producerProps)
.build();
logger.info("Building temperature processing pipeline");
// Parse temperature readings from Kafka
DataStream<TemperatureReading> temperatureStream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
.map(new MapFunction<String, TemperatureReading>() {
@Override
public TemperatureReading map(String jsonString) throws Exception {
logger.debug("Received temperature reading: {}", jsonString);
return objectMapper.readValue(jsonString, TemperatureReading.class);
}
});
// Detect anomalies (temperatures below zero)
DataStream<String> alarmStream = temperatureStream
.filter(new FilterFunction<TemperatureReading>() {
@Override
public boolean filter(TemperatureReading reading) throws Exception {
boolean isAnomaly = reading.temperature < 0.0;
if (isAnomaly) {
logger.info("Anomaly detected: {}°C from {}",
reading.temperature, reading.thermometerId);
}
return isAnomaly;
}
})
// Group by thermometer ID
.keyBy((KeySelector<TemperatureReading, String>) reading -> reading.thermometerId)
// Apply a time window to aggregate anomalies
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new TemperatureWindowFunction())
.map(new MapFunction<TemperatureAlert, String>() {
@Override
public String map(TemperatureAlert alert) throws Exception {
ObjectNode alertJson = objectMapper.createObjectNode();
alertJson.put("thermometerId", alert.thermometerId);
alertJson.put("temperature", alert.temperature);
alertJson.put("alertTime", alert.alertTime);
alertJson.put("message", alert.message);
String json = objectMapper.writeValueAsString(alertJson);
logger.info("Producing alert: {}", json);
return json;
}
});
// Send alerts to Kafka
alarmStream.sinkTo(sink);
logger.info("Executing Temperature Anomaly Detector job");
// Execute the Flink job
try {
env.execute("Temperature Anomaly Detector");
} catch (Exception e) {
logger.error("Error executing Flink job", e);
throw e;
}
}
// POJO classes
public static class TemperatureReading {
public String thermometerId;
public String timestamp;
public double temperature;
public TemperatureReading() {}
}
public static class TemperatureAlert {
public String thermometerId;
public double temperature;
public String alertTime;
public String message;
public TemperatureAlert(String thermometerId, double temperature, String message) {
this.thermometerId = thermometerId;
this.temperature = temperature;
this.alertTime = Instant.now().toString();
this.message = message;
}
}
}

View File

@ -0,0 +1,50 @@
package org.example;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.example.TemperatureAnomalyDetector.TemperatureReading;
import org.example.TemperatureAnomalyDetector.TemperatureAlert;
import java.util.Iterator;
public class TemperatureWindowFunction implements WindowFunction<TemperatureReading, TemperatureAlert, String, TimeWindow> {
@Override
public void apply(
String key,
TimeWindow window,
Iterable<TemperatureReading> values,
Collector<TemperatureAlert> out) {
Iterator<TemperatureReading> iterator = values.iterator();
if (!iterator.hasNext()) {
return; // No readings in this window
}
// Find the lowest temperature in the window
TemperatureReading lowestReading = iterator.next();
double lowestTemp = lowestReading.temperature;
while (iterator.hasNext()) {
TemperatureReading reading = iterator.next();
if (reading.temperature < lowestTemp) {
lowestTemp = reading.temperature;
lowestReading = reading;
}
}
// Create and emit an alert for the lowest temperature reading
String message = String.format("Freezing temperature alert! %s reported %.2f°C",
lowestReading.thermometerId, lowestTemp);
TemperatureAlert alert = new TemperatureAlert(
lowestReading.thermometerId,
lowestTemp,
message
);
out.collect(alert);
}
}

View File

@ -0,0 +1,3 @@
artifactId=temperature-anomaly-detector
groupId=org.example
version=1.0-SNAPSHOT

View File

@ -0,0 +1,7 @@
org/example/TemperatureAnomalyDetector.class
org/example/TemperatureAnomalyDetector$2.class
org/example/TemperatureWindowFunction.class
org/example/TemperatureAnomalyDetector$TemperatureReading.class
org/example/TemperatureAnomalyDetector$TemperatureAlert.class
org/example/TemperatureAnomalyDetector$1.class
org/example/TemperatureAnomalyDetector$3.class

View File

@ -0,0 +1,2 @@
/home/kuhy/WUT_Computer_Science/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureAnomalyDetector.java
/home/kuhy/WUT_Computer_Science/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureWindowFunction.java

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>temperature-generator</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<mainClass>org.example.TemperatureGenerator</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.target>11</maven.compiler.target>
<jackson.version>2.14.2</jackson.version>
<maven.compiler.source>11</maven.compiler.source>
<kafka.version>3.4.0</kafka.version>
</properties>
</project>

View File

@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>temperature-generator</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<kafka.version>3.4.0</kafka.version>
<jackson.version>2.14.2</jackson.version>
</properties>
<dependencies>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Jackson for JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.example.TemperatureGenerator</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,85 @@
package org.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class TemperatureGenerator {
private static final Logger logger = LoggerFactory.getLogger(TemperatureGenerator.class);
private static final String TOPIC = "Temperatura";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final int NUM_THERMOMETERS = 5;
private static final Random random = new Random();
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) {
// Configure Kafka producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
logger.info("Starting temperature data generation...");
while (true) {
for (int i = 1; i <= NUM_THERMOMETERS; i++) {
// Generate a temperature (sometimes below zero)
double temperature = (random.nextDouble() * 40) - 10; // Range from -10°C to 30°C
TemperatureReading reading = new TemperatureReading(
"Thermometer-" + i,
Instant.now().toString(),
temperature
);
String jsonReading = objectMapper.writeValueAsString(reading);
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC,
reading.thermometerId,
jsonReading
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
logger.error("Error sending temperature data", exception);
} else {
logger.info("Sent temperature reading: {} | {} | {:.2f}°C",
reading.thermometerId,
reading.timestamp,
reading.temperature);
}
});
}
// Sleep before generating next batch
TimeUnit.SECONDS.sleep(2);
}
} catch (Exception e) {
logger.error("Error in temperature generator", e);
}
}
public static class TemperatureReading {
public String thermometerId;
public String timestamp;
public double temperature;
public TemperatureReading() {}
public TemperatureReading(String thermometerId, String timestamp, double temperature) {
this.thermometerId = thermometerId;
this.timestamp = timestamp;
this.temperature = temperature;
}
}
}

View File

@ -0,0 +1,3 @@
artifactId=temperature-generator
groupId=org.example
version=1.0-SNAPSHOT

View File

@ -0,0 +1,2 @@
org/example/TemperatureGenerator.class
org/example/TemperatureGenerator$TemperatureReading.class

View File

@ -0,0 +1 @@
/home/kuhy/WUT_Computer_Science/Programming/PSD/zin3/third/temperature-generator/src/main/java/org/example/TemperatureGenerator.java