diff --git a/python_pkg/stockfish_analysis/analyze_chess_game.py b/python_pkg/stockfish_analysis/analyze_chess_game.py index 4b2e15d..f22b94b 100755 --- a/python_pkg/stockfish_analysis/analyze_chess_game.py +++ b/python_pkg/stockfish_analysis/analyze_chess_game.py @@ -23,6 +23,7 @@ from __future__ import annotations import argparse import contextlib +from dataclasses import dataclass import io import logging import multiprocessing @@ -216,8 +217,35 @@ def _auto_hash_mb(threads_wanted: int, engine_options: dict[str, object]) -> int return max(64, int(target)) -def main() -> None: - """Parse arguments and run chess game analysis.""" +# Type aliases for clarity +EngineOptions = dict[str, object] + + +@dataclass +class MoveAnalysis: + """Container for single move analysis results.""" + + san: str + best_san: str + played_cp: int | None + played_mate: int | None + best_cp: int | None + best_mate: int | None + cp_loss: int | None + classification: str + + +@dataclass +class AnalysisContext: + """Container for analysis parameters passed between functions.""" + + engine: chess.engine.SimpleEngine + limit: chess.engine.Limit + multipv: int + + +def _build_argument_parser() -> argparse.ArgumentParser: + """Build and return the argument parser for the analysis script.""" ap = argparse.ArgumentParser( description="Analyze a chess game's moves with Stockfish and rate each move." ) @@ -227,7 +255,6 @@ def main() -> None: default="stockfish", help="Path to stockfish executable (default: stockfish)", ) - # Exactly one of time or depth may be provided; default to time ap.add_argument( "--time", type=float, @@ -240,7 +267,6 @@ def main() -> None: default=None, help="Fixed depth per evaluation (overrides --time)", ) - # Performance knobs ap.add_argument( "--threads", type=_parse_threads, @@ -269,13 +295,16 @@ def main() -> None: "(reports its eval and the best move)" ), ) - args = ap.parse_args() + return ap - if not Path(args.file).is_file(): - _logger.error(f"Input not found: {args.file}") + +def _load_game(file_path: str) -> chess.pgn.Game: + """Load and parse a chess game from a file.""" + if not Path(file_path).is_file(): + _logger.error(f"Input not found: {file_path}") sys.exit(1) - with open(args.file, encoding="utf-8", errors="replace") as f: + with open(file_path, encoding="utf-8", errors="replace") as f: raw = f.read() pgn_text = extract_pgn_text(raw) @@ -288,7 +317,84 @@ def main() -> None: _logger.error("Failed to parse PGN.") sys.exit(3) - # Prepare engine + return game + + +def _configure_threads( + engine: chess.engine.SimpleEngine, + options: EngineOptions, + requested: int | None, +) -> int: + """Configure engine thread count and return actual threads used.""" + wanted = requested if requested is not None else (multiprocessing.cpu_count() or 1) + if "Threads" not in options: + return wanted + try: + max_thr = getattr(options["Threads"], "max", None) + min_thr = getattr(options["Threads"], "min", 1) + if isinstance(max_thr, int): + wanted = min(wanted, max_thr) + if isinstance(min_thr, int): + wanted = max(wanted, min_thr) + engine.configure({"Threads": int(wanted)}) + except (AttributeError, TypeError, ValueError): + _logger.debug("Failed to configure Threads option") + return wanted + + +def _configure_hash( + engine: chess.engine.SimpleEngine, + options: EngineOptions, + requested: int | None, + threads: int, +) -> None: + """Configure engine hash table size.""" + if "Hash" not in options: + return + try: + target = ( + int(requested) if requested is not None else _auto_hash_mb(threads, options) + ) + max_hash = getattr(options["Hash"], "max", None) + min_hash = getattr(options["Hash"], "min", 16) + if isinstance(max_hash, int): + target = min(target, max_hash) + if isinstance(min_hash, int): + target = max(target, min_hash) + engine.configure({"Hash": int(target)}) + except (AttributeError, TypeError, ValueError): + _logger.debug("Failed to configure Hash option") + + +def _configure_multipv( + engine: chess.engine.SimpleEngine, options: EngineOptions, requested: int +) -> int: + """Configure MultiPV and return effective value.""" + effective = max(1, int(requested)) + if "MultiPV" not in options: + return effective + try: + max_mpv = getattr(options["MultiPV"], "max", None) + if isinstance(max_mpv, int): + effective = min(effective, max_mpv) + engine.configure({"MultiPV": int(effective)}) + except (AttributeError, TypeError, ValueError): + _logger.debug("Failed to configure MultiPV option") + return effective + + +def _configure_nnue(engine: chess.engine.SimpleEngine, options: EngineOptions) -> None: + """Enable NNUE if supported.""" + for nnue_key in ("Use NNUE", "UseNNUE"): + if nnue_key in options: + with contextlib.suppress(Exception): + engine.configure({nnue_key: True}) + + +def _setup_engine( + args: argparse.Namespace, +) -> tuple[chess.engine.SimpleEngine, int, chess.engine.Limit]: + """Initialize and configure the chess engine.""" try: engine = chess.engine.SimpleEngine.popen_uci([args.engine]) except FileNotFoundError: @@ -298,63 +404,15 @@ def main() -> None: ) sys.exit(4) - # Configure engine performance options if available try: options = engine.options # type: ignore[attr-defined] except AttributeError: options = {} - # Threads - wanted_threads = ( - args.threads if args.threads is not None else (multiprocessing.cpu_count() or 1) - ) - # Respect engine bounds if present - if "Threads" in options: - try: - max_thr = getattr(options["Threads"], "max", None) - min_thr = getattr(options["Threads"], "min", 1) - if isinstance(max_thr, int): - wanted_threads = min(wanted_threads, max_thr) - if isinstance(min_thr, int): - wanted_threads = max(wanted_threads, min_thr) - engine.configure({"Threads": int(wanted_threads)}) - except (AttributeError, TypeError, ValueError): - _logger.debug("Failed to configure Threads option") - - # Configure hash table size in MB. - if "Hash" in options: - try: - if args.hash_mb is not None: - target_hash = int(args.hash_mb) - else: - target_hash = _auto_hash_mb(int(wanted_threads), options) - # Respect bounds - max_hash = getattr(options["Hash"], "max", None) - min_hash = getattr(options["Hash"], "min", 16) - if isinstance(max_hash, int): - target_hash = min(target_hash, max_hash) - if isinstance(min_hash, int): - target_hash = max(target_hash, min_hash) - engine.configure({"Hash": int(target_hash)}) - except (AttributeError, TypeError, ValueError): - _logger.debug("Failed to configure Hash option") - - # MultiPV - effective_mpv = max(1, int(args.multipv)) - if "MultiPV" in options: - try: - max_mpv = getattr(options["MultiPV"], "max", None) - if isinstance(max_mpv, int): - effective_mpv = min(effective_mpv, max_mpv) - engine.configure({"MultiPV": int(effective_mpv)}) - except (AttributeError, TypeError, ValueError): - _logger.debug("Failed to configure MultiPV option") - - # Enable NNUE if the option exists - for nnue_key in ("Use NNUE", "UseNNUE"): - if nnue_key in options: - with contextlib.suppress(Exception): - engine.configure({nnue_key: True}) + threads = _configure_threads(engine, options, args.threads) + _configure_hash(engine, options, args.hash_mb, threads) + effective_mpv = _configure_multipv(engine, options, args.multipv) + _configure_nnue(engine, options) limit: chess.engine.Limit if args.depth is not None: @@ -362,6 +420,136 @@ def main() -> None: else: limit = chess.engine.Limit(time=max(0.05, args.time)) + _log_engine_config(engine, threads, effective_mpv) + return engine, effective_mpv, limit + + +def _log_engine_config( + engine: chess.engine.SimpleEngine, threads: int, multipv: int +) -> None: + """Log engine configuration summary.""" + try: + hash_val = engine.options.get("Hash") + hash_show = int(hash_val.value) if hash_val else None + except (AttributeError, TypeError, ValueError): + hash_show = None + if hash_show is not None: + _logger.info( + f"Using engine options: Threads={threads}, " + f"Hash={hash_show} MB, MultiPV={multipv}" + ) + else: + _logger.info(f"Using engine options: Threads={threads}, MultiPV={multipv}") + + +def _get_best_move( + engine: chess.engine.SimpleEngine, + board: chess.Board, + limit: chess.engine.Limit, + multipv: int, +) -> chess.Move | None: + """Get the engine's best move for a position.""" + info_raw = engine.analyse(board, limit=limit, multipv=multipv) + info = info_raw[0] if isinstance(info_raw, list) else info_raw + if info is not None and "pv" in info and info["pv"]: + return info["pv"][0] + res = engine.play(board, limit) + return res.move + + +def _evaluate_position( + engine: chess.engine.SimpleEngine, + board: chess.Board, + limit: chess.engine.Limit, + multipv: int, + *, + pov_white: bool, +) -> tuple[int | None, int | None]: + """Evaluate a position and return (cp, mate_in) from POV.""" + info_raw = engine.analyse(board, limit=limit, multipv=multipv) + info = info_raw[0] if isinstance(info_raw, list) else info_raw + if info is None or "score" not in info: + return None, None + return score_to_cp(info["score"], pov_white=pov_white) + + +def _classify_mate_move(best_mate: int | None, played_mate: int | None) -> str: + """Classify a move when mate scores are involved.""" + if best_mate is None or played_mate is None: + return "Blunder" + if (best_mate > 0) and (played_mate > 0): + if abs(played_mate) > abs(best_mate): + return "Inaccuracy" + return "Best" + if (best_mate < 0) and (played_mate < 0): + if abs(played_mate) < abs(best_mate): + return "Blunder" + return "Best" if abs(played_mate) == abs(best_mate) else "Good" + return "Blunder" + + +def _analyze_single_move( + ctx: AnalysisContext, board: chess.Board, move: chess.Move +) -> MoveAnalysis: + """Analyze a single move and return analysis data.""" + mover_white = board.turn + san = board.san(move) + + best_move = _get_best_move(ctx.engine, board, ctx.limit, ctx.multipv) + best_san = board.san(best_move) if best_move is not None else "?" + + board_played = board.copy() + board_played.push(move) + played_cp, played_mate = _evaluate_position( + ctx.engine, board_played, ctx.limit, ctx.multipv, pov_white=mover_white + ) + + if best_move is not None: + board_best = board.copy() + board_best.push(best_move) + best_cp, best_mate = _evaluate_position( + ctx.engine, board_best, ctx.limit, ctx.multipv, pov_white=mover_white + ) + else: + best_cp, best_mate = None, None + + cp_loss: int | None = None + if best_mate is not None or played_mate is not None: + classification = _classify_mate_move(best_mate, played_mate) + elif best_cp is not None and played_cp is not None: + cp_loss = max(0, best_cp - played_cp) + classification = classify_cp_loss(cp_loss) + else: + classification = "Unknown" + + return MoveAnalysis( + san=san, + best_san=best_san, + played_cp=played_cp, + played_mate=played_mate, + best_cp=best_cp, + best_mate=best_mate, + cp_loss=cp_loss, + classification=classification, + ) + + +def _log_move_analysis(ply: int, result: MoveAnalysis, *, mover_white: bool) -> None: + """Log a single move's analysis result.""" + side = "W" if mover_white else "B" + loss_str = str(result.cp_loss) if result.cp_loss is not None else "—" + _logger.info( + f"{ply:>3} {side} {result.san:<8} " + f"{fmt_eval(result.played_cp, result.played_mate):>10} " + f"{fmt_eval(result.best_cp, result.best_mate):>9} " + f"{loss_str:>5} {result.classification:<12} {result.best_san}" + ) + + +def _run_analysis( + game: chess.pgn.Game, ctx: AnalysisContext, *, last_move_only: bool +) -> None: + """Run the move-by-move analysis.""" board = game.board() _logger.info("Game:") white = game.headers.get("White", "White") @@ -372,264 +560,63 @@ def main() -> None: _logger.info( "Columns: ply side move played_eval best_eval loss class best_suggestion" ) - # Brief performance summary (best-effort) - try: - thr_show = int(wanted_threads) - except (ValueError, TypeError): - thr_show = 1 - try: - hash_show = ( - int(engine.options.get("Hash").value) - if hasattr(engine, "options") and engine.options.get("Hash") - else None - ) - except (AttributeError, TypeError, ValueError): - hash_show = None - if hash_show is not None: - _logger.info( - f"Using engine options: Threads={thr_show}, " - f"Hash={hash_show} MB, MultiPV={effective_mpv}" - ) + + if last_move_only: + _analyze_last_move(game, board, ctx) else: - _logger.info( - f"Using engine options: Threads={thr_show}, MultiPV={effective_mpv}" - ) + _analyze_all_moves(game, board, ctx) + + +def _analyze_last_move( + node: chess.pgn.Game, board: chess.Board, ctx: AnalysisContext +) -> None: + """Walk to last move and analyze only that ply.""" + if not node.variations: + _logger.warning("No moves found in the game.") + return ply = 1 + while node.variations: + move_node = node.variations[0] + move = move_node.move + + if not move_node.variations: + result = _analyze_single_move(ctx, board, move) + _log_move_analysis(ply, result, mover_white=board.turn) + break + + board.push(move) + node = move_node + ply += 1 + + +def _analyze_all_moves( + node: chess.pgn.Game, board: chess.Board, ctx: AnalysisContext +) -> None: + """Analyze all moves in the game.""" + ply = 1 + while node.variations: + move_node = node.variations[0] + move = move_node.move + mover_white = board.turn + + result = _analyze_single_move(ctx, board, move) + _log_move_analysis(ply, result, mover_white=mover_white) + + node = move_node + ply += 1 + board.push(move) + + +def main() -> None: + """Parse arguments and run chess game analysis.""" + args = _build_argument_parser().parse_args() + game = _load_game(args.file) + engine, effective_mpv, limit = _setup_engine(args) + ctx = AnalysisContext(engine=engine, limit=limit, multipv=effective_mpv) + try: - node = game - - if args.last_move_only: - # Walk to the last move in the main line and analyze only that ply. - if not node.variations: - _logger.warning("No moves found in the game.") - else: - while node.variations: - move_node = node.variations[0] - move = move_node.move - mover_white = board.turn - - # If this is the final move in the mainline, analyze it and stop. - if not move_node.variations: - # Analyse current position to get engine best move suggestion - info_root_raw = engine.analyse( - board, limit=limit, multipv=effective_mpv - ) - info_root = ( - info_root_raw[0] - if isinstance(info_root_raw, list) - else info_root_raw - ) - best_move = None - if ( - info_root is not None - and "pv" in info_root - and info_root["pv"] - ): - best_move = info_root["pv"][0] - if best_move is None: - res = engine.play(board, limit) - best_move = res.move - - san = board.san(move) - - # Evaluate played move - board_played = board.copy() - board_played.push(move) - info_played_raw = engine.analyse( - board_played, limit=limit, multipv=effective_mpv - ) - info_played = ( - info_played_raw[0] - if isinstance(info_played_raw, list) - else info_played_raw - ) - if info_played is None or "score" not in info_played: - played_cp, played_mate = None, None - else: - played_cp, played_mate = score_to_cp( - info_played["score"], pov_white=mover_white - ) - - # Evaluate best move position (for mover POV) - best_san = ( - board.san(best_move) if best_move is not None else "?" - ) - if best_move is not None: - board_best = board.copy() - board_best.push(best_move) - info_best_raw = engine.analyse( - board_best, limit=limit, multipv=effective_mpv - ) - info_best = ( - info_best_raw[0] - if isinstance(info_best_raw, list) - else info_best_raw - ) - if info_best is None or "score" not in info_best: - best_cp, best_mate = None, None - else: - best_cp, best_mate = score_to_cp( - info_best["score"], pov_white=mover_white - ) - else: - best_cp, best_mate = None, None - - # Compute loss/classification - cp_loss: int | None = None - classification = "Unknown" - if best_mate is not None or played_mate is not None: - if best_mate is not None and played_mate is not None: - if (best_mate > 0) and (played_mate > 0): - if abs(played_mate) == abs(best_mate): - classification = "Best" - elif abs(played_mate) > abs(best_mate): - classification = "Inaccuracy" - else: - classification = "Best" - elif (best_mate < 0) and (played_mate < 0): - if abs(played_mate) == abs(best_mate): - classification = "Best" - elif abs(played_mate) < abs(best_mate): - classification = "Blunder" - else: - classification = "Good" - else: - classification = "Blunder" - else: - classification = "Blunder" - elif best_cp is not None and played_cp is not None: - cp_loss = max(0, best_cp - played_cp) - classification = classify_cp_loss(cp_loss) - - side = "W" if mover_white else "B" - _logger.info( - f"{ply:>3} {side} {san:<8} " - f"{fmt_eval(played_cp, played_mate):>10} " - f"{fmt_eval(best_cp, best_mate):>9} " - f"{(str(cp_loss) if cp_loss is not None else '—'):>5} " - f"{classification:<12} {best_san}" - ) - break - - # Advance to keep searching for the last move - board.push(move) - node = move_node - ply += 1 - else: - # Default behavior: analyze all moves - while node.variations: - move_node = node.variations[0] - move = move_node.move - mover_white = board.turn - - # Analyse position to get engine best move suggestion - info_root_raw = engine.analyse( - board, limit=limit, multipv=effective_mpv - ) - info_root = ( - info_root_raw[0] - if isinstance(info_root_raw, list) - else info_root_raw - ) - best_move = None - if info_root is not None and "pv" in info_root and info_root["pv"]: - best_move = info_root["pv"][0] - # Fallback to engine.play if PV missing - if best_move is None: - res = engine.play(board, limit) - best_move = res.move - - # Evaluate played move position (for mover POV) using a temp board - san = board.san(move) - board_played = board.copy() - board_played.push(move) - info_played_raw = engine.analyse( - board_played, limit=limit, multipv=effective_mpv - ) - info_played = ( - info_played_raw[0] - if isinstance(info_played_raw, list) - else info_played_raw - ) - if info_played is None or "score" not in info_played: - played_cp, played_mate = None, None - else: - played_cp, played_mate = score_to_cp( - info_played["score"], pov_white=mover_white - ) - - # Evaluate best move position (for mover POV) - best_san = board.san(best_move) if best_move is not None else "?" - if best_move is not None: - board_best = board.copy() - board_best.push(best_move) - info_best_raw = engine.analyse( - board_best, limit=limit, multipv=effective_mpv - ) - info_best = ( - info_best_raw[0] - if isinstance(info_best_raw, list) - else info_best_raw - ) - if info_best is None or "score" not in info_best: - best_cp, best_mate = None, None - else: - best_cp, best_mate = score_to_cp( - info_best["score"], pov_white=mover_white - ) - else: - best_cp, best_mate = None, None - - # Compute centipawn loss bands - cp_loss: int | None = None - classification = "Unknown" - # Handle mate cases first - if best_mate is not None or played_mate is not None: - if best_mate is not None and played_mate is not None: - # Same sign -> compare speed - if (best_mate > 0) and (played_mate > 0): - # Keeping a mate: equal speed Best; - # slower -> Inaccuracy; faster -> Best - if abs(played_mate) == abs(best_mate): - classification = "Best" - elif abs(played_mate) > abs(best_mate): - classification = "Inaccuracy" - else: - classification = "Best" - elif (best_mate < 0) and (played_mate < 0): - # Defending: equal delay Best; - # sooner mate -> Blunder; - # if played delays more -> Good - if abs(played_mate) == abs(best_mate): - classification = "Best" - elif abs(played_mate) < abs(best_mate): - classification = "Blunder" - else: - classification = "Good" - else: - # Sign flip across who mates -> Blunder - classification = "Blunder" - else: - # Losing a forced mate or missing one - classification = "Blunder" - elif best_cp is not None and played_cp is not None: - cp_loss = max(0, best_cp - played_cp) - classification = classify_cp_loss(cp_loss) - - side = "W" if mover_white else "B" - _logger.info( - f"{ply:>3} {side} {san:<8} " - f"{fmt_eval(played_cp, played_mate):>10} " - f"{fmt_eval(best_cp, best_mate):>9} " - f"{(str(cp_loss) if cp_loss is not None else '—'):>5} " - f"{classification:<12} {best_san}" - ) - - node = move_node - ply += 1 - # Advance the live board for the next ply - board.push(move) + _run_analysis(game, ctx, last_move_only=args.last_move_only) finally: engine.quit()