mirror of
https://github.com/kuhyx/testsAndMisc-archive.git
synced 2026-07-04 12:43:15 +02:00
fix: address all pylint warnings
- R0914 (too many locals): Extract helper functions in generate_jpeg.py, engine.py, lichess_api.py, main.py - R0902 (too many instance attributes): Use dataclasses in keyboard_coop - W0621 (redefined outer name): Rename parameters/variables to avoid shadowing - W0201 (attribute outside init): Initialize all attrs in __init__ - R1705 (no-else-return): Remove unnecessary else after return - C1805 (implicit booleaness): Use implicit boolean checks - R1732 (consider-using-with): Use context manager for subprocess.Popen - E0401 (import-error): Add pylint disable for optional deps (selenium, mitmproxy) - Clean up pyproject.toml: update comments, remove redundant settings Pylint score: 10.00/10
This commit is contained in:
parent
52476d1b15
commit
077a31cb54
@ -15,29 +15,29 @@ MAX_REQUESTS = 90
|
||||
REQUEST_TIMEOUT = 30 # seconds
|
||||
|
||||
|
||||
def _download_single_image(url: str) -> None:
|
||||
def _download_single_image(image_url: str) -> None:
|
||||
"""Download and save a single image from URL.
|
||||
|
||||
Args:
|
||||
url: The URL of the image to download.
|
||||
image_url: The URL of the image to download.
|
||||
"""
|
||||
try:
|
||||
# Get the image content
|
||||
response = requests.get(url, timeout=REQUEST_TIMEOUT)
|
||||
response.raise_for_status() # Raise an exception for HTTP errors
|
||||
resp = requests.get(image_url, timeout=REQUEST_TIMEOUT)
|
||||
resp.raise_for_status() # Raise an exception for HTTP errors
|
||||
|
||||
# Extract the image name from the URL
|
||||
image_name = Path(url).name
|
||||
image_name = Path(image_url).name
|
||||
image_path = Path("./CATS2/") / image_name
|
||||
|
||||
# Save the image to the directory
|
||||
with image_path.open("wb") as file:
|
||||
file.write(response.content)
|
||||
file.write(resp.content)
|
||||
|
||||
_logger.info("Saved %s as %s", url, image_path)
|
||||
_logger.info("Saved %s as %s", image_url, image_path)
|
||||
|
||||
except requests.exceptions.RequestException:
|
||||
_logger.exception("Failed to download %s", url)
|
||||
_logger.exception("Failed to download %s", image_url)
|
||||
|
||||
|
||||
requests_send = 0
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
Players take turns selecting adjacent keys to form valid English words.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
@ -70,6 +71,28 @@ KEY_ADJACENCY = {
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class GameState:
|
||||
"""State for an ongoing keyboard coop game."""
|
||||
|
||||
current_player: int = 0
|
||||
current_word: str = ""
|
||||
selected_letters: list[str] = field(default_factory=list)
|
||||
score: int = 0
|
||||
game_over: bool = False
|
||||
message: str = "Player 1: Choose any letter to start!"
|
||||
|
||||
|
||||
@dataclass
|
||||
class KeyboardState:
|
||||
"""State for the keyboard layout and adjacency."""
|
||||
|
||||
layout: list[list[str]] = field(default_factory=list)
|
||||
available_letters: set[str] = field(default_factory=set)
|
||||
adjacency: dict[str, list[str]] = field(default_factory=dict)
|
||||
positions: dict[str, pygame.Rect] = field(default_factory=dict)
|
||||
|
||||
|
||||
class KeyboardCoopGame:
|
||||
"""Main game class for the keyboard cooperative word game."""
|
||||
|
||||
@ -83,23 +106,18 @@ class KeyboardCoopGame:
|
||||
self.small_font = pygame.font.Font(None, 20)
|
||||
|
||||
# Load dictionary
|
||||
self.dictionary = self.load_dictionary()
|
||||
self.dictionary = self._load_dictionary()
|
||||
|
||||
# Initialize game state
|
||||
self.current_player = 0
|
||||
self.current_word = ""
|
||||
self.selected_letters = []
|
||||
self.score = 0
|
||||
self.game_over = False
|
||||
self.message = "Player 1: Choose any letter to start!"
|
||||
self.state = GameState()
|
||||
|
||||
# Initialize keyboard state
|
||||
self.keyboard = KeyboardState()
|
||||
|
||||
# Generate random keyboard layout and adjacency
|
||||
self.generate_random_keyboard()
|
||||
self._generate_random_keyboard()
|
||||
|
||||
# Key positions
|
||||
self.key_positions = self.calculate_key_positions()
|
||||
|
||||
def load_dictionary(self) -> set[str]:
|
||||
def _load_dictionary(self) -> set[str]:
|
||||
"""Load dictionary from words_dictionary.json file."""
|
||||
try:
|
||||
dictionary_path = Path(__file__).parent / "words_dictionary.json"
|
||||
@ -199,30 +217,33 @@ class KeyboardCoopGame:
|
||||
"good",
|
||||
}
|
||||
|
||||
def generate_random_keyboard(self) -> None:
|
||||
def _generate_random_keyboard(self) -> None:
|
||||
"""Generate a random keyboard layout and calculate adjacencies."""
|
||||
# All 26 letters
|
||||
all_letters = list("abcdefghijklmnopqrstuvwxyz")
|
||||
_rng.shuffle(all_letters)
|
||||
|
||||
# Create random layout with same structure as QWERTY (10-9-7)
|
||||
self.keyboard_layout = [
|
||||
self.keyboard.layout = [
|
||||
all_letters[0:10], # Top row: 10 keys
|
||||
all_letters[10:19], # Middle row: 9 keys
|
||||
all_letters[19:26], # Bottom row: 7 keys
|
||||
]
|
||||
|
||||
# Update available letters
|
||||
self.available_letters = set(all_letters)
|
||||
self.keyboard.available_letters = set(all_letters)
|
||||
|
||||
# Calculate adjacencies based on new layout
|
||||
self.calculate_adjacencies()
|
||||
self._calculate_adjacencies()
|
||||
|
||||
def calculate_adjacencies(self) -> None:
|
||||
# Calculate key positions
|
||||
self.keyboard.positions = self._calculate_key_positions()
|
||||
|
||||
def _calculate_adjacencies(self) -> None:
|
||||
"""Calculate adjacencies based on current keyboard layout."""
|
||||
self.key_adjacency = {}
|
||||
self.keyboard.adjacency = {}
|
||||
|
||||
for row_idx, row in enumerate(self.keyboard_layout):
|
||||
for row_idx, row in enumerate(self.keyboard.layout):
|
||||
for col_idx, letter in enumerate(row):
|
||||
adjacents = []
|
||||
|
||||
@ -243,14 +264,14 @@ class KeyboardCoopGame:
|
||||
new_col = col_idx + dc
|
||||
|
||||
# Check bounds
|
||||
if 0 <= new_row < len(self.keyboard_layout) and 0 <= new_col < len(
|
||||
self.keyboard_layout[new_row]
|
||||
if 0 <= new_row < len(self.keyboard.layout) and 0 <= new_col < len(
|
||||
self.keyboard.layout[new_row]
|
||||
):
|
||||
adjacents.append(self.keyboard_layout[new_row][new_col])
|
||||
adjacents.append(self.keyboard.layout[new_row][new_col])
|
||||
|
||||
self.key_adjacency[letter] = adjacents
|
||||
self.keyboard.adjacency[letter] = adjacents
|
||||
|
||||
def calculate_key_positions(self) -> dict[str, pygame.Rect]:
|
||||
def _calculate_key_positions(self) -> dict[str, pygame.Rect]:
|
||||
"""Calculate the position of each key on screen."""
|
||||
positions: dict[str, pygame.Rect] = {}
|
||||
key_width = 60
|
||||
@ -259,7 +280,7 @@ class KeyboardCoopGame:
|
||||
start_x = 50
|
||||
start_y = 320
|
||||
|
||||
for row_idx, row in enumerate(self.keyboard_layout):
|
||||
for row_idx, row in enumerate(self.keyboard.layout):
|
||||
row_offset = row_idx * 30 # Offset for layout
|
||||
for col_idx, key in enumerate(row):
|
||||
x = start_x + col_idx * (key_width + key_spacing) + row_offset
|
||||
@ -268,109 +289,105 @@ class KeyboardCoopGame:
|
||||
|
||||
return positions
|
||||
|
||||
def get_key_at_position(self, pos: tuple[int, int]) -> str | None:
|
||||
def _get_key_at_position(self, pos: tuple[int, int]) -> str | None:
|
||||
"""Get the key at the given mouse position."""
|
||||
for key, rect in self.key_positions.items():
|
||||
for key, rect in self.keyboard.positions.items():
|
||||
if rect.collidepoint(pos):
|
||||
return key
|
||||
return None
|
||||
|
||||
def is_valid_move(self, letter: str) -> bool:
|
||||
def _is_valid_move(self, letter: str) -> bool:
|
||||
"""Check if the letter is a valid move."""
|
||||
if not self.selected_letters:
|
||||
if not self.state.selected_letters:
|
||||
return True # First move can be any letter
|
||||
|
||||
last_letter = self.selected_letters[-1]
|
||||
return letter in self.key_adjacency[last_letter]
|
||||
last_letter = self.state.selected_letters[-1]
|
||||
return letter in self.keyboard.adjacency[last_letter]
|
||||
|
||||
def is_valid_word(self, word: str) -> bool:
|
||||
def _is_valid_word(self, word: str) -> bool:
|
||||
"""Check if the word is in the dictionary."""
|
||||
return word.lower() in self.dictionary
|
||||
|
||||
def calculate_score(self, word_length: int) -> int:
|
||||
def _calculate_score(self, word_length: int) -> int:
|
||||
"""Calculate score exponentially based on word length."""
|
||||
if word_length < MIN_WORD_LENGTH:
|
||||
return 0
|
||||
return 2 ** (word_length - 2)
|
||||
|
||||
def handle_letter_click(self, letter: str) -> None:
|
||||
def _handle_letter_click(self, letter: str) -> None:
|
||||
"""Handle clicking on a letter."""
|
||||
if letter in self.available_letters and self.is_valid_move(letter):
|
||||
self.selected_letters.append(letter)
|
||||
self.current_word += letter
|
||||
if letter in self.keyboard.available_letters and self._is_valid_move(letter):
|
||||
self.state.selected_letters.append(letter)
|
||||
self.state.current_word += letter
|
||||
|
||||
# Update available letters to include adjacent letters AND the same letter
|
||||
adjacent_letters = (
|
||||
set(self.key_adjacency[letter])
|
||||
if letter in self.key_adjacency
|
||||
set(self.keyboard.adjacency[letter])
|
||||
if letter in self.keyboard.adjacency
|
||||
else set()
|
||||
)
|
||||
adjacent_letters.add(letter) # Allow selecting the same letter again
|
||||
self.available_letters = adjacent_letters
|
||||
self.keyboard.available_letters = adjacent_letters
|
||||
|
||||
# Switch player
|
||||
self.current_player = 1 - self.current_player
|
||||
self.message = (
|
||||
f"Player {self.current_player + 1}: Choose an adjacent letter!"
|
||||
self.state.current_player = 1 - self.state.current_player
|
||||
self.state.message = (
|
||||
f"Player {self.state.current_player + 1}: Choose an adjacent letter!"
|
||||
)
|
||||
|
||||
# If no valid moves available, force word submission
|
||||
if not self.available_letters:
|
||||
self.submit_word()
|
||||
if not self.keyboard.available_letters:
|
||||
self._submit_word()
|
||||
|
||||
def submit_word(self) -> None:
|
||||
def _submit_word(self) -> None:
|
||||
"""Submit the current word and check if it's valid."""
|
||||
if len(self.current_word) >= MIN_WORD_LENGTH and self.is_valid_word(
|
||||
self.current_word
|
||||
if len(self.state.current_word) >= MIN_WORD_LENGTH and self._is_valid_word(
|
||||
self.state.current_word
|
||||
):
|
||||
points = self.calculate_score(len(self.current_word))
|
||||
self.score += points
|
||||
self.message = (
|
||||
f"'{self.current_word}' is valid! +{points} points "
|
||||
f"(Total: {self.score}) - New keyboard!"
|
||||
points = self._calculate_score(len(self.state.current_word))
|
||||
self.state.score += points
|
||||
self.state.message = (
|
||||
f"'{self.state.current_word}' is valid! +{points} points "
|
||||
f"(Total: {self.state.score}) - New keyboard!"
|
||||
)
|
||||
|
||||
# Randomize keyboard layout after scoring
|
||||
self.generate_random_keyboard()
|
||||
self.key_positions = self.calculate_key_positions()
|
||||
self._generate_random_keyboard()
|
||||
|
||||
elif len(self.current_word) < MIN_WORD_LENGTH:
|
||||
self.message = (
|
||||
f"'{self.current_word}' is too short! "
|
||||
elif len(self.state.current_word) < MIN_WORD_LENGTH:
|
||||
self.state.message = (
|
||||
f"'{self.state.current_word}' is too short! "
|
||||
f"(minimum {MIN_WORD_LENGTH} letters)"
|
||||
)
|
||||
else:
|
||||
self.message = f"'{self.current_word}' is not a valid word!"
|
||||
self.state.message = f"'{self.state.current_word}' is not a valid word!"
|
||||
|
||||
# Reset for next word
|
||||
self.current_word = ""
|
||||
self.selected_letters = []
|
||||
self.current_player = 0
|
||||
self.state.current_word = ""
|
||||
self.state.selected_letters = []
|
||||
self.state.current_player = 0
|
||||
|
||||
def reset_game(self) -> None:
|
||||
def _reset_game(self) -> None:
|
||||
"""Reset the game to initial state."""
|
||||
self.current_player = 0
|
||||
self.current_word = ""
|
||||
self.selected_letters = []
|
||||
self.score = 0
|
||||
self.game_over = False
|
||||
self.message = "Player 1: Choose any letter to start!"
|
||||
self.state = GameState()
|
||||
|
||||
# Generate new random keyboard layout
|
||||
self.generate_random_keyboard()
|
||||
self.key_positions = self.calculate_key_positions()
|
||||
self._generate_random_keyboard()
|
||||
|
||||
def draw_keyboard(self) -> None:
|
||||
def _draw_keyboard(self) -> None:
|
||||
"""Draw the virtual keyboard."""
|
||||
mouse_pos = pygame.mouse.get_pos()
|
||||
|
||||
for letter, rect in self.key_positions.items():
|
||||
for letter, rect in self.keyboard.positions.items():
|
||||
# Determine key color
|
||||
if letter in self.selected_letters:
|
||||
if letter in self.state.selected_letters:
|
||||
color = KEY_SELECTED_COLOR
|
||||
elif letter in self.available_letters:
|
||||
elif letter in self.keyboard.available_letters:
|
||||
color = KEY_AVAILABLE_COLOR
|
||||
elif rect.collidepoint(mouse_pos) and letter in self.available_letters:
|
||||
elif (
|
||||
rect.collidepoint(mouse_pos)
|
||||
and letter in self.keyboard.available_letters
|
||||
):
|
||||
color = KEY_HOVER_COLOR
|
||||
else:
|
||||
color = KEY_COLOR
|
||||
@ -384,34 +401,40 @@ class KeyboardCoopGame:
|
||||
text_rect = text.get_rect(center=rect.center)
|
||||
self.screen.blit(text, text_rect)
|
||||
|
||||
def draw_ui(self) -> tuple[pygame.Rect, pygame.Rect]:
|
||||
def _draw_text_line(
|
||||
self, text: str, pos: tuple[int, int], font: pygame.font.Font
|
||||
) -> None:
|
||||
"""Draw a single line of text at the given position."""
|
||||
rendered = font.render(text, True, TEXT_COLOR)
|
||||
self.screen.blit(rendered, pos)
|
||||
|
||||
def _draw_button(self, rect: pygame.Rect, label: str) -> None:
|
||||
"""Draw a button with the given label."""
|
||||
pygame.draw.rect(self.screen, KEY_COLOR, rect)
|
||||
pygame.draw.rect(self.screen, TEXT_COLOR, rect, 2)
|
||||
text = self.small_font.render(label, True, TEXT_COLOR)
|
||||
self.screen.blit(text, text.get_rect(center=rect.center))
|
||||
|
||||
def _draw_ui(self) -> tuple[pygame.Rect, pygame.Rect]:
|
||||
"""Draw the user interface."""
|
||||
# Title
|
||||
title = self.large_font.render("Keyboard Coop Game", True, TEXT_COLOR)
|
||||
self.screen.blit(title, (30, 20))
|
||||
|
||||
# Current word
|
||||
word_text = self.font.render(
|
||||
f"Current Word: {self.current_word.upper()}", True, TEXT_COLOR
|
||||
# Title and status
|
||||
self._draw_text_line("Keyboard Coop Game", (30, 20), self.large_font)
|
||||
self._draw_text_line(
|
||||
f"Current Word: {self.state.current_word.upper()}", (30, 50), self.font
|
||||
)
|
||||
self.screen.blit(word_text, (30, 50))
|
||||
self._draw_text_line(f"Score: {self.state.score}", (30, 75), self.font)
|
||||
|
||||
# Score
|
||||
score_text = self.font.render(f"Score: {self.score}", True, TEXT_COLOR)
|
||||
self.screen.blit(score_text, (30, 75))
|
||||
|
||||
# Current player
|
||||
player_color = PLAYER_COLORS[self.current_player]
|
||||
# Current player with color
|
||||
player_color = PLAYER_COLORS[self.state.current_player]
|
||||
player_text = self.font.render(
|
||||
f"Current Player: {self.current_player + 1}", True, player_color
|
||||
f"Current Player: {self.state.current_player + 1}", True, player_color
|
||||
)
|
||||
self.screen.blit(player_text, (30, 100))
|
||||
|
||||
# Message
|
||||
message_text = self.font.render(self.message, True, TEXT_COLOR)
|
||||
self.screen.blit(message_text, (30, 125))
|
||||
self._draw_text_line(self.state.message, (30, 125), self.font)
|
||||
|
||||
# Instructions - Move to right side and make more compact
|
||||
# Instructions
|
||||
instructions = [
|
||||
"Instructions:",
|
||||
"• Take turns selecting adjacent letters",
|
||||
@ -422,45 +445,32 @@ class KeyboardCoopGame:
|
||||
"• Score increases exponentially",
|
||||
"• Keyboard randomizes after each valid word!",
|
||||
]
|
||||
for idx, instruction in enumerate(instructions):
|
||||
self._draw_text_line(instruction, (750, 20 + idx * 22), self.small_font)
|
||||
|
||||
start_x = 750
|
||||
for i, instruction in enumerate(instructions):
|
||||
inst_text = self.small_font.render(instruction, True, TEXT_COLOR)
|
||||
self.screen.blit(inst_text, (start_x, 20 + i * 22))
|
||||
|
||||
# Enter button - smaller and repositioned
|
||||
# Buttons
|
||||
enter_rect = pygame.Rect(750, 190, 120, 40)
|
||||
pygame.draw.rect(self.screen, KEY_COLOR, enter_rect)
|
||||
pygame.draw.rect(self.screen, TEXT_COLOR, enter_rect, 2)
|
||||
enter_text = self.small_font.render("ENTER", True, TEXT_COLOR)
|
||||
enter_text_rect = enter_text.get_rect(center=enter_rect.center)
|
||||
self.screen.blit(enter_text, enter_text_rect)
|
||||
|
||||
# Reset button - smaller and repositioned
|
||||
reset_rect = pygame.Rect(880, 190, 120, 40)
|
||||
pygame.draw.rect(self.screen, KEY_COLOR, reset_rect)
|
||||
pygame.draw.rect(self.screen, TEXT_COLOR, reset_rect, 2)
|
||||
reset_text = self.small_font.render("RESET", True, TEXT_COLOR)
|
||||
reset_text_rect = reset_text.get_rect(center=reset_rect.center)
|
||||
self.screen.blit(reset_text, reset_text_rect)
|
||||
self._draw_button(enter_rect, "ENTER")
|
||||
self._draw_button(reset_rect, "RESET")
|
||||
|
||||
return enter_rect, reset_rect
|
||||
|
||||
def handle_click(self, pos: tuple[int, int]) -> None:
|
||||
def _handle_click(self, pos: tuple[int, int]) -> None:
|
||||
"""Handle mouse clicks."""
|
||||
# Check if clicked on a key
|
||||
key = self.get_key_at_position(pos)
|
||||
key = self._get_key_at_position(pos)
|
||||
if key:
|
||||
self.handle_letter_click(key)
|
||||
self._handle_letter_click(key)
|
||||
|
||||
# Check UI buttons
|
||||
enter_rect = pygame.Rect(750, 190, 120, 40)
|
||||
reset_rect = pygame.Rect(880, 190, 120, 40)
|
||||
|
||||
if enter_rect.collidepoint(pos):
|
||||
self.submit_word()
|
||||
self._submit_word()
|
||||
elif reset_rect.collidepoint(pos):
|
||||
self.reset_game()
|
||||
self._reset_game()
|
||||
|
||||
def run(self) -> None:
|
||||
"""Main game loop."""
|
||||
@ -472,24 +482,24 @@ class KeyboardCoopGame:
|
||||
running = False
|
||||
elif event.type == pygame.MOUSEBUTTONDOWN:
|
||||
if event.button == 1: # Left click
|
||||
self.handle_click(event.pos)
|
||||
self._handle_click(event.pos)
|
||||
elif event.type == pygame.KEYDOWN:
|
||||
if event.key == pygame.K_RETURN:
|
||||
self.submit_word()
|
||||
self._submit_word()
|
||||
elif event.key == pygame.K_r:
|
||||
self.reset_game()
|
||||
self._reset_game()
|
||||
else:
|
||||
# Handle letter key presses
|
||||
key_name = pygame.key.name(event.key)
|
||||
if len(key_name) == 1 and key_name.isalpha():
|
||||
self.handle_letter_click(key_name.lower())
|
||||
self._handle_letter_click(key_name.lower())
|
||||
|
||||
# Clear screen
|
||||
self.screen.fill(BACKGROUND_COLOR)
|
||||
|
||||
# Draw everything
|
||||
self.draw_keyboard()
|
||||
self.draw_ui()
|
||||
self._draw_keyboard()
|
||||
self._draw_ui()
|
||||
|
||||
# Update display
|
||||
pygame.display.flip()
|
||||
|
||||
@ -52,8 +52,10 @@ class RandomEngine:
|
||||
raise FileNotFoundError(msg)
|
||||
|
||||
def _call_engine(self, args: list[str], *, timeout: float) -> str:
|
||||
# S603: subprocess call is safe - engine_path is validated in __init__
|
||||
# with is_file() and X_OK permission check, args are explicit strings
|
||||
try:
|
||||
proc = subprocess.run( # noqa: S603 - trusted internal C engine binary
|
||||
proc = subprocess.run(
|
||||
[self.engine_path, *args],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
@ -106,6 +108,40 @@ class RandomEngine:
|
||||
|
||||
return move, "from_c_engine"
|
||||
|
||||
def _parse_engine_analysis(
|
||||
self, out: str, legal_moves: list[chess.Move]
|
||||
) -> tuple[float, str, chess.Move | None, str]:
|
||||
"""Parse JSON output from engine analysis.
|
||||
|
||||
Returns (candidate_score, candidate_expl, best_move, best_expl).
|
||||
"""
|
||||
cand_score = 0.0
|
||||
best_move: chess.Move | None = None
|
||||
cand_expl = out
|
||||
best_expl = out
|
||||
|
||||
try:
|
||||
data = json.loads(out)
|
||||
analyze = data.get("analyze") or {}
|
||||
cs = analyze.get("candidate_score")
|
||||
if isinstance(cs, int | float):
|
||||
cand_score = float(cs)
|
||||
chosen = data.get("chosen_move")
|
||||
if isinstance(chosen, str):
|
||||
with contextlib.suppress(Exception):
|
||||
bm = chess.Move.from_uci(chosen)
|
||||
if bm in legal_moves:
|
||||
best_move = bm
|
||||
cand_expl = json.dumps(analyze, ensure_ascii=False)
|
||||
best_expl = json.dumps(
|
||||
{"chosen_index": data.get("chosen_index"), "chosen_move": chosen},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
except (json.JSONDecodeError, KeyError, TypeError):
|
||||
_logger.debug("Failed to parse engine JSON output")
|
||||
|
||||
return cand_score, cand_expl, best_move, best_expl
|
||||
|
||||
def evaluate_proposed_move_with_suggestion(
|
||||
self,
|
||||
board: chess.Board,
|
||||
@ -127,36 +163,4 @@ class RandomEngine:
|
||||
m.uci() for m in legal
|
||||
]
|
||||
out = self._call_engine(args, timeout=max(0.1, time_budget_sec))
|
||||
|
||||
# Try to parse the engine's JSON explanation
|
||||
cand_score = 0.0
|
||||
best_move: chess.Move | None = None
|
||||
cand_expl = out
|
||||
best_expl = out
|
||||
try:
|
||||
data = json.loads(out)
|
||||
# candidate score if provided
|
||||
analyze = data.get("analyze") or {}
|
||||
cs = analyze.get("candidate_score")
|
||||
if isinstance(cs, int | float):
|
||||
cand_score = float(cs)
|
||||
# best move
|
||||
chosen = data.get("chosen_move")
|
||||
if isinstance(chosen, str):
|
||||
with contextlib.suppress(Exception):
|
||||
bm = chess.Move.from_uci(chosen)
|
||||
if bm in board.legal_moves:
|
||||
best_move = bm
|
||||
# Store compact explanations for debugging
|
||||
cand_expl = json.dumps(analyze, ensure_ascii=False)
|
||||
best_expl = json.dumps(
|
||||
{
|
||||
"chosen_index": data.get("chosen_index"),
|
||||
"chosen_move": data.get("chosen_move"),
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
except (json.JSONDecodeError, KeyError, TypeError):
|
||||
_logger.debug("Failed to parse engine JSON output")
|
||||
|
||||
return cand_score, cand_expl, best_move, best_expl
|
||||
return self._parse_engine_analysis(out, legal)
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
"""Lichess API client for bot interactions."""
|
||||
|
||||
from collections.abc import Generator
|
||||
from collections.abc import Generator # pylint: disable=import-error
|
||||
import contextlib
|
||||
from http import HTTPStatus
|
||||
import json
|
||||
@ -120,11 +120,29 @@ class LichessAPI:
|
||||
data = {"reason": reason}
|
||||
self._request("POST", url, data=data, timeout=30, raise_for_status=True)
|
||||
|
||||
def _parse_game_full_event(
|
||||
self, event: dict, board: chess.Board, color: str
|
||||
) -> str:
|
||||
"""Parse gameFull event and update board. Returns determined color."""
|
||||
white_id = event["white"].get("id")
|
||||
black_id = event["black"].get("id")
|
||||
me = self.get_my_user_id()
|
||||
if me == white_id:
|
||||
color = "white"
|
||||
elif me == black_id:
|
||||
color = "black"
|
||||
state = event.get("state", {})
|
||||
moves = state.get("moves", "")
|
||||
if moves:
|
||||
for m in moves.split():
|
||||
with contextlib.suppress(Exception):
|
||||
board.push_uci(m)
|
||||
return color
|
||||
|
||||
def join_game_stream(
|
||||
self, game_id: str, my_color: str | None
|
||||
) -> tuple[chess.Board, str]:
|
||||
"""Deprecated: use stream_game_events and parse initial state there."""
|
||||
# Fallback to initial behavior for compatibility
|
||||
url = f"{LICHESS_API}/api/board/game/stream/{game_id}"
|
||||
board = chess.Board()
|
||||
color = my_color or "white"
|
||||
@ -138,21 +156,8 @@ class LichessAPI:
|
||||
event = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
t = event.get("type")
|
||||
if t == "gameFull":
|
||||
white_id = event["white"].get("id")
|
||||
black_id = event["black"].get("id")
|
||||
me = self.get_my_user_id()
|
||||
if me == white_id:
|
||||
color = "white"
|
||||
elif me == black_id:
|
||||
color = "black"
|
||||
state = event.get("state", {})
|
||||
moves = state.get("moves", "")
|
||||
if moves:
|
||||
for m in moves.split():
|
||||
with contextlib.suppress(Exception):
|
||||
board.push_uci(m)
|
||||
if event.get("type") == "gameFull":
|
||||
color = self._parse_game_full_event(event, board, color)
|
||||
break
|
||||
return board, color
|
||||
|
||||
|
||||
@ -91,6 +91,39 @@ def _init_game_log(game_id: str, bot_version: int) -> Path | None:
|
||||
return game_log_path
|
||||
|
||||
|
||||
def _update_clocks_from_state(state_data: dict[str, object], state: GameState) -> None:
|
||||
"""Update clock values from state data."""
|
||||
wtime = state_data.get("wtime")
|
||||
btime = state_data.get("btime")
|
||||
if state.color == "white":
|
||||
state.my_ms = int(wtime) if isinstance(wtime, int | float) else None
|
||||
state.opp_ms = int(btime) if isinstance(btime, int | float) else None
|
||||
else:
|
||||
state.my_ms = int(btime) if isinstance(btime, int | float) else None
|
||||
state.opp_ms = int(wtime) if isinstance(wtime, int | float) else None
|
||||
inc = state_data.get("winc") or state_data.get("binc")
|
||||
state.inc_ms = int(inc) if isinstance(inc, int | float) else 0
|
||||
|
||||
|
||||
def _extract_player_info(
|
||||
event: dict[str, object], state: GameState, meta: GameMeta, api: LichessAPI
|
||||
) -> None:
|
||||
"""Extract player info and determine color."""
|
||||
white_data = event.get("white", {})
|
||||
black_data = event.get("black", {})
|
||||
if not isinstance(white_data, dict) or not isinstance(black_data, dict):
|
||||
return
|
||||
white_id = white_data.get("id")
|
||||
black_id = black_data.get("id")
|
||||
meta.white_name = str(white_data.get("name") or white_id or "?")
|
||||
meta.black_name = str(black_data.get("name") or black_id or "?")
|
||||
me = api.get_my_user_id()
|
||||
if me == white_id:
|
||||
state.color = "white"
|
||||
elif me == black_id:
|
||||
state.color = "black"
|
||||
|
||||
|
||||
def _extract_game_full_data(
|
||||
event: dict[str, object],
|
||||
state: GameState,
|
||||
@ -108,33 +141,8 @@ def _extract_game_full_data(
|
||||
moves = str(state_data.get("moves", ""))
|
||||
status = state_data.get("status")
|
||||
|
||||
# Update clocks - values are int milliseconds from API
|
||||
wtime = state_data.get("wtime")
|
||||
btime = state_data.get("btime")
|
||||
if state.color == "white":
|
||||
state.my_ms = int(wtime) if isinstance(wtime, int | float) else None
|
||||
state.opp_ms = int(btime) if isinstance(btime, int | float) else None
|
||||
else:
|
||||
state.my_ms = int(btime) if isinstance(btime, int | float) else None
|
||||
state.opp_ms = int(wtime) if isinstance(wtime, int | float) else None
|
||||
inc = state_data.get("winc") or state_data.get("binc")
|
||||
state.inc_ms = int(inc) if isinstance(inc, int | float) else 0
|
||||
|
||||
# Extract player info
|
||||
white_data = event.get("white", {})
|
||||
black_data = event.get("black", {})
|
||||
if isinstance(white_data, dict) and isinstance(black_data, dict):
|
||||
white_id = white_data.get("id")
|
||||
black_id = black_data.get("id")
|
||||
meta.white_name = str(white_data.get("name") or white_id or "?")
|
||||
meta.black_name = str(black_data.get("name") or black_id or "?")
|
||||
|
||||
# Determine color
|
||||
me = api.get_my_user_id()
|
||||
if me == white_id:
|
||||
state.color = "white"
|
||||
elif me == black_id:
|
||||
state.color = "black"
|
||||
_update_clocks_from_state(state_data, state)
|
||||
_extract_player_info(event, state, meta, api)
|
||||
|
||||
# Extract date
|
||||
with contextlib.suppress(Exception):
|
||||
@ -266,7 +274,7 @@ def _handle_move_if_needed(
|
||||
_logger.info("Game %s: turn=%s, my_turn=%s", meta.game_id, turn_str, my_turn)
|
||||
|
||||
# Move policy
|
||||
allow_move = (et == "gameState") or (et == "gameFull" and new_len == 0)
|
||||
allow_move = (et == "gameState") or (et == "gameFull" and not new_len)
|
||||
|
||||
if my_turn and allow_move and not _attempt_move(ctx, state, meta, state.board):
|
||||
return False
|
||||
@ -382,20 +390,30 @@ def _run_analysis_subprocess(
|
||||
"Game %s: starting post-game analysis (%s plies)", game_id, total_plies
|
||||
)
|
||||
|
||||
proc = subprocess.Popen( # noqa: S603 - trusted internal analysis script
|
||||
# S603: subprocess call is safe - analyze_script is validated with is_file()
|
||||
# above and all arguments are explicit strings from trusted sources
|
||||
with subprocess.Popen(
|
||||
[sys.executable, "-u", str(analyze_script), str(log_path)],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
)
|
||||
) as proc:
|
||||
return _process_analysis_output(proc, game_id, total_plies)
|
||||
|
||||
|
||||
def _process_analysis_output(
|
||||
proc: subprocess.Popen[str], game_id: str, total_plies: int
|
||||
) -> str | None:
|
||||
"""Process analysis subprocess output and return analysis text."""
|
||||
analyzed = 0
|
||||
lines: list[str] = []
|
||||
|
||||
# stdout/stderr are guaranteed non-None with PIPE
|
||||
assert proc.stdout is not None # noqa: S101
|
||||
assert proc.stderr is not None # noqa: S101
|
||||
# stdout/stderr are guaranteed non-None with PIPE, but verify at runtime
|
||||
if proc.stdout is None or proc.stderr is None:
|
||||
proc.terminate()
|
||||
msg = "subprocess pipes unexpectedly None"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
for line in proc.stdout:
|
||||
lines.append(line)
|
||||
@ -408,7 +426,7 @@ def _run_analysis_subprocess(
|
||||
ret = proc.wait()
|
||||
analysis_text = "".join(lines)
|
||||
|
||||
if ret != 0:
|
||||
if ret:
|
||||
_logger.warning("Game %s: analysis script exited with code %s", game_id, ret)
|
||||
if stderr_text:
|
||||
analysis_text += "\n[stderr]\n" + stderr_text
|
||||
@ -607,6 +625,23 @@ def _run_event_loop_iteration(
|
||||
return 0
|
||||
|
||||
|
||||
def _safe_event_loop_iteration(
|
||||
ctx: BotContext, game_threads: dict[str, threading.Thread], backoff: int
|
||||
) -> int:
|
||||
"""Run event loop iteration with error handling.
|
||||
|
||||
This wrapper exists to avoid try-except inside while True loop (PERF203).
|
||||
|
||||
Returns:
|
||||
New backoff value.
|
||||
"""
|
||||
try:
|
||||
return _run_event_loop_iteration(ctx, game_threads)
|
||||
except requests.RequestException as e:
|
||||
_logger.warning("Event stream error: %s", e)
|
||||
return backoff_sleep(backoff)
|
||||
|
||||
|
||||
def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) -> None:
|
||||
"""Start the bot and listen for incoming events."""
|
||||
logging.basicConfig(
|
||||
@ -636,11 +671,7 @@ def run_bot(log_level: str = "INFO", *, decline_correspondence: bool = False) ->
|
||||
backoff = 0
|
||||
|
||||
while True:
|
||||
try:
|
||||
backoff = _run_event_loop_iteration(ctx, game_threads)
|
||||
except requests.RequestException as e: # noqa: PERF203 - intentional reconnection loop
|
||||
_logger.warning("Event stream error: %s", e)
|
||||
backoff = backoff_sleep(backoff)
|
||||
backoff = _safe_event_loop_iteration(ctx, game_threads, backoff)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
||||
@ -189,23 +189,26 @@ def ensure_unified_test_file(target_path: str | Path) -> None:
|
||||
if Path(target_path).exists():
|
||||
return
|
||||
# Create skeleton unified test file
|
||||
# Note: sys.path manipulation must come before the import that needs it
|
||||
with Path(target_path).open("w", encoding="utf-8") as f:
|
||||
f.write(
|
||||
"""import os
|
||||
'''"""Test cases for blunders detected in past games."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure repo root is importable when running pytest directly
|
||||
# This must be before importing from python_pkg
|
||||
_REPO_ROOT = str(Path(__file__).resolve().parent.parent.parent.parent)
|
||||
if _REPO_ROOT not in sys.path:
|
||||
sys.path.insert(0, _REPO_ROOT)
|
||||
|
||||
import chess
|
||||
import pytest
|
||||
|
||||
# Ensure repo root is importable when running pytest directly
|
||||
REPO_ROOT = str(
|
||||
Path(__file__).resolve().parent.parent.parent
|
||||
)
|
||||
if REPO_ROOT not in sys.path:
|
||||
sys.path.insert(0, REPO_ROOT)
|
||||
from python_pkg.lichess_bot.engine import RandomEngine
|
||||
|
||||
from python_pkg.lichess_bot.engine import RandomEngine # noqa: E402
|
||||
|
||||
BLUNDER_CASES = [
|
||||
BLUNDER_CASES: list[tuple[str, str, str]] = [
|
||||
]
|
||||
|
||||
|
||||
@ -214,7 +217,8 @@ BLUNDER_CASES = [
|
||||
BLUNDER_CASES,
|
||||
ids=[c[2] for c in BLUNDER_CASES],
|
||||
)
|
||||
def test_engine_avoids_logged_blunder(fen, blunder_uci, label):
|
||||
def test_engine_avoids_logged_blunder(fen: str, blunder_uci: str, label: str) -> None:
|
||||
"""Test that the engine avoids a logged blunder move."""
|
||||
board = chess.Board(fen)
|
||||
eng = RandomEngine(depth=4, max_time_sec=1.2)
|
||||
# Prefer explanation variant if available for better failure messages
|
||||
@ -224,7 +228,7 @@ def test_engine_avoids_logged_blunder(fen, blunder_uci, label):
|
||||
try:
|
||||
mv, expl = eng.choose_move_with_explanation(board, time_budget_sec=1.2)
|
||||
move, explanation = mv, expl or ''
|
||||
except Exception:
|
||||
except (RuntimeError, TimeoutError, ValueError):
|
||||
move = eng.choose_move(board)
|
||||
else:
|
||||
move = eng.choose_move(board)
|
||||
@ -234,7 +238,7 @@ def test_engine_avoids_logged_blunder(fen, blunder_uci, label):
|
||||
f'Engine repeated blunder {blunder_uci} at {label}. '
|
||||
f'Explanation: {explanation}'
|
||||
)
|
||||
"""
|
||||
'''
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
"""Mitmproxy addon to simulate connection failures."""
|
||||
|
||||
from mitmproxy import http
|
||||
from mitmproxy import http # pylint: disable=import-error
|
||||
|
||||
|
||||
def request(flow: http.HTTPFlow) -> None:
|
||||
|
||||
@ -16,57 +16,56 @@ DEFAULT_MAX_PERCENTAGE = 20
|
||||
|
||||
|
||||
def randomize_numbers(
|
||||
numbers: list[float],
|
||||
min_percentage: float = DEFAULT_MIN_PERCENTAGE,
|
||||
max_percentage: float = DEFAULT_MAX_PERCENTAGE,
|
||||
nums: list[float],
|
||||
min_pct: float = DEFAULT_MIN_PERCENTAGE,
|
||||
max_pct: float = DEFAULT_MAX_PERCENTAGE,
|
||||
) -> list[float]:
|
||||
"""Apply random percentage variation to a list of numbers."""
|
||||
randomized_numbers = []
|
||||
for number in numbers:
|
||||
percentage = _rng.uniform(min_percentage, max_percentage) / 100
|
||||
result = []
|
||||
for number in nums:
|
||||
percentage = _rng.uniform(min_pct, max_pct) / 100
|
||||
if _rng.choice([True, False]):
|
||||
new_number = number + (number * percentage)
|
||||
else:
|
||||
new_number = number - (number * percentage)
|
||||
randomized_numbers.append(new_number)
|
||||
return randomized_numbers
|
||||
result.append(new_number)
|
||||
return result
|
||||
|
||||
|
||||
def parse_input(input_string: str) -> tuple[list[float], list[int]]:
|
||||
def parse_input(text: str) -> tuple[list[float], list[int]]:
|
||||
"""Parse a string of numbers and return floats with decimal counts."""
|
||||
# Replace commas with dots and remove non-numeric characters
|
||||
# except dots, commas, and digits
|
||||
cleaned_input = re.sub(r"[^\d.,\s]", "", input_string).replace(",", ".")
|
||||
cleaned_input = re.sub(r"[^\d.,\s]", "", text).replace(",", ".")
|
||||
# Split the cleaned input into individual numbers
|
||||
number_strings = cleaned_input.split()
|
||||
# Convert the number strings to floats
|
||||
numbers: list[float] = []
|
||||
decimal_counts: list[int] = []
|
||||
for num in number_strings:
|
||||
parsed = _parse_single_number(num)
|
||||
nums: list[float] = []
|
||||
decimals: list[int] = []
|
||||
for num_str in number_strings:
|
||||
parsed = _parse_single_number(num_str)
|
||||
if parsed is not None:
|
||||
float_num, digits_count = parsed
|
||||
numbers.append(float_num)
|
||||
decimal_counts.append(digits_count)
|
||||
return numbers, decimal_counts
|
||||
nums.append(float_num)
|
||||
decimals.append(digits_count)
|
||||
return nums, decimals
|
||||
|
||||
|
||||
def _parse_single_number(num: str) -> tuple[float, int] | None:
|
||||
def _parse_single_number(num_str: str) -> tuple[float, int] | None:
|
||||
"""Parse a single number string into float and decimal count.
|
||||
|
||||
Args:
|
||||
num: The number string to parse.
|
||||
num_str: The number string to parse.
|
||||
|
||||
Returns:
|
||||
Tuple of (float value, decimal count) or None if invalid.
|
||||
"""
|
||||
try:
|
||||
float_num = float(num)
|
||||
digits_count = len(num.split(".")[-1]) if "." in num else 0
|
||||
float_num = float(num_str)
|
||||
digits_count = len(num_str.split(".")[-1]) if "." in num_str else 0
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
return float_num, digits_count
|
||||
return float_num, digits_count
|
||||
|
||||
|
||||
MIN_ARGS = 2
|
||||
@ -82,7 +81,7 @@ if __name__ == "__main__":
|
||||
input_string = " ".join(sys.argv[1:])
|
||||
numbers, decimal_counts = parse_input(input_string)
|
||||
|
||||
if len(numbers) == 0:
|
||||
if not numbers:
|
||||
_logger.error("No valid numbers provided.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@ -6,10 +6,14 @@ from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
|
||||
# pylint: disable=import-error
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import NoSuchElementException
|
||||
from selenium.webdriver.common.by import By
|
||||
|
||||
# pylint: enable=import-error
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
REQUEST_TIMEOUT = 30 # seconds
|
||||
@ -31,18 +35,18 @@ driver.get(url)
|
||||
|
||||
|
||||
# A function to download images by URL
|
||||
def download_image(url: str) -> bool:
|
||||
def download_image(image_url: str) -> bool:
|
||||
"""Download an image from a URL and save it locally."""
|
||||
# Extract image name from URL
|
||||
image_name = Path(urlparse(url).path).name
|
||||
image_name = Path(urlparse(image_url).path).name
|
||||
image_path = Path(image_name)
|
||||
|
||||
# Check if the image already exists
|
||||
if image_path.exists():
|
||||
_logger.info("Image %s already exists, skipping download.", image_name)
|
||||
return False
|
||||
_logger.info("Downloading image from URL: %s", url)
|
||||
img_data = requests.get(url, timeout=REQUEST_TIMEOUT).content
|
||||
_logger.info("Downloading image from URL: %s", image_url)
|
||||
img_data = requests.get(image_url, timeout=REQUEST_TIMEOUT).content
|
||||
with image_path.open("wb") as handler:
|
||||
handler.write(img_data)
|
||||
_logger.info("Image %s downloaded successfully", image_name)
|
||||
@ -59,11 +63,11 @@ while True:
|
||||
image_element = driver.find_element(By.ID, "cc-comic")
|
||||
|
||||
# Get the image URL from the 'src' attribute
|
||||
image_url = image_element.get_attribute("src")
|
||||
_logger.info("Found image URL: %s", image_url)
|
||||
current_image_url = image_element.get_attribute("src")
|
||||
_logger.info("Found image URL: %s", current_image_url)
|
||||
|
||||
# Download the image if it doesn't already exist
|
||||
if download_image(image_url):
|
||||
if download_image(current_image_url):
|
||||
count += 1 # Increment count only if the image was downloaded
|
||||
|
||||
# Try to find the 'Next' button by its class
|
||||
|
||||
Loading…
Reference in New Issue
Block a user