diff --git a/python_pkg/lichess_bot/main.py b/python_pkg/lichess_bot/main.py index 307edc9..bb218d7 100644 --- a/python_pkg/lichess_bot/main.py +++ b/python_pkg/lichess_bot/main.py @@ -406,21 +406,13 @@ def _process_analysis_output( proc: subprocess.Popen[str], game_id: str, total_plies: int ) -> str | None: """Process analysis subprocess output and return analysis text.""" - analyzed = 0 - lines: list[str] = [] - # stdout/stderr are guaranteed non-None with PIPE, but verify at runtime if proc.stdout is None or proc.stderr is None: proc.terminate() msg = "subprocess pipes unexpectedly None" raise RuntimeError(msg) - for line in proc.stdout: - lines.append(line) - m = _PLY_LINE_RE.match(line) - if m: - analyzed += 1 - _log_analysis_progress(game_id, analyzed, total_plies) + __analyzed, lines = _collect_analysis_lines(proc.stdout, game_id, total_plies) stderr_text = proc.stderr.read() or "" ret = proc.wait() @@ -435,6 +427,29 @@ def _process_analysis_output( return analysis_text +def _collect_analysis_lines( + stdout: Iterator[str], game_id: str, total_plies: int +) -> tuple[int, list[str]]: + """Collect and process analysis lines from stdout. + + Returns: + Tuple of (analyzed_count, lines_list). + """ + analyzed = 0 + lines: list[str] = [] + while True: + try: + line = next(stdout) + except StopIteration: + break + lines.append(line) + m = _PLY_LINE_RE.match(line) + if m: + analyzed += 1 + _log_analysis_progress(game_id, analyzed, total_plies) + return analyzed, lines + + def _log_analysis_progress(game_id: str, analyzed: int, total_plies: int) -> None: """Log analysis progress.""" if total_plies: @@ -518,6 +533,28 @@ def _finalize_game(state: GameState, meta: GameMeta) -> None: _logger.debug("Game %s: analysis run failed: %s", meta.game_id, e) +def _process_game_events_loop( + events: Iterator[dict[str, object]], + ctx: BotContext, + state: GameState, + meta: GameMeta, +) -> None: + """Process game events from an iterator until game ends. + + This is extracted to allow testing the loop exhaustion branch. + """ + while True: + try: + event = next(events) + except StopIteration: + break + et = event.get("type") + if et in ("chatLine", "opponentGone"): + continue + if not _process_game_event(event, ctx, state, meta): + break + + def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> None: """Handle a single game from start to finish.""" _logger.info("Starting game thread for %s [bot v%s]", game_id, ctx.bot_version) @@ -527,12 +564,8 @@ def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> state.log_path = _init_game_log(game_id, ctx.bot_version) try: - for event in ctx.api.stream_game_events(game_id): - et = event.get("type") - if et in ("chatLine", "opponentGone"): - continue - if not _process_game_event(event, ctx, state, meta): - break + events = ctx.api.stream_game_events(game_id) + _process_game_events_loop(events, ctx, state, meta) except requests.RequestException: _logger.exception("Game %s thread error", game_id) finally: @@ -642,8 +675,20 @@ def _safe_event_loop_iteration( return backoff_sleep(backoff) -def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None: - """Start the bot and listen for incoming events.""" +def run_bot( + log_level: str = "INFO", + *, + decline_correspondence: bool = False, + max_iterations: int | None = None, +) -> None: + """Start the bot and listen for incoming events. + + Args: + log_level: Logging level (default: INFO). + decline_correspondence: Whether to decline correspondence challenges. + max_iterations: Maximum event loop iterations (None for infinite). + Used for testing. + """ logging.basicConfig( level=getattr(logging, log_level.upper(), logging.INFO), format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s", @@ -670,8 +715,27 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> _logger.info("Connecting to Lichess event stream. Waiting for challenges...") backoff = 0 - while True: + _run_event_loop(ctx, game_threads, backoff, max_iterations) + + +def _run_event_loop( + ctx: BotContext, + game_threads: dict[str, threading.Thread], + backoff: int, + max_iterations: int | None, +) -> None: + """Run the main event loop. + + Args: + ctx: Bot context. + game_threads: Dictionary of active game threads. + backoff: Initial backoff value. + max_iterations: Maximum iterations (None for infinite). + """ + iteration = 0 + while max_iterations is None or iteration < max_iterations: backoff = _safe_event_loop_iteration(ctx, game_threads, backoff) + iteration += 1 def main() -> None: diff --git a/python_pkg/lichess_bot/tests/test_main.py b/python_pkg/lichess_bot/tests/test_main.py index 99b173f..f389290 100644 --- a/python_pkg/lichess_bot/tests/test_main.py +++ b/python_pkg/lichess_bot/tests/test_main.py @@ -18,6 +18,7 @@ from python_pkg.lichess_bot.main import ( _apply_move_to_board, _attempt_move, _calculate_time_budget, + _collect_analysis_lines, _extract_game_full_data, _extract_game_state_data, _extract_player_info, @@ -33,8 +34,10 @@ from python_pkg.lichess_bot.main import ( _process_analysis_output, _process_bot_event, _process_game_event, + _process_game_events_loop, _rebuild_board_from_moves, _run_analysis_subprocess, + _run_event_loop, _run_event_loop_iteration, _safe_event_loop_iteration, _stream_bot_events, @@ -637,6 +640,17 @@ class TestProcessAnalysisOutput: assert result is not None assert "stderr" in result + def test_process_analysis_output_error_exit_no_stderr(self) -> None: + """Test processing analysis output with error exit but no stderr.""" + mock_proc = MagicMock() + mock_proc.stdout = iter(["output\n"]) + mock_proc.stderr.read.return_value = "" + mock_proc.wait.return_value = 1 + + result = _process_analysis_output(mock_proc, "game1", 1) + assert result is not None + assert "stderr" not in result + def test_process_analysis_output_none_pipes(self) -> None: """Test processing analysis output with None pipes.""" mock_proc = MagicMock() @@ -647,6 +661,31 @@ class TestProcessAnalysisOutput: _process_analysis_output(mock_proc, "game1", 1) +class TestCollectAnalysisLines: + """Tests for _collect_analysis_lines helper.""" + + def test_collect_analysis_lines_empty_iterator(self) -> None: + """Test collecting lines from empty iterator.""" + empty_iter: list[str] = [] + analyzed, lines = _collect_analysis_lines(iter(empty_iter), "game1", 10) + assert analyzed == 0 + assert lines == [] + + def test_collect_analysis_lines_with_content(self) -> None: + """Test collecting lines from iterator with content.""" + content = [" 1 e4\n", " 2 e5\n", "not a ply line\n"] + analyzed, lines = _collect_analysis_lines(iter(content), "game1", 3) + assert analyzed == 2 + assert lines == content + + def test_collect_analysis_lines_full_iteration(self) -> None: + """Test that all lines are collected.""" + content = ["line1\n", " 3 Nf3\n", "line3\n"] + analyzed, lines = _collect_analysis_lines(iter(content), "game1", 1) + assert analyzed == 1 + assert len(lines) == 3 + + class TestLogAnalysisProgress: """Tests for _log_analysis_progress.""" @@ -836,6 +875,114 @@ class TestHandleGame: _handle_game("game1", ctx, None) +class TestProcessGameEventsLoop: + """Tests for _process_game_events_loop.""" + + def test_empty_events_iterator(self) -> None: + """Test processing empty events iterator.""" + api = MagicMock() + ctx = BotContext(api=api, engine=MagicMock(), bot_version=1) + state = GameState(color="white") + meta = GameMeta(game_id="game1", bot_version=1) + + empty_iter: list[Event] = [] + # Should complete without error when iterator is empty + _process_game_events_loop(iter(empty_iter), ctx, state, meta) + + def test_processes_all_events(self) -> None: + """Test that all events are processed until break condition.""" + api = MagicMock() + engine = MagicMock() + engine.max_time_sec = 5.0 + engine.choose_move_with_explanation.return_value = (None, "no moves") + ctx = BotContext(api=api, engine=engine, bot_version=1) + state = GameState(color="white") + meta = GameMeta(game_id="game1", bot_version=1) + + events: list[Event] = [ + {"type": "chatLine", "text": "hello"}, # skipped + {"type": "gameState", "moves": "e2e4", "status": "resign"}, # game end + ] + _process_game_events_loop(iter(events), ctx, state, meta) + + def test_processes_multiple_game_events(self) -> None: + """Test processing multiple game events that continue the game.""" + api = MagicMock() + engine = MagicMock() + engine.max_time_sec = 5.0 + engine.choose_move_with_explanation.return_value = ( + chess.Move.from_uci("e2e4"), + "e4", + ) + api.make_move.return_value = None + ctx = BotContext(api=api, engine=engine, bot_version=1) + state = GameState(color="white") + state.board = chess.Board() + meta = GameMeta(game_id="game1", bot_version=1) + + events: list[Event] = [ + # First event - game state, game continues + {"type": "gameState", "moves": "", "status": "started"}, + # Second event - opponent moves, game continues + {"type": "gameState", "moves": "e2e4 e7e5", "status": "started"}, + # Third event - game ends + {"type": "gameState", "moves": "e2e4 e7e5", "status": "mate"}, + ] + _process_game_events_loop(iter(events), ctx, state, meta) + + +class TestRunEventLoop: + """Tests for _run_event_loop.""" + + def test_run_event_loop_zero_iterations(self) -> None: + """Test running event loop with zero iterations.""" + api = MagicMock() + ctx = BotContext(api=api, engine=MagicMock(), bot_version=1) + game_threads: GameThreads = {} + + # Should complete immediately with 0 iterations + _run_event_loop(ctx, game_threads, 0, 0) + + def test_run_event_loop_limited_iterations(self) -> None: + """Test running event loop with limited iterations.""" + api = MagicMock() + api.stream_bot_events.return_value = iter([]) + ctx = BotContext(api=api, engine=MagicMock(), bot_version=1) + game_threads: GameThreads = {} + + with patch( + "python_pkg.lichess_bot.main._safe_event_loop_iteration", return_value=0 + ) as mock_iter: + _run_event_loop(ctx, game_threads, 0, 3) + assert mock_iter.call_count == 3 + + def test_run_event_loop_none_iterations_needs_interrupt(self) -> None: + """Test that None iterations runs until interrupted.""" + api = MagicMock() + ctx = BotContext(api=api, engine=MagicMock(), bot_version=1) + game_threads: GameThreads = {} + + call_count = 0 + + def stop_after_calls(*_args: object, **_kwargs: object) -> int: + nonlocal call_count + call_count += 1 + if call_count >= 5: + raise KeyboardInterrupt + return 0 + + with ( + patch( + "python_pkg.lichess_bot.main._safe_event_loop_iteration", + side_effect=stop_after_calls, + ), + pytest.raises(KeyboardInterrupt), + ): + _run_event_loop(ctx, game_threads, 0, None) + + assert call_count == 5 + + class TestHandleChallenge: """Tests for _handle_challenge.""" @@ -956,6 +1103,17 @@ class TestProcessBotEvent: _process_bot_event(event, ctx, game_threads) mock_logger.info.assert_called() + def test_process_game_finish_invalid_data(self) -> None: + """Test processing gameFinish event with non-dict game data.""" + api = MagicMock() + ctx = BotContext(api=api, engine=MagicMock(), bot_version=1) + game_threads: GameThreads = {} + event: Event = {"type": "gameFinish", "game": "not_a_dict"} + with patch("python_pkg.lichess_bot.main._logger") as mock_logger: + _process_bot_event(event, ctx, game_threads) + # Should not log info since game data is invalid + mock_logger.info.assert_not_called() + def test_process_unknown_event(self) -> None: """Test processing unknown event.""" api = MagicMock()