feat: added tests for lichess bot

This commit is contained in:
Krzysztof kuhy Rudnicki 2025-08-23 15:16:26 +02:00
parent 5461801043
commit 392a97085f
6 changed files with 507 additions and 77 deletions

7
.vscode/tasks.json vendored
View File

@ -9,6 +9,13 @@
"$pytest"
],
"group": "build"
},
{
"label": "pytest quick",
"type": "shell",
"command": "python -m pip install -r requirements.txt && pytest -q",
"isBackground": false,
"group": "build"
}
]
}

View File

@ -1 +1 @@
13
14

71
PYTHON/lichess_bot/run_tests.sh Executable file
View File

@ -0,0 +1,71 @@
#!/usr/bin/env bash
# Run the lichess_bot pytest suite inside a script-local virtual environment.
#
# Usage: run_tests.sh [extra pytest arguments...]
#
# The script creates (or reuses) .venv next to itself, installs requirements
# (repo-level file preferred, module-level file as fallback), makes sure pytest
# is present, and runs pytest on the tests directory. All command-line
# arguments are forwarded to pytest unchanged.
set -euo pipefail

# Directory of this script (lichess_bot module root)
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT_DIR"

# Try to detect repo root (two levels up from PYTHON/lichess_bot).
# The fallback has to be attached to the assignment itself: under `set -e` a
# failing command substitution in a plain assignment aborts the script, so a
# default applied later would never be reached.
REPO_ROOT="$(cd "$ROOT_DIR/../.." 2>/dev/null && pwd)" || REPO_ROOT="$ROOT_DIR"

# Prefer Python 3 if available
if command -v python3 >/dev/null 2>&1; then
  PY=python3
else
  PY=python
fi
echo "[run_tests] Base Python: $("$PY" -c 'import sys; print(sys.executable)')"

# Create/use local virtual environment to avoid system-managed pip restrictions (PEP 668)
VENV_DIR="$ROOT_DIR/.venv"
if [[ ! -d "$VENV_DIR" ]]; then
  echo "[run_tests] Creating virtual environment at $VENV_DIR"
  "$PY" -m venv "$VENV_DIR"
fi
VENV_PY="$VENV_DIR/bin/python"
# Quote the interpreter path: the checkout directory may contain spaces.
echo "[run_tests] Venv Python: $("$VENV_PY" -c 'import sys; print(sys.executable)')"

echo "[run_tests] Upgrading pip/setuptools/wheel"
"$VENV_PY" -m pip install --upgrade pip setuptools wheel >/dev/null

# Choose requirements file: prefer repo root, fallback to local
REQ_FILE=""
if [[ -f "$REPO_ROOT/requirements.txt" ]]; then
  REQ_FILE="$REPO_ROOT/requirements.txt"
elif [[ -f "$ROOT_DIR/requirements.txt" ]]; then
  REQ_FILE="$ROOT_DIR/requirements.txt"
fi
if [[ -n "$REQ_FILE" ]]; then
  echo "[run_tests] Installing requirements from $REQ_FILE"
  "$VENV_PY" -m pip install -r "$REQ_FILE"
else
  echo "[run_tests] No requirements.txt found; proceeding without dependency install"
fi

# Ensure pytest is available in venv
if ! "$VENV_PY" -c "import pytest" >/dev/null 2>&1; then
  echo "[run_tests] Installing pytest"
  "$VENV_PY" -m pip install pytest
fi

# Make project importable (module root and repo root). An existing PYTHONPATH
# is appended only when non-empty so no stray empty entry is added.
export PYTHONPATH="$ROOT_DIR:$REPO_ROOT${PYTHONPATH:+:$PYTHONPATH}"

TEST_PATH_REL="PYTHON/lichess_bot/tests"
TEST_PATH_ABS="$REPO_ROOT/$TEST_PATH_REL"
if [[ ! -d "$TEST_PATH_ABS" ]]; then
  # Fallback if script moved and relative layout differs
  if [[ -d "$ROOT_DIR/tests" ]]; then
    TEST_PATH_ABS="$ROOT_DIR/tests"
  else
    echo "[run_tests] Test directory not found (tried: $TEST_PATH_ABS and $ROOT_DIR/tests)." >&2
    exit 1
  fi
