mirror of
https://github.com/kuhyx/testsAndMisc.git
synced 2026-07-04 16:03:03 +02:00
Replaces the auto-reassign-to-shorter-game logic (which fired while the current game was still in progress) with a strict workflow: 1. Check if assigned game is finished. 2. If not, do nothing. 3. If yes, pick the next shortest game and prompt the user. 4. If the user skips, ignore that game for 7 days and pick the next shortest candidate. Changes: - State: add skipped_until + skip_for_days + active_skipped_ids. - scanning.pick_next_game: optional on_select callback drives a sequential picker that filters skipped IDs; legacy cmd_pick flow preserved when on_select is None. - _cmd_done._finalize_completion: pick + prompt via on_select. - _cmd_done: remove _try_reassign_shorter_game and helpers (_apply_cached_confidence_to_games, _should_reassign_candidate, _echo_reassign_decision, _evaluate_reassign_iteration) plus call site in cmd_done. - Tests: drop obsolete _try_reassign_shorter_game suite; add TestPromptKeepOrSkip, TestPickNextGameSequential, and State skipped_until tests.
519 lines
17 KiB
Python
519 lines
17 KiB
Python
"""Game scanning, selection, checking, and enforcement daemon."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from python_pkg.steam_backlog_enforcer._hltb_types import (
|
|
load_hltb_count_comp_cache,
|
|
load_hltb_polls_cache,
|
|
)
|
|
from python_pkg.steam_backlog_enforcer._scanning_confidence import (
|
|
_apply_cached_confidence_to_candidates,
|
|
_candidate_passes_hltb_confidence,
|
|
_report_poll_confidence,
|
|
)
|
|
from python_pkg.steam_backlog_enforcer.config import (
|
|
Config,
|
|
State,
|
|
load_snapshot,
|
|
save_snapshot,
|
|
)
|
|
from python_pkg.steam_backlog_enforcer.enforcer import (
|
|
send_notification,
|
|
)
|
|
from python_pkg.steam_backlog_enforcer.game_install import (
|
|
_echo,
|
|
install_game,
|
|
is_game_installed,
|
|
uninstall_other_games,
|
|
)
|
|
from python_pkg.steam_backlog_enforcer.hltb import (
|
|
fetch_hltb_times_cached,
|
|
)
|
|
from python_pkg.steam_backlog_enforcer.protondb import (
|
|
ProtonDBRating,
|
|
fetch_protondb_ratings,
|
|
)
|
|
from python_pkg.steam_backlog_enforcer.steam_api import GameInfo, SteamAPIClient
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Callable
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_TAMPER_CHECK_LIMIT = 3
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Scanning & game selection
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
def do_scan(config: Config, state: State) -> list[GameInfo]:
|
|
"""Full library scan: Steam API + HLTB times."""
|
|
client = SteamAPIClient(config.steam_api_key, config.steam_id)
|
|
|
|
start = time.time()
|
|
done_count = 0
|
|
|
|
def progress(current: int, total: int) -> None:
|
|
nonlocal done_count
|
|
done_count = current
|
|
if current % 50 == 0 or current == total:
|
|
_echo(f"\r Scanning achievements: {current}/{total}", end="", flush=True)
|
|
|
|
_echo("Scanning Steam library...")
|
|
games = client.build_game_list(
|
|
progress_callback=progress,
|
|
)
|
|
elapsed = time.time() - start
|
|
_echo(f"\n Scanned {len(games)} games with achievements in {elapsed:.1f}s")
|
|
|
|
# Fetch HLTB times (cached).
|
|
incomplete = [(g.app_id, g.name) for g in games if not g.is_complete]
|
|
if incomplete:
|
|
_echo(f"Fetching HLTB completion times for {len(incomplete)} games...")
|
|
|
|
def hltb_progress(done: int, total: int, found: int, name: str) -> None:
|
|
pct = done * 100 // total
|
|
bar_w = 30
|
|
filled = bar_w * done // total
|
|
bar = "█" * filled + "░" * (bar_w - filled)
|
|
_echo(
|
|
f"\r HLTB [{bar}] {done}/{total} ({pct}%) "
|
|
f"| {found} found | {name[:30]:<30s}",
|
|
end="",
|
|
flush=True,
|
|
)
|
|
|
|
hltb_cache = fetch_hltb_times_cached(incomplete, progress_cb=hltb_progress)
|
|
_echo("") # newline after progress bar
|
|
polls_cache = load_hltb_polls_cache()
|
|
count_comp_cache = load_hltb_count_comp_cache()
|
|
for g in games:
|
|
hours = hltb_cache.get(g.app_id, -1)
|
|
g.completionist_hours = hours
|
|
g.comp_100_count = polls_cache.get(g.app_id, 0)
|
|
g.count_comp = count_comp_cache.get(g.app_id, 0)
|
|
found = sum(1 for h in hltb_cache.values() if h > 0)
|
|
_echo(f" HLTB data: {found} games have completion estimates")
|
|
|
|
# Save snapshot.
|
|
save_snapshot([g.to_snapshot() for g in games])
|
|
|
|
complete = [g for g in games if g.is_complete]
|
|
incomplete_games = [g for g in games if not g.is_complete]
|
|
_echo(f"\nResults: {len(complete)} complete, {len(incomplete_games)} incomplete")
|
|
|
|
# Auto-pick a game if none assigned.
|
|
if state.current_app_id is None:
|
|
pick_next_game(games, state, config)
|
|
else:
|
|
# Show confidence info for the already-assigned game too.
|
|
current = next(
|
|
(g for g in games if g.app_id == state.current_app_id),
|
|
None,
|
|
)
|
|
if current is not None:
|
|
_echo(f"\n>>> CURRENT: {current.name} (AppID={current.app_id})")
|
|
_report_poll_confidence(current, games, state)
|
|
|
|
return games
|
|
|
|
|
|
# How many candidates to check per ProtonDB batch.
|
|
_PROTONDB_BATCH_SIZE = 20
|
|
|
|
|
|
def _pick_playable_candidate(
|
|
candidates: list[GameInfo],
|
|
) -> GameInfo | None:
|
|
"""Return the first candidate with an acceptable ProtonDB rating.
|
|
|
|
Checks candidates in batches (sorted by HLTB hours, shortest first).
|
|
Games rated silver-or-worse, or gold-trending-down, are skipped.
|
|
"""
|
|
offset = 0
|
|
while offset < len(candidates):
|
|
batch = candidates[offset : offset + _PROTONDB_BATCH_SIZE]
|
|
app_ids = [g.app_id for g in batch]
|
|
ratings = fetch_protondb_ratings(app_ids)
|
|
|
|
for game in batch:
|
|
rating = ratings.get(game.app_id, ProtonDBRating(app_id=game.app_id))
|
|
if rating.is_playable:
|
|
if offset > 0 or game is not batch[0]:
|
|
_echo(
|
|
f" Skipped {offset + batch.index(game)} game(s) "
|
|
f"with poor Linux compatibility"
|
|
)
|
|
return game
|
|
logger.info(
|
|
"Skipping %s (AppID=%d): ProtonDB %s (trending %s)",
|
|
game.name,
|
|
game.app_id,
|
|
rating.tier,
|
|
rating.trending_tier,
|
|
)
|
|
|
|
offset += _PROTONDB_BATCH_SIZE
|
|
|
|
return None
|
|
|
|
|
|
_PICK_LIST_SIZE = 10
|
|
|
|
_NO_CONF_MSG = (
|
|
"\nNo assignable games found "
|
|
"(HLTB confidence thresholds: comp_100 polls>=3, "
|
|
"count_comp>=15, sum>=18)."
|
|
)
|
|
|
|
|
|
def _sort_key(g: GameInfo) -> tuple[int, float]:
|
|
"""Sort by known HLTB time (shortest first), then unknown games."""
|
|
if g.completionist_hours > 0:
|
|
return (0, g.completionist_hours)
|
|
return (1, g.name.lower().encode().hex().__hash__())
|
|
|
|
|
|
def _collect_qualified_candidates(
|
|
candidates: list[GameInfo],
|
|
) -> tuple[list[GameInfo], int, int]:
|
|
"""Collect up to _PICK_LIST_SIZE playable, HLTB-confident candidates."""
|
|
qualified: list[GameInfo] = []
|
|
confidence_skipped = 0
|
|
linux_skipped = 0
|
|
for game in candidates:
|
|
if len(qualified) >= _PICK_LIST_SIZE:
|
|
break
|
|
if not _candidate_passes_hltb_confidence(game):
|
|
confidence_skipped += 1
|
|
continue
|
|
playable = _pick_playable_candidate([game])
|
|
if playable is not None:
|
|
qualified.append(playable)
|
|
else:
|
|
linux_skipped += 1
|
|
return qualified, confidence_skipped, linux_skipped
|
|
|
|
|
|
def _prompt_user_pick(qualified: list[GameInfo]) -> int:
|
|
"""Present numbered list, return 0-based index of user's choice."""
|
|
for i, g in enumerate(qualified, 1):
|
|
hours_str = (
|
|
f" (~{g.completionist_hours:.1f}h)" if g.completionist_hours > 0 else ""
|
|
)
|
|
_echo(f" {i}. {g.name} (AppID={g.app_id}){hours_str}")
|
|
while True:
|
|
raw = input("Select game number: ")
|
|
try:
|
|
idx = int(raw)
|
|
except ValueError:
|
|
_echo(f"Invalid input: {raw!r}")
|
|
continue
|
|
if idx < 1 or idx > len(qualified):
|
|
_echo(f"Out of range: {idx}")
|
|
continue
|
|
return idx - 1
|
|
|
|
|
|
def _assign_chosen_game(
|
|
chosen: GameInfo,
|
|
games: list[GameInfo],
|
|
state: State,
|
|
config: Config,
|
|
) -> None:
|
|
"""Save assignment, announce it, and handle install/uninstall."""
|
|
state.current_app_id = chosen.app_id
|
|
state.current_game_name = chosen.name
|
|
state.save()
|
|
hours_str = (
|
|
f" (~{chosen.completionist_hours:.1f}h leisure+dlc)"
|
|
if chosen.completionist_hours > 0
|
|
else ""
|
|
)
|
|
_echo(f"\n>>> ASSIGNED: {chosen.name} (AppID={chosen.app_id}){hours_str}")
|
|
_echo(
|
|
f" Progress: {chosen.unlocked_achievements}/{chosen.total_achievements}"
|
|
f" ({chosen.completion_pct:.1f}%)"
|
|
)
|
|
_report_poll_confidence(chosen, games, state)
|
|
if config.uninstall_other_games:
|
|
count = uninstall_other_games(chosen.app_id)
|
|
if count:
|
|
_echo(f"\n Uninstalled {count} non-assigned games")
|
|
if not is_game_installed(chosen.app_id):
|
|
_echo(f"\n Auto-installing {chosen.name}...")
|
|
install_game(
|
|
chosen.app_id, chosen.name, config.steam_id, use_steam_protocol=True
|
|
)
|
|
|
|
|
|
def _pick_next_game_sequential(
|
|
games: list[GameInfo],
|
|
state: State,
|
|
config: Config,
|
|
on_select: Callable[[GameInfo], bool],
|
|
) -> None:
|
|
"""Pick the next-shortest playable game, asking the user per candidate.
|
|
|
|
``on_select`` is called with each prospective pick. Returning ``True``
|
|
accepts the assignment; returning ``False`` records a 7-day skip on
|
|
``state`` for that game and the next candidate is evaluated.
|
|
"""
|
|
while True:
|
|
skip = set(state.finished_app_ids) | state.active_skipped_ids()
|
|
candidates = [g for g in games if not g.is_complete and g.app_id not in skip]
|
|
if not candidates:
|
|
_echo(_NO_CONF_MSG)
|
|
state.current_app_id = None
|
|
state.current_game_name = ""
|
|
state.save()
|
|
return
|
|
|
|
candidates.sort(key=_sort_key)
|
|
_apply_cached_confidence_to_candidates(candidates)
|
|
chosen, confidence_skipped, linux_skipped = _pick_next_shortest_candidate(
|
|
candidates
|
|
)
|
|
if chosen is None:
|
|
_echo(
|
|
_NO_CONF_MSG
|
|
if confidence_skipped > 0 and linux_skipped == 0
|
|
else "\nNo playable games left (all have poor ProtonDB ratings)!"
|
|
)
|
|
state.current_app_id = None
|
|
state.current_game_name = ""
|
|
state.save()
|
|
return
|
|
|
|
if not on_select(chosen):
|
|
state.skip_for_days(chosen.app_id, 7)
|
|
state.save()
|
|
_echo(f"\n Skipped {chosen.name} for 7 days; picking next...")
|
|
continue
|
|
|
|
_assign_chosen_game(chosen, games, state, config)
|
|
return
|
|
|
|
|
|
def pick_next_game(
|
|
games: list[GameInfo],
|
|
state: State,
|
|
config: Config,
|
|
*,
|
|
on_select: Callable[[GameInfo], bool] | None = None,
|
|
) -> None:
|
|
"""Present a ranked list of eligible games and let the user pick one.
|
|
|
|
Games are ranked by shortest completionist time first. Games with
|
|
silver-or-worse ProtonDB ratings (or gold trending downward) are
|
|
excluded as unplayable on Linux.
|
|
|
|
If ``on_select`` is provided, the legacy 10-candidate picker is
|
|
bypassed: the function instead presents the shortest playable
|
|
candidate to ``on_select`` (typically a yes/no prompt) and, if the
|
|
callback rejects it, records a 7-day skip and re-evaluates.
|
|
"""
|
|
if on_select is not None:
|
|
_pick_next_game_sequential(games, state, config, on_select)
|
|
return
|
|
|
|
skip = set(state.finished_app_ids) | state.active_skipped_ids()
|
|
candidates = [g for g in games if not g.is_complete and g.app_id not in skip]
|
|
|
|
if not candidates:
|
|
_echo(_NO_CONF_MSG)
|
|
state.current_app_id = None
|
|
state.current_game_name = ""
|
|
state.save()
|
|
return
|
|
|
|
candidates.sort(key=_sort_key)
|
|
_apply_cached_confidence_to_candidates(candidates)
|
|
qualified, confidence_skipped, linux_skipped = _collect_qualified_candidates(
|
|
candidates
|
|
)
|
|
|
|
if not qualified:
|
|
_echo(
|
|
_NO_CONF_MSG
|
|
if confidence_skipped > 0 and linux_skipped == 0
|
|
else "\nNo playable games left (all have poor ProtonDB ratings)!"
|
|
)
|
|
state.current_app_id = None
|
|
state.current_game_name = ""
|
|
state.save()
|
|
return
|
|
|
|
idx = _prompt_user_pick(qualified)
|
|
_assign_chosen_game(qualified[idx], games, state, config)
|
|
|
|
|
|
def _pick_next_shortest_candidate(
|
|
candidates: list[GameInfo],
|
|
) -> tuple[GameInfo | None, int, int]:
|
|
"""Pick next game by checking confidence one candidate at a time.
|
|
|
|
The list must be pre-sorted by desired priority (shortest first).
|
|
"""
|
|
confidence_skipped = 0
|
|
linux_skipped = 0
|
|
for game in candidates:
|
|
if not _candidate_passes_hltb_confidence(game):
|
|
confidence_skipped += 1
|
|
continue
|
|
|
|
# Reuse existing ProtonDB compatibility gate for one candidate.
|
|
playable = _pick_playable_candidate([game])
|
|
if playable is not None:
|
|
if linux_skipped > 0:
|
|
_echo(
|
|
f" Skipped {linux_skipped} game(s) with poor Linux compatibility"
|
|
)
|
|
return playable, confidence_skipped, linux_skipped
|
|
linux_skipped += 1
|
|
|
|
if linux_skipped > 0:
|
|
_echo(f" Skipped {linux_skipped} game(s) with poor Linux compatibility")
|
|
return None, confidence_skipped, linux_skipped
|
|
|
|
|
|
def _collect_top_candidates(
|
|
candidates: list[GameInfo],
|
|
n: int = 3,
|
|
) -> tuple[list[GameInfo], int, int]:
|
|
"""Collect up to n candidates that pass the Linux compatibility gate.
|
|
|
|
Args:
|
|
candidates: Pre-sorted list of candidate games.
|
|
n: Maximum number of qualified games to collect.
|
|
|
|
Returns:
|
|
Tuple of (qualified_list, conf_skipped, linux_skipped).
|
|
"""
|
|
qualified: list[GameInfo] = []
|
|
linux_skipped = 0
|
|
for game in candidates:
|
|
if len(qualified) >= n:
|
|
break
|
|
playable = _pick_playable_candidate([game])
|
|
if playable is not None:
|
|
qualified.append(playable)
|
|
else:
|
|
linux_skipped += 1
|
|
if linux_skipped > 0:
|
|
_echo(f" Skipped {linux_skipped} game(s) with poor Linux compatibility")
|
|
return qualified, 0, linux_skipped
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Checking & tampering detection
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
def do_check(config: Config, state: State) -> None:
|
|
"""Check assigned game completion status; detect tampering."""
|
|
if state.current_app_id is None:
|
|
_echo("No game currently assigned. Run 'scan' first.")
|
|
return
|
|
|
|
client = SteamAPIClient(config.steam_api_key, config.steam_id)
|
|
_echo(f"Checking {state.current_game_name} (AppID={state.current_app_id})...")
|
|
|
|
game = client.refresh_single_game(state.current_app_id, state.current_game_name)
|
|
if game is None:
|
|
_echo(" Could not fetch achievement data.")
|
|
return
|
|
|
|
_echo(
|
|
f" Progress: {game.unlocked_achievements}/{game.total_achievements}"
|
|
f" ({game.completion_pct:.1f}%)"
|
|
)
|
|
|
|
if game.is_complete:
|
|
_echo(f"\n COMPLETED: {state.current_game_name}!")
|
|
state.finished_app_ids.append(state.current_app_id)
|
|
send_notification(
|
|
"Game Complete!",
|
|
f"You finished {state.current_game_name}! Picking next game...",
|
|
)
|
|
|
|
# Load snapshot and pick next.
|
|
snapshot_data = load_snapshot()
|
|
if snapshot_data:
|
|
games = [GameInfo.from_snapshot(d) for d in snapshot_data]
|
|
pick_next_game(games, state, config)
|
|
else:
|
|
state.current_app_id = None
|
|
state.current_game_name = ""
|
|
state.save()
|
|
_echo(" Run 'scan' to pick the next game.")
|
|
else:
|
|
remaining = game.total_achievements - game.unlocked_achievements
|
|
_echo(f" {remaining} achievements remaining. Keep going!")
|
|
|
|
# Tampering detection on snapshot.
|
|
detect_tampering(config, state)
|
|
|
|
|
|
def _check_game_tampering(
|
|
client: SteamAPIClient,
|
|
entry: dict[str, Any],
|
|
state: State,
|
|
) -> tuple[str, int, int] | None:
|
|
"""Check if a single game has unexpected achievement progress.
|
|
|
|
Args:
|
|
client: Steam API client.
|
|
entry: Snapshot entry for the game.
|
|
state: Current enforcer state.
|
|
|
|
Returns:
|
|
Tuple of (name, app_id, diff) if tampering detected, else None.
|
|
"""
|
|
app_id = entry["app_id"]
|
|
if app_id == state.current_app_id:
|
|
return None
|
|
if entry["unlocked_achievements"] >= entry["total_achievements"]:
|
|
return None
|
|
if entry.get("playtime_minutes", 0) <= 0:
|
|
return None
|
|
game = client.refresh_single_game(
|
|
app_id, entry["name"], entry.get("playtime_minutes", 0)
|
|
)
|
|
if game and game.unlocked_achievements > entry["unlocked_achievements"]:
|
|
diff = game.unlocked_achievements - entry["unlocked_achievements"]
|
|
return (entry["name"], app_id, diff)
|
|
return None
|
|
|
|
|
|
def detect_tampering(config: Config, state: State) -> None:
|
|
"""Check if achievements were unlocked on non-assigned games."""
|
|
old_snapshot = load_snapshot()
|
|
if old_snapshot is None:
|
|
return
|
|
|
|
client = SteamAPIClient(config.steam_api_key, config.steam_id)
|
|
|
|
# Quick check: only re-fetch a few random non-assigned games.
|
|
suspicious: list[tuple[str, int, int]] = []
|
|
for entry in old_snapshot:
|
|
result = _check_game_tampering(client, entry, state)
|
|
if result:
|
|
suspicious.append(result)
|
|
if len(suspicious) >= _TAMPER_CHECK_LIMIT:
|
|
break
|
|
|
|
if suspicious:
|
|
_echo("\n TAMPERING DETECTED:")
|
|
for name, app_id, diff in suspicious:
|
|
_echo(f" {name} (AppID={app_id}): +{diff} new achievements!")
|
|
send_notification(
|
|
"Tampering Detected!",
|
|
f"Achievements unlocked on {len(suspicious)} non-assigned games!",
|
|
)
|