diff --git a/Programming/PSD/zin3/third/docker-compose.yml b/Programming/PSD/zin3/third/docker-compose.yml new file mode 100644 index 00000000..39b61682 --- /dev/null +++ b/Programming/PSD/zin3/third/docker-compose.yml @@ -0,0 +1,37 @@ +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + jobmanager: + image: flink:latest + ports: + - "8081:8081" + command: jobmanager + environment: + - JOB_MANAGER_RPC_ADDRESS=jobmanager + + taskmanager: + image: flink:latest + depends_on: + - jobmanager + command: taskmanager + environment: + - JOB_MANAGER_RPC_ADDRESS=jobmanager \ No newline at end of file diff --git a/Programming/PSD/zin3/third/run_all.sh b/Programming/PSD/zin3/third/run_all.sh new file mode 100755 index 00000000..9ebcc2d0 --- /dev/null +++ b/Programming/PSD/zin3/third/run_all.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +# Set working directory to script location +cd "$(dirname "$0")" + +# Check if Docker daemon is running +if ! docker info &>/dev/null; then + echo "ERROR: Docker daemon is not running." + echo "Please start Docker with: 'sudo systemctl start docker'" + echo "If you want Docker to start automatically at boot: 'sudo systemctl enable docker'" + echo "To run Docker without sudo, add your user to the docker group: 'sudo usermod -aG docker $USER'" + echo "Then log out and log back in for the changes to take effect." + exit 1 +fi + +echo "Starting Docker containers..." +docker-compose up -d + +# Wait for services to start +echo "Waiting for Kafka and Flink to start..." +sleep 10 + +echo "Building Maven projects..." +cd temperature-generator && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. +cd temperature-anomaly-detector && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. +cd temperature-alert-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. + +echo "Creating Kafka topics..." +docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Temperatura +docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Alarm + +echo "Starting all applications..." + +# Start temperature anomaly detector - submit to Flink +echo "Starting Temperature Anomaly Detector..." +cd temperature-anomaly-detector +java -jar target/temperature-anomaly-detector-1.0-SNAPSHOT.jar & +ANOMALY_PID=$! +cd .. + +# Start Alert Visualizer +echo "Starting Temperature Alert Visualizer..." +cd temperature-alert-visualizer +java -jar target/temperature-alert-visualizer-1.0-SNAPSHOT.jar & +VISUALIZER_PID=$! +cd .. + +# Start Temperature Generator last +echo "Starting Temperature Generator..." +cd temperature-generator +java -jar target/temperature-generator-1.0-SNAPSHOT.jar & +GENERATOR_PID=$! +cd .. + +echo "All applications are running!" +echo "Press Ctrl+C to stop all applications" + +# Function to handle shutdown +function cleanup { + echo "Shutting down applications..." + kill $GENERATOR_PID $VISUALIZER_PID $ANOMALY_PID + echo "Stopping Docker containers..." + docker-compose down + echo "All done!" + exit 0 +} + +# Catch shutdown signal +trap cleanup SIGINT SIGTERM + +# Keep script running +while true; do + sleep 1 +done diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/dependency-reduced-pom.xml b/Programming/PSD/zin3/third/temperature-alert-visualizer/dependency-reduced-pom.xml new file mode 100644 index 00000000..b4772156 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-alert-visualizer/dependency-reduced-pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + org.example + temperature-alert-visualizer + 1.0-SNAPSHOT + + + + maven-shade-plugin + 3.4.1 + + + package + + shade + + + + + org.example.TemperatureAlertVisualizer + + + + + + + + + + 11 + 2.14.2 + 11 + 3.4.0 + + diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/pom.xml b/Programming/PSD/zin3/third/temperature-alert-visualizer/pom.xml new file mode 100644 index 00000000..cb6817af --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-alert-visualizer/pom.xml @@ -0,0 +1,74 @@ + + + 4.0.0 + + org.example + temperature-alert-visualizer + 1.0-SNAPSHOT + + + 11 + 11 + 3.4.0 + 2.14.2 + + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + + org.jfree + jfreechart + 1.5.3 + + + + org.slf4j + slf4j-api + 2.0.5 + + + org.slf4j + slf4j-simple + 2.0.5 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.4.1 + + + package + + shade + + + + + org.example.TemperatureAlertVisualizer + + + + + + + + + diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/src/main/java/org/example/TemperatureAlertVisualizer.java b/Programming/PSD/zin3/third/temperature-alert-visualizer/src/main/java/org/example/TemperatureAlertVisualizer.java new file mode 100644 index 00000000..759f7097 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-alert-visualizer/src/main/java/org/example/TemperatureAlertVisualizer.java @@ -0,0 +1,188 @@ +package org.example; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.jfree.chart.ChartFactory; +import org.jfree.chart.ChartPanel; +import org.jfree.chart.JFreeChart; +import org.jfree.chart.axis.NumberAxis; +import org.jfree.chart.plot.CategoryPlot; +import org.jfree.chart.plot.PlotOrientation; +import org.jfree.data.category.DefaultCategoryDataset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.swing.*; +import java.awt.*; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class TemperatureAlertVisualizer { + private static final Logger logger = LoggerFactory.getLogger(TemperatureAlertVisualizer.class); + private static final String TOPIC = "Alarm"; + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private static final ConcurrentHashMap latestAlerts = new ConcurrentHashMap<>(); + private static final DefaultCategoryDataset dataset = new DefaultCategoryDataset(); + private static final JTextArea alertTextArea = new JTextArea(20, 50); + private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + public static void main(String[] args) { + // Create and configure UI + setupUI(); + + // Start Kafka consumer in a separate thread + Thread consumerThread = new Thread(() -> consumeAlerts()); + consumerThread.setDaemon(true); + consumerThread.start(); + + // Schedule regular UI updates + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(TemperatureAlertVisualizer::updateUI, 0, 2, TimeUnit.SECONDS); + } + + private static void setupUI() { + JFrame frame = new JFrame("Temperature Alert Visualizer"); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + frame.setSize(800, 600); + + JPanel mainPanel = new JPanel(new BorderLayout()); + + // Create chart for temperatures + JFreeChart chart = ChartFactory.createBarChart( + "Temperature Anomalies", + "Thermometer", + "Temperature (°C)", + dataset, + PlotOrientation.VERTICAL, + true, + true, + false + ); + + // Customize chart + CategoryPlot plot = chart.getCategoryPlot(); + NumberAxis rangeAxis = (NumberAxis) plot.getRangeAxis(); + rangeAxis.setRange(-15.0, 5.0); // Set range for negative temperatures + + ChartPanel chartPanel = new ChartPanel(chart); + chartPanel.setPreferredSize(new Dimension(800, 300)); + + // Set up text area for alerts + alertTextArea.setEditable(false); + JScrollPane scrollPane = new JScrollPane(alertTextArea); + + // Add components to main panel + mainPanel.add(chartPanel, BorderLayout.CENTER); + mainPanel.add(scrollPane, BorderLayout.SOUTH); + + frame.add(mainPanel); + frame.setVisible(true); + } + + private static void consumeAlerts() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "temperature-alert-visualizer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { + consumer.subscribe(Collections.singletonList(TOPIC)); + + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + for (ConsumerRecord record : records) { + try { + JsonNode alertJson = objectMapper.readTree(record.value()); + + TemperatureAlert alert = new TemperatureAlert( + alertJson.get("thermometerId").asText(), + alertJson.get("temperature").asDouble(), + alertJson.get("alertTime").asText(), + alertJson.get("message").asText() + ); + + latestAlerts.put(alert.thermometerId, alert); + logger.info("Received alert: " + alert); + } catch (Exception e) { + logger.error("Error processing alert record", e); + } + } + } + } + } + + private static void updateUI() { + SwingUtilities.invokeLater(() -> { + // Update chart data + dataset.clear(); + StringBuilder alertText = new StringBuilder(); + + for (TemperatureAlert alert : latestAlerts.values()) { + // Add to chart + dataset.addValue(alert.temperature, "Temperature", alert.thermometerId); + + // Format timestamp + LocalDateTime dateTime = LocalDateTime.ofInstant( + Instant.parse(alert.alertTime), + ZoneId.systemDefault() + ); + String formattedTime = formatter.format(dateTime); + + // Add to text area + alertText.append(formattedTime) + .append(" | ") + .append(alert.thermometerId) + .append(" | ") + .append(String.format("%.2f°C", alert.temperature)) + .append(" | ") + .append(alert.message) + .append("\n"); + } + + alertTextArea.setText(alertText.toString()); + }); + } + + static class TemperatureAlert { + String thermometerId; + double temperature; + String alertTime; + String message; + + public TemperatureAlert(String thermometerId, double temperature, String alertTime, String message) { + this.thermometerId = thermometerId; + this.temperature = temperature; + this.alertTime = alertTime; + this.message = message; + } + + @Override + public String toString() { + return "Alert{" + + "thermometerId='" + thermometerId + '\'' + + ", temperature=" + temperature + + ", alertTime='" + alertTime + '\'' + + ", message='" + message + '\'' + + '}'; + } + } +} diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/target/classes/org/example/TemperatureAlertVisualizer$TemperatureAlert.class b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/classes/org/example/TemperatureAlertVisualizer$TemperatureAlert.class new file mode 100644 index 00000000..6aa6b8ce Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/classes/org/example/TemperatureAlertVisualizer$TemperatureAlert.class differ diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/target/classes/org/example/TemperatureAlertVisualizer.class b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/classes/org/example/TemperatureAlertVisualizer.class new file mode 100644 index 00000000..a47b8bac Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/classes/org/example/TemperatureAlertVisualizer.class differ diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-archiver/pom.properties b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-archiver/pom.properties new file mode 100644 index 00000000..d689c38f --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-archiver/pom.properties @@ -0,0 +1,3 @@ +artifactId=temperature-alert-visualizer +groupId=org.example +version=1.0-SNAPSHOT diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst new file mode 100644 index 00000000..44533a87 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -0,0 +1,2 @@ +org/example/TemperatureAlertVisualizer$TemperatureAlert.class +org/example/TemperatureAlertVisualizer.class diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst new file mode 100644 index 00000000..ddb5dba3 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst @@ -0,0 +1 @@ +/home/kuhy/WUT_Computer_Science/Programming/PSD/zin3/third/temperature-alert-visualizer/src/main/java/org/example/TemperatureAlertVisualizer.java diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/target/original-temperature-alert-visualizer-1.0-SNAPSHOT.jar b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/original-temperature-alert-visualizer-1.0-SNAPSHOT.jar new file mode 100644 index 00000000..cec6ffdd Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/original-temperature-alert-visualizer-1.0-SNAPSHOT.jar differ diff --git a/Programming/PSD/zin3/third/temperature-alert-visualizer/target/temperature-alert-visualizer-1.0-SNAPSHOT.jar b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/temperature-alert-visualizer-1.0-SNAPSHOT.jar new file mode 100644 index 00000000..7745b054 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-alert-visualizer/target/temperature-alert-visualizer-1.0-SNAPSHOT.jar differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/dependency-reduced-pom.xml b/Programming/PSD/zin3/third/temperature-anomaly-detector/dependency-reduced-pom.xml new file mode 100644 index 00000000..0bb8ca87 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-anomaly-detector/dependency-reduced-pom.xml @@ -0,0 +1,55 @@ + + + 4.0.0 + org.example + temperature-anomaly-detector + 1.0-SNAPSHOT + + + + maven-shade-plugin + 3.4.1 + + + package + + shade + + + + + org.example.TemperatureAnomalyDetector + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + org.apache.flink + flink-streaming-java + 1.17.0 + provided + + + + 11 + 2.14.2 + 1.17.0 + 11 + + diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/pom.xml b/Programming/PSD/zin3/third/temperature-anomaly-detector/pom.xml new file mode 100644 index 00000000..6e51f0ef --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-anomaly-detector/pom.xml @@ -0,0 +1,150 @@ + + + 4.0.0 + + org.example + temperature-anomaly-detector + 1.0-SNAPSHOT + + + 11 + 11 + 1.17.0 + 2.14.2 + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + + org.apache.flink + flink-clients + ${flink.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + + org.slf4j + slf4j-api + 2.0.5 + + + org.slf4j + slf4j-simple + 2.0.5 + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${maven.compiler.source} + ${maven.compiler.target} + + --add-opens + java.base/java.util=ALL-UNNAMED + --add-opens + java.base/java.lang=ALL-UNNAMED + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.4.1 + + + package + + shade + + + false + + + org.example.TemperatureAnomalyDetector + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + + src/main/resources + true + + + + + + + + add-opens + + true + + + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + java + + --add-opens + java.base/java.util=ALL-UNNAMED + --add-opens + java.base/java.lang=ALL-UNNAMED + -classpath + + org.example.TemperatureAnomalyDetector + + + + + + + + diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureAnomalyDetector.java b/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureAnomalyDetector.java new file mode 100644 index 00000000..1f64774a --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureAnomalyDetector.java @@ -0,0 +1,149 @@ +package org.example; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.Properties; + +public class TemperatureAnomalyDetector { + private static final Logger logger = LoggerFactory.getLogger(TemperatureAnomalyDetector.class); + private static final String INPUT_TOPIC = "Temperatura"; + private static final String OUTPUT_TOPIC = "Alarm"; + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + logger.info("Starting Temperature Anomaly Detector application"); + + // Create and configure the Flink execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Set parallelism to 1 for simplified processing + env.setParallelism(1); + + logger.info("Configuring Kafka source for topic: {}", INPUT_TOPIC); + + // Configure Kafka source + KafkaSource source = KafkaSource.builder() + .setBootstrapServers(BOOTSTRAP_SERVERS) + .setTopics(INPUT_TOPIC) + .setGroupId("temperature-anomaly-detector") + .setStartingOffsets(OffsetsInitializer.earliest()) + .setValueOnlyDeserializer(new SimpleStringSchema()) + .build(); + + logger.info("Configuring Kafka sink for topic: {}", OUTPUT_TOPIC); + + // Configure Kafka sink + Properties producerProps = new Properties(); + producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + + KafkaSink sink = KafkaSink.builder() + .setBootstrapServers(BOOTSTRAP_SERVERS) + .setRecordSerializer(KafkaRecordSerializationSchema.builder() + .setTopic(OUTPUT_TOPIC) + .setValueSerializationSchema(new SimpleStringSchema()) + .build()) + .setKafkaProducerConfig(producerProps) + .build(); + + logger.info("Building temperature processing pipeline"); + + // Parse temperature readings from Kafka + DataStream temperatureStream = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") + .map(new MapFunction() { + @Override + public TemperatureReading map(String jsonString) throws Exception { + logger.debug("Received temperature reading: {}", jsonString); + return objectMapper.readValue(jsonString, TemperatureReading.class); + } + }); + + // Detect anomalies (temperatures below zero) + DataStream alarmStream = temperatureStream + .filter(new FilterFunction() { + @Override + public boolean filter(TemperatureReading reading) throws Exception { + boolean isAnomaly = reading.temperature < 0.0; + if (isAnomaly) { + logger.info("Anomaly detected: {}°C from {}", + reading.temperature, reading.thermometerId); + } + return isAnomaly; + } + }) + // Group by thermometer ID + .keyBy((KeySelector) reading -> reading.thermometerId) + // Apply a time window to aggregate anomalies + .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) + .apply(new TemperatureWindowFunction()) + .map(new MapFunction() { + @Override + public String map(TemperatureAlert alert) throws Exception { + ObjectNode alertJson = objectMapper.createObjectNode(); + alertJson.put("thermometerId", alert.thermometerId); + alertJson.put("temperature", alert.temperature); + alertJson.put("alertTime", alert.alertTime); + alertJson.put("message", alert.message); + String json = objectMapper.writeValueAsString(alertJson); + logger.info("Producing alert: {}", json); + return json; + } + }); + + // Send alerts to Kafka + alarmStream.sinkTo(sink); + + logger.info("Executing Temperature Anomaly Detector job"); + + // Execute the Flink job + try { + env.execute("Temperature Anomaly Detector"); + } catch (Exception e) { + logger.error("Error executing Flink job", e); + throw e; + } + } + + // POJO classes + public static class TemperatureReading { + public String thermometerId; + public String timestamp; + public double temperature; + + public TemperatureReading() {} + } + + public static class TemperatureAlert { + public String thermometerId; + public double temperature; + public String alertTime; + public String message; + + public TemperatureAlert(String thermometerId, double temperature, String message) { + this.thermometerId = thermometerId; + this.temperature = temperature; + this.alertTime = Instant.now().toString(); + this.message = message; + } + } +} diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureWindowFunction.java b/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureWindowFunction.java new file mode 100644 index 00000000..06570ee2 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureWindowFunction.java @@ -0,0 +1,50 @@ +package org.example; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; +import org.example.TemperatureAnomalyDetector.TemperatureReading; +import org.example.TemperatureAnomalyDetector.TemperatureAlert; + +import java.util.Iterator; + +public class TemperatureWindowFunction implements WindowFunction { + + @Override + public void apply( + String key, + TimeWindow window, + Iterable values, + Collector out) { + + Iterator iterator = values.iterator(); + if (!iterator.hasNext()) { + return; // No readings in this window + } + + // Find the lowest temperature in the window + TemperatureReading lowestReading = iterator.next(); + double lowestTemp = lowestReading.temperature; + + while (iterator.hasNext()) { + TemperatureReading reading = iterator.next(); + if (reading.temperature < lowestTemp) { + lowestTemp = reading.temperature; + lowestReading = reading; + } + } + + // Create and emit an alert for the lowest temperature reading + String message = String.format("Freezing temperature alert! %s reported %.2f°C", + lowestReading.thermometerId, lowestTemp); + + TemperatureAlert alert = new TemperatureAlert( + lowestReading.thermometerId, + lowestTemp, + message + ); + + out.collect(alert); + } +} diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$1.class b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$1.class new file mode 100644 index 00000000..5d2532f8 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$1.class differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$2.class b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$2.class new file mode 100644 index 00000000..f3716549 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$2.class differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$3.class b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$3.class new file mode 100644 index 00000000..e6d21ed9 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$3.class differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$TemperatureAlert.class b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$TemperatureAlert.class new file mode 100644 index 00000000..f6df69b7 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$TemperatureAlert.class differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$TemperatureReading.class b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$TemperatureReading.class new file mode 100644 index 00000000..9b4c9072 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector$TemperatureReading.class differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector.class b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector.class new file mode 100644 index 00000000..c7d8f6e9 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureAnomalyDetector.class differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureWindowFunction.class b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureWindowFunction.class new file mode 100644 index 00000000..913c0af6 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/classes/org/example/TemperatureWindowFunction.class differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-archiver/pom.properties b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-archiver/pom.properties new file mode 100644 index 00000000..e4bd7466 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-archiver/pom.properties @@ -0,0 +1,3 @@ +artifactId=temperature-anomaly-detector +groupId=org.example +version=1.0-SNAPSHOT diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst new file mode 100644 index 00000000..56447dbd --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -0,0 +1,7 @@ +org/example/TemperatureAnomalyDetector.class +org/example/TemperatureAnomalyDetector$2.class +org/example/TemperatureWindowFunction.class +org/example/TemperatureAnomalyDetector$TemperatureReading.class +org/example/TemperatureAnomalyDetector$TemperatureAlert.class +org/example/TemperatureAnomalyDetector$1.class +org/example/TemperatureAnomalyDetector$3.class diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst new file mode 100644 index 00000000..f905c6f5 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst @@ -0,0 +1,2 @@ +/home/kuhy/WUT_Computer_Science/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureAnomalyDetector.java +/home/kuhy/WUT_Computer_Science/Programming/PSD/zin3/third/temperature-anomaly-detector/src/main/java/org/example/TemperatureWindowFunction.java diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/original-temperature-anomaly-detector-1.0-SNAPSHOT.jar b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/original-temperature-anomaly-detector-1.0-SNAPSHOT.jar new file mode 100644 index 00000000..b9bc7773 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/original-temperature-anomaly-detector-1.0-SNAPSHOT.jar differ diff --git a/Programming/PSD/zin3/third/temperature-anomaly-detector/target/temperature-anomaly-detector-1.0-SNAPSHOT.jar b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/temperature-anomaly-detector-1.0-SNAPSHOT.jar new file mode 100644 index 00000000..7f6c5eaf Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-anomaly-detector/target/temperature-anomaly-detector-1.0-SNAPSHOT.jar differ diff --git a/Programming/PSD/zin3/third/temperature-generator/dependency-reduced-pom.xml b/Programming/PSD/zin3/third/temperature-generator/dependency-reduced-pom.xml new file mode 100644 index 00000000..ff595e63 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-generator/dependency-reduced-pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + org.example + temperature-generator + 1.0-SNAPSHOT + + + + maven-shade-plugin + 3.4.1 + + + package + + shade + + + + + org.example.TemperatureGenerator + + + + + + + + + + 11 + 2.14.2 + 11 + 3.4.0 + + diff --git a/Programming/PSD/zin3/third/temperature-generator/pom.xml b/Programming/PSD/zin3/third/temperature-generator/pom.xml new file mode 100644 index 00000000..2728dfde --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-generator/pom.xml @@ -0,0 +1,68 @@ + + + 4.0.0 + + org.example + temperature-generator + 1.0-SNAPSHOT + + + 11 + 11 + 3.4.0 + 2.14.2 + + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + + org.slf4j + slf4j-api + 2.0.5 + + + org.slf4j + slf4j-simple + 2.0.5 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.4.1 + + + package + + shade + + + + + org.example.TemperatureGenerator + + + + + + + + + diff --git a/Programming/PSD/zin3/third/temperature-generator/src/main/java/org/example/TemperatureGenerator.java b/Programming/PSD/zin3/third/temperature-generator/src/main/java/org/example/TemperatureGenerator.java new file mode 100644 index 00000000..6a2fb335 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-generator/src/main/java/org/example/TemperatureGenerator.java @@ -0,0 +1,85 @@ +package org.example; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +public class TemperatureGenerator { + private static final Logger logger = LoggerFactory.getLogger(TemperatureGenerator.class); + private static final String TOPIC = "Temperatura"; + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final int NUM_THERMOMETERS = 5; + private static final Random random = new Random(); + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public static void main(String[] args) { + // Configure Kafka producer + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + try (KafkaProducer producer = new KafkaProducer<>(props)) { + logger.info("Starting temperature data generation..."); + + while (true) { + for (int i = 1; i <= NUM_THERMOMETERS; i++) { + // Generate a temperature (sometimes below zero) + double temperature = (random.nextDouble() * 40) - 10; // Range from -10°C to 30°C + + TemperatureReading reading = new TemperatureReading( + "Thermometer-" + i, + Instant.now().toString(), + temperature + ); + + String jsonReading = objectMapper.writeValueAsString(reading); + ProducerRecord record = new ProducerRecord<>( + TOPIC, + reading.thermometerId, + jsonReading + ); + + producer.send(record, (metadata, exception) -> { + if (exception != null) { + logger.error("Error sending temperature data", exception); + } else { + logger.info("Sent temperature reading: {} | {} | {:.2f}°C", + reading.thermometerId, + reading.timestamp, + reading.temperature); + } + }); + } + + // Sleep before generating next batch + TimeUnit.SECONDS.sleep(2); + } + } catch (Exception e) { + logger.error("Error in temperature generator", e); + } + } + + public static class TemperatureReading { + public String thermometerId; + public String timestamp; + public double temperature; + + public TemperatureReading() {} + + public TemperatureReading(String thermometerId, String timestamp, double temperature) { + this.thermometerId = thermometerId; + this.timestamp = timestamp; + this.temperature = temperature; + } + } +} diff --git a/Programming/PSD/zin3/third/temperature-generator/target/classes/org/example/TemperatureGenerator$TemperatureReading.class b/Programming/PSD/zin3/third/temperature-generator/target/classes/org/example/TemperatureGenerator$TemperatureReading.class new file mode 100644 index 00000000..9000cd72 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-generator/target/classes/org/example/TemperatureGenerator$TemperatureReading.class differ diff --git a/Programming/PSD/zin3/third/temperature-generator/target/classes/org/example/TemperatureGenerator.class b/Programming/PSD/zin3/third/temperature-generator/target/classes/org/example/TemperatureGenerator.class new file mode 100644 index 00000000..fe9c2770 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-generator/target/classes/org/example/TemperatureGenerator.class differ diff --git a/Programming/PSD/zin3/third/temperature-generator/target/maven-archiver/pom.properties b/Programming/PSD/zin3/third/temperature-generator/target/maven-archiver/pom.properties new file mode 100644 index 00000000..87c699dd --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-generator/target/maven-archiver/pom.properties @@ -0,0 +1,3 @@ +artifactId=temperature-generator +groupId=org.example +version=1.0-SNAPSHOT diff --git a/Programming/PSD/zin3/third/temperature-generator/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/Programming/PSD/zin3/third/temperature-generator/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst new file mode 100644 index 00000000..9379b3d8 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-generator/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -0,0 +1,2 @@ +org/example/TemperatureGenerator.class +org/example/TemperatureGenerator$TemperatureReading.class diff --git a/Programming/PSD/zin3/third/temperature-generator/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/Programming/PSD/zin3/third/temperature-generator/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst new file mode 100644 index 00000000..aadb35a4 --- /dev/null +++ b/Programming/PSD/zin3/third/temperature-generator/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst @@ -0,0 +1 @@ +/home/kuhy/WUT_Computer_Science/Programming/PSD/zin3/third/temperature-generator/src/main/java/org/example/TemperatureGenerator.java diff --git a/Programming/PSD/zin3/third/temperature-generator/target/original-temperature-generator-1.0-SNAPSHOT.jar b/Programming/PSD/zin3/third/temperature-generator/target/original-temperature-generator-1.0-SNAPSHOT.jar new file mode 100644 index 00000000..e3bf7ab6 Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-generator/target/original-temperature-generator-1.0-SNAPSHOT.jar differ diff --git a/Programming/PSD/zin3/third/temperature-generator/target/temperature-generator-1.0-SNAPSHOT.jar b/Programming/PSD/zin3/third/temperature-generator/target/temperature-generator-1.0-SNAPSHOT.jar new file mode 100644 index 00000000..8c2ff61c Binary files /dev/null and b/Programming/PSD/zin3/third/temperature-generator/target/temperature-generator-1.0-SNAPSHOT.jar differ