diff --git a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java index 4c058171..29c436b1 100644 --- a/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java +++ b/alarm-visualizer/src/main/java/com/anomaly/visualizer/AlertVisualizer.java @@ -4,8 +4,8 @@ import com.anomaly.model.TransactionAlert; import com.google.gson.Gson; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; import javax.swing.*; import javax.swing.table.DefaultTableModel; @@ -18,7 +18,7 @@ import java.util.*; import java.util.List; public class AlertVisualizer { - private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class); + //private static final Logger logger = LoggerFactory.getLogger(AlertVisualizer.class); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String GROUP_ID = "alert-visualizer-group"; private static final String TOPIC = "alerts"; @@ -52,9 +52,9 @@ public class AlertVisualizer { // Add shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("Shutting down alert visualizer..."); + //logger.info("Shutting down alert visualizer..."); consumer.close(); - logger.info("Alert visualizer closed"); + //logger.info("Alert visualizer closed"); })); // Poll for new alerts diff --git a/alarm-visualizer/src/main/resources/logback.xml b/alarm-visualizer/src/main/resources/logback.xml new file mode 100644 index 00000000..573bb1db --- /dev/null +++ b/alarm-visualizer/src/main/resources/logback.xml @@ -0,0 +1,17 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + diff --git a/anomaly-detector/src/main/resources/logback.xml b/anomaly-detector/src/main/resources/logback.xml new file mode 100644 index 00000000..85b380d9 --- /dev/null +++ b/anomaly-detector/src/main/resources/logback.xml @@ -0,0 +1,20 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + diff --git a/docker-compose.yml b/docker-compose.yml index e69de29b..3a3dce99 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -0,0 +1,39 @@ +version: '3' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + jobmanager: + image: flink:latest + ports: + - "8081:8081" + command: jobmanager + environment: + - JOB_MANAGER_RPC_ADDRESS=jobmanager + + taskmanager: + image: flink:latest + depends_on: + - jobmanager + command: taskmanager + environment: + - JOB_MANAGER_RPC_ADDRESS=jobmanager \ No newline at end of file diff --git a/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java b/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java index 493d9ead..9e0e46fc 100644 --- a/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java +++ b/kafka-consumer-visualizer/src/main/java/com/anomaly/consumer/TransactionConsumer.java @@ -4,8 +4,6 @@ import com.anomaly.model.Transaction; import com.google.gson.Gson; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.swing.*; import java.awt.*; @@ -14,7 +12,7 @@ import java.util.*; import java.util.List; public class TransactionConsumer { - private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class); + //private static final Logger logger = LoggerFactory.getLogger(TransactionConsumer.class); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String GROUP_ID = "transaction-consumer-group"; private static final String TOPIC = "transactions"; @@ -46,9 +44,9 @@ public class TransactionConsumer { // Add shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("Shutting down consumer..."); + //logger.info("Shutting down consumer..."); consumer.close(); - logger.info("Consumer closed"); + //logger.info("Consumer closed"); })); // Poll for new data diff --git a/kafka-consumer-visualizer/src/main/resources/logback.xml b/kafka-consumer-visualizer/src/main/resources/logback.xml new file mode 100644 index 00000000..573bb1db --- /dev/null +++ b/kafka-consumer-visualizer/src/main/resources/logback.xml @@ -0,0 +1,17 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + diff --git a/run_all.sh b/run_all.sh new file mode 100755 index 00000000..0ce2570b --- /dev/null +++ b/run_all.sh @@ -0,0 +1,81 @@ +#!/bin/bash + +# Set working directory to script location +cd "$(dirname "$0")" + +# Check if Docker daemon is running +if ! docker info &>/dev/null; then + echo "ERROR: Docker daemon is not running." + echo "Please start Docker with: 'sudo systemctl start docker'" + echo "If you want Docker to start automatically at boot: 'sudo systemctl enable docker'" + echo "To run Docker without sudo, add your user to the docker group: 'sudo usermod -aG docker $USER'" + echo "Then log out and log back in for the changes to take effect." + exit 1 +fi + +# Note about docker-compose.yml +echo "Note: Your docker-compose.yml contains an obsolete 'version' attribute that should be removed." + +echo "Starting Docker containers..." +docker-compose up -d + +echo "Building Maven projects..." +cd transaction-simulator && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. +cd anomaly-detector && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. +cd kafka-consumer-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. +cd alarm-visualizer && mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean package && cd .. + +echo "Creating Kafka topics..." +docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions +docker exec psd_project-kafka-1 kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alerts + +echo "Starting all applications..." + +# Start Flink job (Anomaly Detector) +echo "Starting Anomaly Detector..." +cd anomaly-detector +java -jar target/anomaly-detector-1.0-SNAPSHOT.jar & +ANOMALY_PID=$! +cd .. + +# Start Alert Visualizer +echo "Starting Alert Visualizer..." +cd alarm-visualizer +java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar & +ALARM_PID=$! +cd .. + +# Start Transaction Consumer/Visualizer +echo "Starting Transaction Consumer..." +cd kafka-consumer-visualizer +java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar & +CONSUMER_PID=$! +cd .. + +# Start Transaction Producer last +echo "Starting Transaction Producer..." +cd transaction-simulator +java -jar target/transaction-simulator-1.0-SNAPSHOT.jar & +PRODUCER_PID=$! +cd .. + +echo "All applications are running!" +echo "Press Ctrl+C to stop all applications" + +# Function to handle shutdown +function cleanup { + echo "Shutting down applications..." + kill $PRODUCER_PID $CONSUMER_PID $ALARM_PID $ANOMALY_PID + echo "Stopping Docker containers..." + docker-compose down + echo "All done!" + exit 0 +} + +# Catch shutdown signal +trap cleanup SIGINT SIGTERM + +# Keep script running +while true; do + sleep 1 +done diff --git a/run_anomaly_detection.sh b/run_anomaly_detection.sh deleted file mode 100755 index 1c25c8ae..00000000 --- a/run_anomaly_detection.sh +++ /dev/null @@ -1,233 +0,0 @@ -#!/bin/bash - -# Colors for output -GREEN='\033[0;32m' -BLUE='\033[0;34m' -RED='\033[0;31m' -YELLOW='\033[0;33m' -NC='\033[0m' # No Color - -PROJECT_ROOT=$(pwd) -TOPICS=("transactions" "alerts") - -function check_prerequisites { - echo -e "${BLUE}Checking prerequisites...${NC}" - - # Check for Java - if ! command -v java &> /dev/null; then - echo -e "${RED}Java is not installed. Please install JDK 11 or higher.${NC}" - exit 1 - fi - - # Check for Maven - if ! command -v mvn &> /dev/null; then - echo -e "${RED}Maven is not installed. Please install Maven.${NC}" - exit 1 - fi - - # Check for Docker - if ! command -v docker &> /dev/null; then - echo -e "${RED}Docker is not installed. Please install Docker.${NC}" - exit 1 - fi - - # Check for docker-compose - if ! command -v docker-compose &> /dev/null; then - echo -e "${RED}Docker Compose is not installed. Please install Docker Compose.${NC}" - exit 1 - fi - - echo -e "${GREEN}All prerequisites are met.${NC}" -} - -function build_projects { - echo -e "${BLUE}Building all projects...${NC}" - - # Build each project - for project in "transaction-simulator" "kafka-consumer-visualizer" "anomaly-detector" "alarm-visualizer"; do - echo -e "${YELLOW}Building $project...${NC}" - cd "$PROJECT_ROOT/$project" - mvn clean package -DskipTests - - if [ $? -ne 0 ]; then - echo -e "${RED}Failed to build $project.${NC}" - exit 1 - fi - done - - cd "$PROJECT_ROOT" - echo -e "${GREEN}All projects built successfully.${NC}" -} - -function start_infrastructure { - echo -e "${BLUE}Starting Kafka and Flink containers...${NC}" - - docker-compose up -d - - # Wait for Kafka to be ready - echo -e "${YELLOW}Waiting for Kafka to be ready...${NC}" - sleep 10 - - # Create Kafka topics - for topic in "${TOPICS[@]}"; do - echo -e "${YELLOW}Creating Kafka topic: $topic${NC}" - docker-compose exec kafka kafka-topics --create \ - --topic "$topic" \ - --bootstrap-server localhost:9092 \ - --partitions 3 \ - --replication-factor 1 \ - --if-not-exists - done - - echo -e "${GREEN}Infrastructure is up and running.${NC}" -} - -function start_applications { - echo -e "${BLUE}Starting applications...${NC}" - - # Start anomaly detector (Flink app) - echo -e "${YELLOW}Starting Anomaly Detector (Flink App)...${NC}" - cd "$PROJECT_ROOT/anomaly-detector" - java -jar target/anomaly-detector-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/anomaly-detector.log" 2>&1 & - ANOMALY_DETECTOR_PID=$! - echo $ANOMALY_DETECTOR_PID > "$PROJECT_ROOT/logs/anomaly-detector.pid" - - sleep 5 - - # Start alarm visualizer - echo -e "${YELLOW}Starting Alarm Visualizer...${NC}" - cd "$PROJECT_ROOT/alarm-visualizer" - java -jar target/alarm-visualizer-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/alarm-visualizer.log" 2>&1 & - ALARM_VISUALIZER_PID=$! - echo $ALARM_VISUALIZER_PID > "$PROJECT_ROOT/logs/alarm-visualizer.pid" - - # Start test consumer/visualizer - echo -e "${YELLOW}Starting Test Consumer Visualizer...${NC}" - cd "$PROJECT_ROOT/kafka-consumer-visualizer" - java -jar target/kafka-consumer-visualizer-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/consumer-visualizer.log" 2>&1 & - CONSUMER_PID=$! - echo $CONSUMER_PID > "$PROJECT_ROOT/logs/consumer-visualizer.pid" - - sleep 5 - - # Start transaction simulator - echo -e "${YELLOW}Starting Transaction Simulator...${NC}" - cd "$PROJECT_ROOT/transaction-simulator" - java -jar target/transaction-simulator-1.0-SNAPSHOT.jar > "$PROJECT_ROOT/logs/transaction-simulator.log" 2>&1 & - SIMULATOR_PID=$! - echo $SIMULATOR_PID > "$PROJECT_ROOT/logs/transaction-simulator.pid" - - cd "$PROJECT_ROOT" - echo -e "${GREEN}All applications are running.${NC}" - echo -e "${GREEN}Log files are available in the logs directory.${NC}" -} - -function stop_applications { - echo -e "${BLUE}Stopping applications...${NC}" - - # Stop all Java applications - if [ -f "$PROJECT_ROOT/logs/transaction-simulator.pid" ]; then - kill $(cat "$PROJECT_ROOT/logs/transaction-simulator.pid") 2>/dev/null - rm "$PROJECT_ROOT/logs/transaction-simulator.pid" - fi - - if [ -f "$PROJECT_ROOT/logs/consumer-visualizer.pid" ]; then - kill $(cat "$PROJECT_ROOT/logs/consumer-visualizer.pid") 2>/dev/null - rm "$PROJECT_ROOT/logs/consumer-visualizer.pid" - fi - - if [ -f "$PROJECT_ROOT/logs/anomaly-detector.pid" ]; then - kill $(cat "$PROJECT_ROOT/logs/anomaly-detector.pid") 2>/dev/null - rm "$PROJECT_ROOT/logs/anomaly-detector.pid" - fi - - if [ -f "$PROJECT_ROOT/logs/alarm-visualizer.pid" ]; then - kill $(cat "$PROJECT_ROOT/logs/alarm-visualizer.pid") 2>/dev/null - rm "$PROJECT_ROOT/logs/alarm-visualizer.pid" - fi - - echo -e "${GREEN}All applications stopped.${NC}" -} - -function stop_infrastructure { - echo -e "${BLUE}Stopping infrastructure...${NC}" - - docker-compose down - - echo -e "${GREEN}Infrastructure stopped.${NC}" -} - -function show_logs { - echo -e "${BLUE}Available logs:${NC}" - ls -l "$PROJECT_ROOT/logs" - - echo -e "${YELLOW}Use 'tail -f logs/[filename]' to view a specific log.${NC}" -} - -function print_usage { - echo -e "${BLUE}Credit Card Transaction Anomaly Detection System${NC}" - echo -e "Usage: $0 [options]" - echo -e "Options:" - echo -e " ${GREEN}start${NC} Build and start the entire system" - echo -e " ${GREEN}stop${NC} Stop all components" - echo -e " ${GREEN}restart${NC} Restart the entire system" - echo -e " ${GREEN}status${NC} Check if components are running" - echo -e " ${GREEN}logs${NC} Show log files" -} - -function check_status { - echo -e "${BLUE}Checking system status...${NC}" - - # Check Docker containers - echo -e "${YELLOW}Docker containers:${NC}" - docker-compose ps - - # Check Java processes - echo -e "\n${YELLOW}Java applications:${NC}" - for app in "transaction-simulator" "consumer-visualizer" "anomaly-detector" "alarm-visualizer"; do - if [ -f "$PROJECT_ROOT/logs/$app.pid" ]; then - pid=$(cat "$PROJECT_ROOT/logs/$app.pid") - if ps -p $pid > /dev/null; then - echo -e "${GREEN}$app is running (PID: $pid)${NC}" - else - echo -e "${RED}$app is not running (stale PID file)${NC}" - fi - else - echo -e "${RED}$app is not running${NC}" - fi - done -} - -# Create logs directory -mkdir -p "$PROJECT_ROOT/logs" - -# Parse command-line arguments -case "$1" in - start) - check_prerequisites - build_projects - start_infrastructure - start_applications - ;; - stop) - stop_applications - stop_infrastructure - ;; - restart) - stop_applications - stop_infrastructure - sleep 5 - start_infrastructure - sleep 5 - start_applications - ;; - status) - check_status - ;; - logs) - show_logs - ;; - *) - print_usage - ;; -esac diff --git a/stop_all.sh b/stop_all.sh new file mode 100755 index 00000000..e07a9bb9 --- /dev/null +++ b/stop_all.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +echo "Stopping all Java applications..." +pkill -f "java -jar target/transaction-simulator" +pkill -f "java -jar target/anomaly-detector" +pkill -f "java -jar target/kafka-consumer-visualizer" +pkill -f "java -jar target/alarm-visualizer" + +echo "Stopping Docker containers..." +docker-compose down + +echo "All applications have been stopped!" diff --git a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java index ad67b774..d24d9283 100644 --- a/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java +++ b/transaction-simulator/src/main/java/com/anomaly/producer/TransactionProducer.java @@ -5,15 +5,13 @@ import com.anomaly.model.Transaction; import com.google.gson.Gson; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; public class TransactionProducer { - private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class); + //private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String TOPIC_NAME = "transactions"; private static final TransactionGenerator generator = new TransactionGenerator(); @@ -31,10 +29,10 @@ public class TransactionProducer { // Add shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("Shutting down producer..."); + //logger.info("Shutting down producer..."); producer.flush(); producer.close(); - logger.info("Producer closed"); + //logger.info("Producer closed"); })); // Generate and send transactions @@ -58,7 +56,7 @@ public class TransactionProducer { Thread.sleep(ThreadLocalRandom.current().nextLong(100, 1000)); } } catch (InterruptedException | ExecutionException e) { - logger.error("Error in transaction producer", e); + //logger.error("Error in transaction producer", e); } finally { producer.flush(); producer.close(); @@ -76,10 +74,10 @@ public class TransactionProducer { producer.send(record, (metadata, exception) -> { if (exception == null) { - logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}", - metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp()); + //logger.info("Received metadata: Topic: {}, Partition: {}, Offset: {}, Timestamp: {}", + // metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp()); } else { - logger.error("Error sending message", exception); + //logger.error("Error sending message", exception); } }).get(); // Making it synchronous for demonstration } diff --git a/transaction-simulator/src/main/resources/logback.xml b/transaction-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..e9231f59 --- /dev/null +++ b/transaction-simulator/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + +