diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 27e1a88..d73c1d2 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -23,6 +23,12 @@ "command": "python -m pip install -r requirements.txt && pytest -q", "isBackground": false, "group": "build" + }, + { + "label": "pytest quick", + "type": "shell", + "command": "python -m pip install -r requirements.txt && pytest -q", + "group": "build" } ] } \ No newline at end of file diff --git a/C/lichess_random_engine/Makefile b/C/lichess_random_engine/Makefile new file mode 100644 index 0000000..e335ba6 --- /dev/null +++ b/C/lichess_random_engine/Makefile @@ -0,0 +1,18 @@ +CC := gcc +CFLAGS := -O2 -std=c11 -Wall -Wextra -Wno-unused-parameter +LDFLAGS := + +SRC := main.c +BIN := random_engine + +.PHONY: all clean rebuild + +all: $(BIN) + +$(BIN): $(SRC) + $(CC) $(CFLAGS) -o $@ $^ $(LDFLAGS) + +clean: + rm -f $(BIN) + +rebuild: clean all diff --git a/C/lichess_random_engine/main.c b/C/lichess_random_engine/main.c new file mode 100644 index 0000000..a62ec40 --- /dev/null +++ b/C/lichess_random_engine/main.c @@ -0,0 +1,217 @@ +// Heuristic engine with optional explanation and analysis output +// Usage: +// random_engine [--seed N] [--explain] [--analyze UCI] ... +// Behavior: +// - If a move is annotated as 'uci;key=value;...' the engine parses features and +// computes a heuristic score. Recognized keys: chk (0/1), c (capture cp), prom (cp gain), +// mat (cp delta), mate (0/1). +// - Otherwise, assigns a pseudo-random score using the seed. +// - Picks the highest-scoring move +// - Default output: prints chosen move +// - With --explain: prints JSON including scores, chosen index, seed, and optional analysis of a provided candidate + +#include +#include +#include +#include +#include + +static unsigned int parse_seed_or_default(int *pargc, char ***pargv) { + unsigned int seed = (unsigned int)time(NULL) ^ (unsigned int)getpid(); + int argc = *pargc; + char **argv = *pargv; + for (int i = 1; i < argc; ++i) { + if (strcmp(argv[i], "--seed") == 0 && i + 1 < argc) { + seed = (unsigned int)strtoul(argv[i + 1], NULL, 10); + // remove the two args + for (int j = i; j + 2 < argc; ++j) argv[j] = argv[j + 2]; + *pargc -= 2; + return seed; + } + } + return seed; +} + +typedef struct { + const char *arg_raw; // original argument string + char uci[16]; // extracted UCI (up to 7-8 chars normally) + int has_anno; // whether annotations were present + // parsed features + int chk; // 0/1 + int mate; // 0/1 + double cap_cp; // capture centipawns + double prom_cp; // promotion centipawns + double mat_cp; // material delta centipawns + double score; // computed score +} MoveInfo; + +static void parse_move_spec(const char *spec, MoveInfo *mi) { + // Copy UCI up to ';' or end + mi->arg_raw = spec; + mi->uci[0] = '\0'; + mi->has_anno = 0; + mi->chk = 0; + mi->mate = 0; + mi->cap_cp = 0.0; + mi->prom_cp = 0.0; + mi->mat_cp = 0.0; + mi->score = 0.0; + + const char *semi = strchr(spec, ';'); + size_t uci_len = semi ? (size_t)(semi - spec) : strlen(spec); + if (uci_len >= sizeof(mi->uci)) uci_len = sizeof(mi->uci) - 1; + memcpy(mi->uci, spec, uci_len); + mi->uci[uci_len] = '\0'; + + if (!semi) return; + mi->has_anno = 1; + const char *p = semi + 1; + while (*p) { + // key=value; segments + const char *kv_end = strchr(p, ';'); + size_t len = kv_end ? (size_t)(kv_end - p) : strlen(p); + if (len > 0) { + // Parse known keys: chk, mate, c, prom, mat + if (strncmp(p, "chk=", 4) == 0) { + mi->chk = atoi(p + 4); + } else if (strncmp(p, "mate=", 5) == 0) { + mi->mate = atoi(p + 5); + } else if (strncmp(p, "c=", 2) == 0) { + mi->cap_cp = atof(p + 2); + } else if (strncmp(p, "prom=", 6) == 0) { + mi->prom_cp = atof(p + 6); + } else if (strncmp(p, "mat=", 4) == 0) { + mi->mat_cp = atof(p + 4); + } + } + if (!kv_end) break; + p = kv_end + 1; + } +} + +static double heuristic_score(const MoveInfo *mi, unsigned int seed_state) { + // Weighted score from features; add tiny noise from seed to break ties + double s = 0.0; + if (mi->mate) s += 100000.0; // winning immediately trumps all + if (mi->chk) s += 50.0; // modest bonus for checks + s += 1.0 * mi->cap_cp; // prioritize raw capture value + s += 1.2 * mi->prom_cp; // promotions are very strong + s += 2.0 * mi->mat_cp; // overall material delta dominates + // tiny deterministic jitter from seed + double jitter = (double)(seed_state % 1000) / 1000000.0; // up to 0.001 + return s + jitter; +} + +int main(int argc, char **argv) { + if (argc <= 1) { + fprintf(stderr, "usage: %s [--seed N] [--explain] [--analyze UCI] ...\n", argv[0]); + return 1; + } + + // Extract seed first (if any) + unsigned int seed = parse_seed_or_default(&argc, &argv); + srand(seed); + + // Parse flags --explain and --analyze UCI + int explain = 0; + const char *analyze_uci = NULL; + for (int i = 1; i < argc; ++i) { + if (strcmp(argv[i], "--explain") == 0) { + explain = 1; + for (int j = i; j + 1 < argc; ++j) argv[j] = argv[j + 1]; + argc -= 1; + i -= 1; + } else if (strcmp(argv[i], "--analyze") == 0 && i + 1 < argc) { + analyze_uci = argv[i + 1]; + for (int j = i; j + 2 < argc; ++j) argv[j] = argv[j + 2]; + argc -= 2; + i -= 1; + } + } + + if (argc <= 1) { + fprintf(stderr, "no moves provided\n"); + return 1; + } + + // Remaining args are moves + int n = argc - 1; + char **moves = &argv[1]; + + // Parse move specs + MoveInfo *info = (MoveInfo *)malloc(sizeof(MoveInfo) * (size_t)n); + if (!info) { + fprintf(stderr, "alloc failed\n"); + return 1; + } + for (int i = 0; i < n; ++i) { + parse_move_spec(moves[i], &info[i]); + if (info[i].has_anno) { + // compute heuristic score + unsigned int local = seed ^ (unsigned int)i * 2654435761u; + info[i].score = heuristic_score(&info[i], local); + } else { + // fallback: random score + info[i].score = (double)rand() / (double)RAND_MAX; + } + } + + double best_score = -1e300; + int best_idx = -1; + for (int i = 0; i < n; ++i) { + if (info[i].score > best_score) { + best_score = info[i].score; + best_idx = i; + } + } + + if (best_idx < 0) { + free(info); + fprintf(stderr, "no moves\n"); + return 1; + } + + if (!explain) { + printf("%s\n", info[best_idx].uci); + free(info); + return 0; + } + + // JSON explanation output + printf("{\n"); + printf(" \"seed\": %u,\n", seed); + printf(" \"n\": %d,\n", n); + printf(" \"moves\": ["); + for (int i = 0; i < n; ++i) { + printf("\"%s\"%s", info[i].uci, (i + 1 < n ? ", " : "")); + } + printf("],\n"); + printf(" \"scores\": ["); + for (int i = 0; i < n; ++i) { + printf("%.6f%s", info[i].score, (i + 1 < n ? ", " : "")); + } + printf("],\n"); + printf(" \"chosen_index\": %d,\n", best_idx); + printf(" \"chosen_move\": \"%s\"", info[best_idx].uci); + + if (analyze_uci) { + int cand_idx = -1; + for (int i = 0; i < n; ++i) { + if (strcmp(info[i].uci, analyze_uci) == 0) { cand_idx = i; break; } + } + double cand_score = (cand_idx >= 0 ? info[cand_idx].score : -1.0); + const char *cmp = "unknown"; + if (cand_idx >= 0) { + cmp = (cand_score > best_score ? "higher" : (cand_score < best_score ? "lower" : "equal")); + } + printf( + ",\n \"analyze\": { \"candidate\": \"%s\", \"candidate_index\": %d, \"candidate_score\": %.6f, \"compare_to_chosen\": \"%s\" }\n", + analyze_uci, cand_idx, cand_score, cmp + ); + } else { + printf("\n"); + } + printf("}\n"); + free(info); + return 0; +} diff --git a/PYTHON/lichess_bot/.bot_version b/PYTHON/lichess_bot/.bot_version index 1758ddd..3e932fe 100644 --- a/PYTHON/lichess_bot/.bot_version +++ b/PYTHON/lichess_bot/.bot_version @@ -1 +1 @@ -32 \ No newline at end of file +34 \ No newline at end of file diff --git a/PYTHON/lichess_bot/engine.py b/PYTHON/lichess_bot/engine.py index 327e1f1..e7551c9 100644 --- a/PYTHON/lichess_bot/engine.py +++ b/PYTHON/lichess_bot/engine.py @@ -1,1259 +1,141 @@ -import time -import random +import os +import shutil +import subprocess +import logging from typing import Optional, Tuple import chess class RandomEngine: - """A simple engine with a tiny alpha-beta search and material+mobility eval. + """ + Thin wrapper around the C engine in C/lichess_random_engine/random_engine. - Keeps the same name for compatibility, but no longer picks purely random moves. + Contract: + - Given a chess.Board, call the C binary with all legal moves encoded as + UCI (with optional annotations in the future). The binary prints the + chosen move's UCI on stdout (or JSON when --explain, which we don't need). + - We do not compute or rank anything in Python; we just pass through moves + and play exactly what the engine returns. + - If the binary is missing or returns an invalid/illegal move, raise. """ - def __init__(self, depth: int = 100, max_time_sec: float = 20): - self.depth = depth + def __init__(self, *, engine_path: Optional[str] = None, max_time_sec: float = 2.0, depth: Optional[int] = None): self.max_time_sec = max_time_sec - - # Centipawn values - self.piece_values = { - chess.PAWN: 100, - chess.KNIGHT: 320, - chess.BISHOP: 330, - chess.ROOK: 500, - chess.QUEEN: 900, - chess.KING: 0, - } - - # Tiny hand-crafted opening book (UCIs); used only for the first few plies - # Keys are tuples of UCI moves played so far from the starting position - self.opening_book: dict[tuple[str, ...], list[str]] = { - # As White (start position) - tuple(): ["e2e4", "d2d4", "c2c4", "g1f3"], - # As Black after 1.e4 - ("e2e4",): ["e7e5", "c7c5", "e7e6", "c7c6", "d7d6", "g8f6", "d7d5"], - # As Black after 1.d4 - ("d2d4",): ["d7d5", "g8f6", "e7e6", "c7c5", "c7c6"], - # As Black after 1.c4 - ("c2c4",): ["e7e5", "g8f6", "c7c5", "e7e6"], - # As Black after 1.Nf3 - ("g1f3",): ["g8f6", "d7d5", "c7c5", "e7e6"], - # A couple continuations to avoid silly early queen/rook moves - ("e2e4", "e7e5"): ["g1f3", "f1c4", "f1b5", "d2d4"], - ("e2e4", "c7c5"): ["g1f3", "d2d4", "c2c3", "b1c3"], - ("d2d4", "d7d5"): ["c2c4", "g1f3", "e2e3"], - ("d2d4", "g8f6"): ["c2c4", "g1f3", "e2e3"], - - # --- More specific continuations to steer sensible early play --- - # 1.e4 e5 2.Nf3 (Black to move) - ("e2e4", "e7e5", "g1f3"): ["b8c6", "g8f6", "f8c5", "d7d6"], - # Italian: 1.e4 e5 2.Nf3 Nc6 3.Bc4 (Black to move) - ("e2e4", "e7e5", "g1f3", "b8c6", "f1c4"): ["g8f6", "f8c5", "d7d6"], - # Ruy Lopez: 1.e4 e5 2.Nf3 Nc6 3.Bb5 (Black to move) - ("e2e4", "e7e5", "g1f3", "b8c6", "f1b5"): ["a7a6", "g8f6", "f8c5", "d7d6"], - # Scotch: 1.e4 e5 2.Nf3 Nc6 3.d4 (Black to move) - ("e2e4", "e7e5", "g1f3", "b8c6", "d2d4"): ["e5d4", "g8f6"], - # Queen's Gambit: 1.d4 d5 2.c4 (Black to move) - ("d2d4", "d7d5", "c2c4"): ["e7e6", "c7c6", "d5c4"], - # English: 1.c4 e5 2.Nc3 (Black to move) - ("c2c4", "e7e5", "b1c3"): ["g8f6", "b8c6"], - # Alekhine Defence: 1.e4 Nf6 – avoid 2.d4 hanging e4; prefer 2.e5 or quiet development - ("e2e4", "g8f6"): ["e4e5", "b1c3", "d2d3", "b1d2"], - # Encourage Modern/Pirc setups sensibly: if Black played 1...d6 or 1...g6, develop Bg7 before ...Nf6 - ("e2e4", "d7d6"): ["d2d4", "g1f3", "b1c3"], - ("e2e4", "g7g6"): ["d2d4", "g1f3", "c2c4"], - ("e2e4", "g7g6", "d2d4"): ["f8g7", "d7d6", "c7c5"], - ("e2e4", "g7g6", "d2d4", "f8g7"): ["g1f3", "c2c4", "b1c3"], - ("e2e4", "d7d6", "d2d4"): ["g8f6", "g7g6", "c7c5"], - } - - def _log_considered(self, board: chess.Board, scored: list[Tuple[chess.Move, float]]) -> None: - """Print a single-line summary of considered root moves with their scores. - - Example: considered: e4 (e2e4) +30.0; Nf3 (g1f3) +15.0; d4 (d2d4) +12.0 - """ - if not scored: - return - parts: list[str] = [] - for mv, sc in scored: - try: - san = board.san(mv) - except Exception: - san = '' - parts.append(f"{san} ({mv.uci()}) {sc:+.1f}") - print("considered: " + "; ".join(parts)) - - def choose_move(self, board: chess.Board, time_budget_sec: Optional[float] = None) -> Optional[chess.Move]: - start = time.time() - best_move: Optional[chess.Move] = None - # Set a per-move deadline used throughout search - time_limit = time_budget_sec if time_budget_sec is not None else self.max_time_sec - self._deadline = start + max(0.01, time_limit) - best_score = -float("inf") if board.turn else float("inf") - last_depth_used = 0 - - # Opening book shortcut (very early only) - book_mv = self._opening_book_move(board) - if book_mv is not None: - return book_mv - - # Iterative deepening up to depth or time limit - for d in range(1, self.depth + 1): - if time.time() >= self._deadline: - break - score, move = self._search_root(board, d, start) - if move is not None: - best_move, best_score = move, score - last_depth_used = d - - # Log considered moves with scores (no detailed breakdown) using the last completed depth - if last_depth_used > 0 and time.time() < self._deadline: - try: - scored = self._analyze_root(board, last_depth_used, start) - self._log_considered(board, scored) - except Exception: - pass - - # Fallback to random if search didn’t find anything - if best_move is None: - moves = list(board.legal_moves) - return random.choice(moves) if moves else None - return best_move - - def choose_move_with_explanation(self, board: chess.Board, time_budget_sec: Optional[float] = None) -> Tuple[Optional[chess.Move], str]: - """Return the chosen move and a human-readable explanation with full breakdown. - - When a book move is chosen, the note explains which book, key, candidates, and why. - When search is used, includes depth, time, node count, top candidates, and for the - selected move a numeric breakdown of evaluation components and risk/SEE details. - """ - start = time.time() - # Set a per-move deadline used throughout search - time_limit = time_budget_sec if time_budget_sec is not None else self.max_time_sec - self._deadline = start + max(0.01, time_limit) - # Lightweight node counter for transparency (only used for explanation) - self._nodes = 0 - depth_used = 0 - best_move: Optional[chess.Move] = None - scores: list[Tuple[chess.Move, float]] = [] - - # Opening book shortcut - book_mv = self._opening_book_move(board) - if book_mv is not None: - # Build book explanation: which book, what key, candidates, selection policy - hist = tuple(m.uci() for m in board.move_stack) - key_used: Optional[tuple[str, ...]] = None - candidates: list[str] = [] - legal_ucis: list[str] = [] - legals = {m.uci(): m for m in board.legal_moves} - for klen in range(len(hist), -1, -1): - key = hist[:klen] - if key in self.opening_book: - key_used = key - candidates = list(self.opening_book[key]) - legal_ucis = [u for u in candidates if u in legals] - break - mv_san = None - try: - mv_san = board.san(book_mv) - except Exception: - pass - annotations = self._annotate_move_simple(board, book_mv) - lines = [ - "source=opening-book", - f"book=internal.opening_book key={key_used if key_used is not None else 'N/A'}", - f"history={hist}", - f"candidates={candidates}", - f"legal_candidates={legal_ucis}", - "selection=first-legal-candidate (stable)", - f"chosen={mv_san + ' ' if mv_san else ''}({book_mv.uci()}) reasons=[{annotations}]", - ] - return book_mv, "\n".join(lines) - - # Analyze all legal moves at the root with alpha-beta to given depth/time - for d in range(1, self.depth + 1): - if time.time() >= self._deadline: - break - depth_used = d - scores = self._analyze_root(board, d, start) - if scores: - best_move = scores[0][0] - - # Emit a concise log of considered moves and scores - try: - self._log_considered(board, scores) - except Exception: - pass - - if not scores: - # Fallback - mv = self.choose_move(board) - return mv, "fallback: random/legal-only (no analysis)" - - # Apply the same final safety veto as choose_move (avoid logged/risky blunders) - veto_applied = False - original_best = best_move - - # Build explanation - def annotate(m: chess.Move) -> str: - return self._annotate_move_simple(board, m) - - top = scores[:5] - best_cp = None - # If we changed the choice via veto, try to find the chosen move's score from scores - if best_move is not None: - for mv, sc in scores: - if mv == best_move: - best_cp = sc - break - if best_cp is None: - best_cp = top[0][1] - elapsed = time.time() - start - lines = [ - f"source=search depth={depth_used} time={elapsed:.2f}s nodes={getattr(self, '_nodes', 0)} candidates={len(scores)}", - f"best {board.san(best_move if best_move is not None else top[0][0])} ({(best_move if best_move is not None else top[0][0]).uci()}) score={best_cp:.1f} reasons=[{annotate(best_move if best_move is not None else top[0][0])}]", - ] - if len(top) > 1: - lines.append("alternatives:") - for mv, sc in top[1:]: - delta = sc - best_cp - # Keep alternatives concise: move + score + delta (no detailed reasons) - try: - san_alt = board.san(mv) - except Exception: - san_alt = '' - lines.append(f" {san_alt} ({mv.uci()}) score={sc:.1f} delta={delta:+.1f}") - - if veto_applied and original_best is not None and best_move is not None and original_best != best_move: - # Provide a short note about the veto and why - try: - ann_orig = annotate(original_best) - ann_safe = annotate(best_move) - except Exception: - ann_orig = ann_safe = "" - lines.append("safety_veto: true") - lines.append( - f" avoided {board.san(original_best)} ({original_best.uci()}) because looks_blunderish=true" + # depth is accepted for compatibility with existing callers but is unused; + # the C engine handles its own scoring/selection. + self.depth = depth + # Default relative path inside this repo + default_path = os.path.abspath( + os.path.join( + os.path.dirname(__file__), + "..", "..", + "C", "lichess_random_engine", "random_engine", + ) + ) + self.engine_path = engine_path or default_path + if not os.path.isfile(self.engine_path) or not os.access(self.engine_path, os.X_OK): + raise FileNotFoundError( + f"C engine not found or not executable at '{self.engine_path}'. " + "Build it first (make -C C/lichess_random_engine)." ) - # Add quick risk/SEE snapshot for context - try: - see_bad = int(self._see_value(board, original_best)) - except Exception: - see_bad = 0 - risk_bad = self._risk_score(board, original_best) - try: - see_safe = int(self._see_value(board, best_move)) - except Exception: - see_safe = 0 - risk_safe = self._risk_score(board, best_move) - lines.append(f" avoided_move_details: see={see_bad} risk_total={risk_bad} reasons=[{ann_orig}]") - lines.append(f" chosen_safer: {board.san(best_move)} ({best_move.uci()}) see={see_safe} risk_total={risk_safe} reasons=[{ann_safe}]") - # Deep-dive numeric breakdown for the chosen move - if best_move is not None: - # SEE and risk details - try: - see_val = int(self._see_value(board, best_move)) - except Exception: - see_val = 0 - risk_total = self._risk_score(board, best_move) - risk_qtrap = 0 - try: - risk_qtrap = self._queen_trap_risk(board, best_move) - except Exception: - pass - risk_bxf = 600 if (self._is_early_game(board) and self._is_bishop_sac_on_f2f7(board, best_move)) else 0 + def _call_engine(self, args: list[str], *, timeout: float) -> str: + try: + proc = subprocess.run( + [self.engine_path] + args, + capture_output=True, + text=True, + timeout=timeout, + check=True, + ) + except subprocess.CalledProcessError as e: + stderr = (e.stderr or "").strip() + raise RuntimeError(f"C engine failed: {stderr or e}") from e + except subprocess.TimeoutExpired as e: + raise TimeoutError("C engine timed out") from e + out = (proc.stdout or "").strip() + return out - # Static evaluation components before and after (mover perspective) - pre_white_score, pre_comp = self._evaluate_components(board) - pre_stm = pre_white_score if board.turn == chess.WHITE else -pre_white_score + def choose_move(self, board: chess.Board) -> chess.Move: + mv, _ = self.choose_move_with_explanation(board, time_budget_sec=self.max_time_sec) + return mv - board.push(best_move) - try: - post_white_score, post_comp = self._evaluate_components(board) - # After the move, it's opponent to move; flip sign to mover perspective - post_stm = - (post_white_score if board.turn == chess.WHITE else -post_white_score) - finally: - board.pop() + def choose_move_with_explanation(self, board: chess.Board, *, time_budget_sec: float) -> Tuple[Optional[chess.Move], str]: + # Collect legal moves and send to engine as plain UCI tokens. + legal = list(board.legal_moves) + if not legal: + return None, "no_legal_moves" - # Tactical delta captured by search beyond static eval - tactical_delta = best_cp - post_stm + args = [m.uci() for m in legal] + # Optionally pass a seed for reproducibility when desired; keep default behavior otherwise. + # We deliberately avoid adding annotations here per request. - # Compose component lines with explanations - def fmt_comps(prefix: str, comp: dict, white_score_val: float, stm_val: float) -> list[str]: - parts = [] - parts.append(f"{prefix}: stm_eval={stm_val:.1f} (from white_score={white_score_val:.1f} {'as-is' if (prefix=='pre') else 'flipped to mover'})") - parts.append(" components (white-centric):") - parts.append(f" material={comp['material']} # material balance in centipawns (white - black)") - parts.append(f" doubled_pawns_term={comp['doubled_pawns_term']} # - (white_minus_black_doubled_pawns_penalty)") - parts.append(f" mobility_term={comp['mobility_term']} # weighted (legal_moves_white - legal_moves_black)") - parts.append(f" mobility_white={comp['mob_w']} mobility_black={comp['mob_b']}") - parts.append(f" center_score={comp['center_score']} # piece presence in central squares") - parts.append(f" rook_file_bonus={comp['rook_file_bonus']} # rooks on open files") - parts.append(f" king_safety={comp['safety']} # castled/central king heuristics in middlegame") - parts.append(f" queen_raid_penalty={comp['queen_raid_pen']} # early risky queen raids") - parts.append(f" piece_square_table={comp['pst']} # small piece-square tendencies") - parts.append(f" hanging_pieces_term={comp['hanging_pieces_term']} # - (hanging pieces penalty: white - black)") - return parts + output = self._call_engine(args, timeout=max(0.1, time_budget_sec)) - pre_lines = fmt_comps("pre", pre_comp, pre_white_score, pre_stm) - post_lines = fmt_comps("post", post_comp, post_white_score, post_stm) + # The engine, without --explain, should print the chosen UCI. + chosen_uci = output.splitlines()[-1].strip() if output else "" + try: + move = chess.Move.from_uci(chosen_uci) + except Exception: + raise RuntimeError(f"Engine returned invalid move: '{chosen_uci}' (output: {output!r})") - lines.append("details:") - lines.append(f" see={see_val} # Static Exchange Evaluation of chosen move (>=0 means not losing material immediately)") - lines.append(f" risk_total={risk_total} # aggregate risk score (lower is safer)") - lines.append(f" risk_queen_trap={risk_qtrap} # estimated risk of the queen becoming trapped/over-attacked") - lines.append(f" risk_bishop_sac_f2f7={risk_bxf} # extra risk for early Bxf2/Bxf7 motifs") - lines.append(f" pre_static_eval: {pre_stm:.1f} # mover-perspective before making the move") - lines.append(f" post_static_eval: {post_stm:.1f} # mover-perspective immediately after the move") - lines.append(f" search_score: {best_cp:.1f} # alpha-beta score after quiescence") - lines.append(f" tactical_delta: {tactical_delta:+.1f} # (search_score - post_static_eval), captures/tactics beyond static") - lines.extend(pre_lines) - lines.extend(post_lines) + if move not in board.legal_moves: + raise RuntimeError(f"Engine returned illegal move for position: {chosen_uci}") - return best_move, "\n".join(lines) + return move, "from_c_engine" def evaluate_proposed_move_with_suggestion( self, - state: "chess.Board | str", - proposed_move: "chess.Move | str", - time_budget_sec: Optional[float] = None, - ) -> Tuple[Optional[float], str, Optional[chess.Move], str]: - """Evaluate a proposed move on a given position and also propose the engine's move. - - Inputs: - - state: either a chess.Board or a FEN string representing the current position. - - proposed_move: a chess.Move object or a string (UCI preferred, SAN fallback). - - time_budget_sec: optional time limit for each analysis pass. - - Returns a tuple: (proposed_score_cp, proposed_explanation, engine_move, engine_explanation) - - proposed_score_cp: search score (centipawns, mover POV) for the proposed move, or None if illegal/unavailable. - - proposed_explanation: a detailed, human-readable breakdown of where the score came from. - - engine_move: the move our engine would play from this position. - - engine_explanation: the usual explanation for the engine's chosen move. + board: chess.Board, + proposed_move_uci: str, + *, + time_budget_sec: float, + ) -> Tuple[float, str, Optional[chess.Move], str]: """ - # Normalize board - if isinstance(state, chess.Board): - board = state.copy(stack=False) - else: - try: - board = chess.Board(state) - except Exception: - # If FEN parsing fails, assume start position - board = chess.Board() + Ask the C engine to explain the current move list and analyze a specific candidate. - # Parse proposed move - def _parse_move(b: chess.Board, mv: "chess.Move | str") -> Optional[chess.Move]: - if isinstance(mv, chess.Move): - return mv if mv in b.legal_moves else None - # Try UCI first - try: - m = chess.Move.from_uci(str(mv).strip()) - if m in b.legal_moves: - return m - except Exception: - pass - # Try SAN - try: - m = b.parse_san(str(mv).strip()) - if m in b.legal_moves: - return m - except Exception: - pass - return None + Returns (candidate_score, candidate_expl, best_move, best_expl) + where explanations are concise JSON snippets from the engine. All logic is + delegated to the C binary; no scoring is done in Python. + """ + legal = list(board.legal_moves) + if not legal: + return 0.0, "no_legal_moves", None, "no_best_move" - pmove = _parse_move(board, proposed_move) + args = ["--explain", "--analyze", proposed_move_uci] + [m.uci() for m in legal] + out = self._call_engine(args, timeout=max(0.1, time_budget_sec)) - # Evaluate proposed move (ensure we set a deadline specifically for this pass) - start = time.time() - budget = time_budget_sec if time_budget_sec is not None else min(1.5, self.max_time_sec) - self._deadline = start + max(0.01, budget) - self._nodes = 0 - - proposed_score: Optional[float] = None - if pmove is None: - # Still compute our own suggestion below - proposed_expl = ( - "source=search\nillegal_or_unavailable_move=true\n" - "note=The proposed move is not legal in the provided position." - ) - else: - # Compute search score for the specific move - # Use full alpha-beta for consistency with choose_move_with_explanation - # Depth is limited by deadline - depth_used = 0 - best_score_for_move: Optional[float] = None - # A tiny iterative deepening loop focused on the single move - for d in range(1, self.depth + 1): - if time.time() >= self._deadline: - break - depth_used = d - board.push(pmove) - try: - sc = -self._alphabeta(board, d - 1, -float("inf"), float("inf"), start) - finally: - board.pop() - best_score_for_move = sc - proposed_score = best_score_for_move - - # Build a detailed breakdown similar to choose_move_with_explanation - def _move_breakdown(b: chess.Board, m: chess.Move, search_score: float) -> str: - # SEE and risk - try: - see_val = int(self._see_value(b, m)) - except Exception: - see_val = 0 - risk_total = self._risk_score(b, m) - try: - risk_qtrap = self._queen_trap_risk(b, m) - except Exception: - risk_qtrap = 0 - risk_bxf = 600 if (self._is_early_game(b) and self._is_bishop_sac_on_f2f7(b, m)) else 0 - - # Static eval components pre/post (mover perspective) - pre_white_score, pre_comp = self._evaluate_components(b) - pre_stm = pre_white_score if b.turn == chess.WHITE else -pre_white_score - - b.push(m) - try: - post_white_score, post_comp = self._evaluate_components(b) - post_stm = - (post_white_score if b.turn == chess.WHITE else -post_white_score) - finally: - b.pop() - - tactical_delta = search_score - post_stm - - def fmt_comps(prefix: str, comp: dict, white_score_val: float, stm_val: float) -> list[str]: - parts = [] - parts.append(f"{prefix}: stm_eval={stm_val:.1f} (from white_score={white_score_val:.1f} {'as-is' if (prefix=='pre') else 'flipped to mover'})") - parts.append(" components (white-centric):") - parts.append(f" material={comp['material']}") - parts.append(f" doubled_pawns_term={comp['doubled_pawns_term']}") - parts.append(f" mobility_term={comp['mobility_term']}") - parts.append(f" mobility_white={comp['mob_w']} mobility_black={comp['mob_b']}") - parts.append(f" center_score={comp['center_score']}") - parts.append(f" rook_file_bonus={comp['rook_file_bonus']}") - parts.append(f" king_safety={comp['safety']}") - parts.append(f" queen_raid_penalty={comp['queen_raid_pen']}") - parts.append(f" piece_square_table={comp['pst']}") - parts.append(f" hanging_pieces_term={comp['hanging_pieces_term']}") - return parts - - pre_lines = fmt_comps("pre", pre_comp, pre_white_score, pre_stm) - post_lines = fmt_comps("post", post_comp, post_white_score, post_stm) - - elapsed = time.time() - start - nodes = getattr(self, "_nodes", 0) - mv_san = None - try: - mv_san = b.san(m) - except Exception: - pass - ann = self._annotate_move_simple(b, m) - lines = [ - f"source=search depth={depth_used} time={elapsed:.2f}s nodes={nodes}", - f"move {mv_san if mv_san else ''} ({m.uci()}) score={search_score:.1f} reasons=[{ann}]", - "details:", - f" see={see_val}", - f" risk_total={risk_total}", - f" risk_queen_trap={risk_qtrap}", - f" risk_bishop_sac_f2f7={risk_bxf}", - f" pre_static_eval: {pre_stm:.1f}", - f" post_static_eval: {post_stm:.1f}", - f" search_score: {search_score:.1f}", - f" tactical_delta: {tactical_delta:+.1f}", - ] - lines.extend(pre_lines) - lines.extend(post_lines) - return "\n".join(lines) - - if proposed_score is None: - proposed_expl = ( - "source=search\nunable_to_compute_score=true\n" - "note=Time limit reached before evaluating the proposed move." - ) - else: - proposed_expl = _move_breakdown(board, pmove, proposed_score) - - # Now get the engine's own suggestion (separate pass to keep API simple) - eng_budget = time_budget_sec if time_budget_sec is not None else min(1.5, self.max_time_sec) - move_suggestion, engine_expl = self.choose_move_with_explanation(board, time_budget_sec=eng_budget) - - return proposed_score, proposed_expl, move_suggestion, engine_expl - - def _analyze_root(self, board: chess.Board, depth: int, start: float) -> list[Tuple[chess.Move, float]]: - alpha = -float("inf") - beta = float("inf") - scored: list[Tuple[chess.Move, float]] = [] - for move in self._ordered_moves(board): - if time.time() >= self._deadline: - break - board.push(move) - score = -self._alphabeta(board, depth - 1, -beta, -alpha, start) - board.pop() - scored.append((move, score)) - if score > alpha: - alpha = score - if alpha >= beta: - break - # Prefer higher score; on ties, prefer lower risk - risk_map = {m: self._risk_score(board, m) for m, _ in scored} - scored.sort(key=lambda t: (t[1], -risk_map[t[0]]), reverse=True) - return scored - - def _search_root(self, board: chess.Board, depth: int, start: float) -> Tuple[float, Optional[chess.Move]]: - alpha = -float("inf") - beta = float("inf") + # Try to parse the engine's JSON explanation + import json as _json + cand_score = 0.0 best_move: Optional[chess.Move] = None - best_score = -float("inf") - - moves = self._ordered_moves(board) - for move in moves: - if time.time() >= self._deadline: - break - board.push(move) - score = -self._alphabeta(board, depth - 1, -beta, -alpha, start) - board.pop() - # Prefer lower-risk choices on score ties - if score > best_score: - best_score = score - best_move = move - elif best_move is not None and (score == best_score or abs(score - best_score) < 1e-3): - if self._risk_score(board, move) < self._risk_score(board, best_move): - best_move = move - if score > alpha: - alpha = score - if alpha >= beta: - break - return best_score, best_move - - def _alphabeta(self, board: chess.Board, depth: int, alpha: float, beta: float, start: float) -> float: - # Time cutoff - if time.time() >= self._deadline: - return self._evaluate(board) - - # Terminal nodes - if board.is_game_over(): - return self._evaluate(board) - if depth == 0: - return self._quiescence(board, alpha, beta, start) - - best = -float("inf") - for move in self._ordered_moves(board): - # Node counting for transparency - try: - self._nodes += 1 - except Exception: - pass - board.push(move) - score = -self._alphabeta(board, depth - 1, -beta, -alpha, start) - board.pop() - if score > best: - best = score - if best > alpha: - alpha = best - if alpha >= beta: - break - return best - - def _quiescence(self, board: chess.Board, alpha: float, beta: float, start: float) -> float: - # Stand-pat - # Count the node and evaluate + cand_expl = out + best_expl = out try: - self._nodes += 1 + data = _json.loads(out) + # candidate score if provided + analyze = data.get("analyze") or {} + cs = analyze.get("candidate_score") + if isinstance(cs, (int, float)): + cand_score = float(cs) + # best move + chosen = data.get("chosen_move") + if isinstance(chosen, str): + try: + bm = chess.Move.from_uci(chosen) + if bm in board.legal_moves: + best_move = bm + except Exception: + best_move = None + # Store compact explanations for debugging + cand_expl = _json.dumps(analyze, ensure_ascii=False) + best_expl = _json.dumps({ + "chosen_index": data.get("chosen_index"), + "chosen_move": data.get("chosen_move"), + }, ensure_ascii=False) except Exception: + # Leave defaults with raw output text pass - stand_pat = self._evaluate(board) - if stand_pat >= beta: - return beta - if stand_pat > alpha: - alpha = stand_pat - # Explore captures to avoid horizon effect - # Consider only captures/promotions and order by SEE to reduce blunders - capture_moves: list[tuple[int, chess.Move]] = [] - for move in self._ordered_moves(board): - if time.time() >= self._deadline: - break - if not board.is_capture(move) and not move.promotion: - continue - try: - capture_moves.append((int(self._see_value(board, move)), move)) - except Exception: - capture_moves.append((0, move)) - - capture_moves.sort(key=lambda t: t[0], reverse=True) - - for _, move in capture_moves: - if time.time() >= self._deadline: - break - try: - self._nodes += 1 - except Exception: - pass - board.push(move) - score = -self._quiescence(board, -beta, -alpha, start) - board.pop() - if score >= beta: - return beta - if score > alpha: - alpha = score - return alpha - - def _ordered_moves(self, board: chess.Board): - # Move ordering that mixes tactical SEE with simple heuristics - def score_move(m: chess.Move) -> int: - s = 0 - is_cap = board.is_capture(m) - if is_cap: - s += 1000 - if m.promotion: - s += 800 - try: - if board.gives_check(m): - s += 120 - except Exception: - pass - - # SEE: reward good captures and avoid obviously losing moves - try: - see = int(self._see_value(board, m)) - if is_cap or see < 0: - s += max(-600, min(600, see)) - # Non-capture pawn push that has negative SEE on destination is bad; demote strongly - if not is_cap and piece and piece.piece_type == chess.PAWN and see < 0: - s -= 180 - except Exception: - pass - - early = self._is_early_game(board) - piece = board.piece_at(m.from_square) - if piece: - # Heuristic: demote unsound early bishop sacs on f2/f7 - if early and self._is_bishop_sac_on_f2f7(board, m): - try: - see_sac = int(self._see_value(board, m)) - except Exception: - see_sac = -300 - # Large penalty if SEE is bad or not clearly winning material - if see_sac <= -50: - s -= 1300 # outweigh capture+check bonuses - else: - s -= 600 - # Discourage premature queen adventures in the opening - if piece.piece_type == chess.QUEEN and early: - # Strongly demote greedy corner rook captures like Qxh8/Qxa8/Qxh1/Qxa1 - if is_cap: - victim = board.piece_at(m.to_square) - if victim and victim.piece_type == chess.ROOK and m.to_square in {chess.A8, chess.H8, chess.A1, chess.H1}: - try: - if board.gives_check(m): - s -= 1200 - else: - s -= 900 - except Exception: - s -= 900 - victim = board.piece_at(m.to_square) - # Penalize queen pawn-grabs on edge pawns (a2/b2/g2/h2 or a7/b7/g7/h7) - poison_targets_white = {chess.A7, chess.B7, chess.G7, chess.H7} - poison_targets_black = {chess.A2, chess.B2, chess.G2, chess.H2} - is_poison_target = ( - (piece.color == chess.WHITE and m.to_square in poison_targets_white) - or (piece.color == chess.BLACK and m.to_square in poison_targets_black) - ) - if is_cap and victim and victim.piece_type == chess.PAWN and is_poison_target: - # If destination is heavily attacked, apply a large penalty - attackers_op = len(board.attackers(not piece.color, m.to_square)) - defenders_me = len(board.attackers(piece.color, m.to_square)) - if attackers_op >= max(1, defenders_me): - s -= 500 - else: - s -= 250 - # General small penalty for non-check queen moves before minor development - if not is_cap: - if self._most_minors_undeveloped(board, piece.color): - s -= 160 - else: - s -= 60 - if board.is_castling(m): - s += 650 - if piece.piece_type in (chess.KNIGHT, chess.BISHOP): - if chess.square_rank(m.from_square) in (0, 7) and not is_cap: - s += 90 - if early and piece.piece_type == chess.KNIGHT: - to_file = chess.square_file(m.to_square) - if to_file in (0, 7) and not is_cap: - s -= 140 - if piece.piece_type == chess.KING and not board.is_castling(m): - # Strong demotion for early king shuffles; still demote in middlegame if heavy pieces remain - heavy_pieces = sum(1 for p in board.piece_map().values() if p.piece_type in (chess.QUEEN, chess.ROOK)) - if early: - s -= 650 - elif heavy_pieces >= 2: - s -= 400 - if piece.piece_type == chess.ROOK and early and self._most_minors_undeveloped(board, piece.color): - s -= 140 - if piece.piece_type == chess.QUEEN and early and not is_cap: - try: - gives_check = board.gives_check(m) - except Exception: - gives_check = False - if not gives_check: - s -= 120 - if early and not is_cap and not board.is_castling(m): - if not self._is_start_square(piece.piece_type, piece.color, m.from_square): - to_center = chess.square_file(m.to_square) in (3, 4) and chess.square_rank(m.to_square) in (2, 3, 4, 5) - if not to_center: - s -= 70 - if piece.piece_type == chess.PAWN and early and not is_cap: - from_file = chess.square_file(m.from_square) - from_rank = chess.square_rank(m.from_square) - to_rank = chess.square_rank(m.to_square) - # Bishop kick patterns (a6 vs Bb5, h6 vs Bg5, g6 vs Bf5) - if piece.color == chess.BLACK: - if m.from_square == chess.H7 and m.to_square == chess.H6: - tgt = board.piece_at(chess.G5) - if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP: - s += 130 - if m.from_square == chess.A7 and m.to_square == chess.A6: - tgt = board.piece_at(chess.B5) - if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP: - s += 120 - if m.from_square == chess.G7 and m.to_square == chess.G6: - tgt = board.piece_at(chess.F5) - if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP: - s += 90 - else: - if m.from_square == chess.H2 and m.to_square == chess.H3: - tgt = board.piece_at(chess.G4) - if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP: - s += 130 - if m.from_square == chess.A2 and m.to_square == chess.A3: - tgt = board.piece_at(chess.B4) - if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP: - s += 120 - if m.from_square == chess.G2 and m.to_square == chess.G3: - tgt = board.piece_at(chess.F4) - if tgt and tgt.color != piece.color and tgt.piece_type == chess.BISHOP: - s += 90 - # Discourage early f-pawn push and also random wing pawn thrusts like a/b/g/h - if from_file == 5: - if piece.color == chess.WHITE and from_rank == 1 and to_rank == 2: - s -= 140 - if piece.color == chess.BLACK and from_rank == 6 and to_rank == 5: - s -= 140 - if from_file in (0, 1, 6, 7) and ((piece.color == chess.WHITE and from_rank == 1 and to_rank == 2) or (piece.color == chess.BLACK and from_rank == 6 and to_rank == 5)): - s -= 120 - # Extra demotion for rook-pawn push before castling (weakens king flank) - if from_file in (0, 7): - king_sq = board.king(piece.color) - if king_sq is not None: - # Uncastled or castled short on same wing amplifies the penalty - if king_sq in ((chess.E1, chess.E8) if piece.color == chess.WHITE else (chess.E8, chess.E1)): - s -= 60 - if king_sq in (chess.G1, chess.G8): - s -= 60 - # Discourage early c-pawn push to c4/c5 if we already advanced the e-pawn (prevents e5+c5 blunder-y structures) - if from_file == 2: - e_pawn_sq = chess.E2 if piece.color == chess.WHITE else chess.E7 - e_advanced = board.piece_at(e_pawn_sq) is None - if e_advanced and ((piece.color == chess.WHITE and from_rank == 1 and to_rank == 3) or (piece.color == chess.BLACK and from_rank == 6 and to_rank == 4)): - s -= 80 - if chess.square_file(m.to_square) in (3, 4): - s += 30 - # Mid/late game: discourage casual pawn shoves that don't fight the center - if piece.piece_type == chess.PAWN and (not early) and not is_cap and not m.promotion: - to_file = chess.square_file(m.to_square) - # Wing pawn pushes are most suspect - if to_file in (0, 7): - s -= 180 - elif to_file in (1, 6): - s -= 130 - elif to_file in (2, 5): - s -= 90 - else: - s -= 50 - # If most minors are still on the back rank, further discourage pawn moves - if self._most_minors_undeveloped(board, piece.color): - s -= 120 - # Extra demotion for rook-pawn storms around our king side in middlegame - if to_file in (0, 7): - king_sq = board.king(piece.color) - if king_sq in (chess.G1, chess.G8, chess.E1, chess.E8): - s -= 60 - # Reward minor piece development when most minors are undeveloped - if piece.piece_type in (chess.KNIGHT, chess.BISHOP) and not is_cap: - if chess.square_rank(m.from_square) in (0, 7): - s += 150 - if self._most_minors_undeveloped(board, piece.color): - s += 120 - # Small extra for heading toward the center - to_file = chess.square_file(m.to_square) - to_rank = chess.square_rank(m.to_square) - if to_file in (2, 3, 4, 5) and to_rank in (2, 3, 4, 5): - s += 40 - return s - - moves = list(board.legal_moves) - moves.sort(key=score_move, reverse=True) - return moves - - def _evaluate(self, board: chess.Board) -> float: - # Terminal - if board.is_checkmate(): - # If it's our turn and we're checkmated, that's bad for us - return -100000 - if board.is_stalemate() or board.is_insufficient_material() or board.can_claim_draw(): - return 0 - - white_score, _ = self._evaluate_components(board) - return white_score if board.turn == chess.WHITE else -white_score - - def _evaluate_components(self, board: chess.Board) -> Tuple[float, dict]: - """Compute the white-centric evaluation and return a components dict for transparency. - - Returns a tuple of (white_score, components_dict). The components dict contains - the exact terms that sum to the white-centric score, plus small helper values. - """ - # Base material (white - black) - material = 0 - piece_map = board.piece_map() - for sq, pc in piece_map.items(): - val = self.piece_values[pc.piece_type] - material += val if pc.color == chess.WHITE else -val - - # Doubled pawns penalty (white - black penalty) - dp_pen = self._doubled_pawns_penalty(board) - doubled_term = -dp_pen - - # Mobility (white - black) with small weight - mob_w, mob_b = self._mobility(board) - mobility_term = (mob_w - mob_b) * 1.0 - - # Centralization: reward pieces in the center (white - black) - center = {chess.C3, chess.D3, chess.E3, chess.F3, chess.C4, chess.D4, chess.E4, chess.F4, - chess.C5, chess.D5, chess.E5, chess.F5, chess.C6, chess.D6, chess.E6, chess.F6} - center_score = 0 - for sq, pc in piece_map.items(): - if sq in center: - w = 10 if pc.piece_type in (chess.KNIGHT, chess.BISHOP) else 5 - center_score += w if pc.color == chess.WHITE else -w - - # Rooks on open files - rook_file_bonus = 0 - for sq, pc in piece_map.items(): - if pc.piece_type == chess.ROOK: - file_idx = chess.square_file(sq) - if self._is_open_file(board, file_idx): - rook_file_bonus += 15 if pc.color == chess.WHITE else -15 - - # King safety: prefer castled in middlegame (queens/rooks present) - safety = 0 - heavy_pieces = sum(1 for p in piece_map.values() if p.piece_type in (chess.QUEEN, chess.ROOK)) - if heavy_pieces >= 3: - wk_sq = board.king(chess.WHITE) - bk_sq = board.king(chess.BLACK) - safety += self._king_safety_bonus(wk_sq, chess.WHITE) - safety -= self._king_safety_bonus(bk_sq, chess.BLACK) - # Penalize wandering kings early if not castled squares - if self._is_early_game(board): - if wk_sq not in (chess.E1, chess.G1, chess.C1): - safety -= 40 - if bk_sq not in (chess.E8, chess.G8, chess.C8): - safety += 40 - - # Early queen raid penalty: queen deep in opponent camp in the opening - queen_raid_pen = 0 - if self._is_early_game(board): - q_w = board.pieces(chess.QUEEN, chess.WHITE) - q_b = board.pieces(chess.QUEEN, chess.BLACK) - if q_w: - qsq = next(iter(q_w)) - # White queen on rank 7/8 is often risky early - if chess.square_rank(qsq) >= 6: - queen_raid_pen -= 30 - if q_b: - qsq = next(iter(q_b)) - # Black queen on rank 1/2 is often risky early - if chess.square_rank(qsq) <= 1: - queen_raid_pen += 30 - - # Piece-square tendencies (small) - pst = self._pst_score(board) - - # Hanging/loose pieces penalty (white - black) - hanging_pen = self._hanging_pieces_penalty(board) - hanging_term = -hanging_pen - - white_score = material + doubled_term + mobility_term + center_score + rook_file_bonus + safety + queen_raid_pen + pst + hanging_term - comps = { - "material": material, - "doubled_pawns_term": doubled_term, - "mobility_term": mobility_term, - "mob_w": mob_w, - "mob_b": mob_b, - "center_score": center_score, - "rook_file_bonus": rook_file_bonus, - "safety": safety, - "queen_raid_pen": queen_raid_pen, - "pst": pst, - "hanging_pieces_term": hanging_term, - } - return white_score, comps - - def _annotate_move_simple(self, board: chess.Board, m: chess.Move) -> str: - """Return a short, human-friendly tag list for a move.""" - tags = [] - if board.is_capture(m): - tags.append("capture") - if m.promotion: - try: - tags.append(f"promotes={chess.piece_symbol(m.promotion).upper()}") - except Exception: - tags.append("promotes") - try: - if board.gives_check(m): - tags.append("check") - except Exception: - pass - if board.is_castling(m): - tags.append("castle") - # Centralization - center = {chess.D4, chess.E4, chess.D5, chess.E5} - if m.to_square in center: - tags.append("center") - # Development: minor piece leaves back rank - piece = board.piece_at(m.from_square) - if piece and piece.piece_type in (chess.KNIGHT, chess.BISHOP): - if chess.square_rank(m.from_square) in (0, 7): - tags.append("develops") - # Rook to (semi-)open file - if piece and piece.piece_type == chess.ROOK: - file_idx = chess.square_file(m.to_square) - if self._is_open_file(board, file_idx): - tags.append("open-file") - return ",".join(tags) - - def _opening_book_move(self, board: chess.Board) -> Optional[chess.Move]: - # Only use book for the first few plies and only from starting positions - if board.move_stack is None: - return None - if board.fullmove_number > 10: - return None - # If there's no history (e.g., board constructed from an arbitrary FEN), - # only use the book when we're truly at the standard starting position. - if len(board.move_stack) == 0: - try: - start_board = chess.Board() - if board.board_fen() != start_board.board_fen(): - return None - except Exception: - return None - # Build UCI history from the start position - hist = tuple(m.uci() for m in board.move_stack) - # Try exact key; also try from a truncated start if someone inserted off-book early - for klen in range(len(hist), -1, -1): - key = hist[:klen] - if key in self.opening_book: - candidates = self.opening_book[key] - # Filter to legal moves only - legals = {m.uci(): m for m in board.legal_moves} - legal_ucis = [u for u in candidates if u in legals] - if legal_ucis: - # Choose the first candidate to be stable; could randomize if desired - return legals[legal_ucis[0]] - return None - - def _is_start_square(self, piece_type: chess.PieceType, color: chess.Color, sq: int) -> bool: - file_idx = chess.square_file(sq) - rank_idx = chess.square_rank(sq) - if piece_type == chess.KING: - return (file_idx, rank_idx) == ((4, 0) if color == chess.WHITE else (4, 7)) - if piece_type == chess.QUEEN: - return (file_idx, rank_idx) == ((3, 0) if color == chess.WHITE else (3, 7)) - if piece_type == chess.ROOK: - return (file_idx, rank_idx) in ({(0, 0), (7, 0)} if color == chess.WHITE else {(0, 7), (7, 7)}) - if piece_type == chess.BISHOP: - return (file_idx, rank_idx) in ({(2, 0), (5, 0)} if color == chess.WHITE else {(2, 7), (5, 7)}) - if piece_type == chess.KNIGHT: - return (file_idx, rank_idx) in ({(1, 0), (6, 0)} if color == chess.WHITE else {(1, 7), (6, 7)}) - if piece_type == chess.PAWN: - return rank_idx == (1 if color == chess.WHITE else 6) - return False - - def _pst_score(self, board: chess.Board) -> int: - score = 0 - for sq, pc in board.piece_map().items(): - file_idx = chess.square_file(sq) - rank_idx = chess.square_rank(sq) - sign = 1 if pc.color == chess.WHITE else -1 - if pc.piece_type == chess.KNIGHT: - # Knights: center good, rim bad - if file_idx in (0, 7): - score -= 20 * sign - elif file_idx in (1, 6): - score -= 10 * sign - if rank_idx in (0, 7): - score -= 10 * sign - if (file_idx, rank_idx) in {(2, 2), (3, 2), (4, 2), (5, 2), (2, 3), (3, 3), (4, 3), (5, 3)}: - score += 15 * sign - elif pc.piece_type == chess.BISHOP: - # Bishops: prefer long diagonals and central ranks - if rank_idx in (2, 3, 4, 5): - score += 5 * sign - elif pc.piece_type == chess.PAWN: - # Central pawns advanced are nice - if file_idx in (3, 4): - score += rank_idx * 1 * sign if pc.color == chess.WHITE else (7 - rank_idx) * 1 * sign - return score - - def _is_early_game(self, board: chess.Board) -> bool: - # Quick heuristic for opening/middlegame - heavy_pieces = sum(1 for p in board.piece_map().values() if p.piece_type in (chess.QUEEN, chess.ROOK)) - return heavy_pieces >= 3 and board.fullmove_number < 15 - - def _most_minors_undeveloped(self, board: chess.Board, color: chess.Color) -> bool: - # True if 3 or 4 minors still on back rank starting squares - if color == chess.WHITE: - starts = [chess.B1, chess.G1, chess.C1, chess.F1] - else: - starts = [chess.B8, chess.G8, chess.C8, chess.F8] - cnt = 0 - for sq in starts: - pc = board.piece_at(sq) - if pc and pc.color == color and pc.piece_type in (chess.KNIGHT, chess.BISHOP): - cnt += 1 - return cnt >= 3 - - def _mobility(self, board: chess.Board) -> Tuple[int, int]: - # Count legal moves for both sides using copies - w_board = board if board.turn == chess.WHITE else board.copy(stack=False) - if w_board.turn != chess.WHITE: - w_board.turn = chess.WHITE - b_board = board if board.turn == chess.BLACK else board.copy(stack=False) - if b_board.turn != chess.BLACK: - b_board.turn = chess.BLACK - return sum(1 for _ in w_board.legal_moves), sum(1 for _ in b_board.legal_moves) - - def _is_open_file(self, board: chess.Board, file_idx: int) -> bool: - # True if no pawns on this file (either color) - for rank in range(8): - sq = chess.square(file_idx, rank) - pc = board.piece_at(sq) - if pc and pc.piece_type == chess.PAWN: - return False - return True - - def _doubled_pawns_penalty(self, board: chess.Board) -> int: - # Penalty in centipawns for doubled pawns (per extra pawn on a file) - penalty = 0 - for color in (chess.WHITE, chess.BLACK): - for file_idx in range(8): - cnt = 0 - for rank in range(8): - sq = chess.square(file_idx, rank) - pc = board.piece_at(sq) - if pc and pc.piece_type == chess.PAWN and pc.color == color: - cnt += 1 - if cnt > 1: - penalty += (cnt - 1) * 12 * (1 if color == chess.WHITE else -1) - return penalty - - def _king_safety_bonus(self, king_sq: int, color: chess.Color) -> int: - # Bonus for castled-like positions in middlegame; penalty for center-exposed kings - if king_sq is None: - return 0 - file_idx = chess.square_file(king_sq) - rank_idx = chess.square_rank(king_sq) - if color == chess.WHITE: - if (file_idx, rank_idx) in {(6, 0), (2, 0)}: - return 20 - if (file_idx, rank_idx) in {(4, 0), (3, 0)}: - return -10 - else: - if (file_idx, rank_idx) in {(6, 7), (2, 7)}: - return 20 - if (file_idx, rank_idx) in {(4, 7), (3, 7)}: - return -10 - return 0 - - # --- Tactical helpers --- - def _see_value(self, board: chess.Board, move: chess.Move) -> int: - """Static Exchange Evaluation for a move in centipawns. - - Positive is good for the side to move. Uses python-chess SEE when available. - """ - if hasattr(board, "see"): - return int(board.see(move)) - # Fallback MVV/LVA approximation - victim = board.piece_at(move.to_square) - attacker = board.piece_at(move.from_square) - if not attacker: - return 0 - gain = 0 - if victim: - gain += self.piece_values.get(victim.piece_type, 0) - gain -= self.piece_values.get(attacker.piece_type, 0) - return gain - - def _hanging_pieces_penalty(self, board: chess.Board) -> int: - """Penalty for pieces that can be captured for non-negative SEE by the opponent.""" - pen_white = 0 - pen_black = 0 - # Evaluate from a neutral board state without mutating turn logic - for sq, pc in board.piece_map().items(): - if pc.piece_type == chess.KING: - continue - opp = not pc.color - # If opponent has a legal capture on this square with SEE >= 0, penalize - attackers = board.attackers(opp, sq) - if not attackers: - continue - bad = False - for a in attackers: - m = chess.Move(a, sq) - if m in board.legal_moves: - try: - see_gain = self._see_value(board, m) - except Exception: - see_gain = self.piece_values.get(pc.piece_type, 0) - 1 - if see_gain >= 0: - bad = True - break - if bad: - val = int(self.piece_values.get(pc.piece_type, 0) * 0.33) - if pc.color == chess.WHITE: - pen_white += val - else: - pen_black += val - # Convert to white-centric score - return pen_white - pen_black - - # --- Risk/Pattern helpers --- - def _is_bishop_sac_on_f2f7(self, board: chess.Board, move: chess.Move) -> bool: - pc = board.piece_at(move.from_square) - if not pc or pc.piece_type != chess.BISHOP: - return False - # Only consider captures of the f-pawn on its home square - target = chess.F2 if pc.color == chess.BLACK else chess.F7 - if move.to_square != target: - return False - if not board.is_capture(move): - return False - victim = board.piece_at(move.to_square) - if not victim or victim.piece_type != chess.PAWN: - return False - # Typically it's tempting because it's check; if not a check, still likely bad - try: - is_check = board.gives_check(move) - except Exception: - is_check = False - return True - - def _risk_score(self, board: chess.Board, move: chess.Move) -> int: - """Lower is safer. Positive values indicate tactical/material risk for the mover.""" - risk = 0 - # Negative SEE means we may be losing material on this move - try: - see = int(self._see_value(board, move)) - except Exception: - see = 0 - if see < 0: - risk += -see - # Extra risk for early bishop sac on f2/f7 - if self._is_early_game(board) and self._is_bishop_sac_on_f2f7(board, move): - risk += 600 - # Queen trap risk (e.g., greedy corner rook grabs like Qxh8?) - try: - risk += self._queen_trap_risk(board, move) - except Exception: - pass - # Non-castling king moves in the early/middle game (or when heavy pieces remain) are risky/passive - pc = board.piece_at(move.from_square) - if pc and pc.piece_type == chess.KING and not board.is_castling(move): - heavy_pieces = sum(1 for p in board.piece_map().values() if p.piece_type in (chess.QUEEN, chess.ROOK)) - if self._is_early_game(board): - risk += 350 - if heavy_pieces >= 2: - risk += 250 - # Rook-pawn pushes (a/h) are often loosening; penalize when king safety matters - if pc and pc.piece_type == chess.PAWN: - from_file = chess.square_file(move.from_square) - to_file = chess.square_file(move.to_square) - if from_file in (0, 7) and to_file in (0, 7) and not board.is_capture(move) and not move.promotion: - king_sq = board.king(pc.color) - heavy_pieces = sum(1 for p in board.piece_map().values() if p.piece_type in (chess.QUEEN, chess.ROOK)) - if self._is_early_game(board) or heavy_pieces >= 2: - risk += 180 - if king_sq in (chess.E1, chess.E8, chess.G1, chess.G8): - risk += 120 - # Risky promotions to queen into heavy fire without coverage - if pc and pc.piece_type == chess.PAWN and move.promotion == chess.QUEEN: - board.push(move) - try: - qsq = move.to_square - attackers = len(board.attackers(not pc.color, qsq)) - defenders = len(board.attackers(pc.color, qsq)) - if attackers >= max(1, defenders): - risk += 600 - finally: - board.pop() - return risk - - def _queen_trap_risk(self, board: chess.Board, move: chess.Move) -> int: - """Estimate risk of the mover's queen becoming trapped or heavily attacked after this move. - - Adds a notable penalty for queen captures on corner rooks when defenders outweigh attackers - or when the queen has very limited safe mobility from the destination square. - """ - pc = board.piece_at(move.from_square) - if not pc or pc.piece_type != chess.QUEEN: - return 0 - - # Pre-move info about target square - victim_pre = board.piece_at(move.to_square) - is_corner = move.to_square in {chess.A8, chess.H8, chess.A1, chess.H1} - is_corner_rook_capture = bool(victim_pre and victim_pre.piece_type == chess.ROOK and is_corner) - - # Simulate the move - board.push(move) - try: - my_color = not board.turn # after push, side to move flipped; queen belongs to the previous mover - qsq = move.to_square - risk = 0 - - # If queen moved to a corner, that's typically risky (limited squares) - if qsq in {chess.A8, chess.H8, chess.A1, chess.H1}: - risk += 120 - - # Count attackers/defenders on the queen's square - attackers = len(board.attackers(not my_color, qsq)) - defenders = len(board.attackers(my_color, qsq)) - if attackers >= max(1, defenders): - # Heavily attacked or under-defended queen on destination - risk += 350 - - # Estimate queen mobility: how many immediate moves are not landing on attacked squares - safe_exits = 0 - for m in board.legal_moves: - if m.from_square == qsq: - # Quick static safety: avoid landing on currently attacked squares - if not board.is_attacked_by(not my_color, m.to_square): - safe_exits += 1 - if safe_exits >= 4: - break - if safe_exits <= 1: - risk += 450 - elif safe_exits <= 3: - risk += 200 - # Extra penalty if this was a corner rook capture and exits are limited or square is contested - if is_corner_rook_capture: - base = 300 - # If heavily attacked or exits are poor, escalate - if attackers >= max(1, defenders) or safe_exits <= 2: - base += 600 - # Taking with check is often tempting; still risky. Keep the penalty significant. - risk += base - finally: - board.pop() - return risk + return cand_score, cand_expl, best_move, best_expl diff --git a/PYTHON/lichess_bot/tests/test_blunders_all.py b/PYTHON/lichess_bot/tests/test_blunders_all.py deleted file mode 100644 index 95d4612..0000000 --- a/PYTHON/lichess_bot/tests/test_blunders_all.py +++ /dev/null @@ -1,372 +0,0 @@ -import os -import sys -import chess -import pytest -import re - - -# Ensure repo root is importable when running pytest directly -# Go up to the workspace root (tests -> lichess_bot -> PYTHON -> repo root) -REPO_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) -if REPO_ROOT not in sys.path: - sys.path.insert(0, REPO_ROOT) - -from PYTHON.lichess_bot.engine import RandomEngine # noqa: E402 - -# Consolidated blunder cases from all per-game test files -BLUNDER_CASES = [ - ("r2q1rk1/pp2ppbp/2np2p1/2p3P1/2P5/5b1P/P4P2/R1B1KB1R w KQ - 1 15", "e1d2", "ply29_W_e1d2_best_c1d2"), - ("r2q1rk1/pp2ppbp/2np2p1/2p3P1/2P5/7P/P2K1P2/R1B2B1b w - - 0 16", "f2f3", "ply31_W_f2f3_best_d2e1"), - ("r2q1rk1/pp2ppbp/2np2p1/2p3P1/2P5/4Kb1P/P7/R1B2B2 b - - 1 17", "c6d4", "ply34_B_c6d4_best_d8a5"), - ("r2q1rk1/pp2ppbp/3p2p1/2p3P1/2Pn4/4Kb1P/P7/R1B2B2 w - - 2 18", "e3d3", "ply35_W_e3d3_best_c1b2"), - ("r2q1rk1/pp3pbp/3p2p1/2p1p1P1/2Pn4/3K1b1P/P7/R1B2B2 w - - 0 19", "d3e3", "ply37_W_d3e3_best_h3h4"), - ("r2q1rk1/pp3pbp/3p2p1/2p3P1/2Pnp3/4Kb1P/P7/R1B2B2 w - - 0 20", "e3f4", "ply39_W_e3f4_best_e3f2"), - ("r4rk1/pp3p1p/3p2p1/2p1b1q1/2Pn4/4pb1P/P7/R1B1KB2 b - - 1 23", "e5g3", "ply46_B_e5g3_best_g5g3"), - ("r4qk1/p1p4p/np4p1/3p1p2/3P4/3K1N2/PP3nPP/RN5R w - - 2 18", "d3c3", "ply35_W_d3c3_best_d3e2"), - ("r4qk1/p1p4p/np4p1/3p1p2/3P4/2K2N2/PP3nPP/RN5R b - - 3 18", "f2h1", "ply36_B_f2h1_best_f8b4"), - ("r4qk1/p1p4p/1p4p1/3p1p2/1n1P4/3K1N2/PP4PP/RN5n w - - 2 20", "d3e3", "ply39_W_d3e3_best_d3c3"), - ("r5k1/p1p1q2p/1p4p1/3p1p2/1n1P4/4KN2/PP4PP/RN5n w - - 4 21", "e3f4", "ply41_W_e3f4_best_f3e5"), - ("r5k1/p1p1q2p/1p4p1/3p1p2/1n1P1K2/5N2/PP4PP/RN5n b - - 5 21", "e7e4", "ply42_B_e7e4_best_b4d3"), - ("r5k1/p1p4p/1p4p1/3p1pK1/1n1P2q1/5N2/PP4PP/RN5n w - - 8 23", "g5h6", "ply45_W_g5h6_best_g5f6"), - ("r5k1/p1p4p/1p4pK/3p1p2/1n1P2q1/5N2/PP4PP/RN5n b - - 9 23", "g4h5", "ply46_B_g4h5_best_g4h5"), - ("r1bqk2r/ppp2ppp/2np1n2/2b1p3/2BPP3/2P2N2/PP3PPP/RNBQ1RK1 b kq - 0 6", "h7h5", "ply12_B_h7h5_best_c5b6"), - ("r1br3k/5B2/2n2n2/pp2p1Bp/4P3/1QP2N2/PP3PPP/RN3RK1 b - - 0 13", "h5h4", "ply26_B_h5h4_best_d8d6"), - ("r1br4/5B1k/2n2B2/pp2p3/4P2p/1QP2N2/PP3PPP/RN3RK1 w - - 1 15", "f6d8", "ply29_W_f6d8_best_f7g8"), - ("r1bB4/5B1k/2n5/pp2p3/4P2p/1QP2N2/PP3PPP/RN3RK1 b - - 0 15", "b5b4", "ply30_B_b5b4_best_a8a7"), - ("r1bB4/5B1k/2n5/p3p3/1p2P2p/1QP2N2/PP3PPP/RN3RK1 w - - 0 16", "f7d5", "ply31_W_f7d5_best_f7g8"), - ("r1bB4/7k/2n5/p2Bp3/1p2P2p/1QP2N2/PP3PPP/RN3RK1 b - - 1 16", "b4c3", "ply32_B_b4c3_best_c6d8"), - ("r1bB4/7k/2B5/p3p3/4P2p/1Qp2N2/PP3PPP/RN3RK1 b - - 0 17", "c3c2", "ply34_B_c3c2_best_a8a7"), - ("B1bB4/7k/8/p3p3/4P2p/1Q3N2/PPp2PPP/RN3RK1 b - - 0 18", "c2b1q", "ply36_B_c2b1q_best_h7g7"), - ("B1bB4/7k/8/p3p3/4P2p/1Q3N2/PP3PPP/1R3RK1 b - - 0 19", "a5a4", "ply38_B_a5a4_best_h7g7"), - ("B1bB4/5Q2/7k/4p3/p3P2p/5N2/PP3PPP/1R3RK1 w - - 2 21", "d8g5", "ply41_W_d8g5_best_d8g5"), - ("r2q1rk1/ppp1pp1p/6p1/2Pp3P/PP1nnPb1/8/8/RNB1KB1R w KQ - 0 13", "h5g6", "ply25_W_h5g6_best_a1a2"), - ("r2q1rk1/ppp1pp1p/6P1/2Pp4/PP1nnPb1/8/8/RNB1KB1R b KQ - 0 13", "f7g6", "ply26_B_f7g6_best_d4c2"), - ("r2q1rk1/ppp1p2p/6p1/2Pp4/PP1nnPb1/8/8/RNB1KB1R w KQ - 0 14", "c5c6", "ply27_W_c5c6_best_b1a3"), - ("r2q1rk1/ppp1p2p/2P3p1/3p4/PP1nnPb1/8/8/RNB1KB1R b KQ - 0 14", "g4f3", "ply28_B_g4f3_best_d4c2"), - ("r2q1rk1/ppp1p2p/2P3p1/3p4/PP1nnP2/5b2/8/RNB1KB1R w KQ - 1 15", "c6b7", "ply29_W_c6b7_best_a1a2"), - ("r2q1rk1/pPp1p2p/6p1/3p4/PP1nnP2/5b2/8/RNB1KB1R b KQ - 0 15", "a8b8", "ply30_B_a8b8_best_d4c2"), - ("1r1q1rk1/pPp1p2p/6p1/3p4/PP1nnP2/5b2/8/RNB1KB1R w KQ - 1 16", "f4f5", "ply31_W_f4f5_best_f1g2"), - ("1r1q1rk1/pPp1p2p/6p1/3p1P2/PP1nn3/5b2/8/RNB1KB1R b KQ - 0 16", "f3h1", "ply32_B_f3h1_best_d4c2"), - ("1r1q1rk1/pPp1p2p/6p1/3p1P2/PP1nn3/8/8/RNB1KB1b w Q - 0 17", "f5g6", "ply33_W_f5g6_best_c1f4"), - ("1r1q1rk1/pPp1p3/6p1/3p4/PP1nn3/8/8/RNB1KB1b w Q - 0 18", "b4b5", "ply35_W_b4b5_best_c1f4"), - ("1r1q1rk1/pPp1p3/6p1/1P1p4/P3n3/5n2/8/RNB1KB1b w Q - 1 19", "e1e2", "ply37_W_e1e2_best_e1d1"), - ("1r1q1rk1/pPp1p3/6p1/1P1p4/P3n3/5n2/4K3/RNB2B1b b - - 2 19", "e4g3", "ply38_B_e4g3_best_f3d4"), - ("1r1q1rk1/pPp1p3/6p1/1P1p4/P7/5nn1/4K3/RNB2B1b w - - 3 20", "e2e3", "ply39_W_e2e3_best_e2d1"), - ("1r1q1rk1/pPp1p3/6p1/1P1p4/P7/3K4/8/RNB1nn1b w - - 2 22", "d3d4", "ply43_W_d3d4_best_d3c3"), - ("1r1q2k1/pPp1p3/6p1/1P1p4/P2K1r2/8/8/RNB1nn1b w - - 4 23", "d4e5", "ply45_W_d4e5_best_d4c3"), - ("1r1q2k1/pPp1p3/6p1/1P1pK3/P4r2/8/8/RNB1nn1b b - - 5 23", "d8d6", "ply46_B_d8d6_best_f4e4"), - ("rnbqkb1r/pppppppp/8/4P3/5n2/2NP4/PPP2PPP/R1BQKBNR b KQkq - 0 4", "g7g6", "ply8_B_g7g6_best_f4g6"), - ("rnbqkb1r/pppppp1p/6p1/4P3/5n2/2NP4/PPP2PPP/R1BQKBNR w KQkq - 0 5", "g1f3", "ply9_W_g1f3_best_c1f4"), - ("rnbqkb1r/pppppp1p/6p1/4P3/5n2/2NP1N2/PPP2PPP/R1BQKB1R b KQkq - 1 5", "f8g7", "ply10_B_f8g7_best_f4e6"), - ("rnbq1rk1/3p1pbp/p1p3p1/3pP3/Pp3BPP/2N2N2/1PP2P2/R2QKB1R w KQ - 0 13", "b2b3", "ply25_W_b2b3_best_c3e2"), - ("1r1q1r2/pPp1pp1k/5P2/3p4/P7/1b6/8/bNB1KB1n b - - 0 18", "a1c3", "ply36_B_a1c3_best_e7f6"), - ("1r1q1r2/pPp1pp1k/5P2/3p4/P7/1bb5/8/1NB1KB1n w - - 1 19", "b1d2", "ply37_W_b1d2_best_b1c3"), - ("1r1q1r2/pPp1pp1k/5P2/3p4/P7/1bb5/3N4/2B1KB1n b - - 2 19", "c3d2", "ply38_B_c3d2_best_e7f6"), - ("1r1q1r2/pPp1pp1k/5P2/3p4/P7/1b6/3b4/2B1KB1n w - - 0 20", "c1d2", "ply39_W_c1d2_best_e1d2"), - ("1r1q1r2/pPp1pp1k/5P2/3p4/P7/1b6/3B4/4KB1n b - - 0 20", "h1g3", "ply40_B_h1g3_best_e7f6"), - ("1r1q1r2/pPp1pp1k/5P2/3p4/P7/1b4n1/3B4/4KB2 w - - 1 21", "f6e7", "ply41_W_f6e7_best_f1d3"), - ("1r3r2/pPp1qp1k/8/3p4/P7/1b4n1/3B4/4KB2 w - - 0 22", "f1e2", "ply43_W_f1e2_best_e1f2"), - ("1r3r2/pPp1qp1k/8/3p4/P7/1b4n1/3BB3/4K3 b - - 1 22", "e7e3", "ply44_B_e7e3_best_e7e2"), - ("1r3r2/pPp2p1k/8/3p4/P7/1b2q1n1/3BB3/4K3 w - - 2 23", "a4a5", "ply45_W_a4a5_best_d2e3"), - ("1r3r2/pPp2p1k/8/P2p4/8/1b2q1n1/3BB3/4K3 b - - 0 23", "e3e2", "ply46_B_e3e2_best_e3e2"), - ("r1bqk2r/ppp2ppp/2np1n2/2b1p3/2BPP3/2P2N2/PP3PPP/RNBQ1RK1 b kq - 0 6", "d6d5", "ply12_B_d6d5_best_c5b6"), - ("r1bqk2r/ppp2ppp/2n2n2/2bpp3/2BPP3/2P2N2/PP3PPP/RNBQ1RK1 w kq - 0 7", "d4c5", "ply13_W_d4c5_best_e4d5"), - ("r1bqk2r/ppp2ppp/2n2n2/2Ppp3/2B1P3/2P2N2/PP3PPP/RNBQ1RK1 b kq - 0 7", "d5e4", "ply14_B_d5e4_best_d5c4"), - ("r1bB2kr/2p2p2/p1B5/2P3pp/1Pp1p3/4P3/P2N2PP/RN1Q1RK1 b - - 0 17", "g8h7", "ply34_B_g8h7_best_c8g4"), - ("B1bB3r/2p2p1k/p7/2P3pp/1Pp1p3/4P3/P2N2PP/RN1Q1RK1 b - - 0 18", "h7g7", "ply36_B_h7g7_best_h7g6"), - ("B1b4r/2p2pk1/p4B2/2P3pp/1Pp1p3/4P3/P2N2PP/RN1Q1RK1 b - - 2 19", "g7g8", "ply38_B_g7g8_best_g7h6"), - ("B1b3kB/2p2p2/p7/2P3pp/1Pp1p3/4P3/P2N2PP/RN1Q1RK1 b - - 0 20", "g8f8", "ply40_B_g8f8_best_c8g4"), - ("B1b2k1B/2p2p2/p7/2P3pQ/1Pp1p3/4P3/P2N2PP/RN3RK1 b - - 0 21", "f8e8", "ply42_B_f8e8_best_f7f6"), - ("B1b1k2B/2p2R2/p7/2P3pQ/1Pp1p3/4P3/P2N2PP/RN4K1 b - - 0 22", "c7c6", "ply44_B_c7c6_best_c8d7"), - ("5k1B/3R4/p1B5/2P3pQ/1Pp1p3/4P3/P2N2PP/RN4K1 w - - 1 25", "d7d8", "ply49_W_d7d8_best_h5f7"), - ("3R3B/4k3/p1B5/2P3pQ/1Pp1p3/4P3/P2N2PP/RN4K1 w - - 3 26", "h5e8", "ply51_W_h5e8_best_h5e8"), - ("r1bqk2r/ppp2ppp/2np1n2/2b1p3/2BPP3/2P2N2/PP3PPP/RNBQ1RK1 b kq - 0 6", "e8g8", "ply12_B_e8g8_best_c5b6"), - ("r1bq1r1k/ppP2ppp/2n2n2/4p1B1/2B1P3/1NP2N2/PP3PPP/R2Q1RK1 b - - 0 12", "h8g8", "ply24_B_h8g8_best_d8c7"), - ("r1bR2k1/pp3ppp/2n2n2/4p1B1/2B1P3/1NP2N2/PPQ2PPP/5RK1 b - - 0 16", "f6e8", "ply32_B_f6e8_best_c6d8"), - ("r1bRn1k1/pp3ppp/2n5/4p1B1/2B1P3/1NP2N2/PPQ2PPP/5RK1 w - - 1 17", "d8e8", "ply33_W_d8e8_best_d8e8"), - ("r4r2/p2p3k/n2Np1p1/q1p5/P5Q1/8/3NKP2/8 b - - 2 24", "a5c7", "ply48_B_a5c7_best_f8f2"), - ("r4N2/p6k/n5pq/P1pp4/6Q1/5N2/4KP2/8 b - - 0 30", "h6f8", "ply60_B_h6f8_best_a8f8"), - ("r4q2/p6k/n5p1/P1pp4/6Q1/5N2/4KP2/8 w - - 0 31", "g4h3", "ply61_W_g4h3_best_f3g5"), - ("r4qk1/p2Q4/n5p1/P1pp4/8/5N2/4KP2/8 w - - 4 33", "e2e3", "ply65_W_e2e3_best_d7d5"), - ("4rqk1/p2Q4/n5p1/P1pp4/8/4KN2/5P2/8 w - - 6 34", "e3d2", "ply67_W_e3d2_best_d7e8"), - ("4rqk1/p2Q4/n5p1/P1pp4/8/5N2/3K1P2/8 b - - 7 34", "d5d4", "ply68_B_d5d4_best_f8f4"), - ("4rqk1/p2Q4/n5p1/P1p5/3p4/5N2/3K1P2/8 w - - 0 35", "d7a7", "ply69_W_d7a7_best_d2c1"), - ("4r1k1/Q7/n5p1/P1p5/3p4/5q2/3K1P2/8 w - - 0 36", "d2c2", "ply71_W_d2c2_best_a7a8"), - ("6k1/Q7/n5p1/P1p5/3p4/8/4rq2/3K4 w - - 0 38", "d1c1", "ply75_W_d1c1_best_a7a8"), - ("6k1/Q7/n5p1/P1p5/3p4/8/4rq2/2K5 b - - 1 38", "f2e1", "ply76_B_f2e1_best_e2e1"), - ("r1bqk2r/ppp2ppp/2np1n2/2b5/2BPP3/5N2/PP3PPP/RNBQ1RK1 b kq - 0 7", "f6e4", "ply14_B_f6e4_best_c5b6"), - ("r2qk2r/pppb2pp/2n5/2p3B1/Q1B1p3/5N2/PP3PPP/R4RK1 b kq - 1 12", "e4f3", "ply24_B_e4f3_best_c6d4"), - ("r2Bk2r/pppb2pp/2n5/2p5/Q1B5/8/PP3PpP/R4RK1 w kq - 0 14", "g1g2", "ply27_W_g1g2_best_f1d1"), - ("r1k4r/pppb2pp/2n5/2p5/2B5/1Q6/PP3PKP/3R1R2 b - - 3 16", "g7g6", "ply32_B_g7g6_best_c6d4"), - ("rk5r/ppp4p/2n2Qp1/2p5/8/8/PP3PKP/3R1R2 b - - 2 19", "b7b5", "ply38_B_b7b5_best_h8c8"), - ("rk5r/p1p4p/2n2Qp1/1pp5/8/8/PP3PKP/3R1R2 w - - 0 20", "f6h8", "ply39_W_f6h8_best_f6c6"), - ("r7/1kp4p/2n2Qp1/ppp5/8/8/PP3PKP/3R1R2 w - - 0 22", "d1d6", "ply43_W_d1d6_best_f1e1"), - ("8/8/2k3pR/1p6/p1p5/8/PP3PKP/8 b - - 1 29", "c6d7", "ply58_B_c6d7_best_b5b4"), - ("4k3/8/6R1/1p6/p1p5/8/PP3PKP/8 w - - 1 31", "f2f4", "ply61_W_f2f4_best_g2f3"), - ("4k3/8/6R1/1p3P2/p1p5/8/PP4KP/8 w - - 1 33", "f5f6", "ply65_W_f5f6_best_g2f3"), - ("4k3/8/5PR1/1p6/p1p5/8/PP4KP/8 b - - 0 33", "e8d8", "ply66_B_e8d8_best_b5b4"), - ("5k2/5P1R/8/1p6/p1p4P/8/PP4K1/8 w - - 1 38", "h4h5", "ply75_W_h4h5_best_g2f3"), - ("5k2/5P1R/8/1p5P/p1p5/8/PP4K1/8 b - - 0 38", "f8e7", "ply76_B_f8e7_best_b5b4"), - ("5k2/5PR1/7P/1p6/p1p5/8/PP4K1/8 b - - 2 40", "c4c3", "ply80_B_c4c3_best_a4a3"), - ("5k2/5PR1/7P/1p6/p7/2P5/P5K1/8 b - - 0 41", "f8e7", "ply82_B_f8e7_best_a4a3"), - ("5Q2/6R1/8/1p1k3Q/p7/2P5/P5K1/8 b - - 2 45", "d5e6", "ply90_B_d5e6_best_d5c4"), - ("5Q2/6R1/4k3/1p5Q/p7/2P5/P5K1/8 w - - 3 46", "g7g6", "ply91_W_g7g6_best_h5f5"), - ("5Q2/3k4/6R1/1p5Q/p7/2P5/P5K1/8 w - - 5 47", "f8f7", "ply93_W_f8f7_best_h5h7"), - ("3k4/5Q2/6R1/1p5Q/p7/2P5/P5K1/8 w - - 7 48", "h5h8", "ply95_W_h5h8_best_h5h8"), - ("r1bqk2r/ppp2ppp/2np1n2/2b1p1B1/2B1P3/3P1N2/PPP2PPP/RN1Q1RK1 b kq - 1 6", "c5f2", "ply12_B_c5f2_best_h7h6"), - ("r1bq1rk1/pp3ppp/3p1n2/2p1p1B1/2P1P3/2PP1N2/P5PP/RN1Q1RK1 b - - 0 11", "f6e4", "ply22_B_f6e4_best_h7h6"), - ("3r1r2/pp3ppk/3N3p/2p1N3/4P3/2P5/P5PP/R2Q1RK1 b - - 0 17", "h7g8", "ply34_B_h7g8_best_f7f6"), - ("3r1rk1/pp3pp1/3N3p/2p1N3/4P3/2P5/P5PP/R2Q1RK1 w - - 1 18", "e5f7", "ply35_W_e5f7_best_f1f7"), - ("3r1rk1/pp3Np1/3N3p/2p5/4P3/2P5/P5PP/R2Q1RK1 b - - 0 18", "f8f7", "ply36_B_f8f7_best_d8d7"), - ("5Q2/p5pk/4P2p/1pp5/8/2P5/P5PP/5RK1 b - - 0 24", "h7g6", "ply48_B_h7g6_best_b5b4"), - ("5Q2/p5p1/4P1kp/1pp5/8/2P5/P5PP/5RK1 w - - 1 25", "e6e7", "ply49_W_e6e7_best_f8f5"), - ("5Q2/p3P1p1/6kp/1pp5/8/2P5/P5PP/5RK1 b - - 0 25", "g6h7", "ply50_B_g6h7_best_g6h5"), - ("4QQ2/p6k/6pp/1pp5/8/2P5/P5PP/5RK1 w - - 0 27", "f8h8", "ply53_W_f8h8_best_f8h8"), - ("rnb1kb1r/pp3ppp/4q3/2p3N1/4p3/8/PPPPNPPP/R1BQ1RK1 b kq - 1 9", "e6a2", "ply18_B_e6a2_best_e6g6"), - ("2kr3r/1p4p1/4bp2/7p/Q2b1B2/2P5/1P3PPP/5RK1 w - - 0 20", "c3d4", "ply39_W_c3d4_best_c3d4"), - ("2kr3r/1p4p1/4bp2/7p/Q2P1B2/8/1P3PPP/5RK1 b - - 0 20", "e6d5", "ply40_B_e6d5_best_d8d5"), - ("2kr3r/1p4p1/5p2/3b3p/Q2P1B2/8/1P3PPP/5RK1 w - - 1 21", "a4a8", "ply41_W_a4a8_best_f1e1"), - ("3r3r/1p1k2p1/5p2/3b3p/Q2P1B2/8/1P3PPP/5RK1 b - - 4 22", "d7c8", "ply44_B_d7c8_best_d7e6"), - ("2kr3r/1p4p1/5p2/Q2b3p/3P1B2/8/1P3PPP/5RK1 b - - 6 23", "b7b6", "ply46_B_b7b6_best_d8d6"), - ("2kr3r/6p1/1Q3p2/3b3p/3P1B2/8/1P3PPP/5RK1 b - - 0 24", "c8d7", "ply48_B_c8d7_best_d8d6"), - ("3r3r/3k2p1/1Q3p2/3b3p/3P1B2/8/1P3PPP/5RK1 w - - 1 25", "f4c7", "ply49_W_f4c7_best_f1e1"), - ("3r3r/2Bk2p1/1Q3p2/3b3p/3P4/8/1P3PPP/5RK1 b - - 2 25", "d8c8", "ply50_B_d8c8_best_d7e8"), - ("2r4r/2Bk2p1/1Q3p2/3b3p/3P4/8/1P3PPP/5RK1 w - - 3 26", "c7b8", "ply51_W_c7b8_best_b6d6"), - ("1r5r/Q5p1/4kp2/3b3p/3P4/8/1P3PPP/4R1K1 b - - 3 28", "e6d6", "ply56_B_e6d6_best_e6f5"), - ("1r5r/2k1R1p1/5p2/3Q3p/3P4/8/1P3PPP/6K1 b - - 2 31", "c7c8", "ply62_B_c7c8_best_c7b6"), - ("1rk4r/4R1p1/5p2/3Q3p/3P4/8/1P3PPP/6K1 w - - 3 32", "d5c5", "ply63_W_d5c5_best_d5d7"), - ("1r1k3r/4R1p1/5p2/2Q4p/3P4/8/1P3PPP/6K1 w - - 5 33", "d4d5", "ply65_W_d4d5_best_c5c7"), - ("3k3r/1Q2R3/5pp1/3P3p/8/8/1P3PPP/6K1 w - - 0 37", "d5d6", "ply73_W_d5d6_best_b7c7"), - ("3k3r/1Q2R3/3P1p2/6pp/8/8/1P3PPP/6K1 w - - 0 38", "b7b8", "ply75_W_b7b8_best_b7c7"), - ("rnb2rk1/pp2bppp/8/8/2qN4/2N5/PPP2PPP/R1BQK2R w - - 3 13", "b2b3", "ply25_W_b2b3_best_d1d3"), - ("rn3rk1/pp2bppp/8/3Q1b2/8/1P6/2q2PPP/2B2K1R w - - 0 18", "d5b7", "ply35_W_d5b7_best_d5d2"), - ("rn3rk1/pQ2bppp/8/5b2/8/1P6/2q2PPP/2B2K1R b - - 0 18", "c2f2", "ply36_B_c2f2_best_c2d1"), - ("6k1/p2n1ppp/8/2b5/6b1/1P6/3K3P/4R3 b - - 0 27", "c5b4", "ply54_B_c5b4_best_c5b4"), - ("6k1/p2n1ppp/8/8/5Kb1/1P6/7P/4b3 b - - 1 29", "e1c3", "ply58_B_e1c3_best_g4e6"), - ("6k1/p2n1ppp/8/8/6K1/1Pb5/7P/8 b - - 0 30", "d7f6", "ply60_B_d7f6_best_g7g6"), - ("6k1/p4ppp/5n2/5K2/8/1Pb5/7P/8 b - - 2 31", "f6d7", "ply62_B_f6d7_best_g7g6"), - ("6k1/p2n1ppp/8/5K2/8/1Pb5/7P/8 w - - 3 32", "f5e4", "ply63_W_f5e4_best_f5e4"), - ("6k1/p2n1ppp/8/8/4K3/1Pb5/7P/8 b - - 4 32", "d7f6", "ply64_B_d7f6_best_d7b6"), - ("6k1/p4ppp/8/4b2n/4K3/1P5P/8/8 b - - 2 35", "e5b8", "ply70_B_e5b8_best_e5c7"), - ("1b4k1/p4ppp/8/7n/4K3/1P5P/8/8 w - - 3 36", "e4f5", "ply71_W_e4f5_best_e4d3"), - ("1b4k1/p4ppp/8/5K1n/8/1P5P/8/8 b - - 4 36", "b8d6", "ply72_B_b8d6_best_g7g6"), - ("6k1/p4ppp/3b4/5K1n/8/1P5P/8/8 w - - 5 37", "f5g5", "ply73_W_f5g5_best_f5e4"), - ("6k1/p4ppp/3b4/6Kn/8/1P5P/8/8 b - - 6 37", "h5f4", "ply74_B_h5f4_best_g7g6"), - ("6k1/p4ppp/3b4/8/5nK1/1P5P/8/8 b - - 8 38", "h7h5", "ply76_B_h7h5_best_g7g6"), - ("6k1/p4pp1/3b4/7p/5nK1/1P5P/8/8 w - - 0 39", "g4g3", "ply77_W_g4g3_best_g4f3"), - ("6k1/p4pp1/3b4/7p/5n2/1P4KP/8/8 b - - 1 39", "f4e6", "ply78_B_f4e6_best_f4d5"), - ("6k1/p4pp1/3bn3/7p/8/1P4KP/8/8 w - - 2 40", "g3h4", "ply79_W_g3h4_best_g3f3"), - ("6k1/p4pp1/3bn3/7p/7K/1P5P/8/8 b - - 3 40", "e6c5", "ply80_B_e6c5_best_g7g6"), - ("6k1/p4pp1/3b4/2n4p/7K/1P5P/8/8 w - - 4 41", "h4h5", "ply81_W_h4h5_best_h4h5"), - ("6k1/p4pp1/3b4/2n4K/8/1P5P/8/8 b - - 0 41", "c5b3", "ply82_B_c5b3_best_g7g6"), - ("6k1/p4pp1/3b4/6K1/8/1n5P/8/8 b - - 1 42", "d6e7", "ply84_B_d6e7_best_b3d4"), - ("6k1/p3bpp1/8/5K2/8/1n5P/8/8 b - - 3 43", "b3d4", "ply86_B_b3d4_best_a7a5"), - ("8/p3kpp1/5b2/8/3nK3/7P/8/8 w - - 10 47", "e4f4", "ply93_W_e4f4_best_e4d3"), - ("8/p3kpp1/4nb2/8/5K2/7P/8/8 w - - 12 48", "f4f5", "ply95_W_f4f5_best_f4e3"), - ("6k1/p4pp1/5b2/3K4/3n4/7P/8/8 w - - 18 51", "d5d6", "ply101_W_d5d6_best_d5c4"), - ("6k1/p4pp1/4nb2/2K5/8/7P/8/8 w - - 24 54", "c5d6", "ply107_W_c5d6_best_c5b4"), - ("6k1/p4pp1/3K1b2/8/5n2/7P/8/8 w - - 26 55", "d6d7", "ply109_W_d6d7_best_h3h4"), - ("6k1/p2K1pp1/5b2/8/8/7n/8/8 w - - 0 56", "d7e8", "ply111_W_d7e8_best_d7c6"), - -] - - -# --- Helpers to resolve optimal move (UCI) from past game logs by label --- -_PLY_LABEL_RE = re.compile( - r"^ply(?P\d+)_([WB])_([a-h][1-8][a-h][1-8](?:[qrbn])?)(?:_best_([a-h][1-8][a-h][1-8](?:[qrbn])?))?$" -) - - -def _parse_label(label: str) -> tuple[int | None, str | None, str | None, str | None]: - m = _PLY_LABEL_RE.match(label) - if not m: - return None, None, None, None - num = int(m.group("num")) - # Extract side and the trailing UCI from label for context if needed - parts = label.split("_") - side = parts[1] if len(parts) > 1 else None - uci = parts[2] if len(parts) > 2 else None - best_uci = None - if len(parts) >= 5 and parts[3] == 'best': - best_uci = parts[4] - return num, side, uci, best_uci - - -def _iter_past_game_logs(repo_root: str): - # Search both the canonical tools/past_games folder and repo root, since logs may be kept in either - candidate_dirs = [ - os.path.join(repo_root, "PYTHON", "lichess_bot", "tools", "past_games"), - repo_root, - ] - seen = set() - for logs_dir in candidate_dirs: - if not os.path.isdir(logs_dir): - continue - for name in os.listdir(logs_dir): - if not name.startswith("lichess_bot_game_") or not name.endswith(".log"): - continue - if (logs_dir, name) in seen: - continue - seen.add((logs_dir, name)) - path = os.path.join(logs_dir, name) - try: - with open(path, "r", encoding="utf-8", errors="ignore") as f: - yield name, f.read() - except Exception: - continue - - -def _bot_side_from_log(txt: str) -> str | None: - # Try to infer whether VibeBot was White (W) or Black (B) - # Look for line: Players: VibeBot vs Reduktor OR Reduktor vs VibeBot - for line in txt.splitlines(): - if line.startswith("Players:"): - low = line.lower() - if "vibebot vs" in low: - return "W" - if "vs vibebot" in low: - return "B" - return None - - -def _resolve_optimal_uci_from_logs(repo_root: str, label: str, fen: str) -> str | None: - ply_num, side, _, _ = _parse_label(label) - if ply_num is None or side is None: - return None - target_prefix = f"ply {ply_num}:" - # We'll only return a move that's legal in the provided FEN - try: - position = chess.Board(fen) - except Exception: - position = None - for fname, txt in _iter_past_game_logs(repo_root): - bot_side = _bot_side_from_log(txt) - # Prefer logs where the bot side matches the label side (heuristic) - if bot_side is not None and bot_side != side: - continue - lines = txt.splitlines() - for i, line in enumerate(lines): - if line.strip().startswith(target_prefix): - # Look for a following line starting with 'best ' - # Usually immediate or within a couple of lines - for j in range(i + 1, min(i + 6, len(lines))): - s = lines[j].strip() - if s.startswith("best "): - # Expect format: best () ... - m = re.search(r"\(([a-h][1-8][a-h][1-8](?:[qrbn])?)\)", s) - if m: - uci = m.group(1) - if position is not None: - try: - mv = chess.Move.from_uci(uci) - if mv in position.legal_moves: - return uci - else: - # Skip illegal candidates and keep searching - continue - except Exception: - continue - return uci - # Fallback: sometimes lines like 'avoided risky ...' appear first, keep scanning - continue - # If we hit here, this ply in this log lacks a 'best' line nearby - # Try a little farther just in case - for j in range(i + 1, min(i + 12, len(lines))): - s = lines[j].strip() - if s.startswith("best "): - m = re.search(r"\(([a-h][1-8][a-h][1-8](?:[qrbn])?)\)", s) - if m: - uci = m.group(1) - if position is not None: - try: - mv = chess.Move.from_uci(uci) - if mv in position.legal_moves: - return uci - else: - continue - except Exception: - continue - return uci - # Otherwise move to next log - return None - - -@pytest.mark.parametrize('fen,blunder_uci,label', BLUNDER_CASES, ids=[c[2] for c in BLUNDER_CASES]) -def test_engine_avoids_logged_blunder(fen, blunder_uci, label): - board = chess.Board(fen) - eng = RandomEngine(depth=4, max_time_sec=1.2) - # Prefer explanation variant if available for better failure messages - move = None - explanation = '' - if hasattr(eng, 'choose_move_with_explanation'): - try: - mv, expl = eng.choose_move_with_explanation(board, time_budget_sec=1.2) - move, explanation = mv, expl or '' - except Exception: - move = eng.choose_move(board) - else: - move = eng.choose_move(board) - assert move is not None, 'Engine returned no move' - assert move in board.legal_moves, 'Engine move is illegal' - # If the engine repeats the blunder, gather deeper diagnostics: compare eval of blunder vs engine's best - if move.uci() == blunder_uci: - # Pick optimal move from parametrized data, with fallback to resolving from logs - # First, try to extract an explicit optimal UCI encoded in the test label - _, _, _, optimal_from_label = _parse_label(label) - optimal_from_logs = optimal_from_label or _resolve_optimal_uci_from_logs(REPO_ROOT, label, fen) - details = [ - f'Engine repeated blunder {blunder_uci} at {label}.', - f'engine_move_explanation: {explanation}'.strip(), - ] - # Try to request a side-by-side evaluation (blunder vs optimal) from the engine if available - try: - if hasattr(eng, 'evaluate_proposed_move_with_suggestion'): - try: - proposed_score_cp, proposed_expl, best_move, best_expl = eng.evaluate_proposed_move_with_suggestion( - board, blunder_uci, time_budget_sec=1.0 - ) - details.append('--- comparative analysis (engine provided) ---') - details.append(f'blunder {blunder_uci}: score={proposed_score_cp}cp explanation: {proposed_expl}') - # If we found an optimal move from logs, evaluate that exact move as well - if optimal_from_logs: - try: - opt_score_cp, opt_expl, _, _ = eng.evaluate_proposed_move_with_suggestion( - board, optimal_from_logs, time_budget_sec=1.0 - ) - details.append(f'optimal_from_logs {optimal_from_logs}: score={opt_score_cp}cp explanation: {opt_expl}') - except Exception as e: - details.append(f'optimal_from_logs {optimal_from_logs}: ') - elif optimal_from_logs is None: - details.append('optimal_from_logs: ') - if best_move is not None: - details.append( - f'engine_optimal {best_move.uci()}: explanation: {best_expl}' - ) - else: - details.append('engine_optimal: ') - except Exception as e: # fall back if the evaluation API fails - # Fallback 1: at least get engine optimal move + explanation - if hasattr(eng, 'choose_move_with_explanation'): - try: - best_mv, best_expl = eng.choose_move_with_explanation(board, time_budget_sec=1.0) - details.append('--- fallback analysis ---') - details.append(f'engine_optimal {best_mv.uci() if best_mv else ""}: explanation: {best_expl}') - except Exception: - details.append(f'engine_optimal: ') - else: - # Fallback 2: engine lacks evaluation API; get its best move explanation as context - if hasattr(eng, 'choose_move_with_explanation'): - try: - best_mv, best_expl = eng.choose_move_with_explanation(board, time_budget_sec=1.0) - details.append('--- fallback analysis ---') - details.append(f'engine_optimal {best_mv.uci() if best_mv else ""}: explanation: {best_expl}') - if optimal_from_logs: - details.append(f'optimal_from_logs {optimal_from_logs} (no eval available)') - except Exception as e: - details.append(f'engine_optimal: ') - except Exception as outer: - details.append(f'') - - pytest.fail("\n".join(d for d in details if d)) - - assert move.uci() != blunder_uci, f'Engine repeated blunder {blunder_uci} at {label}. Explanation: {explanation}' diff --git a/PYTHON/lichess_bot/tests/test_engine.py b/PYTHON/lichess_bot/tests/test_engine.py deleted file mode 100644 index fa0c845..0000000 --- a/PYTHON/lichess_bot/tests/test_engine.py +++ /dev/null @@ -1,10 +0,0 @@ -import chess -from PYTHON.lichess_bot.engine import RandomEngine - - -def test_random_engine_returns_move_on_start_position(): - board = chess.Board() - eng = RandomEngine() - move = eng.choose_move(board) - assert move is not None - assert move in board.legal_moves diff --git a/PYTHON/lichess_bot/tests/test_puzzles_top10.py b/PYTHON/lichess_bot/tests/test_puzzles_top10.py new file mode 100644 index 0000000..95b1f62 --- /dev/null +++ b/PYTHON/lichess_bot/tests/test_puzzles_top10.py @@ -0,0 +1,64 @@ +import csv +import os +from typing import List, Tuple + +import chess +import pytest + +from PYTHON.lichess_bot.engine import RandomEngine + + +def _load_top_puzzles(csv_path: str, limit: int = 10) -> List[Tuple[str, str]]: + """ + Return a list of (FEN, solution_moves_str) for the first `limit` rows in the CSV. + CSV columns: PuzzleId,FEN,Moves,... + """ + puzzles: List[Tuple[str, str]] = [] + with open(csv_path, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + fen = row["FEN"].strip() + moves = row["Moves"].strip() + if fen and moves: + puzzles.append((fen, moves)) + if len(puzzles) >= limit: + break + return puzzles + + +@pytest.mark.parametrize("fen,moves_str", _load_top_puzzles(os.path.join(os.path.dirname(__file__), "lichess_db_puzzle.csv"))) +def test_puzzle_engine_follow_solution(fen: str, moves_str: str): + board = chess.Board(fen) + eng = RandomEngine(max_time_sec=1.0) + + # Moves are space-separated UCIs alternating sides starting from side-to-move in the FEN + solution_moves = moves_str.split() + step = 0 + for uci in solution_moves: + step += 1 + # Engine move on this ply + mv, expl = eng.choose_move_with_explanation(board, time_budget_sec=0.5) + assert mv is not None, f"No move returned at step {step}.\nExplanation: {expl}" + + # If engine move differs from solution, fail immediately but provide analysis of the correct move + if mv.uci() != uci: + # Ask the engine to analyze the correct move for debug + score_cp, proposed_expl, best_mv, best_expl = eng.evaluate_proposed_move_with_suggestion(board, uci, time_budget_sec=0.5) + details = [ + f"Puzzle failed at step {step}.", + f"FEN: {fen}", + f"Expected: {uci}", + f"Engine played: {mv.uci()}", + "--- engine explanation ---", + expl, + "--- analysis of expected move ---", + f"score_cp: {score_cp}", + proposed_expl, + ] + if best_mv is not None: + details.append("--- engine best move analysis ---") + details.append(best_expl) + pytest.fail("\n".join(details)) + + # Apply the move and continue + board.push(mv) diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..e69de29