signal-bot/main.py
copilot-swe-agent[bot] 3a5bf40f41 Address code review feedback
- Remove unused imports from main.py
- Add clearer documentation for extract_envelope_source_uuid
- Use Iterable[str] type hint instead of tuple

Co-authored-by: kuhyx <147418882+kuhyx@users.noreply.github.com>
2025-12-01 15:16:15 +00:00

293 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import asyncio
import websockets
import requests
import base64
import json
from datetime import datetime, time, timedelta
from fastapi import FastAPI
from rules import (
RuleEngine,
CommandContext,
GlobalCooldownRule,
)
# Create FastAPI app
app = FastAPI()
PHONE_NUMBER = os.getenv('PHONE_NUMBER', '1234567890')
RECEIVE_URL = f"http://localhost:9922/v1/receive/{PHONE_NUMBER}"
REMOVE_ATTACHMENT_URL = f"http://localhost:9922/v1/attachments/"
SEND_URL = 'http://localhost:9922/v2/send'
GROUP_ID = os.getenv('GROUP_ID', '')
GROUP_ID_SEND = os.getenv('GROUP_ID_SEND', '')
CAT_API = os.getenv('CAT_API', '')
# Initialize the rule engine with a global 10-second cooldown (backward compatible)
rule_engine = RuleEngine()
rule_engine.add_global_rule(
GlobalCooldownRule(
cooldown=timedelta(seconds=10),
failure_message="BEEP BOOP POCZEKAJ 10 SEKUND."
)
)
warning_sent = False
class StringCounter:
def __init__(self):
self.string_map = {}
async def update_string_map(self, key, common_name):
if key in self.string_map:
self.string_map[key]['count'] += 1
else:
self.string_map[key] = {'common_name': common_name, 'count': 1}
return self.string_map
def get_common_name(self, key):
if key in self.string_map:
return self.string_map[key]['common_name']
return None
def download_image(image_url):
# Download the image
image_response = requests.get(image_url)
image_response.raise_for_status() # Ensure the request was successful
# Extract the image filename from the URL
image_filename = image_url.split("/")[-1]
with open(image_filename, 'wb') as image_file:
image_file.write(image_response.content)
# Convert the image to base64 encoded data
with open(image_filename, 'rb') as image_file:
base64_encoded_data = base64.b64encode(image_file.read()).decode('utf-8')
os.remove(image_filename)
return base64_encoded_data
def fetch_and_download_image(api_url, image_url_key):
# Send request to the API
response = requests.get(api_url)
response.raise_for_status() # Ensure the request was successful
# Parse the response JSON to get the image URL
data = response.json()
# Retrieve the image URL based on the provided key
if isinstance(image_url_key, list):
image_url = data
for key in image_url_key:
image_url = image_url[key]
else:
image_url = data[image_url_key]
return download_image(image_url)
async def send_image(base64_attachments, recipients=GROUP_ID_SEND):
data = {
"base64_attachments": [base64_attachments],
"number": PHONE_NUMBER,
"recipients": [recipients]
}
response = requests.post(SEND_URL, json=data)
if response.status_code == 200 or response.status_code == 201:
print("Request was successful.")
else:
print(f"Request failed with status code: {response.status_code}")
print(response.text)
def send_message(message_content, recipients=PHONE_NUMBER):
data = {
"message": str(message_content),
"number": PHONE_NUMBER,
"recipients": [recipients]
}
response = requests.post(SEND_URL, json=data)
if response.status_code == 200:
print("Request was successful.")
else:
print(f"Request failed with status code: {response.status_code} {data}")
print(response.text)
def message_message(inside_message):
message_value = inside_message.get('message')
return message_value
def message_group_id(inside_message):
return inside_message.get('groupInfo', {}).get('groupId', {})
def extract_message_content(message):
message_json = json.loads(message)
inside_message = message_json.get('envelope', {}).get('dataMessage', {})
if inside_message == {}:
inside_message = message_json.get('envelope', {}).get('syncMessage', {}).get('sentMessage', {})
return inside_message
def extract_source_uuid(message):
message_json = message
inside_message = message_json.get('sourceUuid', {})
return inside_message
command_map = {
("!kot", "!koty", "!kots", "!cat", "!cats", "!meow", "!miau", "!ᴋᴏᴛ", "!𝓴𝓸𝓽", "!𝗸𝗼𝘁"): lambda recipient: send_image(fetch_and_download_image("https://api.thecatapi.com/v1/images/search", [0, 'url']), recipient),
("!pies", "!psy", "!dog", "!dogs", "!woof", "!szczek", "!𝗽𝗶𝗲𝘀", "!͓̽p͓̽i͓̽e͓̽s͓̽"): lambda recipient: send_image(fetch_and_download_image("https://dog.ceo/api/breeds/image/random", 'message'), recipient),
# ("!traps"): lambda recipient: send_image(download_image(((r34Py.random_post(["trap"])).sample)), recipient)
}
def extract_source_name(message):
message_json = message
inside_message = message_json.get('sourceName', {})
return inside_message
USER_MESSAGE_COUNT = {}
async def count_messages(message_content, counter):
if message_content and should_count(message_content):
uuid = extract_source_uuid(message_content)
source_name = extract_source_name(message_content)
await counter.update_string_map(uuid, source_name)
send_message(counter.string_map, PHONE_NUMBER)
async def scheduled_task(counter):
while True:
now = datetime.now()
target_time = datetime.combine(now.date(), time(21, 37))
if now > target_time:
target_time += timedelta(days=1)
wait_time = (target_time - now).total_seconds()
await asyncio.sleep(wait_time)
# Trigger your function here
send_message(counter.string_map, GROUP_ID_SEND)
counter.string_map = {}
def extract_envelope_source_uuid(message):
"""Extract sourceUuid from the raw JSON message string.
Unlike extract_source_uuid() which works on parsed envelope data,
this function parses the raw message string and extracts from envelope.
Used to get user ID for the rule engine context.
"""
message_json = json.loads(message)
return message_json.get('envelope', {}).get('sourceUuid', '')
async def trigger_command(message_content, recipient, raw_message=None):
global warning_sent
message_value = message_message(message_content)
try:
if message_value is not None and message_value[0] == "!":
current_time = datetime.now()
# Extract user ID for the rule engine
user_id = ''
if raw_message:
user_id = extract_envelope_source_uuid(raw_message)
for command_triggers, command_function in command_map.items():
if message_value in command_triggers:
# Create command context for rule evaluation
context = CommandContext(
command=message_value,
user_id=user_id,
current_time=current_time,
message_data=message_content
)
# Evaluate rules
should_trigger, failure_message = rule_engine.evaluate(context)
if not should_trigger:
if failure_message and not warning_sent:
send_message(failure_message, recipient)
warning_sent = True
return
# Execute the command
await command_function(recipient)
# Record usage for the rule engine
rule_engine.record_usage(message_value, user_id, current_time)
warning_sent = False
break
except TypeError:
send_message(f"trigger_command, TypeError {message_content}", recipient)
except Exception as e:
send_message(f"trigger_command, unknown error {message_content}: {str(e)}", recipient)
async def send_to_group(message_content, counter, message):
if message_group_id(message_content) == GROUP_ID:
await count_messages(json.loads(message).get('envelope', {}), counter)
await trigger_command(message_content, GROUP_ID_SEND, message)
async def remove_attachment(attachment_id):
response = requests.delete(REMOVE_ATTACHMENT_URL + attachment_id)
if response.status_code == 200 or response.status_code == 204:
print("Request remove_attachment was successful.")
else:
print(f"Request remove_attachment failed with status code: {response.status_code}")
print(response)
async def get_attachments():
response = requests.get(REMOVE_ATTACHMENT_URL)
if response.status_code == 200:
attachments = json.loads(response.content)
print("attachments: ", attachments)
for attachment in attachments:
print("attachment: ", attachment)
await remove_attachment(attachment)
else:
print(f"Request failed with status code: {response.status_code}")
print(response)
def is_message_reaction(message):
message_json = json.loads(message)
inside_message = message_json.get('envelope', {}).get('dataMessage', {}).get('reaction', {})
if inside_message == {}:
inside_message = message_json.get('envelope', {}).get('syncMessage', {}).get('reaction', {})
if inside_message != {}:
return True
return False
def should_count(message_content):
print("should_count triggered")
#if message_content.get('destinationNumber', {}) != PHONE_NUMBER:
# print("not counting because destinationNumber != PHONE_NUMBER")
# return False
sticker = message_content.get("dataMessage", {}).get("sticker", {})
print("sticker ", sticker)
if sticker != {}:
print("not counting because message has a sticker ", message_content)
return False
print("counting message: ", message_content)
return True
async def listen_to_server(counter):
uri = f"ws://localhost:9922/v1/receive/{PHONE_NUMBER}?send_read_receipts=false"
async with websockets.connect(uri) as websocket:
print(f"Connected to signal server")
try:
async for message in websocket:
if is_message_reaction(message) == False:
print("message: ", message)
message_content = extract_message_content(message)
await send_to_group(message_content, counter, message)
except websockets.ConnectionClosed as e:
print(f"Connection closed: {e}")
# Endpoint to start the asyncio server tasks
@app.on_event("startup")
async def start_tasks():
counter = StringCounter()
task1 = asyncio.create_task(listen_to_server(counter))
task2 = asyncio.create_task(scheduled_task(counter))
await asyncio.gather(task1, task2)