diff --git a/python_pkg/brother_printer/check_brother_printer.py b/python_pkg/brother_printer/check_brother_printer.py index a97280a..c7c5e23 100644 --- a/python_pkg/brother_printer/check_brother_printer.py +++ b/python_pkg/brother_printer/check_brother_printer.py @@ -2,7 +2,8 @@ Supports both USB-connected and network printers on Arch Linux. -USB: Queries via PJL over /dev/usb/lp* (requires root). +USB: Queries via PJL over /dev/usb/lp* (requires root + usblp module). + Falls back to CUPS IPP status when usblp is unavailable (no root needed). Network: Queries via SNMP (requires net-snmp). Usage: @@ -16,6 +17,7 @@ from __future__ import annotations import contextlib from dataclasses import dataclass, field import fcntl +import json import logging import os from pathlib import Path @@ -51,6 +53,12 @@ SUPPLY_LOW_PCT = 10 SUPPLY_WARN_PCT = 25 PROGRESS_BAR_WIDTH = 25 +# Brother HL-1110 consumable page ratings +TONER_RATED_PAGES = 1000 +DRUM_RATED_PAGES = 10000 +CUPS_PAGE_LOG = Path("/var/log/cups/page_log") +CONSUMABLE_STATE_FILE = Path.home() / ".config" / "brother_printer" / "state.json" + def _out(text: str = "") -> None: """Write a line to stdout.""" @@ -173,6 +181,30 @@ class CUPSQueueStatus: last_backend_error: str = "" +@dataclass +class PageCountEstimate: + """Estimated consumable life based on CUPS page count.""" + + total_pages: int = 0 + toner_pages: int = 0 + drum_pages: int = 0 + toner_pct_remaining: int = 100 + drum_pct_remaining: int = 100 + toner_exhausted: bool = False + toner_low: bool = False + drum_near_end: bool = False + + +@dataclass +class USBPortStatus: + """IEEE 1284 USB printer port status bits.""" + + paper_empty: bool = False + online: bool = True + error: bool = False + raw_byte: int = 0 + + @dataclass class USBResult: """Result from a USB PJL query.""" @@ -186,6 +218,7 @@ class USBResult: online: str = "" economode: str = "" error: str = "" + port_status: USBPortStatus | None = None @dataclass @@ -395,7 +428,7 @@ def query_usb_pjl(max_retries: int = 2) -> USBResult: """Query a Brother printer via PJL over /dev/usb/lp*.""" dev_path = find_usb_printer_dev() if not dev_path: - return USBResult(error="No USB printer device found at /dev/usb/lp*") + return _query_usb_via_cups() result = _init_usb_result(dev_path) if not os.access(dev_path, os.R_OK | os.W_OK): @@ -415,6 +448,424 @@ def query_usb_pjl(max_retries: int = 2) -> USBResult: return result +# ── CUPS-based USB fallback ────────────────────────────────────────── +# When the usblp kernel module is not available, /dev/usb/lp* devices +# don't exist even though CUPS can print fine via its own libusb backend. +# These functions query printer status through CUPS IPP instead. + +_CUPS_REASONS_TO_STATUS: dict[str, int] = { + "paused": 10023, + "moving-to-paused": 10023, + "toner-low": 30010, + "toner-empty": 40310, + "marker-supply-low": 30010, + "marker-supply-empty": 40310, + "media-empty": 40302, + "media-needed": 40302, + "media-jam": 40000, + "cover-open": 41000, + "door-open": 41000, + "input-tray-missing": 40300, +} + +_CUPS_STATE_TO_STATUS: dict[str, int] = { + "idle": 10001, + "processing": 10007, + "stopped": 10023, +} + + +BROTHER_USB_VENDOR_ID = 0x04F9 + + +def _get_pyusb_device_info() -> dict[str, str]: + """Get Brother USB printer info via pyusb (no interface claim needed).""" + try: + import usb.core + + dev = usb.core.find(idVendor=BROTHER_USB_VENDOR_ID) + if dev is None: + return {} + except Exception: # noqa: BLE001 + return {} + else: + return { + "product": dev.product or "", + "serial": dev.serial_number or "", + } + + +def _stop_cups() -> bool: + """Stop CUPS service and sockets. Returns True on success.""" + systemctl = shutil.which("systemctl") + if not systemctl: + return False + try: + subprocess.run( + [systemctl, "stop", "cups.service", "cups.socket", "cups.path"], + timeout=15, + check=True, + ) + time.sleep(2) + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError): + return False + return True + + +def _is_cups_scheduler_running() -> bool: + """Check if the CUPS scheduler is currently running.""" + lpstat = shutil.which("lpstat") + if not lpstat: + return False + try: + r = subprocess.run( + [lpstat, "-r"], + capture_output=True, + text=True, + timeout=3, + check=False, + ) + return ( + "is running" in r.stdout.lower() and "not running" not in r.stdout.lower() + ) + except (subprocess.TimeoutExpired, OSError): + return False + + +def _start_cups() -> bool: + """Start CUPS service, socket, and path units. Returns True on success.""" + systemctl = shutil.which("systemctl") + if not systemctl: + return False + try: + subprocess.run( + [systemctl, "start", "cups.service", "cups.socket", "cups.path"], + timeout=15, + check=True, + ) + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError): + return False + # Verify CUPS is actually responding + for _ in range(10): + if _is_cups_scheduler_running(): + return True + time.sleep(1) + return False + + +def _query_usb_port_status_raw() -> USBPortStatus | None: + """Query USB printer port status via pyusb control transfer. + + Requires root and temporarily stops CUPS to access the USB device. + Returns None if the query fails. + """ + try: + import usb.core + import usb.util + except ImportError: + return None + + dev = usb.core.find(idVendor=BROTHER_USB_VENDOR_ID) + if dev is None: + return None + + if not _stop_cups(): + return None + + try: + dev.reset() + time.sleep(2) + dev = usb.core.find(idVendor=BROTHER_USB_VENDOR_ID) + if dev is None: + return None + + try: + if dev.is_kernel_driver_active(0): + dev.detach_kernel_driver(0) + except (usb.core.USBError, NotImplementedError): + pass + + usb.util.claim_interface(dev, 0) + try: + # USB Printer Class GET_PORT_STATUS (bRequest=0x01) + raw = dev.ctrl_transfer(0xA1, 0x01, 0, 0, 1, timeout=5000) + port_byte = raw[0] + return USBPortStatus( + paper_empty=bool(port_byte & 0x20), + online=bool(port_byte & 0x10), + error=not bool(port_byte & 0x08), + raw_byte=port_byte, + ) + finally: + usb.util.release_interface(dev, 0) + usb.util.dispose_resources(dev) + except Exception: # noqa: BLE001 + logger.debug("USB port status query failed", exc_info=True) + return None + finally: + _start_cups() + + +def _get_cups_total_pages() -> int: + """Parse CUPS page_log to get total pages printed (deduplicated by job).""" + if not CUPS_PAGE_LOG.exists(): + return 0 + try: + text = CUPS_PAGE_LOG.read_text(encoding="utf-8", errors="replace") + except OSError: + return 0 + # page_log format: printer user job_id [date] total N ... + # Deduplicate by job_id (retries produce repeated lines) + jobs: dict[str, int] = {} + for line in text.splitlines(): + match = re.search(r"\s(\d+)\s+\[.*?\]\s+total\s+(\d+)", line) + if match: + job_id = match.group(1) + pages = int(match.group(2)) + jobs[job_id] = max(jobs.get(job_id, 0), pages) + return sum(jobs.values()) + + +def _load_consumable_state() -> dict[str, int]: + """Load consumable replacement state from disk. + + Returns dict with keys 'toner_replaced_at' and 'drum_replaced_at' + (page counts when each consumable was last replaced). + """ + defaults: dict[str, int] = {"toner_replaced_at": 0, "drum_replaced_at": 0} + if not CONSUMABLE_STATE_FILE.exists(): + return defaults + try: + data = json.loads( + CONSUMABLE_STATE_FILE.read_text(encoding="utf-8"), + ) + return { + "toner_replaced_at": int(data.get("toner_replaced_at", 0)), + "drum_replaced_at": int(data.get("drum_replaced_at", 0)), + } + except (OSError, json.JSONDecodeError, ValueError, TypeError): + return defaults + + +def _save_consumable_state(state: dict[str, int]) -> None: + """Persist consumable replacement state to disk.""" + CONSUMABLE_STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + CONSUMABLE_STATE_FILE.write_text( + json.dumps(state, indent=2) + "\n", + encoding="utf-8", + ) + + +def _reset_consumable(name: str) -> None: + """Record current page count as replacement point for a consumable.""" + total = _get_cups_total_pages() + state = _load_consumable_state() + key = f"{name}_replaced_at" + state[key] = total + _save_consumable_state(state) + _out( + f"{GREEN}✓ {name.capitalize()} counter reset at page count" f" {total}.{RESET}" + ) + _out(f" State saved to {CONSUMABLE_STATE_FILE}") + + +def _estimate_consumable_life() -> PageCountEstimate: + """Estimate toner/drum life from CUPS page count since last replacement.""" + total = _get_cups_total_pages() + if total <= 0: + return PageCountEstimate() + state = _load_consumable_state() + toner_pages = max(0, total - state["toner_replaced_at"]) + drum_pages = max(0, total - state["drum_replaced_at"]) + toner_pct = max(0, 100 - (toner_pages * 100 // TONER_RATED_PAGES)) + drum_pct = max(0, 100 - (drum_pages * 100 // DRUM_RATED_PAGES)) + return PageCountEstimate( + total_pages=total, + toner_pages=toner_pages, + drum_pages=drum_pages, + toner_pct_remaining=toner_pct, + drum_pct_remaining=drum_pct, + toner_exhausted=toner_pages >= TONER_RATED_PAGES, + toner_low=toner_pages >= TONER_RATED_PAGES * 80 // 100, + drum_near_end=drum_pages >= DRUM_RATED_PAGES * 90 // 100, + ) + + +def _parse_ipp_attributes(output: str) -> dict[str, str]: + """Parse ipptool verbose output into an attribute dict.""" + attrs: dict[str, str] = {} + for line in output.splitlines(): + match = re.match(r"\s+(\S+)\s+\([^)]+\)\s+=\s+(.*)", line) + if match: + attrs[match.group(1)] = match.group(2).strip() + return attrs + + +def _get_cups_ipp_status(printer_name: str) -> dict[str, str]: + """Query printer attributes via CUPS IPP using ipptool.""" + ipptool_path = shutil.which("ipptool") + if not ipptool_path: + return {} + uri = f"ipp://localhost/printers/{printer_name}" + try: + r = subprocess.run( + [ipptool_path, "-tv", uri, "get-printer-attributes.test"], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + return _parse_ipp_attributes(r.stdout) + except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError): + return {} + + +def _get_cups_economode(printer_name: str) -> str: + """Query toner save mode setting via lpoptions.""" + lpoptions_path = shutil.which("lpoptions") + if not lpoptions_path: + return "" + try: + r = subprocess.run( + [lpoptions_path, "-p", printer_name, "-l"], + capture_output=True, + text=True, + timeout=5, + check=False, + ) + for line in r.stdout.splitlines(): + if "conomode" in line.lower(): + match = re.search(r"\*(\w+)", line) + if match: + return "ON" if match.group(1).lower() == "true" else "OFF" + except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError): + pass + return "" + + +def _map_cups_to_status_code(state: str, reasons: str) -> str: + """Map CUPS state + reasons to a Brother PJL status code string.""" + for keyword, code in _CUPS_REASONS_TO_STATUS.items(): + if keyword in reasons.lower(): + return str(code) + clean_state = re.sub(r"\(.*\)", "", state).strip().lower() + return str(_CUPS_STATE_TO_STATUS.get(clean_state, 10001)) + + +_ERROR_REASON_MAP: tuple[tuple[tuple[str, ...], str, str], ...] = ( + (("media-jam",), "40000", "Paper Jam"), + (("cover-open", "door-open"), "41000", "Cover Open"), + (("toner-empty",), "40310", "Toner End"), + (("toner-low",), "30010", "Toner Low"), +) + + +def _cups_reasons_to_error(cups_reasons: str) -> tuple[str, str]: + """Map CUPS reason keywords to a (status_code, display) pair.""" + reasons_lower = cups_reasons.lower() + for keywords, code, display in _ERROR_REASON_MAP: + if any(kw in reasons_lower for kw in keywords): + return code, display + return "42000", "Printer Error" + + +def _port_status_to_status_code( + ps: USBPortStatus, + cups_reasons: str, +) -> tuple[str, str]: + """Map USB port status + CUPS reasons to (status_code, display).""" + # Hardware error flags take priority + if ps.error and ps.paper_empty: + return "40302", "No Paper" + if ps.error and not ps.online: + return "41000", "Cover Open" + if ps.error: + return _cups_reasons_to_error(cups_reasons) + if ps.paper_empty: + return "40302", "No Paper" + if not ps.online: + return "10002", "Offline / Sleep" + return "", "" + + +def _ensure_cups_running() -> bool: + """Make sure CUPS is running, starting it if necessary.""" + if _is_cups_scheduler_running(): + return True + # CUPS not running — try to start it (needs root) + if os.geteuid() == 0: + return _start_cups() + return False + + +def _query_usb_via_cups() -> USBResult: + """Query USB printer status through CUPS when /dev/usb/lp* is unavailable.""" + _ensure_cups_running() + printer_name = _find_cups_printer_name() + if not printer_name: + return USBResult( + error="No USB printer device at /dev/usb/lp*" + " (usblp module not available)" + " and no Brother printer found in CUPS.", + ) + + pyusb_info = _get_pyusb_device_info() + cups_info = get_printer_info_from_cups() + + result = USBResult( + device="cups", + product=( + pyusb_info.get("product") + or cups_info.get("product") + or "Brother Laser Printer" + ), + serial=pyusb_info.get("serial") or cups_info.get("serial", ""), + ) + + ipp = _get_cups_ipp_status(printer_name) + state = ipp.get("printer-state", "") + reasons = ipp.get("printer-state-reasons", "none") + result.economode = _get_cups_economode(printer_name) + + # Try direct USB hardware status query (requires root) + if os.geteuid() == 0: + port_status = _query_usb_port_status_raw() + if port_status is not None: + result.port_status = port_status + hw_code, hw_display = _port_status_to_status_code( + port_status, + reasons, + ) + if hw_code: + result.status_code = hw_code + result.display = hw_display + result.online = "TRUE" if port_status.online else "FALSE" + return result + # Hardware says OK — check page count for toner/drum warnings + estimate = _estimate_consumable_life() + if estimate.toner_exhausted: + result.status_code = "40310" + result.display = "Toner End (estimated from page count)" + result.online = "TRUE" + return result + if estimate.toner_low: + result.status_code = "30010" + result.display = "Toner Low (estimated from page count)" + result.online = "TRUE" + return result + result.status_code = _map_cups_to_status_code(state, reasons) + result.display = ipp.get("printer-state-message", "") + result.online = "TRUE" + return result + + # Non-root or pyusb unavailable: CUPS-only fallback + result.status_code = _map_cups_to_status_code(state, reasons) + result.display = ipp.get("printer-state-message", "") + result.online = "TRUE" if state.lower() in {"idle", "processing"} else "FALSE" + + return result + + # ── SNMP network query ────────────────────────────────────────────── @@ -670,33 +1121,77 @@ def _cups_restart_service() -> bool: if not systemctl_path: _out(f" {RED}systemctl not found.{RESET}") return False + sys.stdout.write(f" {DIM}Restarting CUPS...{RESET}") + sys.stdout.flush() try: - subprocess.run( + proc = subprocess.Popen( [systemctl_path, "restart", "cups"], - timeout=15, - check=True, ) - time.sleep(2) # wait for CUPS to come back up - except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError) as e: + deadline = time.time() + 30 + while proc.poll() is None: + if time.time() > deadline: + proc.kill() + proc.wait() + sys.stdout.write("\n") + _out( + f" {RED}CUPS restart timed out" + f" (stuck backend process?).{RESET}" + ) + _out( + f" {DIM}Try: sudo kill -9 $(pgrep -f 'cups/backend/usb')" + f" && sudo systemctl restart cups{RESET}" + ) + return False + sys.stdout.write(".") + sys.stdout.flush() + time.sleep(1) + sys.stdout.write("\n") + if proc.returncode != 0: + _out( + f" {RED}CUPS restart failed" f" (exit code {proc.returncode}).{RESET}" + ) + return False + except OSError as e: + sys.stdout.write("\n") _out(f" {RED}Failed to restart CUPS: {e}{RESET}") return False - else: - return True + time.sleep(2) # wait for CUPS to come back up + return True -def _check_cups_backend_errors( - printer_name: str, # noqa: ARG001 -) -> tuple[bool, str]: - """Check CUPS error log for backend errors. Returns (has_errors, last_error).""" - log_path = Path("/var/log/cups/error_log") - if not log_path.exists(): - return False, "" +def _is_cups_printer_healthy(printer_name: str) -> bool: + """Check live CUPS state via lpstat. Returns True if enabled with no issues.""" + lpstat_path = shutil.which("lpstat") + if not lpstat_path: + return False try: - lines = log_path.read_text(encoding="utf-8", errors="replace").splitlines() - except OSError: - return False, "" + r = subprocess.run( + [lpstat_path, "-p", printer_name], + capture_output=True, + text=True, + timeout=5, + check=False, + ) + for line in r.stdout.splitlines(): + if ( + printer_name in line + and "idle" in line.lower() + and "enabled" in line.lower() + ): + return True + except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError): + pass + return False - # Look for backend errors related to this printer (scan from end) + +def _find_backend_error_in_log( + lines: list[str], +) -> tuple[str, str, str]: + """Scan CUPS log lines (reversed) for backend errors. + + Returns: + (backend_error, error_timestamp, last_success_timestamp) + """ backend_error = "" error_timestamp = "" last_success_timestamp = "" @@ -716,6 +1211,29 @@ def _check_cups_backend_errors( last_success_timestamp = ts_match.group(1) break + return backend_error, error_timestamp, last_success_timestamp + + +def _check_cups_backend_errors( + printer_name: str, +) -> tuple[bool, str]: + """Check CUPS error log for backend errors. Returns (has_errors, last_error).""" + # If the printer is currently healthy, ignore stale log entries. + if _is_cups_printer_healthy(printer_name): + return False, "" + + log_path = Path("/var/log/cups/error_log") + if not log_path.exists(): + return False, "" + try: + lines = log_path.read_text(encoding="utf-8", errors="replace").splitlines() + except OSError: + return False, "" + + backend_error, error_timestamp, last_success_timestamp = _find_backend_error_in_log( + lines + ) + if not backend_error: return False, "" @@ -890,6 +1408,67 @@ def _display_report_header() -> None: _out() +def _display_page_count_estimate() -> None: + """Show estimated consumable life based on CUPS page count.""" + estimate = _estimate_consumable_life() + if estimate.total_pages <= 0: + return + _out(f"{BOLD}── Page Count Estimate ──{RESET}") + _out() + _out( + f" {BOLD}Total pages printed:{RESET} {estimate.total_pages}" + f" (toner: {estimate.toner_pages} since replacement," + f" drum: {estimate.drum_pages} since replacement)" + ) + _out() + # Toner bar + toner_pct = estimate.toner_pct_remaining + toner_filled = toner_pct * PROGRESS_BAR_WIDTH // 100 + toner_empty = PROGRESS_BAR_WIDTH - toner_filled + toner_bar = f"[{'█' * toner_filled}{'░' * toner_empty}]" + if estimate.toner_exhausted: + toner_color = RED + toner_note = " ← REPLACE NOW" + elif estimate.toner_low: + toner_color = YELLOW + toner_note = " ← order soon" + else: + toner_color = GREEN + toner_note = "" + _out( + f" {BOLD}Toner:{RESET} {toner_color}{toner_bar} ~{toner_pct}%" + f"{toner_note}{RESET}" + ) + # Drum bar + drum_pct = estimate.drum_pct_remaining + drum_filled = drum_pct * PROGRESS_BAR_WIDTH // 100 + drum_empty = PROGRESS_BAR_WIDTH - drum_filled + drum_bar = f"[{'█' * drum_filled}{'░' * drum_empty}]" + if estimate.drum_near_end: + drum_color = YELLOW + drum_note = " ← nearing end" + else: + drum_color = GREEN + drum_note = "" + _out( + f" {BOLD}Drum:{RESET} {drum_color}{drum_bar} ~{drum_pct}%" + f"{drum_note}{RESET}" + ) + _out( + f" {DIM}Based on pages since last replacement" + f" vs rated capacity (toner ~{TONER_RATED_PAGES}," + f" drum ~{DRUM_RATED_PAGES}).{RESET}" + ) + _out(f" {DIM}Reset after replacing: --reset-toner" f" or --reset-drum{RESET}") + if estimate.toner_exhausted: + _out() + _out( + f" {RED}{BOLD}⚠ Toner is likely exhausted." + f" This is probably why the orange light is flashing.{RESET}" + ) + _out() + + def _display_consumables_reference() -> None: """Print compatible consumables reference.""" _out(f"{BOLD}── Compatible Consumables ──{RESET}") @@ -988,6 +1567,25 @@ def _display_pjl_status(result: USBResult) -> None: # ── Display: USB results ──────────────────────────────────────────── +def _display_cups_fallback_note(result: USBResult) -> None: + """Show a note when running in CUPS fallback mode.""" + _out() + if result.port_status is not None: + _out( + f" {DIM}Note: Hardware status obtained via USB port query." + f" Toner/drum percentages not available.{RESET}" + ) + else: + _out( + f" {DIM}Note: Status obtained via CUPS only" + f" (run with sudo for direct hardware query).{RESET}" + ) + _out( + f" {DIM}Detailed toner/drum levels are not available in this" + f" mode.{RESET}" + ) + + def display_usb_results(result: USBResult) -> None: """Print a formatted report for USB PJL query results.""" if result.error: @@ -997,7 +1595,12 @@ def display_usb_results(result: USBResult) -> None: _display_report_header() _display_usb_device_info(result) _display_pjl_status(result) + + if result.device == "cups": + _display_cups_fallback_note(result) + _out() + _display_page_count_estimate() _display_consumables_reference() queue = get_cups_queue_status() @@ -1193,7 +1796,8 @@ def _run_network_mode(printer_ip: str) -> None: def _run_usb_mode(usb_line: str) -> None: """Handle USB printer mode.""" _out(f"{CYAN}Found Brother printer on USB: {usb_line}{RESET}") - if os.geteuid() != 0: + has_dev = find_usb_printer_dev() is not None + if has_dev and os.geteuid() != 0: _out(f"{RED}Root access required for USB printer. Re-run with sudo.{RESET}") sys.exit(1) display_usb_results(query_usb_pjl()) @@ -1214,6 +1818,15 @@ def _no_printer_found() -> None: def main(argv: list[str] | None = None) -> None: """Entry point: auto-detect USB or network Brother printer.""" args = argv if argv is not None else sys.argv[1:] + + # Handle consumable reset commands + if args and args[0] == "--reset-toner": + _reset_consumable("toner") + return + if args and args[0] == "--reset-drum": + _reset_consumable("drum") + return + printer_ip = args[0] if args else "" if printer_ip: diff --git a/python_pkg/brother_printer/run.sh b/python_pkg/brother_printer/run.sh index 8a2a0fb..8af7323 100755 --- a/python_pkg/brother_printer/run.sh +++ b/python_pkg/brother_printer/run.sh @@ -2,9 +2,15 @@ set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -# No pip dependencies — script only uses stdlib -# Requires root for USB mode; pass printer IP as argument for network/SNMP mode +# No pip dependencies — script only uses stdlib (+pyusb for fallback info) +# Requires root for USB access (PJL via usblp or port status via pyusb) # Usage: ./run.sh # auto-detect # ./run.sh # network/SNMP mode -echo "Note: sudo may prompt for your password (required for USB printer access)." -sudo python3 "$SCRIPT_DIR/check_brother_printer.py" "$@" + +# Use sudo when a Brother printer is on USB (for /dev/usb/lp* or pyusb hw query) +if ls /dev/usb/lp* &>/dev/null || lsusb 2>/dev/null | grep -qi "04f9.*brother"; then + echo "Note: sudo may prompt for your password (required for USB printer access)." + sudo python3 "$SCRIPT_DIR/check_brother_printer.py" "$@" +else + python3 "$SCRIPT_DIR/check_brother_printer.py" "$@" +fi diff --git a/python_pkg/steam_backlog_enforcer/hltb.py b/python_pkg/steam_backlog_enforcer/hltb.py index 4e5b4b2..46c7a3f 100644 --- a/python_pkg/steam_backlog_enforcer/hltb.py +++ b/python_pkg/steam_backlog_enforcer/hltb.py @@ -1,22 +1,39 @@ -"""HowLongToBeat integration for estimating game completion times.""" +"""HowLongToBeat integration for estimating game completion times. + +Fetches completionist hour estimates from howlongtobeat.com with: +- direct API calls (bypassing the slow howlongtobeatpy per-request setup) +- single shared aiohttp session for all requests +- concurrent requests with configurable concurrency +- live progress reporting via callback +- incremental disk-cache saves so crashes don't lose work +""" from __future__ import annotations import asyncio -from dataclasses import dataclass +from collections.abc import Callable +from dataclasses import dataclass, field +from difflib import SequenceMatcher +from http import HTTPStatus import json import logging +import time -from howlongtobeatpy import HowLongToBeat +import aiohttp +from howlongtobeatpy.HTMLRequests import HTMLRequests from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR logger = logging.getLogger(__name__) HLTB_CACHE_FILE = CONFIG_DIR / "hltb_cache.json" -MAX_CONCURRENT = 30 +MAX_CONCURRENT = 60 # parallel requests to HLTB +_SAVE_INTERVAL = 50 # flush cache to disk every N results MIN_SIMILARITY = 0.5 +# Type for progress callbacks: (done, total, found, game_name) +ProgressCb = Callable[[int, int, int, str], None] + @dataclass class HLTBResult: @@ -26,12 +43,21 @@ class HLTBResult: game_name: str completionist_hours: float similarity: float + hltb_game_id: int = 0 + + +HLTB_BASE_URL = "https://howlongtobeat.com" + + +# ────────────────────────────────────────────────────────────── +# Cache I/O +# ────────────────────────────────────────────────────────────── def load_hltb_cache() -> dict[int, float]: """Load the persistent HLTB cache from disk. - Returns: dict mapping app_id -> completionist_hours. + Returns: dict mapping app_id -> completionist_hours (-1 = no data on HLTB). """ if HLTB_CACHE_FILE.exists(): try: @@ -54,51 +80,270 @@ def save_hltb_cache(cache: dict[int, float]) -> None: logger.exception("Failed to save HLTB cache") +# ────────────────────────────────────────────────────────────── +# HLTB API setup (done once, not per-request like the library) +# ────────────────────────────────────────────────────────────── + + +def _get_hltb_search_url() -> str: + """Discover the current HLTB search API endpoint. + + Scrapes the homepage for JS bundles containing the fetch URL. + Falls back to ``/api/finder`` if extraction fails. + """ + try: + search_info = HTMLRequests.send_website_request_getcode( + parse_all_scripts=False, + ) + if search_info is None: + search_info = HTMLRequests.send_website_request_getcode( + parse_all_scripts=True, + ) + if search_info and search_info.search_url: + url: str = HTMLRequests.BASE_URL + search_info.search_url + return url + except Exception: # noqa: BLE001 + logger.debug("Failed to discover HLTB search URL, using default") + return "https://howlongtobeat.com/api/finder" + + +async def _get_auth_token( + search_url: str, + session: aiohttp.ClientSession, +) -> str | None: + """Fetch the HLTB auth token (one GET request).""" + init_url = search_url + "/init" + ts = int(time.time() * 1000) + headers = { + "User-Agent": ( + "Mozilla/5.0 (X11; Linux x86_64; rv:136.0)" " Gecko/20100101 Firefox/136.0" + ), + "referer": "https://howlongtobeat.com/", + } + try: + async with session.get( + init_url, + params={"t": ts}, + headers=headers, + ) as resp: + if resp.status == HTTPStatus.OK: + data = await resp.json() + token: str | None = data.get("token") + return token + except (aiohttp.ClientError, asyncio.TimeoutError): + logger.warning("Failed to get HLTB auth token") + return None + + +def _similarity(a: str, b: str) -> float: + """Case-insensitive SequenceMatcher ratio between two strings.""" + return SequenceMatcher(None, a.lower(), b.lower()).ratio() + + +def _build_search_payload(game_name: str) -> str: + """Build the JSON POST body for an HLTB search.""" + return json.dumps( + { + "searchType": "games", + "searchTerms": game_name.split(), + "searchPage": 1, + "size": 20, + "searchOptions": { + "games": { + "userId": 0, + "platform": "", + "sortCategory": "popular", + "rangeCategory": "main", + "rangeTime": {"min": 0, "max": 0}, + "gameplay": { + "perspective": "", + "flow": "", + "genre": "", + "difficulty": "", + }, + "rangeYear": {"max": "", "min": ""}, + "modifier": "", + }, + "users": {"sortCategory": "postcount"}, + "lists": {"sortCategory": "follows"}, + "filter": "", + "sort": 0, + "randomizer": 0, + }, + "useCache": True, + } + ) + + +# ────────────────────────────────────────────────────────────── +# Async fetching with shared session & progress +# ────────────────────────────────────────────────────────────── + + +@dataclass +class _SearchCtx: + """Shared context for HLTB search requests.""" + + session: aiohttp.ClientSession + search_url: str + headers: dict[str, str] + cache: dict[int, float] + counter: dict[str, int] = field(default_factory=dict) + total: int = 0 + progress_cb: ProgressCb | None = None + + async def _search_one( - sem: asyncio.Semaphore, app_id: int, name: str + sem: asyncio.Semaphore, + ctx: _SearchCtx, + app_id: int, + name: str, ) -> HLTBResult | None: - """Search HLTB for a single game.""" + """Search HLTB for one game via direct POST, update cache.""" async with sem: + result: HLTBResult | None = None + payload = _build_search_payload(name) try: - results = await HowLongToBeat().async_search(name) - if results: - best = max(results, key=lambda r: r.similarity) - if best.similarity >= MIN_SIMILARITY: - comp = best.completionist - if comp and comp > 0: - return HLTBResult( - app_id=app_id, - game_name=name, - completionist_hours=comp, - similarity=best.similarity, + async with ctx.session.post( + ctx.search_url, + headers=ctx.headers, + data=payload, + ) as resp: + if resp.status == HTTPStatus.OK: + data = await resp.json() + for entry in data.get("data", []): + entry_name = entry.get("game_name", "") + entry_alias = entry.get("game_alias", "") or "" + sim = max( + _similarity(name, entry_name), + _similarity(name, entry_alias), ) - except (OSError, ValueError, TypeError, AttributeError) as e: - logger.debug("HLTB search failed for '%s': %s", name, e) - return None + if sim >= MIN_SIMILARITY: + comp_100 = entry.get("comp_100", 0) + if comp_100 and comp_100 > 0: + hours = round(comp_100 / 3600, 2) + result = HLTBResult( + app_id=app_id, + game_name=name, + completionist_hours=hours, + similarity=sim, + hltb_game_id=entry.get("game_id", 0), + ) + break + except (aiohttp.ClientError, asyncio.TimeoutError) as exc: + logger.debug("HLTB search failed for '%s': %s", name, exc) + + # Update cache immediately (miss = -1). + if result is not None: + ctx.cache[app_id] = result.completionist_hours + ctx.counter["found"] += 1 + else: + ctx.cache[app_id] = -1 + + ctx.counter["done"] += 1 + done = ctx.counter["done"] + + # Incremental save every _SAVE_INTERVAL lookups. + if done % _SAVE_INTERVAL == 0: + save_hltb_cache(ctx.cache) + + # Report progress. + if ctx.progress_cb is not None: + ctx.progress_cb(done, ctx.total, ctx.counter["found"], name) + + return result async def _fetch_batch( games: list[tuple[int, str]], + cache: dict[int, float], + progress_cb: ProgressCb | None, ) -> list[HLTBResult]: - """Fetch HLTB data for a batch of games concurrently.""" + """Fetch HLTB data for a batch of games using one shared session.""" + # 1. Discover the search URL (sync, one-time). + search_url = _get_hltb_search_url() + logger.info("HLTB search URL: %s", search_url) + + timeout = aiohttp.ClientTimeout(total=20, sock_read=15) + + # 2. Get auth token (separate session — avoids reuse issues). + async with aiohttp.ClientSession(timeout=timeout) as init_session: + token = await _get_auth_token(search_url, init_session) + if token is None: + logger.warning("Could not get HLTB auth token, aborting fetch.") + return [] + logger.info("HLTB auth token acquired.") + + # 3. Build shared headers for all search requests. + headers = { + "content-type": "application/json", + "accept": "*/*", + "User-Agent": ( + "Mozilla/5.0 (X11; Linux x86_64; rv:136.0)" " Gecko/20100101 Firefox/136.0" + ), + "referer": "https://howlongtobeat.com/", + "x-auth-token": token, + } + + # 4. Fire all searches through a single persistent session. sem = asyncio.Semaphore(MAX_CONCURRENT) - tasks = [_search_one(sem, app_id, name) for app_id, name in games] - results = await asyncio.gather(*tasks) + counter = {"done": 0, "found": 0} + total = len(games) + + connector = aiohttp.TCPConnector( + limit=MAX_CONCURRENT, + keepalive_timeout=30, + ) + async with aiohttp.ClientSession( + timeout=timeout, + connector=connector, + ) as session: + ctx = _SearchCtx( + session=session, + search_url=search_url, + headers=headers, + cache=cache, + counter=counter, + total=total, + progress_cb=progress_cb, + ) + tasks = [ + _search_one( + sem, + ctx, + app_id, + name, + ) + for app_id, name in games + ] + results = await asyncio.gather(*tasks) + return [r for r in results if r is not None] -def fetch_hltb_times(games: list[tuple[int, str]]) -> list[HLTBResult]: +def fetch_hltb_times( + games: list[tuple[int, str]], + cache: dict[int, float] | None = None, + progress_cb: ProgressCb | None = None, +) -> list[HLTBResult]: """Synchronous wrapper: fetch HLTB times for games.""" if not games: return [] - return asyncio.run(_fetch_batch(games)) + if cache is None: + cache = {} + return asyncio.run(_fetch_batch(games, cache, progress_cb)) def fetch_hltb_times_cached( games: list[tuple[int, str]], + progress_cb: ProgressCb | None = None, ) -> dict[int, float]: """Fetch HLTB times, using disk cache for already-known games. + Args: + games: list of (app_id, name) tuples to look up. + progress_cb: optional callback(done, total, found, game_name). + Returns: dict mapping app_id -> completionist_hours. """ cache = load_hltb_cache() @@ -106,20 +351,43 @@ def fetch_hltb_times_cached( if uncached: logger.info( - "Fetching HLTB data for %d uncached games (out of %d total)...", + "Fetching HLTB data for %d uncached games (%d cached)...", len(uncached), - len(games), + len(games) - len(uncached), ) - results = fetch_hltb_times(uncached) - for r in results: - cache[r.app_id] = r.completionist_hours - # Also cache misses as -1 so we don't re-fetch them. - found_ids = {r.app_id for r in results} - for app_id, _ in uncached: - if app_id not in found_ids: - cache[app_id] = -1 + t0 = time.monotonic() + fetch_hltb_times(uncached, cache=cache, progress_cb=progress_cb) + elapsed = time.monotonic() - t0 + + # Final save. save_hltb_cache(cache) + + found = sum(1 for aid, _ in uncached if cache.get(aid, -1) > 0) + rate = len(uncached) / elapsed if elapsed > 0 else 0 + logger.info( + "HLTB fetch done: %d/%d found in %.1fs (%.0f games/s)", + found, + len(uncached), + elapsed, + rate, + ) else: logger.info("All %d games found in HLTB cache.", len(games)) return cache + + +def get_hltb_submit_url(game_name: str) -> str | None: + """Look up a game on HLTB and return its submit page URL. + + Args: + game_name: Name of the game to search for. + + Returns: + URL like ``https://howlongtobeat.com/submit/game/12345``, + or ``None`` if the game wasn't found. + """ + results = fetch_hltb_times([(0, game_name)]) + if results and results[0].hltb_game_id: + return f"{HLTB_BASE_URL}/submit/game/{results[0].hltb_game_id}" + return None diff --git a/python_pkg/steam_backlog_enforcer/main.py b/python_pkg/steam_backlog_enforcer/main.py index 7e0fd0e..12af073 100644 --- a/python_pkg/steam_backlog_enforcer/main.py +++ b/python_pkg/steam_backlog_enforcer/main.py @@ -25,12 +25,19 @@ from python_pkg.steam_backlog_enforcer.enforcer import ( enforce_allowed_game, send_notification, ) -from python_pkg.steam_backlog_enforcer.hltb import fetch_hltb_times_cached +from python_pkg.steam_backlog_enforcer.hltb import ( + fetch_hltb_times_cached, + get_hltb_submit_url, +) from python_pkg.steam_backlog_enforcer.library_hider import ( hide_other_games, restart_steam, unhide_all_games, ) +from python_pkg.steam_backlog_enforcer.protondb import ( + ProtonDBRating, + fetch_protondb_ratings, +) from python_pkg.steam_backlog_enforcer.steam_api import GameInfo, SteamAPIClient from python_pkg.steam_backlog_enforcer.store_blocker import ( block_store, @@ -411,7 +418,21 @@ def do_scan(config: Config, state: State) -> list[GameInfo]: incomplete = [(g.app_id, g.name) for g in games if not g.is_complete] if incomplete: _echo(f"Fetching HLTB completion times for {len(incomplete)} games...") - hltb_cache = fetch_hltb_times_cached(incomplete) + + def hltb_progress(done: int, total: int, found: int, name: str) -> None: + pct = done * 100 // total + bar_w = 30 + filled = bar_w * done // total + bar = "█" * filled + "░" * (bar_w - filled) + _echo( + f"\r HLTB [{bar}] {done}/{total} ({pct}%) " + f"| {found} found | {name[:30]:<30s}", + end="", + flush=True, + ) + + hltb_cache = fetch_hltb_times_cached(incomplete, progress_cb=hltb_progress) + _echo("") # newline after progress bar for g in games: hours = hltb_cache.get(g.app_id, -1) g.completionist_hours = hours @@ -432,8 +453,52 @@ def do_scan(config: Config, state: State) -> list[GameInfo]: return games +# How many candidates to check per ProtonDB batch. +_PROTONDB_BATCH_SIZE = 20 + + +def _pick_playable_candidate( + candidates: list[GameInfo], +) -> GameInfo | None: + """Return the first candidate with an acceptable ProtonDB rating. + + Checks candidates in batches (sorted by HLTB hours, shortest first). + Games rated silver-or-worse, or gold-trending-down, are skipped. + """ + offset = 0 + while offset < len(candidates): + batch = candidates[offset : offset + _PROTONDB_BATCH_SIZE] + app_ids = [g.app_id for g in batch] + ratings = fetch_protondb_ratings(app_ids) + + for game in batch: + rating = ratings.get(game.app_id, ProtonDBRating(app_id=game.app_id)) + if rating.is_playable: + if offset > 0 or game is not batch[0]: + _echo( + f" Skipped {offset + batch.index(game)} game(s) " + f"with poor Linux compatibility" + ) + return game + logger.info( + "Skipping %s (AppID=%d): ProtonDB %s (trending %s)", + game.name, + game.app_id, + rating.tier, + rating.trending_tier, + ) + + offset += _PROTONDB_BATCH_SIZE + + return None + + def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None: - """Select the next game: shortest completionist time first.""" + """Select the next game: shortest completionist time first. + + Games with silver-or-worse ProtonDB ratings (or gold trending + downward) are automatically skipped as unplayable on Linux. + """ skip = set(config.skip_app_ids) | set(state.finished_app_ids) candidates = [g for g in games if not g.is_complete and g.app_id not in skip] @@ -451,7 +516,16 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None: return (1, g.name.lower().encode().hex().__hash__()) candidates.sort(key=sort_key) - chosen = candidates[0] + + # Filter out Linux-incompatible games via ProtonDB. + chosen = _pick_playable_candidate(candidates) + + if chosen is None: + _echo("\nNo playable games left (all have poor ProtonDB ratings)!") + state.current_app_id = None + state.current_game_name = "" + state.save() + return state.current_app_id = chosen.app_id state.current_game_name = chosen.name @@ -466,7 +540,12 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None: f" ({chosen.completion_pct:.1f}%)" ) - # Auto-install the newly assigned game. + # Uninstall all other games first, then auto-install the assigned one. + if config.uninstall_other_games: + count = uninstall_other_games(chosen.app_id) + if count: + _echo(f"\n Uninstalled {count} non-assigned games") + if not is_game_installed(chosen.app_id): _echo(f"\n Auto-installing {chosen.name}...") install_game(chosen.app_id, chosen.name, config.steam_id) @@ -986,6 +1065,109 @@ def cmd_unhide(config: Config, _state: State) -> None: _echo("Done!") +def _open_hltb_submit_page( + game_name: str, + app_id: int, + snapshot_data: list[dict[str, Any]] | None, +) -> None: + """Show playtime and open the HLTB submit page in the browser.""" + playtime_minutes = 0 + if snapshot_data: + for entry in snapshot_data: + if entry.get("app_id") == app_id: + playtime_minutes = entry.get("playtime_minutes", 0) + break + + playtime_h = playtime_minutes / 60 + _echo(f"\n Steam playtime: {playtime_h:.1f} hours") + + _echo(" Looking up game on HowLongToBeat...") + submit_url = get_hltb_submit_url(game_name) + if submit_url: + _echo(f" HLTB submit page: {submit_url}") + _echo(" Opening in browser (log in & submit your time)...") + import webbrowser + + webbrowser.open(submit_url) + else: + _echo(" Could not find game on HLTB (submit manually).") + + +def cmd_done(config: Config, state: State) -> None: + """Check completion, open HLTB submit, pick next game, uninstall & hide. + + All-in-one command for after finishing a game: + 1. Verify 100% achievements on Steam. + 2. Show playtime and open HLTB submit page in browser. + 3. Pick the next game (shortest HLTB 100% time). + 4. Uninstall all non-assigned games. + 5. Hide all non-assigned games in the Steam library. + 6. Install the newly assigned game. + """ + if state.current_app_id is None: + _echo("No game currently assigned. Run 'scan' first.") + return + + client = SteamAPIClient(config.steam_api_key, config.steam_id) + game_name = state.current_game_name + app_id = state.current_app_id + + _echo(f"Checking {game_name} (AppID={app_id})...") + game = client.refresh_single_game(app_id, game_name) + if game is None: + _echo(" Could not fetch achievement data from Steam.") + return + + _echo( + f" Progress: {game.unlocked_achievements}/{game.total_achievements}" + f" ({game.completion_pct:.1f}%)" + ) + + if not game.is_complete: + remaining = game.total_achievements - game.unlocked_achievements + _echo(f"\n NOT COMPLETE: {remaining} achievements remaining. Keep going!") + return + + # ── Step 1: Mark complete ── + _echo(f"\n COMPLETED: {game_name}!") + state.finished_app_ids.append(app_id) + + # ── Step 2: HLTB submit ── + snapshot_data = load_snapshot() + _open_hltb_submit_page(game_name, app_id, snapshot_data) + + # ── Step 3: Pick next game ── + _echo("\nPicking next game...") + if not snapshot_data: + _echo(" No snapshot found. Run 'scan' first.") + state.current_app_id = None + state.current_game_name = "" + state.save() + return + + games = [GameInfo.from_snapshot(d) for d in snapshot_data] + pick_next_game(games, state, config) + + if state.current_app_id is None: + _echo(" No more games to assign!") + return + + # ── Step 4: Hide non-assigned games in library ── + owned_ids = _get_all_owned_app_ids(config) + if owned_ids: + hidden = hide_other_games(owned_ids, state.current_app_id) + if hidden > 0: + _echo(f"\n Library: hid {hidden} games") + restart_steam() + _echo(" Steam restarted to apply library changes.") + + send_notification( + "Game Complete!", + f"Finished {game_name}! Now playing: {state.current_game_name}", + ) + _echo(f"\nAll done! Go play {state.current_game_name}!") + + COMMANDS = { "scan": ("Scan library & assign a game", do_scan), "check": ("Check assigned game completion", do_check), @@ -1001,6 +1183,7 @@ COMMANDS = { "installed": ("List installed games", cmd_installed), "uninstall": ("Uninstall all non-assigned games", cmd_uninstall), "setup": ("Run first-time setup", cmd_setup), + "done": ("Finish game, open HLTB, pick next", cmd_done), } diff --git a/python_pkg/steam_backlog_enforcer/protondb.py b/python_pkg/steam_backlog_enforcer/protondb.py new file mode 100644 index 0000000..3638445 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/protondb.py @@ -0,0 +1,187 @@ +"""ProtonDB integration for Linux compatibility ratings. + +Fetches game compatibility tiers from ProtonDB's public API to filter out +games that don't work well on Linux. Ratings are cached locally so repeated +lookups are free. + +Tier hierarchy (best → worst): native, platinum, gold, silver, bronze, borked. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +import json +import logging +from typing import Any + +import aiohttp + +from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR + +logger = logging.getLogger(__name__) + +PROTONDB_CACHE_FILE = CONFIG_DIR / "protondb_cache.json" +_PROTONDB_API = "https://www.protondb.com/api/v1/reports/summaries/{app_id}.json" +MAX_CONCURRENT = 30 # parallel requests - be polite to the CDN + +# Tier ordering from best to worst. +TIER_ORDER: dict[str, int] = { + "native": 0, + "platinum": 1, + "gold": 2, + "silver": 3, + "bronze": 4, + "borked": 5, + "pending": 6, +} + +# Games at or below this tier are skipped. +MIN_PLAYABLE_TIER = "gold" + + +@dataclass +class ProtonDBRating: + """ProtonDB compatibility rating for a game.""" + + app_id: int + tier: str = "" + trending_tier: str = "" + score: float = 0.0 + confidence: str = "" + total_reports: int = 0 + + @property + def is_playable(self) -> bool: + """True if the game has at least gold-tier compatibility. + + A game is considered unplayable when: + - Its tier is silver, bronze, or borked. + - Its tier is gold but trending to silver or worse. + - No data exists (unknown compatibility). + """ + if not self.tier: + return True # No data → don't block; user can skip manually. + tier_rank = TIER_ORDER.get(self.tier, 99) + min_rank = TIER_ORDER[MIN_PLAYABLE_TIER] + + if tier_rank > min_rank: + # Silver, bronze, borked → skip. + return False + + if tier_rank == min_rank and self.trending_tier: + # Gold but trending silver/bronze/borked → skip. + trend_rank = TIER_ORDER.get(self.trending_tier, 99) + if trend_rank > min_rank: + return False + + return True + + +def _load_cache() -> dict[str, Any]: + """Load the on-disk ProtonDB cache.""" + if PROTONDB_CACHE_FILE.exists(): + return json.loads(PROTONDB_CACHE_FILE.read_text(encoding="utf-8")) # type: ignore[no-any-return] + return {} + + +def _save_cache(cache: dict[str, Any]) -> None: + """Persist the ProtonDB cache.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + PROTONDB_CACHE_FILE.write_text(json.dumps(cache, indent=2) + "\n", encoding="utf-8") + + +async def _fetch_one( + session: aiohttp.ClientSession, + sem: asyncio.Semaphore, + app_id: int, +) -> ProtonDBRating: + """Fetch a single game's ProtonDB rating.""" + url = _PROTONDB_API.format(app_id=app_id) + async with sem: + try: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as r: + if r.status == 404: # noqa: PLR2004 + return ProtonDBRating(app_id=app_id) + r.raise_for_status() + data = await r.json(content_type=None) + return ProtonDBRating( + app_id=app_id, + tier=data.get("tier", ""), + trending_tier=data.get("trendingTier", ""), + score=data.get("score", 0.0), + confidence=data.get("confidence", ""), + total_reports=data.get("total", 0), + ) + except Exception: # noqa: BLE001 + logger.warning("ProtonDB fetch failed for AppID=%d", app_id) + return ProtonDBRating(app_id=app_id) + + +async def _fetch_batch(app_ids: list[int]) -> list[ProtonDBRating]: + """Fetch ProtonDB ratings for a batch of app IDs concurrently.""" + sem = asyncio.Semaphore(MAX_CONCURRENT) + async with aiohttp.ClientSession() as session: + tasks = [_fetch_one(session, sem, aid) for aid in app_ids] + return await asyncio.gather(*tasks) + + +def _rating_to_dict(r: ProtonDBRating) -> dict[str, Any]: + """Serialize a rating to a cache-friendly dict.""" + return { + "tier": r.tier, + "trending_tier": r.trending_tier, + "score": r.score, + "confidence": r.confidence, + "total_reports": r.total_reports, + } + + +def _rating_from_cache(app_id: int, data: dict[str, Any]) -> ProtonDBRating: + """Deserialize a rating from cached data.""" + return ProtonDBRating( + app_id=app_id, + tier=data.get("tier", ""), + trending_tier=data.get("trending_tier", ""), + score=data.get("score", 0.0), + confidence=data.get("confidence", ""), + total_reports=data.get("total_reports", 0), + ) + + +def fetch_protondb_ratings( + app_ids: list[int], +) -> dict[int, ProtonDBRating]: + """Fetch ProtonDB ratings with local caching. + + Returns a dict mapping app_id → ProtonDBRating for every requested ID. + Cached results are reused; only missing IDs are fetched from the network. + """ + cache = _load_cache() + + # Separate cached vs. uncached. + results: dict[int, ProtonDBRating] = {} + to_fetch: list[int] = [] + for aid in app_ids: + key = str(aid) + if key in cache: + results[aid] = _rating_from_cache(aid, cache[key]) + else: + to_fetch.append(aid) + + if to_fetch: + logger.info( + "Fetching ProtonDB ratings for %d games (%d cached)...", + len(to_fetch), + len(results), + ) + fetched = asyncio.run(_fetch_batch(to_fetch)) + for r in fetched: + results[r.app_id] = r + cache[str(r.app_id)] = _rating_to_dict(r) + _save_cache(cache) + logger.info("ProtonDB: fetched %d, total cached %d", len(fetched), len(cache)) + else: + logger.info("All %d ProtonDB ratings found in cache.", len(results)) + + return results diff --git a/python_pkg/steam_backlog_enforcer/run.sh b/python_pkg/steam_backlog_enforcer/run.sh new file mode 100755 index 0000000..b99c6b4 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/run.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +# Quick launcher for the "done" workflow: +# check completion → open HLTB → pick next game → uninstall & hide others +set -euo pipefail + +cd "$(dirname "$0")/../.." +exec python -m python_pkg.steam_backlog_enforcer.main "done"