refactor(stockfish_analysis): reduce complexity of main() function

Break down monolithic main() into smaller, focused functions:
- _build_argument_parser(): CLI argument setup
- _load_game(): PGN file loading and parsing
- _configure_threads/hash/multipv/nnue(): Engine configuration helpers
- _setup_engine(): Engine initialization orchestration
- _get_best_move(): Engine best move extraction
- _evaluate_position(): Position evaluation wrapper
- _classify_mate_move(): Mate score classification logic
- _analyze_single_move(): Single move analysis
- _log_move_analysis(): Move analysis logging
- _run_analysis(): Analysis loop orchestration
- _analyze_last_move/all_moves(): Specific analysis modes

Add dataclasses MoveAnalysis and AnalysisContext to bundle related
parameters, improving code organization and reducing function signatures.

This removes the need for C901, PLR0912, PLR0915 per-file ignores
as all functions now comply with complexity limits.
This commit is contained in:
Krzysztof kuhy Rudnicki 2025-11-30 23:23:10 +01:00
parent da742f0699
commit 09a74deab8
2 changed files with 302 additions and 320 deletions

View File

@ -75,11 +75,6 @@ unfixable = []
"python_pkg/lichess_bot/engine.py" = [
"S603", # Subprocess for engine communication
]
"python_pkg/stockfish_analysis/analyze_chess_game.py" = [
"C901", # Complex main() with many argument combinations and analysis modes
"PLR0912", # Complex main() with many argument combinations and analysis modes
"PLR0915", # Long main() handling complete analysis workflow
]
"python_pkg/keyboard_coop/main.py" = [
"FBT003", # Boolean positional values in pygame API calls (e.g., font.render)
]

View File

