commit cc1989eb7b6e039cafe95d2bef697ace67cbc1d2 Author: Krzysztof Rudnicki Date: Sun Jun 16 21:47:34 2024 +0200 feat: project speedrun diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..efa407c3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,162 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ \ No newline at end of file diff --git a/flink/anomaly_detector.py b/flink/anomaly_detector.py new file mode 100644 index 00000000..7a4a8d91 --- /dev/null +++ b/flink/anomaly_detector.py @@ -0,0 +1,20 @@ +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer +from pyflink.common.serialization import SimpleStringSchema +import json + +def detect_anomalies(transaction): + transaction_data = json.loads(transaction) + # Add anomaly detection logic here + return transaction_data + +env = StreamExecutionEnvironment.get_execution_environment() +kafka_consumer = FlinkKafkaConsumer( + topics='transactions', + deserialization_schema=SimpleStringSchema(), + properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'anomaly_detection'} +) + +data_stream = env.add_source(kafka_consumer).map(detect_anomalies) +data_stream.print() +env.execute("Anomaly Detection") diff --git a/kafka/generate_and_send.py b/kafka/generate_and_send.py new file mode 100644 index 00000000..9c12d84d --- /dev/null +++ b/kafka/generate_and_send.py @@ -0,0 +1,32 @@ +from confluent_kafka import Producer +import json +import random +import time + +# Configuration for Kafka Producer +conf = {'bootstrap.servers': "localhost:9092"} +producer = Producer(**conf) + +def generate_transaction(): + return { + "card_id": random.randint(1000, 9999), + "user_id": random.randint(100, 999), + "location": { + "latitude": round(random.uniform(-90, 90), 6), + "longitude": round(random.uniform(-180, 180), 6) + }, + "transaction_value": round(random.uniform(1, 1000), 2), + "spending_limit": round(random.uniform(1000, 5000), 2) + } + +def delivery_report(err, msg): + if err is not None: + print('Message delivery failed: {}'.format(err)) + else: + print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) + +while True: + transaction = generate_transaction() + producer.produce('transactions', key=str(transaction["card_id"]), value=json.dumps(transaction), callback=delivery_report) + producer.poll(1) + time.sleep(1) # Adjust the sleep time to control the frequency of transactions diff --git a/kafka/kafka_consumer.py b/kafka/kafka_consumer.py new file mode 100644 index 00000000..21a79f9d --- /dev/null +++ b/kafka/kafka_consumer.py @@ -0,0 +1,21 @@ +from confluent_kafka import Consumer, KafkaError + +conf = {'bootstrap.servers': "localhost:9092", + 'group.id': "test_group", + 'auto.offset.reset': 'earliest'} + +consumer = Consumer(**conf) +consumer.subscribe(['transactions']) + +while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print(msg.error()) + break + print('Received message: {}'.format(msg.value().decode('utf-8'))) +consumer.close() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..7c525bf7 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +flask +confluent_kafka +pyflink diff --git a/run.sh b/run.sh new file mode 100755 index 00000000..4e166bc7 --- /dev/null +++ b/run.sh @@ -0,0 +1,99 @@ +#!/bin/bash + +# Function to run a script in the background and save its PID +run_script() { + python $1 & + echo $! +} + +# Function to install Kafka +install_kafka() { + if ! command -v kafka-server-start &> /dev/null; then + echo "Kafka is not installed. Installing Kafka..." + sudo pacman -Syu --noconfirm + sudo pacman -S --noconfirm kafka + else + echo "Kafka is already installed." + fi +} + +# Function to install Flink using yay +install_flink() { + if ! command -v flink &> /dev/null; then + echo "Flink is not installed. Installing Flink..." + if ! command -v yay &> /dev/null; then + echo "yay is not installed. Installing yay..." + sudo pacman -S --noconfirm git + git clone https://aur.archlinux.org/yay.git + cd yay + makepkg -si --noconfirm + cd .. + rm -rf yay + fi + yay -S --noconfirm apache-flink + else + echo "Flink is already installed." + fi +} + +# Ensure Kafka and Flink are installed +#install_kafka +#install_flink + +# Ensure Kafka and Flink services are running +echo "Starting Kafka and Flink services..." +sudo systemctl start kafka +start-cluster.sh + +# Give some time for Kafka and Flink to start +sleep 1 + +# Start the Kafka producer (Transaction Simulator) +echo "Starting transaction simulator..." +producer_pid=$(run_script './kafka/generate_and_send.py') +echo "Started transaction simulator with PID $producer_pid" + +# Give some time for the producer to start and produce initial transactions +sleep 1 + +# Start the Kafka consumer for testing +echo "Starting Kafka consumer for testing..." +consumer_pid=$(run_script './kafka/kafka_consumer.py') +echo "Started Kafka consumer for testing with PID $consumer_pid" + +# Give some time for the consumer to read initial transactions +sleep 1 + +# Start the Flink job (Anomaly Detector) +echo "Starting Flink anomaly detector..." +flink_pid=$(run_script './flink/anomaly_detector.py') +echo "Started Flink anomaly detector with PID $flink_pid" + +# Give some time for the Flink job to start processing +sleep 1 + +# Start the Flask application (Alarm Reader and Visualizer) +echo "Starting Flask alarm reader..." +flask_pid=$(run_script './webinterface/webinterface.py') +echo "Started Flask alarm reader with PID $flask_pid" + +# Function to handle script termination +terminate_scripts() { + echo "Shutting down..." + kill $producer_pid + kill $consumer_pid + kill $flink_pid + kill $flask_pid + echo "All processes terminated." + # Optionally stop Kafka and Flink services + sudo systemctl stop kafka + sudo systemctl stop flink +} + +# Trap SIGINT (Ctrl+C) to terminate all subprocesses +trap terminate_scripts SIGINT + +# Keep the main script running to allow all subprocesses to continue executing +while true; do + sleep 1 +done diff --git a/webinterface/webinterface.py b/webinterface/webinterface.py new file mode 100644 index 00000000..c08d8959 --- /dev/null +++ b/webinterface/webinterface.py @@ -0,0 +1,31 @@ +from flask import Flask, jsonify +from confluent_kafka import Consumer, KafkaError + +app = Flask(__name__) + +@app.route('/alarms', methods=['GET']) +def get_alarms(): + alarms = [] + conf = {'bootstrap.servers': "localhost:9092", + 'group.id': "alarm_group", + 'auto.offset.reset': 'earliest'} + + consumer = Consumer(**conf) + consumer.subscribe(['alarms']) + + while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + break + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + return str(msg.error()), 500 + alarms.append(json.loads(msg.value().decode('utf-8'))) + + consumer.close() + return jsonify(alarms) + +if __name__ == '__main__': + app.run(debug=True)