From 52476d1b15ee52deb876ad5363a87103e3452647 Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Sun, 30 Nov 2025 23:57:49 +0100 Subject: [PATCH] fix(lint): fix G004 and PTH123 violations across codebase - Convert f-string logging to % style (G004) - Convert open() to Path.open() (PTH123) - Remove G004 and PTH123 from global ignores in pyproject.toml --- python_pkg/download_cats/generate_cats.py | 2 +- python_pkg/extract_links/main.py | 4 +- python_pkg/keyboard_coop/main.py | 2 +- python_pkg/lichess_bot/main.py | 92 +++++++++++-------- python_pkg/lichess_bot/tests/test_puzzles.py | 2 +- .../lichess_bot/tests/test_versioning.py | 2 +- .../tools/generate_blunder_tests.py | 39 ++++---- python_pkg/lichess_bot/utils.py | 6 +- python_pkg/scrape_website/scrape_comics.py | 2 +- .../stockfish_analysis/analyze_chess_game.py | 31 ++++--- 10 files changed, 106 insertions(+), 76 deletions(-) diff --git a/python_pkg/download_cats/generate_cats.py b/python_pkg/download_cats/generate_cats.py index b5ec6f9..5ed2788 100644 --- a/python_pkg/download_cats/generate_cats.py +++ b/python_pkg/download_cats/generate_cats.py @@ -31,7 +31,7 @@ def _download_single_image(url: str) -> None: image_path = Path("./CATS2/") / image_name # Save the image to the directory - with open(image_path, "wb") as file: + with image_path.open("wb") as file: file.write(response.content) _logger.info("Saved %s as %s", url, image_path) diff --git a/python_pkg/extract_links/main.py b/python_pkg/extract_links/main.py index 22cc768..d5a87ba 100755 --- a/python_pkg/extract_links/main.py +++ b/python_pkg/extract_links/main.py @@ -83,12 +83,12 @@ def main() -> int: else: out_path = Path(out_path) - with open(input_path, encoding="utf-8", errors="ignore") as f: + with input_path.open(encoding="utf-8", errors="ignore") as f: html_text = f.read() hosts = extract_hosts_from_html(html_text) - with open(out_path, "w", encoding="utf-8") as f: + with out_path.open("w", encoding="utf-8") as f: f.writelines(f"*{host}*\n" for host in hosts) _logger.info("Wrote %s host(s) to %s", len(hosts), out_path) diff --git a/python_pkg/keyboard_coop/main.py b/python_pkg/keyboard_coop/main.py index 5022ad7..fbe1d3d 100644 --- a/python_pkg/keyboard_coop/main.py +++ b/python_pkg/keyboard_coop/main.py @@ -103,7 +103,7 @@ class KeyboardCoopGame: """Load dictionary from words_dictionary.json file.""" try: dictionary_path = Path(__file__).parent / "words_dictionary.json" - with open(dictionary_path, encoding="utf-8") as f: + with dictionary_path.open(encoding="utf-8") as f: dictionary_data = json.load(f) # Convert to set for faster lookup (we only need the keys) return set(dictionary_data.keys()) diff --git a/python_pkg/lichess_bot/main.py b/python_pkg/lichess_bot/main.py index 53b9525..25218ec 100644 --- a/python_pkg/lichess_bot/main.py +++ b/python_pkg/lichess_bot/main.py @@ -76,14 +76,14 @@ def _apply_move_to_board(board: chess.Board, move: str, game_id: str) -> None: try: board.push_uci(move) except ValueError: - _logger.debug(f"Game {game_id}: could not apply move {move}") + _logger.debug("Game %s: could not apply move %s", game_id, move) def _init_game_log(game_id: str, bot_version: int) -> Path | None: """Initialize the game log file.""" game_log_path = Path.cwd() / f"lichess_bot_game_{game_id}.log" try: - with open(game_log_path, "w") as lf: + with game_log_path.open("w") as lf: lf.write(f"game {game_id} started\n") lf.write(f"bot_version v{bot_version}\n") except OSError: @@ -192,7 +192,7 @@ def _log_move_to_file( ) -> None: """Log a move to the game log file.""" if log_path: - with open(log_path, "a") as lf: + with log_path.open("a") as lf: lf.write(f"ply {ply}: {move.uci()}\n{reason}\n\n") @@ -209,7 +209,7 @@ def _attempt_move( ) if move is None: - _logger.info(f"Game {meta.game_id}: no legal moves (game likely over)") + _logger.info("Game %s: no legal moves (game likely over)", meta.game_id) return False time_left_sec = (state.my_ms or 0) / 1000.0 @@ -218,18 +218,21 @@ def _attempt_move( try: if move not in board.legal_moves: _logger.info( - f"Game {meta.game_id}: selected move no longer legal; skipping send" + "Game %s: selected move no longer legal; skipping send", meta.game_id ) else: _logger.info( - f"Game {meta.game_id}: playing {move.uci()} " - f"(budget={budget:.2f}s, my_time_left={time_left_sec:.1f}s, " - f"inc={inc_sec:.2f}s)" + "Game %s: playing %s (budget=%.2fs, my_time_left=%.1fs, inc=%.2fs)", + meta.game_id, + move.uci(), + budget, + time_left_sec, + inc_sec, ) _log_move_to_file(state.log_path, state.last_handled_len + 1, move, reason) ctx.api.make_move(meta.game_id, move) except requests.RequestException as e: - _logger.warning(f"Game {meta.game_id}: move {move.uci()} failed: {e}") + _logger.warning("Game %s: move %s failed: %s", meta.game_id, move.uci(), e) return True @@ -260,7 +263,7 @@ def _handle_move_if_needed( """Handle making a move if it's our turn. Returns False if game ends.""" my_turn = _is_my_turn(state.board, state.color) turn_str = "white" if state.board.turn else "black" - _logger.info(f"Game {meta.game_id}: turn={turn_str}, my_turn={my_turn}") + _logger.info("Game %s: turn=%s, my_turn=%s", meta.game_id, turn_str, my_turn) # Move policy allow_move = (et == "gameState") or (et == "gameFull" and new_len == 0) @@ -293,7 +296,7 @@ def _process_game_event( # Extract moves and status based on event type if event_type == "gameFull": moves, status = _extract_game_full_data(event, state, meta, ctx.api) - _logger.info(f"Game {meta.game_id}: joined as {state.color} (gameFull)") + _logger.info("Game %s: joined as %s (gameFull)", meta.game_id, state.color) else: moves, status = _extract_game_state_data(event, state) @@ -301,12 +304,16 @@ def _process_game_event( new_len = len(moves_list) _logger.info( - f"Game {meta.game_id}: event={event_type}, moves={new_len}, color={state.color}" + "Game %s: event=%s, moves=%s, color=%s", + meta.game_id, + event_type, + new_len, + state.color, ) if new_len == state.last_handled_len: _logger.debug( - f"Game {meta.game_id}: position unchanged (len={new_len}), skipping" + "Game %s: position unchanged (len=%s), skipping", meta.game_id, new_len ) return True @@ -314,7 +321,7 @@ def _process_game_event( state.board = _rebuild_board_from_moves(moves_list, meta.game_id) if state.color is None: - _logger.info(f"Game {meta.game_id}: color unknown yet; waiting for gameFull") + _logger.info("Game %s: color unknown yet; waiting for gameFull", meta.game_id) if event_type == "gameState": state.last_handled_len = new_len return True @@ -324,7 +331,7 @@ def _process_game_event( # Check for game end if status in _GAME_END_STATUSES: - _logger.info(f"Game {meta.game_id} finished: {status}") + _logger.info("Game %s finished: %s", meta.game_id, status) return False return True @@ -344,7 +351,7 @@ def _write_pgn_to_log(log_path: Path, board: chess.Board, meta: GameMeta) -> Non if meta.black_name: game.headers["Black"] = meta.black_name - with open(log_path, "a") as lf: + with log_path.open("a") as lf: lf.write("\nPGN:\n") exporter = chess.pgn.StringExporter( headers=True, variations=False, comments=False @@ -365,12 +372,15 @@ def _run_analysis_subprocess( if not analyze_script.is_file(): _logger.info( - f"Game {game_id}: analysis script not found at {analyze_script}; " - "skipping analysis" + "Game %s: analysis script not found at %s; skipping analysis", + game_id, + analyze_script, ) return None - _logger.info(f"Game {game_id}: starting post-game analysis ({total_plies} plies)") + _logger.info( + "Game %s: starting post-game analysis (%s plies)", game_id, total_plies + ) proc = subprocess.Popen( # noqa: S603 - trusted internal analysis script [sys.executable, "-u", str(analyze_script), str(log_path)], @@ -399,11 +409,11 @@ def _run_analysis_subprocess( analysis_text = "".join(lines) if ret != 0: - _logger.warning(f"Game {game_id}: analysis script exited with code {ret}") + _logger.warning("Game %s: analysis script exited with code %s", game_id, ret) if stderr_text: analysis_text += "\n[stderr]\n" + stderr_text - _logger.info(f"Game {game_id}: analysis complete") + _logger.info("Game %s: analysis complete", game_id) return analysis_text @@ -413,12 +423,16 @@ def _log_analysis_progress(game_id: str, analyzed: int, total_plies: int) -> Non left = max(0, total_plies - analyzed) pct = analyzed / total_plies * 100.0 _logger.info( - f"Game {game_id}: analysis progress " - f"{analyzed}/{total_plies} ({pct:.0f}%), left {left}" + "Game %s: analysis progress %s/%s (%.0f%%), left %s", + game_id, + analyzed, + total_plies, + pct, + left, ) else: _logger.info( - f"Game {game_id}: analysis progress {analyzed} plies (total unknown)" + "Game %s: analysis progress %s plies (total unknown)", game_id, analyzed ) @@ -427,7 +441,7 @@ def _insert_analysis_into_log( ) -> None: """Insert analysis text into the log file before PGN section.""" try: - with open(log_path, encoding="utf-8", errors="replace") as f: + with log_path.open(encoding="utf-8", errors="replace") as f: content = f.read() # Find insertion point (before PGN) @@ -453,10 +467,10 @@ def _insert_analysis_into_log( analysis_block = f"{meta_block}ANALYSIS:\n{analysis_text.rstrip()}\n\n" new_content = content[:insert_idx] + analysis_block + content[insert_idx:] - with open(log_path, "w", encoding="utf-8") as f: + with log_path.open("w", encoding="utf-8") as f: f.write(new_content) except OSError as e: - _logger.debug(f"Game {meta.game_id}: could not write analysis to log: {e}") + _logger.debug("Game %s: could not write analysis to log: %s", meta.game_id, e) def _finalize_game(state: GameState, meta: GameMeta) -> None: @@ -467,7 +481,7 @@ def _finalize_game(state: GameState, meta: GameMeta) -> None: try: _write_pgn_to_log(state.log_path, state.board, meta) except OSError as e: - _logger.debug(f"Game {meta.game_id}: could not write PGN: {e}") + _logger.debug("Game %s: could not write PGN: %s", meta.game_id, e) return # Run analysis @@ -483,12 +497,12 @@ def _finalize_game(state: GameState, meta: GameMeta) -> None: if analysis_text: _insert_analysis_into_log(state.log_path, analysis_text, meta) except (subprocess.SubprocessError, OSError) as e: - _logger.debug(f"Game {meta.game_id}: analysis run failed: {e}") + _logger.debug("Game %s: analysis run failed: %s", meta.game_id, e) def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> None: """Handle a single game from start to finish.""" - _logger.info(f"Starting game thread for {game_id} [bot v{ctx.bot_version}]") + _logger.info("Starting game thread for %s [bot v%s]", game_id, ctx.bot_version) meta = GameMeta(game_id=game_id, bot_version=ctx.bot_version) state = GameState(color=my_color) @@ -502,10 +516,10 @@ def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> if not _process_game_event(event, ctx, state, meta): break except requests.RequestException: - _logger.exception(f"Game {game_id} thread error") + _logger.exception("Game %s thread error", game_id) finally: _finalize_game(state, meta) - _logger.info(f"Ending game thread for {game_id}") + _logger.info("Ending game thread for %s", game_id) def _handle_challenge( @@ -525,10 +539,12 @@ def _handle_challenge( not_corr = speed != "correspondence" or not decline_correspondence if variant == "standard" and perf_ok and not_corr: - _logger.info(f"Accepting challenge {ch_id} ({speed})") + _logger.info("Accepting challenge %s (%s)", ch_id, speed) api.accept_challenge(str(ch_id)) else: - _logger.info(f"Declining challenge {ch_id} (variant={variant}, speed={speed})") + _logger.info( + "Declining challenge %s (variant=%s, speed=%s)", ch_id, variant, speed + ) api.decline_challenge(str(ch_id)) @@ -567,10 +583,10 @@ def _process_bot_event( game_data = event.get("game", {}) if isinstance(game_data, dict): game_id = game_data.get("id", "") - _logger.info(f"Game finished event: {game_id}") + _logger.info("Game finished event: %s", game_id) else: - _logger.debug(f"Unhandled event: {json.dumps(event)}") + _logger.debug("Unhandled event: %s", json.dumps(event)) def _stream_bot_events(ctx: BotContext) -> Iterator[dict[str, object]]: @@ -605,7 +621,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> _logger.info("Token present. Initializing client and engine...") bot_version = get_and_increment_version() - _logger.info(f"Bot version: v{bot_version}") + _logger.info("Bot version: v%s", bot_version) ctx = BotContext( api=LichessAPI(token), @@ -623,7 +639,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> try: backoff = _run_event_loop_iteration(ctx, game_threads) except requests.RequestException as e: # noqa: PERF203 - intentional reconnection loop - _logger.warning(f"Event stream error: {e}") + _logger.warning("Event stream error: %s", e) backoff = backoff_sleep(backoff) diff --git a/python_pkg/lichess_bot/tests/test_puzzles.py b/python_pkg/lichess_bot/tests/test_puzzles.py index b084ecd..16455cc 100644 --- a/python_pkg/lichess_bot/tests/test_puzzles.py +++ b/python_pkg/lichess_bot/tests/test_puzzles.py @@ -19,7 +19,7 @@ def _load_top_puzzles(csv_path: str | Path, limit: int = 8) -> list[tuple[str, s puzzles: list[tuple[str, str]] = [] if not Path(csv_path).is_file(): return puzzles - with open(csv_path, newline="", encoding="utf-8") as f: + with Path(csv_path).open(newline="", encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: fen = row["FEN"].strip() diff --git a/python_pkg/lichess_bot/tests/test_versioning.py b/python_pkg/lichess_bot/tests/test_versioning.py index 3bfd8e8..4dcaed3 100644 --- a/python_pkg/lichess_bot/tests/test_versioning.py +++ b/python_pkg/lichess_bot/tests/test_versioning.py @@ -21,5 +21,5 @@ def test_version_file_increments_and_persists( assert v2 == 2 # Ensure it persisted - with open(version_file) as f: + with version_file.open() as f: assert f.read().strip() == "2" diff --git a/python_pkg/lichess_bot/tools/generate_blunder_tests.py b/python_pkg/lichess_bot/tools/generate_blunder_tests.py index f0d6da4..b65294a 100755 --- a/python_pkg/lichess_bot/tools/generate_blunder_tests.py +++ b/python_pkg/lichess_bot/tools/generate_blunder_tests.py @@ -189,7 +189,7 @@ def ensure_unified_test_file(target_path: str | Path) -> None: if Path(target_path).exists(): return # Create skeleton unified test file - with open(target_path, "w", encoding="utf-8") as f: + with Path(target_path).open("w", encoding="utf-8") as f: f.write( """import os import sys @@ -246,7 +246,7 @@ def append_cases_to_unified_test( Returns the number of cases actually appended. """ ensure_unified_test_file(unified_path) - with open(unified_path, encoding="utf-8") as f: + with Path(unified_path).open(encoding="utf-8") as f: content = f.read() # Extract current cases as a set of (fen, uci) to de-duplicate @@ -320,7 +320,7 @@ def append_cases_to_unified_test( ) # Apply the changes (either updates to existing labels and/or appended lines) - with open(unified_path, "w", encoding="utf-8") as f: + with Path(unified_path).open("w", encoding="utf-8") as f: f.write(new_content) return len(lines) + updated_existing @@ -338,8 +338,10 @@ def _process_single_log(log_path: str | Path) -> int: unified = unified.resolve() added = append_cases_to_unified_test(unified, cases) _logger.info( - f"Appended {added} new blunder checks to " - f"{unified.relative_to(Path.cwd())} (game {game_id})." + "Appended %s new blunder checks to %s (game %s).", + added, + unified.relative_to(Path.cwd()), + game_id, ) return 0 @@ -371,10 +373,10 @@ def _parse_and_extract_blunders( def _read_log_file(log_path: str | Path) -> tuple[str | None, int | None]: """Read log file contents. Returns (text, None) or (None, error_code).""" try: - with open(log_path, encoding="utf-8") as fh: + with Path(log_path).open(encoding="utf-8") as fh: return fh.read(), None except FileNotFoundError: - _logger.exception(f"Log file not found: {log_path}") + _logger.exception("Log file not found: %s", log_path) return None, 2 @@ -383,10 +385,10 @@ def _parse_blunders(text: str, base: str) -> tuple[list[Blunder] | None, int | N try: blunders = parse_columns_for_blunders(text) except Exception: - _logger.exception(f"Error parsing Columns in {base}") + _logger.exception("Error parsing Columns in %s", base) return None, 2 if not blunders: - _logger.warning(f"No blunders found in Columns section: {base}") + _logger.warning("No blunders found in Columns section: %s", base) return None, 1 return blunders, None @@ -397,16 +399,18 @@ def _extract_cases( """Extract FEN/UCI cases from PGN. Returns (cases, None) or (None, error_code).""" pgn_text = extract_pgn(text) if not pgn_text: - _logger.warning(f"No PGN section found: {base}") + _logger.warning("No PGN section found: %s", base) return None, 1 try: cases = fen_and_uci_for_blunders(pgn_text, blunders) except Exception: - _logger.exception(f"Error converting SAN to UCI in {base}") + _logger.exception("Error converting SAN to UCI in %s", base) return None, 2 if not cases: - _logger.warning(f"Failed to reconstruct any blunder positions from PGN: {base}") + _logger.warning( + "Failed to reconstruct any blunder positions from PGN: %s", base + ) return None, 1 return cases, None @@ -419,7 +423,7 @@ def main(argv: list[str]) -> int: # No argument: process all logs in past_games if len(argv) == 1: if not past_dir.is_dir(): - _logger.error(f"No past_games directory found at {past_dir}") + _logger.error("No past_games directory found at %s", past_dir) return 2 logs = [ path @@ -427,7 +431,7 @@ def main(argv: list[str]) -> int: if re.match(r"lichess_bot_game_[A-Za-z0-9]+\.log$", path.name) ] if not logs: - _logger.warning(f"No logs found in {past_dir}") + _logger.warning("No logs found in %s", past_dir) return 1 # Sort by mtime ascending for determinism logs.sort(key=lambda p: Path(p).stat().st_mtime) @@ -437,8 +441,11 @@ def main(argv: list[str]) -> int: if rc == 0: ok += 1 _logger.info( - f"Processed {len(logs)} logs from {past_dir}, " - f"succeeded: {ok}, failed: {len(logs) - ok}" + "Processed %s logs from %s, succeeded: %s, failed: %s", + len(logs), + past_dir, + ok, + len(logs) - ok, ) return 0 if ok > 0 else 1 diff --git a/python_pkg/lichess_bot/utils.py b/python_pkg/lichess_bot/utils.py index 248c796..cf63844 100644 --- a/python_pkg/lichess_bot/utils.py +++ b/python_pkg/lichess_bot/utils.py @@ -27,7 +27,7 @@ def get_and_increment_version() -> int: path = _version_file_path() current = 0 try: - with open(path) as f: + with Path(path).open() as f: raw = f.read().strip() if raw: current = int(raw) @@ -38,13 +38,13 @@ def get_and_increment_version() -> int: new_version = current + 1 try: tmp_path = Path(path + ".tmp") - with open(tmp_path, "w") as f: + with tmp_path.open("w") as f: f.write(str(new_version)) tmp_path.replace(path) except OSError: # As a fallback, try a direct write; failure is non-fatal to bot operation try: - with open(path, "w") as f: + with Path(path).open("w") as f: f.write(str(new_version)) except OSError: _logger.debug("Could not persist bot version to %s", path) diff --git a/python_pkg/scrape_website/scrape_comics.py b/python_pkg/scrape_website/scrape_comics.py index 09e8828..225cf84 100644 --- a/python_pkg/scrape_website/scrape_comics.py +++ b/python_pkg/scrape_website/scrape_comics.py @@ -43,7 +43,7 @@ def download_image(url: str) -> bool: return False _logger.info("Downloading image from URL: %s", url) img_data = requests.get(url, timeout=REQUEST_TIMEOUT).content - with open(image_path, "wb") as handler: + with image_path.open("wb") as handler: handler.write(img_data) _logger.info("Image %s downloaded successfully", image_name) return True diff --git a/python_pkg/stockfish_analysis/analyze_chess_game.py b/python_pkg/stockfish_analysis/analyze_chess_game.py index f22b94b..01f088d 100755 --- a/python_pkg/stockfish_analysis/analyze_chess_game.py +++ b/python_pkg/stockfish_analysis/analyze_chess_game.py @@ -185,7 +185,7 @@ def _detect_total_mem_mb() -> int | None: # Fallback approach for Linux systems using proc meminfo. with ( contextlib.suppress(Exception), - open("/proc/meminfo", encoding="utf-8", errors="ignore") as f, + Path("/proc/meminfo").open(encoding="utf-8", errors="ignore") as f, ): for line in f: if line.startswith("MemTotal:"): @@ -301,10 +301,10 @@ def _build_argument_parser() -> argparse.ArgumentParser: def _load_game(file_path: str) -> chess.pgn.Game: """Load and parse a chess game from a file.""" if not Path(file_path).is_file(): - _logger.error(f"Input not found: {file_path}") + _logger.error("Input not found: %s", file_path) sys.exit(1) - with open(file_path, encoding="utf-8", errors="replace") as f: + with Path(file_path).open(encoding="utf-8", errors="replace") as f: raw = f.read() pgn_text = extract_pgn_text(raw) @@ -398,7 +398,7 @@ def _setup_engine( try: engine = chess.engine.SimpleEngine.popen_uci([args.engine]) except FileNotFoundError: - _logger.exception(f"Could not launch engine at: {args.engine}") + _logger.exception("Could not launch engine at: %s", args.engine) _logger.exception( "Ensure Stockfish is installed and in PATH, or specify with --engine." ) @@ -435,11 +435,13 @@ def _log_engine_config( hash_show = None if hash_show is not None: _logger.info( - f"Using engine options: Threads={threads}, " - f"Hash={hash_show} MB, MultiPV={multipv}" + "Using engine options: Threads=%s, Hash=%s MB, MultiPV=%s", + threads, + hash_show, + multipv, ) else: - _logger.info(f"Using engine options: Threads={threads}, MultiPV={multipv}") + _logger.info("Using engine options: Threads=%s, MultiPV=%s", threads, multipv) def _get_best_move( @@ -539,10 +541,15 @@ def _log_move_analysis(ply: int, result: MoveAnalysis, *, mover_white: bool) -> side = "W" if mover_white else "B" loss_str = str(result.cp_loss) if result.cp_loss is not None else "—" _logger.info( - f"{ply:>3} {side} {result.san:<8} " - f"{fmt_eval(result.played_cp, result.played_mate):>10} " - f"{fmt_eval(result.best_cp, result.best_mate):>9} " - f"{loss_str:>5} {result.classification:<12} {result.best_san}" + "%3d %s %-8s %10s %9s %5s %-12s %s", + ply, + side, + result.san, + fmt_eval(result.played_cp, result.played_mate), + fmt_eval(result.best_cp, result.best_mate), + loss_str, + result.classification, + result.best_san, ) @@ -555,7 +562,7 @@ def _run_analysis( white = game.headers.get("White", "White") black = game.headers.get("Black", "Black") result = game.headers.get("Result", "*") - _logger.info(f" {white} vs {black} Result: {result}") + _logger.info(" %s vs %s Result: %s", white, black, result) _logger.info("") _logger.info( "Columns: ply side move played_eval best_eval loss class best_suggestion"