testsAndMisc/python_pkg/lichess_bot/main.py

647 lines
20 KiB
Python
Raw Normal View History

"""Main entry point for the Lichess bot."""
from __future__ import annotations
2025-08-22 16:49:30 +02:00
import argparse
import contextlib
from dataclasses import dataclass, field
import datetime
2025-08-22 16:49:30 +02:00
import json
import logging
import os
from pathlib import Path
import re
import subprocess
import sys
2025-08-22 16:49:30 +02:00
import threading
from typing import TYPE_CHECKING
2025-08-22 16:49:30 +02:00
2025-08-22 17:04:42 +02:00
import chess
2025-08-22 17:26:17 +02:00
import chess.pgn
import requests
2025-08-22 17:04:42 +02:00
from python_pkg.lichess_bot.engine import RandomEngine
from python_pkg.lichess_bot.lichess_api import LichessAPI
from python_pkg.lichess_bot.utils import backoff_sleep, get_and_increment_version
2025-08-22 16:49:30 +02:00
if TYPE_CHECKING:
from collections.abc import Iterator
_logger = logging.getLogger(__name__)
# Regex for parsing ply numbers from analysis output
_PLY_LINE_RE = re.compile(r"^\s*(\d+)\s")
2025-08-22 16:49:30 +02:00
# Game end statuses
_GAME_END_STATUSES = frozenset({"mate", "resign", "stalemate", "timeout", "draw"})
@dataclass
class GameMeta:
"""Metadata for a game (for PGN headers and logging)."""
game_id: str
bot_version: int
site_url: str | None = None
date_iso: str | None = None
white_name: str | None = None
black_name: str | None = None
@dataclass
class GameState:
"""Mutable state for an ongoing game."""
board: chess.Board = field(default_factory=chess.Board)
color: str | None = None
last_handled_len: int = -1
my_ms: int | None = None
opp_ms: int | None = None
inc_ms: int = 0
log_path: Path | None = None
@dataclass
class BotContext:
"""Shared context for bot operations."""
api: LichessAPI
engine: RandomEngine
bot_version: int
decline_correspondence: bool = False
def _apply_move_to_board(board: chess.Board, move: str, game_id: str) -> None:
"""Apply a single move to the board, logging errors."""
try:
board.push_uci(move)
except ValueError:
_logger.debug(f"Game {game_id}: could not apply move {move}")
def _init_game_log(game_id: str, bot_version: int) -> Path | None:
"""Initialize the game log file."""
game_log_path = Path.cwd() / f"lichess_bot_game_{game_id}.log"
try:
with open(game_log_path, "w") as lf:
lf.write(f"game {game_id} started\n")
lf.write(f"bot_version v{bot_version}\n")
except OSError:
return None
return game_log_path
def _extract_game_full_data(
event: dict[str, object],
state: GameState,
meta: GameMeta,
api: LichessAPI,
) -> tuple[str, str | None]:
"""Extract data from a gameFull event.
Returns:
Tuple of (moves_string, status).
"""
state_data = event.get("state", {})
if not isinstance(state_data, dict):
state_data = {}
moves = str(state_data.get("moves", ""))
status = state_data.get("status")
# Update clocks - values are int milliseconds from API
wtime = state_data.get("wtime")
btime = state_data.get("btime")
if state.color == "white":
state.my_ms = int(wtime) if isinstance(wtime, int | float) else None
state.opp_ms = int(btime) if isinstance(btime, int | float) else None
else:
state.my_ms = int(btime) if isinstance(btime, int | float) else None
state.opp_ms = int(wtime) if isinstance(wtime, int | float) else None
inc = state_data.get("winc") or state_data.get("binc")
state.inc_ms = int(inc) if isinstance(inc, int | float) else 0
# Extract player info
white_data = event.get("white", {})
black_data = event.get("black", {})
if isinstance(white_data, dict) and isinstance(black_data, dict):
white_id = white_data.get("id")
black_id = black_data.get("id")
meta.white_name = str(white_data.get("name") or white_id or "?")
meta.black_name = str(black_data.get("name") or black_id or "?")
# Determine color
me = api.get_my_user_id()
if me == white_id:
state.color = "white"
elif me == black_id:
state.color = "black"
# Extract date
with contextlib.suppress(Exception):
created_ms = event.get("createdAt") or event.get("createdAtDate")
if created_ms is not None:
meta.date_iso = datetime.datetime.fromtimestamp(
int(str(created_ms)) / 1000,
tz=datetime.timezone.utc,
).strftime("%Y.%m.%d")
meta.site_url = f"https://lichess.org/{meta.game_id}"
return moves, str(status) if status else None
def _extract_game_state_data(
event: dict[str, object], state: GameState
) -> tuple[str, str | None]:
"""Extract data from a gameState event.
Returns:
Tuple of (moves_string, status).
"""
moves = str(event.get("moves", ""))
status = event.get("status")
# Update clocks based on color
if state.color == "white":
state.my_ms = event.get("wtime", state.my_ms) # type: ignore[assignment]
state.opp_ms = event.get("btime", state.opp_ms) # type: ignore[assignment]
state.inc_ms = event.get("winc", state.inc_ms) # type: ignore[assignment]
elif state.color == "black":
state.my_ms = event.get("btime", state.my_ms) # type: ignore[assignment]
state.opp_ms = event.get("wtime", state.opp_ms) # type: ignore[assignment]
state.inc_ms = event.get("binc", state.inc_ms) # type: ignore[assignment]
return moves, str(status) if status else None
def _calculate_time_budget(
state: GameState, board: chess.Board, max_time_sec: float
) -> float:
"""Calculate time budget for the next move."""
est_moves_left = max(10, min(60, 30 - board.fullmove_number // 2))
time_left_sec = (state.my_ms or 0) / 1000.0
inc_sec = (state.inc_ms or 0) / 1000.0
budget = 0.6 * (time_left_sec / max(1, est_moves_left)) + 0.5 * inc_sec
# Double the budget for more thoughtful moves
budget *= 2.0
return max(0.05, min(max_time_sec, budget))
def _log_move_to_file(
log_path: Path | None, ply: int, move: chess.Move, reason: str
) -> None:
"""Log a move to the game log file."""
if log_path:
with open(log_path, "a") as lf:
lf.write(f"ply {ply}: {move.uci()}\n{reason}\n\n")
def _attempt_move(
ctx: BotContext,
state: GameState,
meta: GameMeta,
board: chess.Board,
) -> bool:
"""Attempt to make a move. Returns True if game should continue."""
budget = _calculate_time_budget(state, board, ctx.engine.max_time_sec)
move, reason = ctx.engine.choose_move_with_explanation(
board, time_budget_sec=budget
)
if move is None:
_logger.info(f"Game {meta.game_id}: no legal moves (game likely over)")
return False
time_left_sec = (state.my_ms or 0) / 1000.0
inc_sec = (state.inc_ms or 0) / 1000.0
try:
if move not in board.legal_moves:
_logger.info(
f"Game {meta.game_id}: selected move no longer legal; skipping send"
)
else:
_logger.info(
f"Game {meta.game_id}: playing {move.uci()} "
f"(budget={budget:.2f}s, my_time_left={time_left_sec:.1f}s, "
f"inc={inc_sec:.2f}s)"
)
_log_move_to_file(state.log_path, state.last_handled_len + 1, move, reason)
ctx.api.make_move(meta.game_id, move)
except requests.RequestException as e:
_logger.warning(f"Game {meta.game_id}: move {move.uci()} failed: {e}")
return True
def _is_my_turn(board: chess.Board, color: str | None) -> bool:
"""Check if it's our turn to move."""
is_white_turn = board.turn
return (is_white_turn and color == "white") or (
(not is_white_turn) and color == "black"
)
def _rebuild_board_from_moves(moves_list: list[str], game_id: str) -> chess.Board:
"""Rebuild board from list of moves."""
board = chess.Board()
for m in moves_list:
_apply_move_to_board(board, m, game_id)
return board
def _handle_move_if_needed(
ctx: BotContext,
state: GameState,
meta: GameMeta,
et: str,
new_len: int,
) -> bool:
"""Handle making a move if it's our turn. Returns False if game ends."""
my_turn = _is_my_turn(state.board, state.color)
turn_str = "white" if state.board.turn else "black"
_logger.info(f"Game {meta.game_id}: turn={turn_str}, my_turn={my_turn}")
# Move policy
allow_move = (et == "gameState") or (et == "gameFull" and new_len == 0)
if my_turn and allow_move and not _attempt_move(ctx, state, meta, state.board):
return False
# Mark position as handled
if et == "gameState" or (my_turn and allow_move):
state.last_handled_len = new_len
return True
def _process_game_event(
event: dict[str, object],
ctx: BotContext,
state: GameState,
meta: GameMeta,
) -> bool:
"""Process a single game event. Returns False if game should end."""
et = event.get("type")
if et not in ("gameFull", "gameState"):
return True # Continue processing other events
# At this point et is guaranteed to be a string
event_type = str(et)
# Extract moves and status based on event type
if event_type == "gameFull":
moves, status = _extract_game_full_data(event, state, meta, ctx.api)
_logger.info(f"Game {meta.game_id}: joined as {state.color} (gameFull)")
else:
moves, status = _extract_game_state_data(event, state)
moves_list = moves.split() if moves else []
new_len = len(moves_list)
_logger.info(
f"Game {meta.game_id}: event={event_type}, moves={new_len}, color={state.color}"
)
if new_len == state.last_handled_len:
_logger.debug(
f"Game {meta.game_id}: position unchanged (len={new_len}), skipping"
)
return True
# Rebuild board from moves
state.board = _rebuild_board_from_moves(moves_list, meta.game_id)
if state.color is None:
_logger.info(f"Game {meta.game_id}: color unknown yet; waiting for gameFull")
if event_type == "gameState":
state.last_handled_len = new_len
return True
if not _handle_move_if_needed(ctx, state, meta, event_type, new_len):
return False
# Check for game end
if status in _GAME_END_STATUSES:
_logger.info(f"Game {meta.game_id} finished: {status}")
return False
return True
def _write_pgn_to_log(log_path: Path, board: chess.Board, meta: GameMeta) -> None:
"""Write PGN to the game log file."""
game = chess.pgn.Game.from_board(board)
with contextlib.suppress(Exception):
game.headers["BotVersion"] = f"v{meta.bot_version}"
if meta.site_url:
game.headers["Site"] = meta.site_url
if meta.date_iso:
game.headers["Date"] = meta.date_iso
if meta.white_name:
game.headers["White"] = meta.white_name
if meta.black_name:
game.headers["Black"] = meta.black_name
with open(log_path, "a") as lf:
lf.write("\nPGN:\n")
exporter = chess.pgn.StringExporter(
headers=True, variations=False, comments=False
)
lf.write(game.accept(exporter))
lf.write("\n")
def _run_analysis_subprocess(
game_id: str, log_path: Path, total_plies: int
) -> str | None:
"""Run the analysis script and return output text."""
analyze_script = (
Path(__file__).resolve().parent.parent
/ "stockfish_analysis"
/ "analyze_chess_game.py"
)
if not analyze_script.is_file():
_logger.info(
f"Game {game_id}: analysis script not found at {analyze_script}; "
"skipping analysis"
)
return None
_logger.info(f"Game {game_id}: starting post-game analysis ({total_plies} plies)")
proc = subprocess.Popen( # noqa: S603 - trusted internal analysis script
[sys.executable, "-u", str(analyze_script), str(log_path)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
analyzed = 0
lines: list[str] = []
# stdout/stderr are guaranteed non-None with PIPE
assert proc.stdout is not None # noqa: S101
assert proc.stderr is not None # noqa: S101
for line in proc.stdout:
lines.append(line)
m = _PLY_LINE_RE.match(line)
if m:
analyzed += 1
_log_analysis_progress(game_id, analyzed, total_plies)
stderr_text = proc.stderr.read() or ""
ret = proc.wait()
analysis_text = "".join(lines)
if ret != 0:
_logger.warning(f"Game {game_id}: analysis script exited with code {ret}")
if stderr_text:
analysis_text += "\n[stderr]\n" + stderr_text
_logger.info(f"Game {game_id}: analysis complete")
return analysis_text
def _log_analysis_progress(game_id: str, analyzed: int, total_plies: int) -> None:
"""Log analysis progress."""
if total_plies:
left = max(0, total_plies - analyzed)
pct = analyzed / total_plies * 100.0
_logger.info(
f"Game {game_id}: analysis progress "
f"{analyzed}/{total_plies} ({pct:.0f}%), left {left}"
)
else:
_logger.info(
f"Game {game_id}: analysis progress {analyzed} plies (total unknown)"
)
def _insert_analysis_into_log(
log_path: Path, analysis_text: str, meta: GameMeta
) -> None:
"""Insert analysis text into the log file before PGN section."""
try:
with open(log_path, encoding="utf-8", errors="replace") as f:
content = f.read()
# Find insertion point (before PGN)
insert_idx = 0
p = content.find("\nPGN:\n")
if p != -1:
insert_idx = p + 1
elif content.startswith("PGN:\n"):
insert_idx = 0
else:
insert_idx = len(content)
# Build meta block
meta_lines = []
if meta.date_iso:
meta_lines.append(f"Date: {meta.date_iso}")
if meta.white_name or meta.black_name:
meta_lines.append(
f"Players: {meta.white_name or '?'} vs {meta.black_name or '?'}"
)
meta_block = "\n".join(meta_lines) + "\n" if meta_lines else ""
analysis_block = f"{meta_block}ANALYSIS:\n{analysis_text.rstrip()}\n\n"
new_content = content[:insert_idx] + analysis_block + content[insert_idx:]
with open(log_path, "w", encoding="utf-8") as f:
f.write(new_content)
except OSError as e:
_logger.debug(f"Game {meta.game_id}: could not write analysis to log: {e}")
def _finalize_game(state: GameState, meta: GameMeta) -> None:
"""Finalize game: write PGN and run analysis."""
if not state.log_path:
return
try:
_write_pgn_to_log(state.log_path, state.board, meta)
except OSError as e:
_logger.debug(f"Game {meta.game_id}: could not write PGN: {e}")
return
# Run analysis
try:
total_plies = len(state.board.move_stack)
except TypeError:
total_plies = 0
try:
analysis_text = _run_analysis_subprocess(
meta.game_id, state.log_path, total_plies
)
if analysis_text:
_insert_analysis_into_log(state.log_path, analysis_text, meta)
except (subprocess.SubprocessError, OSError) as e:
_logger.debug(f"Game {meta.game_id}: analysis run failed: {e}")
def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> None:
"""Handle a single game from start to finish."""
_logger.info(f"Starting game thread for {game_id} [bot v{ctx.bot_version}]")
meta = GameMeta(game_id=game_id, bot_version=ctx.bot_version)
state = GameState(color=my_color)
state.log_path = _init_game_log(game_id, ctx.bot_version)
try:
for event in ctx.api.stream_game_events(game_id):
et = event.get("type")
if et in ("chatLine", "opponentGone"):
continue
if not _process_game_event(event, ctx, state, meta):
break
except requests.RequestException:
_logger.exception(f"Game {game_id} thread error")
finally:
_finalize_game(state, meta)
_logger.info(f"Ending game thread for {game_id}")
def _handle_challenge(
challenge: dict[str, object], api: LichessAPI, *, decline_correspondence: bool
) -> None:
"""Handle an incoming challenge."""
ch_id = challenge.get("id", "")
variant_data = challenge.get("variant", {})
variant = (
variant_data.get("key", "standard")
if isinstance(variant_data, dict)
else "standard"
)
speed = challenge.get("speed")
perf_ok = speed in {"bullet", "blitz", "rapid", "classical"}
not_corr = speed != "correspondence" or not decline_correspondence
if variant == "standard" and perf_ok and not_corr:
_logger.info(f"Accepting challenge {ch_id} ({speed})")
api.accept_challenge(str(ch_id))
else:
_logger.info(f"Declining challenge {ch_id} (variant={variant}, speed={speed})")
api.decline_challenge(str(ch_id))
def _process_bot_event(
event: dict[str, object],
ctx: BotContext,
game_threads: dict[str, threading.Thread],
) -> None:
"""Process a single bot event (challenge, gameStart, etc.)."""
event_type = event.get("type")
if event_type == "challenge":
challenge = event.get("challenge", {})
if isinstance(challenge, dict):
_handle_challenge(
challenge, ctx.api, decline_correspondence=ctx.decline_correspondence
)
elif event_type == "gameStart":
game_data = event.get("game", {})
if isinstance(game_data, dict):
game_id = str(game_data.get("id", ""))
if game_id and (
game_id not in game_threads or not game_threads[game_id].is_alive()
):
t = threading.Thread(
target=_handle_game,
args=(game_id, ctx),
name=f"game-{game_id}",
)
t.daemon = True
game_threads[game_id] = t
t.start()
elif event_type == "gameFinish":
game_data = event.get("game", {})
if isinstance(game_data, dict):
game_id = game_data.get("id", "")
_logger.info(f"Game finished event: {game_id}")
else:
_logger.debug(f"Unhandled event: {json.dumps(event)}")
def _stream_bot_events(ctx: BotContext) -> Iterator[dict[str, object]]:
"""Stream events from Lichess API with type hints."""
yield from ctx.api.stream_events()
def _run_event_loop_iteration(
ctx: BotContext, game_threads: dict[str, threading.Thread]
) -> int:
"""Run one iteration of the event loop.
Returns:
New backoff value (0 on success).
"""
for event in _stream_bot_events(ctx):
_process_bot_event(event, ctx, game_threads)
return 0
def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None:
"""Start the bot and listen for incoming events."""
2025-08-22 16:49:30 +02:00
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s",
)
token = os.getenv("LICHESS_TOKEN")
if not token:
msg = "LICHESS_TOKEN environment variable is required"
raise RuntimeError(msg)
2025-08-22 16:49:30 +02:00
_logger.info("Token present. Initializing client and engine...")
2025-08-22 20:22:06 +02:00
bot_version = get_and_increment_version()
_logger.info(f"Bot version: v{bot_version}")
ctx = BotContext(
api=LichessAPI(token),
engine=RandomEngine(),
bot_version=bot_version,
decline_correspondence=decline_correspondence,
)
game_threads: dict[str, threading.Thread] = {}
_logger.info("Connecting to Lichess event stream. Waiting for challenges...")
backoff = 0
while True:
try:
backoff = _run_event_loop_iteration(ctx, game_threads)
except requests.RequestException as e: # noqa: PERF203 - intentional reconnection loop
_logger.warning(f"Event stream error: {e}")
backoff = backoff_sleep(backoff)
2025-08-22 16:49:30 +02:00
def main() -> None:
"""Parse arguments and run the Lichess bot."""
2025-08-22 16:49:30 +02:00
parser = argparse.ArgumentParser(description="Run a minimal Lichess bot")
parser.add_argument(
"--log-level", default="INFO", help="Logging level (default: INFO)"
)
parser.add_argument(
"--decline-correspondence",
action="store_true",
help="Decline correspondence challenges",
)
2025-08-22 16:49:30 +02:00
args = parser.parse_args()
run_bot(args.log_level, decline_correspondence=args.decline_correspondence)
2025-08-22 16:49:30 +02:00
if __name__ == "__main__":
main()