From 42ec8be258c8ea63e3c610010a9e9ae5b8a65a89 Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Fri, 22 Aug 2025 17:26:17 +0200 Subject: [PATCH] feat: make the bot record games --- PYTHON/lichess_bot/engine.py | 196 +++++++++++++++++++++++++++++++++-- PYTHON/lichess_bot/main.py | 24 ++++- 2 files changed, 213 insertions(+), 7 deletions(-) diff --git a/PYTHON/lichess_bot/engine.py b/PYTHON/lichess_bot/engine.py index 3252f3c..b8a6900 100644 --- a/PYTHON/lichess_bot/engine.py +++ b/PYTHON/lichess_bot/engine.py @@ -1,17 +1,201 @@ +import time import random -from typing import Optional +from typing import Optional, Tuple import chess class RandomEngine: - """Picks a random legal move. + """A simple engine with a tiny alpha-beta search and material+mobility eval. - You can replace this with a UCI engine wrapper or a better search. + Keeps the same name for compatibility, but no longer picks purely random moves. """ + def __init__(self, depth: int = 100, max_time_sec: float = 5): + self.depth = depth + self.max_time_sec = max_time_sec + + # Centipawn values + self.piece_values = { + chess.PAWN: 100, + chess.KNIGHT: 320, + chess.BISHOP: 330, + chess.ROOK: 500, + chess.QUEEN: 900, + chess.KING: 0, + } + def choose_move(self, board: chess.Board) -> Optional[chess.Move]: + start = time.time() + best_move: Optional[chess.Move] = None + best_score = -float("inf") if board.turn else float("inf") + + # Iterative deepening up to depth or time limit + for d in range(1, self.depth + 1): + elapsed = time.time() - start + if elapsed >= self.max_time_sec: + break + score, move = self._search_root(board, d, start) + if move is not None: + best_move, best_score = move, score + + # Fallback to random if search didn’t find anything + if best_move is None: + moves = list(board.legal_moves) + return random.choice(moves) if moves else None + return best_move + + def choose_move_with_explanation(self, board: chess.Board) -> Tuple[Optional[chess.Move], str]: + """Return the chosen move and a human-readable explanation of top candidates. + + The explanation lists top candidates with scores and quick annotations. + """ + start = time.time() + depth_used = 0 + best_move: Optional[chess.Move] = None + scores: list[Tuple[chess.Move, float]] = [] + + # Analyze all legal moves at the root with alpha-beta to given depth/time + for d in range(1, self.depth + 1): + if time.time() - start >= self.max_time_sec: + break + depth_used = d + scores = self._analyze_root(board, d, start) + if scores: + best_move = scores[0][0] + + if not scores: + # Fallback + mv = self.choose_move(board) + return mv, "fallback: random/legal-only (no analysis)" + + # Build explanation + def annotate(m: chess.Move) -> str: + tags = [] + if board.is_capture(m): + tags.append("x") + if m.promotion: + tags.append(f"={chess.piece_symbol(m.promotion).upper()}") + try: + if board.gives_check(m): + tags.append("+") + except Exception: + pass + return "".join(tags) + + top = scores[:5] + best_cp = top[0][1] + lines = [ + f"depth={depth_used} time={time.time()-start:.2f}s candidates={len(scores)}", + f"best {board.san(top[0][0])} ({top[0][0].uci()}) score={best_cp:.1f}", + ] + if len(top) > 1: + lines.append("alternatives:") + for mv, sc in top[1:]: + delta = sc - best_cp + lines.append(f" {board.san(mv)} ({mv.uci()}) score={sc:.1f} delta={delta:+.1f} {annotate(mv)}") + + return best_move, "\n".join(lines) + + def _analyze_root(self, board: chess.Board, depth: int, start: float) -> list[Tuple[chess.Move, float]]: + alpha = -float("inf") + beta = float("inf") + scored: list[Tuple[chess.Move, float]] = [] + for move in self._ordered_moves(board): + if time.time() - start >= self.max_time_sec: + break + board.push(move) + score = -self._alphabeta(board, depth - 1, -beta, -alpha, start) + board.pop() + scored.append((move, score)) + if score > alpha: + alpha = score + if alpha >= beta: + break + scored.sort(key=lambda t: t[1], reverse=True) + return scored + + def _search_root(self, board: chess.Board, depth: int, start: float) -> Tuple[float, Optional[chess.Move]]: + alpha = -float("inf") + beta = float("inf") + best_move: Optional[chess.Move] = None + best_score = -float("inf") + + moves = self._ordered_moves(board) + for move in moves: + if time.time() - start >= self.max_time_sec: + break + board.push(move) + score = -self._alphabeta(board, depth - 1, -beta, -alpha, start) + board.pop() + if score > best_score: + best_score = score + best_move = move + if score > alpha: + alpha = score + if alpha >= beta: + break + return best_score, best_move + + def _alphabeta(self, board: chess.Board, depth: int, alpha: float, beta: float, start: float) -> float: + # Time cutoff + if time.time() - start >= self.max_time_sec: + return self._evaluate(board) + + # Terminal nodes + if depth == 0 or board.is_game_over(): + return self._evaluate(board) + + best = -float("inf") + for move in self._ordered_moves(board): + board.push(move) + score = -self._alphabeta(board, depth - 1, -beta, -alpha, start) + board.pop() + if score > best: + best = score + if best > alpha: + alpha = best + if alpha >= beta: + break + return best + + def _ordered_moves(self, board: chess.Board): + # Simple move ordering: captures/promotions first, then checks + def score_move(m: chess.Move) -> int: + s = 0 + if board.is_capture(m): + s += 1000 + if m.promotion: + s += 800 + try: + if board.gives_check(m): + s += 100 + except Exception: + pass + return s + moves = list(board.legal_moves) - if not moves: - return None - return random.choice(moves) + moves.sort(key=score_move, reverse=True) + return moves + + def _evaluate(self, board: chess.Board) -> float: + # Game end conditions + if board.is_checkmate(): + return -100000 if board.turn else 100000 # side-to-move is mated + if board.is_stalemate() or board.is_insufficient_material() or board.can_claim_draw(): + return 0 + + # Material + material = 0 + for square, piece in board.piece_map().items(): + val = self.piece_values[piece.piece_type] + material += val if piece.color == chess.WHITE else -val + + # Mobility (only side to move for speed; acts as tempo bonus) + mobility = 5 * sum(1 for _ in board.legal_moves) + if board.turn: + mobility_term = mobility + else: + mobility_term = -mobility + + return material + mobility_term diff --git a/PYTHON/lichess_bot/main.py b/PYTHON/lichess_bot/main.py index b9320dc..e5b597f 100644 --- a/PYTHON/lichess_bot/main.py +++ b/PYTHON/lichess_bot/main.py @@ -7,6 +7,7 @@ import time from typing import Optional import chess +import chess.pgn from .engine import RandomEngine from .lichess_api import LichessAPI @@ -35,6 +36,13 @@ def run_bot(log_level: str = "INFO", decline_correspondence: bool = False) -> No color: Optional[str] = my_color # Track how many moves we have already processed; start at -1 so we act on the first state (0 moves) last_handled_len = -1 + # Prepare a per-game log file + game_log_path = os.path.join(os.getcwd(), f"lichess_bot_game_{game_id}.log") + try: + with open(game_log_path, "w") as lf: + lf.write(f"game {game_id} started\n") + except Exception: + game_log_path = None try: for event in api.stream_game_events(game_id): et = event.get("type") @@ -85,12 +93,15 @@ def run_bot(log_level: str = "INFO", decline_correspondence: bool = False) -> No f"Game {game_id}: turn={'white' if is_white_turn else 'black'}, my_turn={my_turn}" ) if my_turn: - move = engine.choose_move(board) + move, reason = engine.choose_move_with_explanation(board) if move is None: logging.info(f"Game {game_id}: no legal moves (game likely over)") break try: logging.info(f"Game {game_id}: playing {move.uci()}") + if game_log_path: + with open(game_log_path, "a") as lf: + lf.write(f"ply {last_handled_len+1}: {move.uci()}\n{reason}\n\n") api.make_move(game_id, move) except Exception as e: logging.warning(f"Game {game_id}: move {move.uci()} failed: {e}") @@ -106,6 +117,17 @@ def run_bot(log_level: str = "INFO", decline_correspondence: bool = False) -> No except Exception as e: logging.exception(f"Game {game_id} thread error: {e}") finally: + # On game end, write full PGN to the log file + try: + if game_log_path: + game = chess.pgn.Game.from_board(board) + with open(game_log_path, "a") as lf: + lf.write("\nPGN:\n") + exporter = chess.pgn.StringExporter(headers=True, variations=False, comments=False) + lf.write(game.accept(exporter)) + lf.write("\n") + except Exception as e: + logging.debug(f"Game {game_id}: could not write PGN: {e}") logging.info(f"Ending game thread for {game_id}") # Main event stream: challenge and game start events