fi

echo "[run_tests] Running pytest for $TEST_PATH_ABS"
"$VENV_PY" -m pytest -q "$TEST_PATH_ABS" "$@"

View File

@ -0,0 +1,52 @@
import os
import sys
import chess
import pytest
# Ensure the repository root (the directory that contains the ``PYTHON``
# package) is importable when running pytest directly. Generated blunder tests
# are written to <repo>/PYTHON/lichess_bot/tests/, so the root is four
# directory levels above this file; three levels would stop at <repo>/PYTHON,
# from which ``PYTHON.lichess_bot`` cannot be imported.
REPO_ROOT = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
if REPO_ROOT not in sys.path:
    sys.path.insert(0, REPO_ROOT)
from PYTHON.lichess_bot.engine import RandomEngine  # noqa: E402
# Each case is (FEN before the move, logged blunder in UCI, pytest id).
BLUNDER_CASES = [
    ("r1bqk2r/ppp2ppp/2np1n2/2b5/2BPP3/5N2/PP3PPP/RNBQ1RK1 b kq - 0 7", "f6e4", "ply14_B_f6e4"),
    ("r2qk2r/pppb2pp/2n5/2p3B1/Q1B1p3/5N2/PP3PPP/R4RK1 b kq - 1 12", "e4f3", "ply24_B_e4f3"),
    ("r2Bk2r/pppb2pp/2n5/2p5/Q1B5/8/PP3PpP/R4RK1 w kq - 0 14", "g1g2", "ply27_W_g1g2"),
    ("r1k4r/pppb2pp/2n5/2p5/2B5/1Q6/PP3PKP/3R1R2 b - - 3 16", "g7g6", "ply32_B_g7g6"),
    ("rk5r/ppp4p/2n2Qp1/2p5/8/8/PP3PKP/3R1R2 b - - 2 19", "b7b5", "ply38_B_b7b5"),
    ("rk5r/p1p4p/2n2Qp1/1pp5/8/8/PP3PKP/3R1R2 w - - 0 20", "f6h8", "ply39_W_f6h8"),
    ("r7/1kp4p/2n2Qp1/ppp5/8/8/PP3PKP/3R1R2 w - - 0 22", "d1d6", "ply43_W_d1d6"),
    ("8/8/2k3pR/1p6/p1p5/8/PP3PKP/8 b - - 1 29", "c6d7", "ply58_B_c6d7"),
    ("4k3/8/6R1/1p6/p1p5/8/PP3PKP/8 w - - 1 31", "f2f4", "ply61_W_f2f4"),
    ("4k3/8/6R1/1p3P2/p1p5/8/PP4KP/8 w - - 1 33", "f5f6", "ply65_W_f5f6"),
    ("4k3/8/5PR1/1p6/p1p5/8/PP4KP/8 b - - 0 33", "e8d8", "ply66_B_e8d8"),
    ("5k2/5P1R/8/1p6/p1p4P/8/PP4K1/8 w - - 1 38", "h4h5", "ply75_W_h4h5"),
    ("5k2/5P1R/8/1p5P/p1p5/8/PP4K1/8 b - - 0 38", "f8e7", "ply76_B_f8e7"),
    ("5k2/5PR1/7P/1p6/p1p5/8/PP4K1/8 b - - 2 40", "c4c3", "ply80_B_c4c3"),
    ("5k2/5PR1/7P/1p6/p7/2P5/P5K1/8 b - - 0 41", "f8e7", "ply82_B_f8e7"),
    ("5Q2/6R1/8/1p1k3Q/p7/2P5/P5K1/8 b - - 2 45", "d5e6", "ply90_B_d5e6"),
    ("5Q2/6R1/4k3/1p5Q/p7/2P5/P5K1/8 w - - 3 46", "g7g6", "ply91_W_g7g6"),
    ("5Q2/3k4/6R1/1p5Q/p7/2P5/P5K1/8 w - - 5 47", "f8f7", "ply93_W_f8f7"),
    # NOTE: the logged ply-95 row (h5h8 from "3k4/5Q2/6R1/1p5Q/p7/2P5/P5K1/8 w")
    # is intentionally not listed. Qh8 is checkmate in that position (the f7
    # queen covers c7/d7/e7/e8, the h8 queen covers the back rank), so the
    # "Blunder" label was a misclassification and forbidding the move would
    # fail an engine for winning the game.
]
@pytest.mark.parametrize('fen,blunder_uci,label', BLUNDER_CASES, ids=[case[2] for case in BLUNDER_CASES])
def test_engine_avoids_logged_blunder(fen, blunder_uci, label):
    """Check that the engine picks a legal move other than the logged blunder.

    The engine's explanation API is used when it exists so a failure message
    can say why the move was chosen; any error from that API falls back to the
    plain ``choose_move`` call.
    """
    board = chess.Board(fen)
    eng = RandomEngine(depth=4, max_time_sec=1.2)
    explanation = ''
    explained_choice = getattr(eng, 'choose_move_with_explanation', None)
    if explained_choice is None:
        move = eng.choose_move(board)
    else:
        try:
            move, reason = explained_choice(board, time_budget_sec=1.2)
            explanation = reason or ''
        except Exception:
            # Best-effort only: the explanation variant is optional sugar.
            move = eng.choose_move(board)
    assert move is not None, 'Engine returned no move'
    assert move in board.legal_moves, 'Engine move is illegal'
    assert move.uci() != blunder_uci, f'Engine repeated blunder {blunder_uci} at {label}. Explanation: {explanation}'

