testsAndMisc-archive/python_pkg/lichess_bot/main.py
Krzysztof kuhy Rudnicki e2d0c103ae fix(lint): BLE001 - replace blind except with specific exceptions
Replace bare 'except Exception' with specific exception types:
- ValueError for move parsing (chess.Move.from_uci, board.push_uci)
- json.JSONDecodeError for JSON parsing
- OSError for file operations
- ImportError for optional imports
- AttributeError for attribute access
- TypeError for type-related operations
- requests.RequestException for HTTP operations
- subprocess.SubprocessError for subprocess failures
- selenium.NoSuchElementException for element finding

Also fixes:
- pytest hook signature issue in conftest.py (_config -> _)
- Missing file handling in test_puzzles.py
- Line length in stockfish_analysis.py

Removes all BLE001 per-file ignores from pyproject.toml.
2025-11-30 21:37:47 +01:00

491 lines
22 KiB
Python

"""Main entry point for the Lichess bot."""
import argparse
import contextlib
import datetime
import json
import logging
import os
import subprocess
import sys
import threading
import chess
import chess.pgn
import requests
from python_pkg.lichess_bot.engine import RandomEngine
from python_pkg.lichess_bot.lichess_api import LichessAPI
from python_pkg.lichess_bot.utils import backoff_sleep, get_and_increment_version
def _apply_move_to_board(board: chess.Board, move: str, game_id: str) -> None:
"""Apply a single move to the board, logging errors.
Args:
board: The chess board to apply the move to.
move: The UCI move string.
game_id: The game ID for logging purposes.
"""
try:
board.push_uci(move)
except ValueError:
logging.debug(f"Game {game_id}: could not apply move {move}")
def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None:
"""Start the bot and listen for incoming events."""
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s",
)
token = os.getenv("LICHESS_TOKEN")
if not token:
msg = "LICHESS_TOKEN environment variable is required"
raise RuntimeError(msg)
logging.info("Token present. Initializing client and engine...")
# Self-incrementing bot version (persisted on disk)
bot_version = get_and_increment_version()
logging.info(f"Bot version: v{bot_version}")
api = LichessAPI(token)
engine = RandomEngine()
game_threads = {}
def handle_game(game_id: str, my_color: str | None = None) -> None:
logging.info(f"Starting game thread for {game_id} [bot v{bot_version}]")
board = chess.Board()
color: str | None = my_color
# Track how many moves we have already processed;
# start at -1 so we act on the first state (0 moves)
last_handled_len = -1
# Prepare a per-game log file
game_log_path = os.path.join(os.getcwd(), f"lichess_bot_game_{game_id}.log")
try:
with open(game_log_path, "w") as lf:
lf.write(f"game {game_id} started\n")
lf.write(f"bot_version v{bot_version}\n")
except OSError:
game_log_path = None
# Simple time manager state
my_ms = None
opp_ms = None
inc_ms = 0
# Meta info for logging/PGN
game_date_iso: str | None = None
white_name: str | None = None
black_name: str | None = None
site_url: str | None = None
try:
# Only send moves on authoritative gameState events to avoid race
# conditions right after gameFull arrives.
for event in api.stream_game_events(game_id):
et = event.get("type")
if et in ("gameFull", "gameState"):
# Determine moves list and optional status
if et == "gameFull":
state = event.get("state", {})
moves = state.get("moves", "")
status = state.get("status")
# clocks are in milliseconds if present
my_ms = (
state.get("wtime")
if color == "white"
else state.get("btime")
)
opp_ms = (
state.get("btime")
if color == "white"
else state.get("wtime")
)
inc_ms = state.get("winc") or state.get("binc") or 0
# Discover my color from gameFull
white_id = event["white"].get("id")
black_id = event["black"].get("id")
white_name = event["white"].get("name") or white_id or "?"
black_name = event["black"].get("name") or black_id or "?"
# Set site and date if available
with contextlib.suppress(Exception):
# Lichess event may include 'createdAt' ms epoch
created_ms = event.get("createdAt") or event.get(
"createdAtDate"
)
if created_ms:
game_date_iso = datetime.datetime.fromtimestamp(
int(created_ms) / 1000, tz=datetime.timezone.utc
).strftime("%Y.%m.%d")
site_url = f"https://lichess.org/{game_id}"
me = api.get_my_user_id()
if me == white_id:
color = "white"
elif me == black_id:
color = "black"
logging.info(f"Game {game_id}: joined as {color} (gameFull)")
else:
moves = event.get("moves", "")
status = event.get("status")
# update clocks from gameState if present
if color == "white":
my_ms = event.get("wtime", my_ms)
opp_ms = event.get("btime", opp_ms)
inc_ms = event.get("winc", inc_ms)
elif color == "black":
my_ms = event.get("btime", my_ms)
opp_ms = event.get("wtime", opp_ms)
inc_ms = event.get("binc", inc_ms)
moves_list = moves.split() if moves else []
new_len = len(moves_list)
logging.info(
f"Game {game_id}: event={et}, moves={new_len}, color={color}"
)
if new_len == last_handled_len:
logging.debug(
f"Game {game_id}: position unchanged "
f"(len={new_len}), skipping"
)
continue
# Rebuild board from moves
board = chess.Board()
for m in moves_list:
_apply_move_to_board(board, m, game_id)
if color is None:
logging.info(
f"Game {game_id}: color unknown yet; waiting for gameFull"
)
# Do not mark this position handled on gameFull;
# wait for authoritative gameState
if et == "gameState":
last_handled_len = new_len
continue
is_white_turn = board.turn
my_turn = (is_white_turn and color == "white") or (
(not is_white_turn) and color == "black"
)
logging.info(
f"Game {game_id}: "
f"turn={'white' if is_white_turn else 'black'}, "
f"my_turn={my_turn}"
)
# Move policy:
# - Always move on 'gameState' (authoritative)
# - Also allow moving on the initial 'gameFull' when there are
# zero moves and it's our turn. This avoids stalling at game
# start when Lichess doesn't immediately send a 'gameState'
# for 0 moves.
allow_move = (et == "gameState") or (
et == "gameFull" and new_len == 0
)
if my_turn and allow_move:
# Compute a per-move time budget (seconds) based on
# remaining time.
# Heuristic: use min( max_time_sec,
# max(0.05, 0.6 * my_time_left/remaining_moves + inc) )
# Estimate remaining moves as 30 - ply/2 bounded to [10, 60]
est_moves_left = max(
10, min(60, 30 - board.fullmove_number // 2)
)
time_left_sec = (my_ms or 0) / 1000.0
inc_sec = (inc_ms or 0) / 1000.0
budget = (
0.6 * (time_left_sec / max(1, est_moves_left))
+ 0.5 * inc_sec
)
# Spend more time per move (requested): double the budget
budget *= 2.0
# Keep within reasonable bounds
budget = max(0.05, min(engine.max_time_sec, budget))
move, reason = engine.choose_move_with_explanation(
board, time_budget_sec=budget
)
if move is None:
logging.info(
f"Game {game_id}: no legal moves (game likely over)"
)
break
try:
# Double-check legality just before sending
# to avoid 400s when state changed.
if move not in board.legal_moves:
logging.info(
f"Game {game_id}: selected move "
"no longer legal; skipping send"
)
else:
logging.info(
f"Game {game_id}: playing {move.uci()} "
f"(budget={budget:.2f}s, "
f"my_time_left={time_left_sec:.1f}s, "
f"inc={inc_sec:.2f}s)"
)
if game_log_path:
with open(game_log_path, "a") as lf:
lf.write(
f"ply {last_handled_len + 1}: "
f"{move.uci()}\n{reason}\n\n"
)
api.make_move(game_id, move)
except requests.RequestException as e:
logging.warning(
f"Game {game_id}: move {move.uci()} failed: {e}"
)
# Mark this position as handled on authoritative
# gameState, or after we've actually attempted a move
# (including the first move on gameFull len=0).
if et == "gameState" or (my_turn and allow_move):
last_handled_len = new_len
if status in {"mate", "resign", "stalemate", "timeout", "draw"}:
logging.info(f"Game {game_id} finished: {status}")
break
elif et in {"chatLine", "opponentGone"}:
continue
except requests.RequestException:
logging.exception(f"Game {game_id} thread error")
finally:
# On game end, write full PGN to the log file
try:
if game_log_path:
game = chess.pgn.Game.from_board(board)
# Record the bot version in the PGN headers
with contextlib.suppress(Exception):
game.headers["BotVersion"] = f"v{bot_version}"
if site_url:
game.headers["Site"] = site_url
if game_date_iso:
game.headers["Date"] = game_date_iso
if white_name:
game.headers["White"] = white_name
if black_name:
game.headers["Black"] = black_name
with open(game_log_path, "a") as lf:
lf.write("\nPGN:\n")
exporter = chess.pgn.StringExporter(
headers=True, variations=False, comments=False
)
lf.write(game.accept(exporter))
lf.write("\n")
# After PGN is written, run analysis and save it
# to the same file (inserted before PGN)
if game_log_path:
analysis_text: str | None = None
try:
analyze_script = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"stockfish_analysis",
"analyze_chess_game.py",
)
if os.path.isfile(analyze_script):
# Estimate total plies from the final board
try:
total_plies = len(board.move_stack)
except TypeError:
total_plies = 0
logging.info(
f"Game {game_id}: starting post-game "
f"analysis ({total_plies} plies)"
)
# Run analyzer unbuffered and stream output for progress
proc = subprocess.Popen(
[sys.executable, "-u", analyze_script, game_log_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
analyzed = 0
lines: list[str] = []
ply_line_re = __import__("re").compile(r"^\s*(\d+)\s")
# Read stdout line by line
# stdout/stderr are guaranteed non-None with PIPE
assert proc.stdout is not None # noqa: S101
assert proc.stderr is not None # noqa: S101
for line in proc.stdout:
lines.append(line)
m = ply_line_re.match(line)
if m:
# Count as one analyzed ply
analyzed += 1
left = (
max(0, (total_plies or 0) - analyzed)
if total_plies
else "?"
)
if total_plies:
pct = analyzed / total_plies * 100.0
logging.info(
f"Game {game_id}: analysis progress "
f"{analyzed}/{total_plies} "
f"({pct:.0f}%), left {left}"
)
else:
logging.info(
f"Game {game_id}: analysis progress "
f"{analyzed} plies (total unknown)"
)
# Capture any remaining stderr and ensure process ends
stderr_text = proc.stderr.read() or ""
ret = proc.wait()
analysis_text = "".join(lines)
if ret != 0:
logging.warning(
f"Game {game_id}: analysis script "
f"exited with code {ret}"
)
if stderr_text:
analysis_text += "\n[stderr]\n" + stderr_text
logging.info(f"Game {game_id}: analysis complete")
else:
logging.info(
f"Game {game_id}: analysis script not found "
f"at {analyze_script}; skipping analysis"
)
except (subprocess.SubprocessError, OSError) as e:
logging.debug(f"Game {game_id}: analysis run failed: {e}")
# Insert analysis before the PGN section so future runs
# can still parse PGN cleanly
if analysis_text:
try:
with open(
game_log_path, encoding="utf-8", errors="replace"
) as f:
content = f.read()
# Find the start of the 'PGN:' line
insert_idx = 0
p = content.find("\nPGN:\n")
if p != -1:
insert_idx = (
p + 1
) # start of the line after the preceding newline
elif content.startswith("PGN:\n"):
insert_idx = 0
else:
# If PGN marker not found (unexpected), append at end
insert_idx = len(content)
# Prepend meta information block for easier parsing later
meta_lines = []
if game_date_iso:
meta_lines.append(f"Date: {game_date_iso}")
if white_name or black_name:
meta_lines.append(
f"Players: {white_name or '?'} "
f"vs {black_name or '?'}"
)
if meta_lines:
meta_block = "\n".join(meta_lines) + "\n"
else:
meta_block = ""
analysis_block = (
(meta_block if meta_block else "")
+ "ANALYSIS:\n"
+ analysis_text.rstrip()
+ "\n\n"
)
new_content = (
content[:insert_idx]
+ analysis_block
+ content[insert_idx:]
)
with open(game_log_path, "w", encoding="utf-8") as f:
f.write(new_content)
except OSError as e:
logging.debug(
f"Game {game_id}: could not write analysis to log: {e}"
)
except OSError as e:
logging.debug(f"Game {game_id}: could not write PGN: {e}")
logging.info(f"Ending game thread for {game_id}")
def _process_event_stream() -> None:
"""Process events from the Lichess event stream.
Handles challenges and game start/finish events.
"""
for event in api.stream_events():
if event.get("type") == "challenge":
challenge = event["challenge"]
ch_id = challenge["id"]
variant = challenge.get("variant", {}).get("key", "standard")
speed = challenge.get("speed")
perf_ok = speed in {"bullet", "blitz", "rapid", "classical"}
not_corr = (
challenge.get("speed") != "correspondence"
or not decline_correspondence
)
if variant == "standard" and perf_ok and not_corr:
logging.info(f"Accepting challenge {ch_id} ({speed})")
api.accept_challenge(ch_id)
else:
logging.info(
f"Declining challenge {ch_id} "
f"(variant={variant}, speed={speed})"
)
api.decline_challenge(ch_id)
elif event.get("type") == "gameStart":
game_id = event["game"]["id"]
# Spin up a game thread
if game_id not in game_threads or not game_threads[game_id].is_alive():
t = threading.Thread(
target=handle_game, args=(game_id,), name=f"game-{game_id}"
)
t.daemon = True
game_threads[game_id] = t
t.start()
elif event.get("type") == "gameFinish":
game_id = event["game"]["id"]
logging.info(f"Game finished event: {game_id}")
else:
logging.debug(f"Unhandled event: {json.dumps(event)}")
def _run_event_loop_iteration() -> int:
"""Run one iteration of the event loop with error handling.
Returns:
New backoff value (0 on success, increased on error).
"""
try:
_process_event_stream()
except requests.RequestException as e:
logging.warning(f"Event stream error: {e}")
return backoff_sleep(backoff)
else:
# If stream ends normally, reset backoff
return 0
# Main event stream: challenge and game start events
logging.info("Connecting to Lichess event stream. Waiting for challenges...")
backoff = 0
while True:
backoff = _run_event_loop_iteration()
def main() -> None:
"""Parse arguments and run the Lichess bot."""
parser = argparse.ArgumentParser(description="Run a minimal Lichess bot")
parser.add_argument(
"--log-level", default="INFO", help="Logging level (default: INFO)"
)
parser.add_argument(
"--decline-correspondence",
action="store_true",
help="Decline correspondence challenges",
)
args = parser.parse_args()
run_bot(args.log_level, decline_correspondence=args.decline_correspondence)
if __name__ == "__main__":
main()