mirror of
https://github.com/kuhyx/WUT_Computer_Science.git
synced 2026-07-04 14:43:08 +02:00
fix: run script
This commit is contained in:
parent
3d03d755f6
commit
fe9c99dac1
@ -4,8 +4,8 @@ import com.anomaly.model.TransactionAlert;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
//import org.slf4j.Logger;
|
||||
//import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.swing.*;
|
||||
import javax.swing.table.DefaultTableModel;
|
||||
@ -18,7 +18,7 @@ import java.util.*;
|
||||
import java.util.List;
|
||||
|
||||
public class AlertVisualizer {
|
||||
private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class);
|
||||
//private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class);
|
||||
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
|
||||
private static final String GROUP_ID = "alert-visualizer-group";
|
||||
private static final String TOPIC = "alerts";
|
||||
@ -52,9 +52,9 @@ public class AlertVisualizer {
|
||||
|
||||
// Add shutdown hook
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
logger.info("Shutting down alert visualizer...");
|
||||
//logger.info("Shutting down alert visualizer...");
|
||||
consumer.close();
|
||||
logger.info("Alert visualizer closed");
|
||||
//logger.info("Alert visualizer closed");
|
||||
}));
|
||||
|
||||
// Poll for new alerts
|
||||
|
||||
17
alarm-visualizer/src/main/resources/logback.xml
Normal file
17
alarm-visualizer/src/main/resources/logback.xml
Normal file
@ -0,0 +1,17 @@
|
||||
<configuration>
|
||||
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- Set Kafka related loggers to ERROR level -->
|
||||
<logger name="org.apache.kafka" level="ERROR"/>
|
||||
<logger name="kafka" level="ERROR"/>
|
||||
<logger name="org.apache.zookeeper" level="ERROR"/>
|
||||
|
||||
<!-- Set root logger to WARN -->
|
||||
<root level="WARN">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</root>
|
||||
</configuration>
|
||||
20
anomaly-detector/src/main/resources/logback.xml
Normal file
20
anomaly-detector/src/main/resources/logback.xml
Normal file
@ -0,0 +1,20 @@
|
||||
<configuration>
|
||||
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- Set Kafka related loggers to ERROR level -->
|
||||
<logger name="org.apache.kafka" level="ERROR"/>
|
||||
<logger name="kafka" level="ERROR"/>
|
||||
|
||||
<!-- Set Flink related loggers to ERROR level -->
|
||||
<logger name="org.apache.flink" level="ERROR"/>
|
||||
<logger name="org.apache.zookeeper" level="ERROR"/>
|
||||
|
||||
<!-- Set root logger to WARN -->
|
||||
<root level="WARN">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</root>
|
||||
</configuration>
|
||||
@ -0,0 +1,39 @@
|
||||
version: '3'
|
||||
|
||||
services:
|
||||
zookeeper:
|
||||
image: confluentinc/cp-zookeeper:latest
|
||||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ports:
|
||||
- "2181:2181"
|
||||
|
||||
kafka:
|
||||
image: confluentinc/cp-kafka:latest
|
||||
depends_on:
|
||||
- zookeeper
|
||||
ports:
|
||||
- "9092:9092"
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
|
||||
jobmanager:
|
||||
image: flink:latest
|
||||
ports:
|
||||
- "8081:8081"
|
||||
command: jobmanager
|
||||
environment:
|
||||
- JOB_MANAGER_RPC_ADDRESS=jobmanager
|
||||
|
||||
taskmanager:
|
||||
image: flink:latest
|
||||
depends_on:
|
||||
- jobmanager
|
||||
command: taskmanager
|
||||
environment:
|
||||
- JOB_MANAGER_RPC_ADDRESS=jobmanager
|
||||
@ -4,8 +4,6 @@ import com.anomaly.model.Transaction;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.swing.*;
|
||||
import java.awt.*;
|
||||
@ -14,7 +12,7 @@ import java.util.*;
|
||||
import java.util.List;
|
||||
|
||||
public class TransactionConsumer {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class);
|
||||
//private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class);
|
||||
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
|
||||
private static final String GROUP_ID = "transaction-consumer-group";
|
||||
private static final String TOPIC = "transactions";
|
||||
@ -46,9 +44,9 @@ public class TransactionConsumer {
|
||||
|
||||
// Add shutdown hook
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
logger.info("Shutting down consumer...");
|
||||
//logger.info("Shutting down consumer...");
|
||||
consumer.close();
|
||||
logger.info("Consumer closed");
|
||||
//logger.info("Consumer closed");
|
||||
}));
|
||||
|
||||
// Poll for new data
|
||||
|
||||
17
kafka-consumer-visualizer/src/main/resources/logback.xml
Normal file
17
kafka-consumer-visualizer/src/main/resources/logback.xml
Normal file
@ -0,0 +1,17 @@
|
||||
<configuration>
|
||||
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- Set Kafka related loggers to ERROR level -->
|
||||
<logger name="org.apache.kafka" level="ERROR"/>
|
||||
<logger name="kafka" level="ERROR"/>
|
||||
<logger name="org.apache.zookeeper" level="ERROR"/>
|
||||
|
||||
<!-- Set root logger to WARN -->
|
||||
<root level="WARN">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</root>
|
||||
</configuration>
|
||||
81
run_all.sh
Executable file
81
run_all.sh
Executable file
@ -0,0 +1,81 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Set working directory to script location
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
# Check if Docker daemon is running
|
||||
if ! docker info &>/dev/null; then
|
||||
echo "ERROR: Docker daemon is not running."
|
||||
echo "Please start Docker with: 'sudo systemctl start docker'"
|
||||
echo "If you want Docker to start automatically at boot: 'sudo systemctl enable docker'"
|
||||
echo "To run Docker without sudo, add your user to the docker group: 'sudo usermod -aG docker $USER'"
|
||||
echo "Then log out and log back in for the changes to take effect."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Note about docker-compose.yml
|
||||
echo "Note: Your docker-compose.yml contains an obsolete 'version' attribute that should be removed."
|
||||
|
||||
echo "Starting Docker containers..."
|
||||
docker-compose up -d
|
||||
|
||||
echo "Building Maven projects..."
|
||||
cd transaction-simulator && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
|
||||
cd anomaly-detector && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
|
||||
cd kafka-consumer-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
|
||||
cd alarm-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd ..
|
||||
|
||||
echo "Creating Kafka topics..."
|
||||
docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions
|
||||
docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alerts
|
||||
|
||||
echo "Starting all applications..."
|
||||
|
||||
# Start Flink job (Anomaly Detector)
|
||||
echo "Starting Anomaly Detector..."
|
||||
cd anomaly-detector
|
||||
java -jar target/anomaly-detector-1.0-SNAPSHOT.jar &
|
||||
ANOMALY_PID=$!
|
||||
cd ..
|
||||
|
||||
# Start Alert Visualizer
|
||||
echo "Starting Alert Visualizer..."
|
||||
cd alarm-visualizer
|
||||
java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar &
|
||||
ALARM_PID=$!
|
||||
cd ..
|
||||
|
||||
# Start Transaction Consumer/Visualizer
|
||||
echo "Starting Transaction Consumer..."
|
||||
cd kafka-consumer-visualizer
|
||||
java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar &
|
||||
CONSUMER_PID=$!
|
||||
cd ..
|
||||
|
||||
# Start Transaction Producer last
|
||||
echo "Starting Transaction Producer..."
|
||||
cd transaction-simulator
|
||||
java -jar target/transaction-simulator-1.0-SNAPSHOT.jar &
|
||||
PRODUCER_PID=$!
|
||||
cd ..
|
||||
|
||||
echo "All applications are running!"
|
||||
echo "Press Ctrl+C to stop all applications"
|
||||
|
||||
# Function to handle shutdown
|
||||
function cleanup {
|
||||
echo "Shutting down applications..."
|
||||
kill $PRODUCER_PID $CONSUMER_PID $ALARM_PID $ANOMALY_PID
|
||||
echo "Stopping Docker containers..."
|
||||
docker-compose down
|
||||
echo "All done!"
|
||||
exit 0
|
||||
}
|
||||
|
||||
# Catch shutdown signal
|
||||
trap cleanup SIGINT SIGTERM
|
||||
|
||||
# Keep script running
|
||||
while true; do
|
||||
sleep 1
|
||||
done
|
||||
@ -1,233 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Colors for output
|
||||
GREEN='\033[0;32m'
|
||||
BLUE='\033[0;34m'
|
||||
RED='\033[0;31m'
|
||||
YELLOW='\033[0;33m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
PROJECT_ROOT=$(pwd)
|
||||
TOPICS=("transactions" "alerts")
|
||||
|
||||
function check_prerequisites {
|
||||
echo -e "${BLUE}Checking prerequisites...${NC}"
|
||||
|
||||
# Check for Java
|
||||
if ! command -v java &> /dev/null; then
|
||||
echo -e "${RED}Java is not installed. Please install JDK 11 or higher.${NC}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Check for Maven
|
||||
if ! command -v mvn &> /dev/null; then
|
||||
echo -e "${RED}Maven is not installed. Please install Maven.${NC}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Check for Docker
|
||||
if ! command -v docker &> /dev/null; then
|
||||
echo -e "${RED}Docker is not installed. Please install Docker.${NC}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Check for docker-compose
|
||||
if ! command -v docker-compose &> /dev/null; then
|
||||
echo -e "${RED}Docker Compose is not installed. Please install Docker Compose.${NC}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo -e "${GREEN}All prerequisites are met.${NC}"
|
||||
}
|
||||
|
||||
function build_projects {
|
||||
echo -e "${BLUE}Building all projects...${NC}"
|
||||
|
||||
# Build each project
|
||||
for project in "transaction-simulator" "kafka-consumer-visualizer" "anomaly-detector" "alarm-visualizer"; do
|
||||
echo -e "${YELLOW}Building $project...${NC}"
|
||||
cd "$PROJECT_ROOT/$project"
|
||||
mvn clean package -DskipTests
|
||||
|
||||
if [ $? -ne 0 ]; then
|
||||
echo -e "${RED}Failed to build $project.${NC}"
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
cd "$PROJECT_ROOT"
|
||||
echo -e "${GREEN}All projects built successfully.${NC}"
|
||||
}
|
||||
|
||||
function start_infrastructure {
|
||||
echo -e "${BLUE}Starting Kafka and Flink containers...${NC}"
|
||||
|
||||
docker-compose up -d
|
||||
|
||||
# Wait for Kafka to be ready
|
||||
echo -e "${YELLOW}Waiting for Kafka to be ready...${NC}"
|
||||
sleep 10
|
||||
|
||||
# Create Kafka topics
|
||||
for topic in "${TOPICS[@]}"; do
|
||||
echo -e "${YELLOW}Creating Kafka topic: $topic${NC}"
|
||||
docker-compose exec kafka kafka-topics --create \
|
||||
--topic "$topic" \
|
||||
--bootstrap-server localhost:9092 \
|
||||
--partitions 3 \
|
||||
--replication-factor 1 \
|
||||
--if-not-exists
|
||||
done
|
||||
|
||||
echo -e "${GREEN}Infrastructure is up and running.${NC}"
|
||||
}
|
||||
|
||||
function start_applications {
|
||||
echo -e "${BLUE}Starting applications...${NC}"
|
||||
|
||||
# Start anomaly detector (Flink app)
|
||||
echo -e "${YELLOW}Starting Anomaly Detector (Flink App)...${NC}"
|
||||
cd "$PROJECT_ROOT/anomaly-detector"
|
||||
java -jar target/anomaly-detector-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/anomaly-detector.log" 2>&1 &
|
||||
ANOMALY_DETECTOR_PID=$!
|
||||
echo $ANOMALY_DETECTOR_PID > "$PROJECT_ROOT/logs/anomaly-detector.pid"
|
||||
|
||||
sleep 5
|
||||
|
||||
# Start alarm visualizer
|
||||
echo -e "${YELLOW}Starting Alarm Visualizer...${NC}"
|
||||
cd "$PROJECT_ROOT/alarm-visualizer"
|
||||
java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/alarm-visualizer.log" 2>&1 &
|
||||
ALARM_VISUALIZER_PID=$!
|
||||
echo $ALARM_VISUALIZER_PID > "$PROJECT_ROOT/logs/alarm-visualizer.pid"
|
||||
|
||||
# Start test consumer/visualizer
|
||||
echo -e "${YELLOW}Starting Test Consumer Visualizer...${NC}"
|
||||
cd "$PROJECT_ROOT/kafka-consumer-visualizer"
|
||||
java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/consumer-visualizer.log" 2>&1 &
|
||||
CONSUMER_PID=$!
|
||||
echo $CONSUMER_PID > "$PROJECT_ROOT/logs/consumer-visualizer.pid"
|
||||
|
||||
sleep 5
|
||||
|
||||
# Start transaction simulator
|
||||
echo -e "${YELLOW}Starting Transaction Simulator...${NC}"
|
||||
cd "$PROJECT_ROOT/transaction-simulator"
|
||||
java -jar target/transaction-simulator-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/transaction-simulator.log" 2>&1 &
|
||||
SIMULATOR_PID=$!
|
||||
echo $SIMULATOR_PID > "$PROJECT_ROOT/logs/transaction-simulator.pid"
|
||||
|
||||
cd "$PROJECT_ROOT"
|
||||
echo -e "${GREEN}All applications are running.${NC}"
|
||||
echo -e "${GREEN}Log files are available in the logs directory.${NC}"
|
||||
}
|
||||
|
||||
function stop_applications {
|
||||
echo -e "${BLUE}Stopping applications...${NC}"
|
||||
|
||||
# Stop all Java applications
|
||||
if [ -f "$PROJECT_ROOT/logs/transaction-simulator.pid" ]; then
|
||||
kill $(cat "$PROJECT_ROOT/logs/transaction-simulator.pid") 2>/dev/null
|
||||
rm "$PROJECT_ROOT/logs/transaction-simulator.pid"
|
||||
fi
|
||||
|
||||
if [ -f "$PROJECT_ROOT/logs/consumer-visualizer.pid" ]; then
|
||||
kill $(cat "$PROJECT_ROOT/logs/consumer-visualizer.pid") 2>/dev/null
|
||||
rm "$PROJECT_ROOT/logs/consumer-visualizer.pid"
|
||||
fi
|
||||
|
||||
if [ -f "$PROJECT_ROOT/logs/anomaly-detector.pid" ]; then
|
||||
kill $(cat "$PROJECT_ROOT/logs/anomaly-detector.pid") 2>/dev/null
|
||||
rm "$PROJECT_ROOT/logs/anomaly-detector.pid"
|
||||
fi
|
||||
|
||||
if [ -f "$PROJECT_ROOT/logs/alarm-visualizer.pid" ]; then
|
||||
kill $(cat "$PROJECT_ROOT/logs/alarm-visualizer.pid") 2>/dev/null
|
||||
rm "$PROJECT_ROOT/logs/alarm-visualizer.pid"
|
||||
fi
|
||||
|
||||
echo -e "${GREEN}All applications stopped.${NC}"
|
||||
}
|
||||
|
||||
function stop_infrastructure {
|
||||
echo -e "${BLUE}Stopping infrastructure...${NC}"
|
||||
|
||||
docker-compose down
|
||||
|
||||
echo -e "${GREEN}Infrastructure stopped.${NC}"
|
||||
}
|
||||
|
||||
function show_logs {
|
||||
echo -e "${BLUE}Available logs:${NC}"
|
||||
ls -l "$PROJECT_ROOT/logs"
|
||||
|
||||
echo -e "${YELLOW}Use 'tail -f logs/[filename]' to view a specific log.${NC}"
|
||||
}
|
||||
|
||||
function print_usage {
|
||||
echo -e "${BLUE}Credit Card Transaction Anomaly Detection System${NC}"
|
||||
echo -e "Usage: $0 [options]"
|
||||
echo -e "Options:"
|
||||
echo -e " ${GREEN}start${NC} Build and start the entire system"
|
||||
echo -e " ${GREEN}stop${NC} Stop all components"
|
||||
echo -e " ${GREEN}restart${NC} Restart the entire system"
|
||||
echo -e " ${GREEN}status${NC} Check if components are running"
|
||||
echo -e " ${GREEN}logs${NC} Show log files"
|
||||
}
|
||||
|
||||
function check_status {
|
||||
echo -e "${BLUE}Checking system status...${NC}"
|
||||
|
||||
# Check Docker containers
|
||||
echo -e "${YELLOW}Docker containers:${NC}"
|
||||
docker-compose ps
|
||||
|
||||
# Check Java processes
|
||||
echo -e "\n${YELLOW}Java applications:${NC}"
|
||||
for app in "transaction-simulator" "consumer-visualizer" "anomaly-detector" "alarm-visualizer"; do
|
||||
if [ -f "$PROJECT_ROOT/logs/$app.pid" ]; then
|
||||
pid=$(cat "$PROJECT_ROOT/logs/$app.pid")
|
||||
if ps -p $pid > /dev/null; then
|
||||
echo -e "${GREEN}$app is running (PID: $pid)${NC}"
|
||||
else
|
||||
echo -e "${RED}$app is not running (stale PID file)${NC}"
|
||||
fi
|
||||
else
|
||||
echo -e "${RED}$app is not running${NC}"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
# Create logs directory
|
||||
mkdir -p "$PROJECT_ROOT/logs"
|
||||
|
||||
# Parse command-line arguments
|
||||
case "$1" in
|
||||
start)
|
||||
check_prerequisites
|
||||
build_projects
|
||||
start_infrastructure
|
||||
start_applications
|
||||
;;
|
||||
stop)
|
||||
stop_applications
|
||||
stop_infrastructure
|
||||
;;
|
||||
restart)
|
||||
stop_applications
|
||||
stop_infrastructure
|
||||
sleep 5
|
||||
start_infrastructure
|
||||
sleep 5
|
||||
start_applications
|
||||
;;
|
||||
status)
|
||||
check_status
|
||||
;;
|
||||
logs)
|
||||
show_logs
|
||||
;;
|
||||
*)
|
||||
print_usage
|
||||
;;
|
||||
esac
|
||||
12
stop_all.sh
Executable file
12
stop_all.sh
Executable file
@ -0,0 +1,12 @@
|
||||
#!/bin/bash
|
||||
|
||||
echo "Stopping all Java applications..."
|
||||
pkill -f "java -jar target/transaction-simulator"
|
||||
pkill -f "java -jar target/anomaly-detector"
|
||||
pkill -f "java -jar target/kafka-consumer-visualizer"
|
||||
pkill -f "java -jar target/alarm-visualizer"
|
||||
|
||||
echo "Stopping Docker containers..."
|
||||
docker-compose down
|
||||
|
||||
echo "All applications have been stopped!"
|
||||
@ -5,15 +5,13 @@ import com.anomaly.model.Transaction;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.kafka.clients.producer.*;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
public class TransactionProducer {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class);
|
||||
//private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class);
|
||||
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
|
||||
private static final String TOPIC_NAME = "transactions";
|
||||
private static final TransactionGenerator generator = new TransactionGenerator();
|
||||
@ -31,10 +29,10 @@ public class TransactionProducer {
|
||||
|
||||
// Add shutdown hook
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
logger.info("Shutting down producer...");
|
||||
//logger.info("Shutting down producer...");
|
||||
producer.flush();
|
||||
producer.close();
|
||||
logger.info("Producer closed");
|
||||
//logger.info("Producer closed");
|
||||
}));
|
||||
|
||||
// Generate and send transactions
|
||||
@ -58,7 +56,7 @@ public class TransactionProducer {
|
||||
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 1000));
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
logger.error("Error in transaction producer", e);
|
||||
//logger.error("Error in transaction producer", e);
|
||||
} finally {
|
||||
producer.flush();
|
||||
producer.close();
|
||||
@ -76,10 +74,10 @@ public class TransactionProducer {
|
||||
|
||||
producer.send(record, (metadata, exception) -> {
|
||||
if (exception == null) {
|
||||
logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}",
|
||||
metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
|
||||
//logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}",
|
||||
// metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
|
||||
} else {
|
||||
logger.error("Error sending message", exception);
|
||||
//logger.error("Error sending message", exception);
|
||||
}
|
||||
}).get(); // Making it synchronous for demonstration
|
||||
}
|
||||
|
||||
16
transaction-simulator/src/main/resources/logback.xml
Normal file
16
transaction-simulator/src/main/resources/logback.xml
Normal file
@ -0,0 +1,16 @@
|
||||
<configuration>
|
||||
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- Set Kafka related loggers to ERROR level -->
|
||||
<logger name="org.apache.kafka" level="ERROR"/>
|
||||
<logger name="kafka" level="ERROR"/>
|
||||
|
||||
<!-- Set root logger to WARN -->
|
||||
<root level="WARN">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</root>
|
||||
</configuration>
|
||||
Loading…
Reference in New Issue
Block a user