View File

@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
Generate pytest cases from a lichess analysis log.
Input: a log file that contains a "Columns:" section and a "PGN:" section.
For each row of the "Columns:" section whose class is "Blunder", the script
reconstructs the FEN of the position before the blunder and the blunder move
in UCI notation. It then writes a parametrized pytest module asserting that
the engine does not pick that same blunder move from that position.
Usage:
python PYTHON/lichess_bot/tools/generate_blunder_tests.py /path/to/lichess_bot_game_xxxxx.log
It will create a file like:
PYTHON/lichess_bot/tests/test_blunders_<gameid>.py
Dependencies: python-chess, pytest (already in requirements.txt)
"""
from __future__ import annotations
import io
import os
import re
import sys
from dataclasses import dataclass
from typing import List, Tuple
import chess
import chess.pgn
@dataclass
class Blunder:
    """One row of the log's "Columns:" table whose class column is "Blunder"."""

    ply: int  # ply number from the first column (treated as 1-based when replaying the PGN)
    side: str  # 'W' or 'B'
    san: str  # SAN of the played blunder
def parse_columns_for_blunders(text: str) -> List[Blunder]:
    """Collect the rows classified as "Blunder" from the log's "Columns:" table.

    The table starts on the line after the first one beginning with
    "Columns:" and ends at the first blank line or "PGN:" marker. Rows are
    split on runs of two or more whitespace characters; the expected columns
    are ply, side, move, played_eval, best_eval, loss, class, best_suggestion.
    Rows that do not start with a number or have fewer than seven columns are
    ignored.

    Returns:
        The blunders in table order; an empty list when there is no
        "Columns:" header.
    """
    rows = iter(text.splitlines())
    # Consume everything up to and including the header line.
    for row in rows:
        if row.strip().startswith("Columns:"):
            break
    else:
        return []

    found: List[Blunder] = []
    for row in rows:
        stripped = row.strip()
        if stripped == "" or stripped.startswith("PGN:"):
            break
        # Data rows begin with the ply number.
        if re.match(r"^\s*\d+\s+", row) is None:
            continue
        cells = re.split(r"\s{2,}", stripped)
        if len(cells) < 7:
            continue
        try:
            ply = int(cells[0])
        except ValueError:
            continue
        if cells[6] == "Blunder":
            found.append(Blunder(ply=ply, side=cells[1], san=cells[2]))
    return found
