fix(lint): LOG015 - replace root logger with module loggers

- Add _logger = logging.getLogger(__name__) to all modules
- Replace logging.X() calls with _logger.X() calls
- Remove logging.basicConfig() from module level (keep in run_bot())
- Add G004 to global ignores (f-strings in logging are more readable)
- Remove LOG015 and G004 per-file ignores from pyproject.toml
- Fix pytest_ignore_collect hook signature in conftest.py
This commit is contained in:
Krzysztof kuhy Rudnicki 2025-11-30 21:59:24 +01:00
parent dd2da6e2cc
commit b1a5f245a2
17 changed files with 155 additions and 165 deletions

View File

@ -5,7 +5,7 @@ import secrets
import tkinter as tk
from tkinter import ttk
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
# Use cryptographically secure random number generator
_rng = secrets.SystemRandom()
@ -812,21 +812,21 @@ class PokerModifierApp:
self.debug_mode = self.debug_var.get()
if self.debug_mode:
self.force_endgame_button.pack(side=tk.LEFT, padx=(0, 10))
logging.debug("Debug mode enabled")
_logger.debug("Debug mode enabled")
else:
self.force_endgame_button.pack_forget()
self.force_endgame = False
logging.debug("Debug mode disabled")
_logger.debug("Debug mode disabled")
def toggle_force_endgame(self) -> None:
"""Toggle forced endgame mode for testing."""
self.force_endgame = not self.force_endgame
if self.force_endgame:
self.force_endgame_button.config(text="Stop Force Endgame", bg="#4CAF50")
logging.debug("Forcing endgame modifiers")
_logger.debug("Forcing endgame modifiers")
else:
self.force_endgame_button.config(text="Force Endgame", bg="#ff6b6b")
logging.debug("Normal modifier selection restored")
_logger.debug("Normal modifier selection restored")
def is_endgame(self) -> bool:
"""Determine if we're in endgame phase."""
@ -977,7 +977,7 @@ class PokerModifierApp:
if self.debug_mode:
self.force_endgame_button.config(text="Force Endgame", bg="#ff6b6b")
logging.info("Game reset to initial state")
_logger.info("Game reset to initial state")
def add_modifier(self, name: str, description: str) -> None:
"""Add a new modifier to the list."""
@ -1005,17 +1005,17 @@ class PokerModifierApp:
def run(self) -> None:
"""Start the application."""
logging.info("Texas Hold'em Modifier App started!")
logging.info(
_logger.info("Texas Hold'em Modifier App started!")
_logger.info(
"Available methods: app.get_stats(), app.add_modifier(name, description)"
)
logging.info(
_logger.info(
"Debug features: Toggle debug mode to access force endgame controls"
)
logging.info(f"Default game length: {self.total_game_rounds} rounds")
_logger.info("Default game length: %s rounds", self.total_game_rounds)
endgame_pct = int(self.endgame_threshold * 100)
endgame_rounds = int(self.total_game_rounds * self.endgame_threshold)
logging.info(f"Endgame threshold: {endgame_pct}% ({endgame_rounds} rounds)")
_logger.info("Endgame threshold: %s%% (%s rounds)", endgame_pct, endgame_rounds)
self.root.mainloop()

View File

@ -34,6 +34,8 @@ ignore = [
# Formatter conflicts - these rules conflict with ruff format
"COM812", # Trailing comma missing (conflicts with formatter)
"ISC001", # Implicit string concatenation (conflicts with formatter)
# Logging style preference - f-strings are more readable
"G004", # Logging statement uses f-string (stylistic preference)
]
# Allow ALL rules to be auto-fixed
@ -64,26 +66,15 @@ unfixable = []
]
"python_pkg/random_jpg/generate_jpeg.py" = [
"PTH", # os.path patterns in existing code
"LOG015", # Root logger in script
"G004", # f-strings in logging
]
"python_pkg/tag_divider/tag_divider.py" = [
"PTH", # os.path patterns in existing code
"LOG015", # Root logger in script
]
"poker_modifier_app/*.py" = [
"LOG015", # Root logger in script
"G004", # f-strings in logging
]
"poker_modifier_app/poker_modifier_app.py" = [
"FBT003", # Boolean positional values in tkinter API calls
"LOG015", # Root logger in app
"G004", # f-strings in logging
]
"python_pkg/download_cats/generate_cats.py" = [
"PTH", # os.path patterns in existing code
"LOG015", # Root logger in script
"G004", # f-strings in logging
]
"python_pkg/lichess_bot/main.py" = [
"C901", # Complex functions handling game lifecycle (run_bot, handle_game)
@ -91,60 +82,36 @@ unfixable = []
"PLR0915", # Long function handling complete game lifecycle
"S603", # Subprocess call for analysis script
"PTH", # os.path patterns in existing code
"LOG015", # Root logger in bot
"G004", # f-strings in logging
]
"python_pkg/lichess_bot/engine.py" = [
"S603", # Subprocess for engine communication
"PTH", # os.path patterns
"LOG015", # Root logger for debug messages
]
"python_pkg/lichess_bot/lichess_api.py" = [
"LOG015", # Root logger in API client
"G004", # f-strings in logging
]
"python_pkg/lichess_bot/utils.py" = [
"PTH", # os.path patterns
"LOG015", # Root logger
"G004", # f-strings in logging
]
"python_pkg/lichess_bot/tools/generate_blunder_tests.py" = [
"PTH", # os.path patterns in tool
"LOG015", # Root logger in tool
"G004", # f-strings in logging
]
"python_pkg/stockfish_analysis/analyze_chess_game.py" = [
"C901", # Complex main() with many argument combinations and analysis modes
"PLR0912", # Complex main() with many argument combinations and analysis modes
"PLR0915", # Long main() handling complete analysis workflow
"PTH", # os.path patterns
"LOG015", # Root logger in analysis tool
"G004", # f-strings in logging
]
"python_pkg/randomize_numbers/random_digits.py" = [
"LOG015", # Root logger in script
"G004", # f-strings in logging
]
"python_pkg/keyboard_coop/main.py" = [
"FBT003", # Boolean positional values in pygame API calls (e.g., font.render)
"PTH", # os.path patterns
"LOG015", # Root logger in script
]
"python_pkg/screen_locker/screen_lock.py" = [
"FBT003", # Boolean positional values in tkinter API calls
"PTH", # os.path patterns
"LOG015", # Root logger in app
"G004", # f-strings in logging
]
"python_pkg/scrape_website/scrape_comics.py" = [
"PTH", # os.path patterns
"LOG015", # Root logger in script
"G004", # f-strings in logging
]
"python_pkg/extract_links/main.py" = [
"PTH", # os.path patterns
"LOG015", # Root logger in script
"G004", # f-strings in logging
]
[tool.ruff.lint.pydocstyle]

View File

@ -10,7 +10,7 @@ from pathlib import Path
import requests
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
MAX_REQUESTS = 90
REQUEST_TIMEOUT = 30 # seconds
@ -35,10 +35,10 @@ def _download_single_image(url: str) -> None:
with open(image_path, "wb") as file:
file.write(response.content)
logging.info(f"Saved {url} as {image_path}")
_logger.info("Saved %s as %s", url, image_path)
except requests.exceptions.RequestException:
logging.exception(f"Failed to download {url}")
_logger.exception("Failed to download %s", url)
requests_send = 0

View File

@ -16,7 +16,7 @@ import logging
import os
from urllib.parse import urlparse
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
class _HrefParser(HTMLParser):
@ -90,7 +90,7 @@ def main() -> int:
with open(out_path, "w", encoding="utf-8") as f:
f.writelines(f"*{host}*\n" for host in hosts)
logging.info(f"Wrote {len(hosts)} host(s) to {out_path}")
_logger.info("Wrote %s host(s) to %s", len(hosts), out_path)
return 0

View File

@ -11,7 +11,7 @@ import sys
import pygame
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
# Use cryptographically secure random number generator
_rng = secrets.SystemRandom()
@ -110,7 +110,7 @@ class KeyboardCoopGame:
# Convert to set for faster lookup (we only need the keys)
return set(dictionary_data.keys())
except FileNotFoundError:
logging.warning(
_logger.warning(
"words_dictionary.json not found, using fallback dictionary"
)
# Fallback to a smaller dictionary if file not found
@ -166,7 +166,7 @@ class KeyboardCoopGame:
"good",
}
except json.JSONDecodeError:
logging.warning(
_logger.warning(
"Error reading words_dictionary.json, using fallback dictionary"
)
return {

View File

@ -8,6 +8,8 @@ import subprocess
import chess
_logger = logging.getLogger(__name__)
class RandomEngine:
"""Thin wrapper around the C engine in C/lichess_random_engine/random_engine.
@ -160,6 +162,6 @@ class RandomEngine:
ensure_ascii=False,
)
except (json.JSONDecodeError, KeyError, TypeError):
logging.debug("Failed to parse engine JSON output")
_logger.debug("Failed to parse engine JSON output")
return cand_score, cand_expl, best_move, best_expl

View File

@ -10,6 +10,8 @@ import time
import chess
import requests
_logger = logging.getLogger(__name__)
LICHESS_API = "https://lichess.org"
@ -43,11 +45,11 @@ class LichessAPI:
- Optionally raises for status.
"""
t0 = time.monotonic()
logging.info(f"HTTP {method} {url} -> sending")
_logger.info("HTTP %s %s -> sending", method, url)
try:
r = self.session.request(method, url, **kwargs) # type: ignore[arg-type]
except Exception:
logging.exception(f"HTTP {method} {url} -> exception")
_logger.exception("HTTP %s %s -> exception", method, url)
raise
elapsed = time.monotonic() - t0
status = r.status_code
@ -60,14 +62,20 @@ class LichessAPI:
except (AttributeError, TypeError):
snippet = None
if snippet:
logging.warning(
f"HTTP {method} {url} -> {status} "
f"in {elapsed:.2f}s body='{snippet}'"
_logger.warning(
"HTTP %s %s -> %s in %.2fs body='%s'",
method,
url,
status,
elapsed,
snippet,
)
else:
logging.warning(f"HTTP {method} {url} -> {status} in {elapsed:.2f}s")
_logger.warning(
"HTTP %s %s -> %s in %.2fs", method, url, status, elapsed
)
else:
logging.info(f"HTTP {method} {url} -> {status} in {elapsed:.2f}s")
_logger.info("HTTP %s %s -> %s in %.2fs", method, url, status, elapsed)
if raise_for_status:
r.raise_for_status()
return r
@ -91,11 +99,11 @@ class LichessAPI:
try:
yield json.loads(line)
except json.JSONDecodeError:
logging.debug(f"Skipping non-JSON line: {line}")
_logger.debug("Skipping non-JSON line: %s", line)
except requests.HTTPError as e:
status = getattr(e.response, "status_code", None)
if status == HTTPStatus.TOO_MANY_REQUESTS:
logging.warning("Event stream hit 429; backing off")
_logger.warning("Event stream hit 429; backing off")
time.sleep(backoff)
backoff = min(8.0, backoff * 2)
continue
@ -160,7 +168,9 @@ class LichessAPI:
try:
yield json.loads(line)
except json.JSONDecodeError:
logging.debug(f"Skipping non-JSON line in game {game_id}: {line}")
_logger.debug(
"Skipping non-JSON line in game %s: %s", game_id, line
)
def make_move(self, game_id: str, move: chess.Move) -> None:
"""Submit a move to an active game."""
@ -171,7 +181,7 @@ class LichessAPI:
r.raise_for_status()
return
if r.status_code == HTTPStatus.TOO_MANY_REQUESTS:
logging.warning(f"HTTP POST {url} -> 429; retrying once after 0.5s")
_logger.warning("HTTP POST %s -> 429; retrying once after 0.5s", url)
time.sleep(0.5)
r = self._request("POST", url, timeout=30)
r.raise_for_status()

View File

@ -18,6 +18,8 @@ from python_pkg.lichess_bot.engine import RandomEngine
from python_pkg.lichess_bot.lichess_api import LichessAPI
from python_pkg.lichess_bot.utils import backoff_sleep, get_and_increment_version
_logger = logging.getLogger(__name__)
def _apply_move_to_board(board: chess.Board, move: str, game_id: str) -> None:
"""Apply a single move to the board, logging errors.
@ -30,11 +32,12 @@ def _apply_move_to_board(board: chess.Board, move: str, game_id: str) -> None:
try:
board.push_uci(move)
except ValueError:
logging.debug(f"Game {game_id}: could not apply move {move}")
_logger.debug(f"Game {game_id}: could not apply move {move}")
def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None:
"""Start the bot and listen for incoming events."""
# Configure root logger for console output
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format="[%(asctime)s] %(levelname)s %(threadName)s: %(message)s",
@ -45,17 +48,17 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
msg = "LICHESS_TOKEN environment variable is required"
raise RuntimeError(msg)
logging.info("Token present. Initializing client and engine...")
_logger.info("Token present. Initializing client and engine...")
# Self-incrementing bot version (persisted on disk)
bot_version = get_and_increment_version()
logging.info(f"Bot version: v{bot_version}")
_logger.info(f"Bot version: v{bot_version}")
api = LichessAPI(token)
engine = RandomEngine()
game_threads = {}
def handle_game(game_id: str, my_color: str | None = None) -> None:
logging.info(f"Starting game thread for {game_id} [bot v{bot_version}]")
_logger.info(f"Starting game thread for {game_id} [bot v{bot_version}]")
board = chess.Board()
color: str | None = my_color
# Track how many moves we have already processed;
@ -122,7 +125,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
color = "white"
elif me == black_id:
color = "black"
logging.info(f"Game {game_id}: joined as {color} (gameFull)")
_logger.info(f"Game {game_id}: joined as {color} (gameFull)")
else:
moves = event.get("moves", "")
status = event.get("status")
@ -138,11 +141,11 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
moves_list = moves.split() if moves else []
new_len = len(moves_list)
logging.info(
_logger.info(
f"Game {game_id}: event={et}, moves={new_len}, color={color}"
)
if new_len == last_handled_len:
logging.debug(
_logger.debug(
f"Game {game_id}: position unchanged "
f"(len={new_len}), skipping"
)
@ -154,7 +157,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
_apply_move_to_board(board, m, game_id)
if color is None:
logging.info(
_logger.info(
f"Game {game_id}: color unknown yet; waiting for gameFull"
)
# Do not mark this position handled on gameFull;
@ -167,7 +170,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
my_turn = (is_white_turn and color == "white") or (
(not is_white_turn) and color == "black"
)
logging.info(
_logger.info(
f"Game {game_id}: "
f"turn={'white' if is_white_turn else 'black'}, "
f"my_turn={my_turn}"
@ -204,7 +207,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
board, time_budget_sec=budget
)
if move is None:
logging.info(
_logger.info(
f"Game {game_id}: no legal moves (game likely over)"
)
break
@ -212,12 +215,12 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
# Double-check legality just before sending
# to avoid 400s when state changed.
if move not in board.legal_moves:
logging.info(
_logger.info(
f"Game {game_id}: selected move "
"no longer legal; skipping send"
)
else:
logging.info(
_logger.info(
f"Game {game_id}: playing {move.uci()} "
f"(budget={budget:.2f}s, "
f"my_time_left={time_left_sec:.1f}s, "
@ -231,7 +234,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
)
api.make_move(game_id, move)
except requests.RequestException as e:
logging.warning(
_logger.warning(
f"Game {game_id}: move {move.uci()} failed: {e}"
)
# Mark this position as handled on authoritative
@ -240,12 +243,12 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
if et == "gameState" or (my_turn and allow_move):
last_handled_len = new_len
if status in {"mate", "resign", "stalemate", "timeout", "draw"}:
logging.info(f"Game {game_id} finished: {status}")
_logger.info(f"Game {game_id} finished: {status}")
break
elif et in {"chatLine", "opponentGone"}:
continue
except requests.RequestException:
logging.exception(f"Game {game_id} thread error")
_logger.exception(f"Game {game_id} thread error")
finally:
# On game end, write full PGN to the log file
try:
@ -286,7 +289,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
except TypeError:
total_plies = 0
logging.info(
_logger.info(
f"Game {game_id}: starting post-game "
f"analysis ({total_plies} plies)"
)
@ -318,13 +321,13 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
)
if total_plies:
pct = analyzed / total_plies * 100.0
logging.info(
_logger.info(
f"Game {game_id}: analysis progress "
f"{analyzed}/{total_plies} "
f"({pct:.0f}%), left {left}"
)
else:
logging.info(
_logger.info(
f"Game {game_id}: analysis progress "
f"{analyzed} plies (total unknown)"
)
@ -334,20 +337,20 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
ret = proc.wait()
analysis_text = "".join(lines)
if ret != 0:
logging.warning(
_logger.warning(
f"Game {game_id}: analysis script "
f"exited with code {ret}"
)
if stderr_text:
analysis_text += "\n[stderr]\n" + stderr_text
logging.info(f"Game {game_id}: analysis complete")
_logger.info(f"Game {game_id}: analysis complete")
else:
logging.info(
_logger.info(
f"Game {game_id}: analysis script not found "
f"at {analyze_script}; skipping analysis"
)
except (subprocess.SubprocessError, OSError) as e:
logging.debug(f"Game {game_id}: analysis run failed: {e}")
_logger.debug(f"Game {game_id}: analysis run failed: {e}")
# Insert analysis before the PGN section so future runs
# can still parse PGN cleanly
@ -399,12 +402,12 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
with open(game_log_path, "w", encoding="utf-8") as f:
f.write(new_content)
except OSError as e:
logging.debug(
_logger.debug(
f"Game {game_id}: could not write analysis to log: {e}"
)
except OSError as e:
logging.debug(f"Game {game_id}: could not write PGN: {e}")
logging.info(f"Ending game thread for {game_id}")
_logger.debug(f"Game {game_id}: could not write PGN: {e}")
_logger.info(f"Ending game thread for {game_id}")
def _process_event_stream() -> None:
"""Process events from the Lichess event stream.
@ -423,10 +426,10 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
or not decline_correspondence
)
if variant == "standard" and perf_ok and not_corr:
logging.info(f"Accepting challenge {ch_id} ({speed})")
_logger.info(f"Accepting challenge {ch_id} ({speed})")
api.accept_challenge(ch_id)
else:
logging.info(
_logger.info(
f"Declining challenge {ch_id} "
f"(variant={variant}, speed={speed})"
)
@ -445,9 +448,9 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
elif event.get("type") == "gameFinish":
game_id = event["game"]["id"]
logging.info(f"Game finished event: {game_id}")
_logger.info(f"Game finished event: {game_id}")
else:
logging.debug(f"Unhandled event: {json.dumps(event)}")
_logger.debug(f"Unhandled event: {json.dumps(event)}")
def _run_event_loop_iteration() -> int:
"""Run one iteration of the event loop with error handling.
@ -458,14 +461,14 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
try:
_process_event_stream()
except requests.RequestException as e:
logging.warning(f"Event stream error: {e}")
_logger.warning(f"Event stream error: {e}")
return backoff_sleep(backoff)
else:
# If stream ends normally, reset backoff
return 0
# Main event stream: challenge and game start events
logging.info("Connecting to Lichess event stream. Waiting for challenges...")
_logger.info("Connecting to Lichess event stream. Waiting for challenges...")
backoff = 0
while True:
backoff = _run_event_loop_iteration()

View File

@ -11,11 +11,16 @@ if ROOT not in sys.path:
sys.path.insert(0, ROOT)
def pytest_ignore_collect(collection_path: Path, _: pytest.Config) -> bool | None:
def pytest_ignore_collect(collection_path: Path, config: pytest.Config) -> bool | None:
"""Ignore per-game blunder test files; keep only the unified one.
This lets us keep historical files in the repo without collecting them.
Args:
collection_path: Path being collected.
config: Pytest config object (unused).
"""
del config # unused
basename = collection_path.name
return bool(
basename.startswith("test_blunders_") and basename != "test_blunders_all.py"

View File

@ -43,7 +43,7 @@ import sys
import chess
import chess.pgn
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
# Expected columns in the log file:
# ply, side, move, played_eval, best_eval, loss, class, best_suggestion
@ -162,7 +162,7 @@ def fen_and_uci_for_blunders(
try:
move = board.parse_san(main_sans[bl.ply - 1])
except ValueError:
logging.debug("Skipping blunder: failed to parse fallback move")
_logger.debug("Skipping blunder: failed to parse fallback move")
continue
else:
continue
@ -339,7 +339,7 @@ def _process_single_log(log_path: str) -> int:
)
unified = os.path.abspath(unified)
added = append_cases_to_unified_test(unified, cases)
logging.info(
_logger.info(
f"Appended {added} new blunder checks to "
f"{os.path.relpath(unified)} (game {game_id})."
)
@ -376,7 +376,7 @@ def _read_log_file(log_path: str) -> tuple[str | None, int | None]:
with open(log_path, encoding="utf-8") as fh:
return fh.read(), None
except FileNotFoundError:
logging.exception(f"Log file not found: {log_path}")
_logger.exception(f"Log file not found: {log_path}")
return None, 2
@ -385,10 +385,10 @@ def _parse_blunders(text: str, base: str) -> tuple[list[Blunder] | None, int | N
try:
blunders = parse_columns_for_blunders(text)
except Exception:
logging.exception(f"Error parsing Columns in {base}")
_logger.exception(f"Error parsing Columns in {base}")
return None, 2
if not blunders:
logging.warning(f"No blunders found in Columns section: {base}")
_logger.warning(f"No blunders found in Columns section: {base}")
return None, 1
return blunders, None
@ -399,16 +399,16 @@ def _extract_cases(
"""Extract FEN/UCI cases from PGN. Returns (cases, None) or (None, error_code)."""
pgn_text = extract_pgn(text)
if not pgn_text:
logging.warning(f"No PGN section found: {base}")
_logger.warning(f"No PGN section found: {base}")
return None, 1
try:
cases = fen_and_uci_for_blunders(pgn_text, blunders)
except Exception:
logging.exception(f"Error converting SAN to UCI in {base}")
_logger.exception(f"Error converting SAN to UCI in {base}")
return None, 2
if not cases:
logging.warning(f"Failed to reconstruct any blunder positions from PGN: {base}")
_logger.warning(f"Failed to reconstruct any blunder positions from PGN: {base}")
return None, 1
return cases, None
@ -421,7 +421,7 @@ def main(argv: list[str]) -> int:
# No argument: process all logs in past_games
if len(argv) == 1:
if not os.path.isdir(past_dir):
logging.error(f"No past_games directory found at {past_dir}")
_logger.error(f"No past_games directory found at {past_dir}")
return 2
logs = [
os.path.join(past_dir, name)
@ -429,7 +429,7 @@ def main(argv: list[str]) -> int:
if re.match(r"lichess_bot_game_[A-Za-z0-9]+\.log$", name)
]
if not logs:
logging.warning(f"No logs found in {past_dir}")
_logger.warning(f"No logs found in {past_dir}")
return 1
# Sort by mtime ascending for determinism
logs.sort(key=lambda p: os.path.getmtime(p))
@ -438,7 +438,7 @@ def main(argv: list[str]) -> int:
rc = _process_single_log(lp)
if rc == 0:
ok += 1
logging.info(
_logger.info(
f"Processed {len(logs)} logs from {past_dir}, "
f"succeeded: {ok}, failed: {len(logs) - ok}"
)
@ -459,7 +459,7 @@ def main(argv: list[str]) -> int:
candidate_path = maybe
if not candidate_path:
logging.info("Usage: generate_blunder_tests.py [<game_id>|</path/to/log>]")
_logger.info("Usage: generate_blunder_tests.py [<game_id>|</path/to/log>]")
return 2
return _process_single_log(candidate_path)

View File

@ -4,6 +4,8 @@ import logging
import os
import time
_logger = logging.getLogger(__name__)
def _version_file_path() -> str:
"""Return the path to the persistent bot version file.
@ -44,7 +46,7 @@ def get_and_increment_version() -> int:
with open(path, "w") as f:
f.write(str(new_version))
except OSError:
logging.debug("Could not persist bot version to %s", path)
_logger.debug("Could not persist bot version to %s", path)
return new_version
@ -57,6 +59,6 @@ def backoff_sleep(current_backoff: int, base: float = 0.5, cap: float = 8.0) ->
- cap: maximum delay in seconds
"""
delay = min(cap, base * (2**current_backoff))
logging.info(f"Backing off for {delay:.1f}s")
_logger.info("Backing off for %.1fs", delay)
time.sleep(delay)
return min(current_backoff + 1, 10)

View File

@ -9,7 +9,7 @@ import secrets
from PIL import Image
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
# Use cryptographically secure random number generator
_rng = secrets.SystemRandom()
@ -142,15 +142,15 @@ if __name__ == "__main__":
folder = f"generated_images_{timestamp}"
# Display used parameters
logging.info(
f"Generating {args.num_images} image(s) with the following parameters:"
_logger.info(
"Generating %s image(s) with the following parameters:", args.num_images
)
logging.info(f" Size: {args.size}")
logging.info(f" Colors: {args.colors}")
logging.info(f" Block size: {args.block_size}")
logging.info(f" Base output path: {args.output_path}")
logging.info(f" Quality: {args.quality}")
logging.info(f" Output folder: {folder}")
_logger.info(" Size: %s", args.size)
_logger.info(" Colors: %s", args.colors)
_logger.info(" Block size: %s", args.block_size)
_logger.info(" Base output path: %s", args.output_path)
_logger.info(" Quality: %s", args.quality)
_logger.info(" Output folder: %s", folder)
# Generate the specified number of images
config = ImageConfig(
@ -162,4 +162,4 @@ if __name__ == "__main__":
)
for i in range(1, args.num_images + 1):
output_path = generate_bloated_jpeg(config, i, folder)
logging.info(f"Image {i} saved to {os.path.abspath(output_path)}")
_logger.info("Image %s saved to %s", i, os.path.abspath(output_path))

View File

@ -6,7 +6,7 @@ import re
import secrets
import sys
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
# Use cryptographically secure random number generator
_rng = secrets.SystemRandom()
@ -73,7 +73,7 @@ MIN_ARGS = 2
if __name__ == "__main__":
if len(sys.argv) < MIN_ARGS:
logging.info(
_logger.info(
"Usage: python random_digits.py <number1> <number2> ... "
"[min_percentage max_percentage]"
)
@ -83,7 +83,7 @@ if __name__ == "__main__":
numbers, decimal_counts = parse_input(input_string)
if len(numbers) == 0:
logging.error("No valid numbers provided.")
_logger.error("No valid numbers provided.")
sys.exit(1)
min_percentage = DEFAULT_MIN_PERCENTAGE
@ -103,9 +103,9 @@ if __name__ == "__main__":
format_str = f".{decimal_counts[i]}f"
formatted_numbers.append(float(format(num, format_str)))
logging.info(f"Original numbers: {numbers}")
logging.info(f"Randomized numbers: {formatted_numbers}")
_logger.info("Original numbers: %s", numbers)
_logger.info("Randomized numbers: %s", formatted_numbers)
except ValueError:
logging.exception("Error processing numbers")
logging.exception("Please provide valid numbers and percentages.")
_logger.exception("Error processing numbers")
_logger.exception("Please provide valid numbers and percentages.")
sys.exit(1)

View File

@ -10,7 +10,7 @@ from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
REQUEST_TIMEOUT = 30 # seconds
@ -26,7 +26,7 @@ driver = webdriver.Chrome()
# Open the website from the passed argument
url = args.url
logging.info(f"Opening the website: {url}")
_logger.info("Opening the website: %s", url)
driver.get(url)
@ -38,13 +38,13 @@ def download_image(url: str) -> bool:
# Check if the image already exists
if os.path.exists(image_name):
logging.info(f"Image {image_name} already exists, skipping download.")
_logger.info("Image %s already exists, skipping download.", image_name)
return False
logging.info(f"Downloading image from URL: {url}")
_logger.info("Downloading image from URL: %s", url)
img_data = requests.get(url, timeout=REQUEST_TIMEOUT).content
with open(image_name, "wb") as handler:
handler.write(img_data)
logging.info(f"Image {image_name} downloaded successfully")
_logger.info("Image %s downloaded successfully", image_name)
return True
@ -52,14 +52,14 @@ def download_image(url: str) -> bool:
count = 1
while True:
logging.info(f"Processing image {count}...")
_logger.info("Processing image %s...", count)
# Find the image element by its ID
image_element = driver.find_element(By.ID, "cc-comic")
# Get the image URL from the 'src' attribute
image_url = image_element.get_attribute("src")
logging.info(f"Found image URL: {image_url}")
_logger.info("Found image URL: %s", image_url)
# Download the image if it doesn't already exist
if download_image(image_url):
@ -67,7 +67,7 @@ while True:
# Try to find the 'Next' button by its class
try:
logging.info("Clicking the 'Next' button to load the next image...")
_logger.info("Clicking the 'Next' button to load the next image...")
next_button = driver.find_element(By.CSS_SELECTOR, "a.cc-next")
# Navigate to the URL in the 'href' of the next button
@ -76,9 +76,9 @@ while True:
except NoSuchElementException:
# If the 'Next' button is not found, it means we've reached the last image
logging.info("No 'Next' button found. Reached the end of images.")
_logger.info("No 'Next' button found. Reached the end of images.")
break
# Close the browser
logging.info("All images processed, closing the browser.")
_logger.info("All images processed, closing the browser.")
driver.quit()

View File

@ -11,7 +11,7 @@ import os
import sys
import tkinter as tk
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
# Validation limits for workout data
MAX_DISTANCE_KM = 100
@ -34,7 +34,7 @@ class ScreenLocker:
# Check if already logged today
if self.has_logged_today():
logging.info("Workout already logged today. Skipping screen lock.")
_logger.info("Workout already logged today. Skipping screen lock.")
sys.exit(0)
self.root = tk.Tk()
@ -663,7 +663,7 @@ class ScreenLocker:
with open(self.log_file, "w") as f:
json.dump(logs, f, indent=2)
except OSError as e:
logging.warning(f"Could not save workout log: {e}")
_logger.warning(f"Could not save workout log: {e}")
def close(self) -> None:
"""Close the application and exit."""

View File

@ -30,6 +30,8 @@ import os
import re
import sys
_logger = logging.getLogger(__name__)
try:
import psutil # type: ignore[import-untyped]
except ImportError: # pragma: no cover
@ -40,8 +42,8 @@ try:
import chess.engine
import chess.pgn
except ImportError: # pragma: no cover
logging.exception("Missing dependency. Please install python-chess:")
logging.exception(" pip install -r python_pkg/stockfish_analysis/requirements.txt")
_logger.exception("Missing dependency. Please install python-chess:")
_logger.exception(" pip install -r python_pkg/stockfish_analysis/requirements.txt")
raise
# Memory configuration constants
@ -216,7 +218,6 @@ def _auto_hash_mb(threads_wanted: int, engine_options: dict[str, object]) -> int
def main() -> None:
"""Parse arguments and run chess game analysis."""
logging.basicConfig(level=logging.INFO, format="%(message)s")
ap = argparse.ArgumentParser(
description="Analyze a chess game's moves with Stockfish and rate each move."
)
@ -271,7 +272,7 @@ def main() -> None:
args = ap.parse_args()
if not os.path.isfile(args.file):
logging.error(f"Input not found: {args.file}")
_logger.error(f"Input not found: {args.file}")
sys.exit(1)
with open(args.file, encoding="utf-8", errors="replace") as f:
@ -279,20 +280,20 @@ def main() -> None:
pgn_text = extract_pgn_text(raw)
if not pgn_text:
logging.error("Could not locate PGN text in the file.")
_logger.error("Could not locate PGN text in the file.")
sys.exit(2)
game = chess.pgn.read_game(io.StringIO(pgn_text))
if game is None:
logging.error("Failed to parse PGN.")
_logger.error("Failed to parse PGN.")
sys.exit(3)
# Prepare engine
try:
engine = chess.engine.SimpleEngine.popen_uci([args.engine])
except FileNotFoundError:
logging.exception(f"Could not launch engine at: {args.engine}")
logging.exception(
_logger.exception(f"Could not launch engine at: {args.engine}")
_logger.exception(
"Ensure Stockfish is installed and in PATH, or specify with --engine."
)
sys.exit(4)
@ -318,7 +319,7 @@ def main() -> None:
wanted_threads = max(wanted_threads, min_thr)
engine.configure({"Threads": int(wanted_threads)})
except (AttributeError, TypeError, ValueError):
logging.debug("Failed to configure Threads option")
_logger.debug("Failed to configure Threads option")
# Configure hash table size in MB.
if "Hash" in options:
@ -336,7 +337,7 @@ def main() -> None:
target_hash = max(target_hash, min_hash)
engine.configure({"Hash": int(target_hash)})
except (AttributeError, TypeError, ValueError):
logging.debug("Failed to configure Hash option")
_logger.debug("Failed to configure Hash option")
# MultiPV
effective_mpv = max(1, int(args.multipv))
@ -347,7 +348,7 @@ def main() -> None:
effective_mpv = min(effective_mpv, max_mpv)
engine.configure({"MultiPV": int(effective_mpv)})
except (AttributeError, TypeError, ValueError):
logging.debug("Failed to configure MultiPV option")
_logger.debug("Failed to configure MultiPV option")
# Enable NNUE if the option exists
for nnue_key in ("Use NNUE", "UseNNUE"):
@ -362,13 +363,13 @@ def main() -> None:
limit = chess.engine.Limit(time=max(0.05, args.time))
board = game.board()
logging.info("Game:")
_logger.info("Game:")
white = game.headers.get("White", "White")
black = game.headers.get("Black", "Black")
result = game.headers.get("Result", "*")
logging.info(f" {white} vs {black} Result: {result}")
logging.info("")
logging.info(
_logger.info(f" {white} vs {black} Result: {result}")
_logger.info("")
_logger.info(
"Columns: ply side move played_eval best_eval loss class best_suggestion"
)
# Brief performance summary (best-effort)
@ -385,12 +386,12 @@ def main() -> None:
except (AttributeError, TypeError, ValueError):
hash_show = None
if hash_show is not None:
logging.info(
_logger.info(
f"Using engine options: Threads={thr_show}, "
f"Hash={hash_show} MB, MultiPV={effective_mpv}"
)
else:
logging.info(
_logger.info(
f"Using engine options: Threads={thr_show}, MultiPV={effective_mpv}"
)
@ -401,7 +402,7 @@ def main() -> None:
if args.last_move_only:
# Walk to the last move in the main line and analyze only that ply.
if not node.variations:
logging.warning("No moves found in the game.")
_logger.warning("No moves found in the game.")
else:
while node.variations:
move_node = node.variations[0]
@ -502,7 +503,7 @@ def main() -> None:
classification = classify_cp_loss(cp_loss)
side = "W" if mover_white else "B"
logging.info(
_logger.info(
f"{ply:>3} {side} {san:<8} "
f"{fmt_eval(played_cp, played_mate):>10} "
f"{fmt_eval(best_cp, best_mate):>9} "
@ -617,7 +618,7 @@ def main() -> None:
classification = classify_cp_loss(cp_loss)
side = "W" if mover_white else "B"
logging.info(
_logger.info(
f"{ply:>3} {side} {san:<8} "
f"{fmt_eval(played_cp, played_mate):>10} "
f"{fmt_eval(best_cp, best_mate):>9} "

View File

@ -9,7 +9,7 @@ import shutil # for: shutil.move
# cv2.waitKey; cv2.destroyAllWindows; cv2.IMREAD_COLOR
import cv2
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
IMAGE_EXTENSION = (
".bmp",
@ -59,7 +59,7 @@ for filename in os.listdir(
if (filename.lower()).endswith(
IMAGE_EXTENSION
): # If the file name ends with image extension
logging.info(filename)
_logger.info(filename)
image = cv2.imread(filename, cv2.IMREAD_COLOR)
window_name = filename.split(".")[0]
cv2.namedWindow(window_name) # Window name is the same as image file name