testsAndMisc/python_pkg/lichess_bot/main.py
Krzysztof kuhy Rudnicki dc12f22c3e refactor(lichess_bot): reduce complexity with dataclasses and extracted functions
- Add GameState, GameMeta, BotContext dataclasses for state bundling
- Extract 20+ helper functions from nested closures to module level
- Fix C901, PLR0912, PLR0915 complexity violations
- Fix mypy type errors with proper type annotations
- Add noqa for intentional S603 (subprocess call to trusted internal script)
2025-11-30 23:40:53 +01:00

647 lines
20 KiB
Python

"""Main entry point for the Lichess bot."""
from __future__ import annotations
import argparse
import contextlib
from dataclasses import dataclass, field
import datetime
import json
import logging
import os
from pathlib import Path
import re
import subprocess
import sys
import threading
from typing import TYPE_CHECKING
import chess
import chess.pgn
import requests
from python_pkg.lichess_bot.engine import RandomEngine
from python_pkg.lichess_bot.lichess_api import LichessAPI
from python_pkg.lichess_bot.utils import backoff_sleep, get_and_increment_version
if TYPE_CHECKING:
from collections.abc import Iterator
_logger = logging.getLogger(__name__)
# Regex for parsing ply numbers from analysis output
_PLY_LINE_RE = re.compile(r"^\s*(\d+)\s")
# Game end statuses
_GAME_END_STATUSES = frozenset({"mate", "resign", "stalemate", "timeout", "draw"})
@dataclass
class GameMeta:
"""Metadata for a game (for PGN headers and logging)."""
game_id: str
bot_version: int
site_url: str | None = None
date_iso: str | None = None
white_name: str | None = None
black_name: str | None = None
@dataclass
class GameState:
"""Mutable state for an ongoing game."""
board: chess.Board = field(default_factory=chess.Board)
color: str | None = None
last_handled_len: int = -1
my_ms: int | None = None
opp_ms: int | None = None
inc_ms: int = 0
log_path: Path | None = None
@dataclass
class BotContext:
"""Shared context for bot operations."""
api: LichessAPI
engine: RandomEngine
bot_version: int
decline_correspondence: bool = False
def _apply_move_to_board(board: chess.Board, move: str, game_id: str) -> None:
"""Apply a single move to the board, logging errors."""
try:
board.push_uci(move)
except ValueError:
_logger.debug(f"Game {game_id}: could not apply move {move}")
def _init_game_log(game_id: str, bot_version: int) -> Path | None:
"""Initialize the game log file."""
game_log_path = Path.cwd() / f"lichess_bot_game_{game_id}.log"
try:
with open(game_log_path, "w") as lf:
lf.write(f"game {game_id} started\n")
lf.write(f"bot_version v{bot_version}\n")
except OSError:
return None
return game_log_path
def _extract_game_full_data(
event: dict[str, object],
state: GameState,
meta: GameMeta,
api: LichessAPI,
) -> tuple[str, str | None]:
"""Extract data from a gameFull event.
Returns:
Tuple of (moves_string, status).
"""
state_data = event.get("state", {})
if not isinstance(state_data, dict):
state_data = {}
moves = str(state_data.get("moves", ""))
status = state_data.get("status")
# Update clocks - values are int milliseconds from API
wtime = state_data.get("wtime")
btime = state_data.get("btime")
if state.color == "white":
state.my_ms = int(wtime) if isinstance(wtime, int | float) else None
state.opp_ms = int(btime) if isinstance(btime, int | float) else None
else:
state.my_ms = int(btime) if isinstance(btime, int | float) else None
state.opp_ms = int(wtime) if isinstance(wtime, int | float) else None
inc = state_data.get("winc") or state_data.get("binc")
state.inc_ms = int(inc) if isinstance(inc, int | float) else 0
# Extract player info
white_data = event.get("white", {})
black_data = event.get("black", {})
if isinstance(white_data, dict) and isinstance(black_data, dict):
white_id = white_data.get("id")
black_id = black_data.get("id")
meta.white_name = str(white_data.get("name") or white_id or "?")
meta.black_name = str(black_data.get("name") or black_id or "?")
# Determine color
me = api.get_my_user_id()
if me == white_id:
state.color = "white"
elif me == black_id:
state.color = "black"
# Extract date
with contextlib.suppress(Exception):
created_ms = event.get("createdAt") or event.get("createdAtDate")
if created_ms is not None:
meta.date_iso = datetime.datetime.fromtimestamp(
int(str(created_ms)) / 1000,
tz=datetime.timezone.utc,
).strftime("%Y.%m.%d")
meta.site_url = f"https://lichess.org/{meta.game_id}"
return moves, str(status) if status else None
def _extract_game_state_data(
event: dict[str, object], state: GameState
) -> tuple[str, str | None]:
"""Extract data from a gameState event.
Returns:
Tuple of (moves_string, status).
"""
moves = str(event.get("moves", ""))
status = event.get("status")
# Update clocks based on color
if state.color == "white":
state.my_ms = event.get("wtime", state.my_ms) # type: ignore[assignment]
state.opp_ms = event.get("btime", state.opp_ms) # type: ignore[assignment]
state.inc_ms = event.get("winc", state.inc_ms) # type: ignore[assignment]
elif state.color == "black":
state.my_ms = event.get("btime", state.my_ms) # type: ignore[assignment]
state.opp_ms = event.get("wtime", state.opp_ms) # type: ignore[assignment]
state.inc_ms = event.get("binc", state.inc_ms) # type: ignore[assignment]
return moves, str(status) if status else None
def _calculate_time_budget(
state: GameState, board: chess.Board, max_time_sec: float
) -> float:
"""Calculate time budget for the next move."""
est_moves_left = max(10, min(60, 30 - board.fullmove_number // 2))
time_left_sec = (state.my_ms or 0) / 1000.0
inc_sec = (state.inc_ms or 0) / 1000.0
budget = 0.6 * (time_left_sec / max(1, est_moves_left)) + 0.5 * inc_sec
# Double the budget for more thoughtful moves
budget *= 2.0
return max(0.05, min(max_time_sec, budget))
def _log_move_to_file(
log_path: Path | None, ply: int, move: chess.Move, reason: str
) -> None:
"""Log a move to the game log file."""
if log_path:
with open(log_path, "a") as lf:
lf.write(f"ply {ply}: {move.uci()}\n{reason}\n\n")
def _attempt_move(
ctx: BotContext,
state: GameState,
meta: GameMeta,
board: chess.Board,
) -> bool:
"""Attempt to make a move. Returns True if game should continue."""
budget = _calculate_time_budget(state, board, ctx.engine.max_time_sec)
move, reason = ctx.engine.choose_move_with_explanation(
board, time_budget_sec=budget
)
if move is None:
_logger.info(f"Game {meta.game_id}: no legal moves (game likely over)")
return False
time_left_sec = (state.my_ms or 0) / 1000.0
inc_sec = (state.inc_ms or 0) / 1000.0
try:
if move not in board.legal_moves:
_logger.info(
f"Game {meta.game_id}: selected move no longer legal; skipping send"
)
else:
_logger.info(
f"Game {meta.game_id}: playing {move.uci()} "
f"(budget={budget:.2f}s, my_time_left={time_left_sec:.1f}s, "
f"inc={inc_sec:.2f}s)"
)
_log_move_to_file(state.log_path, state.last_handled_len + 1, move, reason)
ctx.api.make_move(meta.game_id, move)
except requests.RequestException as e:
_logger.warning(f"Game {meta.game_id}: move {move.uci()} failed: {e}")
return True
def _is_my_turn(board: chess.Board, color: str | None) -> bool:
"""Check if it's our turn to move."""
is_white_turn = board.turn
return (is_white_turn and color == "white") or (
(not is_white_turn) and color == "black"
)
def _rebuild_board_from_moves(moves_list: list[str], game_id: str) -> chess.Board:
"""Rebuild board from list of moves."""
board = chess.Board()
for m in moves_list:
_apply_move_to_board(board, m, game_id)
return board
def _handle_move_if_needed(
ctx: BotContext,
state: GameState,
meta: GameMeta,
et: str,
new_len: int,
) -> bool:
"""Handle making a move if it's our turn. Returns False if game ends."""
my_turn = _is_my_turn(state.board, state.color)
turn_str = "white" if state.board.turn else "black"
_logger.info(f"Game {meta.game_id}: turn={turn_str}, my_turn={my_turn}")
# Move policy
allow_move = (et == "gameState") or (et == "gameFull" and new_len == 0)
if my_turn and allow_move and not _attempt_move(ctx, state, meta, state.board):
return False
# Mark position as handled
if et == "gameState" or (my_turn and allow_move):
state.last_handled_len = new_len
return True
def _process_game_event(
event: dict[str, object],
ctx: BotContext,
state: GameState,
meta: GameMeta,
) -> bool:
"""Process a single game event. Returns False if game should end."""
et = event.get("type")
if et not in ("gameFull", "gameState"):
return True # Continue processing other events
# At this point et is guaranteed to be a string
event_type = str(et)
# Extract moves and status based on event type
if event_type == "gameFull":
moves, status = _extract_game_full_data(event, state, meta, ctx.api)
_logger.info(f"Game {meta.game_id}: joined as {state.color} (gameFull)")
else:
moves, status = _extract_game_state_data(event, state)
moves_list = moves.split() if moves else []
new_len = len(moves_list)
_logger.info(
f"Game {meta.game_id}: event={event_type}, moves={new_len}, color={state.color}"
)
if new_len == state.last_handled_len:
_logger.debug(
f"Game {meta.game_id}: position unchanged (len={new_len}), skipping"
)
return True
# Rebuild board from moves
state.board = _rebuild_board_from_moves(moves_list, meta.game_id)
if state.color is None:
_logger.info(f"Game {meta.game_id}: color unknown yet; waiting for gameFull")
if event_type == "gameState":
state.last_handled_len = new_len
return True
if not _handle_move_if_needed(ctx, state, meta, event_type, new_len):
return False
# Check for game end
if status in _GAME_END_STATUSES:
_logger.info(f"Game {meta.game_id} finished: {status}")
return False
return True
def _write_pgn_to_log(log_path: Path, board: chess.Board, meta: GameMeta) -> None:
"""Write PGN to the game log file."""
game = chess.pgn.Game.from_board(board)
with contextlib.suppress(Exception):
game.headers["BotVersion"] = f"v{meta.bot_version}"
if meta.site_url:
game.headers["Site"] = meta.site_url
if meta.date_iso:
game.headers["Date"] = meta.date_iso
if meta.white_name:
game.headers["White"] = meta.white_name
if meta.black_name:
game.headers["Black"] = meta.black_name
with open(log_path, "a") as lf:
lf.write("\nPGN:\n")
exporter = chess.pgn.StringExporter(
headers=True, variations=False, comments=False
)
lf.write(game.accept(exporter))
lf.write("\n")
def _run_analysis_subprocess(
game_id: str, log_path: Path, total_plies: int
) -> str | None:
"""Run the analysis script and return output text."""
analyze_script = (
Path(__file__).resolve().parent.parent
/ "stockfish_analysis"
/ "analyze_chess_game.py"
)
if not analyze_script.is_file():
_logger.info(
f"Game {game_id}: analysis script not found at {analyze_script}; "
"skipping analysis"
)
return None
_logger.info(f"Game {game_id}: starting post-game analysis ({total_plies} plies)")
proc = subprocess.Popen(
[sys.executable, "-u", str(analyze_script), str(log_path)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
analyzed = 0
lines: list[str] = []
# stdout/stderr are guaranteed non-None with PIPE
assert proc.stdout is not None # noqa: S101
assert proc.stderr is not None # noqa: S101
for line in proc.stdout:
lines.append(line)
m = _PLY_LINE_RE.match(line)
if m:
analyzed += 1
_log_analysis_progress(game_id, analyzed, total_plies)
stderr_text = proc.stderr.read() or ""
ret = proc.wait()
analysis_text = "".join(lines)
if ret != 0:
_logger.warning(f"Game {game_id}: analysis script exited with code {ret}")
if stderr_text:
analysis_text += "\n[stderr]\n" + stderr_text
_logger.info(f"Game {game_id}: analysis complete")
return analysis_text
def _log_analysis_progress(game_id: str, analyzed: int, total_plies: int) -> None:
"""Log analysis progress."""
if total_plies:
left = max(0, total_plies - analyzed)
pct = analyzed / total_plies * 100.0
_logger.info(
f"Game {game_id}: analysis progress "
f"{analyzed}/{total_plies} ({pct:.0f}%), left {left}"
)
else:
_logger.info(
f"Game {game_id}: analysis progress {analyzed} plies (total unknown)"
)
def _insert_analysis_into_log(
log_path: Path, analysis_text: str, meta: GameMeta
) -> None:
"""Insert analysis text into the log file before PGN section."""
try:
with open(log_path, encoding="utf-8", errors="replace") as f:
content = f.read()
# Find insertion point (before PGN)
insert_idx = 0
p = content.find("\nPGN:\n")
if p != -1:
insert_idx = p + 1
elif content.startswith("PGN:\n"):
insert_idx = 0
else:
insert_idx = len(content)
# Build meta block
meta_lines = []
if meta.date_iso:
meta_lines.append(f"Date: {meta.date_iso}")
if meta.white_name or meta.black_name:
meta_lines.append(
f"Players: {meta.white_name or '?'} vs {meta.black_name or '?'}"
)
meta_block = "\n".join(meta_lines) + "\n" if meta_lines else ""
analysis_block = f"{meta_block}ANALYSIS:\n{analysis_text.rstrip()}\n\n"
new_content = content[:insert_idx] + analysis_block + content[insert_idx:]
with open(log_path, "w", encoding="utf-8") as f:
f.write(new_content)
except OSError as e:
_logger.debug(f"Game {meta.game_id}: could not write analysis to log: {e}")
def _finalize_game(state: GameState, meta: GameMeta) -> None:
"""Finalize game: write PGN and run analysis."""
if not state.log_path:
return
try:
_write_pgn_to_log(state.log_path, state.board, meta)
except OSError as e:
_logger.debug(f"Game {meta.game_id}: could not write PGN: {e}")
return
# Run analysis
try:
total_plies = len(state.board.move_stack)
except TypeError:
total_plies = 0
try:
analysis_text = _run_analysis_subprocess(
meta.game_id, state.log_path, total_plies
)
if analysis_text:
_insert_analysis_into_log(state.log_path, analysis_text, meta)
except (subprocess.SubprocessError, OSError) as e:
_logger.debug(f"Game {meta.game_id}: analysis run failed: {e}")
def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> None:
"""Handle a single game from start to finish."""
_logger.info(f"Starting game thread for {game_id} [bot v{ctx.bot_version}]")
meta = GameMeta(game_id=game_id, bot_version=ctx.bot_version)
state = GameState(color=my_color)
state.log_path = _init_game_log(game_id, ctx.bot_version)
try:
for event in ctx.api.stream_game_events(game_id):
et = event.get("type")
if et in ("chatLine", "opponentGone"):
continue
if not _process_game_event(event, ctx, state, meta):
break
except requests.RequestException:
_logger.exception(f"Game {game_id} thread error")
finally:
_finalize_game(state, meta)
_logger.info(f"Ending game thread for {game_id}")
def _handle_challenge(
challenge: dict[str, object], api: LichessAPI, *, decline_correspondence: bool
) -> None:
"""Handle an incoming challenge."""
ch_id = challenge.get("id", "")
variant_data = challenge.get("variant", {})
variant = (
variant_data.get("key", "standard")
if isinstance(variant_data, dict)
else "standard"
)
speed = challenge.get("speed")
perf_ok = speed in {"bullet", "blitz", "rapid", "classical"}
not_corr = speed != "correspondence" or not decline_correspondence
if variant == "standard" and perf_ok and not_corr:
_logger.info(f"Accepting challenge {ch_id} ({speed})")
api.accept_challenge(str(ch_id))
else:
_logger.info(f"Declining challenge {ch_id} (variant={variant}, speed={speed})")
api.decline_challenge(str(ch_id))
def _process_bot_event(
event: dict[str, object],
ctx: BotContext,
game_threads: dict[str, threading.Thread],
) -> None:
"""Process a single bot event (challenge, gameStart, etc.)."""
event_type = event.get("type")
if event_type == "challenge":
challenge = event.get("challenge", {})
if isinstance(challenge, dict):
_handle_challenge(
challenge, ctx.api, decline_correspondence=ctx.decline_correspondence
)
elif event_type == "gameStart":
game_data = event.get("game", {})
if isinstance(game_data, dict):
game_id = str(game_data.get("id", ""))
if game_id and (
game_id not in game_threads or not game_threads[game_id].is_alive()
):
t = threading.Thread(
target=_handle_game,
args=(game_id, ctx),
name=f"game-{game_id}",
)
t.daemon = True
game_threads[game_id] = t
t.start()
elif event_type == "gameFinish":
game_data = event.get("game", {})
if isinstance(game_data, dict):
game_id = game_data.get("id", "")
_logger.info(f"Game finished event: {game_id}")
else:
_logger.debug(f"Unhandled event: {json.dumps(event)}")
def _stream_bot_events(ctx: BotContext) -> Iterator[dict[str, object]]:
"""Stream events from Lichess API with type hints."""
yield from ctx.api.stream_events()
def _run_event_loop_iteration(
ctx: BotContext, game_threads: dict[str, threading.Thread]
) -> int:
"""Run one iteration of the event loop.
Returns:
New backoff value (0 on success).
"""
for event in _stream_bot_events(ctx):
_process_bot_event(event, ctx, game_threads)
return 0
def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None:
"""Start the bot and listen for incoming events."""
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s",
)
token = os.getenv("LICHESS_TOKEN")
if not token:
msg = "LICHESS_TOKEN environment variable is required"
raise RuntimeError(msg)
_logger.info("Token present. Initializing client and engine...")
bot_version = get_and_increment_version()
_logger.info(f"Bot version: v{bot_version}")
ctx = BotContext(
api=LichessAPI(token),
engine=RandomEngine(),
bot_version=bot_version,
decline_correspondence=decline_correspondence,
)
game_threads: dict[str, threading.Thread] = {}
_logger.info("Connecting to Lichess event stream. Waiting for challenges...")
backoff = 0
while True:
try:
backoff = _run_event_loop_iteration(ctx, game_threads)
except requests.RequestException as e: # noqa: PERF203 - intentional reconnection loop
_logger.warning(f"Event stream error: {e}")
backoff = backoff_sleep(backoff)
def main() -> None:
"""Parse arguments and run the Lichess bot."""
parser = argparse.ArgumentParser(description="Run a minimal Lichess bot")
parser.add_argument(
"--log-level", default="INFO", help="Logging level (default: INFO)"
)
parser.add_argument(
"--decline-correspondence",
action="store_true",
help="Decline correspondence challenges",
)
args = parser.parse_args()
run_bot(args.log_level, decline_correspondence=args.decline_correspondence)
if __name__ == "__main__":
main()