def extract_pgn(text: str) -> str | None:
    """Return the PGN text that follows the log's ``PGN:`` marker.

    The marker is the first line that starts with ``PGN:`` (leading spaces or
    tabs are allowed, matching how the "Columns:" parser detects the marker).
    PGN content may begin on the marker line itself or on the following lines.

    Returns:
        The PGN text with surrounding whitespace removed, or None when there
        is no marker or nothing but whitespace follows it.
    """
    marker = re.search(r"^[ \t]*PGN:", text, flags=re.M)
    if marker is None:
        return None
    pgn = text[marker.end():].strip()
    return pgn or None
def san_list_from_game(game: chess.pgn.Game) -> List[str]:
    """Return the SAN of every mainline move of *game*, in playing order.

    Only the first variation at each node is followed; side lines are ignored.
    """
    sans: List[str] = []
    cursor = game
    while True:
        if not cursor.variations:
            return sans
        cursor = cursor.variation(0)
        sans.append(cursor.san())
def fen_and_uci_for_blunders(pgn_text: str, blunders: List[Blunder]) -> List[Tuple[str, str, Blunder]]:
    """Pair each usable blunder with the FEN before it and its move in UCI.

    Args:
        pgn_text: PGN of the game the blunders were logged for.
        blunders: Rows extracted from the log's "Columns:" table.

    Returns:
        ``(fen_before, blunder_uci, blunder)`` tuples in the order of
        *blunders*. A row is dropped when its ply is not part of the PGN
        mainline (the position before it cannot be reconstructed) or when the
        move delivers checkmate (a mating move cannot be a blunder).

    Raises:
        RuntimeError: If no game can be parsed from *pgn_text*.
    """
    game = chess.pgn.read_game(io.StringIO(pgn_text))
    if game is None:
        raise RuntimeError("Failed to parse PGN from log")
    main_sans = san_list_from_game(game)

    # Replay the mainline once and snapshot the board before every ply that a
    # blunder refers to (plies are 1-based), instead of replaying per blunder.
    wanted = {bl.ply for bl in blunders}
    snapshots = {}
    board = game.board()
    for ply, san in enumerate(main_sans, start=1):
        played = board.parse_san(san)
        if ply in wanted:
            snapshots[ply] = (board.copy(stack=False), played)
        board.push(played)

    results: List[Tuple[str, str, Blunder]] = []
    for bl in blunders:
        snapshot = snapshots.get(bl.ply)
        if snapshot is None:
            # Ply outside the mainline: evaluating the SAN against some other
            # position would yield a test case for the wrong FEN.
            continue
        position, played = snapshot
        try:
            move = position.parse_san(bl.san)
        except ValueError:
            # The logged SAN does not parse here; use the game's own move.
            move = played
        fen_before = position.fen()
        # A move that ends the game by checkmate may still be labelled
        # "Blunder" in the log; never generate a test that forbids it.
        position.push(move)
        delivers_mate = position.is_checkmate()
        position.pop()
        if delivers_mate:
            continue
        results.append((fen_before, move.uci(), bl))
    return results
