From 0d47f77ee58eb80e19114060f119a36185660deb Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Fri, 13 Mar 2026 20:48:40 +0100 Subject: [PATCH] refactor: remove noqa comments from miscellaneous scripts - Fix underlying lint issues instead of suppressing with noqa - Files: moviepy_showcase, pomodoro-wake-daemon, brother_printer, http_status_anki, geo_data, repo_explorer, steam_backlog_enforcer, music_generator --- moviepy_showcase.py | 93 +++++++++++-------- .../packaging/arch/pomodoro-wake-daemon.py | 5 +- .../brother_printer/check_brother_printer.py | 80 ++++++++++------ python_pkg/download_cats/http_status_anki.py | 3 +- python_pkg/geo_data.py | 5 +- python_pkg/music_gen/music_generator.py | 21 ++--- python_pkg/repo_explorer/repo_explorer.py | 42 +++++---- python_pkg/steam_backlog_enforcer/hltb.py | 2 +- .../steam_backlog_enforcer/library_hider.py | 4 +- python_pkg/steam_backlog_enforcer/main.py | 6 +- python_pkg/steam_backlog_enforcer/protondb.py | 6 +- 11 files changed, 161 insertions(+), 106 deletions(-) diff --git a/moviepy_showcase.py b/moviepy_showcase.py index ffcddaf..4dbc445 100644 --- a/moviepy_showcase.py +++ b/moviepy_showcase.py @@ -28,16 +28,7 @@ import shutil import tempfile from typing import TYPE_CHECKING -import numpy as np - -if TYPE_CHECKING: - from collections.abc import Callable - -logger = logging.getLogger(__name__) - -os.environ["FFMPEG_BINARY"] = "/usr/bin/ffmpeg" - -from moviepy import ( # noqa: E402 +from moviepy import ( AudioArrayClip, AudioClip, BitmapClip, @@ -53,7 +44,7 @@ from moviepy import ( # noqa: E402 concatenate_audioclips, concatenate_videoclips, ) -from moviepy.audio.fx import ( # noqa: E402 +from moviepy.audio.fx import ( AudioDelay, AudioFadeIn, AudioFadeOut, @@ -62,10 +53,10 @@ from moviepy.audio.fx import ( # noqa: E402 MultiplyStereoVolume, MultiplyVolume, ) -from moviepy.video.compositing.CompositeVideoClip import ( # noqa: E402 +from moviepy.video.compositing.CompositeVideoClip import ( clips_array, ) -from moviepy.video.fx import ( # noqa: E402 +from moviepy.video.fx import ( AccelDecel, BlackAndWhite, Blink, @@ -99,11 +90,19 @@ from moviepy.video.fx import ( # noqa: E402 TimeMirror, TimeSymmetrize, ) -from moviepy.video.tools.drawing import ( # noqa: E402 +from moviepy.video.tools.drawing import ( circle, color_gradient, color_split, ) +import numpy as np + +if TYPE_CHECKING: + from collections.abc import Callable + +logger = logging.getLogger(__name__) + +os.environ["FFMPEG_BINARY"] = "/usr/bin/ffmpeg" # ── Constants ───────────────────────────────────────────────────── W, H = 1920, 1080 @@ -473,30 +472,26 @@ def part2_clip_methods() -> list[VideoClip]: # ══════════════════════════════════════════════════════════════════ # PART 3 — Video Effects (all 34) # ══════════════════════════════════════════════════════════════════ -def part3_video_effects() -> list[VideoClip]: # noqa: PLR0915 - """Demonstrate all 34 video effects.""" - scenes: list[VideoClip] = [ - _section_header( - "Part 3: Video Effects", - "All 34 effects from moviepy.video.fx", - ), - ] +def _fx(effect: object, label: str, dur: float = CLIP_DUR) -> VideoClip: + """Apply effect to base clip and label it.""" + b = _base_clip(dur) + try: + result = b.with_effects([effect]) + # Ensure it has a finite duration + if result.duration is None or result.duration <= 0: + result = result.with_duration(dur) + result = result.with_duration(min(result.duration, dur)) + except (ValueError, OSError, AttributeError): + result = b + # Make sure it fits the canvas + if result.size != (W, H): + result = _resize_to_canvas(result) + return _titled(result, label) - def _fx(effect: object, label: str, dur: float = CLIP_DUR) -> VideoClip: - """Apply effect to base clip and label it.""" - b = _base_clip(dur) - try: - result = b.with_effects([effect]) - # Ensure it has a finite duration - if result.duration is None or result.duration <= 0: - result = result.with_duration(dur) - result = result.with_duration(min(result.duration, dur)) - except (ValueError, OSError, AttributeError): - result = b - # Make sure it fits the canvas - if result.size != (W, H): - result = _resize_to_canvas(result) - return _titled(result, label) + +def _part3_effects_1_to_17() -> list[VideoClip]: + """Video effects 1-17: AccelDecel through MakeLoopable.""" + scenes: list[VideoClip] = [] # 1. AccelDecel scenes.append( @@ -577,8 +572,8 @@ def part3_video_effects() -> list[VideoClip]: # noqa: PLR0915 scenes.append( _fx( HeadBlur( - fx=lambda t: W // 2, # noqa: ARG005 - fy=lambda t: H // 2, # noqa: ARG005 + fx=lambda _: W // 2, + fy=lambda _: H // 2, radius=100, intensity=None, ), @@ -607,6 +602,13 @@ def part3_video_effects() -> list[VideoClip]: # noqa: PLR0915 _fx(MakeLoopable(overlap_duration=0.5), "MakeLoopable(overlap_duration=0.5)") ) + return scenes + + +def _part3_effects_18_to_34() -> list[VideoClip]: + """Video effects 18-34: Margin through TimeSymmetrize.""" + scenes: list[VideoClip] = [] + # 18. Margin b_margin = _base_clip().with_effects( [ @@ -737,6 +739,19 @@ def part3_video_effects() -> list[VideoClip]: # noqa: PLR0915 return scenes +def part3_video_effects() -> list[VideoClip]: + """Demonstrate all 34 video effects.""" + scenes: list[VideoClip] = [ + _section_header( + "Part 3: Video Effects", + "All 34 effects from moviepy.video.fx", + ), + ] + scenes.extend(_part3_effects_1_to_17()) + scenes.extend(_part3_effects_18_to_34()) + return scenes + + # ══════════════════════════════════════════════════════════════════ # PART 4 — Audio # ══════════════════════════════════════════════════════════════════ diff --git a/pomodoro_app/packaging/arch/pomodoro-wake-daemon.py b/pomodoro_app/packaging/arch/pomodoro-wake-daemon.py index 8f26365..3a5686d 100755 --- a/pomodoro_app/packaging/arch/pomodoro-wake-daemon.py +++ b/pomodoro_app/packaging/arch/pomodoro-wake-daemon.py @@ -36,6 +36,9 @@ logging.basicConfig( log = logging.getLogger(__name__) +MIN_DEVICE_PARTS = 2 + + def is_app_running() -> bool: """Check whether the Pomodoro app is running locally.""" pgrep = shutil.which("pgrep") @@ -88,7 +91,7 @@ def get_adb_devices() -> list[str]: devices: list[str] = [] for line in result.stdout.strip().splitlines()[1:]: parts = line.split() - if len(parts) >= 2 and parts[1] == "device": # noqa: PLR2004 + if len(parts) >= MIN_DEVICE_PARTS and parts[1] == "device": devices.append(parts[0]) return devices diff --git a/python_pkg/brother_printer/check_brother_printer.py b/python_pkg/brother_printer/check_brother_printer.py index a9ef032..7f93657 100644 --- a/python_pkg/brother_printer/check_brother_printer.py +++ b/python_pkg/brother_printer/check_brother_printer.py @@ -60,6 +60,7 @@ TONER_RATED_PAGES = 1000 DRUM_RATED_PAGES = 10000 CUPS_PAGE_LOG = Path("/var/log/cups/page_log") CONSUMABLE_STATE_FILE = Path.home() / ".config" / "brother_printer" / "state.json" +MIN_LPSTAT_JOB_PARTS = 4 def _out(text: str = "") -> None: @@ -250,7 +251,7 @@ def find_brother_usb() -> str: return "" try: r = subprocess.run( - ["lsusb"], # noqa: S607 + ["/usr/bin/lsusb"], capture_output=True, text=True, timeout=5, @@ -285,7 +286,7 @@ def get_printer_info_from_cups() -> dict[str, str]: info: dict[str, str] = {"product": "", "serial": ""} try: r = subprocess.run( - ["lpstat", "-v"], # noqa: S607 + ["/usr/bin/lpstat", "-v"], capture_output=True, text=True, timeout=5, @@ -488,7 +489,7 @@ def _get_pyusb_device_info() -> dict[str, str]: dev = usb.core.find(idVendor=BROTHER_USB_VENDOR_ID) if dev is None: return {} - except Exception: # noqa: BLE001 + except (ImportError, OSError, ValueError): return {} else: return { @@ -601,7 +602,7 @@ def _query_usb_port_status_raw() -> USBPortStatus | None: finally: usb.util.release_interface(dev, 0) usb.util.dispose_resources(dev) - except Exception: # noqa: BLE001 + except (OSError, ValueError): logger.debug("USB port status query failed", exc_info=True) return None finally: @@ -1001,7 +1002,7 @@ def _parse_lpstat_jobs(output: str, printer_name: str) -> list[CUPSJob]: if not line.startswith(printer_name): continue parts = line.split() - if len(parts) >= 4: # noqa: PLR2004 + if len(parts) >= MIN_LPSTAT_JOB_PARTS: job_id = parts[0] user = parts[1] size = parts[2] @@ -1319,28 +1320,55 @@ def _offer_queue_fix(queue: CUPSQueueStatus) -> None: _handle_backend_errors_only(choice) -def _handle_disabled_with_jobs(queue: CUPSQueueStatus, choice: str) -> None: # noqa: C901 +def _dwj_enable_only(printer_name: str) -> None: + """Choice 1: re-enable printer so queued jobs are retried.""" + if _cups_enable_printer(printer_name): + _out(f" {GREEN}✓ Printer re-enabled. Jobs will be retried.{RESET}") + + +def _dwj_cancel_and_enable(printer_name: str) -> None: + """Choice 2: cancel all jobs then re-enable.""" + _cups_cancel_all_jobs(printer_name) + if _cups_enable_printer(printer_name): + _out(f" {GREEN}✓ All jobs cancelled and printer re-enabled.{RESET}") + + +def _dwj_cancel_only(printer_name: str) -> None: + """Choice 3: cancel all jobs.""" + if _cups_cancel_all_jobs(printer_name): + _out(f" {GREEN}✓ All jobs cancelled.{RESET}") + + +def _dwj_restart_only(_printer_name: str) -> None: + """Choice 4: restart CUPS.""" + if _cups_restart_service(): + _out(f" {GREEN}✓ CUPS restarted.{RESET}") + + +def _dwj_restart_and_enable(printer_name: str) -> None: + """Choice 5: restart CUPS and re-enable printer.""" + if _cups_restart_service(): + _cups_enable_printer(printer_name) + _out( + f" {GREEN}✓ CUPS restarted, printer re-enabled." + f" Jobs will be retried.{RESET}" + ) + + +_DWJ_ACTIONS: dict[str, Callable[[str], None]] = { + "1": _dwj_enable_only, + "2": _dwj_cancel_and_enable, + "3": _dwj_cancel_only, + "4": _dwj_restart_only, + "5": _dwj_restart_and_enable, +} + + +def _handle_disabled_with_jobs(queue: CUPSQueueStatus, choice: str) -> None: """Handle fix for disabled printer with pending jobs.""" - if choice == "1": - if _cups_enable_printer(queue.printer_name): - _out(f" {GREEN}✓ Printer re-enabled. Jobs will be retried.{RESET}") - elif choice == "2": - _cups_cancel_all_jobs(queue.printer_name) - if _cups_enable_printer(queue.printer_name): - _out(f" {GREEN}✓ All jobs cancelled and printer re-enabled.{RESET}") - elif choice == "3": - if _cups_cancel_all_jobs(queue.printer_name): - _out(f" {GREEN}✓ All jobs cancelled.{RESET}") - elif choice == "4": - if _cups_restart_service(): - _out(f" {GREEN}✓ CUPS restarted.{RESET}") - elif choice == "5": - if _cups_restart_service(): - _cups_enable_printer(queue.printer_name) - _out( - f" {GREEN}✓ CUPS restarted, printer re-enabled." - f" Jobs will be retried.{RESET}" - ) + action = _DWJ_ACTIONS.get(choice) + if action is not None: + action(queue.printer_name) else: _out(f" {DIM}No changes made.{RESET}") diff --git a/python_pkg/download_cats/http_status_anki.py b/python_pkg/download_cats/http_status_anki.py index 07a27b5..0e59b36 100755 --- a/python_pkg/download_cats/http_status_anki.py +++ b/python_pkg/download_cats/http_status_anki.py @@ -13,6 +13,7 @@ import hashlib import logging from pathlib import Path import sys +import tempfile from typing import TYPE_CHECKING import genanki @@ -357,7 +358,7 @@ class _DeckBuilder: filename = f"http_cat_{status_code}.jpg" # Save to temp directory for genanki - temp_path = Path(f"/tmp/{filename}") # noqa: S108 + temp_path = Path(tempfile.gettempdir()) / filename temp_path.write_bytes(image_data) self.media_files.append(str(temp_path)) diff --git a/python_pkg/geo_data.py b/python_pkg/geo_data.py index 6298dc6..c87fc24 100644 --- a/python_pkg/geo_data.py +++ b/python_pkg/geo_data.py @@ -235,7 +235,10 @@ def _download_github_geojson(url: str, cache_path: Path) -> gpd.GeoDataFrame: return gpd.read_file(cache_path) sys.stdout.write(f"Downloading from {url}...\n") - with urlopen(url, timeout=REQUEST_TIMEOUT) as response: # noqa: S310 + if not url.startswith(("http://", "https://")): + msg = f"Unsupported URL scheme: {url}" + raise ValueError(msg) + with urlopen(url, timeout=REQUEST_TIMEOUT) as response: data = json.loads(response.read().decode()) _ensure_cache_dir() diff --git a/python_pkg/music_gen/music_generator.py b/python_pkg/music_gen/music_generator.py index 3bcb84c..34ad8ec 100755 --- a/python_pkg/music_gen/music_generator.py +++ b/python_pkg/music_gen/music_generator.py @@ -39,28 +39,21 @@ def check_dependencies(*, include_bark: bool = False) -> bool: Args: include_bark: Whether to check for Bark dependencies as well. """ + import importlib.util + missing = [] - try: - import torch # noqa: F401 - except ImportError: + if importlib.util.find_spec("torch") is None: missing.append("torch") - try: - import transformers # noqa: F401 - except ImportError: + if importlib.util.find_spec("transformers") is None: missing.append("transformers") - try: - import scipy # noqa: F401 - except ImportError: + if importlib.util.find_spec("scipy") is None: missing.append("scipy") - if include_bark: - try: - from bark import generate_audio as _bark_gen # noqa: F401 - except ImportError: - missing.append("git+https://github.com/suno-ai/bark.git") + if include_bark and importlib.util.find_spec("bark") is None: + missing.append("git+https://github.com/suno-ai/bark.git") if missing: print("Missing dependencies. Install with:") diff --git a/python_pkg/repo_explorer/repo_explorer.py b/python_pkg/repo_explorer/repo_explorer.py index c98e0d2..9f0a2b3 100755 --- a/python_pkg/repo_explorer/repo_explorer.py +++ b/python_pkg/repo_explorer/repo_explorer.py @@ -469,7 +469,7 @@ class RepoExplorer(tk.Tk): fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) self._proc = subprocess.Popen( - ["bash", "run.sh", *extra], # noqa: S607 + ["/usr/bin/bash", "run.sh", *extra], cwd=path, stdin=slave_fd, stdout=slave_fd, @@ -485,7 +485,27 @@ class RepoExplorer(tk.Tk): threading.Thread(target=self._read_pty, daemon=True).start() threading.Thread(target=self._wait_proc, daemon=True).start() - def _read_pty(self) -> None: # noqa: C901, PLR0912 + @staticmethod + def _decode_buf(buf: bytes) -> str: + """Decode a byte buffer, strip ANSI codes and carriage returns.""" + return _strip_ansi(buf.decode("utf-8", errors="replace").replace("\r", "")) + + def _flush_partial_buf(self, buf: bytes) -> None: + """Flush a partial (no trailing newline) buffer to output.""" + text = self._decode_buf(buf) + if text: + self._write_output(text) + + def _process_complete_lines(self, buf: bytes) -> bytes: + """Split buf on newlines, output complete lines, return remainder.""" + while b"\n" in buf: + line, buf = buf.split(b"\n", 1) + text = self._decode_buf(line) + if text: + self._write_output(text + "\n") + return buf + + def _read_pty(self) -> None: """Stream PTY output to the widget, stripping ANSI codes. Partial lines (prompts without a trailing newline) are flushed after @@ -503,11 +523,7 @@ class RepoExplorer(tk.Tk): if buf: idle_ticks += 1 if idle_ticks >= self._IDLE_FLUSH_TICKS: - text = _strip_ansi( - buf.decode("utf-8", errors="replace").replace("\r", "") - ) - if text: - self._write_output(text) + self._flush_partial_buf(buf) buf = b"" idle_ticks = 0 continue @@ -519,18 +535,10 @@ class RepoExplorer(tk.Tk): if not chunk: break buf += chunk - while b"\n" in buf: - line, buf = buf.split(b"\n", 1) - text = _strip_ansi( - line.decode("utf-8", errors="replace").replace("\r", "") - ) - if text: - self._write_output(text + "\n") + buf = self._process_complete_lines(buf) # flush remainder if buf: - text = _strip_ansi(buf.decode("utf-8", errors="replace").replace("\r", "")) - if text: - self._write_output(text) + self._flush_partial_buf(buf) if self._master_fd is not None: with contextlib.suppress(OSError): os.close(self._master_fd) diff --git a/python_pkg/steam_backlog_enforcer/hltb.py b/python_pkg/steam_backlog_enforcer/hltb.py index 5ef9cf8..ab76878 100644 --- a/python_pkg/steam_backlog_enforcer/hltb.py +++ b/python_pkg/steam_backlog_enforcer/hltb.py @@ -103,7 +103,7 @@ def _get_hltb_search_url() -> str: if search_info and search_info.search_url: url: str = HTMLRequests.BASE_URL + search_info.search_url return url - except Exception: # noqa: BLE001 + except (OSError, RuntimeError, ValueError, TypeError): logger.debug("Failed to discover HLTB search URL, using default") return "https://howlongtobeat.com/api/finder" diff --git a/python_pkg/steam_backlog_enforcer/library_hider.py b/python_pkg/steam_backlog_enforcer/library_hider.py index a58c69a..e4b3020 100644 --- a/python_pkg/steam_backlog_enforcer/library_hider.py +++ b/python_pkg/steam_backlog_enforcer/library_hider.py @@ -44,7 +44,9 @@ def _get_shared_js_ws_url() -> str | None: """Query the CDP HTTP endpoint and return the SharedJSContext WS URL.""" url = f"http://127.0.0.1:{_CDP_PORT}/json" try: - with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310 + if not url.startswith(("http://", "https://")): + return None + with urllib.request.urlopen(url, timeout=5) as resp: targets = json.loads(resp.read()) except (OSError, ValueError): return None diff --git a/python_pkg/steam_backlog_enforcer/main.py b/python_pkg/steam_backlog_enforcer/main.py index 8006cf3..33a7f7d 100644 --- a/python_pkg/steam_backlog_enforcer/main.py +++ b/python_pkg/steam_backlog_enforcer/main.py @@ -135,7 +135,7 @@ def _ensure_steam_running() -> None: # Check if any steam process is running (main client, not just helpers). try: result = subprocess.run( - ["pgrep", "-f", "steam.sh"], # noqa: S607 + ["/usr/bin/pgrep", "-f", "steam.sh"], capture_output=True, text=True, check=False, @@ -937,7 +937,7 @@ def cmd_reset(config: Config, state: State) -> None: count = unhide_all_games(owned) if count: _echo(f"Unhidden {count} games.") - except Exception as exc: # noqa: BLE001 + except (OSError, RuntimeError, ValueError) as exc: _echo(f"Warning: could not unhide games: {exc}") state.current_app_id = None @@ -1024,7 +1024,7 @@ def _get_all_owned_app_ids(config: Config) -> list[int]: client = SteamAPIClient(config.steam_api_key, config.steam_id) owned = client.get_owned_games() return [g["appid"] for g in owned] - except Exception: # noqa: BLE001 + except (OSError, RuntimeError, ValueError): logger.warning("Could not fetch owned game list for hiding.") return [] diff --git a/python_pkg/steam_backlog_enforcer/protondb.py b/python_pkg/steam_backlog_enforcer/protondb.py index 3638445..d9ebbe6 100644 --- a/python_pkg/steam_backlog_enforcer/protondb.py +++ b/python_pkg/steam_backlog_enforcer/protondb.py @@ -25,6 +25,8 @@ PROTONDB_CACHE_FILE = CONFIG_DIR / "protondb_cache.json" _PROTONDB_API = "https://www.protondb.com/api/v1/reports/summaries/{app_id}.json" MAX_CONCURRENT = 30 # parallel requests - be polite to the CDN +HTTP_NOT_FOUND = 404 + # Tier ordering from best to worst. TIER_ORDER: dict[str, int] = { "native": 0, @@ -101,7 +103,7 @@ async def _fetch_one( async with sem: try: async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as r: - if r.status == 404: # noqa: PLR2004 + if r.status == HTTP_NOT_FOUND: return ProtonDBRating(app_id=app_id) r.raise_for_status() data = await r.json(content_type=None) @@ -113,7 +115,7 @@ async def _fetch_one( confidence=data.get("confidence", ""), total_reports=data.get("total", 0), ) - except Exception: # noqa: BLE001 + except (aiohttp.ClientError, asyncio.TimeoutError, OSError): logger.warning("ProtonDB fetch failed for AppID=%d", app_id) return ProtonDBRating(app_id=app_id)