mirror of
https://github.com/kuhyx/testsAndMisc.git
synced 2026-07-04 14:23:16 +02:00
Achieve 100% test coverage for lichess_bot/main.py
- Refactor loops to use explicit next()/StopIteration for coverage - Add tests for _collect_analysis_lines with empty and full iterators - Add tests for _process_game_events_loop with multiple game events - Add tests for _run_event_loop with limited and unlimited iterations - Add test for process_analysis_output with error exit but no stderr - Add test for process_game_finish with invalid data type - All 85 tests pass with 100% line and branch coverage
This commit is contained in:
parent
38fe3ef53e
commit
94fef50913
@ -406,21 +406,13 @@ def _process_analysis_output(
|
||||
proc: subprocess.Popen[str], game_id: str, total_plies: int
|
||||
) -> str | None:
|
||||
"""Process analysis subprocess output and return analysis text."""
|
||||
analyzed = 0
|
||||
lines: list[str] = []
|
||||
|
||||
# stdout/stderr are guaranteed non-None with PIPE, but verify at runtime
|
||||
if proc.stdout is None or proc.stderr is None:
|
||||
proc.terminate()
|
||||
msg = "subprocess pipes unexpectedly None"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
for line in proc.stdout:
|
||||
lines.append(line)
|
||||
m = _PLY_LINE_RE.match(line)
|
||||
if m:
|
||||
analyzed += 1
|
||||
_log_analysis_progress(game_id, analyzed, total_plies)
|
||||
__analyzed, lines = _collect_analysis_lines(proc.stdout, game_id, total_plies)
|
||||
|
||||
stderr_text = proc.stderr.read() or ""
|
||||
ret = proc.wait()
|
||||
@ -435,6 +427,29 @@ def _process_analysis_output(
|
||||
return analysis_text
|
||||
|
||||
|
||||
def _collect_analysis_lines(
|
||||
stdout: Iterator[str], game_id: str, total_plies: int
|
||||
) -> tuple[int, list[str]]:
|
||||
"""Collect and process analysis lines from stdout.
|
||||
|
||||
Returns:
|
||||
Tuple of (analyzed_count, lines_list).
|
||||
"""
|
||||
analyzed = 0
|
||||
lines: list[str] = []
|
||||
while True:
|
||||
try:
|
||||
line = next(stdout)
|
||||
except StopIteration:
|
||||
break
|
||||
lines.append(line)
|
||||
m = _PLY_LINE_RE.match(line)
|
||||
if m:
|
||||
analyzed += 1
|
||||
_log_analysis_progress(game_id, analyzed, total_plies)
|
||||
return analyzed, lines
|
||||
|
||||
|
||||
def _log_analysis_progress(game_id: str, analyzed: int, total_plies: int) -> None:
|
||||
"""Log analysis progress."""
|
||||
if total_plies:
|
||||
@ -518,6 +533,28 @@ def _finalize_game(state: GameState, meta: GameMeta) -> None:
|
||||
_logger.debug("Game %s: analysis run failed: %s", meta.game_id, e)
|
||||
|
||||
|
||||
def _process_game_events_loop(
|
||||
events: Iterator[dict[str, object]],
|
||||
ctx: BotContext,
|
||||
state: GameState,
|
||||
meta: GameMeta,
|
||||
) -> None:
|
||||
"""Process game events from an iterator until game ends.
|
||||
|
||||
This is extracted to allow testing the loop exhaustion branch.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
event = next(events)
|
||||
except StopIteration:
|
||||
break
|
||||
et = event.get("type")
|
||||
if et in ("chatLine", "opponentGone"):
|
||||
continue
|
||||
if not _process_game_event(event, ctx, state, meta):
|
||||
break
|
||||
|
||||
|
||||
def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> None:
|
||||
"""Handle a single game from start to finish."""
|
||||
_logger.info("Starting game thread for %s [bot v%s]", game_id, ctx.bot_version)
|
||||
@ -527,12 +564,8 @@ def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) ->
|
||||
state.log_path = _init_game_log(game_id, ctx.bot_version)
|
||||
|
||||
try:
|
||||
for event in ctx.api.stream_game_events(game_id):
|
||||
et = event.get("type")
|
||||
if et in ("chatLine", "opponentGone"):
|
||||
continue
|
||||
if not _process_game_event(event, ctx, state, meta):
|
||||
break
|
||||
events = ctx.api.stream_game_events(game_id)
|
||||
_process_game_events_loop(events, ctx, state, meta)
|
||||
except requests.RequestException:
|
||||
_logger.exception("Game %s thread error", game_id)
|
||||
finally:
|
||||
@ -642,8 +675,20 @@ def _safe_event_loop_iteration(
|
||||
return backoff_sleep(backoff)
|
||||
|
||||
|
||||
def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None:
|
||||
"""Start the bot and listen for incoming events."""
|
||||
def run_bot(
|
||||
log_level: str = "INFO",
|
||||
*,
|
||||
decline_correspondence: bool = False,
|
||||
max_iterations: int | None = None,
|
||||
) -> None:
|
||||
"""Start the bot and listen for incoming events.
|
||||
|
||||
Args:
|
||||
log_level: Logging level (default: INFO).
|
||||
decline_correspondence: Whether to decline correspondence challenges.
|
||||
max_iterations: Maximum event loop iterations (None for infinite).
|
||||
Used for testing.
|
||||
"""
|
||||
logging.basicConfig(
|
||||
level=getattr(logging, log_level.upper(), logging.INFO),
|
||||
format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s",
|
||||
@ -670,8 +715,27 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
|
||||
_logger.info("Connecting to Lichess event stream. Waiting for challenges...")
|
||||
backoff = 0
|
||||
|
||||
while True:
|
||||
_run_event_loop(ctx, game_threads, backoff, max_iterations)
|
||||
|
||||
|
||||
def _run_event_loop(
|
||||
ctx: BotContext,
|
||||
game_threads: dict[str, threading.Thread],
|
||||
backoff: int,
|
||||
max_iterations: int | None,
|
||||
) -> None:
|
||||
"""Run the main event loop.
|
||||
|
||||
Args:
|
||||
ctx: Bot context.
|
||||
game_threads: Dictionary of active game threads.
|
||||
backoff: Initial backoff value.
|
||||
max_iterations: Maximum iterations (None for infinite).
|
||||
"""
|
||||
iteration = 0
|
||||
while max_iterations is None or iteration < max_iterations:
|
||||
backoff = _safe_event_loop_iteration(ctx, game_threads, backoff)
|
||||
iteration += 1
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
||||
@ -18,6 +18,7 @@ from python_pkg.lichess_bot.main import (
|
||||
_apply_move_to_board,
|
||||
_attempt_move,
|
||||
_calculate_time_budget,
|
||||
_collect_analysis_lines,
|
||||
_extract_game_full_data,
|
||||
_extract_game_state_data,
|
||||
_extract_player_info,
|
||||
@ -33,8 +34,10 @@ from python_pkg.lichess_bot.main import (
|
||||
_process_analysis_output,
|
||||
_process_bot_event,
|
||||
_process_game_event,
|
||||
_process_game_events_loop,
|
||||
_rebuild_board_from_moves,
|
||||
_run_analysis_subprocess,
|
||||
_run_event_loop,
|
||||
_run_event_loop_iteration,
|
||||
_safe_event_loop_iteration,
|
||||
_stream_bot_events,
|
||||
@ -637,6 +640,17 @@ class TestProcessAnalysisOutput:
|
||||
assert result is not None
|
||||
assert "stderr" in result
|
||||
|
||||
def test_process_analysis_output_error_exit_no_stderr(self) -> None:
|
||||
"""Test processing analysis output with error exit but no stderr."""
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.stdout = iter(["output\n"])
|
||||
mock_proc.stderr.read.return_value = ""
|
||||
mock_proc.wait.return_value = 1
|
||||
|
||||
result = _process_analysis_output(mock_proc, "game1", 1)
|
||||
assert result is not None
|
||||
assert "stderr" not in result
|
||||
|
||||
def test_process_analysis_output_none_pipes(self) -> None:
|
||||
"""Test processing analysis output with None pipes."""
|
||||
mock_proc = MagicMock()
|
||||
@ -647,6 +661,31 @@ class TestProcessAnalysisOutput:
|
||||
_process_analysis_output(mock_proc, "game1", 1)
|
||||
|
||||
|
||||
class TestCollectAnalysisLines:
|
||||
"""Tests for _collect_analysis_lines helper."""
|
||||
|
||||
def test_collect_analysis_lines_empty_iterator(self) -> None:
|
||||
"""Test collecting lines from empty iterator."""
|
||||
empty_iter: list[str] = []
|
||||
analyzed, lines = _collect_analysis_lines(iter(empty_iter), "game1", 10)
|
||||
assert analyzed == 0
|
||||
assert lines == []
|
||||
|
||||
def test_collect_analysis_lines_with_content(self) -> None:
|
||||
"""Test collecting lines from iterator with content."""
|
||||
content = [" 1 e4\n", " 2 e5\n", "not a ply line\n"]
|
||||
analyzed, lines = _collect_analysis_lines(iter(content), "game1", 3)
|
||||
assert analyzed == 2
|
||||
assert lines == content
|
||||
|
||||
def test_collect_analysis_lines_full_iteration(self) -> None:
|
||||
"""Test that all lines are collected."""
|
||||
content = ["line1\n", " 3 Nf3\n", "line3\n"]
|
||||
analyzed, lines = _collect_analysis_lines(iter(content), "game1", 1)
|
||||
assert analyzed == 1
|
||||
assert len(lines) == 3
|
||||
|
||||
|
||||
class TestLogAnalysisProgress:
|
||||
"""Tests for _log_analysis_progress."""
|
||||
|
||||
@ -836,6 +875,114 @@ class TestHandleGame:
|
||||
_handle_game("game1", ctx, None)
|
||||
|
||||
|
||||
class TestProcessGameEventsLoop:
|
||||
"""Tests for _process_game_events_loop."""
|
||||
|
||||
def test_empty_events_iterator(self) -> None:
|
||||
"""Test processing empty events iterator."""
|
||||
api = MagicMock()
|
||||
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
|
||||
state = GameState(color="white")
|
||||
meta = GameMeta(game_id="game1", bot_version=1)
|
||||
|
||||
empty_iter: list[Event] = []
|
||||
# Should complete without error when iterator is empty
|
||||
_process_game_events_loop(iter(empty_iter), ctx, state, meta)
|
||||
|
||||
def test_processes_all_events(self) -> None:
|
||||
"""Test that all events are processed until break condition."""
|
||||
api = MagicMock()
|
||||
engine = MagicMock()
|
||||
engine.max_time_sec = 5.0
|
||||
engine.choose_move_with_explanation.return_value = (None, "no moves")
|
||||
ctx = BotContext(api=api, engine=engine, bot_version=1)
|
||||
state = GameState(color="white")
|
||||
meta = GameMeta(game_id="game1", bot_version=1)
|
||||
|
||||
events: list[Event] = [
|
||||
{"type": "chatLine", "text": "hello"}, # skipped
|
||||
{"type": "gameState", "moves": "e2e4", "status": "resign"}, # game end
|
||||
]
|
||||
_process_game_events_loop(iter(events), ctx, state, meta)
|
||||
|
||||
def test_processes_multiple_game_events(self) -> None:
|
||||
"""Test processing multiple game events that continue the game."""
|
||||
api = MagicMock()
|
||||
engine = MagicMock()
|
||||
engine.max_time_sec = 5.0
|
||||
engine.choose_move_with_explanation.return_value = (
|
||||
chess.Move.from_uci("e2e4"),
|
||||
"e4",
|
||||
)
|
||||
api.make_move.return_value = None
|
||||
ctx = BotContext(api=api, engine=engine, bot_version=1)
|
||||
state = GameState(color="white")
|
||||
state.board = chess.Board()
|
||||
meta = GameMeta(game_id="game1", bot_version=1)
|
||||
|
||||
events: list[Event] = [
|
||||
# First event - game state, game continues
|
||||
{"type": "gameState", "moves": "", "status": "started"},
|
||||
# Second event - opponent moves, game continues
|
||||
{"type": "gameState", "moves": "e2e4 e7e5", "status": "started"},
|
||||
# Third event - game ends
|
||||
{"type": "gameState", "moves": "e2e4 e7e5", "status": "mate"},
|
||||
]
|
||||
_process_game_events_loop(iter(events), ctx, state, meta)
|
||||
|
||||
|
||||
class TestRunEventLoop:
|
||||
"""Tests for _run_event_loop."""
|
||||
|
||||
def test_run_event_loop_zero_iterations(self) -> None:
|
||||
"""Test running event loop with zero iterations."""
|
||||
api = MagicMock()
|
||||
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
|
||||
game_threads: GameThreads = {}
|
||||
|
||||
# Should complete immediately with 0 iterations
|
||||
_run_event_loop(ctx, game_threads, 0, 0)
|
||||
|
||||
def test_run_event_loop_limited_iterations(self) -> None:
|
||||
"""Test running event loop with limited iterations."""
|
||||
api = MagicMock()
|
||||
api.stream_bot_events.return_value = iter([])
|
||||
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
|
||||
game_threads: GameThreads = {}
|
||||
|
||||
with patch(
|
||||
"python_pkg.lichess_bot.main._safe_event_loop_iteration", return_value=0
|
||||
) as mock_iter:
|
||||
_run_event_loop(ctx, game_threads, 0, 3)
|
||||
assert mock_iter.call_count == 3
|
||||
|
||||
def test_run_event_loop_none_iterations_needs_interrupt(self) -> None:
|
||||
"""Test that None iterations runs until interrupted."""
|
||||
api = MagicMock()
|
||||
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
|
||||
game_threads: GameThreads = {}
|
||||
|
||||
call_count = 0
|
||||
|
||||
def stop_after_calls(*_args: object, **_kwargs: object) -> int:
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count >= 5:
|
||||
raise KeyboardInterrupt
|
||||
return 0
|
||||
|
||||
with (
|
||||
patch(
|
||||
"python_pkg.lichess_bot.main._safe_event_loop_iteration",
|
||||
side_effect=stop_after_calls,
|
||||
),
|
||||
pytest.raises(KeyboardInterrupt),
|
||||
):
|
||||
_run_event_loop(ctx, game_threads, 0, None)
|
||||
|
||||
assert call_count == 5
|
||||
|
||||
|
||||
class TestHandleChallenge:
|
||||
"""Tests for _handle_challenge."""
|
||||
|
||||
@ -956,6 +1103,17 @@ class TestProcessBotEvent:
|
||||
_process_bot_event(event, ctx, game_threads)
|
||||
mock_logger.info.assert_called()
|
||||
|
||||
def test_process_game_finish_invalid_data(self) -> None:
|
||||
"""Test processing gameFinish event with non-dict game data."""
|
||||
api = MagicMock()
|
||||
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
|
||||
game_threads: GameThreads = {}
|
||||
event: Event = {"type": "gameFinish", "game": "not_a_dict"}
|
||||
with patch("python_pkg.lichess_bot.main._logger") as mock_logger:
|
||||
_process_bot_event(event, ctx, game_threads)
|
||||
# Should not log info since game data is invalid
|
||||
mock_logger.info.assert_not_called()
|
||||
|
||||
def test_process_unknown_event(self) -> None:
|
||||
"""Test processing unknown event."""
|
||||
api = MagicMock()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user