feat: improvements in hosts bluetooth focus mode and backglog scripts

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-03-11 20:47:03 +01:00
parent 2dc4a8d759
commit ccc1900adf
3 changed files with 135 additions and 65 deletions

View File

@ -18,6 +18,7 @@ from http import HTTPStatus
import json import json
import logging import logging
import time import time
from typing import Any
import aiohttp import aiohttp
from howlongtobeatpy.HTMLRequests import HTMLRequests from howlongtobeatpy.HTMLRequests import HTMLRequests
@ -175,6 +176,31 @@ def _build_search_payload(game_name: str) -> str:
) )
def _pick_best_hltb_entry(
search_name: str,
candidates: list[tuple[dict[str, Any], float]],
) -> tuple[dict[str, Any], float] | None:
"""Pick the best HLTB entry, preferring full editions over demos/chapters.
When a short name like "FAITH" matches both "FAITH" (demo) and
"FAITH: The Unholy Trinity" (full game), prefer the full game
since Steam often lists the full game under the shorter name.
"""
if not candidates:
return None
if len(candidates) == 1:
return candidates[0]
lower = search_name.lower()
for entry, sim in candidates:
entry_name = (entry.get("game_name") or "").lower()
if entry_name.startswith((lower + ":", lower + " -")):
return entry, sim
# Fall back to highest similarity.
return max(candidates, key=lambda x: x[1])
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
# Async fetching with shared session & progress # Async fetching with shared session & progress
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
@ -211,6 +237,8 @@ async def _search_one(
) as resp: ) as resp:
if resp.status == HTTPStatus.OK: if resp.status == HTTPStatus.OK:
data = await resp.json() data = await resp.json()
candidates: list[tuple[dict[str, Any], float]] = []
lower_name = name.lower()
for entry in data.get("data", []): for entry in data.get("data", []):
entry_name = entry.get("game_name", "") entry_name = entry.get("game_name", "")
entry_alias = entry.get("game_alias", "") or "" entry_alias = entry.get("game_alias", "") or ""
@ -218,18 +246,24 @@ async def _search_one(
_similarity(name, entry_name), _similarity(name, entry_name),
_similarity(name, entry_alias), _similarity(name, entry_alias),
) )
if sim >= MIN_SIMILARITY: is_full_edition = entry_name.lower().startswith(
lower_name + ":"
) or entry_name.lower().startswith(lower_name + " -")
if sim >= MIN_SIMILARITY or is_full_edition:
comp_100 = entry.get("comp_100", 0) comp_100 = entry.get("comp_100", 0)
if comp_100 and comp_100 > 0: if comp_100 and comp_100 > 0:
hours = round(comp_100 / 3600, 2) candidates.append((entry, sim))
result = HLTBResult( best = _pick_best_hltb_entry(name, candidates)
app_id=app_id, if best is not None:
game_name=name, entry, sim = best
completionist_hours=hours, hours = round(entry["comp_100"] / 3600, 2)
similarity=sim, result = HLTBResult(
hltb_game_id=entry.get("game_id", 0), app_id=app_id,
) game_name=name,
break completionist_hours=hours,
similarity=sim,
hltb_game_id=entry.get("game_id", 0),
)
except (aiohttp.ClientError, asyncio.TimeoutError) as exc: except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
logger.debug("HLTB search failed for '%s': %s", name, exc) logger.debug("HLTB search failed for '%s': %s", name, exc)

View File

