mirror of
https://github.com/kuhyx/testsAndMisc.git
synced 2026-07-04 22:23:02 +02:00
Replace bare 'except Exception' with specific exception types: - ValueError for move parsing (chess.Move.from_uci, board.push_uci) - json.JSONDecodeError for JSON parsing - OSError for file operations - ImportError for optional imports - AttributeError for attribute access - TypeError for type-related operations - requests.RequestException for HTTP operations - subprocess.SubprocessError for subprocess failures - selenium.NoSuchElementException for element finding Also fixes: - pytest hook signature issue in conftest.py (_config -> _) - Missing file handling in test_puzzles.py - Line length in stockfish_analysis.py Removes all BLE001 per-file ignores from pyproject.toml.
491 lines
22 KiB
Python
491 lines
22 KiB
Python
"""Main entry point for the Lichess bot."""
|
|
|
|
import argparse
|
|
import contextlib
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
|
|
import chess
|
|
import chess.pgn
|
|
import requests
|
|
|
|
from python_pkg.lichess_bot.engine import RandomEngine
|
|
from python_pkg.lichess_bot.lichess_api import LichessAPI
|
|
from python_pkg.lichess_bot.utils import backoff_sleep, get_and_increment_version
|
|
|
|
|
|
def _apply_move_to_board(board: chess.Board, move: str, game_id: str) -> None:
|
|
"""Apply a single move to the board, logging errors.
|
|
|
|
Args:
|
|
board: The chess board to apply the move to.
|
|
move: The UCI move string.
|
|
game_id: The game ID for logging purposes.
|
|
"""
|
|
try:
|
|
board.push_uci(move)
|
|
except ValueError:
|
|
logging.debug(f"Game {game_id}: could not apply move {move}")
|
|
|
|
|
|
def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None:
|
|
"""Start the bot and listen for incoming events."""
|
|
logging.basicConfig(
|
|
level=getattr(logging, log_level.upper(), logging.INFO),
|
|
format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s",
|
|
)
|
|
|
|
token = os.getenv("LICHESS_TOKEN")
|
|
if not token:
|
|
msg = "LICHESS_TOKEN environment variable is required"
|
|
raise RuntimeError(msg)
|
|
|
|
logging.info("Token present. Initializing client and engine...")
|
|
# Self-incrementing bot version (persisted on disk)
|
|
bot_version = get_and_increment_version()
|
|
logging.info(f"Bot version: v{bot_version}")
|
|
api = LichessAPI(token)
|
|
engine = RandomEngine()
|
|
|
|
game_threads = {}
|
|
|
|
def handle_game(game_id: str, my_color: str | None = None) -> None:
|
|
logging.info(f"Starting game thread for {game_id} [bot v{bot_version}]")
|
|
board = chess.Board()
|
|
color: str | None = my_color
|
|
# Track how many moves we have already processed;
|
|
# start at -1 so we act on the first state (0 moves)
|
|
last_handled_len = -1
|
|
# Prepare a per-game log file
|
|
game_log_path = os.path.join(os.getcwd(), f"lichess_bot_game_{game_id}.log")
|
|
try:
|
|
with open(game_log_path, "w") as lf:
|
|
lf.write(f"game {game_id} started\n")
|
|
lf.write(f"bot_version v{bot_version}\n")
|
|
except OSError:
|
|
game_log_path = None
|
|
# Simple time manager state
|
|
my_ms = None
|
|
opp_ms = None
|
|
inc_ms = 0
|
|
# Meta info for logging/PGN
|
|
game_date_iso: str | None = None
|
|
white_name: str | None = None
|
|
black_name: str | None = None
|
|
site_url: str | None = None
|
|
try:
|
|
# Only send moves on authoritative gameState events to avoid race
|
|
# conditions right after gameFull arrives.
|
|
for event in api.stream_game_events(game_id):
|
|
et = event.get("type")
|
|
if et in ("gameFull", "gameState"):
|
|
# Determine moves list and optional status
|
|
if et == "gameFull":
|
|
state = event.get("state", {})
|
|
moves = state.get("moves", "")
|
|
status = state.get("status")
|
|
# clocks are in milliseconds if present
|
|
my_ms = (
|
|
state.get("wtime")
|
|
if color == "white"
|
|
else state.get("btime")
|
|
)
|
|
opp_ms = (
|
|
state.get("btime")
|
|
if color == "white"
|
|
else state.get("wtime")
|
|
)
|
|
inc_ms = state.get("winc") or state.get("binc") or 0
|
|
# Discover my color from gameFull
|
|
white_id = event["white"].get("id")
|
|
black_id = event["black"].get("id")
|
|
white_name = event["white"].get("name") or white_id or "?"
|
|
black_name = event["black"].get("name") or black_id or "?"
|
|
# Set site and date if available
|
|
with contextlib.suppress(Exception):
|
|
# Lichess event may include 'createdAt' ms epoch
|
|
created_ms = event.get("createdAt") or event.get(
|
|
"createdAtDate"
|
|
)
|
|
if created_ms:
|
|
game_date_iso = datetime.datetime.fromtimestamp(
|
|
int(created_ms) / 1000, tz=datetime.timezone.utc
|
|
).strftime("%Y.%m.%d")
|
|
site_url = f"https://lichess.org/{game_id}"
|
|
me = api.get_my_user_id()
|
|
if me == white_id:
|
|
color = "white"
|
|
elif me == black_id:
|
|
color = "black"
|
|
logging.info(f"Game {game_id}: joined as {color} (gameFull)")
|
|
else:
|
|
moves = event.get("moves", "")
|
|
status = event.get("status")
|
|
# update clocks from gameState if present
|
|
if color == "white":
|
|
my_ms = event.get("wtime", my_ms)
|
|
opp_ms = event.get("btime", opp_ms)
|
|
inc_ms = event.get("winc", inc_ms)
|
|
elif color == "black":
|
|
my_ms = event.get("btime", my_ms)
|
|
opp_ms = event.get("wtime", opp_ms)
|
|
inc_ms = event.get("binc", inc_ms)
|
|
|
|
moves_list = moves.split() if moves else []
|
|
new_len = len(moves_list)
|
|
logging.info(
|
|
f"Game {game_id}: event={et}, moves={new_len}, color={color}"
|
|
)
|
|
if new_len == last_handled_len:
|
|
logging.debug(
|
|
f"Game {game_id}: position unchanged "
|
|
f"(len={new_len}), skipping"
|
|
)
|
|
continue
|
|
|
|
# Rebuild board from moves
|
|
board = chess.Board()
|
|
for m in moves_list:
|
|
_apply_move_to_board(board, m, game_id)
|
|
|
|
if color is None:
|
|
logging.info(
|
|
f"Game {game_id}: color unknown yet; waiting for gameFull"
|
|
)
|
|
# Do not mark this position handled on gameFull;
|
|
# wait for authoritative gameState
|
|
if et == "gameState":
|
|
last_handled_len = new_len
|
|
continue
|
|
|
|
is_white_turn = board.turn
|
|
my_turn = (is_white_turn and color == "white") or (
|
|
(not is_white_turn) and color == "black"
|
|
)
|
|
logging.info(
|
|
f"Game {game_id}: "
|
|
f"turn={'white' if is_white_turn else 'black'}, "
|
|
f"my_turn={my_turn}"
|
|
)
|
|
# Move policy:
|
|
# - Always move on 'gameState' (authoritative)
|
|
# - Also allow moving on the initial 'gameFull' when there are
|
|
# zero moves and it's our turn. This avoids stalling at game
|
|
# start when Lichess doesn't immediately send a 'gameState'
|
|
# for 0 moves.
|
|
allow_move = (et == "gameState") or (
|
|
et == "gameFull" and new_len == 0
|
|
)
|
|
if my_turn and allow_move:
|
|
# Compute a per-move time budget (seconds) based on
|
|
# remaining time.
|
|
# Heuristic: use min( max_time_sec,
|
|
# max(0.05, 0.6 * my_time_left/remaining_moves + inc) )
|
|
# Estimate remaining moves as 30 - ply/2 bounded to [10, 60]
|
|
est_moves_left = max(
|
|
10, min(60, 30 - board.fullmove_number // 2)
|
|
)
|
|
time_left_sec = (my_ms or 0) / 1000.0
|
|
inc_sec = (inc_ms or 0) / 1000.0
|
|
budget = (
|
|
0.6 * (time_left_sec / max(1, est_moves_left))
|
|
+ 0.5 * inc_sec
|
|
)
|
|
# Spend more time per move (requested): double the budget
|
|
budget *= 2.0
|
|
# Keep within reasonable bounds
|
|
budget = max(0.05, min(engine.max_time_sec, budget))
|
|
move, reason = engine.choose_move_with_explanation(
|
|
board, time_budget_sec=budget
|
|
)
|
|
if move is None:
|
|
logging.info(
|
|
f"Game {game_id}: no legal moves (game likely over)"
|
|
)
|
|
break
|
|
try:
|
|
# Double-check legality just before sending
|
|
# to avoid 400s when state changed.
|
|
if move not in board.legal_moves:
|
|
logging.info(
|
|
f"Game {game_id}: selected move "
|
|
"no longer legal; skipping send"
|
|
)
|
|
else:
|
|
logging.info(
|
|
f"Game {game_id}: playing {move.uci()} "
|
|
f"(budget={budget:.2f}s, "
|
|
f"my_time_left={time_left_sec:.1f}s, "
|
|
f"inc={inc_sec:.2f}s)"
|
|
)
|
|
if game_log_path:
|
|
with open(game_log_path, "a") as lf:
|
|
lf.write(
|
|
f"ply {last_handled_len + 1}: "
|
|
f"{move.uci()}\n{reason}\n\n"
|
|
)
|
|
api.make_move(game_id, move)
|
|
except requests.RequestException as e:
|
|
logging.warning(
|
|
f"Game {game_id}: move {move.uci()} failed: {e}"
|
|
)
|
|
# Mark this position as handled on authoritative
|
|
# gameState, or after we've actually attempted a move
|
|
# (including the first move on gameFull len=0).
|
|
if et == "gameState" or (my_turn and allow_move):
|
|
last_handled_len = new_len
|
|
if status in {"mate", "resign", "stalemate", "timeout", "draw"}:
|
|
logging.info(f"Game {game_id} finished: {status}")
|
|
break
|
|
elif et in {"chatLine", "opponentGone"}:
|
|
continue
|
|
except requests.RequestException:
|
|
logging.exception(f"Game {game_id} thread error")
|
|
finally:
|
|
# On game end, write full PGN to the log file
|
|
try:
|
|
if game_log_path:
|
|
game = chess.pgn.Game.from_board(board)
|
|
# Record the bot version in the PGN headers
|
|
with contextlib.suppress(Exception):
|
|
game.headers["BotVersion"] = f"v{bot_version}"
|
|
if site_url:
|
|
game.headers["Site"] = site_url
|
|
if game_date_iso:
|
|
game.headers["Date"] = game_date_iso
|
|
if white_name:
|
|
game.headers["White"] = white_name
|
|
if black_name:
|
|
game.headers["Black"] = black_name
|
|
with open(game_log_path, "a") as lf:
|
|
lf.write("\nPGN:\n")
|
|
exporter = chess.pgn.StringExporter(
|
|
headers=True, variations=False, comments=False
|
|
)
|
|
lf.write(game.accept(exporter))
|
|
lf.write("\n")
|
|
# After PGN is written, run analysis and save it
|
|
# to the same file (inserted before PGN)
|
|
if game_log_path:
|
|
analysis_text: str | None = None
|
|
try:
|
|
analyze_script = os.path.join(
|
|
os.path.dirname(os.path.dirname(__file__)),
|
|
"stockfish_analysis",
|
|
"analyze_chess_game.py",
|
|
)
|
|
if os.path.isfile(analyze_script):
|
|
# Estimate total plies from the final board
|
|
try:
|
|
total_plies = len(board.move_stack)
|
|
except TypeError:
|
|
total_plies = 0
|
|
|
|
logging.info(
|
|
f"Game {game_id}: starting post-game "
|
|
f"analysis ({total_plies} plies)"
|
|
)
|
|
# Run analyzer unbuffered and stream output for progress
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-u", analyze_script, game_log_path],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
bufsize=1,
|
|
)
|
|
analyzed = 0
|
|
lines: list[str] = []
|
|
ply_line_re = __import__("re").compile(r"^\s*(\d+)\s")
|
|
# Read stdout line by line
|
|
# stdout/stderr are guaranteed non-None with PIPE
|
|
assert proc.stdout is not None # noqa: S101
|
|
assert proc.stderr is not None # noqa: S101
|
|
for line in proc.stdout:
|
|
lines.append(line)
|
|
m = ply_line_re.match(line)
|
|
if m:
|
|
# Count as one analyzed ply
|
|
analyzed += 1
|
|
left = (
|
|
max(0, (total_plies or 0) - analyzed)
|
|
if total_plies
|
|
else "?"
|
|
)
|
|
if total_plies:
|
|
pct = analyzed / total_plies * 100.0
|
|
logging.info(
|
|
f"Game {game_id}: analysis progress "
|
|
f"{analyzed}/{total_plies} "
|
|
f"({pct:.0f}%), left {left}"
|
|
)
|
|
else:
|
|
logging.info(
|
|
f"Game {game_id}: analysis progress "
|
|
f"{analyzed} plies (total unknown)"
|
|
)
|
|
|
|
# Capture any remaining stderr and ensure process ends
|
|
stderr_text = proc.stderr.read() or ""
|
|
ret = proc.wait()
|
|
analysis_text = "".join(lines)
|
|
if ret != 0:
|
|
logging.warning(
|
|
f"Game {game_id}: analysis script "
|
|
f"exited with code {ret}"
|
|
)
|
|
if stderr_text:
|
|
analysis_text += "\n[stderr]\n" + stderr_text
|
|
logging.info(f"Game {game_id}: analysis complete")
|
|
else:
|
|
logging.info(
|
|
f"Game {game_id}: analysis script not found "
|
|
f"at {analyze_script}; skipping analysis"
|
|
)
|
|
except (subprocess.SubprocessError, OSError) as e:
|
|
logging.debug(f"Game {game_id}: analysis run failed: {e}")
|
|
|
|
# Insert analysis before the PGN section so future runs
|
|
# can still parse PGN cleanly
|
|
if analysis_text:
|
|
try:
|
|
with open(
|
|
game_log_path, encoding="utf-8", errors="replace"
|
|
) as f:
|
|
content = f.read()
|
|
|
|
# Find the start of the 'PGN:' line
|
|
insert_idx = 0
|
|
p = content.find("\nPGN:\n")
|
|
if p != -1:
|
|
insert_idx = (
|
|
p + 1
|
|
) # start of the line after the preceding newline
|
|
elif content.startswith("PGN:\n"):
|
|
insert_idx = 0
|
|
else:
|
|
# If PGN marker not found (unexpected), append at end
|
|
insert_idx = len(content)
|
|
|
|
# Prepend meta information block for easier parsing later
|
|
meta_lines = []
|
|
if game_date_iso:
|
|
meta_lines.append(f"Date: {game_date_iso}")
|
|
if white_name or black_name:
|
|
meta_lines.append(
|
|
f"Players: {white_name or '?'} "
|
|
f"vs {black_name or '?'}"
|
|
)
|
|
if meta_lines:
|
|
meta_block = "\n".join(meta_lines) + "\n"
|
|
else:
|
|
meta_block = ""
|
|
|
|
analysis_block = (
|
|
(meta_block if meta_block else "")
|
|
+ "ANALYSIS:\n"
|
|
+ analysis_text.rstrip()
|
|
+ "\n\n"
|
|
)
|
|
new_content = (
|
|
content[:insert_idx]
|
|
+ analysis_block
|
|
+ content[insert_idx:]
|
|
)
|
|
with open(game_log_path, "w", encoding="utf-8") as f:
|
|
f.write(new_content)
|
|
except OSError as e:
|
|
logging.debug(
|
|
f"Game {game_id}: could not write analysis to log: {e}"
|
|
)
|
|
except OSError as e:
|
|
logging.debug(f"Game {game_id}: could not write PGN: {e}")
|
|
logging.info(f"Ending game thread for {game_id}")
|
|
|
|
def _process_event_stream() -> None:
|
|
"""Process events from the Lichess event stream.
|
|
|
|
Handles challenges and game start/finish events.
|
|
"""
|
|
for event in api.stream_events():
|
|
if event.get("type") == "challenge":
|
|
challenge = event["challenge"]
|
|
ch_id = challenge["id"]
|
|
variant = challenge.get("variant", {}).get("key", "standard")
|
|
speed = challenge.get("speed")
|
|
perf_ok = speed in {"bullet", "blitz", "rapid", "classical"}
|
|
not_corr = (
|
|
challenge.get("speed") != "correspondence"
|
|
or not decline_correspondence
|
|
)
|
|
if variant == "standard" and perf_ok and not_corr:
|
|
logging.info(f"Accepting challenge {ch_id} ({speed})")
|
|
api.accept_challenge(ch_id)
|
|
else:
|
|
logging.info(
|
|
f"Declining challenge {ch_id} "
|
|
f"(variant={variant}, speed={speed})"
|
|
)
|
|
api.decline_challenge(ch_id)
|
|
|
|
elif event.get("type") == "gameStart":
|
|
game_id = event["game"]["id"]
|
|
# Spin up a game thread
|
|
if game_id not in game_threads or not game_threads[game_id].is_alive():
|
|
t = threading.Thread(
|
|
target=handle_game, args=(game_id,), name=f"game-{game_id}"
|
|
)
|
|
t.daemon = True
|
|
game_threads[game_id] = t
|
|
t.start()
|
|
|
|
elif event.get("type") == "gameFinish":
|
|
game_id = event["game"]["id"]
|
|
logging.info(f"Game finished event: {game_id}")
|
|
else:
|
|
logging.debug(f"Unhandled event: {json.dumps(event)}")
|
|
|
|
def _run_event_loop_iteration() -> int:
|
|
"""Run one iteration of the event loop with error handling.
|
|
|
|
Returns:
|
|
New backoff value (0 on success, increased on error).
|
|
"""
|
|
try:
|
|
_process_event_stream()
|
|
except requests.RequestException as e:
|
|
logging.warning(f"Event stream error: {e}")
|
|
return backoff_sleep(backoff)
|
|
else:
|
|
# If stream ends normally, reset backoff
|
|
return 0
|
|
|
|
# Main event stream: challenge and game start events
|
|
logging.info("Connecting to Lichess event stream. Waiting for challenges...")
|
|
backoff = 0
|
|
while True:
|
|
backoff = _run_event_loop_iteration()
|
|
|
|
|
|
def main() -> None:
|
|
"""Parse arguments and run the Lichess bot."""
|
|
parser = argparse.ArgumentParser(description="Run a minimal Lichess bot")
|
|
parser.add_argument(
|
|
"--log-level", default="INFO", help="Logging level (default: INFO)"
|
|
)
|
|
parser.add_argument(
|
|
"--decline-correspondence",
|
|
action="store_true",
|
|
help="Decline correspondence challenges",
|
|
)
|
|
args = parser.parse_args()
|
|
run_bot(args.log_level, decline_correspondence=args.decline_correspondence)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|