def write_pytest(target_path: str, cases: List[Tuple[str, str, Blunder]], game_id: str):
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f:
f.write(
"""import os
import sys
import chess
import pytest
# Ensure repo root is importable when running pytest directly
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if REPO_ROOT not in sys.path:
sys.path.insert(0, REPO_ROOT)
from PYTHON.lichess_bot.engine import RandomEngine # noqa: E402
BLUNDER_CASES = [
"""
)
for fen, uci, bl in cases:
label = f"ply{bl.ply}_{'W' if bl.side=='W' else 'B'}_{uci}"
f.write(f" (\"{fen}\", \"{uci}\", \"{label}\"),\n")
f.write(
"]\n\n"
"@pytest.mark.parametrize('fen,blunder_uci,label', BLUNDER_CASES, ids=[c[2] for c in BLUNDER_CASES])\n"
"def test_engine_avoids_logged_blunder(fen, blunder_uci, label):\n"
" board = chess.Board(fen)\n"
" eng = RandomEngine(depth=4, max_time_sec=1.2)\n"
" # Prefer explanation variant if available for better failure messages\n"
" move = None\n"
" explanation = ''\n"
" if hasattr(eng, 'choose_move_with_explanation'):\n"
" try:\n"
" mv, expl = eng.choose_move_with_explanation(board, time_budget_sec=1.2)\n"
" move, explanation = mv, expl or ''\n"
" except Exception:\n"
" move = eng.choose_move(board)\n"
" else:\n"
" move = eng.choose_move(board)\n"
" assert move is not None, 'Engine returned no move'\n"
" assert move in board.legal_moves, 'Engine move is illegal'\n"
" assert move.uci() != blunder_uci, f'Engine repeated blunder {blunder_uci} at {label}. Explanation: {explanation}'\n"
)
print(f"Wrote {target_path} with {len(cases)} blunder checks (game {game_id}).")
def main(argv: List[str]) -> int:
    """Generate a blunder-regression pytest module from a game log.

    Args:
        argv: Full argument vector; ``argv[1]`` is the path of the log file.

    Returns:
        0 on success, 2 when the log path is missing, 1 when the log has no
        blunders, no PGN section, or no position could be reconstructed.
    """
    if len(argv) < 2:
        print("Usage: generate_blunder_tests.py /path/to/lichess_bot_game_xxx.log")
        return 2

    source_log = argv[1]
    with open(source_log, "r", encoding="utf-8") as handle:
        contents = handle.read()

    found = parse_columns_for_blunders(contents)
    if not found:
        print("No blunders found in the log's Columns section.")
        return 1

    pgn = extract_pgn(contents)
    if not pgn:
        print("No PGN section found in the log.")
        return 1

    test_cases = fen_and_uci_for_blunders(pgn, found)
    if not test_cases:
        print("Failed to reconstruct any blunder positions from PGN.")
        return 1

    # Try to derive game id from file name; otherwise use the bare file stem.
    log_name = os.path.basename(source_log)
    id_match = re.search(r"game_([A-Za-z0-9]+)\.log$", log_name)
    if id_match:
        game_id = id_match.group(1)
    else:
        game_id = os.path.splitext(log_name)[0]

    # The module is written to the tests directory that sits beside this
    # script's own directory.
    tests_dir = os.path.join(os.path.dirname(__file__), "..", "tests")
    destination = os.path.abspath(os.path.join(tests_dir, f"test_blunders_{game_id}.py"))
    write_pytest(destination, test_cases, game_id)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv))

View File

