From ccc1900adf1ea23f22af11beb8c03c86bde1ffd8 Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Wed, 11 Mar 2026 20:47:03 +0100 Subject: [PATCH] feat: improvements in hosts bluetooth focus mode and backglog scripts --- steam_backlog_enforcer/hltb.py | 54 +++++++++-- steam_backlog_enforcer/main.py | 141 +++++++++++++++++----------- steam_backlog_enforcer/steam_api.py | 5 + 3 files changed, 135 insertions(+), 65 deletions(-) diff --git a/steam_backlog_enforcer/hltb.py b/steam_backlog_enforcer/hltb.py index 46c7a3f..5ef9cf8 100644 --- a/steam_backlog_enforcer/hltb.py +++ b/steam_backlog_enforcer/hltb.py @@ -18,6 +18,7 @@ from http import HTTPStatus import json import logging import time +from typing import Any import aiohttp from howlongtobeatpy.HTMLRequests import HTMLRequests @@ -175,6 +176,31 @@ def _build_search_payload(game_name: str) -> str: ) +def _pick_best_hltb_entry( + search_name: str, + candidates: list[tuple[dict[str, Any], float]], +) -> tuple[dict[str, Any], float] | None: + """Pick the best HLTB entry, preferring full editions over demos/chapters. + + When a short name like "FAITH" matches both "FAITH" (demo) and + "FAITH: The Unholy Trinity" (full game), prefer the full game + since Steam often lists the full game under the shorter name. + """ + if not candidates: + return None + if len(candidates) == 1: + return candidates[0] + + lower = search_name.lower() + for entry, sim in candidates: + entry_name = (entry.get("game_name") or "").lower() + if entry_name.startswith((lower + ":", lower + " -")): + return entry, sim + + # Fall back to highest similarity. + return max(candidates, key=lambda x: x[1]) + + # ────────────────────────────────────────────────────────────── # Async fetching with shared session & progress # ────────────────────────────────────────────────────────────── @@ -211,6 +237,8 @@ async def _search_one( ) as resp: if resp.status == HTTPStatus.OK: data = await resp.json() + candidates: list[tuple[dict[str, Any], float]] = [] + lower_name = name.lower() for entry in data.get("data", []): entry_name = entry.get("game_name", "") entry_alias = entry.get("game_alias", "") or "" @@ -218,18 +246,24 @@ async def _search_one( _similarity(name, entry_name), _similarity(name, entry_alias), ) - if sim >= MIN_SIMILARITY: + is_full_edition = entry_name.lower().startswith( + lower_name + ":" + ) or entry_name.lower().startswith(lower_name + " -") + if sim >= MIN_SIMILARITY or is_full_edition: comp_100 = entry.get("comp_100", 0) if comp_100 and comp_100 > 0: - hours = round(comp_100 / 3600, 2) - result = HLTBResult( - app_id=app_id, - game_name=name, - completionist_hours=hours, - similarity=sim, - hltb_game_id=entry.get("game_id", 0), - ) - break + candidates.append((entry, sim)) + best = _pick_best_hltb_entry(name, candidates) + if best is not None: + entry, sim = best + hours = round(entry["comp_100"] / 3600, 2) + result = HLTBResult( + app_id=app_id, + game_name=name, + completionist_hours=hours, + similarity=sim, + hltb_game_id=entry.get("game_id", 0), + ) except (aiohttp.ClientError, asyncio.TimeoutError) as exc: logger.debug("HLTB search failed for '%s': %s", name, exc) diff --git a/steam_backlog_enforcer/main.py b/steam_backlog_enforcer/main.py index c00b1c2..8006cf3 100644 --- a/steam_backlog_enforcer/main.py +++ b/steam_backlog_enforcer/main.py @@ -27,6 +27,7 @@ from python_pkg.steam_backlog_enforcer.enforcer import ( ) from python_pkg.steam_backlog_enforcer.hltb import ( fetch_hltb_times_cached, + load_hltb_cache, ) from python_pkg.steam_backlog_enforcer.library_hider import ( hide_other_games, @@ -891,27 +892,6 @@ def cmd_list(_config: Config, state: State) -> None: _echo(f"\n COMPLETE: {len(complete)} games") -def cmd_skip(config: Config, state: State) -> None: - """Skip the currently assigned game.""" - if state.current_app_id is None: - _echo("No game currently assigned.") - return - - _echo(f"Skipping: {state.current_game_name}") - config.skip_app_ids.append(state.current_app_id) - config.save() - - snapshot = load_snapshot() - if snapshot: - games = [GameInfo.from_snapshot(d) for d in snapshot] - pick_next_game(games, state, config) - else: - state.current_app_id = None - state.current_game_name = "" - state.save() - _echo("Run 'scan' to pick a new game.") - - def cmd_unblock(_config: Config, _state: State) -> None: """Remove store blocking.""" if unblock_store(): @@ -1083,6 +1063,79 @@ def cmd_unhide(config: Config, _state: State) -> None: _echo("Done!") +def _try_reassign_shorter_game( + hltb_cache: dict[int, float], + app_id: int, + hours: float, + state: State, + config: Config, +) -> bool: + """Check if a shorter game is available and reassign if so.""" + snapshot_data = load_snapshot() + if not snapshot_data: + return False + all_games = [GameInfo.from_snapshot(d) for d in snapshot_data] + for g in all_games: + cached_hours = hltb_cache.get(g.app_id, -1.0) + if cached_hours > 0: + g.completionist_hours = cached_hours + skip = set(config.skip_app_ids) | set(state.finished_app_ids) + candidates = [ + g + for g in all_games + if not g.is_complete and g.app_id not in skip and g.completionist_hours > 0 + ] + candidates.sort(key=lambda g: g.completionist_hours) + if not candidates or candidates[0].app_id == app_id: + return False + shortest = candidates[0] + _echo( + f"\n Reassigning: {shortest.name} is shorter" + f" (~{shortest.completionist_hours:.1f}h vs ~{hours:.1f}h)" + ) + pick_next_game(all_games, state, config) + return True + + +def _finalize_completion( + config: Config, + state: State, + game_name: str, + app_id: int, +) -> None: + """Mark game complete, pick next, hide non-assigned games, notify.""" + _echo(f"\n COMPLETED: {game_name}!") + state.finished_app_ids.append(app_id) + + snapshot_data = load_snapshot() + _echo("\nPicking next game...") + if not snapshot_data: + _echo(" No snapshot found. Run 'scan' first.") + state.current_app_id = None + state.current_game_name = "" + state.save() + return + + games = [GameInfo.from_snapshot(d) for d in snapshot_data] + pick_next_game(games, state, config) + + if state.current_app_id is None: + _echo(" No more games to assign!") + return + + owned_ids = _get_all_owned_app_ids(config) + if owned_ids: + hidden = hide_other_games(owned_ids, state.current_app_id) + if hidden > 0: + _echo(f"\n Library: hid {hidden} games") + + send_notification( + "Game Complete!", + f"Finished {game_name}! Now playing: {state.current_game_name}", + ) + _echo(f"\nAll done! Go play {state.current_game_name}!") + + def cmd_done(config: Config, state: State) -> None: """Check completion, pick next game, uninstall & hide. @@ -1112,44 +1165,23 @@ def cmd_done(config: Config, state: State) -> None: f" ({game.completion_pct:.1f}%)" ) + hltb_cache = load_hltb_cache() + hours = hltb_cache.get(app_id, -1.0) + if hours < 0: + hltb_cache = fetch_hltb_times_cached([(app_id, game_name)]) + hours = hltb_cache.get(app_id, -1.0) + if hours > 0: + _echo(f" HLTB 100% estimate: {hours:.1f} hours") + + if _try_reassign_shorter_game(hltb_cache, app_id, hours, state, config): + return + if not game.is_complete: remaining = game.total_achievements - game.unlocked_achievements _echo(f"\n NOT COMPLETE: {remaining} achievements remaining. Keep going!") return - # ── Step 1: Mark complete ── - _echo(f"\n COMPLETED: {game_name}!") - state.finished_app_ids.append(app_id) - - # ── Step 2: Pick next game ── - snapshot_data = load_snapshot() - _echo("\nPicking next game...") - if not snapshot_data: - _echo(" No snapshot found. Run 'scan' first.") - state.current_app_id = None - state.current_game_name = "" - state.save() - return - - games = [GameInfo.from_snapshot(d) for d in snapshot_data] - pick_next_game(games, state, config) - - if state.current_app_id is None: - _echo(" No more games to assign!") - return - - # ── Step 3: Hide non-assigned games in library ── - owned_ids = _get_all_owned_app_ids(config) - if owned_ids: - hidden = hide_other_games(owned_ids, state.current_app_id) - if hidden > 0: - _echo(f"\n Library: hid {hidden} games") - - send_notification( - "Game Complete!", - f"Finished {game_name}! Now playing: {state.current_game_name}", - ) - _echo(f"\nAll done! Go play {state.current_game_name}!") + _finalize_completion(config, state, game_name, app_id) COMMANDS = { @@ -1157,7 +1189,6 @@ COMMANDS = { "check": ("Check assigned game completion", do_check), "status": ("Show current status", cmd_status), "list": ("List games from snapshot", cmd_list), - "skip": ("Skip currently assigned game", cmd_skip), "enforce": ("Run enforcer: block, uninstall, kill, hide", do_enforce), "install": ("Install the assigned game", cmd_install), "hide": ("Hide all non-assigned games in library", cmd_hide), diff --git a/steam_backlog_enforcer/steam_api.py b/steam_backlog_enforcer/steam_api.py index d189e09..d8f1f9c 100644 --- a/steam_backlog_enforcer/steam_api.py +++ b/steam_backlog_enforcer/steam_api.py @@ -117,6 +117,11 @@ class SteamAPIClient: self.api_key = api_key self.steam_id = steam_id self.session = requests.Session() + adapter = requests.adapters.HTTPAdapter( + pool_maxsize=MAX_WORKERS, + pool_connections=MAX_WORKERS, + ) + self.session.mount("https://", adapter) self.session.headers["Accept"] = "application/json" self._rate_lock = threading.Lock() self._request_times: list[float] = []