From b18a50d3a9e3e83d70a21bb40e9185d6676b53db Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Wed, 4 Mar 2026 22:40:49 +0100 Subject: [PATCH] feat: improvements in steam backlog and brother printer --- steam_backlog_enforcer/hltb.py | 340 ++++++++++++++++++++++++++--- steam_backlog_enforcer/main.py | 193 +++++++++++++++- steam_backlog_enforcer/protondb.py | 187 ++++++++++++++++ steam_backlog_enforcer/run.sh | 7 + 4 files changed, 686 insertions(+), 41 deletions(-) create mode 100644 steam_backlog_enforcer/protondb.py create mode 100755 steam_backlog_enforcer/run.sh diff --git a/steam_backlog_enforcer/hltb.py b/steam_backlog_enforcer/hltb.py index 4e5b4b2..46c7a3f 100644 --- a/steam_backlog_enforcer/hltb.py +++ b/steam_backlog_enforcer/hltb.py @@ -1,22 +1,39 @@ -"""HowLongToBeat integration for estimating game completion times.""" +"""HowLongToBeat integration for estimating game completion times. + +Fetches completionist hour estimates from howlongtobeat.com with: +- direct API calls (bypassing the slow howlongtobeatpy per-request setup) +- single shared aiohttp session for all requests +- concurrent requests with configurable concurrency +- live progress reporting via callback +- incremental disk-cache saves so crashes don't lose work +""" from __future__ import annotations import asyncio -from dataclasses import dataclass +from collections.abc import Callable +from dataclasses import dataclass, field +from difflib import SequenceMatcher +from http import HTTPStatus import json import logging +import time -from howlongtobeatpy import HowLongToBeat +import aiohttp +from howlongtobeatpy.HTMLRequests import HTMLRequests from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR logger = logging.getLogger(__name__) HLTB_CACHE_FILE = CONFIG_DIR / "hltb_cache.json" -MAX_CONCURRENT = 30 +MAX_CONCURRENT = 60 # parallel requests to HLTB +_SAVE_INTERVAL = 50 # flush cache to disk every N results MIN_SIMILARITY = 0.5 +# Type for progress callbacks: (done, total, found, game_name) +ProgressCb = Callable[[int, int, int, str], None] + @dataclass class HLTBResult: @@ -26,12 +43,21 @@ class HLTBResult: game_name: str completionist_hours: float similarity: float + hltb_game_id: int = 0 + + +HLTB_BASE_URL = "https://howlongtobeat.com" + + +# ────────────────────────────────────────────────────────────── +# Cache I/O +# ────────────────────────────────────────────────────────────── def load_hltb_cache() -> dict[int, float]: """Load the persistent HLTB cache from disk. - Returns: dict mapping app_id -> completionist_hours. + Returns: dict mapping app_id -> completionist_hours (-1 = no data on HLTB). """ if HLTB_CACHE_FILE.exists(): try: @@ -54,51 +80,270 @@ def save_hltb_cache(cache: dict[int, float]) -> None: logger.exception("Failed to save HLTB cache") +# ────────────────────────────────────────────────────────────── +# HLTB API setup (done once, not per-request like the library) +# ────────────────────────────────────────────────────────────── + + +def _get_hltb_search_url() -> str: + """Discover the current HLTB search API endpoint. + + Scrapes the homepage for JS bundles containing the fetch URL. + Falls back to ``/api/finder`` if extraction fails. + """ + try: + search_info = HTMLRequests.send_website_request_getcode( + parse_all_scripts=False, + ) + if search_info is None: + search_info = HTMLRequests.send_website_request_getcode( + parse_all_scripts=True, + ) + if search_info and search_info.search_url: + url: str = HTMLRequests.BASE_URL + search_info.search_url + return url + except Exception: # noqa: BLE001 + logger.debug("Failed to discover HLTB search URL, using default") + return "https://howlongtobeat.com/api/finder" + + +async def _get_auth_token( + search_url: str, + session: aiohttp.ClientSession, +) -> str | None: + """Fetch the HLTB auth token (one GET request).""" + init_url = search_url + "/init" + ts = int(time.time() * 1000) + headers = { + "User-Agent": ( + "Mozilla/5.0 (X11; Linux x86_64; rv:136.0)" " Gecko/20100101 Firefox/136.0" + ), + "referer": "https://howlongtobeat.com/", + } + try: + async with session.get( + init_url, + params={"t": ts}, + headers=headers, + ) as resp: + if resp.status == HTTPStatus.OK: + data = await resp.json() + token: str | None = data.get("token") + return token + except (aiohttp.ClientError, asyncio.TimeoutError): + logger.warning("Failed to get HLTB auth token") + return None + + +def _similarity(a: str, b: str) -> float: + """Case-insensitive SequenceMatcher ratio between two strings.""" + return SequenceMatcher(None, a.lower(), b.lower()).ratio() + + +def _build_search_payload(game_name: str) -> str: + """Build the JSON POST body for an HLTB search.""" + return json.dumps( + { + "searchType": "games", + "searchTerms": game_name.split(), + "searchPage": 1, + "size": 20, + "searchOptions": { + "games": { + "userId": 0, + "platform": "", + "sortCategory": "popular", + "rangeCategory": "main", + "rangeTime": {"min": 0, "max": 0}, + "gameplay": { + "perspective": "", + "flow": "", + "genre": "", + "difficulty": "", + }, + "rangeYear": {"max": "", "min": ""}, + "modifier": "", + }, + "users": {"sortCategory": "postcount"}, + "lists": {"sortCategory": "follows"}, + "filter": "", + "sort": 0, + "randomizer": 0, + }, + "useCache": True, + } + ) + + +# ────────────────────────────────────────────────────────────── +# Async fetching with shared session & progress +# ────────────────────────────────────────────────────────────── + + +@dataclass +class _SearchCtx: + """Shared context for HLTB search requests.""" + + session: aiohttp.ClientSession + search_url: str + headers: dict[str, str] + cache: dict[int, float] + counter: dict[str, int] = field(default_factory=dict) + total: int = 0 + progress_cb: ProgressCb | None = None + + async def _search_one( - sem: asyncio.Semaphore, app_id: int, name: str + sem: asyncio.Semaphore, + ctx: _SearchCtx, + app_id: int, + name: str, ) -> HLTBResult | None: - """Search HLTB for a single game.""" + """Search HLTB for one game via direct POST, update cache.""" async with sem: + result: HLTBResult | None = None + payload = _build_search_payload(name) try: - results = await HowLongToBeat().async_search(name) - if results: - best = max(results, key=lambda r: r.similarity) - if best.similarity >= MIN_SIMILARITY: - comp = best.completionist - if comp and comp > 0: - return HLTBResult( - app_id=app_id, - game_name=name, - completionist_hours=comp, - similarity=best.similarity, + async with ctx.session.post( + ctx.search_url, + headers=ctx.headers, + data=payload, + ) as resp: + if resp.status == HTTPStatus.OK: + data = await resp.json() + for entry in data.get("data", []): + entry_name = entry.get("game_name", "") + entry_alias = entry.get("game_alias", "") or "" + sim = max( + _similarity(name, entry_name), + _similarity(name, entry_alias), ) - except (OSError, ValueError, TypeError, AttributeError) as e: - logger.debug("HLTB search failed for '%s': %s", name, e) - return None + if sim >= MIN_SIMILARITY: + comp_100 = entry.get("comp_100", 0) + if comp_100 and comp_100 > 0: + hours = round(comp_100 / 3600, 2) + result = HLTBResult( + app_id=app_id, + game_name=name, + completionist_hours=hours, + similarity=sim, + hltb_game_id=entry.get("game_id", 0), + ) + break + except (aiohttp.ClientError, asyncio.TimeoutError) as exc: + logger.debug("HLTB search failed for '%s': %s", name, exc) + + # Update cache immediately (miss = -1). + if result is not None: + ctx.cache[app_id] = result.completionist_hours + ctx.counter["found"] += 1 + else: + ctx.cache[app_id] = -1 + + ctx.counter["done"] += 1 + done = ctx.counter["done"] + + # Incremental save every _SAVE_INTERVAL lookups. + if done % _SAVE_INTERVAL == 0: + save_hltb_cache(ctx.cache) + + # Report progress. + if ctx.progress_cb is not None: + ctx.progress_cb(done, ctx.total, ctx.counter["found"], name) + + return result async def _fetch_batch( games: list[tuple[int, str]], + cache: dict[int, float], + progress_cb: ProgressCb | None, ) -> list[HLTBResult]: - """Fetch HLTB data for a batch of games concurrently.""" + """Fetch HLTB data for a batch of games using one shared session.""" + # 1. Discover the search URL (sync, one-time). + search_url = _get_hltb_search_url() + logger.info("HLTB search URL: %s", search_url) + + timeout = aiohttp.ClientTimeout(total=20, sock_read=15) + + # 2. Get auth token (separate session — avoids reuse issues). + async with aiohttp.ClientSession(timeout=timeout) as init_session: + token = await _get_auth_token(search_url, init_session) + if token is None: + logger.warning("Could not get HLTB auth token, aborting fetch.") + return [] + logger.info("HLTB auth token acquired.") + + # 3. Build shared headers for all search requests. + headers = { + "content-type": "application/json", + "accept": "*/*", + "User-Agent": ( + "Mozilla/5.0 (X11; Linux x86_64; rv:136.0)" " Gecko/20100101 Firefox/136.0" + ), + "referer": "https://howlongtobeat.com/", + "x-auth-token": token, + } + + # 4. Fire all searches through a single persistent session. sem = asyncio.Semaphore(MAX_CONCURRENT) - tasks = [_search_one(sem, app_id, name) for app_id, name in games] - results = await asyncio.gather(*tasks) + counter = {"done": 0, "found": 0} + total = len(games) + + connector = aiohttp.TCPConnector( + limit=MAX_CONCURRENT, + keepalive_timeout=30, + ) + async with aiohttp.ClientSession( + timeout=timeout, + connector=connector, + ) as session: + ctx = _SearchCtx( + session=session, + search_url=search_url, + headers=headers, + cache=cache, + counter=counter, + total=total, + progress_cb=progress_cb, + ) + tasks = [ + _search_one( + sem, + ctx, + app_id, + name, + ) + for app_id, name in games + ] + results = await asyncio.gather(*tasks) + return [r for r in results if r is not None] -def fetch_hltb_times(games: list[tuple[int, str]]) -> list[HLTBResult]: +def fetch_hltb_times( + games: list[tuple[int, str]], + cache: dict[int, float] | None = None, + progress_cb: ProgressCb | None = None, +) -> list[HLTBResult]: """Synchronous wrapper: fetch HLTB times for games.""" if not games: return [] - return asyncio.run(_fetch_batch(games)) + if cache is None: + cache = {} + return asyncio.run(_fetch_batch(games, cache, progress_cb)) def fetch_hltb_times_cached( games: list[tuple[int, str]], + progress_cb: ProgressCb | None = None, ) -> dict[int, float]: """Fetch HLTB times, using disk cache for already-known games. + Args: + games: list of (app_id, name) tuples to look up. + progress_cb: optional callback(done, total, found, game_name). + Returns: dict mapping app_id -> completionist_hours. """ cache = load_hltb_cache() @@ -106,20 +351,43 @@ def fetch_hltb_times_cached( if uncached: logger.info( - "Fetching HLTB data for %d uncached games (out of %d total)...", + "Fetching HLTB data for %d uncached games (%d cached)...", len(uncached), - len(games), + len(games) - len(uncached), ) - results = fetch_hltb_times(uncached) - for r in results: - cache[r.app_id] = r.completionist_hours - # Also cache misses as -1 so we don't re-fetch them. - found_ids = {r.app_id for r in results} - for app_id, _ in uncached: - if app_id not in found_ids: - cache[app_id] = -1 + t0 = time.monotonic() + fetch_hltb_times(uncached, cache=cache, progress_cb=progress_cb) + elapsed = time.monotonic() - t0 + + # Final save. save_hltb_cache(cache) + + found = sum(1 for aid, _ in uncached if cache.get(aid, -1) > 0) + rate = len(uncached) / elapsed if elapsed > 0 else 0 + logger.info( + "HLTB fetch done: %d/%d found in %.1fs (%.0f games/s)", + found, + len(uncached), + elapsed, + rate, + ) else: logger.info("All %d games found in HLTB cache.", len(games)) return cache + + +def get_hltb_submit_url(game_name: str) -> str | None: + """Look up a game on HLTB and return its submit page URL. + + Args: + game_name: Name of the game to search for. + + Returns: + URL like ``https://howlongtobeat.com/submit/game/12345``, + or ``None`` if the game wasn't found. + """ + results = fetch_hltb_times([(0, game_name)]) + if results and results[0].hltb_game_id: + return f"{HLTB_BASE_URL}/submit/game/{results[0].hltb_game_id}" + return None diff --git a/steam_backlog_enforcer/main.py b/steam_backlog_enforcer/main.py index 7e0fd0e..12af073 100644 --- a/steam_backlog_enforcer/main.py +++ b/steam_backlog_enforcer/main.py @@ -25,12 +25,19 @@ from python_pkg.steam_backlog_enforcer.enforcer import ( enforce_allowed_game, send_notification, ) -from python_pkg.steam_backlog_enforcer.hltb import fetch_hltb_times_cached +from python_pkg.steam_backlog_enforcer.hltb import ( + fetch_hltb_times_cached, + get_hltb_submit_url, +) from python_pkg.steam_backlog_enforcer.library_hider import ( hide_other_games, restart_steam, unhide_all_games, ) +from python_pkg.steam_backlog_enforcer.protondb import ( + ProtonDBRating, + fetch_protondb_ratings, +) from python_pkg.steam_backlog_enforcer.steam_api import GameInfo, SteamAPIClient from python_pkg.steam_backlog_enforcer.store_blocker import ( block_store, @@ -411,7 +418,21 @@ def do_scan(config: Config, state: State) -> list[GameInfo]: incomplete = [(g.app_id, g.name) for g in games if not g.is_complete] if incomplete: _echo(f"Fetching HLTB completion times for {len(incomplete)} games...") - hltb_cache = fetch_hltb_times_cached(incomplete) + + def hltb_progress(done: int, total: int, found: int, name: str) -> None: + pct = done * 100 // total + bar_w = 30 + filled = bar_w * done // total + bar = "█" * filled + "░" * (bar_w - filled) + _echo( + f"\r HLTB [{bar}] {done}/{total} ({pct}%) " + f"| {found} found | {name[:30]:<30s}", + end="", + flush=True, + ) + + hltb_cache = fetch_hltb_times_cached(incomplete, progress_cb=hltb_progress) + _echo("") # newline after progress bar for g in games: hours = hltb_cache.get(g.app_id, -1) g.completionist_hours = hours @@ -432,8 +453,52 @@ def do_scan(config: Config, state: State) -> list[GameInfo]: return games +# How many candidates to check per ProtonDB batch. +_PROTONDB_BATCH_SIZE = 20 + + +def _pick_playable_candidate( + candidates: list[GameInfo], +) -> GameInfo | None: + """Return the first candidate with an acceptable ProtonDB rating. + + Checks candidates in batches (sorted by HLTB hours, shortest first). + Games rated silver-or-worse, or gold-trending-down, are skipped. + """ + offset = 0 + while offset < len(candidates): + batch = candidates[offset : offset + _PROTONDB_BATCH_SIZE] + app_ids = [g.app_id for g in batch] + ratings = fetch_protondb_ratings(app_ids) + + for game in batch: + rating = ratings.get(game.app_id, ProtonDBRating(app_id=game.app_id)) + if rating.is_playable: + if offset > 0 or game is not batch[0]: + _echo( + f" Skipped {offset + batch.index(game)} game(s) " + f"with poor Linux compatibility" + ) + return game + logger.info( + "Skipping %s (AppID=%d): ProtonDB %s (trending %s)", + game.name, + game.app_id, + rating.tier, + rating.trending_tier, + ) + + offset += _PROTONDB_BATCH_SIZE + + return None + + def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None: - """Select the next game: shortest completionist time first.""" + """Select the next game: shortest completionist time first. + + Games with silver-or-worse ProtonDB ratings (or gold trending + downward) are automatically skipped as unplayable on Linux. + """ skip = set(config.skip_app_ids) | set(state.finished_app_ids) candidates = [g for g in games if not g.is_complete and g.app_id not in skip] @@ -451,7 +516,16 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None: return (1, g.name.lower().encode().hex().__hash__()) candidates.sort(key=sort_key) - chosen = candidates[0] + + # Filter out Linux-incompatible games via ProtonDB. + chosen = _pick_playable_candidate(candidates) + + if chosen is None: + _echo("\nNo playable games left (all have poor ProtonDB ratings)!") + state.current_app_id = None + state.current_game_name = "" + state.save() + return state.current_app_id = chosen.app_id state.current_game_name = chosen.name @@ -466,7 +540,12 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None: f" ({chosen.completion_pct:.1f}%)" ) - # Auto-install the newly assigned game. + # Uninstall all other games first, then auto-install the assigned one. + if config.uninstall_other_games: + count = uninstall_other_games(chosen.app_id) + if count: + _echo(f"\n Uninstalled {count} non-assigned games") + if not is_game_installed(chosen.app_id): _echo(f"\n Auto-installing {chosen.name}...") install_game(chosen.app_id, chosen.name, config.steam_id) @@ -986,6 +1065,109 @@ def cmd_unhide(config: Config, _state: State) -> None: _echo("Done!") +def _open_hltb_submit_page( + game_name: str, + app_id: int, + snapshot_data: list[dict[str, Any]] | None, +) -> None: + """Show playtime and open the HLTB submit page in the browser.""" + playtime_minutes = 0 + if snapshot_data: + for entry in snapshot_data: + if entry.get("app_id") == app_id: + playtime_minutes = entry.get("playtime_minutes", 0) + break + + playtime_h = playtime_minutes / 60 + _echo(f"\n Steam playtime: {playtime_h:.1f} hours") + + _echo(" Looking up game on HowLongToBeat...") + submit_url = get_hltb_submit_url(game_name) + if submit_url: + _echo(f" HLTB submit page: {submit_url}") + _echo(" Opening in browser (log in & submit your time)...") + import webbrowser + + webbrowser.open(submit_url) + else: + _echo(" Could not find game on HLTB (submit manually).") + + +def cmd_done(config: Config, state: State) -> None: + """Check completion, open HLTB submit, pick next game, uninstall & hide. + + All-in-one command for after finishing a game: + 1. Verify 100% achievements on Steam. + 2. Show playtime and open HLTB submit page in browser. + 3. Pick the next game (shortest HLTB 100% time). + 4. Uninstall all non-assigned games. + 5. Hide all non-assigned games in the Steam library. + 6. Install the newly assigned game. + """ + if state.current_app_id is None: + _echo("No game currently assigned. Run 'scan' first.") + return + + client = SteamAPIClient(config.steam_api_key, config.steam_id) + game_name = state.current_game_name + app_id = state.current_app_id + + _echo(f"Checking {game_name} (AppID={app_id})...") + game = client.refresh_single_game(app_id, game_name) + if game is None: + _echo(" Could not fetch achievement data from Steam.") + return + + _echo( + f" Progress: {game.unlocked_achievements}/{game.total_achievements}" + f" ({game.completion_pct:.1f}%)" + ) + + if not game.is_complete: + remaining = game.total_achievements - game.unlocked_achievements + _echo(f"\n NOT COMPLETE: {remaining} achievements remaining. Keep going!") + return + + # ── Step 1: Mark complete ── + _echo(f"\n COMPLETED: {game_name}!") + state.finished_app_ids.append(app_id) + + # ── Step 2: HLTB submit ── + snapshot_data = load_snapshot() + _open_hltb_submit_page(game_name, app_id, snapshot_data) + + # ── Step 3: Pick next game ── + _echo("\nPicking next game...") + if not snapshot_data: + _echo(" No snapshot found. Run 'scan' first.") + state.current_app_id = None + state.current_game_name = "" + state.save() + return + + games = [GameInfo.from_snapshot(d) for d in snapshot_data] + pick_next_game(games, state, config) + + if state.current_app_id is None: + _echo(" No more games to assign!") + return + + # ── Step 4: Hide non-assigned games in library ── + owned_ids = _get_all_owned_app_ids(config) + if owned_ids: + hidden = hide_other_games(owned_ids, state.current_app_id) + if hidden > 0: + _echo(f"\n Library: hid {hidden} games") + restart_steam() + _echo(" Steam restarted to apply library changes.") + + send_notification( + "Game Complete!", + f"Finished {game_name}! Now playing: {state.current_game_name}", + ) + _echo(f"\nAll done! Go play {state.current_game_name}!") + + COMMANDS = { "scan": ("Scan library & assign a game", do_scan), "check": ("Check assigned game completion", do_check), @@ -1001,6 +1183,7 @@ COMMANDS = { "installed": ("List installed games", cmd_installed), "uninstall": ("Uninstall all non-assigned games", cmd_uninstall), "setup": ("Run first-time setup", cmd_setup), + "done": ("Finish game, open HLTB, pick next", cmd_done), } diff --git a/steam_backlog_enforcer/protondb.py b/steam_backlog_enforcer/protondb.py new file mode 100644 index 0000000..3638445 --- /dev/null +++ b/steam_backlog_enforcer/protondb.py @@ -0,0 +1,187 @@ +"""ProtonDB integration for Linux compatibility ratings. + +Fetches game compatibility tiers from ProtonDB's public API to filter out +games that don't work well on Linux. Ratings are cached locally so repeated +lookups are free. + +Tier hierarchy (best → worst): native, platinum, gold, silver, bronze, borked. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +import json +import logging +from typing import Any + +import aiohttp + +from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR + +logger = logging.getLogger(__name__) + +PROTONDB_CACHE_FILE = CONFIG_DIR / "protondb_cache.json" +_PROTONDB_API = "https://www.protondb.com/api/v1/reports/summaries/{app_id}.json" +MAX_CONCURRENT = 30 # parallel requests - be polite to the CDN + +# Tier ordering from best to worst. +TIER_ORDER: dict[str, int] = { + "native": 0, + "platinum": 1, + "gold": 2, + "silver": 3, + "bronze": 4, + "borked": 5, + "pending": 6, +} + +# Games at or below this tier are skipped. +MIN_PLAYABLE_TIER = "gold" + + +@dataclass +class ProtonDBRating: + """ProtonDB compatibility rating for a game.""" + + app_id: int + tier: str = "" + trending_tier: str = "" + score: float = 0.0 + confidence: str = "" + total_reports: int = 0 + + @property + def is_playable(self) -> bool: + """True if the game has at least gold-tier compatibility. + + A game is considered unplayable when: + - Its tier is silver, bronze, or borked. + - Its tier is gold but trending to silver or worse. + - No data exists (unknown compatibility). + """ + if not self.tier: + return True # No data → don't block; user can skip manually. + tier_rank = TIER_ORDER.get(self.tier, 99) + min_rank = TIER_ORDER[MIN_PLAYABLE_TIER] + + if tier_rank > min_rank: + # Silver, bronze, borked → skip. + return False + + if tier_rank == min_rank and self.trending_tier: + # Gold but trending silver/bronze/borked → skip. + trend_rank = TIER_ORDER.get(self.trending_tier, 99) + if trend_rank > min_rank: + return False + + return True + + +def _load_cache() -> dict[str, Any]: + """Load the on-disk ProtonDB cache.""" + if PROTONDB_CACHE_FILE.exists(): + return json.loads(PROTONDB_CACHE_FILE.read_text(encoding="utf-8")) # type: ignore[no-any-return] + return {} + + +def _save_cache(cache: dict[str, Any]) -> None: + """Persist the ProtonDB cache.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + PROTONDB_CACHE_FILE.write_text(json.dumps(cache, indent=2) + "\n", encoding="utf-8") + + +async def _fetch_one( + session: aiohttp.ClientSession, + sem: asyncio.Semaphore, + app_id: int, +) -> ProtonDBRating: + """Fetch a single game's ProtonDB rating.""" + url = _PROTONDB_API.format(app_id=app_id) + async with sem: + try: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as r: + if r.status == 404: # noqa: PLR2004 + return ProtonDBRating(app_id=app_id) + r.raise_for_status() + data = await r.json(content_type=None) + return ProtonDBRating( + app_id=app_id, + tier=data.get("tier", ""), + trending_tier=data.get("trendingTier", ""), + score=data.get("score", 0.0), + confidence=data.get("confidence", ""), + total_reports=data.get("total", 0), + ) + except Exception: # noqa: BLE001 + logger.warning("ProtonDB fetch failed for AppID=%d", app_id) + return ProtonDBRating(app_id=app_id) + + +async def _fetch_batch(app_ids: list[int]) -> list[ProtonDBRating]: + """Fetch ProtonDB ratings for a batch of app IDs concurrently.""" + sem = asyncio.Semaphore(MAX_CONCURRENT) + async with aiohttp.ClientSession() as session: + tasks = [_fetch_one(session, sem, aid) for aid in app_ids] + return await asyncio.gather(*tasks) + + +def _rating_to_dict(r: ProtonDBRating) -> dict[str, Any]: + """Serialize a rating to a cache-friendly dict.""" + return { + "tier": r.tier, + "trending_tier": r.trending_tier, + "score": r.score, + "confidence": r.confidence, + "total_reports": r.total_reports, + } + + +def _rating_from_cache(app_id: int, data: dict[str, Any]) -> ProtonDBRating: + """Deserialize a rating from cached data.""" + return ProtonDBRating( + app_id=app_id, + tier=data.get("tier", ""), + trending_tier=data.get("trending_tier", ""), + score=data.get("score", 0.0), + confidence=data.get("confidence", ""), + total_reports=data.get("total_reports", 0), + ) + + +def fetch_protondb_ratings( + app_ids: list[int], +) -> dict[int, ProtonDBRating]: + """Fetch ProtonDB ratings with local caching. + + Returns a dict mapping app_id → ProtonDBRating for every requested ID. + Cached results are reused; only missing IDs are fetched from the network. + """ + cache = _load_cache() + + # Separate cached vs. uncached. + results: dict[int, ProtonDBRating] = {} + to_fetch: list[int] = [] + for aid in app_ids: + key = str(aid) + if key in cache: + results[aid] = _rating_from_cache(aid, cache[key]) + else: + to_fetch.append(aid) + + if to_fetch: + logger.info( + "Fetching ProtonDB ratings for %d games (%d cached)...", + len(to_fetch), + len(results), + ) + fetched = asyncio.run(_fetch_batch(to_fetch)) + for r in fetched: + results[r.app_id] = r + cache[str(r.app_id)] = _rating_to_dict(r) + _save_cache(cache) + logger.info("ProtonDB: fetched %d, total cached %d", len(fetched), len(cache)) + else: + logger.info("All %d ProtonDB ratings found in cache.", len(results)) + + return results diff --git a/steam_backlog_enforcer/run.sh b/steam_backlog_enforcer/run.sh new file mode 100755 index 0000000..b99c6b4 --- /dev/null +++ b/steam_backlog_enforcer/run.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +# Quick launcher for the "done" workflow: +# check completion → open HLTB → pick next game → uninstall & hide others +set -euo pipefail + +cd "$(dirname "$0")/../.." +exec python -m python_pkg.steam_backlog_enforcer.main "done"