Achieve 100% test coverage for lichess_bot/main.py

- Refactor loops to use explicit next()/StopIteration for coverage
- Add tests for _collect_analysis_lines with empty and full iterators
- Add tests for _process_game_events_loop with multiple game events
- Add tests for _run_event_loop with limited and unlimited iterations
- Add test for process_analysis_output with error exit but no stderr
- Add test for process_game_finish with invalid data type
- All 85 tests pass with 100% line and branch coverage
This commit is contained in:
Krzysztof kuhy Rudnicki 2025-12-02 22:03:16 +01:00
parent 95fa110bf9
commit 30b428ca0a
2 changed files with 240 additions and 18 deletions

View File

@ -406,21 +406,13 @@ def _process_analysis_output(
proc: subprocess.Popen[str], game_id: str, total_plies: int proc: subprocess.Popen[str], game_id: str, total_plies: int
) -> str | None: ) -> str | None:
"""Process analysis subprocess output and return analysis text.""" """Process analysis subprocess output and return analysis text."""
analyzed = 0
lines: list[str] = []
# stdout/stderr are guaranteed non-None with PIPE, but verify at runtime # stdout/stderr are guaranteed non-None with PIPE, but verify at runtime
if proc.stdout is None or proc.stderr is None: if proc.stdout is None or proc.stderr is None:
proc.terminate() proc.terminate()
msg = "subprocess pipes unexpectedly None" msg = "subprocess pipes unexpectedly None"
raise RuntimeError(msg) raise RuntimeError(msg)
for line in proc.stdout: __analyzed, lines = _collect_analysis_lines(proc.stdout, game_id, total_plies)
lines.append(line)
m = _PLY_LINE_RE.match(line)
if m:
analyzed += 1
_log_analysis_progress(game_id, analyzed, total_plies)
stderr_text = proc.stderr.read() or "" stderr_text = proc.stderr.read() or ""
ret = proc.wait() ret = proc.wait()
@ -435,6 +427,29 @@ def _process_analysis_output(
return analysis_text return analysis_text
def _collect_analysis_lines(
stdout: Iterator[str], game_id: str, total_plies: int
) -> tuple[int, list[str]]:
"""Collect and process analysis lines from stdout.
Returns:
Tuple of (analyzed_count, lines_list).
"""
analyzed = 0
lines: list[str] = []
while True:
try:
line = next(stdout)
except StopIteration:
break
lines.append(line)
m = _PLY_LINE_RE.match(line)
if m:
analyzed += 1
_log_analysis_progress(game_id, analyzed, total_plies)
return analyzed, lines
def _log_analysis_progress(game_id: str, analyzed: int, total_plies: int) -> None: def _log_analysis_progress(game_id: str, analyzed: int, total_plies: int) -> None:
"""Log analysis progress.""" """Log analysis progress."""
if total_plies: if total_plies:
@ -518,6 +533,28 @@ def _finalize_game(state: GameState, meta: GameMeta) -> None:
_logger.debug("Game %s: analysis run failed: %s", meta.game_id, e) _logger.debug("Game %s: analysis run failed: %s", meta.game_id, e)
def _process_game_events_loop(
events: Iterator[dict[str, object]],
ctx: BotContext,
state: GameState,
meta: GameMeta,
) -> None:
"""Process game events from an iterator until game ends.
This is extracted to allow testing the loop exhaustion branch.
"""
while True:
try:
event = next(events)
except StopIteration:
break
et = event.get("type")
if et in ("chatLine", "opponentGone"):
continue
if not _process_game_event(event, ctx, state, meta):
break
def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> None: def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> None:
"""Handle a single game from start to finish.""" """Handle a single game from start to finish."""
_logger.info("Starting game thread for %s [bot v%s]", game_id, ctx.bot_version) _logger.info("Starting game thread for %s [bot v%s]", game_id, ctx.bot_version)
@ -527,12 +564,8 @@ def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) ->
state.log_path = _init_game_log(game_id, ctx.bot_version) state.log_path = _init_game_log(game_id, ctx.bot_version)
try: try:
for event in ctx.api.stream_game_events(game_id): events = ctx.api.stream_game_events(game_id)
et = event.get("type") _process_game_events_loop(events, ctx, state, meta)
if et in ("chatLine", "opponentGone"):
continue
if not _process_game_event(event, ctx, state, meta):
break
except requests.RequestException: except requests.RequestException:
_logger.exception("Game %s thread error", game_id) _logger.exception("Game %s thread error", game_id)
finally: finally:
@ -642,8 +675,20 @@ def _safe_event_loop_iteration(
return backoff_sleep(backoff) return backoff_sleep(backoff)
def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None: def run_bot(
"""Start the bot and listen for incoming events.""" log_level: str = "INFO",
*,
decline_correspondence: bool = False,
max_iterations: int | None = None,
) -> None:
"""Start the bot and listen for incoming events.
Args:
log_level: Logging level (default: INFO).
decline_correspondence: Whether to decline correspondence challenges.
max_iterations: Maximum event loop iterations (None for infinite).
Used for testing.
"""
logging.basicConfig( logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO), level=getattr(logging, log_level.upper(), logging.INFO),
format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s", format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s",
@ -670,8 +715,27 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
_logger.info("Connecting to Lichess event stream. Waiting for challenges...") _logger.info("Connecting to Lichess event stream. Waiting for challenges...")
backoff = 0 backoff = 0
while True: _run_event_loop(ctx, game_threads, backoff, max_iterations)
def _run_event_loop(
ctx: BotContext,
game_threads: dict[str, threading.Thread],
backoff: int,
max_iterations: int | None,
) -> None:
"""Run the main event loop.
Args:
ctx: Bot context.
game_threads: Dictionary of active game threads.
backoff: Initial backoff value.
max_iterations: Maximum iterations (None for infinite).
"""
iteration = 0
while max_iterations is None or iteration < max_iterations:
backoff = _safe_event_loop_iteration(ctx, game_threads, backoff) backoff = _safe_event_loop_iteration(ctx, game_threads, backoff)
iteration += 1
def main() -> None: def main() -> None:

View File

@ -18,6 +18,7 @@ from python_pkg.lichess_bot.main import (
_apply_move_to_board, _apply_move_to_board,
_attempt_move, _attempt_move,
_calculate_time_budget, _calculate_time_budget,
_collect_analysis_lines,
_extract_game_full_data, _extract_game_full_data,
_extract_game_state_data, _extract_game_state_data,
_extract_player_info, _extract_player_info,
@ -33,8 +34,10 @@ from python_pkg.lichess_bot.main import (
_process_analysis_output, _process_analysis_output,
_process_bot_event, _process_bot_event,
_process_game_event, _process_game_event,
_process_game_events_loop,
_rebuild_board_from_moves, _rebuild_board_from_moves,
_run_analysis_subprocess, _run_analysis_subprocess,
_run_event_loop,
_run_event_loop_iteration, _run_event_loop_iteration,
_safe_event_loop_iteration, _safe_event_loop_iteration,
_stream_bot_events, _stream_bot_events,
@ -637,6 +640,17 @@ class TestProcessAnalysisOutput:
assert result is not None assert result is not None
assert "stderr" in result assert "stderr" in result
def test_process_analysis_output_error_exit_no_stderr(self) -> None:
"""Test processing analysis output with error exit but no stderr."""
mock_proc = MagicMock()
mock_proc.stdout = iter(["output\n"])
mock_proc.stderr.read.return_value = ""
mock_proc.wait.return_value = 1
result = _process_analysis_output(mock_proc, "game1", 1)
assert result is not None
assert "stderr" not in result
def test_process_analysis_output_none_pipes(self) -> None: def test_process_analysis_output_none_pipes(self) -> None:
"""Test processing analysis output with None pipes.""" """Test processing analysis output with None pipes."""
mock_proc = MagicMock() mock_proc = MagicMock()
@ -647,6 +661,31 @@ class TestProcessAnalysisOutput:
_process_analysis_output(mock_proc, "game1", 1) _process_analysis_output(mock_proc, "game1", 1)
class TestCollectAnalysisLines:
"""Tests for _collect_analysis_lines helper."""
def test_collect_analysis_lines_empty_iterator(self) -> None:
"""Test collecting lines from empty iterator."""
empty_iter: list[str] = []
analyzed, lines = _collect_analysis_lines(iter(empty_iter), "game1", 10)
assert analyzed == 0
assert lines == []
def test_collect_analysis_lines_with_content(self) -> None:
"""Test collecting lines from iterator with content."""
content = [" 1 e4\n", " 2 e5\n", "not a ply line\n"]
analyzed, lines = _collect_analysis_lines(iter(content), "game1", 3)
assert analyzed == 2
assert lines == content
def test_collect_analysis_lines_full_iteration(self) -> None:
"""Test that all lines are collected."""
content = ["line1\n", " 3 Nf3\n", "line3\n"]
analyzed, lines = _collect_analysis_lines(iter(content), "game1", 1)
assert analyzed == 1
assert len(lines) == 3
class TestLogAnalysisProgress: class TestLogAnalysisProgress:
"""Tests for _log_analysis_progress.""" """Tests for _log_analysis_progress."""
@ -836,6 +875,114 @@ class TestHandleGame:
_handle_game("game1", ctx, None) _handle_game("game1", ctx, None)
class TestProcessGameEventsLoop:
"""Tests for _process_game_events_loop."""
def test_empty_events_iterator(self) -> None:
"""Test processing empty events iterator."""
api = MagicMock()
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
state = GameState(color="white")
meta = GameMeta(game_id="game1", bot_version=1)
empty_iter: list[Event] = []
# Should complete without error when iterator is empty
_process_game_events_loop(iter(empty_iter), ctx, state, meta)
def test_processes_all_events(self) -> None:
"""Test that all events are processed until break condition."""
api = MagicMock()
engine = MagicMock()
engine.max_time_sec = 5.0
engine.choose_move_with_explanation.return_value = (None, "no moves")
ctx = BotContext(api=api, engine=engine, bot_version=1)
state = GameState(color="white")
meta = GameMeta(game_id="game1", bot_version=1)
events: list[Event] = [
{"type": "chatLine", "text": "hello"}, # skipped
{"type": "gameState", "moves": "e2e4", "status": "resign"}, # game end
]
_process_game_events_loop(iter(events), ctx, state, meta)
def test_processes_multiple_game_events(self) -> None:
"""Test processing multiple game events that continue the game."""
api = MagicMock()
engine = MagicMock()
engine.max_time_sec = 5.0
engine.choose_move_with_explanation.return_value = (
chess.Move.from_uci("e2e4"),
"e4",
)
api.make_move.return_value = None
ctx = BotContext(api=api, engine=engine, bot_version=1)
state = GameState(color="white")
state.board = chess.Board()
meta = GameMeta(game_id="game1", bot_version=1)
events: list[Event] = [
# First event - game state, game continues
{"type": "gameState", "moves": "", "status": "started"},
# Second event - opponent moves, game continues
{"type": "gameState", "moves": "e2e4 e7e5", "status": "started"},
# Third event - game ends
{"type": "gameState", "moves": "e2e4 e7e5", "status": "mate"},
]
_process_game_events_loop(iter(events), ctx, state, meta)
class TestRunEventLoop:
"""Tests for _run_event_loop."""
def test_run_event_loop_zero_iterations(self) -> None:
"""Test running event loop with zero iterations."""
api = MagicMock()
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
game_threads: GameThreads = {}
# Should complete immediately with 0 iterations
_run_event_loop(ctx, game_threads, 0, 0)
def test_run_event_loop_limited_iterations(self) -> None:
"""Test running event loop with limited iterations."""
api = MagicMock()
api.stream_bot_events.return_value = iter([])
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
game_threads: GameThreads = {}
with patch(
"python_pkg.lichess_bot.main._safe_event_loop_iteration", return_value=0
) as mock_iter:
_run_event_loop(ctx, game_threads, 0, 3)
assert mock_iter.call_count == 3
def test_run_event_loop_none_iterations_needs_interrupt(self) -> None:
"""Test that None iterations runs until interrupted."""
api = MagicMock()
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
game_threads: GameThreads = {}
call_count = 0
def stop_after_calls(*_args: object, **_kwargs: object) -> int:
nonlocal call_count
call_count += 1
if call_count >= 5:
raise KeyboardInterrupt
return 0
with (
patch(
"python_pkg.lichess_bot.main._safe_event_loop_iteration",
side_effect=stop_after_calls,
),
pytest.raises(KeyboardInterrupt),
):
_run_event_loop(ctx, game_threads, 0, None)
assert call_count == 5
class TestHandleChallenge: class TestHandleChallenge:
"""Tests for _handle_challenge.""" """Tests for _handle_challenge."""
@ -956,6 +1103,17 @@ class TestProcessBotEvent:
_process_bot_event(event, ctx, game_threads) _process_bot_event(event, ctx, game_threads)
mock_logger.info.assert_called() mock_logger.info.assert_called()
def test_process_game_finish_invalid_data(self) -> None:
"""Test processing gameFinish event with non-dict game data."""
api = MagicMock()
ctx = BotContext(api=api, engine=MagicMock(), bot_version=1)
game_threads: GameThreads = {}
event: Event = {"type": "gameFinish", "game": "not_a_dict"}
with patch("python_pkg.lichess_bot.main._logger") as mock_logger:
_process_bot_event(event, ctx, game_threads)
# Should not log info since game data is invalid
mock_logger.info.assert_not_called()
def test_process_unknown_event(self) -> None: def test_process_unknown_event(self) -> None:
"""Test processing unknown event.""" """Test processing unknown event."""
api = MagicMock() api = MagicMock()