feat: make the analyze chess game script use more pc resources

This commit is contained in:
Krzysztof kuhy Rudnicki 2025-08-22 22:30:47 +02:00
parent 8716b7213c
commit 5461801043
12 changed files with 338 additions and 13 deletions

32
.github/workflows/python-tests.yml vendored Normal file
View File

@ -0,0 +1,32 @@
name: Python tests
on:
push:
branches: [ main ]
paths:
- 'PYTHON/lichess_bot/**'
- 'PYTHON/**'
- 'tests/**'
- 'requirements.txt'
pull_request:
branches: [ main ]
paths:
- 'PYTHON/lichess_bot/**'
- 'PYTHON/**'
- 'tests/**'
- 'requirements.txt'
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run pytest
run: pytest -q

14
.vscode/tasks.json vendored Normal file
View File

@ -0,0 +1,14 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "pytest quick",
"type": "shell",
"command": "python -m pip install -r requirements.txt && pytest -q",
"problemMatcher": [
"$pytest"
],
"group": "build"
}
]
}

1
PYTHON/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Top-level package marker for Python modules in this repo."""

View File

@ -1 +1 @@
10
13

View File

@ -1 +1,2 @@
"""Package marker for lichess_bot."""
__all__ = []

View File

@ -195,9 +195,13 @@ class RandomEngine:
board.push(move)
score = -self._alphabeta(board, depth - 1, -beta, -alpha, start)
board.pop()
# Prefer lower-risk choices on score ties
if score > best_score:
best_score = score
best_move = move
elif best_move is not None and (score == best_score or abs(score - best_score) < 1e-3):
if self._risk_score(board, move) < self._risk_score(board, best_move):
best_move = move
if score > alpha:
alpha = score
if alpha >= beta:
@ -289,6 +293,17 @@ class RandomEngine:
early = self._is_early_game(board)
piece = board.piece_at(m.from_square)
if piece:
# Heuristic: demote unsound early bishop sacs on f2/f7
if early and self._is_bishop_sac_on_f2f7(board, m):
try:
see_sac = int(self._see_value(board, m))
except Exception:
see_sac = -300
# Large penalty if SEE is bad or not clearly winning material
if see_sac <= -50:
s -= 1300 # outweigh capture+check bonuses
else:
s -= 600
# Discourage premature queen adventures in the opening
if piece.piece_type == chess.QUEEN and early:
victim = board.piece_at(m.to_square)
@ -342,6 +357,33 @@ class RandomEngine:
from_file = chess.square_file(m.from_square)
from_rank = chess.square_rank(m.from_square)
to_rank = chess.square_rank(m.to_square)
# Bishop kick patterns (a6 vs Bb5, h6 vs Bg5, g6 vs Bf5)
if piece.color == chess.BLACK:
if m.from_square == chess.H7 and m.to_square == chess.H6:
tgt = board.piece_at(chess.G5)
if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP:
s += 130
if m.from_square == chess.A7 and m.to_square == chess.A6:
tgt = board.piece_at(chess.B5)
if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP:
s += 120
if m.from_square == chess.G7 and m.to_square == chess.G6:
tgt = board.piece_at(chess.F5)
if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP:
s += 90
else:
if m.from_square == chess.H2 and m.to_square == chess.H3:
tgt = board.piece_at(chess.G4)
if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP:
s += 130
if m.from_square == chess.A2 and m.to_square == chess.A3:
tgt = board.piece_at(chess.B4)
if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP:
s += 120
if m.from_square == chess.G2 and m.to_square == chess.G3:
tgt = board.piece_at(chess.F4)
if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP:
s += 90
# Discourage early f-pawn push and also random wing pawn thrusts like a/b/g/h
if from_file == 5:
if piece.color == chess.WHITE and from_rank == 1 and to_rank == 2:
@ -629,3 +671,39 @@ class RandomEngine:
pen_black += val
# Convert to white-centric score
return pen_white - pen_black
# --- Risk/Pattern helpers ---
def _is_bishop_sac_on_f2f7(self, board: chess.Board, move: chess.Move) -> bool:
pc = board.piece_at(move.from_square)
if not pc or pc.piece_type != chess.BISHOP:
return False
# Only consider captures of the f-pawn on its home square
target = chess.F2 if pc.color == chess.BLACK else chess.F7
if move.to_square != target:
return False
if not board.is_capture(move):
return False
victim = board.piece_at(move.to_square)
if not victim or victim.piece_type != chess.PAWN:
return False
# Typically it's tempting because it's check; if not a check, still likely bad
try:
is_check = board.gives_check(move)
except Exception:
is_check = False
return True
def _risk_score(self, board: chess.Board, move: chess.Move) -> int:
"""Lower is safer. Positive values indicate tactical/material risk for the mover."""
risk = 0
# Negative SEE means we may be losing material on this move
try:
see = int(self._see_value(board, move))
except Exception:
see = 0
if see < 0:
risk += -see
# Extra risk for early bishop sac on f2/f7
if self._is_early_game(board) and self._is_bishop_sac_on_f2f7(board, move):
risk += 600
return risk

View File

@ -128,7 +128,9 @@ def run_bot(log_level: str = "INFO", decline_correspondence: bool = False) -> No
if color is None:
logging.info(f"Game {game_id}: color unknown yet; waiting for gameFull")
last_handled_len = new_len
# Do not mark this position handled on gameFull; wait for authoritative gameState
if et == "gameState":
last_handled_len = new_len
continue
is_white_turn = board.turn
@ -166,8 +168,12 @@ def run_bot(log_level: str = "INFO", decline_correspondence: bool = False) -> No
api.make_move(game_id, move)
except Exception as e:
logging.warning(f"Game {game_id}: move {move.uci()} failed: {e}")
# Mark this position as handled (whether or not we moved)
last_handled_len = new_len
# Mark this position as handled only on authoritative gameState,
# or after we've actually attempted a move. This prevents missing
# our very first move as White when gameFull (len=0) is followed by
# gameState (len=0).
if et == "gameState" or (my_turn and allow_move):
last_handled_len = new_len
if status in {"mate", "resign", "stalemate", "timeout", "draw"}:
logging.info(f"Game {game_id} finished: {status}")
break

View File

@ -3,12 +3,18 @@
Analyze a chess game's moves using a local Stockfish engine and rate each move.
Usage:
python3 PYTHON/analyze_chess_game.py <path-to-file> [--engine stockfish] [--time 0.2 | --depth 12]
python3 PYTHON/analyze_chess_game.py <path-to-file>
[--engine stockfish]
[--time 0.5 | --depth 20]
[--threads auto|N]
[--hash-mb auto|MB]
[--multipv N]
Notes:
- Requires python-chess. Install from PYTHON/stockfish_analysis/requirements.txt
- The input file can be a pure PGN or a log file containing a PGN section.
- The script tries to locate the PGN by looking for a 'PGN:' marker, PGN tags '[...]', or a move list starting with '1.'.
- Requires python-chess. Install from PYTHON/stockfish_analysis/requirements.txt
- The input file can be a pure PGN or a log file containing a PGN section.
- The script tries to locate the PGN by looking for a 'PGN:' marker, PGN tags '[...]', or a move list starting with '1.'.
- Stockfish is CPU-based; it doesn't use GPU VRAM. "Full power" here means using many CPU threads and a large transposition table (Hash).
"""
from __future__ import annotations
@ -19,6 +25,12 @@ import os
import re
import sys
from typing import Optional, Tuple
import multiprocessing
try:
import psutil # type: ignore
except Exception: # pragma: no cover - optional dependency; we fall back if unavailable
psutil = None # type: ignore
try:
import chess
@ -117,13 +129,83 @@ def fmt_eval(cp: Optional[int], mate_in: Optional[int]) -> str:
return f"{cp/100.0:+.2f}"
def _parse_threads(value: str) -> Optional[int]:
v = value.strip().lower()
if v in ("auto", "max", ""): # auto-detect
return None
try:
n = int(v)
return max(1, n)
except ValueError:
raise argparse.ArgumentTypeError("--threads must be an integer or 'auto'")
def _parse_hash_mb(value: str) -> Optional[int]:
v = value.strip().lower()
if v in ("auto", "max", ""): # auto-detect
return None
try:
mb = int(v)
return max(16, mb)
except ValueError:
raise argparse.ArgumentTypeError("--hash-mb must be an integer (MB) or 'auto'")
def _detect_total_mem_mb() -> Optional[int]:
# Prefer psutil if available
if psutil is not None:
try:
return int(psutil.virtual_memory().total // (1024 * 1024))
except Exception:
pass
# Fallback: Linux /proc/meminfo
try:
with open("/proc/meminfo", "r", encoding="utf-8", errors="ignore") as f:
for line in f:
if line.startswith("MemTotal:"):
parts = line.split()
if len(parts) >= 2 and parts[1].isdigit():
# Value is in kB
kb = int(parts[1])
return kb // 1024
except Exception:
pass
return None
def _auto_hash_mb(threads_wanted: int, engine_options) -> int:
total_mb = _detect_total_mem_mb() or 2048
# Heuristic: cap at 4 GiB by default; keep at most half of RAM; ensure >= 64MB
half_ram = max(64, total_mb // 2)
target = half_ram
# Respect engine "Hash" max if exposed
opt = engine_options.get("Hash")
max_allowed = None
try:
max_allowed = getattr(opt, "max") if opt is not None else None
except Exception:
max_allowed = None
if isinstance(max_allowed, int):
target = min(target, max_allowed)
# Some rough scaling: if very many threads, give a bit more (but not huge)
if threads_wanted >= 16:
target = min(target + 1024, (total_mb * 3) // 4)
return max(64, int(target))
def main():
ap = argparse.ArgumentParser(description="Analyze a chess game's moves with Stockfish and rate each move.")
ap.add_argument("file", help="Path to a PGN file or a log containing a PGN section")
ap.add_argument("--engine", default="stockfish", help="Path to stockfish executable (default: stockfish)")
# Exactly one of time or depth may be provided; default to time
ap.add_argument("--time", type=float, default=4, help="Analysis time per evaluation in seconds (default: 0.2)")
ap.add_argument("--time", type=float, default=0.5, help="Analysis time per evaluation in seconds (default: 0.5)")
ap.add_argument("--depth", type=int, default=None, help="Fixed depth per evaluation (overrides --time)")
# Performance knobs
ap.add_argument("--threads", type=_parse_threads, default=None, metavar="auto|N",
help="Engine threads to use (default: auto = all logical cores)")
ap.add_argument("--hash-mb", type=_parse_hash_mb, default=None, metavar="auto|MB",
help="Hash table size in MB (default: auto = up to half RAM, capped)")
ap.add_argument("--multipv", type=int, default=2, help="Number of principal variations to compute (default: 1)")
args = ap.parse_args()
if not os.path.isfile(args.file):
@ -151,6 +233,64 @@ def main():
print("Ensure Stockfish is installed and in PATH, or specify with --engine.", file=sys.stderr)
sys.exit(4)
# Configure engine performance options if available
try:
options = engine.options # type: ignore[attr-defined]
except Exception:
options = {}
# Threads
wanted_threads = args.threads if args.threads is not None else (multiprocessing.cpu_count() or 1)
# Respect engine bounds if present
if "Threads" in options:
try:
max_thr = getattr(options["Threads"], "max", None)
min_thr = getattr(options["Threads"], "min", 1)
if isinstance(max_thr, int):
wanted_threads = min(wanted_threads, max_thr)
if isinstance(min_thr, int):
wanted_threads = max(wanted_threads, min_thr)
engine.configure({"Threads": int(wanted_threads)})
except Exception:
pass
# Hash (MB)
if "Hash" in options:
try:
if args.hash_mb is not None:
target_hash = int(args.hash_mb)
else:
target_hash = _auto_hash_mb(int(wanted_threads), options)
# Respect bounds
max_hash = getattr(options["Hash"], "max", None)
min_hash = getattr(options["Hash"], "min", 16)
if isinstance(max_hash, int):
target_hash = min(target_hash, max_hash)
if isinstance(min_hash, int):
target_hash = max(target_hash, min_hash)
engine.configure({"Hash": int(target_hash)})
except Exception:
pass
# MultiPV
effective_mpv = max(1, int(args.multipv))
if "MultiPV" in options:
try:
max_mpv = getattr(options["MultiPV"], "max", None)
if isinstance(max_mpv, int):
effective_mpv = min(effective_mpv, max_mpv)
engine.configure({"MultiPV": int(effective_mpv)})
except Exception:
pass
# Enable NNUE if the option exists
for nnue_key in ("Use NNUE", "UseNNUE"):
if nnue_key in options:
try:
engine.configure({nnue_key: True})
except Exception:
pass
limit: chess.engine.Limit
if args.depth is not None:
limit = chess.engine.Limit(depth=args.depth)
@ -165,6 +305,19 @@ def main():
print(f" {white} vs {black} Result: {result}")
print()
print("Columns: ply side move played_eval best_eval loss class best_suggestion")
# Brief performance summary (best-effort)
try:
thr_show = int(wanted_threads)
except Exception:
thr_show = 1
try:
hash_show = int(engine.options.get("Hash").value) if hasattr(engine, "options") and engine.options.get("Hash") else None
except Exception:
hash_show = None
if hash_show is not None:
print(f"Using engine options: Threads={thr_show}, Hash={hash_show} MB, MultiPV={effective_mpv}")
else:
print(f"Using engine options: Threads={thr_show}, MultiPV={effective_mpv}")
ply = 1
try:
@ -175,7 +328,7 @@ def main():
mover_white = board.turn
# Analyse position to get engine best move suggestion
info_root_raw = engine.analyse(board, limit=limit, multipv=1)
info_root_raw = engine.analyse(board, limit=limit, multipv=effective_mpv)
info_root = info_root_raw[0] if isinstance(info_root_raw, list) else info_root_raw
best_move = None
if info_root is not None and "pv" in info_root and info_root["pv"]:
@ -189,7 +342,7 @@ def main():
san = board.san(move)
board_played = board.copy()
board_played.push(move)
info_played_raw = engine.analyse(board_played, limit=limit, multipv=1)
info_played_raw = engine.analyse(board_played, limit=limit, multipv=effective_mpv)
info_played = info_played_raw[0] if isinstance(info_played_raw, list) else info_played_raw
if info_played is None or "score" not in info_played:
played_cp, played_mate = None, None
@ -201,7 +354,7 @@ def main():
if best_move is not None:
board_best = board.copy()
board_best.push(best_move)
info_best_raw = engine.analyse(board_best, limit=limit, multipv=1)
info_best_raw = engine.analyse(board_best, limit=limit, multipv=effective_mpv)
info_best = info_best_raw[0] if isinstance(info_best_raw, list) else info_best_raw
if info_best is None or "score" not in info_best:
best_cp, best_mate = None, None

View File

@ -1 +1,2 @@
python-chess>=1.999
python-chess>=1.999
psutil>=5.9

0
PYTHON/stockfish_analysis/run.sh Normal file → Executable file
View File

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
python-chess>=1.999
pytest>=7.0

View File

@ -0,0 +1,37 @@
import os
import sys
import chess
# Ensure repo root in path for 'PYTHON' package imports when running locally
REPO_ROOT = os.path.dirname(os.path.abspath(__file__ + "/.."))
PARENT = os.path.dirname(REPO_ROOT)
if PARENT not in sys.path:
sys.path.insert(0, PARENT)
from PYTHON.lichess_bot.engine import RandomEngine # noqa: E402
def position_after_italian_bg5():
# 1.e4 e5 2.Nf3 Nc6 3.Bc4 Nf6 4.d3 Bc5 5.O-O d6 6.Bg5
moves = [
"e4", "e5", "Nf3", "Nc6", "Bc4", "Nf6", "d3", "Bc5", "O-O", "d6", "Bg5"
]
board = chess.Board()
for san in moves:
board.push_san(san)
return board
def test_engine_avoids_unsound_bxf2_in_italian_bg5():
board = position_after_italian_bg5()
eng = RandomEngine(depth=4, max_time_sec=1.5)
move, expl = eng.choose_move_with_explanation(board, time_budget_sec=1.5)
# The engine should avoid Bxf2+ here (known blunder); assert chosen move isn't that
bxf2 = chess.Move.from_uci("c5f2")
assert move != bxf2, f"Engine picked unsound Bxf2+: {expl}"
# Also ensure Bxf2+ is not the top candidate in analyze list for small depth
# We can do a weaker invariant: if the engine considered only a couple moves earlier,
# ensure current top move either equals move or is not Bxf2.
assert move is not None
assert move is None or move != bxf2