feat: improvements in steam backlog and brother printer

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-03-04 22:40:49 +01:00
parent 0447459dab
commit b18a50d3a9
4 changed files with 686 additions and 41 deletions

View File

@ -1,22 +1,39 @@
"""HowLongToBeat integration for estimating game completion times."""
"""HowLongToBeat integration for estimating game completion times.
Fetches completionist hour estimates from howlongtobeat.com with:
- direct API calls (bypassing the slow howlongtobeatpy per-request setup)
- single shared aiohttp session for all requests
- concurrent requests with configurable concurrency
- live progress reporting via callback
- incremental disk-cache saves so crashes don't lose work
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from collections.abc import Callable
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from http import HTTPStatus
import json
import logging
import time
from howlongtobeatpy import HowLongToBeat
import aiohttp
from howlongtobeatpy.HTMLRequests import HTMLRequests
from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR
logger = logging.getLogger(__name__)
HLTB_CACHE_FILE = CONFIG_DIR / "hltb_cache.json"
MAX_CONCURRENT = 30
MAX_CONCURRENT = 60 # parallel requests to HLTB
_SAVE_INTERVAL = 50 # flush cache to disk every N results
MIN_SIMILARITY = 0.5
# Type for progress callbacks: (done, total, found, game_name)
ProgressCb = Callable[[int, int, int, str], None]
@dataclass
class HLTBResult:
@ -26,12 +43,21 @@ class HLTBResult:
game_name: str
completionist_hours: float
similarity: float
hltb_game_id: int = 0
HLTB_BASE_URL = "https://howlongtobeat.com"
# ──────────────────────────────────────────────────────────────
# Cache I/O
# ──────────────────────────────────────────────────────────────
def load_hltb_cache() -> dict[int, float]:
"""Load the persistent HLTB cache from disk.
Returns: dict mapping app_id -> completionist_hours.
Returns: dict mapping app_id -> completionist_hours (-1 = no data on HLTB).
"""
if HLTB_CACHE_FILE.exists():
try:
@ -54,51 +80,270 @@ def save_hltb_cache(cache: dict[int, float]) -> None:
logger.exception("Failed to save HLTB cache")
# ──────────────────────────────────────────────────────────────
# HLTB API setup (done once, not per-request like the library)
# ──────────────────────────────────────────────────────────────
def _get_hltb_search_url() -> str:
"""Discover the current HLTB search API endpoint.
Scrapes the homepage for JS bundles containing the fetch URL.
Falls back to ``/api/finder`` if extraction fails.
"""
try:
search_info = HTMLRequests.send_website_request_getcode(
parse_all_scripts=False,
)
if search_info is None:
search_info = HTMLRequests.send_website_request_getcode(
parse_all_scripts=True,
)
if search_info and search_info.search_url:
url: str = HTMLRequests.BASE_URL + search_info.search_url
return url
except Exception: # noqa: BLE001
logger.debug("Failed to discover HLTB search URL, using default")
return "https://howlongtobeat.com/api/finder"
async def _get_auth_token(
search_url: str,
session: aiohttp.ClientSession,
) -> str | None:
"""Fetch the HLTB auth token (one GET request)."""
init_url = search_url + "/init"
ts = int(time.time() * 1000)
headers = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64; rv:136.0)" " Gecko/20100101 Firefox/136.0"
),
"referer": "https://howlongtobeat.com/",
}
try:
async with session.get(
init_url,
params={"t": ts},
headers=headers,
) as resp:
if resp.status == HTTPStatus.OK:
data = await resp.json()
token: str | None = data.get("token")
return token
except (aiohttp.ClientError, asyncio.TimeoutError):
logger.warning("Failed to get HLTB auth token")
return None
def _similarity(a: str, b: str) -> float:
"""Case-insensitive SequenceMatcher ratio between two strings."""
return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def _build_search_payload(game_name: str) -> str:
"""Build the JSON POST body for an HLTB search."""
return json.dumps(
{
"searchType": "games",
"searchTerms": game_name.split(),
"searchPage": 1,
"size": 20,
"searchOptions": {
"games": {
"userId": 0,
"platform": "",
"sortCategory": "popular",
"rangeCategory": "main",
"rangeTime": {"min": 0, "max": 0},
"gameplay": {
"perspective": "",
"flow": "",
"genre": "",
"difficulty": "",
},
"rangeYear": {"max": "", "min": ""},
"modifier": "",
},
"users": {"sortCategory": "postcount"},
"lists": {"sortCategory": "follows"},
"filter": "",
"sort": 0,
"randomizer": 0,
},
"useCache": True,
}
)
# ──────────────────────────────────────────────────────────────
# Async fetching with shared session & progress
# ──────────────────────────────────────────────────────────────
@dataclass
class _SearchCtx:
"""Shared context for HLTB search requests."""
session: aiohttp.ClientSession
search_url: str
headers: dict[str, str]
cache: dict[int, float]
counter: dict[str, int] = field(default_factory=dict)
total: int = 0
progress_cb: ProgressCb | None = None
async def _search_one(
sem: asyncio.Semaphore, app_id: int, name: str
sem: asyncio.Semaphore,
ctx: _SearchCtx,
app_id: int,
name: str,
) -> HLTBResult | None:
"""Search HLTB for a single game."""
"""Search HLTB for one game via direct POST, update cache."""
async with sem:
result: HLTBResult | None = None
payload = _build_search_payload(name)
try:
results = await HowLongToBeat().async_search(name)
if results:
best = max(results, key=lambda r: r.similarity)
if best.similarity >= MIN_SIMILARITY:
comp = best.completionist
if comp and comp > 0:
return HLTBResult(
app_id=app_id,
game_name=name,
completionist_hours=comp,
similarity=best.similarity,
async with ctx.session.post(
ctx.search_url,
headers=ctx.headers,
data=payload,
) as resp:
if resp.status == HTTPStatus.OK:
data = await resp.json()
for entry in data.get("data", []):
entry_name = entry.get("game_name", "")
entry_alias = entry.get("game_alias", "") or ""
sim = max(
_similarity(name, entry_name),
_similarity(name, entry_alias),
)
except (OSError, ValueError, TypeError, AttributeError) as e:
logger.debug("HLTB search failed for '%s': %s", name, e)
return None
if sim >= MIN_SIMILARITY:
comp_100 = entry.get("comp_100", 0)
if comp_100 and comp_100 > 0:
hours = round(comp_100 / 3600, 2)
result = HLTBResult(
app_id=app_id,
game_name=name,
completionist_hours=hours,
similarity=sim,
hltb_game_id=entry.get("game_id", 0),
)
break
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
logger.debug("HLTB search failed for '%s': %s", name, exc)
# Update cache immediately (miss = -1).
if result is not None:
ctx.cache[app_id] = result.completionist_hours
ctx.counter["found"] += 1
else:
ctx.cache[app_id] = -1
ctx.counter["done"] += 1
done = ctx.counter["done"]
# Incremental save every _SAVE_INTERVAL lookups.
if done % _SAVE_INTERVAL == 0:
save_hltb_cache(ctx.cache)
# Report progress.
if ctx.progress_cb is not None:
ctx.progress_cb(done, ctx.total, ctx.counter["found"], name)
return result
async def _fetch_batch(
games: list[tuple[int, str]],
cache: dict[int, float],
progress_cb: ProgressCb | None,
) -> list[HLTBResult]:
"""Fetch HLTB data for a batch of games concurrently."""
"""Fetch HLTB data for a batch of games using one shared session."""
# 1. Discover the search URL (sync, one-time).
search_url = _get_hltb_search_url()
logger.info("HLTB search URL: %s", search_url)
timeout = aiohttp.ClientTimeout(total=20, sock_read=15)
# 2. Get auth token (separate session — avoids reuse issues).
async with aiohttp.ClientSession(timeout=timeout) as init_session:
token = await _get_auth_token(search_url, init_session)
if token is None:
logger.warning("Could not get HLTB auth token, aborting fetch.")
return []
logger.info("HLTB auth token acquired.")
# 3. Build shared headers for all search requests.
headers = {
"content-type": "application/json",
"accept": "*/*",
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64; rv:136.0)" " Gecko/20100101 Firefox/136.0"
),
"referer": "https://howlongtobeat.com/",
"x-auth-token": token,
}
# 4. Fire all searches through a single persistent session.
sem = asyncio.Semaphore(MAX_CONCURRENT)
tasks = [_search_one(sem, app_id, name) for app_id, name in games]
results = await asyncio.gather(*tasks)
counter = {"done": 0, "found": 0}
total = len(games)
connector = aiohttp.TCPConnector(
limit=MAX_CONCURRENT,
keepalive_timeout=30,
)
async with aiohttp.ClientSession(
timeout=timeout,
connector=connector,
) as session:
ctx = _SearchCtx(
session=session,
search_url=search_url,
headers=headers,
cache=cache,
counter=counter,
total=total,
progress_cb=progress_cb,
)
tasks = [
_search_one(
sem,
ctx,
app_id,
name,
)
for app_id, name in games
]
results = await asyncio.gather(*tasks)
return [r for r in results if r is not None]
def fetch_hltb_times(games: list[tuple[int, str]]) -> list[HLTBResult]:
def fetch_hltb_times(
games: list[tuple[int, str]],
cache: dict[int, float] | None = None,
progress_cb: ProgressCb | None = None,
) -> list[HLTBResult]:
"""Synchronous wrapper: fetch HLTB times for games."""
if not games:
return []
return asyncio.run(_fetch_batch(games))
if cache is None:
cache = {}
return asyncio.run(_fetch_batch(games, cache, progress_cb))
def fetch_hltb_times_cached(
games: list[tuple[int, str]],
progress_cb: ProgressCb | None = None,
) -> dict[int, float]:
"""Fetch HLTB times, using disk cache for already-known games.
Args:
games: list of (app_id, name) tuples to look up.
progress_cb: optional callback(done, total, found, game_name).
Returns: dict mapping app_id -> completionist_hours.
"""
cache = load_hltb_cache()
@ -106,20 +351,43 @@ def fetch_hltb_times_cached(
if uncached:
logger.info(
"Fetching HLTB data for %d uncached games (out of %d total)...",
"Fetching HLTB data for %d uncached games (%d cached)...",
len(uncached),
len(games),
len(games) - len(uncached),
)
results = fetch_hltb_times(uncached)
for r in results:
cache[r.app_id] = r.completionist_hours
# Also cache misses as -1 so we don't re-fetch them.
found_ids = {r.app_id for r in results}
for app_id, _ in uncached:
if app_id not in found_ids:
cache[app_id] = -1
t0 = time.monotonic()
fetch_hltb_times(uncached, cache=cache, progress_cb=progress_cb)
elapsed = time.monotonic() - t0
# Final save.
save_hltb_cache(cache)
found = sum(1 for aid, _ in uncached if cache.get(aid, -1) > 0)
rate = len(uncached) / elapsed if elapsed > 0 else 0
logger.info(
"HLTB fetch done: %d/%d found in %.1fs (%.0f games/s)",
found,
len(uncached),
elapsed,
rate,
)
else:
logger.info("All %d games found in HLTB cache.", len(games))
return cache
def get_hltb_submit_url(game_name: str) -> str | None:
"""Look up a game on HLTB and return its submit page URL.
Args:
game_name: Name of the game to search for.
Returns:
URL like ``https://howlongtobeat.com/submit/game/12345``,
or ``None`` if the game wasn't found.
"""
results = fetch_hltb_times([(0, game_name)])
if results and results[0].hltb_game_id:
return f"{HLTB_BASE_URL}/submit/game/{results[0].hltb_game_id}"
return None

View File

@ -25,12 +25,19 @@ from python_pkg.steam_backlog_enforcer.enforcer import (
enforce_allowed_game,
send_notification,
)
from python_pkg.steam_backlog_enforcer.hltb import fetch_hltb_times_cached
from python_pkg.steam_backlog_enforcer.hltb import (
fetch_hltb_times_cached,
get_hltb_submit_url,
)
from python_pkg.steam_backlog_enforcer.library_hider import (
hide_other_games,
restart_steam,
unhide_all_games,
)
from python_pkg.steam_backlog_enforcer.protondb import (
ProtonDBRating,
fetch_protondb_ratings,
)
from python_pkg.steam_backlog_enforcer.steam_api import GameInfo, SteamAPIClient
from python_pkg.steam_backlog_enforcer.store_blocker import (
block_store,
@ -411,7 +418,21 @@ def do_scan(config: Config, state: State) -> list[GameInfo]:
incomplete = [(g.app_id, g.name) for g in games if not g.is_complete]
if incomplete:
_echo(f"Fetching HLTB completion times for {len(incomplete)} games...")
hltb_cache = fetch_hltb_times_cached(incomplete)
def hltb_progress(done: int, total: int, found: int, name: str) -> None:
pct = done * 100 // total
bar_w = 30
filled = bar_w * done // total
bar = "" * filled + "" * (bar_w - filled)
_echo(
f"\r HLTB [{bar}] {done}/{total} ({pct}%) "
f"| {found} found | {name[:30]:<30s}",
end="",
flush=True,
)
hltb_cache = fetch_hltb_times_cached(incomplete, progress_cb=hltb_progress)
_echo("") # newline after progress bar
for g in games:
hours = hltb_cache.get(g.app_id, -1)
g.completionist_hours = hours
@ -432,8 +453,52 @@ def do_scan(config: Config, state: State) -> list[GameInfo]:
return games
# How many candidates to check per ProtonDB batch.
_PROTONDB_BATCH_SIZE = 20
def _pick_playable_candidate(
candidates: list[GameInfo],
) -> GameInfo | None:
"""Return the first candidate with an acceptable ProtonDB rating.
Checks candidates in batches (sorted by HLTB hours, shortest first).
Games rated silver-or-worse, or gold-trending-down, are skipped.
"""
offset = 0
while offset < len(candidates):
batch = candidates[offset : offset + _PROTONDB_BATCH_SIZE]
app_ids = [g.app_id for g in batch]
ratings = fetch_protondb_ratings(app_ids)
for game in batch:
rating = ratings.get(game.app_id, ProtonDBRating(app_id=game.app_id))
if rating.is_playable:
if offset > 0 or game is not batch[0]:
_echo(
f" Skipped {offset + batch.index(game)} game(s) "
f"with poor Linux compatibility"
)
return game
logger.info(
"Skipping %s (AppID=%d): ProtonDB %s (trending %s)",
game.name,
game.app_id,
rating.tier,
rating.trending_tier,
)
offset += _PROTONDB_BATCH_SIZE
return None
def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None:
"""Select the next game: shortest completionist time first."""
"""Select the next game: shortest completionist time first.
Games with silver-or-worse ProtonDB ratings (or gold trending
downward) are automatically skipped as unplayable on Linux.
"""
skip = set(config.skip_app_ids) | set(state.finished_app_ids)
candidates = [g for g in games if not g.is_complete and g.app_id not in skip]
@ -451,7 +516,16 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None:
return (1, g.name.lower().encode().hex().__hash__())
candidates.sort(key=sort_key)
chosen = candidates[0]
# Filter out Linux-incompatible games via ProtonDB.
chosen = _pick_playable_candidate(candidates)
if chosen is None:
_echo("\nNo playable games left (all have poor ProtonDB ratings)!")
state.current_app_id = None
state.current_game_name = ""
state.save()
return
state.current_app_id = chosen.app_id
state.current_game_name = chosen.name
@ -466,7 +540,12 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None:
f" ({chosen.completion_pct:.1f}%)"
)
# Auto-install the newly assigned game.
# Uninstall all other games first, then auto-install the assigned one.
if config.uninstall_other_games:
count = uninstall_other_games(chosen.app_id)
if count:
_echo(f"\n Uninstalled {count} non-assigned games")
if not is_game_installed(chosen.app_id):
_echo(f"\n Auto-installing {chosen.name}...")
install_game(chosen.app_id, chosen.name, config.steam_id)
@ -986,6 +1065,109 @@ def cmd_unhide(config: Config, _state: State) -> None:
_echo("Done!")
def _open_hltb_submit_page(
game_name: str,
app_id: int,
snapshot_data: list[dict[str, Any]] | None,
) -> None:
"""Show playtime and open the HLTB submit page in the browser."""
playtime_minutes = 0
if snapshot_data:
for entry in snapshot_data:
if entry.get("app_id") == app_id:
playtime_minutes = entry.get("playtime_minutes", 0)
break
playtime_h = playtime_minutes / 60
_echo(f"\n Steam playtime: {playtime_h:.1f} hours")
_echo(" Looking up game on HowLongToBeat...")
submit_url = get_hltb_submit_url(game_name)
if submit_url:
_echo(f" HLTB submit page: {submit_url}")
_echo(" Opening in browser (log in & submit your time)...")
import webbrowser
webbrowser.open(submit_url)
else:
_echo(" Could not find game on HLTB (submit manually).")
def cmd_done(config: Config, state: State) -> None:
"""Check completion, open HLTB submit, pick next game, uninstall & hide.
All-in-one command for after finishing a game:
1. Verify 100% achievements on Steam.
2. Show playtime and open HLTB submit page in browser.
3. Pick the next game (shortest HLTB 100% time).
4. Uninstall all non-assigned games.
5. Hide all non-assigned games in the Steam library.
6. Install the newly assigned game.
"""
if state.current_app_id is None:
_echo("No game currently assigned. Run 'scan' first.")
return
client = SteamAPIClient(config.steam_api_key, config.steam_id)
game_name = state.current_game_name
app_id = state.current_app_id
_echo(f"Checking {game_name} (AppID={app_id})...")
game = client.refresh_single_game(app_id, game_name)
if game is None:
_echo(" Could not fetch achievement data from Steam.")
return
_echo(
f" Progress: {game.unlocked_achievements}/{game.total_achievements}"
f" ({game.completion_pct:.1f}%)"
)
if not game.is_complete:
remaining = game.total_achievements - game.unlocked_achievements
_echo(f"\n NOT COMPLETE: {remaining} achievements remaining. Keep going!")
return
# ── Step 1: Mark complete ──
_echo(f"\n COMPLETED: {game_name}!")
state.finished_app_ids.append(app_id)
# ── Step 2: HLTB submit ──
snapshot_data = load_snapshot()
_open_hltb_submit_page(game_name, app_id, snapshot_data)
# ── Step 3: Pick next game ──
_echo("\nPicking next game...")
if not snapshot_data:
_echo(" No snapshot found. Run 'scan' first.")
state.current_app_id = None
state.current_game_name = ""
state.save()
return
games = [GameInfo.from_snapshot(d) for d in snapshot_data]
pick_next_game(games, state, config)
if state.current_app_id is None:
_echo(" No more games to assign!")
return
# ── Step 4: Hide non-assigned games in library ──
owned_ids = _get_all_owned_app_ids(config)
if owned_ids:
hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0:
_echo(f"\n Library: hid {hidden} games")
restart_steam()
_echo(" Steam restarted to apply library changes.")
send_notification(
"Game Complete!",
f"Finished {game_name}! Now playing: {state.current_game_name}",
)
_echo(f"\nAll done! Go play {state.current_game_name}!")
COMMANDS = {
"scan": ("Scan library & assign a game", do_scan),
"check": ("Check assigned game completion", do_check),
@ -1001,6 +1183,7 @@ COMMANDS = {
"installed": ("List installed games", cmd_installed),
"uninstall": ("Uninstall all non-assigned games", cmd_uninstall),
"setup": ("Run first-time setup", cmd_setup),
"done": ("Finish game, open HLTB, pick next", cmd_done),
}

View File

@ -0,0 +1,187 @@
"""ProtonDB integration for Linux compatibility ratings.
Fetches game compatibility tiers from ProtonDB's public API to filter out
games that don't work well on Linux. Ratings are cached locally so repeated
lookups are free.
Tier hierarchy (best worst): native, platinum, gold, silver, bronze, borked.
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import json
import logging
from typing import Any
import aiohttp
from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR
logger = logging.getLogger(__name__)
PROTONDB_CACHE_FILE = CONFIG_DIR / "protondb_cache.json"
_PROTONDB_API = "https://www.protondb.com/api/v1/reports/summaries/{app_id}.json"
MAX_CONCURRENT = 30 # parallel requests - be polite to the CDN
# Tier ordering from best to worst.
TIER_ORDER: dict[str, int] = {
"native": 0,
"platinum": 1,
"gold": 2,
"silver": 3,
"bronze": 4,
"borked": 5,
"pending": 6,
}
# Games at or below this tier are skipped.
MIN_PLAYABLE_TIER = "gold"
@dataclass
class ProtonDBRating:
"""ProtonDB compatibility rating for a game."""
app_id: int
tier: str = ""
trending_tier: str = ""
score: float = 0.0
confidence: str = ""
total_reports: int = 0
@property
def is_playable(self) -> bool:
"""True if the game has at least gold-tier compatibility.
A game is considered unplayable when:
- Its tier is silver, bronze, or borked.
- Its tier is gold but trending to silver or worse.
- No data exists (unknown compatibility).
"""
if not self.tier:
return True # No data → don't block; user can skip manually.
tier_rank = TIER_ORDER.get(self.tier, 99)
min_rank = TIER_ORDER[MIN_PLAYABLE_TIER]
if tier_rank > min_rank:
# Silver, bronze, borked → skip.
return False
if tier_rank == min_rank and self.trending_tier:
# Gold but trending silver/bronze/borked → skip.
trend_rank = TIER_ORDER.get(self.trending_tier, 99)
if trend_rank > min_rank:
return False
return True
def _load_cache() -> dict[str, Any]:
"""Load the on-disk ProtonDB cache."""
if PROTONDB_CACHE_FILE.exists():
return json.loads(PROTONDB_CACHE_FILE.read_text(encoding="utf-8")) # type: ignore[no-any-return]
return {}
def _save_cache(cache: dict[str, Any]) -> None:
"""Persist the ProtonDB cache."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
PROTONDB_CACHE_FILE.write_text(json.dumps(cache, indent=2) + "\n", encoding="utf-8")
async def _fetch_one(
session: aiohttp.ClientSession,
sem: asyncio.Semaphore,
app_id: int,
) -> ProtonDBRating:
"""Fetch a single game's ProtonDB rating."""
url = _PROTONDB_API.format(app_id=app_id)
async with sem:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as r:
if r.status == 404: # noqa: PLR2004
return ProtonDBRating(app_id=app_id)
r.raise_for_status()
data = await r.json(content_type=None)
return ProtonDBRating(
app_id=app_id,
tier=data.get("tier", ""),
trending_tier=data.get("trendingTier", ""),
score=data.get("score", 0.0),
confidence=data.get("confidence", ""),
total_reports=data.get("total", 0),
)
except Exception: # noqa: BLE001
logger.warning("ProtonDB fetch failed for AppID=%d", app_id)
return ProtonDBRating(app_id=app_id)
async def _fetch_batch(app_ids: list[int]) -> list[ProtonDBRating]:
"""Fetch ProtonDB ratings for a batch of app IDs concurrently."""
sem = asyncio.Semaphore(MAX_CONCURRENT)
async with aiohttp.ClientSession() as session:
tasks = [_fetch_one(session, sem, aid) for aid in app_ids]
return await asyncio.gather(*tasks)
def _rating_to_dict(r: ProtonDBRating) -> dict[str, Any]:
"""Serialize a rating to a cache-friendly dict."""
return {
"tier": r.tier,
"trending_tier": r.trending_tier,
"score": r.score,
"confidence": r.confidence,
"total_reports": r.total_reports,
}
def _rating_from_cache(app_id: int, data: dict[str, Any]) -> ProtonDBRating:
"""Deserialize a rating from cached data."""
return ProtonDBRating(
app_id=app_id,
tier=data.get("tier", ""),
trending_tier=data.get("trending_tier", ""),
score=data.get("score", 0.0),
confidence=data.get("confidence", ""),
total_reports=data.get("total_reports", 0),
)
def fetch_protondb_ratings(
app_ids: list[int],
) -> dict[int, ProtonDBRating]:
"""Fetch ProtonDB ratings with local caching.
Returns a dict mapping app_id ProtonDBRating for every requested ID.
Cached results are reused; only missing IDs are fetched from the network.
"""
cache = _load_cache()
# Separate cached vs. uncached.
results: dict[int, ProtonDBRating] = {}
to_fetch: list[int] = []
for aid in app_ids:
key = str(aid)
if key in cache:
results[aid] = _rating_from_cache(aid, cache[key])
else:
to_fetch.append(aid)
if to_fetch:
logger.info(
"Fetching ProtonDB ratings for %d games (%d cached)...",
len(to_fetch),
len(results),
)
fetched = asyncio.run(_fetch_batch(to_fetch))
for r in fetched:
results[r.app_id] = r
cache[str(r.app_id)] = _rating_to_dict(r)
_save_cache(cache)
logger.info("ProtonDB: fetched %d, total cached %d", len(fetched), len(cache))
else:
logger.info("All %d ProtonDB ratings found in cache.", len(results))
return results

7
steam_backlog_enforcer/run.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# Quick launcher for the "done" workflow:
# check completion → open HLTB → pick next game → uninstall & hide others
set -euo pipefail
cd "$(dirname "$0")/../.."
exec python -m python_pkg.steam_backlog_enforcer.main "done"