steam_backlog_enforcer: reduce repeated cache refetches

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-05-08 15:07:23 +02:00
parent e020e84a3a
commit b9f31a159c
4 changed files with 247 additions and 0 deletions

View File

@ -5,10 +5,13 @@ from __future__ import annotations
import json import json
import logging import logging
import time import time
from typing import Any
from python_pkg.steam_backlog_enforcer.config import ( from python_pkg.steam_backlog_enforcer.config import (
CONFIG_DIR,
Config, Config,
State, State,
_atomic_write,
load_snapshot, load_snapshot,
) )
from python_pkg.steam_backlog_enforcer.enforcer import ( from python_pkg.steam_backlog_enforcer.enforcer import (
@ -29,6 +32,46 @@ from python_pkg.steam_backlog_enforcer.steam_api import SteamAPIClient
from python_pkg.steam_backlog_enforcer.store_blocker import block_store from python_pkg.steam_backlog_enforcer.store_blocker import block_store
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_OWNED_IDS_CACHE_FILE = CONFIG_DIR / "owned_app_ids_cache.json"
_OWNED_IDS_CACHE_TTL_SECONDS = 3600
def _load_owned_app_ids_cache(steam_id: str) -> list[int] | None:
"""Return fresh cached owned app IDs for this steam_id, if available."""
if not steam_id or not _OWNED_IDS_CACHE_FILE.exists():
return None
try:
data: dict[str, Any] = json.loads(
_OWNED_IDS_CACHE_FILE.read_text(encoding="utf-8")
)
except (json.JSONDecodeError, OSError, ValueError):
return None
cached_steam_id = str(data.get("steam_id", ""))
if cached_steam_id != steam_id:
return None
fetched_at = float(data.get("fetched_at", 0.0) or 0.0)
age = time.time() - fetched_at
if age > _OWNED_IDS_CACHE_TTL_SECONDS:
return None
raw_ids = data.get("app_ids", [])
if not isinstance(raw_ids, list):
return None
return [int(app_id) for app_id in raw_ids]
def _save_owned_app_ids_cache(steam_id: str, app_ids: list[int]) -> None:
"""Persist owned app IDs cache for this steam_id."""
payload = {
"steam_id": steam_id,
"fetched_at": time.time(),
"app_ids": app_ids,
}
_atomic_write(_OWNED_IDS_CACHE_FILE, json.dumps(payload, indent=2) + "\n")
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
@ -45,11 +88,24 @@ def get_all_owned_app_ids(config: Config) -> list[int]:
""" """
snapshot = load_snapshot() or [] snapshot = load_snapshot() or []
snapshot_ids = [int(d["app_id"]) for d in snapshot if "app_id" in d] snapshot_ids = [int(d["app_id"]) for d in snapshot if "app_id" in d]
cached_ids = _load_owned_app_ids_cache(config.steam_id)
if cached_ids is not None:
merged_ids: list[int] = []
seen: set[int] = set()
for app_id in [*cached_ids, *snapshot_ids]:
if app_id in seen:
continue
seen.add(app_id)
merged_ids.append(app_id)
logger.info("Using cached Steam owned IDs (%d entries).", len(cached_ids))
return merged_ids
try: try:
client = SteamAPIClient(config.steam_api_key, config.steam_id) client = SteamAPIClient(config.steam_api_key, config.steam_id)
owned = client.get_owned_games() owned = client.get_owned_games()
api_ids = [int(g["appid"]) for g in owned if "appid" in g] api_ids = [int(g["appid"]) for g in owned if "appid" in g]
_save_owned_app_ids_cache(config.steam_id, api_ids)
merged_ids: list[int] = [] merged_ids: list[int] = []
seen: set[int] = set() seen: set[int] = set()

View File

@ -190,6 +190,7 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None:
return (1, g.name.lower().encode().hex().__hash__()) return (1, g.name.lower().encode().hex().__hash__())
candidates.sort(key=sort_key) candidates.sort(key=sort_key)
_apply_cached_confidence_to_candidates(candidates)
chosen, confidence_skipped, linux_skipped = _pick_next_shortest_candidate( chosen, confidence_skipped, linux_skipped = _pick_next_shortest_candidate(
candidates candidates
@ -239,6 +240,17 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None:
) )
def _apply_cached_confidence_to_candidates(candidates: list[GameInfo]) -> None:
"""Overlay cached confidence counters onto candidate game objects."""
polls_cache = load_hltb_polls_cache()
count_comp_cache = load_hltb_count_comp_cache()
for game in candidates:
if game.app_id in polls_cache:
game.comp_100_count = polls_cache[game.app_id]
if game.app_id in count_comp_cache:
game.count_comp = count_comp_cache[game.app_id]
def _confidence_fail_reasons(game: GameInfo) -> list[str]: def _confidence_fail_reasons(game: GameInfo) -> list[str]:
"""Return threshold-failure reasons for a game's HLTB confidence data.""" """Return threshold-failure reasons for a game's HLTB confidence data."""
reasons: list[str] = [] reasons: list[str] = []

View File

@ -2,6 +2,8 @@
from __future__ import annotations from __future__ import annotations
import json
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from python_pkg.steam_backlog_enforcer._enforce_loop import ( from python_pkg.steam_backlog_enforcer._enforce_loop import (
@ -10,11 +12,16 @@ from python_pkg.steam_backlog_enforcer._enforce_loop import (
_enforce_loop_iteration, _enforce_loop_iteration,
_enforce_setup, _enforce_setup,
_guard_installed_games, _guard_installed_games,
_load_owned_app_ids_cache,
_save_owned_app_ids_cache,
do_enforce, do_enforce,
get_all_owned_app_ids, get_all_owned_app_ids,
) )
from python_pkg.steam_backlog_enforcer.config import Config, State from python_pkg.steam_backlog_enforcer.config import Config, State
if TYPE_CHECKING:
from pathlib import Path
PKG = "python_pkg.steam_backlog_enforcer._enforce_loop" PKG = "python_pkg.steam_backlog_enforcer._enforce_loop"
@ -25,6 +32,7 @@ class TestGetAllOwnedAppIds:
snap = [{"app_id": 1}, {"app_id": 2}] snap = [{"app_id": 1}, {"app_id": 2}]
with ( with (
patch(f"{PKG}.load_snapshot", return_value=snap), patch(f"{PKG}.load_snapshot", return_value=snap),
patch(f"{PKG}._load_owned_app_ids_cache", return_value=None),
patch(f"{PKG}.SteamAPIClient", side_effect=OSError("boom")), patch(f"{PKG}.SteamAPIClient", side_effect=OSError("boom")),
): ):
assert get_all_owned_app_ids(Config()) == [1, 2] assert get_all_owned_app_ids(Config()) == [1, 2]
@ -37,6 +45,7 @@ class TestGetAllOwnedAppIds:
] ]
with ( with (
patch(f"{PKG}.load_snapshot", return_value=None), patch(f"{PKG}.load_snapshot", return_value=None),
patch(f"{PKG}._load_owned_app_ids_cache", return_value=None),
patch(f"{PKG}.SteamAPIClient", return_value=mock_client), patch(f"{PKG}.SteamAPIClient", return_value=mock_client),
): ):
result = get_all_owned_app_ids( result = get_all_owned_app_ids(
@ -47,6 +56,7 @@ class TestGetAllOwnedAppIds:
def test_api_fails(self) -> None: def test_api_fails(self) -> None:
with ( with (
patch(f"{PKG}.load_snapshot", return_value=None), patch(f"{PKG}.load_snapshot", return_value=None),
patch(f"{PKG}._load_owned_app_ids_cache", return_value=None),
patch( patch(
f"{PKG}.SteamAPIClient", f"{PKG}.SteamAPIClient",
side_effect=OSError("fail"), side_effect=OSError("fail"),
@ -59,6 +69,7 @@ class TestGetAllOwnedAppIds:
mock_client.get_owned_games.return_value = [{"appid": 5}] mock_client.get_owned_games.return_value = [{"appid": 5}]
with ( with (
patch(f"{PKG}.load_snapshot", return_value=[]), patch(f"{PKG}.load_snapshot", return_value=[]),
patch(f"{PKG}._load_owned_app_ids_cache", return_value=None),
patch(f"{PKG}.SteamAPIClient", return_value=mock_client), patch(f"{PKG}.SteamAPIClient", return_value=mock_client),
): ):
assert get_all_owned_app_ids(Config(steam_api_key="k", steam_id="i")) == [5] assert get_all_owned_app_ids(Config(steam_api_key="k", steam_id="i")) == [5]
@ -70,6 +81,7 @@ class TestGetAllOwnedAppIds:
patch( patch(
f"{PKG}.load_snapshot", return_value=[{"app_id": 20}, {"app_id": 30}] f"{PKG}.load_snapshot", return_value=[{"app_id": 20}, {"app_id": 30}]
), ),
patch(f"{PKG}._load_owned_app_ids_cache", return_value=None),
patch(f"{PKG}.SteamAPIClient", return_value=mock_client), patch(f"{PKG}.SteamAPIClient", return_value=mock_client),
): ):
assert get_all_owned_app_ids(Config(steam_api_key="k", steam_id="i")) == [ assert get_all_owned_app_ids(Config(steam_api_key="k", steam_id="i")) == [
@ -78,6 +90,128 @@ class TestGetAllOwnedAppIds:
30, 30,
] ]
def test_uses_owned_ids_cache_without_api_call(self) -> None:
with (
patch(f"{PKG}.load_snapshot", return_value=[{"app_id": 30}]),
patch(f"{PKG}._load_owned_app_ids_cache", return_value=[10, 20]),
patch(f"{PKG}.SteamAPIClient") as mock_client,
):
result = get_all_owned_app_ids(Config(steam_api_key="k", steam_id="i"))
assert result == [10, 20, 30]
mock_client.assert_not_called()
def test_cached_ids_merge_deduplicates_entries(self) -> None:
with (
patch(
f"{PKG}.load_snapshot", return_value=[{"app_id": 20}, {"app_id": 30}]
),
patch(f"{PKG}._load_owned_app_ids_cache", return_value=[10, 20, 20]),
patch(f"{PKG}.SteamAPIClient") as mock_client,
):
result = get_all_owned_app_ids(Config(steam_api_key="k", steam_id="i"))
assert result == [10, 20, 30]
mock_client.assert_not_called()
def test_api_success_saves_owned_ids_cache(self) -> None:
mock_client = MagicMock()
mock_client.get_owned_games.return_value = [{"appid": 10}, {"appid": 20}]
with (
patch(f"{PKG}.load_snapshot", return_value=[]),
patch(f"{PKG}._load_owned_app_ids_cache", return_value=None),
patch(f"{PKG}.SteamAPIClient", return_value=mock_client),
patch(f"{PKG}._save_owned_app_ids_cache") as mock_save,
):
result = get_all_owned_app_ids(Config(steam_api_key="k", steam_id="i"))
assert result == [10, 20]
mock_save.assert_called_once_with("i", [10, 20])
class TestOwnedIdsCacheHelpers:
"""Tests for owned app IDs cache helper functions."""
def test_load_cache_no_steam_id(self, tmp_path: Path) -> None:
with patch(f"{PKG}._OWNED_IDS_CACHE_FILE", tmp_path / "owned.json"):
assert _load_owned_app_ids_cache("") is None
def test_load_cache_missing_file(self, tmp_path: Path) -> None:
with patch(f"{PKG}._OWNED_IDS_CACHE_FILE", tmp_path / "owned.json"):
assert _load_owned_app_ids_cache("sid") is None
def test_load_cache_invalid_json(self, tmp_path: Path) -> None:
cache_file = tmp_path / "owned.json"
cache_file.write_text("{invalid", encoding="utf-8")
with patch(f"{PKG}._OWNED_IDS_CACHE_FILE", cache_file):
assert _load_owned_app_ids_cache("sid") is None
def test_load_cache_wrong_steam_id(self, tmp_path: Path) -> None:
cache_file = tmp_path / "owned.json"
cache_file.write_text(
json.dumps({"steam_id": "other", "fetched_at": 1e12, "app_ids": [1]}),
encoding="utf-8",
)
with patch(f"{PKG}._OWNED_IDS_CACHE_FILE", cache_file):
assert _load_owned_app_ids_cache("sid") is None
def test_load_cache_stale(self, tmp_path: Path) -> None:
cache_file = tmp_path / "owned.json"
cache_file.write_text(
json.dumps({"steam_id": "sid", "fetched_at": 0, "app_ids": [1]}),
encoding="utf-8",
)
with (
patch(f"{PKG}._OWNED_IDS_CACHE_FILE", cache_file),
patch(f"{PKG}.time.time", return_value=10_000.0),
patch(f"{PKG}._OWNED_IDS_CACHE_TTL_SECONDS", 60),
):
assert _load_owned_app_ids_cache("sid") is None
def test_load_cache_non_list_ids(self, tmp_path: Path) -> None:
cache_file = tmp_path / "owned.json"
cache_file.write_text(
json.dumps({"steam_id": "sid", "fetched_at": 10_000.0, "app_ids": 1}),
encoding="utf-8",
)
with (
patch(f"{PKG}._OWNED_IDS_CACHE_FILE", cache_file),
patch(f"{PKG}.time.time", return_value=10_010.0),
patch(f"{PKG}._OWNED_IDS_CACHE_TTL_SECONDS", 60),
):
assert _load_owned_app_ids_cache("sid") is None
def test_load_cache_valid(self, tmp_path: Path) -> None:
cache_file = tmp_path / "owned.json"
cache_file.write_text(
json.dumps(
{"steam_id": "sid", "fetched_at": 10_000.0, "app_ids": ["1", 2]}
),
encoding="utf-8",
)
with (
patch(f"{PKG}._OWNED_IDS_CACHE_FILE", cache_file),
patch(f"{PKG}.time.time", return_value=10_010.0),
patch(f"{PKG}._OWNED_IDS_CACHE_TTL_SECONDS", 60),
):
assert _load_owned_app_ids_cache("sid") == [1, 2]
def test_save_cache_writes_atomic_payload(self, tmp_path: Path) -> None:
cache_file = tmp_path / "owned.json"
with (
patch(f"{PKG}._OWNED_IDS_CACHE_FILE", cache_file),
patch(f"{PKG}.time.time", return_value=123.0),
patch(f"{PKG}._atomic_write") as mock_atomic,
):
_save_owned_app_ids_cache("sid", [10, 20])
mock_atomic.assert_called_once()
path_arg = mock_atomic.call_args.args[0]
payload_arg = mock_atomic.call_args.args[1]
assert path_arg == cache_file
assert '"steam_id": "sid"' in payload_arg
assert '"app_ids": [\n 10,\n 20\n ]' in payload_arg
class TestGuardInstalledGames: class TestGuardInstalledGames:
"""Tests for _guard_installed_games.""" """Tests for _guard_installed_games."""

View File

@ -526,6 +526,51 @@ class TestPickNextGame:
assert state.current_app_id == 2 assert state.current_app_id == 2
mock_refresh_batch.assert_not_called() mock_refresh_batch.assert_not_called()
def test_cached_confidence_overlay_avoids_refetch_for_zero_snapshot_fields(
self,
) -> None:
"""Use cached confidence before deciding whether refresh is needed."""
low = _game(app_id=1, name="Low", hours=1.0)
low.comp_100_count = 0
low.count_comp = 0
fallback = _game(app_id=2, name="Fallback", hours=2.0)
fallback.comp_100_count = 3
fallback.count_comp = 20
config = Config(steam_api_key="k", steam_id="i")
state = State()
with (
patch(
"python_pkg.steam_backlog_enforcer.scanning.load_hltb_polls_cache",
return_value={1: 1, 2: 3},
),
patch(
"python_pkg.steam_backlog_enforcer.scanning.load_hltb_count_comp_cache",
return_value={1: 8, 2: 20},
),
patch(
"python_pkg.steam_backlog_enforcer.scanning._refresh_candidate_confidence_batch"
) as mock_refresh_batch,
patch(
"python_pkg.steam_backlog_enforcer.scanning._pick_playable_candidate",
side_effect=lambda c: c[0] if c else None,
),
patch("python_pkg.steam_backlog_enforcer.scanning._echo"),
patch(
"python_pkg.steam_backlog_enforcer.scanning.is_game_installed",
return_value=True,
),
patch(
"python_pkg.steam_backlog_enforcer.scanning.uninstall_other_games",
return_value=0,
),
):
pick_next_game([low, fallback], state, config)
assert state.current_app_id == 2
mock_refresh_batch.assert_not_called()
def test_stops_after_first_confident_assignment(self) -> None: def test_stops_after_first_confident_assignment(self) -> None:
"""Only candidates up to the winning one are checked/skipped.""" """Only candidates up to the winning one are checked/skipped."""
low = _game(app_id=1, name="Low", hours=1.0) low = _game(app_id=1, name="Low", hours=1.0)