import os import asyncio import websockets import requests import base64 import json from datetime import datetime, time, timedelta from fastapi import FastAPI # Create FastAPI app app = FastAPI() # Mode configuration: DBG for debug, PRD for production MODE = os.getenv('MODE', 'PRD') DEBUG_MODE = MODE == 'DBG' PHONE_NUMBER = os.getenv('PHONE_NUMBER', '1234567890') RECEIVE_URL = f"http://localhost:9922/v1/receive/{PHONE_NUMBER}" REMOVE_ATTACHMENT_URL = f"http://localhost:9922/v1/attachments/" SEND_URL = 'http://localhost:9922/v2/send' GROUP_ID = os.getenv('GROUP_ID', '') GROUP_ID_SEND = os.getenv('GROUP_ID_SEND', '') CAT_API = os.getenv('CAT_API', '') last_command_time = None warning_sent = False def get_recipient(): """Get the appropriate recipient based on debug mode. In debug mode, messages are sent to 'note to self' (own phone number). """ if DEBUG_MODE: return PHONE_NUMBER return GROUP_ID_SEND def get_source_group(): """Get the source group to listen to based on debug mode. In debug mode, listen to messages from self (note to self). """ if DEBUG_MODE: return None # In debug mode, listen to all messages from self return GROUP_ID class StringCounter: def __init__(self): self.string_map = {} async def update_string_map(self, key, common_name): if key in self.string_map: self.string_map[key]['count'] += 1 else: self.string_map[key] = {'common_name': common_name, 'count': 1} return self.string_map def get_common_name(self, key): if key in self.string_map: return self.string_map[key]['common_name'] return None def download_image(image_url): # Download the image image_response = requests.get(image_url) image_response.raise_for_status() # Ensure the request was successful # Extract the image filename from the URL image_filename = image_url.split("/")[-1] with open(image_filename, 'wb') as image_file: image_file.write(image_response.content) # Convert the image to base64 encoded data with open(image_filename, 'rb') as image_file: base64_encoded_data = base64.b64encode(image_file.read()).decode('utf-8') os.remove(image_filename) return base64_encoded_data def fetch_and_download_image(api_url, image_url_key): # Send request to the API response = requests.get(api_url) response.raise_for_status() # Ensure the request was successful # Parse the response JSON to get the image URL data = response.json() # Retrieve the image URL based on the provided key if isinstance(image_url_key, list): image_url = data for key in image_url_key: image_url = image_url[key] else: image_url = data[image_url_key] return download_image(image_url) async def send_image(base64_attachments, recipients=GROUP_ID_SEND): data = { "base64_attachments": [base64_attachments], "number": PHONE_NUMBER, "recipients": [recipients] } response = requests.post(SEND_URL, json=data) if response.status_code == 200 or response.status_code == 201: print("Request was successful.") else: print(f"Request failed with status code: {response.status_code}") print(response.text) def send_message(message_content, recipients=PHONE_NUMBER): data = { "message": str(message_content), "number": PHONE_NUMBER, "recipients": [recipients] } response = requests.post(SEND_URL, json=data) if response.status_code == 200: print("Request was successful.") else: print(f"Request failed with status code: {response.status_code} {data}") print(response.text) def message_message(inside_message): message_value = inside_message.get('message') return message_value def message_group_id(inside_message): return inside_message.get('groupInfo', {}).get('groupId', {}) def extract_message_content(message): message_json = json.loads(message) inside_message = message_json.get('envelope', {}).get('dataMessage', {}) if inside_message == {}: inside_message = message_json.get('envelope', {}).get('syncMessage', {}).get('sentMessage', {}) return inside_message def extract_source_uuid(message): message_json = message inside_message = message_json.get('sourceUuid', {}) return inside_message command_map = { ("!kot", "!koty", "!kots", "!cat", "!cats", "!meow", "!miau", "!į“‹į“į“›", "!š““š“øš“½", "!š—øš—¼š˜"): lambda recipient: send_image(fetch_and_download_image("https://api.thecatapi.com/v1/images/search", [0, 'url']), recipient), ("!pies", "!psy", "!dog", "!dogs", "!woof", "!szczek", "!š—½š—¶š—²š˜€", "!͓̽p͓̽i͓̽e͓̽s͓̽"): lambda recipient: send_image(fetch_and_download_image("https://dog.ceo/api/breeds/image/random", 'message'), recipient), # ("!traps"): lambda recipient: send_image(download_image(((r34Py.random_post(["trap"])).sample)), recipient) } def extract_source_name(message): message_json = message inside_message = message_json.get('sourceName', {}) return inside_message USER_MESSAGE_COUNT = {} async def count_messages(message_content, counter): if message_content and should_count(message_content): uuid = extract_source_uuid(message_content) source_name = extract_source_name(message_content) await counter.update_string_map(uuid, source_name) # Use get_recipient() to respect debug mode - send count update to self in debug mode recipient = get_recipient() send_message(counter.string_map, recipient) async def scheduled_task(counter): while True: now = datetime.now() target_time = datetime.combine(now.date(), time(21, 37)) if now > target_time: target_time += timedelta(days=1) wait_time = (target_time - now).total_seconds() await asyncio.sleep(wait_time) # Trigger your function here - use appropriate recipient based on mode recipient = get_recipient() send_message(counter.string_map, recipient) counter.string_map = {} async def trigger_command(message_content, recipient): global last_command_time, warning_sent message_value = message_message(message_content) try: if message_value is not None and message_value[0] == "!": current_time = datetime.now() if last_command_time and current_time - last_command_time < timedelta(seconds=10): if not warning_sent: send_message("BEEP BOOP POCZEKAJ 10 SEKUND.", recipient) warning_sent = True return for command_triggers, command_function in command_map.items(): if message_value in command_triggers: await command_function(recipient) last_command_time = current_time warning_sent = False break except TypeError: send_message(f"trigger_command, TypeError {message_content}", recipient) except Exception as e: send_message(f"trigger_command, unknown error {message_content}: {str(e)}", recipient) async def send_to_group(message_content, counter, message): """Process messages and send responses. In debug mode, processes messages from 'note to self' and sends responses back to self. In production mode, processes messages from GROUP_ID and sends to GROUP_ID_SEND. """ source_group = message_group_id(message_content) recipient = get_recipient() if DEBUG_MODE: # In debug mode, process all messages (including from self/"note to self") # Check if it's a sync message (from self) message_json = json.loads(message) is_sync_message = message_json.get('envelope', {}).get('syncMessage', {}) != {} if is_sync_message: # This is a "note to self" message await count_messages(message_json.get('envelope', {}), counter) await trigger_command(message_content, recipient) else: # Production mode: only process messages from the configured group if source_group == GROUP_ID: await count_messages(json.loads(message).get('envelope', {}), counter) await trigger_command(message_content, recipient) async def remove_attachment(attachment_id): response = requests.delete(REMOVE_ATTACHMENT_URL + attachment_id) if response.status_code == 200 or response.status_code == 204: print("Request remove_attachment was successful.") else: print(f"Request remove_attachment failed with status code: {response.status_code}") print(response) async def get_attachments(): response = requests.get(REMOVE_ATTACHMENT_URL) if response.status_code == 200: attachments = json.loads(response.content) print("attachments: ", attachments) for attachment in attachments: print("attachment: ", attachment) await remove_attachment(attachment) else: print(f"Request failed with status code: {response.status_code}") print(response) def is_message_reaction(message): message_json = json.loads(message) inside_message = message_json.get('envelope', {}).get('dataMessage', {}).get('reaction', {}) if inside_message == {}: inside_message = message_json.get('envelope', {}).get('syncMessage', {}).get('reaction', {}) if inside_message != {}: return True return False def should_count(message_content): print("should_count triggered") #if message_content.get('destinationNumber', {}) != PHONE_NUMBER: # print("not counting because destinationNumber != PHONE_NUMBER") # return False sticker = message_content.get("dataMessage", {}).get("sticker", {}) print("sticker ", sticker) if sticker != {}: print("not counting because message has a sticker ", message_content) return False print("counting message: ", message_content) return True async def listen_to_server(counter): uri = f"ws://localhost:9922/v1/receive/{PHONE_NUMBER}?send_read_receipts=false" async with websockets.connect(uri) as websocket: mode_str = "DEBUG" if DEBUG_MODE else "PRODUCTION" print(f"Connected to signal server in {mode_str} mode") if DEBUG_MODE: print("Debug mode: Commands will be received and sent via 'note to self'") try: async for message in websocket: if is_message_reaction(message) == False: print("message: ", message) message_content = extract_message_content(message) await send_to_group(message_content, counter, message) except websockets.ConnectionClosed as e: print(f"Connection closed: {e}") # Endpoint to start the asyncio server tasks @app.on_event("startup") async def start_tasks(): counter = StringCounter() task1 = asyncio.create_task(listen_to_server(counter)) task2 = asyncio.create_task(scheduled_task(counter)) await asyncio.gather(task1, task2)