@ -27,6 +27,7 @@ from python_pkg.steam_backlog_enforcer.enforcer import (
) )
from python_pkg.steam_backlog_enforcer.hltb import ( from python_pkg.steam_backlog_enforcer.hltb import (
fetch_hltb_times_cached, fetch_hltb_times_cached,
load_hltb_cache,
) )
from python_pkg.steam_backlog_enforcer.library_hider import ( from python_pkg.steam_backlog_enforcer.library_hider import (
hide_other_games, hide_other_games,
@ -891,27 +892,6 @@ def cmd_list(_config: Config, state: State) -> None:
_echo(f"\n COMPLETE: {len(complete)} games") _echo(f"\n COMPLETE: {len(complete)} games")
def cmd_skip(config: Config, state: State) -> None:
"""Skip the currently assigned game."""
if state.current_app_id is None:
_echo("No game currently assigned.")
return
_echo(f"Skipping: {state.current_game_name}")
config.skip_app_ids.append(state.current_app_id)
config.save()
snapshot = load_snapshot()
if snapshot:
games = [GameInfo.from_snapshot(d) for d in snapshot]
pick_next_game(games, state, config)
else:
state.current_app_id = None
state.current_game_name = ""
state.save()
_echo("Run 'scan' to pick a new game.")
def cmd_unblock(_config: Config, _state: State) -> None: def cmd_unblock(_config: Config, _state: State) -> None:
"""Remove store blocking.""" """Remove store blocking."""
if unblock_store(): if unblock_store():
@ -1083,6 +1063,79 @@ def cmd_unhide(config: Config, _state: State) -> None:
_echo("Done!") _echo("Done!")
def _try_reassign_shorter_game(
hltb_cache: dict[int, float],
app_id: int,
hours: float,
state: State,
config: Config,
) -> bool:
"""Check if a shorter game is available and reassign if so."""
snapshot_data = load_snapshot()
if not snapshot_data:
return False
all_games = [GameInfo.from_snapshot(d) for d in snapshot_data]
for g in all_games:
cached_hours = hltb_cache.get(g.app_id, -1.0)
if cached_hours > 0:
g.completionist_hours = cached_hours
skip = set(config.skip_app_ids) | set(state.finished_app_ids)
candidates = [
g
for g in all_games
if not g.is_complete and g.app_id not in skip and g.completionist_hours > 0
]
candidates.sort(key=lambda g: g.completionist_hours)
if not candidates or candidates[0].app_id == app_id:
return False
shortest = candidates[0]
_echo(
f"\n Reassigning: {shortest.name} is shorter"
f" (~{shortest.completionist_hours:.1f}h vs ~{hours:.1f}h)"
)
pick_next_game(all_games, state, config)
return True
def _finalize_completion(
config: Config,
state: State,
game_name: str,
app_id: int,
) -> None:
"""Mark game complete, pick next, hide non-assigned games, notify."""
_echo(f"\n COMPLETED: {game_name}!")
state.finished_app_ids.append(app_id)
snapshot_data = load_snapshot()
_echo("\nPicking next game...")
if not snapshot_data:
_echo(" No snapshot found. Run 'scan' first.")
state.current_app_id = None
state.current_game_name = ""
state.save()
return
games = [GameInfo.from_snapshot(d) for d in snapshot_data]
pick_next_game(games, state, config)
if state.current_app_id is None:
_echo(" No more games to assign!")
return
owned_ids = _get_all_owned_app_ids(config)
if owned_ids:
hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0:
_echo(f"\n Library: hid {hidden} games")
send_notification(
"Game Complete!",
f"Finished {game_name}! Now playing: {state.current_game_name}",
)
_echo(f"\nAll done! Go play {state.current_game_name}!")
def cmd_done(config: Config, state: State) -> None: def cmd_done(config: Config, state: State) -> None:
"""Check completion, pick next game, uninstall & hide. """Check completion, pick next game, uninstall & hide.
@ -1112,44 +1165,23 @@ def cmd_done(config: Config, state: State) -> None:
f" ({game.completion_pct:.1f}%)" f" ({game.completion_pct:.1f}%)"
) )
hltb_cache = load_hltb_cache()
hours = hltb_cache.get(app_id, -1.0)
if hours < 0:
hltb_cache = fetch_hltb_times_cached([(app_id, game_name)])
hours = hltb_cache.get(app_id, -1.0)
if hours > 0:
_echo(f" HLTB 100% estimate: {hours:.1f} hours")
if _try_reassign_shorter_game(hltb_cache, app_id, hours, state, config):
return
if not game.is_complete: if not game.is_complete:
remaining = game.total_achievements - game.unlocked_achievements remaining = game.total_achievements - game.unlocked_achievements
_echo(f"\n NOT COMPLETE: {remaining} achievements remaining. Keep going!") _echo(f"\n NOT COMPLETE: {remaining} achievements remaining. Keep going!")
return return
# ── Step 1: Mark complete ── _finalize_completion(config, state, game_name, app_id)
_echo(f"\n COMPLETED: {game_name}!")
state.finished_app_ids.append(app_id)
# ── Step 2: Pick next game ──
snapshot_data = load_snapshot()
_echo("\nPicking next game...")
if not snapshot_data:
_echo(" No snapshot found. Run 'scan' first.")
state.current_app_id = None
state.current_game_name = ""
state.save()
return
games = [GameInfo.from_snapshot(d) for d in snapshot_data]
pick_next_game(games, state, config)
if state.current_app_id is None:
_echo(" No more games to assign!")
return
# ── Step 3: Hide non-assigned games in library ──
owned_ids = _get_all_owned_app_ids(config)
if owned_ids:
hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0:
_echo(f"\n Library: hid {hidden} games")
send_notification(
"Game Complete!",
f"Finished {game_name}! Now playing: {state.current_game_name}",
)
_echo(f"\nAll done! Go play {state.current_game_name}!")
COMMANDS = { COMMANDS = {
@ -1157,7 +1189,6 @@ COMMANDS = {
"check": ("Check assigned game completion", do_check), "check": ("Check assigned game completion", do_check),
"status": ("Show current status", cmd_status), "status": ("Show current status", cmd_status),
"list": ("List games from snapshot", cmd_list), "list": ("List games from snapshot", cmd_list),
"skip": ("Skip currently assigned game", cmd_skip),
"enforce": ("Run enforcer: block, uninstall, kill, hide", do_enforce), "enforce": ("Run enforcer: block, uninstall, kill, hide", do_enforce),
"install": ("Install the assigned game", cmd_install), "install": ("Install the assigned game", cmd_install),
"hide": ("Hide all non-assigned games in library", cmd_hide), "hide": ("Hide all non-assigned games in library", cmd_hide),

View File

@ -117,6 +117,11 @@ class SteamAPIClient:
self.api_key = api_key self.api_key = api_key
self.steam_id = steam_id self.steam_id = steam_id
self.session = requests.Session() self.session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_maxsize=MAX_WORKERS,
pool_connections=MAX_WORKERS,
)
self.session.mount("https://", adapter)
self.session.headers["Accept"] = "application/json" self.session.headers["Accept"] = "application/json"
self._rate_lock = threading.Lock() self._rate_lock = threading.Lock()
self._request_times: list[float] = [] self._request_times: list[float] = []