fix(lint): fix G004 and PTH123 violations across codebase

- Convert f-string logging to % style (G004)
- Convert open() to Path.open() (PTH123)
- Remove G004 and PTH123 from global ignores in pyproject.toml
This commit is contained in:
Krzysztof kuhy Rudnicki 2025-11-30 23:57:49 +01:00
parent 6f1dfb769f
commit 8ecc13cb56
12 changed files with 110 additions and 84 deletions

View File

@ -34,10 +34,6 @@ ignore = [
# Formatter conflicts - these rules conflict with ruff format
"COM812", # Trailing comma missing (conflicts with formatter)
"ISC001", # Implicit string concatenation (conflicts with formatter)
# Logging style preference - f-strings are more readable
"G004", # Logging statement uses f-string (stylistic preference)
# Path style preference - open() with Path objects is valid Python
"PTH123", # open() should be Path.open() (style preference, not required)
]
# Allow ALL rules to be auto-fixed

View File

@ -31,7 +31,7 @@ def _download_single_image(url: str) -> None:
image_path = Path("./CATS2/") / image_name
# Save the image to the directory
with open(image_path, "wb") as file:
with image_path.open("wb") as file:
file.write(response.content)
_logger.info("Saved %s as %s", url, image_path)

View File

@ -83,12 +83,12 @@ def main() -> int:
else:
out_path = Path(out_path)
with open(input_path, encoding="utf-8", errors="ignore") as f:
with input_path.open(encoding="utf-8", errors="ignore") as f:
html_text = f.read()
hosts = extract_hosts_from_html(html_text)
with open(out_path, "w", encoding="utf-8") as f:
with out_path.open("w", encoding="utf-8") as f:
f.writelines(f"*{host}*\n" for host in hosts)
_logger.info("Wrote %s host(s) to %s", len(hosts), out_path)

View File

@ -103,7 +103,7 @@ class KeyboardCoopGame:
"""Load dictionary from words_dictionary.json file."""
try:
dictionary_path = Path(__file__).parent / "words_dictionary.json"
with open(dictionary_path, encoding="utf-8") as f:
with dictionary_path.open(encoding="utf-8") as f:
dictionary_data = json.load(f)
# Convert to set for faster lookup (we only need the keys)
return set(dictionary_data.keys())

View File