@ -9,6 +9,7 @@ Usage:
[--threads auto|N]
[--hash-mb auto|MB]
[--multipv N]
[--last-move-only]
Notes:
- Requires python-chess. Install from PYTHON/stockfish_analysis/requirements.txt
@ -206,6 +207,8 @@ def main():
ap.add_argument("--hash-mb", type=_parse_hash_mb, default=None, metavar="auto|MB",
help="Hash table size in MB (default: auto = up to half RAM, capped)")
ap.add_argument("--multipv", type=int, default=2, help="Number of principal variations to compute (default: 1)")
ap.add_argument("--last-move-only", action="store_true",
help="Analyze only the last move of the main line (reports its eval and the best move)")
args = ap.parse_args()
if not os.path.isfile(args.file):
@ -322,92 +325,183 @@ def main():
ply = 1
try:
node = game
while node.variations:
move_node = node.variations[0]
move = move_node.move
mover_white = board.turn
# Analyse position to get engine best move suggestion
info_root_raw = engine.analyse(board, limit=limit, multipv=effective_mpv)
info_root = info_root_raw[0] if isinstance(info_root_raw, list) else info_root_raw
best_move = None
if info_root is not None and "pv" in info_root and info_root["pv"]:
best_move = info_root["pv"][0]
# Fallback to engine.play if PV missing
if best_move is None:
res = engine.play(board, limit)
best_move = res.move
# Evaluate played move position (for mover POV) using a temp board
san = board.san(move)
board_played = board.copy()
board_played.push(move)
info_played_raw = engine.analyse(board_played, limit=limit, multipv=effective_mpv)
info_played = info_played_raw[0] if isinstance(info_played_raw, list) else info_played_raw
if info_played is None or "score" not in info_played:
played_cp, played_mate = None, None
if args.last_move_only:
# Walk to the last move in the main line and analyze only that ply.
if not node.variations:
print("No moves found in the game.")
else:
played_cp, played_mate = score_to_cp(info_played["score"], pov_white=mover_white)
while node.variations:
move_node = node.variations[0]
move = move_node.move
mover_white = board.turn
# Evaluate best move position (for mover POV)
best_san = board.san(best_move) if best_move is not None else "?"
if best_move is not None:
board_best = board.copy()
board_best.push(best_move)
info_best_raw = engine.analyse(board_best, limit=limit, multipv=effective_mpv)
info_best = info_best_raw[0] if isinstance(info_best_raw, list) else info_best_raw
if info_best is None or "score" not in info_best:
best_cp, best_mate = None, None
# If this is the final move in the mainline, analyze it and stop.
if not move_node.variations:
# Analyse current position to get engine best move suggestion
info_root_raw = engine.analyse(board, limit=limit, multipv=effective_mpv)
info_root = info_root_raw[0] if isinstance(info_root_raw, list) else info_root_raw
best_move = None
if info_root is not None and "pv" in info_root and info_root["pv"]:
best_move = info_root["pv"][0]
if best_move is None:
res = engine.play(board, limit)
best_move = res.move
san = board.san(move)
# Evaluate played move
board_played = board.copy()
board_played.push(move)
info_played_raw = engine.analyse(board_played, limit=limit, multipv=effective_mpv)
info_played = info_played_raw[0] if isinstance(info_played_raw, list) else info_played_raw
if info_played is None or "score" not in info_played:
played_cp, played_mate = None, None
else:
played_cp, played_mate = score_to_cp(info_played["score"], pov_white=mover_white)
# Evaluate best move position (for mover POV)
best_san = board.san(best_move) if best_move is not None else "?"
if best_move is not None:
board_best = board.copy()
board_best.push(best_move)
info_best_raw = engine.analyse(board_best, limit=limit, multipv=effective_mpv)
info_best = info_best_raw[0] if isinstance(info_best_raw, list) else info_best_raw
if info_best is None or "score" not in info_best:
best_cp, best_mate = None, None
else:
best_cp, best_mate = score_to_cp(info_best["score"], pov_white=mover_white)
else:
best_cp, best_mate = None, None
# Compute loss/classification
cp_loss: Optional[int] = None
classification = "Unknown"
if best_mate is not None or played_mate is not None:
if best_mate is not None and played_mate is not None:
if (best_mate > 0) and (played_mate > 0):
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) > abs(best_mate):
classification = "Inaccuracy"
else:
classification = "Best"
elif (best_mate < 0) and (played_mate < 0):
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) < abs(best_mate):
classification = "Blunder"
else:
classification = "Good"
else:
classification = "Blunder"
else:
classification = "Blunder"
else:
if best_cp is not None and played_cp is not None:
cp_loss = max(0, best_cp - played_cp)
classification = classify_cp_loss(cp_loss)
side = "W" if mover_white else "B"
print(
f"{ply:>3} {side} {san:<8} {fmt_eval(played_cp, played_mate):>10} "
f"{fmt_eval(best_cp, best_mate):>9} "
f"{(str(cp_loss) if cp_loss is not None else ''):>5} {classification:<12} {best_san}"
)
break
# Advance to keep searching for the last move
board.push(move)
node = move_node
ply += 1
else:
# Default behavior: analyze all moves
while node.variations:
move_node = node.variations[0]
move = move_node.move
mover_white = board.turn
# Analyse position to get engine best move suggestion
info_root_raw = engine.analyse(board, limit=limit, multipv=effective_mpv)
info_root = info_root_raw[0] if isinstance(info_root_raw, list) else info_root_raw
best_move = None
if info_root is not None and "pv" in info_root and info_root["pv"]:
best_move = info_root["pv"][0]
# Fallback to engine.play if PV missing
if best_move is None:
res = engine.play(board, limit)
best_move = res.move
# Evaluate played move position (for mover POV) using a temp board
san = board.san(move)
board_played = board.copy()
board_played.push(move)
info_played_raw = engine.analyse(board_played, limit=limit, multipv=effective_mpv)
info_played = info_played_raw[0] if isinstance(info_played_raw, list) else info_played_raw
if info_played is None or "score" not in info_played:
played_cp, played_mate = None, None
else:
best_cp, best_mate = score_to_cp(info_best["score"], pov_white=mover_white)
else:
best_cp, best_mate = None, None
played_cp, played_mate = score_to_cp(info_played["score"], pov_white=mover_white)
# Compute centipawn loss bands
cp_loss: Optional[int] = None
classification = "Unknown"
# Handle mate cases first
if best_mate is not None or played_mate is not None:
if best_mate is not None and played_mate is not None:
# Same sign -> compare speed
if (best_mate > 0) and (played_mate > 0):
# Keeping a mate: equal speed Best; slower -> Inaccuracy; faster -> Best
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) > abs(best_mate):
classification = "Inaccuracy"
else:
classification = "Best"
elif (best_mate < 0) and (played_mate < 0):
# Defending: equal delay Best; if played is sooner mate -> Blunder; if played delays more -> Good
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) < abs(best_mate):
classification = "Blunder"
else:
classification = "Good"
# Evaluate best move position (for mover POV)
best_san = board.san(best_move) if best_move is not None else "?"
if best_move is not None:
board_best = board.copy()
board_best.push(best_move)
info_best_raw = engine.analyse(board_best, limit=limit, multipv=effective_mpv)
info_best = info_best_raw[0] if isinstance(info_best_raw, list) else info_best_raw
if info_best is None or "score" not in info_best:
best_cp, best_mate = None, None
else:
# Sign flip across who mates -> Blunder
best_cp, best_mate = score_to_cp(info_best["score"], pov_white=mover_white)
else:
best_cp, best_mate = None, None
# Compute centipawn loss bands
cp_loss: Optional[int] = None
classification = "Unknown"
# Handle mate cases first
if best_mate is not None or played_mate is not None:
if best_mate is not None and played_mate is not None:
# Same sign -> compare speed
if (best_mate > 0) and (played_mate > 0):
# Keeping a mate: equal speed Best; slower -> Inaccuracy; faster -> Best
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) > abs(best_mate):
classification = "Inaccuracy"
else:
classification = "Best"
elif (best_mate < 0) and (played_mate < 0):
# Defending: equal delay Best; if played is sooner mate -> Blunder; if played delays more -> Good
if abs(played_mate) == abs(best_mate):
classification = "Best"
elif abs(played_mate) < abs(best_mate):
classification = "Blunder"
else:
classification = "Good"
else:
# Sign flip across who mates -> Blunder
classification = "Blunder"
else:
# Losing a forced mate or missing one
classification = "Blunder"
else:
# Losing a forced mate or missing one
classification = "Blunder"
else:
if best_cp is not None and played_cp is not None:
cp_loss = max(0, best_cp - played_cp)
classification = classify_cp_loss(cp_loss)
if best_cp is not None and played_cp is not None:
cp_loss = max(0, best_cp - played_cp)
classification = classify_cp_loss(cp_loss)
side = "W" if mover_white else "B"
print(
f"{ply:>3} {side} {san:<8} {fmt_eval(played_cp, played_mate):>10} "
f"{fmt_eval(best_cp, best_mate):>9} "
f"{(str(cp_loss) if cp_loss is not None else ''):>5} {classification:<12} {best_san}"
)
side = "W" if mover_white else "B"
print(
f"{ply:>3} {side} {san:<8} {fmt_eval(played_cp, played_mate):>10} "
f"{fmt_eval(best_cp, best_mate):>9} "
f"{(str(cp_loss) if cp_loss is not None else ''):>5} {classification:<12} {best_san}"
)
node = move_node
ply += 1
# Advance the live board for the next ply
board.push(move)
node = move_node
ply += 1
# Advance the live board for the next ply
board.push(move)
finally:
engine.quit()