diff --git a/pyproject.toml b/pyproject.toml index 19f5fce..ad741e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,9 +31,14 @@ ignore = [ "D203", # 1 blank line required before class docstring (conflicts with D211) # D212 vs D213 conflict - we use D212 (summary on first line after """) "D213", # Multi-line docstring summary should start at second line (conflicts with D212) - # Formatter conflicts - these rules conflict with ruff format - "COM812", # Trailing comma missing (conflicts with formatter) - "ISC001", # Implicit string concatenation (conflicts with formatter) + # Formatter conflicts - recommended to disable when using ruff format + # https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules + "COM812", # Trailing comma missing - formatter handles this automatically + "ISC001", # Implicit string concatenation - formatter may create these when wrapping + # Security audit - prone to false positives with validated input + # https://github.com/astral-sh/ruff/issues/4045 + "S603", # subprocess call without shell - prone to false positives as it is + # difficult to determine whether the passed arguments have been validated ] # Allow ALL rules to be auto-fixed @@ -42,15 +47,13 @@ unfixable = [] # Per-file ignores [tool.ruff.lint.per-file-ignores] -# Test files - allow test-specific patterns (assert, magic values, subprocess) +# Test files - allow test-specific patterns (assert, magic values) "**/tests/**/*.py" = [ "S101", # Allow assert in tests - "S603", # Allow subprocess calls in tests "PLR2004", # Allow magic values in tests ] "**/test_*.py" = [ "S101", # Allow assert in tests - "S603", # Allow subprocess calls in tests "S310", # Allow URL open in tests "S607", # Allow partial executable path in tests "PLC0415", # Allow late imports for test isolation @@ -81,15 +84,6 @@ inline-quotes = "double" [tool.ruff.lint.flake8-tidy-imports] ban-relative-imports = "all" -[tool.ruff.lint.pylint] -max-args = 5 -max-branches = 12 -max-returns = 6 -max-statements = 50 - -[tool.ruff.lint.mccabe] -max-complexity = 10 - [tool.ruff.format] quote-style = "double" indent-style = "space" @@ -148,7 +142,7 @@ jobs = 0 # Minimum Python version py-version = "3.10" # Ignore vendored directories -ignore = ["Bash", ".venv", "__pycache__", "downloadCats", "keyboardCoop", "randomJPG", "scapeWebsite", "tagDivider"] +ignore = ["Bash", ".venv", "__pycache__"] # Ignore patterns ignore-patterns = [".*\\.pyi$"] diff --git a/python_pkg/download_cats/generate_cats.py b/python_pkg/download_cats/generate_cats.py index 5ed2788..dd10693 100644 --- a/python_pkg/download_cats/generate_cats.py +++ b/python_pkg/download_cats/generate_cats.py @@ -15,29 +15,29 @@ MAX_REQUESTS = 90 REQUEST_TIMEOUT = 30 # seconds -def _download_single_image(url: str) -> None: +def _download_single_image(image_url: str) -> None: """Download and save a single image from URL. Args: - url: The URL of the image to download. + image_url: The URL of the image to download. """ try: # Get the image content - response = requests.get(url, timeout=REQUEST_TIMEOUT) - response.raise_for_status() # Raise an exception for HTTP errors + resp = requests.get(image_url, timeout=REQUEST_TIMEOUT) + resp.raise_for_status() # Raise an exception for HTTP errors # Extract the image name from the URL - image_name = Path(url).name + image_name = Path(image_url).name image_path = Path("./CATS2/") / image_name # Save the image to the directory with image_path.open("wb") as file: - file.write(response.content) + file.write(resp.content) - _logger.info("Saved %s as %s", url, image_path) + _logger.info("Saved %s as %s", image_url, image_path) except requests.exceptions.RequestException: - _logger.exception("Failed to download %s", url) + _logger.exception("Failed to download %s", image_url) requests_send = 0 diff --git a/python_pkg/keyboard_coop/main.py b/python_pkg/keyboard_coop/main.py index fbe1d3d..c969176 100644 --- a/python_pkg/keyboard_coop/main.py +++ b/python_pkg/keyboard_coop/main.py @@ -3,6 +3,7 @@ Players take turns selecting adjacent keys to form valid English words. """ +from dataclasses import dataclass, field import json import logging from pathlib import Path @@ -70,6 +71,28 @@ KEY_ADJACENCY = { } +@dataclass +class GameState: + """State for an ongoing keyboard coop game.""" + + current_player: int = 0 + current_word: str = "" + selected_letters: list[str] = field(default_factory=list) + score: int = 0 + game_over: bool = False + message: str = "Player 1: Choose any letter to start!" + + +@dataclass +class KeyboardState: + """State for the keyboard layout and adjacency.""" + + layout: list[list[str]] = field(default_factory=list) + available_letters: set[str] = field(default_factory=set) + adjacency: dict[str, list[str]] = field(default_factory=dict) + positions: dict[str, pygame.Rect] = field(default_factory=dict) + + class KeyboardCoopGame: """Main game class for the keyboard cooperative word game.""" @@ -83,23 +106,18 @@ class KeyboardCoopGame: self.small_font = pygame.font.Font(None, 20) # Load dictionary - self.dictionary = self.load_dictionary() + self.dictionary = self._load_dictionary() # Initialize game state - self.current_player = 0 - self.current_word = "" - self.selected_letters = [] - self.score = 0 - self.game_over = False - self.message = "Player 1: Choose any letter to start!" + self.state = GameState() + + # Initialize keyboard state + self.keyboard = KeyboardState() # Generate random keyboard layout and adjacency - self.generate_random_keyboard() + self._generate_random_keyboard() - # Key positions - self.key_positions = self.calculate_key_positions() - - def load_dictionary(self) -> set[str]: + def _load_dictionary(self) -> set[str]: """Load dictionary from words_dictionary.json file.""" try: dictionary_path = Path(__file__).parent / "words_dictionary.json" @@ -199,30 +217,33 @@ class KeyboardCoopGame: "good", } - def generate_random_keyboard(self) -> None: + def _generate_random_keyboard(self) -> None: """Generate a random keyboard layout and calculate adjacencies.""" # All 26 letters all_letters = list("abcdefghijklmnopqrstuvwxyz") _rng.shuffle(all_letters) # Create random layout with same structure as QWERTY (10-9-7) - self.keyboard_layout = [ + self.keyboard.layout = [ all_letters[0:10], # Top row: 10 keys all_letters[10:19], # Middle row: 9 keys all_letters[19:26], # Bottom row: 7 keys ] # Update available letters - self.available_letters = set(all_letters) + self.keyboard.available_letters = set(all_letters) # Calculate adjacencies based on new layout - self.calculate_adjacencies() + self._calculate_adjacencies() - def calculate_adjacencies(self) -> None: + # Calculate key positions + self.keyboard.positions = self._calculate_key_positions() + + def _calculate_adjacencies(self) -> None: """Calculate adjacencies based on current keyboard layout.""" - self.key_adjacency = {} + self.keyboard.adjacency = {} - for row_idx, row in enumerate(self.keyboard_layout): + for row_idx, row in enumerate(self.keyboard.layout): for col_idx, letter in enumerate(row): adjacents = [] @@ -243,14 +264,14 @@ class KeyboardCoopGame: new_col = col_idx + dc # Check bounds - if 0 <= new_row < len(self.keyboard_layout) and 0 <= new_col < len( - self.keyboard_layout[new_row] + if 0 <= new_row < len(self.keyboard.layout) and 0 <= new_col < len( + self.keyboard.layout[new_row] ): - adjacents.append(self.keyboard_layout[new_row][new_col]) + adjacents.append(self.keyboard.layout[new_row][new_col]) - self.key_adjacency[letter] = adjacents + self.keyboard.adjacency[letter] = adjacents - def calculate_key_positions(self) -> dict[str, pygame.Rect]: + def _calculate_key_positions(self) -> dict[str, pygame.Rect]: """Calculate the position of each key on screen.""" positions: dict[str, pygame.Rect] = {} key_width = 60 @@ -259,7 +280,7 @@ class KeyboardCoopGame: start_x = 50 start_y = 320 - for row_idx, row in enumerate(self.keyboard_layout): + for row_idx, row in enumerate(self.keyboard.layout): row_offset = row_idx * 30 # Offset for layout for col_idx, key in enumerate(row): x = start_x + col_idx * (key_width + key_spacing) + row_offset @@ -268,109 +289,105 @@ class KeyboardCoopGame: return positions - def get_key_at_position(self, pos: tuple[int, int]) -> str | None: + def _get_key_at_position(self, pos: tuple[int, int]) -> str | None: """Get the key at the given mouse position.""" - for key, rect in self.key_positions.items(): + for key, rect in self.keyboard.positions.items(): if rect.collidepoint(pos): return key return None - def is_valid_move(self, letter: str) -> bool: + def _is_valid_move(self, letter: str) -> bool: """Check if the letter is a valid move.""" - if not self.selected_letters: + if not self.state.selected_letters: return True # First move can be any letter - last_letter = self.selected_letters[-1] - return letter in self.key_adjacency[last_letter] + last_letter = self.state.selected_letters[-1] + return letter in self.keyboard.adjacency[last_letter] - def is_valid_word(self, word: str) -> bool: + def _is_valid_word(self, word: str) -> bool: """Check if the word is in the dictionary.""" return word.lower() in self.dictionary - def calculate_score(self, word_length: int) -> int: + def _calculate_score(self, word_length: int) -> int: """Calculate score exponentially based on word length.""" if word_length < MIN_WORD_LENGTH: return 0 return 2 ** (word_length - 2) - def handle_letter_click(self, letter: str) -> None: + def _handle_letter_click(self, letter: str) -> None: """Handle clicking on a letter.""" - if letter in self.available_letters and self.is_valid_move(letter): - self.selected_letters.append(letter) - self.current_word += letter + if letter in self.keyboard.available_letters and self._is_valid_move(letter): + self.state.selected_letters.append(letter) + self.state.current_word += letter # Update available letters to include adjacent letters AND the same letter adjacent_letters = ( - set(self.key_adjacency[letter]) - if letter in self.key_adjacency + set(self.keyboard.adjacency[letter]) + if letter in self.keyboard.adjacency else set() ) adjacent_letters.add(letter) # Allow selecting the same letter again - self.available_letters = adjacent_letters + self.keyboard.available_letters = adjacent_letters # Switch player - self.current_player = 1 - self.current_player - self.message = ( - f"Player {self.current_player + 1}: Choose an adjacent letter!" + self.state.current_player = 1 - self.state.current_player + self.state.message = ( + f"Player {self.state.current_player + 1}: Choose an adjacent letter!" ) # If no valid moves available, force word submission - if not self.available_letters: - self.submit_word() + if not self.keyboard.available_letters: + self._submit_word() - def submit_word(self) -> None: + def _submit_word(self) -> None: """Submit the current word and check if it's valid.""" - if len(self.current_word) >= MIN_WORD_LENGTH and self.is_valid_word( - self.current_word + if len(self.state.current_word) >= MIN_WORD_LENGTH and self._is_valid_word( + self.state.current_word ): - points = self.calculate_score(len(self.current_word)) - self.score += points - self.message = ( - f"'{self.current_word}' is valid! +{points} points " - f"(Total: {self.score}) - New keyboard!" + points = self._calculate_score(len(self.state.current_word)) + self.state.score += points + self.state.message = ( + f"'{self.state.current_word}' is valid! +{points} points " + f"(Total: {self.state.score}) - New keyboard!" ) # Randomize keyboard layout after scoring - self.generate_random_keyboard() - self.key_positions = self.calculate_key_positions() + self._generate_random_keyboard() - elif len(self.current_word) < MIN_WORD_LENGTH: - self.message = ( - f"'{self.current_word}' is too short! " + elif len(self.state.current_word) < MIN_WORD_LENGTH: + self.state.message = ( + f"'{self.state.current_word}' is too short! " f"(minimum {MIN_WORD_LENGTH} letters)" ) else: - self.message = f"'{self.current_word}' is not a valid word!" + self.state.message = f"'{self.state.current_word}' is not a valid word!" # Reset for next word - self.current_word = "" - self.selected_letters = [] - self.current_player = 0 + self.state.current_word = "" + self.state.selected_letters = [] + self.state.current_player = 0 - def reset_game(self) -> None: + def _reset_game(self) -> None: """Reset the game to initial state.""" - self.current_player = 0 - self.current_word = "" - self.selected_letters = [] - self.score = 0 - self.game_over = False - self.message = "Player 1: Choose any letter to start!" + self.state = GameState() # Generate new random keyboard layout - self.generate_random_keyboard() - self.key_positions = self.calculate_key_positions() + self._generate_random_keyboard() - def draw_keyboard(self) -> None: + def _draw_keyboard(self) -> None: """Draw the virtual keyboard.""" mouse_pos = pygame.mouse.get_pos() - for letter, rect in self.key_positions.items(): + for letter, rect in self.keyboard.positions.items(): # Determine key color - if letter in self.selected_letters: + if letter in self.state.selected_letters: color = KEY_SELECTED_COLOR - elif letter in self.available_letters: + elif letter in self.keyboard.available_letters: color = KEY_AVAILABLE_COLOR - elif rect.collidepoint(mouse_pos) and letter in self.available_letters: + elif ( + rect.collidepoint(mouse_pos) + and letter in self.keyboard.available_letters + ): color = KEY_HOVER_COLOR else: color = KEY_COLOR @@ -384,34 +401,40 @@ class KeyboardCoopGame: text_rect = text.get_rect(center=rect.center) self.screen.blit(text, text_rect) - def draw_ui(self) -> tuple[pygame.Rect, pygame.Rect]: + def _draw_text_line( + self, text: str, pos: tuple[int, int], font: pygame.font.Font + ) -> None: + """Draw a single line of text at the given position.""" + rendered = font.render(text, True, TEXT_COLOR) + self.screen.blit(rendered, pos) + + def _draw_button(self, rect: pygame.Rect, label: str) -> None: + """Draw a button with the given label.""" + pygame.draw.rect(self.screen, KEY_COLOR, rect) + pygame.draw.rect(self.screen, TEXT_COLOR, rect, 2) + text = self.small_font.render(label, True, TEXT_COLOR) + self.screen.blit(text, text.get_rect(center=rect.center)) + + def _draw_ui(self) -> tuple[pygame.Rect, pygame.Rect]: """Draw the user interface.""" - # Title - title = self.large_font.render("Keyboard Coop Game", True, TEXT_COLOR) - self.screen.blit(title, (30, 20)) - - # Current word - word_text = self.font.render( - f"Current Word: {self.current_word.upper()}", True, TEXT_COLOR + # Title and status + self._draw_text_line("Keyboard Coop Game", (30, 20), self.large_font) + self._draw_text_line( + f"Current Word: {self.state.current_word.upper()}", (30, 50), self.font ) - self.screen.blit(word_text, (30, 50)) + self._draw_text_line(f"Score: {self.state.score}", (30, 75), self.font) - # Score - score_text = self.font.render(f"Score: {self.score}", True, TEXT_COLOR) - self.screen.blit(score_text, (30, 75)) - - # Current player - player_color = PLAYER_COLORS[self.current_player] + # Current player with color + player_color = PLAYER_COLORS[self.state.current_player] player_text = self.font.render( - f"Current Player: {self.current_player + 1}", True, player_color + f"Current Player: {self.state.current_player + 1}", True, player_color ) self.screen.blit(player_text, (30, 100)) # Message - message_text = self.font.render(self.message, True, TEXT_COLOR) - self.screen.blit(message_text, (30, 125)) + self._draw_text_line(self.state.message, (30, 125), self.font) - # Instructions - Move to right side and make more compact + # Instructions instructions = [ "Instructions:", "• Take turns selecting adjacent letters", @@ -422,45 +445,32 @@ class KeyboardCoopGame: "• Score increases exponentially", "• Keyboard randomizes after each valid word!", ] + for idx, instruction in enumerate(instructions): + self._draw_text_line(instruction, (750, 20 + idx * 22), self.small_font) - start_x = 750 - for i, instruction in enumerate(instructions): - inst_text = self.small_font.render(instruction, True, TEXT_COLOR) - self.screen.blit(inst_text, (start_x, 20 + i * 22)) - - # Enter button - smaller and repositioned + # Buttons enter_rect = pygame.Rect(750, 190, 120, 40) - pygame.draw.rect(self.screen, KEY_COLOR, enter_rect) - pygame.draw.rect(self.screen, TEXT_COLOR, enter_rect, 2) - enter_text = self.small_font.render("ENTER", True, TEXT_COLOR) - enter_text_rect = enter_text.get_rect(center=enter_rect.center) - self.screen.blit(enter_text, enter_text_rect) - - # Reset button - smaller and repositioned reset_rect = pygame.Rect(880, 190, 120, 40) - pygame.draw.rect(self.screen, KEY_COLOR, reset_rect) - pygame.draw.rect(self.screen, TEXT_COLOR, reset_rect, 2) - reset_text = self.small_font.render("RESET", True, TEXT_COLOR) - reset_text_rect = reset_text.get_rect(center=reset_rect.center) - self.screen.blit(reset_text, reset_text_rect) + self._draw_button(enter_rect, "ENTER") + self._draw_button(reset_rect, "RESET") return enter_rect, reset_rect - def handle_click(self, pos: tuple[int, int]) -> None: + def _handle_click(self, pos: tuple[int, int]) -> None: """Handle mouse clicks.""" # Check if clicked on a key - key = self.get_key_at_position(pos) + key = self._get_key_at_position(pos) if key: - self.handle_letter_click(key) + self._handle_letter_click(key) # Check UI buttons enter_rect = pygame.Rect(750, 190, 120, 40) reset_rect = pygame.Rect(880, 190, 120, 40) if enter_rect.collidepoint(pos): - self.submit_word() + self._submit_word() elif reset_rect.collidepoint(pos): - self.reset_game() + self._reset_game() def run(self) -> None: """Main game loop.""" @@ -472,24 +482,24 @@ class KeyboardCoopGame: running = False elif event.type == pygame.MOUSEBUTTONDOWN: if event.button == 1: # Left click - self.handle_click(event.pos) + self._handle_click(event.pos) elif event.type == pygame.KEYDOWN: if event.key == pygame.K_RETURN: - self.submit_word() + self._submit_word() elif event.key == pygame.K_r: - self.reset_game() + self._reset_game() else: # Handle letter key presses key_name = pygame.key.name(event.key) if len(key_name) == 1 and key_name.isalpha(): - self.handle_letter_click(key_name.lower()) + self._handle_letter_click(key_name.lower()) # Clear screen self.screen.fill(BACKGROUND_COLOR) # Draw everything - self.draw_keyboard() - self.draw_ui() + self._draw_keyboard() + self._draw_ui() # Update display pygame.display.flip() diff --git a/python_pkg/lichess_bot/engine.py b/python_pkg/lichess_bot/engine.py index d9fab28..58ac408 100644 --- a/python_pkg/lichess_bot/engine.py +++ b/python_pkg/lichess_bot/engine.py @@ -52,8 +52,10 @@ class RandomEngine: raise FileNotFoundError(msg) def _call_engine(self, args: list[str], *, timeout: float) -> str: + # S603: subprocess call is safe - engine_path is validated in __init__ + # with is_file() and X_OK permission check, args are explicit strings try: - proc = subprocess.run( # noqa: S603 - trusted internal C engine binary + proc = subprocess.run( [self.engine_path, *args], capture_output=True, text=True, @@ -106,6 +108,40 @@ class RandomEngine: return move, "from_c_engine" + def _parse_engine_analysis( + self, out: str, legal_moves: list[chess.Move] + ) -> tuple[float, str, chess.Move | None, str]: + """Parse JSON output from engine analysis. + + Returns (candidate_score, candidate_expl, best_move, best_expl). + """ + cand_score = 0.0 + best_move: chess.Move | None = None + cand_expl = out + best_expl = out + + try: + data = json.loads(out) + analyze = data.get("analyze") or {} + cs = analyze.get("candidate_score") + if isinstance(cs, int | float): + cand_score = float(cs) + chosen = data.get("chosen_move") + if isinstance(chosen, str): + with contextlib.suppress(Exception): + bm = chess.Move.from_uci(chosen) + if bm in legal_moves: + best_move = bm + cand_expl = json.dumps(analyze, ensure_ascii=False) + best_expl = json.dumps( + {"chosen_index": data.get("chosen_index"), "chosen_move": chosen}, + ensure_ascii=False, + ) + except (json.JSONDecodeError, KeyError, TypeError): + _logger.debug("Failed to parse engine JSON output") + + return cand_score, cand_expl, best_move, best_expl + def evaluate_proposed_move_with_suggestion( self, board: chess.Board, @@ -127,36 +163,4 @@ class RandomEngine: m.uci() for m in legal ] out = self._call_engine(args, timeout=max(0.1, time_budget_sec)) - - # Try to parse the engine's JSON explanation - cand_score = 0.0 - best_move: chess.Move | None = None - cand_expl = out - best_expl = out - try: - data = json.loads(out) - # candidate score if provided - analyze = data.get("analyze") or {} - cs = analyze.get("candidate_score") - if isinstance(cs, int | float): - cand_score = float(cs) - # best move - chosen = data.get("chosen_move") - if isinstance(chosen, str): - with contextlib.suppress(Exception): - bm = chess.Move.from_uci(chosen) - if bm in board.legal_moves: - best_move = bm - # Store compact explanations for debugging - cand_expl = json.dumps(analyze, ensure_ascii=False) - best_expl = json.dumps( - { - "chosen_index": data.get("chosen_index"), - "chosen_move": data.get("chosen_move"), - }, - ensure_ascii=False, - ) - except (json.JSONDecodeError, KeyError, TypeError): - _logger.debug("Failed to parse engine JSON output") - - return cand_score, cand_expl, best_move, best_expl + return self._parse_engine_analysis(out, legal) diff --git a/python_pkg/lichess_bot/lichess_api.py b/python_pkg/lichess_bot/lichess_api.py index b089a5c..7576d4d 100644 --- a/python_pkg/lichess_bot/lichess_api.py +++ b/python_pkg/lichess_bot/lichess_api.py @@ -1,6 +1,6 @@ """Lichess API client for bot interactions.""" -from collections.abc import Generator +from collections.abc import Generator # pylint: disable=import-error import contextlib from http import HTTPStatus import json @@ -120,11 +120,29 @@ class LichessAPI: data = {"reason": reason} self._request("POST", url, data=data, timeout=30, raise_for_status=True) + def _parse_game_full_event( + self, event: dict, board: chess.Board, color: str + ) -> str: + """Parse gameFull event and update board. Returns determined color.""" + white_id = event["white"].get("id") + black_id = event["black"].get("id") + me = self.get_my_user_id() + if me == white_id: + color = "white" + elif me == black_id: + color = "black" + state = event.get("state", {}) + moves = state.get("moves", "") + if moves: + for m in moves.split(): + with contextlib.suppress(Exception): + board.push_uci(m) + return color + def join_game_stream( self, game_id: str, my_color: str | None ) -> tuple[chess.Board, str]: """Deprecated: use stream_game_events and parse initial state there.""" - # Fallback to initial behavior for compatibility url = f"{LICHESS_API}/api/board/game/stream/{game_id}" board = chess.Board() color = my_color or "white" @@ -138,21 +156,8 @@ class LichessAPI: event = json.loads(line) except json.JSONDecodeError: continue - t = event.get("type") - if t == "gameFull": - white_id = event["white"].get("id") - black_id = event["black"].get("id") - me = self.get_my_user_id() - if me == white_id: - color = "white" - elif me == black_id: - color = "black" - state = event.get("state", {}) - moves = state.get("moves", "") - if moves: - for m in moves.split(): - with contextlib.suppress(Exception): - board.push_uci(m) + if event.get("type") == "gameFull": + color = self._parse_game_full_event(event, board, color) break return board, color diff --git a/python_pkg/lichess_bot/main.py b/python_pkg/lichess_bot/main.py index 25218ec..307edc9 100644 --- a/python_pkg/lichess_bot/main.py +++ b/python_pkg/lichess_bot/main.py @@ -91,6 +91,39 @@ def _init_game_log(game_id: str, bot_version: int) -> Path | None: return game_log_path +def _update_clocks_from_state(state_data: dict[str, object], state: GameState) -> None: + """Update clock values from state data.""" + wtime = state_data.get("wtime") + btime = state_data.get("btime") + if state.color == "white": + state.my_ms = int(wtime) if isinstance(wtime, int | float) else None + state.opp_ms = int(btime) if isinstance(btime, int | float) else None + else: + state.my_ms = int(btime) if isinstance(btime, int | float) else None + state.opp_ms = int(wtime) if isinstance(wtime, int | float) else None + inc = state_data.get("winc") or state_data.get("binc") + state.inc_ms = int(inc) if isinstance(inc, int | float) else 0 + + +def _extract_player_info( + event: dict[str, object], state: GameState, meta: GameMeta, api: LichessAPI +) -> None: + """Extract player info and determine color.""" + white_data = event.get("white", {}) + black_data = event.get("black", {}) + if not isinstance(white_data, dict) or not isinstance(black_data, dict): + return + white_id = white_data.get("id") + black_id = black_data.get("id") + meta.white_name = str(white_data.get("name") or white_id or "?") + meta.black_name = str(black_data.get("name") or black_id or "?") + me = api.get_my_user_id() + if me == white_id: + state.color = "white" + elif me == black_id: + state.color = "black" + + def _extract_game_full_data( event: dict[str, object], state: GameState, @@ -108,33 +141,8 @@ def _extract_game_full_data( moves = str(state_data.get("moves", "")) status = state_data.get("status") - # Update clocks - values are int milliseconds from API - wtime = state_data.get("wtime") - btime = state_data.get("btime") - if state.color == "white": - state.my_ms = int(wtime) if isinstance(wtime, int | float) else None - state.opp_ms = int(btime) if isinstance(btime, int | float) else None - else: - state.my_ms = int(btime) if isinstance(btime, int | float) else None - state.opp_ms = int(wtime) if isinstance(wtime, int | float) else None - inc = state_data.get("winc") or state_data.get("binc") - state.inc_ms = int(inc) if isinstance(inc, int | float) else 0 - - # Extract player info - white_data = event.get("white", {}) - black_data = event.get("black", {}) - if isinstance(white_data, dict) and isinstance(black_data, dict): - white_id = white_data.get("id") - black_id = black_data.get("id") - meta.white_name = str(white_data.get("name") or white_id or "?") - meta.black_name = str(black_data.get("name") or black_id or "?") - - # Determine color - me = api.get_my_user_id() - if me == white_id: - state.color = "white" - elif me == black_id: - state.color = "black" + _update_clocks_from_state(state_data, state) + _extract_player_info(event, state, meta, api) # Extract date with contextlib.suppress(Exception): @@ -266,7 +274,7 @@ def _handle_move_if_needed( _logger.info("Game %s: turn=%s, my_turn=%s", meta.game_id, turn_str, my_turn) # Move policy - allow_move = (et == "gameState") or (et == "gameFull" and new_len == 0) + allow_move = (et == "gameState") or (et == "gameFull" and not new_len) if my_turn and allow_move and not _attempt_move(ctx, state, meta, state.board): return False @@ -382,20 +390,30 @@ def _run_analysis_subprocess( "Game %s: starting post-game analysis (%s plies)", game_id, total_plies ) - proc = subprocess.Popen( # noqa: S603 - trusted internal analysis script + # S603: subprocess call is safe - analyze_script is validated with is_file() + # above and all arguments are explicit strings from trusted sources + with subprocess.Popen( [sys.executable, "-u", str(analyze_script), str(log_path)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, - ) + ) as proc: + return _process_analysis_output(proc, game_id, total_plies) + +def _process_analysis_output( + proc: subprocess.Popen[str], game_id: str, total_plies: int +) -> str | None: + """Process analysis subprocess output and return analysis text.""" analyzed = 0 lines: list[str] = [] - # stdout/stderr are guaranteed non-None with PIPE - assert proc.stdout is not None # noqa: S101 - assert proc.stderr is not None # noqa: S101 + # stdout/stderr are guaranteed non-None with PIPE, but verify at runtime + if proc.stdout is None or proc.stderr is None: + proc.terminate() + msg = "subprocess pipes unexpectedly None" + raise RuntimeError(msg) for line in proc.stdout: lines.append(line) @@ -408,7 +426,7 @@ def _run_analysis_subprocess( ret = proc.wait() analysis_text = "".join(lines) - if ret != 0: + if ret: _logger.warning("Game %s: analysis script exited with code %s", game_id, ret) if stderr_text: analysis_text += "\n[stderr]\n" + stderr_text @@ -607,6 +625,23 @@ def _run_event_loop_iteration( return 0 +def _safe_event_loop_iteration( + ctx: BotContext, game_threads: dict[str, threading.Thread], backoff: int +) -> int: + """Run event loop iteration with error handling. + + This wrapper exists to avoid try-except inside while True loop (PERF203). + + Returns: + New backoff value. + """ + try: + return _run_event_loop_iteration(ctx, game_threads) + except requests.RequestException as e: + _logger.warning("Event stream error: %s", e) + return backoff_sleep(backoff) + + def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None: """Start the bot and listen for incoming events.""" logging.basicConfig( @@ -636,11 +671,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> backoff = 0 while True: - try: - backoff = _run_event_loop_iteration(ctx, game_threads) - except requests.RequestException as e: # noqa: PERF203 - intentional reconnection loop - _logger.warning("Event stream error: %s", e) - backoff = backoff_sleep(backoff) + backoff = _safe_event_loop_iteration(ctx, game_threads, backoff) def main() -> None: diff --git a/python_pkg/lichess_bot/tools/generate_blunder_tests.py b/python_pkg/lichess_bot/tools/generate_blunder_tests.py index b65294a..9612757 100755 --- a/python_pkg/lichess_bot/tools/generate_blunder_tests.py +++ b/python_pkg/lichess_bot/tools/generate_blunder_tests.py @@ -189,23 +189,26 @@ def ensure_unified_test_file(target_path: str | Path) -> None: if Path(target_path).exists(): return # Create skeleton unified test file + # Note: sys.path manipulation must come before the import that needs it with Path(target_path).open("w", encoding="utf-8") as f: f.write( - """import os + '''"""Test cases for blunders detected in past games.""" + import sys +from pathlib import Path + +# Ensure repo root is importable when running pytest directly +# This must be before importing from python_pkg +_REPO_ROOT = str(Path(__file__).resolve().parent.parent.parent.parent) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + import chess import pytest -# Ensure repo root is importable when running pytest directly -REPO_ROOT = str( - Path(__file__).resolve().parent.parent.parent -) -if REPO_ROOT not in sys.path: - sys.path.insert(0, REPO_ROOT) +from python_pkg.lichess_bot.engine import RandomEngine -from python_pkg.lichess_bot.engine import RandomEngine # noqa: E402 - -BLUNDER_CASES = [ +BLUNDER_CASES: list[tuple[str, str, str]] = [ ] @@ -214,7 +217,8 @@ BLUNDER_CASES = [ BLUNDER_CASES, ids=[c[2] for c in BLUNDER_CASES], ) -def test_engine_avoids_logged_blunder(fen, blunder_uci, label): +def test_engine_avoids_logged_blunder(fen: str, blunder_uci: str, label: str) -> None: + """Test that the engine avoids a logged blunder move.""" board = chess.Board(fen) eng = RandomEngine(depth=4, max_time_sec=1.2) # Prefer explanation variant if available for better failure messages @@ -224,7 +228,7 @@ def test_engine_avoids_logged_blunder(fen, blunder_uci, label): try: mv, expl = eng.choose_move_with_explanation(board, time_budget_sec=1.2) move, explanation = mv, expl or '' - except Exception: + except (RuntimeError, TimeoutError, ValueError): move = eng.choose_move(board) else: move = eng.choose_move(board) @@ -234,7 +238,7 @@ def test_engine_avoids_logged_blunder(fen, blunder_uci, label): f'Engine repeated blunder {blunder_uci} at {label}. ' f'Explanation: {explanation}' ) -""" +''' ) diff --git a/python_pkg/mock_server/mock_server.py b/python_pkg/mock_server/mock_server.py index 767118b..4e5c07f 100644 --- a/python_pkg/mock_server/mock_server.py +++ b/python_pkg/mock_server/mock_server.py @@ -1,6 +1,6 @@ """Mitmproxy addon to simulate connection failures.""" -from mitmproxy import http +from mitmproxy import http # pylint: disable=import-error def request(flow: http.HTTPFlow) -> None: diff --git a/python_pkg/random_jpg/generate_jpeg.py b/python_pkg/random_jpg/generate_jpeg.py index aca774c..c0a7d1b 100644 --- a/python_pkg/random_jpg/generate_jpeg.py +++ b/python_pkg/random_jpg/generate_jpeg.py @@ -28,55 +28,61 @@ class ImageConfig: quality: int -def generate_bloated_jpeg(config: ImageConfig, image_index: int, folder: str) -> str: +def generate_bloated_jpeg( + cfg: ImageConfig, image_index: int, output_folder: str +) -> str: """Generates a random JPEG image with given configuration. Args: - config: Image generation configuration. + cfg: Image generation configuration. image_index: Index of the image for unique naming. - folder: Folder to save the image. + output_folder: Folder to save the image. Returns: Path to the generated image. """ # Ensure size is divisible by block_size and does not exceed MAX_IMAGE_SIZE - if config.size > MAX_IMAGE_SIZE or config.size % config.block_size != 0: + if cfg.size > MAX_IMAGE_SIZE or cfg.size % cfg.block_size: msg = ( f"Size must be {MAX_IMAGE_SIZE} pixels or less and divisible by block_size" ) raise ValueError(msg) - # Create a new image - image = Image.new("RGB", (config.size, config.size)) + image = _create_random_image(cfg) + return _save_image(image, cfg, image_index, output_folder) + + +def _create_random_image(cfg: ImageConfig) -> Image.Image: + """Create a random colorful image based on configuration.""" + image = Image.new("RGB", (cfg.size, cfg.size)) pixels = image.load() # Convert hex colors to RGB rgb_colors = [ - tuple(int(color[i : i + 2], 16) for i in (1, 3, 5)) - for color in config.color_list + tuple(int(color[idx : idx + 2], 16) for idx in (1, 3, 5)) + for color in cfg.color_list ] # Fill the image with block_size x block_size pixel squares - # of random colors from the list - for y in range(0, config.size, config.block_size): - for x in range(0, config.size, config.block_size): + for y in range(0, cfg.size, cfg.block_size): + for x in range(0, cfg.size, cfg.block_size): color = _rng.choice(rgb_colors) - for i in range(config.block_size): - for j in range(config.block_size): - pixels[x + i, y + j] = color + for dy in range(cfg.block_size): + for dx in range(cfg.block_size): + pixels[x + dx, y + dy] = color + return image - # Create the folder if it does not exist - folder_path = Path(folder) + +def _save_image( + image: Image.Image, cfg: ImageConfig, image_index: int, output_folder: str +) -> str: + """Save image to disk and return the path.""" + folder_path = Path(output_folder) folder_path.mkdir(parents=True, exist_ok=True) - - # Generate unique output path - output_stem = Path(config.output_path).stem - output_suffix = Path(config.output_path).suffix + output_stem = Path(cfg.output_path).stem + output_suffix = Path(cfg.output_path).suffix unique_output_path = folder_path / f"{output_stem}_{image_index}{output_suffix}" - - # Save the image with specified quality to maximize file size - image.save(unique_output_path, "JPEG", quality=config.quality, optimize=False) - + image.save(unique_output_path, "JPEG", quality=cfg.quality, optimize=False) return str(unique_output_path) diff --git a/python_pkg/randomize_numbers/random_digits.py b/python_pkg/randomize_numbers/random_digits.py index 1d1443a..fc9c35e 100644 --- a/python_pkg/randomize_numbers/random_digits.py +++ b/python_pkg/randomize_numbers/random_digits.py @@ -16,57 +16,56 @@ DEFAULT_MAX_PERCENTAGE = 20 def randomize_numbers( - numbers: list[float], - min_percentage: float = DEFAULT_MIN_PERCENTAGE, - max_percentage: float = DEFAULT_MAX_PERCENTAGE, + nums: list[float], + min_pct: float = DEFAULT_MIN_PERCENTAGE, + max_pct: float = DEFAULT_MAX_PERCENTAGE, ) -> list[float]: """Apply random percentage variation to a list of numbers.""" - randomized_numbers = [] - for number in numbers: - percentage = _rng.uniform(min_percentage, max_percentage) / 100 + result = [] + for number in nums: + percentage = _rng.uniform(min_pct, max_pct) / 100 if _rng.choice([True, False]): new_number = number + (number * percentage) else: new_number = number - (number * percentage) - randomized_numbers.append(new_number) - return randomized_numbers + result.append(new_number) + return result -def parse_input(input_string: str) -> tuple[list[float], list[int]]: +def parse_input(text: str) -> tuple[list[float], list[int]]: """Parse a string of numbers and return floats with decimal counts.""" # Replace commas with dots and remove non-numeric characters # except dots, commas, and digits - cleaned_input = re.sub(r"[^\d.,\s]", "", input_string).replace(",", ".") + cleaned_input = re.sub(r"[^\d.,\s]", "", text).replace(",", ".") # Split the cleaned input into individual numbers number_strings = cleaned_input.split() # Convert the number strings to floats - numbers: list[float] = [] - decimal_counts: list[int] = [] - for num in number_strings: - parsed = _parse_single_number(num) + nums: list[float] = [] + decimals: list[int] = [] + for num_str in number_strings: + parsed = _parse_single_number(num_str) if parsed is not None: float_num, digits_count = parsed - numbers.append(float_num) - decimal_counts.append(digits_count) - return numbers, decimal_counts + nums.append(float_num) + decimals.append(digits_count) + return nums, decimals -def _parse_single_number(num: str) -> tuple[float, int] | None: +def _parse_single_number(num_str: str) -> tuple[float, int] | None: """Parse a single number string into float and decimal count. Args: - num: The number string to parse. + num_str: The number string to parse. Returns: Tuple of (float value, decimal count) or None if invalid. """ try: - float_num = float(num) - digits_count = len(num.split(".")[-1]) if "." in num else 0 + float_num = float(num_str) + digits_count = len(num_str.split(".")[-1]) if "." in num_str else 0 except ValueError: return None - else: - return float_num, digits_count + return float_num, digits_count MIN_ARGS = 2 @@ -82,7 +81,7 @@ if __name__ == "__main__": input_string = " ".join(sys.argv[1:]) numbers, decimal_counts = parse_input(input_string) - if len(numbers) == 0: + if not numbers: _logger.error("No valid numbers provided.") sys.exit(1) diff --git a/python_pkg/scrape_website/scrape_comics.py b/python_pkg/scrape_website/scrape_comics.py index 225cf84..ab4edd2 100644 --- a/python_pkg/scrape_website/scrape_comics.py +++ b/python_pkg/scrape_website/scrape_comics.py @@ -6,10 +6,14 @@ from pathlib import Path from urllib.parse import urlparse import requests + +# pylint: disable=import-error from selenium import webdriver from selenium.common.exceptions import NoSuchElementException from selenium.webdriver.common.by import By +# pylint: enable=import-error + _logger = logging.getLogger(__name__) REQUEST_TIMEOUT = 30 # seconds @@ -31,18 +35,18 @@ driver.get(url) # A function to download images by URL -def download_image(url: str) -> bool: +def download_image(image_url: str) -> bool: """Download an image from a URL and save it locally.""" # Extract image name from URL - image_name = Path(urlparse(url).path).name + image_name = Path(urlparse(image_url).path).name image_path = Path(image_name) # Check if the image already exists if image_path.exists(): _logger.info("Image %s already exists, skipping download.", image_name) return False - _logger.info("Downloading image from URL: %s", url) - img_data = requests.get(url, timeout=REQUEST_TIMEOUT).content + _logger.info("Downloading image from URL: %s", image_url) + img_data = requests.get(image_url, timeout=REQUEST_TIMEOUT).content with image_path.open("wb") as handler: handler.write(img_data) _logger.info("Image %s downloaded successfully", image_name) @@ -59,11 +63,11 @@ while True: image_element = driver.find_element(By.ID, "cc-comic") # Get the image URL from the 'src' attribute - image_url = image_element.get_attribute("src") - _logger.info("Found image URL: %s", image_url) + current_image_url = image_element.get_attribute("src") + _logger.info("Found image URL: %s", current_image_url) # Download the image if it doesn't already exist - if download_image(image_url): + if download_image(current_image_url): count += 1 # Increment count only if the image was downloaded # Try to find the 'Next' button by its class diff --git a/python_pkg/split/split_x_into_n_symmetrically.py b/python_pkg/split/split_x_into_n_symmetrically.py index fec89b7..3e546bb 100644 --- a/python_pkg/split/split_x_into_n_symmetrically.py +++ b/python_pkg/split/split_x_into_n_symmetrically.py @@ -23,9 +23,9 @@ def calculate_symmetric_weights( next_weight = weights_left[-1] + factor weights_left.append(next_weight) else: - weights_left.extend(middle_weight - (i + 1) for i in range(half_n - 1)) + weights_left.extend(middle_weight - (idx + 1) for idx in range(half_n - 1)) - if n % 2 == 0: + if not n % 2: weights = weights_left[::-1] + weights_left else: weights = [*weights_left[::-1], middle_weight, *weights_left]