@ -76,14 +76,14 @@ def _apply_move_to_board(board: chess.Board, move: str, game_id: str) -> None:
try:
board.push_uci(move)
except ValueError:
_logger.debug(f"Game {game_id}: could not apply move {move}")
_logger.debug("Game %s: could not apply move %s", game_id, move)
def _init_game_log(game_id: str, bot_version: int) -> Path | None:
"""Initialize the game log file."""
game_log_path = Path.cwd() / f"lichess_bot_game_{game_id}.log"
try:
with open(game_log_path, "w") as lf:
with game_log_path.open("w") as lf:
lf.write(f"game {game_id} started\n")
lf.write(f"bot_version v{bot_version}\n")
except OSError:
@ -192,7 +192,7 @@ def _log_move_to_file(
) -> None:
"""Log a move to the game log file."""
if log_path:
with open(log_path, "a") as lf:
with log_path.open("a") as lf:
lf.write(f"ply {ply}: {move.uci()}\n{reason}\n\n")
@ -209,7 +209,7 @@ def _attempt_move(
)
if move is None:
_logger.info(f"Game {meta.game_id}: no legal moves (game likely over)")
_logger.info("Game %s: no legal moves (game likely over)", meta.game_id)
return False
time_left_sec = (state.my_ms or 0) / 1000.0
@ -218,18 +218,21 @@ def _attempt_move(
try:
if move not in board.legal_moves:
_logger.info(
f"Game {meta.game_id}: selected move no longer legal; skipping send"
"Game %s: selected move no longer legal; skipping send", meta.game_id
)
else:
_logger.info(
f"Game {meta.game_id}: playing {move.uci()} "
f"(budget={budget:.2f}s, my_time_left={time_left_sec:.1f}s, "
f"inc={inc_sec:.2f}s)"
"Game %s: playing %s (budget=%.2fs, my_time_left=%.1fs, inc=%.2fs)",
meta.game_id,
move.uci(),
budget,
time_left_sec,
inc_sec,
)
_log_move_to_file(state.log_path, state.last_handled_len + 1, move, reason)
ctx.api.make_move(meta.game_id, move)
except requests.RequestException as e:
_logger.warning(f"Game {meta.game_id}: move {move.uci()} failed: {e}")
_logger.warning("Game %s: move %s failed: %s", meta.game_id, move.uci(), e)
return True
@ -260,7 +263,7 @@ def _handle_move_if_needed(
"""Handle making a move if it's our turn. Returns False if game ends."""
my_turn = _is_my_turn(state.board, state.color)
turn_str = "white" if state.board.turn else "black"
_logger.info(f"Game {meta.game_id}: turn={turn_str}, my_turn={my_turn}")
_logger.info("Game %s: turn=%s, my_turn=%s", meta.game_id, turn_str, my_turn)
# Move policy
allow_move = (et == "gameState") or (et == "gameFull" and new_len == 0)
@ -293,7 +296,7 @@ def _process_game_event(
# Extract moves and status based on event type
if event_type == "gameFull":
moves, status = _extract_game_full_data(event, state, meta, ctx.api)
_logger.info(f"Game {meta.game_id}: joined as {state.color} (gameFull)")
_logger.info("Game %s: joined as %s (gameFull)", meta.game_id, state.color)
else:
moves, status = _extract_game_state_data(event, state)
@ -301,12 +304,16 @@ def _process_game_event(
new_len = len(moves_list)
_logger.info(
f"Game {meta.game_id}: event={event_type}, moves={new_len}, color={state.color}"
"Game %s: event=%s, moves=%s, color=%s",
meta.game_id,
event_type,
new_len,
state.color,
)
if new_len == state.last_handled_len:
_logger.debug(
f"Game {meta.game_id}: position unchanged (len={new_len}), skipping"
"Game %s: position unchanged (len=%s), skipping", meta.game_id, new_len
)
return True
@ -314,7 +321,7 @@ def _process_game_event(
state.board = _rebuild_board_from_moves(moves_list, meta.game_id)
if state.color is None:
_logger.info(f"Game {meta.game_id}: color unknown yet; waiting for gameFull")
_logger.info("Game %s: color unknown yet; waiting for gameFull", meta.game_id)
if event_type == "gameState":
state.last_handled_len = new_len
return True
@ -324,7 +331,7 @@ def _process_game_event(
# Check for game end
if status in _GAME_END_STATUSES:
_logger.info(f"Game {meta.game_id} finished: {status}")
_logger.info("Game %s finished: %s", meta.game_id, status)
return False
return True
@ -344,7 +351,7 @@ def _write_pgn_to_log(log_path: Path, board: chess.Board, meta: GameMeta) -> Non
if meta.black_name:
game.headers["Black"] = meta.black_name
with open(log_path, "a") as lf:
with log_path.open("a") as lf:
lf.write("\nPGN:\n")
exporter = chess.pgn.StringExporter(
headers=True, variations=False, comments=False
@ -365,12 +372,15 @@ def _run_analysis_subprocess(
if not analyze_script.is_file():
_logger.info(
f"Game {game_id}: analysis script not found at {analyze_script}; "
"skipping analysis"
"Game %s: analysis script not found at %s; skipping analysis",
game_id,
analyze_script,
)
return None
_logger.info(f"Game {game_id}: starting post-game analysis ({total_plies} plies)")
_logger.info(
"Game %s: starting post-game analysis (%s plies)", game_id, total_plies
)
proc = subprocess.Popen( # noqa: S603 - trusted internal analysis script
[sys.executable, "-u", str(analyze_script), str(log_path)],
@ -399,11 +409,11 @@ def _run_analysis_subprocess(
analysis_text = "".join(lines)
if ret != 0:
_logger.warning(f"Game {game_id}: analysis script exited with code {ret}")
_logger.warning("Game %s: analysis script exited with code %s", game_id, ret)
if stderr_text:
analysis_text += "\n[stderr]\n" + stderr_text
_logger.info(f"Game {game_id}: analysis complete")
_logger.info("Game %s: analysis complete", game_id)
return analysis_text
@ -413,12 +423,16 @@ def _log_analysis_progress(game_id: str, analyzed: int, total_plies: int) -> Non
left = max(0, total_plies - analyzed)
pct = analyzed / total_plies * 100.0
_logger.info(
f"Game {game_id}: analysis progress "
f"{analyzed}/{total_plies} ({pct:.0f}%), left {left}"
"Game %s: analysis progress %s/%s (%.0f%%), left %s",
game_id,
analyzed,
total_plies,
pct,
left,
)
else:
_logger.info(
f"Game {game_id}: analysis progress {analyzed} plies (total unknown)"
"Game %s: analysis progress %s plies (total unknown)", game_id, analyzed
)
@ -427,7 +441,7 @@ def _insert_analysis_into_log(
) -> None:
"""Insert analysis text into the log file before PGN section."""
try:
with open(log_path, encoding="utf-8", errors="replace") as f:
with log_path.open(encoding="utf-8", errors="replace") as f:
content = f.read()
# Find insertion point (before PGN)
@ -453,10 +467,10 @@ def _insert_analysis_into_log(
analysis_block = f"{meta_block}ANALYSIS:\n{analysis_text.rstrip()}\n\n"
new_content = content[:insert_idx] + analysis_block + content[insert_idx:]
with open(log_path, "w", encoding="utf-8") as f:
with log_path.open("w", encoding="utf-8") as f:
f.write(new_content)
except OSError as e:
_logger.debug(f"Game {meta.game_id}: could not write analysis to log: {e}")
_logger.debug("Game %s: could not write analysis to log: %s", meta.game_id, e)
def _finalize_game(state: GameState, meta: GameMeta) -> None:
@ -467,7 +481,7 @@ def _finalize_game(state: GameState, meta: GameMeta) -> None:
try:
_write_pgn_to_log(state.log_path, state.board, meta)
except OSError as e:
_logger.debug(f"Game {meta.game_id}: could not write PGN: {e}")
_logger.debug("Game %s: could not write PGN: %s", meta.game_id, e)
return
# Run analysis
@ -483,12 +497,12 @@ def _finalize_game(state: GameState, meta: GameMeta) -> None:
if analysis_text:
_insert_analysis_into_log(state.log_path, analysis_text, meta)
except (subprocess.SubprocessError, OSError) as e:
_logger.debug(f"Game {meta.game_id}: analysis run failed: {e}")
_logger.debug("Game %s: analysis run failed: %s", meta.game_id, e)
def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) -> None:
"""Handle a single game from start to finish."""
_logger.info(f"Starting game thread for {game_id} [bot v{ctx.bot_version}]")
_logger.info("Starting game thread for %s [bot v%s]", game_id, ctx.bot_version)
meta = GameMeta(game_id=game_id, bot_version=ctx.bot_version)
state = GameState(color=my_color)
@ -502,10 +516,10 @@ def _handle_game(game_id: str, ctx: BotContext, my_color: str | None = None) ->
if not _process_game_event(event, ctx, state, meta):
break
except requests.RequestException:
_logger.exception(f"Game {game_id} thread error")
_logger.exception("Game %s thread error", game_id)
finally:
_finalize_game(state, meta)
_logger.info(f"Ending game thread for {game_id}")
_logger.info("Ending game thread for %s", game_id)
def _handle_challenge(
@ -525,10 +539,12 @@ def _handle_challenge(
not_corr = speed != "correspondence" or not decline_correspondence
if variant == "standard" and perf_ok and not_corr:
_logger.info(f"Accepting challenge {ch_id} ({speed})")
_logger.info("Accepting challenge %s (%s)", ch_id, speed)
api.accept_challenge(str(ch_id))
else:
_logger.info(f"Declining challenge {ch_id} (variant={variant}, speed={speed})")
_logger.info(
"Declining challenge %s (variant=%s, speed=%s)", ch_id, variant, speed
)
api.decline_challenge(str(ch_id))
@ -567,10 +583,10 @@ def _process_bot_event(
game_data = event.get("game", {})
if isinstance(game_data, dict):
game_id = game_data.get("id", "")
_logger.info(f"Game finished event: {game_id}")
_logger.info("Game finished event: %s", game_id)
else:
_logger.debug(f"Unhandled event: {json.dumps(event)}")
_logger.debug("Unhandled event: %s", json.dumps(event))
def _stream_bot_events(ctx: BotContext) -> Iterator[dict[str, object]]:
@ -605,7 +621,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
_logger.info("Token present. Initializing client and engine...")
bot_version = get_and_increment_version()
_logger.info(f"Bot version: v{bot_version}")
_logger.info("Bot version: v%s", bot_version)
ctx = BotContext(
api=LichessAPI(token),
@ -623,7 +639,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
try:
backoff = _run_event_loop_iteration(ctx, game_threads)
except requests.RequestException as e: # noqa: PERF203 - intentional reconnection loop
_logger.warning(f"Event stream error: {e}")
_logger.warning("Event stream error: %s", e)
backoff = backoff_sleep(backoff)

View File

@ -19,7 +19,7 @@ def _load_top_puzzles(csv_path: str | Path, limit: int = 8) -> list[tuple[str, s
puzzles: list[tuple[str, str]] = []
if not Path(csv_path).is_file():
return puzzles
with open(csv_path, newline="", encoding="utf-8") as f:
with Path(csv_path).open(newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
fen = row["FEN"].strip()

View File

@ -21,5 +21,5 @@ def test_version_file_increments_and_persists(
assert v2 == 2
# Ensure it persisted
with open(version_file) as f:
with version_file.open() as f:
assert f.read().strip() == "2"

View File

@ -189,7 +189,7 @@ def ensure_unified_test_file(target_path: str | Path) -> None:
if Path(target_path).exists():
return
# Create skeleton unified test file
with open(target_path, "w", encoding="utf-8") as f:
with Path(target_path).open("w", encoding="utf-8") as f:
f.write(
"""import os
import sys
@ -246,7 +246,7 @@ def append_cases_to_unified_test(
Returns the number of cases actually appended.
"""
ensure_unified_test_file(unified_path)
with open(unified_path, encoding="utf-8") as f:
with Path(unified_path).open(encoding="utf-8") as f:
content = f.read()
# Extract current cases as a set of (fen, uci) to de-duplicate
@ -320,7 +320,7 @@ def append_cases_to_unified_test(
)
# Apply the changes (either updates to existing labels and/or appended lines)
with open(unified_path, "w", encoding="utf-8") as f:
with Path(unified_path).open("w", encoding="utf-8") as f:
f.write(new_content)
return len(lines) + updated_existing
@ -338,8 +338,10 @@ def _process_single_log(log_path: str | Path) -> int:
unified = unified.resolve()
added = append_cases_to_unified_test(unified, cases)
_logger.info(
f"Appended {added} new blunder checks to "
f"{unified.relative_to(Path.cwd())} (game {game_id})."
"Appended %s new blunder checks to %s (game %s).",
added,
unified.relative_to(Path.cwd()),
game_id,
)
return 0
@ -371,10 +373,10 @@ def _parse_and_extract_blunders(
def _read_log_file(log_path: str | Path) -> tuple[str | None, int | None]:
"""Read log file contents. Returns (text, None) or (None, error_code)."""
try:
with open(log_path, encoding="utf-8") as fh:
with Path(log_path).open(encoding="utf-8") as fh:
return fh.read(), None
except FileNotFoundError:
_logger.exception(f"Log file not found: {log_path}")
_logger.exception("Log file not found: %s", log_path)
return None, 2
@ -383,10 +385,10 @@ def _parse_blunders(text: str, base: str) -> tuple[list[Blunder] | None, int | N
try:
blunders = parse_columns_for_blunders(text)
except Exception:
_logger.exception(f"Error parsing Columns in {base}")
_logger.exception("Error parsing Columns in %s", base)
return None, 2
if not blunders:
_logger.warning(f"No blunders found in Columns section: {base}")
_logger.warning("No blunders found in Columns section: %s", base)
return None, 1
return blunders, None
@ -397,16 +399,18 @@ def _extract_cases(
"""Extract FEN/UCI cases from PGN. Returns (cases, None) or (None, error_code)."""
pgn_text = extract_pgn(text)
if not pgn_text:
_logger.warning(f"No PGN section found: {base}")
_logger.warning("No PGN section found: %s", base)
return None, 1
try:
cases = fen_and_uci_for_blunders(pgn_text, blunders)
except Exception:
_logger.exception(f"Error converting SAN to UCI in {base}")
_logger.exception("Error converting SAN to UCI in %s", base)
return None, 2
if not cases:
_logger.warning(f"Failed to reconstruct any blunder positions from PGN: {base}")
_logger.warning(
"Failed to reconstruct any blunder positions from PGN: %s", base
)
return None, 1
return cases, None
@ -419,7 +423,7 @@ def main(argv: list[str]) -> int:
# No argument: process all logs in past_games
if len(argv) == 1:
if not past_dir.is_dir():
_logger.error(f"No past_games directory found at {past_dir}")
_logger.error("No past_games directory found at %s", past_dir)
return 2
logs = [
path
@ -427,7 +431,7 @@ def main(argv: list[str]) -> int:
if re.match(r"lichess_bot_game_[A-Za-z0-9]+\.log$", path.name)
]
if not logs:
_logger.warning(f"No logs found in {past_dir}")
_logger.warning("No logs found in %s", past_dir)
return 1
# Sort by mtime ascending for determinism
logs.sort(key=lambda p: Path(p).stat().st_mtime)
@ -437,8 +441,11 @@ def main(argv: list[str]) -> int:
if rc == 0:
ok += 1
_logger.info(
f"Processed {len(logs)} logs from {past_dir}, "
f"succeeded: {ok}, failed: {len(logs) - ok}"
"Processed %s logs from %s, succeeded: %s, failed: %s",
len(logs),
past_dir,
ok,
len(logs) - ok,
)
return 0 if ok > 0 else 1

View File

@ -27,7 +27,7 @@ def get_and_increment_version() -> int:
path = _version_file_path()
current = 0
try:
with open(path) as f:
with Path(path).open() as f:
raw = f.read().strip()
if raw:
current = int(raw)
@ -38,13 +38,13 @@ def get_and_increment_version() -> int:
new_version = current + 1
try:
tmp_path = Path(path + ".tmp")
with open(tmp_path, "w") as f:
with tmp_path.open("w") as f:
f.write(str(new_version))
tmp_path.replace(path)
except OSError:
# As a fallback, try a direct write; failure is non-fatal to bot operation
try:
with open(path, "w") as f:
with Path(path).open("w") as f:
f.write(str(new_version))
except OSError:
_logger.debug("Could not persist bot version to %s", path)

View File

@ -43,7 +43,7 @@ def download_image(url: str) -> bool:
return False
_logger.info("Downloading image from URL: %s", url)
img_data = requests.get(url, timeout=REQUEST_TIMEOUT).content
with open(image_path, "wb") as handler:
with image_path.open("wb") as handler:
handler.write(img_data)
_logger.info("Image %s downloaded successfully", image_name)
return True

View File

@ -632,7 +632,7 @@ class ScreenLocker:
return False
try:
with open(self.log_file) as f:
with self.log_file.open() as f:
logs = json.load(f)
except (OSError, json.JSONDecodeError):
return False
@ -646,7 +646,7 @@ class ScreenLocker:
logs = {}
if self.log_file.exists():
try:
with open(self.log_file) as f:
with self.log_file.open() as f:
logs = json.load(f)
except (OSError, json.JSONDecodeError):
logs = {}
@ -660,10 +660,10 @@ class ScreenLocker:
# Save updated logs
try:
with open(self.log_file, "w") as f:
with self.log_file.open("w") as f:
json.dump(logs, f, indent=2)
except OSError as e:
_logger.warning(f"Could not save workout log: {e}")
_logger.warning("Could not save workout log: %s", e)
def close(self) -> None:
"""Close the application and exit."""

View File

@ -185,7 +185,7 @@ def _detect_total_mem_mb() -> int | None:
# Fallback approach for Linux systems using proc meminfo.
with (
contextlib.suppress(Exception),
open("/proc/meminfo", encoding="utf-8", errors="ignore") as f,
Path("/proc/meminfo").open(encoding="utf-8", errors="ignore") as f,
):
for line in f:
if line.startswith("MemTotal:"):
@ -301,10 +301,10 @@ def _build_argument_parser() -> argparse.ArgumentParser:
def _load_game(file_path: str) -> chess.pgn.Game:
"""Load and parse a chess game from a file."""
if not Path(file_path).is_file():
_logger.error(f"Input not found: {file_path}")
_logger.error("Input not found: %s", file_path)
sys.exit(1)
with open(file_path, encoding="utf-8", errors="replace") as f:
with Path(file_path).open(encoding="utf-8", errors="replace") as f:
raw = f.read()
pgn_text = extract_pgn_text(raw)
@ -398,7 +398,7 @@ def _setup_engine(
try:
engine = chess.engine.SimpleEngine.popen_uci([args.engine])
except FileNotFoundError:
_logger.exception(f"Could not launch engine at: {args.engine}")
_logger.exception("Could not launch engine at: %s", args.engine)
_logger.exception(
"Ensure Stockfish is installed and in PATH, or specify with --engine."
)
@ -435,11 +435,13 @@ def _log_engine_config(
hash_show = None
if hash_show is not None:
_logger.info(
f"Using engine options: Threads={threads}, "
f"Hash={hash_show} MB, MultiPV={multipv}"
"Using engine options: Threads=%s, Hash=%s MB, MultiPV=%s",
threads,
hash_show,
multipv,
)
else:
_logger.info(f"Using engine options: Threads={threads}, MultiPV={multipv}")
_logger.info("Using engine options: Threads=%s, MultiPV=%s", threads, multipv)
def _get_best_move(
@ -539,10 +541,15 @@ def _log_move_analysis(ply: int, result: MoveAnalysis, *, mover_white: bool) ->
side = "W" if mover_white else "B"
loss_str = str(result.cp_loss) if result.cp_loss is not None else ""
_logger.info(
f"{ply:>3} {side} {result.san:<8} "
f"{fmt_eval(result.played_cp, result.played_mate):>10} "
f"{fmt_eval(result.best_cp, result.best_mate):>9} "
f"{loss_str:>5} {result.classification:<12} {result.best_san}"
"%3d %s %-8s %10s %9s %5s %-12s %s",
ply,
side,
result.san,
fmt_eval(result.played_cp, result.played_mate),
fmt_eval(result.best_cp, result.best_mate),
loss_str,
result.classification,
result.best_san,
)
@ -555,7 +562,7 @@ def _run_analysis(
white = game.headers.get("White", "White")
black = game.headers.get("Black", "Black")
result = game.headers.get("Result", "*")
_logger.info(f" {white} vs {black} Result: {result}")
_logger.info(" %s vs %s Result: %s", white, black, result)
_logger.info("")
_logger.info(
"Columns: ply side move played_eval best_eval loss class best_suggestion"