@ -23,6 +23,7 @@ from __future__ import annotations
import argparse
import contextlib
from dataclasses import dataclass
import io
import logging
import multiprocessing
@ -216,8 +217,35 @@ def _auto_hash_mb(threads_wanted: int, engine_options: dict[str, object]) -> int
return max(64, int(target))
def main() -> None:
"""Parse arguments and run chess game analysis."""
# Type aliases for clarity
EngineOptions = dict[str, object]
@dataclass
class MoveAnalysis:
"""Container for single move analysis results."""
san: str
best_san: str
played_cp: int | None
played_mate: int | None
best_cp: int | None
best_mate: int | None
cp_loss: int | None
classification: str
@dataclass
class AnalysisContext:
"""Container for analysis parameters passed between functions."""
engine: chess.engine.SimpleEngine
limit: chess.engine.Limit
multipv: int
def _build_argument_parser() -> argparse.ArgumentParser:
"""Build and return the argument parser for the analysis script."""
ap = argparse.ArgumentParser(
description="Analyze a chess game's moves with Stockfish and rate each move."
)
@ -227,7 +255,6 @@ def main() -> None:
default="stockfish",
help="Path to stockfish executable (default: stockfish)",
)
# Exactly one of time or depth may be provided; default to time
ap.add_argument(
"--time",
type=float,
@ -240,7 +267,6 @@ def main() -> None:
default=None,
help="Fixed depth per evaluation (overrides --time)",
)
# Performance knobs
ap.add_argument(
"--threads",
type=_parse_threads,
@ -269,13 +295,16 @@ def main() -> None:
"(reports its eval and the best move)"
),
)
args = ap.parse_args()
return ap
if not Path(args.file).is_file():
_logger.error(f"Input not found: {args.file}")
def _load_game(file_path: str) -> chess.pgn.Game:
"""Load and parse a chess game from a file."""
if not Path(file_path).is_file():
_logger.error(f"Input not found: {file_path}")
sys.exit(1)
with open(args.file, encoding="utf-8", errors="replace") as f:
with open(file_path, encoding="utf-8", errors="replace") as f:
raw = f.read()
pgn_text = extract_pgn_text(raw)
@ -288,7 +317,84 @@ def main() -> None:
_logger.error("Failed to parse PGN.")
sys.exit(3)
# Prepare engine
return game
def _configure_threads(
engine: chess.engine.SimpleEngine,
options: EngineOptions,
requested: int | None,
) -> int:
"""Configure engine thread count and return actual threads used."""
wanted = requested if requested is not None else (multiprocessing.cpu_count() or 1)
if "Threads" not in options:
return wanted
try:
max_thr = getattr(options["Threads"], "max", None)
min_thr = getattr(options["Threads"], "min", 1)
if isinstance(max_thr, int):
wanted = min(wanted, max_thr)
if isinstance(min_thr, int):
wanted = max(wanted, min_thr)
engine.configure({"Threads": int(wanted)})
except (AttributeError, TypeError, ValueError):
_logger.debug("Failed to configure Threads option")
return wanted
def _configure_hash(
engine: chess.engine.SimpleEngine,
options: EngineOptions,
requested: int | None,
threads: int,
) -> None:
"""Configure engine hash table size."""
if "Hash" not in options:
return
try:
target = (
int(requested) if requested is not None else _auto_hash_mb(threads, options)
)
max_hash = getattr(options["Hash"], "max", None)
min_hash = getattr(options["Hash"], "min", 16)
if isinstance(max_hash, int):
target = min(target, max_hash)
if isinstance(min_hash, int):
target = max(target, min_hash)
engine.configure({"Hash": int(target)})
except (AttributeError, TypeError, ValueError):
_logger.debug("Failed to configure Hash option")
def _configure_multipv(
engine: chess.engine.SimpleEngine, options: EngineOptions, requested: int
) -> int:
"""Configure MultiPV and return effective value."""
effective = max(1, int(requested))
if "MultiPV" not in options:
return effective
try:
max_mpv = getattr(options["MultiPV"], "max", None)
if isinstance(max_mpv, int):
effective = min(effective, max_mpv)
engine.configure({"MultiPV": int(effective)})
except (AttributeError, TypeError, ValueError):
_logger.debug("Failed to configure MultiPV option")
return effective
def _configure_nnue(engine: chess.engine.SimpleEngine, options: EngineOptions) -> None:
"""Enable NNUE if supported."""
for nnue_key in ("Use NNUE", "UseNNUE"):
if nnue_key in options:
with contextlib.suppress(Exception):
engine.configure({nnue_key: True})
def _setup_engine(
args: argparse.Namespace,
) -> tuple[chess.engine.SimpleEngine, int, chess.engine.Limit]:
"""Initialize and configure the chess engine."""
try:
engine = chess.engine.SimpleEngine.popen_uci([args.engine])
except FileNotFoundError:
@ -298,63 +404,15 @@ def main() -> None:
)
sys.exit(4)
# Configure engine performance options if available
try:
options = engine.options # type: ignore[attr-defined]
except AttributeError:
options = {}
# Threads
wanted_threads = (
args.threads if args.threads is not None else (multiprocessing.cpu_count() or 1)
)
# Respect engine bounds if present
if "Threads" in options:
try:
max_thr = getattr(options["Threads"], "max", None)
min_thr = getattr(options["Threads"], "min", 1)
if isinstance(max_thr, int):
wanted_threads = min(wanted_threads, max_thr)
if isinstance(min_thr, int):
wanted_threads = max(wanted_threads, min_thr)
engine.configure({"Threads": int(wanted_threads)})
except (AttributeError, TypeError, ValueError):
_logger.debug("Failed to configure Threads option")
# Configure hash table size in MB.
if "Hash" in options:
try:
if args.hash_mb is not None:
target_hash = int(args.hash_mb)
else:
target_hash = _auto_hash_mb(int(wanted_threads), options)
# Respect bounds
max_hash = getattr(options["Hash"], "max", None)
min_hash = getattr(options["Hash"], "min", 16)
if isinstance(max_hash, int):
target_hash = min(target_hash, max_hash)
if isinstance(min_hash, int):
target_hash = max(target_hash, min_hash)
engine.configure({"Hash": int(target_hash)})
except (AttributeError, TypeError, ValueError):
_logger.debug("Failed to configure Hash option")
# MultiPV
effective_mpv = max(1, int(args.multipv))
if "MultiPV" in options:
try:
max_mpv = getattr(options["MultiPV"], "max", None)
if isinstance(max_mpv, int):
effective_mpv = min(effective_mpv, max_mpv)
engine.configure({"MultiPV": int(effective_mpv)})
except (AttributeError, TypeError, ValueError):
_logger.debug("Failed to configure MultiPV option")
# Enable NNUE if the option exists
for nnue_key in ("Use NNUE", "UseNNUE"):
if nnue_key in options:
with contextlib.suppress(Exception):
engine.configure({nnue_key: True})
threads = _configure_threads(engine, options, args.threads)
_configure_hash(engine, options, args.hash_mb, threads)
effective_mpv = _configure_multipv(engine, options, args.multipv)
_configure_nnue(engine, options)
limit: chess.engine.Limit
if args.depth is not None:
@ -362,6 +420,136 @@ def main() -> None:
else:
limit = chess.engine.Limit(time=max(0.05, args.time))
_log_engine_config(engine, threads, effective_mpv)
return engine, effective_mpv, limit
def _log_engine_config(
engine: chess.engine.SimpleEngine, threads: int, multipv: int
) -> None:
"""Log engine configuration summary."""
try:
hash_val = engine.options.get("Hash")
hash_show = int(hash_val.value) if hash_val else None
except (AttributeError, TypeError, ValueError):
hash_show = None
if hash_show is not None:
_logger.info(
f"Using engine options: Threads={threads}, "
f"Hash={hash_show} MB, MultiPV={multipv}"
)
else:
_logger.info(f"Using engine options: Threads={threads}, MultiPV={multipv}")
def _get_best_move(
engine: chess.engine.SimpleEngine,
board: chess.Board,
limit: chess.engine.Limit,
multipv: int,
) -> chess.Move | None:
"""Get the engine's best move for a position."""
info_raw = engine.analyse(board, limit=limit, multipv=multipv)
info = info_raw[0] if isinstance(info_raw, list) else info_raw
if info is not None and "pv" in info and info["pv"]:
return info["pv"][0]
res = engine.play(board, limit)
return res.move
def _evaluate_position(
engine: chess.engine.SimpleEngine,
board: chess.Board,
limit: chess.engine.Limit,
multipv: int,
*,
pov_white: bool,
) -> tuple[int | None, int | None]:
"""Evaluate a position and return (cp, mate_in) from POV."""
info_raw = engine.analyse(board, limit=limit, multipv=multipv)
info = info_raw[0] if isinstance(info_raw, list) else info_raw
if info is None or "score" not in info:
return None, None
return score_to_cp(info["score"], pov_white=pov_white)
def _classify_mate_move(best_mate: int | None, played_mate: int | None) -> str:
"""Classify a move when mate scores are involved."""
if best_mate is None or played_mate is None:
return "Blunder"
if (best_mate > 0) and (played_mate > 0):
if abs(played_mate) > abs(best_mate):
return "Inaccuracy"
return "Best"
if (best_mate < 0) and (played_mate < 0):
if abs(played_mate) < abs(best_mate):
return "Blunder"
return "Best" if abs(played_mate) == abs(best_mate) else "Good"
return "Blunder"
def _analyze_single_move(
ctx: AnalysisContext, board: chess.Board, move: chess.Move
) -> MoveAnalysis:
"""Analyze a single move and return analysis data."""
mover_white = board.turn
san = board.san(move)
best_move = _get_best_move(ctx.engine, board, ctx.limit, ctx.multipv)
best_san = board.san(best_move) if best_move is not None else "?"
board_played = board.copy()
board_played.push(move)
played_cp, played_mate = _evaluate_position(
ctx.engine, board_played, ctx.limit, ctx.multipv, pov_white=mover_white
)
if best_move is not None:
board_best = board.copy()
board_best.push(best_move)
best_cp, best_mate = _evaluate_position(
ctx.engine, board_best, ctx.limit, ctx.multipv, pov_white=mover_white
)
else:
best_cp, best_mate = None, None
cp_loss: int | None = None
if best_mate is not None or played_mate is not None:
classification = _classify_mate_move(best_mate, played_mate)
elif best_cp is not None and played_cp is not None:
cp_loss = max(0, best_cp - played_cp)
classification = classify_cp_loss(cp_loss)
else:
classification = "Unknown"
return MoveAnalysis(
san=san,
best_san=best_san,
played_cp=played_cp,
played_mate=played_mate,
best_cp=best_cp,
best_mate=best_mate,
cp_loss=cp_loss,
classification=classification,
)
def _log_move_analysis(ply: int, result: MoveAnalysis, *, mover_white: bool) -> None:
"""Log a single move's analysis result."""
side = "W" if mover_white else "B"
loss_str = str(result.cp_loss) if result.cp_loss is not None else ""
_logger.info(
f"{ply:>3} {side} {result.san:<8} "
f"{fmt_eval(result.played_cp, result.played_mate):>10} "
f"{fmt_eval(result.best_cp, result.best_mate):>9} "
f"{loss_str:>5} {result.classification:<12} {result.best_san}"
)
def _run_analysis(
game: chess.pgn.Game, ctx: AnalysisContext, *, last_move_only: bool
) -> None:
"""Run the move-by-move analysis."""
board = game.board()
_logger.info("Game:")
white = game.headers.get("White", "White")
@ -372,264 +560,63 @@ def main() -> None:
_logger.info(
"Columns: ply side move played_eval best_eval loss class best_suggestion"
)
# Brief performance summary (best-effort)
try:
thr_show = int(wanted_threads)
except (ValueError, TypeError):
thr_show = 1
try:
hash_show = (
int(engine.options.get("Hash").value)
if hasattr(engine, "options") and engine.options.get("Hash")
else None
)
except (AttributeError, TypeError, ValueError):
hash_show = None
if hash_show is not None:
_logger.info(
f"Using engine options: Threads={thr_show}, "
f"Hash={hash_show} MB, MultiPV={effective_mpv}"
)
if last_move_only:
_analyze_last_move(game, board, ctx)
else:
_logger.info(
f"Using engine options: Threads={thr_show}, MultiPV={effective_mpv}"
)
_analyze_all_moves(game, board, ctx)
def _analyze_last_move(
node: chess.pgn.Game, board: chess.Board, ctx: AnalysisContext
) -> None:
"""Walk to last move and analyze only that ply."""
if not node.variations:
_logger.warning("No moves found in the game.")
return
ply = 1
while node.variations:
move_node = node.variations[0]
move = move_node.move
if not move_node.variations:
result = _analyze_single_move(ctx, board, move)
_log_move_analysis(ply, result, mover_white=board.turn)
break
board.push(move)
node = move_node
ply += 1
def _analyze_all_moves(
node: chess.pgn.Game, board: chess.Board, ctx: AnalysisContext
) -> None:
"""Analyze all moves in the game."""
ply = 1
while node.variations:
move_node = node.variations[0]
move = move_node.move
mover_white = board.turn
result = _analyze_single_move(ctx, board, move)
_log_move_analysis(ply, result, mover_white=mover_white)
node = move_node
ply += 1
board.push(move)
def main() -> None:
"""Parse arguments and run chess game analysis."""
args = _build_argument_parser().parse_args()
game = _load_game(args.file)
engine, effective_mpv, limit = _setup_engine(args)
ctx = AnalysisContext(engine=engine, limit=limit, multipv=effective_mpv)
try:
node = game
if args.last_move_only:
# Walk to the last move in the main line and analyze only that ply.
if not node.variations:
_logger.warning("No moves found in the game.")
else:
while node.variations:
move_node = node.variations[0]
move = move_node.move
mover_white = board.turn
# If this is the final move in the mainline, analyze it and stop.
if not move_node.variations:
# Analyse current position to get engine best move suggestion
info_root_raw = engine.analyse(
board, limit=limit, multipv=effective_mpv
)
info_root = (
info_root_raw[0]
if isinstance(info_root_raw, list)
else info_root_raw
)
best_move = None
if (
info_root is not None
and "pv" in info_root
and info_root["pv"]
):
best_move = info_root["pv"][0]
if best_move is None:
res = engine.play(board, limit)
best_move = res.move
san = board.san(move)
# Evaluate played move
board_played = board.copy()
board_played.push(move)
info_played_raw = engine.analyse(
board_played, limit=limit, multipv=effective_mpv
)
info_played = (
info_played_raw[0]
if isinstance(info_played_raw, list)
else info_played_raw
)
if info_played is None or "score" not in info_played:
played_cp, played_mate = None, None
else:
played_cp, played_mate = score_to_cp(
info_played["score"], pov_white=mover_white
)
# Evaluate best move position (for mover POV)
best_san = (
board.san(best_move) if best_move is not None else "?"
)
if best_move is not None:
board_best = board.copy()
board_best.push(best_move)
info_best_raw = engine.analyse(
board_best, limit=limit, multipv=effective_mpv
)
info_best = (
info_best_raw[0]
if isinstance(info_best_raw, list)
else info_best_raw
)
if info_best is None or "score" not in info_best:
best_cp, best_mate = None, None
else:
best_cp, best_mate = score_to_cp(
info_best["score"], pov_white=mover_white
)
else:
best_cp, best_mate = None, None
# Compute loss/classification
cp_loss: int | None = None
classification = "Unknown"
if best_mate is not None or played_mate is not None:
if best_mate is not None and played_mate is not None:
if (best_mate > 0) and (played_mate > 0):
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) > abs(best_mate):
classification = "Inaccuracy"
else:
classification = "Best"
elif (best_mate < 0) and (played_mate < 0):
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) < abs(best_mate):
classification = "Blunder"
else:
classification = "Good"
else:
classification = "Blunder"
else:
classification = "Blunder"
elif best_cp is not None and played_cp is not None:
cp_loss = max(0, best_cp - played_cp)
classification = classify_cp_loss(cp_loss)
side = "W" if mover_white else "B"
_logger.info(
f"{ply:>3} {side} {san:<8} "
f"{fmt_eval(played_cp, played_mate):>10} "
f"{fmt_eval(best_cp, best_mate):>9} "
f"{(str(cp_loss) if cp_loss is not None else ''):>5} "
f"{classification:<12} {best_san}"
)
break
# Advance to keep searching for the last move
board.push(move)
node = move_node
ply += 1
else:
# Default behavior: analyze all moves
while node.variations:
move_node = node.variations[0]
move = move_node.move
mover_white = board.turn
# Analyse position to get engine best move suggestion
info_root_raw = engine.analyse(
board, limit=limit, multipv=effective_mpv
)
info_root = (
info_root_raw[0]
if isinstance(info_root_raw, list)
else info_root_raw
)
best_move = None
if info_root is not None and "pv" in info_root and info_root["pv"]:
best_move = info_root["pv"][0]
# Fallback to engine.play if PV missing
if best_move is None:
res = engine.play(board, limit)
best_move = res.move
# Evaluate played move position (for mover POV) using a temp board
san = board.san(move)
board_played = board.copy()
board_played.push(move)
info_played_raw = engine.analyse(
board_played, limit=limit, multipv=effective_mpv
)
info_played = (
info_played_raw[0]
if isinstance(info_played_raw, list)
else info_played_raw
)
if info_played is None or "score" not in info_played:
played_cp, played_mate = None, None
else:
played_cp, played_mate = score_to_cp(
info_played["score"], pov_white=mover_white
)
# Evaluate best move position (for mover POV)
best_san = board.san(best_move) if best_move is not None else "?"
if best_move is not None:
board_best = board.copy()
board_best.push(best_move)
info_best_raw = engine.analyse(
board_best, limit=limit, multipv=effective_mpv
)
info_best = (
info_best_raw[0]
if isinstance(info_best_raw, list)
else info_best_raw
)
if info_best is None or "score" not in info_best:
best_cp, best_mate = None, None
else:
best_cp, best_mate = score_to_cp(
info_best["score"], pov_white=mover_white
)
else:
best_cp, best_mate = None, None
# Compute centipawn loss bands
cp_loss: int | None = None
classification = "Unknown"
# Handle mate cases first
if best_mate is not None or played_mate is not None:
if best_mate is not None and played_mate is not None:
# Same sign -> compare speed
if (best_mate > 0) and (played_mate > 0):
# Keeping a mate: equal speed Best;
# slower -> Inaccuracy; faster -> Best
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) > abs(best_mate):
classification = "Inaccuracy"
else:
classification = "Best"
elif (best_mate < 0) and (played_mate < 0):
# Defending: equal delay Best;
# sooner mate -> Blunder;
# if played delays more -> Good
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) < abs(best_mate):
classification = "Blunder"
else:
classification = "Good"
else:
# Sign flip across who mates -> Blunder
classification = "Blunder"
else:
# Losing a forced mate or missing one
classification = "Blunder"
elif best_cp is not None and played_cp is not None:
cp_loss = max(0, best_cp - played_cp)
classification = classify_cp_loss(cp_loss)
side = "W" if mover_white else "B"
_logger.info(
f"{ply:>3} {side} {san:<8} "
f"{fmt_eval(played_cp, played_mate):>10} "
f"{fmt_eval(best_cp, best_mate):>9} "
f"{(str(cp_loss) if cp_loss is not None else ''):>5} "
f"{classification:<12} {best_san}"
)
node = move_node
ply += 1
# Advance the live board for the next ply
board.push(move)
_run_analysis(game, ctx, last_move_only=args.last_move_only)
finally:
engine.quit()