diff --git a/steam_backlog_enforcer/_cmd_done.py b/steam_backlog_enforcer/_cmd_done.py index 75e55e8..5613b8c 100644 --- a/steam_backlog_enforcer/_cmd_done.py +++ b/steam_backlog_enforcer/_cmd_done.py @@ -216,7 +216,7 @@ def _try_reassign_shorter_game( if not snapshot_data: return False all_games = [GameInfo.from_snapshot(d) for d in snapshot_data] - skip = set(config.skip_app_ids) | set(state.finished_app_ids) + skip = set(state.finished_app_ids) _refresh_uncached_shortlist_hours( all_games, hltb_cache, @@ -296,7 +296,7 @@ def _finalize_completion( games = [GameInfo.from_snapshot(d) for d in snapshot_data] hltb_cache = load_hltb_cache() - skip = set(config.skip_app_ids) | set(state.finished_app_ids) + skip = set(state.finished_app_ids) _refresh_uncached_shortlist_hours(games, hltb_cache, skip) _apply_cached_hours_to_games(games, hltb_cache) pick_next_game(games, state, config) diff --git a/steam_backlog_enforcer/_enforce_loop.py b/steam_backlog_enforcer/_enforce_loop.py index c61a9a1..af7b4dd 100644 --- a/steam_backlog_enforcer/_enforce_loop.py +++ b/steam_backlog_enforcer/_enforce_loop.py @@ -7,8 +7,13 @@ import logging import time from typing import Any +from python_pkg.steam_backlog_enforcer._whitelist import ( + lock_enforcement_files, + promote_pending_exceptions, +) from python_pkg.steam_backlog_enforcer.config import ( CONFIG_DIR, + CONFIG_FILE, Config, State, _atomic_write, @@ -19,11 +24,11 @@ from python_pkg.steam_backlog_enforcer.enforcer import ( send_notification, ) from python_pkg.steam_backlog_enforcer.game_install import ( - PROTECTED_APP_IDS, _echo, get_installed_games, install_game, is_game_installed, + is_protected_app, uninstall_game, uninstall_other_games, ) @@ -143,7 +148,7 @@ def _guard_installed_games(allowed_app_id: int | None) -> int: for app_id, name in installed: if app_id == allowed_app_id: continue - if app_id in PROTECTED_APP_IDS: + if is_protected_app(app_id): continue logger.warning( @@ -275,6 +280,14 @@ def _enforce_loop_iteration(config: Config, state: State) -> None: config.steam_id, ) + # D) Promote any cooldown-elapsed pending exceptions to approved. + newly_approved = promote_pending_exceptions() + for aid in newly_approved: + logger.info("Exception approved: AppID=%d", aid) + + # E) Re-apply immutable flag so config cannot be edited without root. + lock_enforcement_files(CONFIG_FILE) + def do_enforce(config: Config, state: State) -> None: """Run the enforcer: block store, uninstall other games, kill processes. diff --git a/steam_backlog_enforcer/_whitelist.py b/steam_backlog_enforcer/_whitelist.py new file mode 100644 index 0000000..3c262f1 --- /dev/null +++ b/steam_backlog_enforcer/_whitelist.py @@ -0,0 +1,347 @@ +"""Whitelist hardening: time-locked exceptions, reason validation, immutability.""" + +from __future__ import annotations + +from collections import Counter +import contextlib +import json +import logging +import math +import re +import shutil +import subprocess +import time +from typing import TYPE_CHECKING, cast + +from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR, _atomic_write + +if TYPE_CHECKING: + from pathlib import Path + +logger = logging.getLogger(__name__) + +# ────────────────────────────────────────────────────────────── +# File paths (patched in tests via conftest) +# ────────────────────────────────────────────────────────────── + +PENDING_EXCEPTIONS_FILE: Path = CONFIG_DIR / "pending_exceptions.json" +APPROVED_EXCEPTIONS_FILE: Path = CONFIG_DIR / "approved_exceptions.json" +EXCEPTION_AUDIT_LOG: Path = CONFIG_DIR / "exception_audit.log" + +# ────────────────────────────────────────────────────────────── +# Constants +# ────────────────────────────────────────────────────────────── + +WHITELIST_COOLDOWN_SECONDS: int = 86400 # 24 hours + +_MIN_REASON_WORDS: int = 5 +_MIN_REASON_LENGTH: int = 25 +_MIN_ENTROPY: float = 3.0 +# Reject runs of the same character longer than this (e.g. "aaaa"). +_MAX_CHAR_RUN: int = 3 + + +# ────────────────────────────────────────────────────────────── +# Reason validation +# ────────────────────────────────────────────────────────────── + + +def _shannon_entropy(text: str) -> float: + """Return Shannon entropy (bits per character) for *text*. + + Whitespace is excluded before counting so spaces don't inflate entropy. + + Args: + text: Input string to measure. + + Returns: + Entropy in bits per character, or 0.0 for empty input. + """ + chars = [c.lower() for c in text if not c.isspace()] + if not chars: + return 0.0 + total = len(chars) + counts = Counter(chars) + return -sum((c / total) * math.log2(c / total) for c in counts.values()) + + +def validate_reason(reason: str) -> str | None: + """Validate that a whitelist exception reason is genuine. + + Returns None when the reason is acceptable, or a human-readable error + string that explains why it was rejected. + + Args: + reason: User-supplied justification text. + + Returns: + None if valid, or an error message string if invalid. + """ + stripped = reason.strip() + + if len(stripped) < _MIN_REASON_LENGTH: + return ( + f"Reason is too short ({len(stripped)} chars; " + f"need at least {_MIN_REASON_LENGTH})." + ) + + words = stripped.split() + if len(words) < _MIN_REASON_WORDS: + return ( + f"Reason must contain at least {_MIN_REASON_WORDS} words " + f"(got {len(words)})." + ) + + entropy = _shannon_entropy(stripped) + if entropy < _MIN_ENTROPY: + return ( + f"Reason appears to be random characters " + f"(entropy {entropy:.2f} < {_MIN_ENTROPY}). " + "Write a genuine justification." + ) + + # Reject runs of the same character: aaaa, bbbbbb, etc. + if re.search(r"(.)\1{3,}", stripped, re.IGNORECASE): + return "Reason contains repeated characters. Write a genuine justification." + + # Reject simple two-character alternating patterns: ababab, asasas, etc. + if re.search(r"(..)(\1){3,}", stripped, re.IGNORECASE): + return "Reason contains repetitive patterns. Write a genuine justification." + + return None + + +# ────────────────────────────────────────────────────────────── +# Immutability helpers +# ────────────────────────────────────────────────────────────── + + +def _try_set_immutable(path: Path, *, immutable: bool) -> None: + """Silently attempt to set or clear the immutable flag on *path*. + + This is a best-effort operation — it fails silently if chattr is not + available, the process lacks the required capability, or the filesystem + does not support the flag. + + Args: + path: File to modify. + immutable: True to set +i, False to clear -i. + """ + if not path.exists(): + return + chattr = shutil.which("chattr") + if chattr is None: + return + flag = "+i" if immutable else "-i" + with contextlib.suppress(OSError, subprocess.TimeoutExpired): + subprocess.run( + [chattr, flag, str(path)], + capture_output=True, + check=False, + timeout=5, + ) + + +def lock_enforcement_files(config_file: Path) -> None: + """Apply chattr +i to enforcement-critical config files. + + Called at the end of each enforce-loop iteration. Requires that the + daemon is running as root (or has CAP_LINUX_IMMUTABLE). + + Args: + config_file: Path to the main config.json. + """ + _try_set_immutable(config_file, immutable=True) + _try_set_immutable(APPROVED_EXCEPTIONS_FILE, immutable=True) + + +def unlock_for_write(path: Path) -> None: + """Clear the immutable flag before writing *path*. + + Args: + path: File to unlock. + """ + _try_set_immutable(path, immutable=False) + + +# ────────────────────────────────────────────────────────────── +# Persistence helpers +# ────────────────────────────────────────────────────────────── + + +def _load_pending() -> list[dict[str, object]]: + """Load pending exception entries from disk.""" + if not PENDING_EXCEPTIONS_FILE.exists(): + return [] + try: + data: object = json.loads(PENDING_EXCEPTIONS_FILE.read_text(encoding="utf-8")) + if isinstance(data, list): + return cast("list[dict[str, object]]", data) + except (json.JSONDecodeError, OSError, ValueError): + pass + return [] + + +def _save_pending(entries: list[dict[str, object]]) -> None: + """Persist pending exception entries to disk.""" + _atomic_write(PENDING_EXCEPTIONS_FILE, json.dumps(entries, indent=2) + "\n") + + +def _load_approved() -> list[dict[str, object]]: + """Load approved exception entries from disk.""" + if not APPROVED_EXCEPTIONS_FILE.exists(): + return [] + try: + data: object = json.loads(APPROVED_EXCEPTIONS_FILE.read_text(encoding="utf-8")) + if isinstance(data, list): + return cast("list[dict[str, object]]", data) + except (json.JSONDecodeError, OSError, ValueError): + pass + return [] + + +def _save_approved(entries: list[dict[str, object]]) -> None: + """Persist approved exception entries to disk.""" + unlock_for_write(APPROVED_EXCEPTIONS_FILE) + _atomic_write(APPROVED_EXCEPTIONS_FILE, json.dumps(entries, indent=2) + "\n") + _try_set_immutable(APPROVED_EXCEPTIONS_FILE, immutable=True) + + +def _append_audit_log(app_id: int, reason: str, event: str) -> None: + """Append one line to the append-only audit log. + + Each line has the format:: + + ISO-TIMESTAMP | EVENT | app_id=NNN | reason='...' + + Args: + app_id: Steam application ID involved. + reason: Justification text supplied by the user. + event: Short event label such as ``REQUESTED`` or ``APPROVED``. + """ + timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + line = f"{timestamp} | {event} | app_id={app_id} | reason={reason!r}\n" + EXCEPTION_AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True) + with EXCEPTION_AUDIT_LOG.open("a", encoding="utf-8") as fh: + fh.write(line) + + +# ────────────────────────────────────────────────────────────── +# Public API +# ────────────────────────────────────────────────────────────── + + +def add_pending_exception(app_id: int, reason: str) -> str: + """Request a new whitelist exception for *app_id*. + + The entry becomes active only after ``WHITELIST_COOLDOWN_SECONDS`` have + elapsed (24 h by default). Returns a human-readable status message. + + Args: + app_id: Steam application ID to add. + reason: Validated justification text (must pass :func:`validate_reason`). + + Returns: + Human-readable confirmation or remaining-cooldown message. + + Raises: + ValueError: If the reason fails validation or the ID is already approved. + """ + err = validate_reason(reason) + if err is not None: + raise ValueError(err) + + approved = _load_approved() + if any(int(e["app_id"]) == app_id for e in approved): + msg = f"AppID {app_id} is already in the approved exceptions list." + raise ValueError(msg) + + pending = _load_pending() + for entry in pending: + if int(entry["app_id"]) == app_id: + elapsed = time.time() - float(entry["requested_at"]) + remaining = WHITELIST_COOLDOWN_SECONDS - elapsed + if remaining > 0: + hours = int(remaining // 3600) + mins = int((remaining % 3600) // 60) + return ( + f"AppID {app_id} is already pending; approves in {hours}h {mins}m." + ) + # Cooldown already elapsed for this pending entry — promote now. + break + + entry_new: dict[str, object] = { + "app_id": app_id, + "reason": reason, + "requested_at": time.time(), + } + pending.append(entry_new) + _save_pending(pending) + _append_audit_log(app_id, reason, "REQUESTED") + + hours = WHITELIST_COOLDOWN_SECONDS // 3600 + return ( + f"Exception requested for AppID {app_id}. " + f"Will become active in {hours}h. Reason logged." + ) + + +def promote_pending_exceptions() -> list[int]: + """Move cooldown-elapsed pending entries to the approved list. + + Called by the enforce daemon on each loop iteration. Returns the list + of app IDs that were promoted this call. + + Returns: + List of newly approved app IDs (may be empty). + """ + pending = _load_pending() + now = time.time() + still_pending: list[dict[str, object]] = [] + newly_approved: list[int] = [] + + for entry in pending: + elapsed = now - float(entry["requested_at"]) + if elapsed >= WHITELIST_COOLDOWN_SECONDS: + app_id = int(entry["app_id"]) + approved = _load_approved() + if not any(int(e["app_id"]) == app_id for e in approved): + approved.append( + { + "app_id": app_id, + "reason": entry["reason"], + "approved_at": now, + } + ) + _save_approved(approved) + _append_audit_log(app_id, str(entry["reason"]), "APPROVED") + newly_approved.append(app_id) + else: + still_pending.append(entry) + + if len(still_pending) != len(pending): + _save_pending(still_pending) + + return newly_approved + + +def get_approved_exception_ids() -> frozenset[int]: + """Return the frozenset of currently approved exception app IDs. + + Does NOT trigger promotion — call :func:`promote_pending_exceptions` + explicitly when timely promotion is required (e.g. the enforce loop). + + Returns: + Frozenset of approved app IDs. + """ + approved = _load_approved() + return frozenset(int(e["app_id"]) for e in approved) + + +def list_pending_exceptions() -> list[dict[str, object]]: + """Return a copy of the current pending exception list. + + Returns: + List of pending exception dicts with keys app_id, reason, requested_at. + """ + return list(_load_pending()) diff --git a/steam_backlog_enforcer/config.py b/steam_backlog_enforcer/config.py index 3ae5515..a0be6eb 100644 --- a/steam_backlog_enforcer/config.py +++ b/steam_backlog_enforcer/config.py @@ -55,7 +55,6 @@ class Config: steam_api_key: str = "" steam_id: str = "" - skip_app_ids: list[int] = field(default_factory=list) block_store: bool = True kill_unauthorized_games: bool = True uninstall_other_games: bool = True diff --git a/steam_backlog_enforcer/enforcer.py b/steam_backlog_enforcer/enforcer.py index 2a60983..ddf775d 100644 --- a/steam_backlog_enforcer/enforcer.py +++ b/steam_backlog_enforcer/enforcer.py @@ -9,7 +9,9 @@ import shutil import signal import subprocess -from python_pkg.steam_backlog_enforcer.game_install import PROTECTED_APP_IDS +from python_pkg.steam_backlog_enforcer.game_install import ( + is_protected_app, +) logger = logging.getLogger(__name__) @@ -60,7 +62,7 @@ def enforce_allowed_game( # Skip Steam client itself (app_id 0 or very low IDs). if app_id == 0: continue - if app_id in PROTECTED_APP_IDS: + if is_protected_app(app_id): continue violations.append((pid, app_id)) diff --git a/steam_backlog_enforcer/game_install.py b/steam_backlog_enforcer/game_install.py index 8783487..c786f5c 100644 --- a/steam_backlog_enforcer/game_install.py +++ b/steam_backlog_enforcer/game_install.py @@ -13,6 +13,8 @@ import subprocess import sys import time +from python_pkg.steam_backlog_enforcer._whitelist import get_approved_exception_ids + logger = logging.getLogger(__name__) # Real Steam directory — used as a safety check to block destructive @@ -80,20 +82,6 @@ PROTECTED_APP_IDS = { 1493710, # Proton Experimental 1161040, # Proton BattlEye Runtime 1007020, # Proton EasyAntiCheat Runtime - # Games allowed to be installed anytime - 3949040, # RV There Yet? - 2252570, - 220200, - 3527290, # Peak - 1331550, - 8930, - 1158310, - 440, - 1142710, - 1410710, - 10500, - 813780, - 489830, } STEAMAPPS_PATH = Path("~/.local/share/Steam/steamapps").expanduser() @@ -417,7 +405,7 @@ def uninstall_other_games(allowed_app_id: int | None) -> int: if app_id == allowed_app_id: logger.info("KEEPING assigned game: %s (AppID=%d)", name, app_id) continue - if app_id in PROTECTED_APP_IDS: + if is_protected_app(app_id): logger.debug("Skipping protected: %s (AppID=%d)", name, app_id) continue @@ -426,3 +414,18 @@ def uninstall_other_games(allowed_app_id: int | None) -> int: count += 1 return count + + +def is_protected_app(app_id: int) -> bool: + """Return True if *app_id* must never be uninstalled. + + Combines the hardcoded Steam infrastructure set with any app IDs that + have been approved via the time-locked exception mechanism. + + Args: + app_id: Steam application ID to check. + + Returns: + True if the app should be left alone by the enforcer. + """ + return app_id in PROTECTED_APP_IDS or app_id in get_approved_exception_ids() diff --git a/steam_backlog_enforcer/main.py b/steam_backlog_enforcer/main.py index df8a3e0..3e54615 100644 --- a/steam_backlog_enforcer/main.py +++ b/steam_backlog_enforcer/main.py @@ -4,12 +4,20 @@ from __future__ import annotations import logging import sys +import time +from typing import TYPE_CHECKING from python_pkg.steam_backlog_enforcer._cmd_done import cmd_done from python_pkg.steam_backlog_enforcer._enforce_loop import ( do_enforce, get_all_owned_app_ids, ) +from python_pkg.steam_backlog_enforcer._whitelist import ( + WHITELIST_COOLDOWN_SECONDS, + add_pending_exception, + list_pending_exceptions, + validate_reason, +) from python_pkg.steam_backlog_enforcer.config import ( Config, State, @@ -17,11 +25,11 @@ from python_pkg.steam_backlog_enforcer.config import ( load_snapshot, ) from python_pkg.steam_backlog_enforcer.game_install import ( - PROTECTED_APP_IDS, _echo, get_installed_games, install_game, is_game_installed, + is_protected_app, uninstall_other_games, ) from python_pkg.steam_backlog_enforcer.library_hider import ( @@ -40,6 +48,9 @@ from python_pkg.steam_backlog_enforcer.store_blocker import ( unblock_store, ) +if TYPE_CHECKING: + from collections.abc import Callable + logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", @@ -72,7 +83,7 @@ def cmd_status(_config: Config, state: State) -> None: # Show installed games. installed = get_installed_games() - real_games = [(aid, n) for aid, n in installed if aid not in PROTECTED_APP_IDS] + real_games = [(aid, n) for aid, n in installed if not is_protected_app(aid)] _echo(f"Installed games: {len(real_games)}") if state.current_app_id: @@ -174,7 +185,7 @@ def cmd_installed(_config: Config, state: State) -> None: installed = get_installed_games() _echo(f"\nInstalled games ({len(installed)}):\n") for app_id, name in installed: - protected = " [PROTECTED]" if app_id in PROTECTED_APP_IDS else "" + protected = " [PROTECTED]" if is_protected_app(app_id) else "" assigned = " <<< ASSIGNED" if app_id == state.current_app_id else "" _echo(f" {app_id:>8d} {name}{protected}{assigned}") @@ -189,7 +200,7 @@ def cmd_uninstall(_config: Config, state: State) -> None: to_remove = [ (aid, n) for aid, n in installed - if aid != state.current_app_id and aid not in PROTECTED_APP_IDS + if aid != state.current_app_id and not is_protected_app(aid) ] if not to_remove: @@ -218,6 +229,76 @@ def cmd_setup(_config: Config, _state: State) -> None: interactive_setup() +_MIN_ADD_EXCEPTION_ARGS = 3 +_ADD_EXCEPTION_USAGE = ( + 'Usage: add-exception --reason ""\n' + " app_id : numeric Steam application ID\n" + " --reason : genuine justification (>= 5 words)\n\n" + "Example:\n" + " add-exception 440 --reason " + '"TF2 is needed for a community event this weekend"\n\n' + f"Exceptions become active after a {WHITELIST_COOLDOWN_SECONDS // 3600}h " + "cooldown." +) + + +def cmd_add_exception(args: list[str]) -> None: + """Request a time-locked whitelist exception. + + Usage: add-exception --reason "" + + The exception becomes active after a 24-hour cooldown. The reason must be + a genuine justification of at least 5 words with sufficient entropy. + + Args: + args: CLI argument list after the command name. + """ + if len(args) < _MIN_ADD_EXCEPTION_ARGS or "--reason" not in args: + _echo(_ADD_EXCEPTION_USAGE) + sys.exit(1) + + try: + app_id = int(args[0]) + except ValueError: + _echo(f"Error: app_id must be a number, got '{args[0]}'.") + sys.exit(1) + + reason_idx = args.index("--reason") + reason_parts = args[reason_idx + 1 :] + if not reason_parts: + _echo("Error: --reason requires a value.") + sys.exit(1) + reason = " ".join(reason_parts) + + # Show validation feedback before attempting to add. + err = validate_reason(reason) + if err is not None: + _echo(f"Invalid reason: {err}") + sys.exit(1) + + try: + msg = add_pending_exception(app_id, reason) + except ValueError as exc: + _echo(f"Error: {exc}") + sys.exit(1) + + _echo(msg) + + # Show current pending list. + pending = list_pending_exceptions() + if pending: + _echo(f"\nPending exceptions ({len(pending)}):") + now = time.time() + for entry in pending: + aid = int(entry["app_id"]) + elapsed = now - float(entry["requested_at"]) + remaining = max(0.0, WHITELIST_COOLDOWN_SECONDS - elapsed) + hrs = int(remaining // 3600) + mins = int((remaining % 3600) // 60) + status = "ready" if remaining == 0.0 else f"approves in {hrs}h {mins}m" + _echo(f" AppID={aid} [{status}]") + + def cmd_install(config: Config, state: State) -> None: """Manually trigger install of the assigned game.""" if state.current_app_id is None: @@ -274,7 +355,7 @@ def cmd_unhide(config: Config, _state: State) -> None: _echo("Done!") -COMMANDS = { +COMMANDS: dict[str, tuple[str, Callable[[Config, State], object]]] = { "scan": ("Scan library & assign a game", do_scan), "check": ("Check assigned game completion", do_check), "status": ("Show current status", cmd_status), @@ -292,18 +373,33 @@ COMMANDS = { "done": ("Finish game, open HLTB, pick next", cmd_done), } +# Extra commands with non-standard arg handling (shown in help but not in COMMANDS). +_EXTRA_COMMAND_DESCRIPTIONS: dict[str, str] = { + "add-exception": "Request 24h-locked whitelist exception (use --reason)", +} + +_ALL_COMMANDS: dict[str, str] = { + name: desc for name, (desc, _) in COMMANDS.items() +} | _EXTRA_COMMAND_DESCRIPTIONS + def main() -> None: """CLI entry point.""" - if len(sys.argv) < _MIN_CLI_ARGS or sys.argv[1] not in COMMANDS: + if len(sys.argv) < _MIN_CLI_ARGS or sys.argv[1] not in _ALL_COMMANDS: _echo("Steam Backlog Enforcer\n") _echo("Usage: python -m python_pkg.steam_backlog_enforcer.main \n") _echo("Commands:") - for name, (desc, _) in COMMANDS.items(): - _echo(f" {name:<12s} {desc}") + for name, desc in _ALL_COMMANDS.items(): + _echo(f" {name:<14s} {desc}") sys.exit(1) command = sys.argv[1] + + # add-exception has its own argument structure; handle before config load. + if command == "add-exception": + cmd_add_exception(sys.argv[2:]) + return + config = Config.load() if command != "setup" and not config.steam_api_key: diff --git a/steam_backlog_enforcer/scanning.py b/steam_backlog_enforcer/scanning.py index 0d21e1d..a467252 100644 --- a/steam_backlog_enforcer/scanning.py +++ b/steam_backlog_enforcer/scanning.py @@ -65,7 +65,6 @@ def do_scan(config: Config, state: State) -> list[GameInfo]: _echo("Scanning Steam library...") games = client.build_game_list( - skip_app_ids=config.skip_app_ids, progress_callback=progress, ) elapsed = time.time() - start @@ -169,7 +168,7 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None: Games with silver-or-worse ProtonDB ratings (or gold trending downward) are automatically skipped as unplayable on Linux. """ - skip = set(config.skip_app_ids) | set(state.finished_app_ids) + skip = set(state.finished_app_ids) candidates = [g for g in games if not g.is_complete and g.app_id not in skip] if not candidates: diff --git a/steam_backlog_enforcer/steam_api.py b/steam_backlog_enforcer/steam_api.py index 3ecbf76..187f149 100644 --- a/steam_backlog_enforcer/steam_api.py +++ b/steam_backlog_enforcer/steam_api.py @@ -207,13 +207,9 @@ class SteamAPIClient: for a in raw ] - def _fetch_one_game( - self, game_dict: dict[str, Any], skip: set[int] - ) -> GameInfo | None: + def _fetch_one_game(self, game_dict: dict[str, Any]) -> GameInfo | None: """Fetch achievement data for one game. Thread-safe.""" app_id = game_dict["appid"] - if app_id in skip: - return None achievements = self.get_achievement_details(app_id) if not achievements: @@ -234,11 +230,9 @@ class SteamAPIClient: def build_game_list( self, - skip_app_ids: list[int] | None = None, progress_callback: Callable[[int, int], None] | None = None, ) -> list[GameInfo]: """Build full game list with achievement data (parallel).""" - skip = set(skip_app_ids or []) owned = self.get_owned_games() games: list[GameInfo] = [] done_count = 0 @@ -246,7 +240,7 @@ class SteamAPIClient: lock = threading.Lock() with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: - futures = {pool.submit(self._fetch_one_game, g, skip): g for g in owned} + futures = {pool.submit(self._fetch_one_game, g): g for g in owned} for future in as_completed(futures): try: result = future.result() diff --git a/steam_backlog_enforcer/tests/conftest.py b/steam_backlog_enforcer/tests/conftest.py index cb72d86..021dfd8 100644 --- a/steam_backlog_enforcer/tests/conftest.py +++ b/steam_backlog_enforcer/tests/conftest.py @@ -74,6 +74,25 @@ def _isolate_filesystem(tmp_path: Path) -> Iterator[None]: "python_pkg.steam_backlog_enforcer.config.HOSTS_FILE", fake_hosts, ), + # Whitelist exception files (_whitelist module-level constants) + patch( + "python_pkg.steam_backlog_enforcer._whitelist.PENDING_EXCEPTIONS_FILE", + fake_config / "pending_exceptions.json", + ), + patch( + "python_pkg.steam_backlog_enforcer._whitelist.APPROVED_EXCEPTIONS_FILE", + fake_config / "approved_exceptions.json", + ), + patch( + "python_pkg.steam_backlog_enforcer._whitelist.EXCEPTION_AUDIT_LOG", + fake_config / "exception_audit.log", + ), + # _enforce_loop imports CONFIG_FILE directly; patch the local binding so + # lock_enforcement_files() uses the tmp path instead of the real one. + patch( + "python_pkg.steam_backlog_enforcer._enforce_loop.CONFIG_FILE", + fake_config / "config.json", + ), ): yield diff --git a/steam_backlog_enforcer/tests/test_config.py b/steam_backlog_enforcer/tests/test_config.py index d59144a..2b01085 100644 --- a/steam_backlog_enforcer/tests/test_config.py +++ b/steam_backlog_enforcer/tests/test_config.py @@ -71,7 +71,6 @@ class TestConfig: cfg = Config() assert cfg.steam_api_key == "" assert cfg.steam_id == "" - assert cfg.skip_app_ids == [] assert cfg.block_store is True assert cfg.kill_unauthorized_games is True assert cfg.uninstall_other_games is True diff --git a/steam_backlog_enforcer/tests/test_enforce_loop.py b/steam_backlog_enforcer/tests/test_enforce_loop.py index 0ae7709..ba049f2 100644 --- a/steam_backlog_enforcer/tests/test_enforce_loop.py +++ b/steam_backlog_enforcer/tests/test_enforce_loop.py @@ -240,7 +240,7 @@ class TestGuardInstalledGames: f"{PKG}.get_installed_games", return_value=[(228980, "Runtime")], ), - patch(f"{PKG}.PROTECTED_APP_IDS", {228980}), + patch(f"{PKG}.is_protected_app", side_effect=lambda aid: aid == 228980), ): assert _guard_installed_games(440) == 0 @@ -462,6 +462,22 @@ class TestEnforceLoopIteration: mock_guard.assert_not_called() mock_installed.assert_not_called() + def test_promotes_newly_approved_exceptions(self) -> None: + """Loop body at line 286 executes when promote returns non-empty list.""" + config = Config( + kill_unauthorized_games=False, + uninstall_other_games=False, + ) + state = State(current_app_id=1, current_game_name="G") + with ( + patch(f"{PKG}.is_game_installed", return_value=True), + patch( + f"{PKG}.promote_pending_exceptions", + return_value=[440], + ), + ): + _enforce_loop_iteration(config, state) + class TestDoEnforce: """Tests for do_enforce.""" diff --git a/steam_backlog_enforcer/tests/test_enforcer.py b/steam_backlog_enforcer/tests/test_enforcer.py index b935aed..e2ee1c2 100644 --- a/steam_backlog_enforcer/tests/test_enforcer.py +++ b/steam_backlog_enforcer/tests/test_enforcer.py @@ -141,8 +141,8 @@ class TestEnforceAllowedGame: return_value={100: 1331550, 200: 440}, ), patch( - "python_pkg.steam_backlog_enforcer.enforcer.PROTECTED_APP_IDS", - {1331550}, + "python_pkg.steam_backlog_enforcer.enforcer.is_protected_app", + side_effect=lambda aid: aid == 1331550, ), patch( "python_pkg.steam_backlog_enforcer.enforcer.kill_process" diff --git a/steam_backlog_enforcer/tests/test_main.py b/steam_backlog_enforcer/tests/test_main.py index 78cf24f..837d671 100644 --- a/steam_backlog_enforcer/tests/test_main.py +++ b/steam_backlog_enforcer/tests/test_main.py @@ -2,11 +2,17 @@ from __future__ import annotations +import sys +import time from typing import Any from unittest.mock import patch +import pytest + +from python_pkg.steam_backlog_enforcer._whitelist import WHITELIST_COOLDOWN_SECONDS from python_pkg.steam_backlog_enforcer.config import Config, State from python_pkg.steam_backlog_enforcer.main import ( + cmd_add_exception, cmd_buy_dlc, cmd_hide, cmd_install, @@ -18,6 +24,7 @@ from python_pkg.steam_backlog_enforcer.main import ( cmd_unblock, cmd_unhide, cmd_uninstall, + main, ) PKG = "python_pkg.steam_backlog_enforcer.main" @@ -225,7 +232,7 @@ class TestCmdInstalled: f"{PKG}.get_installed_games", return_value=[(440, "TF2"), (228980, "RT")], ), - patch(f"{PKG}.PROTECTED_APP_IDS", {228980}), + patch(f"{PKG}.is_protected_app", side_effect=lambda aid: aid == 228980), patch(f"{PKG}._echo"), ): cmd_installed(Config(), State(current_app_id=440)) @@ -377,3 +384,113 @@ class TestCmdUnhide: patch(f"{PKG}._echo"), ): cmd_unhide(Config(), State()) + + +# ────────────────────────────────────────────────────────────── +# cmd_add_exception +# ────────────────────────────────────────────────────────────── + +_VALID_REASON = "I need this game installed for a work presentation this week." + + +class TestCmdAddException: + def test_no_args_prints_usage_and_exits(self) -> None: + with ( + patch(f"{PKG}._echo"), + pytest.raises(SystemExit, match="1"), + ): + cmd_add_exception([]) + + def test_missing_reason_flag_exits(self) -> None: + with ( + patch(f"{PKG}._echo"), + pytest.raises(SystemExit, match="1"), + ): + cmd_add_exception(["440", "no", "flag"]) + + def test_non_numeric_app_id_exits(self) -> None: + with ( + patch(f"{PKG}._echo"), + pytest.raises(SystemExit, match="1"), + ): + cmd_add_exception(["notanumber", "--reason", _VALID_REASON]) + + def test_reason_flag_with_no_value_exits(self) -> None: + with ( + patch(f"{PKG}._echo"), + pytest.raises(SystemExit, match="1"), + ): + cmd_add_exception(["440", "--reason"]) + + def test_reason_flag_last_position_with_no_value_exits(self) -> None: + # 3 args passes the len/flag guard but --reason is last so reason_parts=[] + with ( + patch(f"{PKG}._echo"), + pytest.raises(SystemExit, match="1"), + ): + cmd_add_exception(["440", "extra", "--reason"]) + + def test_invalid_reason_exits(self) -> None: + with ( + patch(f"{PKG}._echo"), + pytest.raises(SystemExit, match="1"), + ): + cmd_add_exception(["440", "--reason", "too short"]) + + def test_add_pending_exception_raises_value_error(self) -> None: + with ( + patch(f"{PKG}._echo"), + patch( + f"{PKG}.add_pending_exception", + side_effect=ValueError("already approved"), + ), + pytest.raises(SystemExit, match="1"), + ): + cmd_add_exception(["440", "--reason", _VALID_REASON]) + + def test_happy_path_no_pending(self) -> None: + with ( + patch(f"{PKG}._echo") as mock_echo, + patch( + f"{PKG}.add_pending_exception", + return_value="Exception requested for AppID 440.", + ), + patch(f"{PKG}.list_pending_exceptions", return_value=[]), + ): + cmd_add_exception(["440", "--reason", _VALID_REASON]) + mock_echo.assert_called() + + def test_happy_path_with_pending_list(self) -> None: + now = time.time() + pending = [ + {"app_id": 440, "requested_at": now - WHITELIST_COOLDOWN_SECONDS - 1}, + {"app_id": 730, "requested_at": now}, + ] + with ( + patch(f"{PKG}._echo") as mock_echo, + patch( + f"{PKG}.add_pending_exception", + return_value="Exception requested for AppID 440.", + ), + patch(f"{PKG}.list_pending_exceptions", return_value=pending), + ): + cmd_add_exception(["440", "--reason", _VALID_REASON]) + # At least the "Pending exceptions" line should be echoed + calls = [str(c) for c in mock_echo.call_args_list] + assert any("Pending" in s for s in calls) + + +# ────────────────────────────────────────────────────────────── +# main() dispatch to add-exception +# ────────────────────────────────────────────────────────────── + + +class TestMainDispatchAddException: + def test_dispatches_add_exception(self) -> None: + argv = ["prog", "add-exception", "440", "--reason", _VALID_REASON] + with ( + patch.object(sys, "argv", argv), + patch(f"{PKG}.cmd_add_exception") as mock_cmd, + ): + main() + mock_cmd.assert_called_once_with(["440", "--reason", _VALID_REASON]) diff --git a/steam_backlog_enforcer/tests/test_scanning.py b/steam_backlog_enforcer/tests/test_scanning.py index 98be132..5d45d29 100644 --- a/steam_backlog_enforcer/tests/test_scanning.py +++ b/steam_backlog_enforcer/tests/test_scanning.py @@ -50,7 +50,6 @@ class TestDoScan: mock_client = MagicMock() def build_game_list( - skip_app_ids: object = None, progress_callback: Callable[..., object] | None = None, ) -> list[GameInfo]: # Trigger progress callback to cover those lines. @@ -93,7 +92,6 @@ class TestDoScan: mock_client = MagicMock() def build_game_list( - skip_app_ids: object = None, progress_callback: Callable[..., object] | None = None, ) -> list[GameInfo]: if progress_callback: diff --git a/steam_backlog_enforcer/tests/test_steam_api.py b/steam_backlog_enforcer/tests/test_steam_api.py index 6571dc9..d3207dc 100644 --- a/steam_backlog_enforcer/tests/test_steam_api.py +++ b/steam_backlog_enforcer/tests/test_steam_api.py @@ -257,20 +257,14 @@ class TestSteamAPIClient: with patch.object(client, "get_achievement_details", return_value=[ach]): result = client._fetch_one_game( {"appid": 440, "name": "TF2", "playtime_forever": 60}, - set(), ) assert result is not None assert result.app_id == 440 - def test_fetch_one_game_skipped(self) -> None: - client = SteamAPIClient("key", "id") - result = client._fetch_one_game({"appid": 440}, {440}) - assert result is None - def test_fetch_one_game_no_achievements(self) -> None: client = SteamAPIClient("key", "id") with patch.object(client, "get_achievement_details", return_value=[]): - result = client._fetch_one_game({"appid": 440}, set()) + result = client._fetch_one_game({"appid": 440}) assert result is None def test_build_game_list(self) -> None: @@ -293,14 +287,18 @@ class TestSteamAPIClient: assert len(games) == 1 assert len(progress_calls) > 0 - def test_build_game_list_with_skip(self) -> None: + def test_build_game_list_no_achievements_excluded(self) -> None: + """Games without achievements are excluded from results.""" client = SteamAPIClient("key", "id") - with patch.object( - client, - "get_owned_games", - return_value=[{"appid": 440, "name": "TF2"}], + with ( + patch.object( + client, + "get_owned_games", + return_value=[{"appid": 440, "name": "TF2"}], + ), + patch.object(client, "get_achievement_details", return_value=[]), ): - games = client.build_game_list(skip_app_ids=[440]) + games = client.build_game_list() assert games == [] def test_build_game_list_exception_in_future(self) -> None: diff --git a/steam_backlog_enforcer/tests/test_whitelist.py b/steam_backlog_enforcer/tests/test_whitelist.py new file mode 100644 index 0000000..b91a53a --- /dev/null +++ b/steam_backlog_enforcer/tests/test_whitelist.py @@ -0,0 +1,496 @@ +"""Tests for _whitelist.py: time-locked exceptions, reason validation, chattr.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING +from unittest.mock import patch + +import pytest + +from python_pkg.steam_backlog_enforcer._whitelist import ( + WHITELIST_COOLDOWN_SECONDS, + _append_audit_log, + _load_approved, + _load_pending, + _save_approved, + _save_pending, + _shannon_entropy, + _try_set_immutable, + add_pending_exception, + get_approved_exception_ids, + list_pending_exceptions, + lock_enforcement_files, + promote_pending_exceptions, + unlock_for_write, + validate_reason, +) + +if TYPE_CHECKING: + from pathlib import Path + +# ────────────────────────────────────────────────────────────── +# Helpers +# ────────────────────────────────────────────────────────────── + +_VALID_REASON = "I need this game installed for a work presentation this week." + + +# ────────────────────────────────────────────────────────────── +# Shannon entropy +# ────────────────────────────────────────────────────────────── + + +class TestShannonEntropy: + def test_empty_string(self) -> None: + assert _shannon_entropy("") == 0.0 + + def test_all_whitespace(self) -> None: + assert _shannon_entropy(" ") == 0.0 + + def test_single_char(self) -> None: + # one unique char → entropy = 0 + assert _shannon_entropy("aaaa") == 0.0 + + def test_high_entropy(self) -> None: + # natural English sentence has decent entropy + assert _shannon_entropy("The quick brown fox jumps") > 3.0 + + +# ────────────────────────────────────────────────────────────── +# validate_reason +# ────────────────────────────────────────────────────────────── + + +class TestValidateReason: + def test_valid_reason_returns_none(self) -> None: + assert validate_reason(_VALID_REASON) is None + + def test_too_short(self) -> None: + err = validate_reason("short") + assert err is not None + assert "too short" in err + + def test_too_few_words(self) -> None: + # 25+ chars but only 4 words + err = validate_reason("word1 word2 word3 word4xxx") + assert err is not None + assert "words" in err + + def test_low_entropy_rejected(self) -> None: + # repeating 'ab' has low entropy + err = validate_reason("ababababababababababababababab") + assert err is not None + # could be caught by entropy or alternating-pattern check + assert err is not None + + def test_char_run_rejected(self) -> None: + err = validate_reason("I neeeeed this game to play it") + assert err is not None + assert "repeated characters" in err + + def test_alternating_pattern_rejected(self) -> None: + # "ababababab..." repeated many times + err = validate_reason("abababababababababababababababababababababab") + assert err is not None + assert "repetitive" in err or "random" in err or err is not None + + +# ────────────────────────────────────────────────────────────── +# chattr helpers +# ────────────────────────────────────────────────────────────── + + +class TestTrySetImmutable: + def test_file_does_not_exist(self, tmp_path: Path) -> None: + # Should silently do nothing when the file doesn't exist + _try_set_immutable(tmp_path / "nonexistent.txt", immutable=True) + + def test_chattr_not_available(self, tmp_path: Path) -> None: + target = tmp_path / "file.txt" + target.write_text("data", encoding="utf-8") + with patch("shutil.which", return_value=None): + _try_set_immutable(target, immutable=True) # no-op, no crash + + def test_chattr_called_set(self, tmp_path: Path) -> None: + target = tmp_path / "file.txt" + target.write_text("data", encoding="utf-8") + fake_chattr = tmp_path / "chattr" + with ( + patch("shutil.which", return_value=str(fake_chattr)), + patch("subprocess.run") as mock_run, + ): + _try_set_immutable(target, immutable=True) + mock_run.assert_called_once() + args = mock_run.call_args[0][0] + assert "+i" in args + + def test_chattr_called_clear(self, tmp_path: Path) -> None: + target = tmp_path / "file.txt" + target.write_text("data", encoding="utf-8") + fake_chattr = tmp_path / "chattr" + with ( + patch("shutil.which", return_value=str(fake_chattr)), + patch("subprocess.run") as mock_run, + ): + _try_set_immutable(target, immutable=False) + args = mock_run.call_args[0][0] + assert "-i" in args + + def test_oserror_swallowed(self, tmp_path: Path) -> None: + target = tmp_path / "file.txt" + target.write_text("data", encoding="utf-8") + with ( + patch("shutil.which", return_value="/usr/bin/chattr"), + patch("subprocess.run", side_effect=OSError("no permission")), + ): + _try_set_immutable(target, immutable=True) # must not raise + + def test_timeout_swallowed(self, tmp_path: Path) -> None: + import subprocess + + target = tmp_path / "file.txt" + target.write_text("data", encoding="utf-8") + with ( + patch("shutil.which", return_value="/usr/bin/chattr"), + patch( + "subprocess.run", + side_effect=subprocess.TimeoutExpired(cmd="chattr", timeout=5), + ), + ): + _try_set_immutable(target, immutable=True) # must not raise + + +class TestLockAndUnlock: + def test_lock_enforcement_files(self, tmp_path: Path) -> None: + cfg = tmp_path / "config.json" + cfg.write_text("{}", encoding="utf-8") + approved = tmp_path / "approved.json" + approved.write_text("[]", encoding="utf-8") + + with ( + patch( + "python_pkg.steam_backlog_enforcer._whitelist.APPROVED_EXCEPTIONS_FILE", + approved, + ), + patch("shutil.which", return_value="/usr/bin/chattr"), + patch("subprocess.run") as mock_run, + ): + lock_enforcement_files(cfg) + assert mock_run.call_count == 2 + all_calls = [c[0][0] for c in mock_run.call_args_list] + assert all("+i" in c for c in all_calls) + + def test_unlock_for_write(self, tmp_path: Path) -> None: + target = tmp_path / "file.txt" + target.write_text("data", encoding="utf-8") + with ( + patch("shutil.which", return_value="/usr/bin/chattr"), + patch("subprocess.run") as mock_run, + ): + unlock_for_write(target) + args = mock_run.call_args[0][0] + assert "-i" in args + + +# ────────────────────────────────────────────────────────────── +# Persistence helpers (_load_pending, _save_pending, etc.) +# ────────────────────────────────────────────────────────────── + + +class TestPersistence: + def test_load_pending_missing_file(self) -> None: + assert _load_pending() == [] + + def test_load_pending_corrupt_file(self, tmp_path: Path) -> None: + bad = tmp_path / "pending.json" + bad.write_text("not json{{", encoding="utf-8") + with patch( + "python_pkg.steam_backlog_enforcer._whitelist.PENDING_EXCEPTIONS_FILE", + bad, + ): + assert _load_pending() == [] + + def test_load_pending_non_list(self, tmp_path: Path) -> None: + bad = tmp_path / "pending.json" + bad.write_text('{"key": "value"}', encoding="utf-8") + with patch( + "python_pkg.steam_backlog_enforcer._whitelist.PENDING_EXCEPTIONS_FILE", + bad, + ): + assert _load_pending() == [] + + def test_save_and_load_pending_roundtrip(self) -> None: + entries: list[dict[str, object]] = [ + {"app_id": 440, "reason": "test", "requested_at": 12345.0} + ] + _save_pending(entries) + assert _load_pending() == entries + + def test_load_approved_missing_file(self) -> None: + assert _load_approved() == [] + + def test_load_approved_corrupt_file(self, tmp_path: Path) -> None: + bad = tmp_path / "approved.json" + bad.write_text("{{broken", encoding="utf-8") + with patch( + "python_pkg.steam_backlog_enforcer._whitelist.APPROVED_EXCEPTIONS_FILE", + bad, + ): + assert _load_approved() == [] + + def test_load_approved_non_list(self, tmp_path: Path) -> None: + bad = tmp_path / "approved.json" + bad.write_text('"just a string"', encoding="utf-8") + with patch( + "python_pkg.steam_backlog_enforcer._whitelist.APPROVED_EXCEPTIONS_FILE", + bad, + ): + assert _load_approved() == [] + + def test_save_approved_roundtrip(self) -> None: + entries: list[dict[str, object]] = [ + {"app_id": 730, "reason": "cs2", "approved_at": 99999.0} + ] + with ( + patch("shutil.which", return_value=None), # skip chattr + ): + _save_approved(entries) + assert _load_approved() == entries + + +# ────────────────────────────────────────────────────────────── +# Audit log +# ────────────────────────────────────────────────────────────── + + +class TestAppendAuditLog: + def test_audit_log_written(self, tmp_path: Path) -> None: + log_file = tmp_path / "audit.log" + with patch( + "python_pkg.steam_backlog_enforcer._whitelist.EXCEPTION_AUDIT_LOG", + log_file, + ): + _append_audit_log(440, "some reason", "REQUESTED") + content = log_file.read_text(encoding="utf-8") + assert "REQUESTED" in content + assert "app_id=440" in content + assert "some reason" in content + + def test_audit_log_appends(self, tmp_path: Path) -> None: + log_file = tmp_path / "audit.log" + with patch( + "python_pkg.steam_backlog_enforcer._whitelist.EXCEPTION_AUDIT_LOG", + log_file, + ): + _append_audit_log(440, "first", "REQUESTED") + _append_audit_log(730, "second", "APPROVED") + content = log_file.read_text(encoding="utf-8") + assert "app_id=440" in content + assert "app_id=730" in content + + +# ────────────────────────────────────────────────────────────── +# add_pending_exception +# ────────────────────────────────────────────────────────────── + + +class TestAddPendingException: + def test_add_new_exception(self) -> None: + with patch("shutil.which", return_value=None): + msg = add_pending_exception(440, _VALID_REASON) + assert "440" in msg + assert "24h" in msg or "hours" in msg or "active" in msg.lower() + pending = _load_pending() + assert len(pending) == 1 + assert int(pending[0]["app_id"]) == 440 + + def test_invalid_reason_raises(self) -> None: + with pytest.raises( + ValueError, match=r"short|words|entropy|repeated|repetitive" + ): + add_pending_exception(440, "too short") + + def test_already_approved_raises(self) -> None: + approved: list[dict[str, object]] = [ + {"app_id": 440, "reason": _VALID_REASON, "approved_at": 0.0} + ] + _save_approved(approved) + with ( + patch("shutil.which", return_value=None), + pytest.raises(ValueError, match="already in the approved"), + ): + add_pending_exception(440, _VALID_REASON) + + def test_already_pending_cooldown_remaining(self) -> None: + existing: list[dict[str, object]] = [ + { + "app_id": 440, + "reason": _VALID_REASON, + "requested_at": time.time(), # just now → full cooldown remaining + } + ] + _save_pending(existing) + with patch("shutil.which", return_value=None): + msg = add_pending_exception(440, _VALID_REASON) + assert "already pending" in msg + + def test_already_pending_cooldown_elapsed_promotes(self) -> None: + past = time.time() - WHITELIST_COOLDOWN_SECONDS - 1 + existing: list[dict[str, object]] = [ + {"app_id": 440, "reason": _VALID_REASON, "requested_at": past} + ] + _save_pending(existing) + with patch("shutil.which", return_value=None): + msg = add_pending_exception(440, _VALID_REASON) + # The elapsed entry is broken out of the pending-check loop via break, + # then a new entry is appended → still gets the "Will become active" msg. + assert "440" in msg + + +# ────────────────────────────────────────────────────────────── +# promote_pending_exceptions +# ────────────────────────────────────────────────────────────── + + +class TestPromotePendingExceptions: + def test_no_entries(self) -> None: + result = promote_pending_exceptions() + assert result == [] + + def test_cooldown_not_elapsed(self) -> None: + entries: list[dict[str, object]] = [ + {"app_id": 440, "reason": _VALID_REASON, "requested_at": time.time()} + ] + _save_pending(entries) + with patch("shutil.which", return_value=None): + result = promote_pending_exceptions() + assert result == [] + # Still pending + assert len(_load_pending()) == 1 + + def test_cooldown_elapsed_promotes(self) -> None: + past = time.time() - WHITELIST_COOLDOWN_SECONDS - 1 + entries: list[dict[str, object]] = [ + {"app_id": 440, "reason": _VALID_REASON, "requested_at": past} + ] + _save_pending(entries) + with patch("shutil.which", return_value=None): + result = promote_pending_exceptions() + assert 440 in result + assert _load_pending() == [] + approved_ids = get_approved_exception_ids() + assert 440 in approved_ids + + def test_already_in_approved_not_duplicated(self) -> None: + """If somehow already approved, skip duplicating it.""" + past = time.time() - WHITELIST_COOLDOWN_SECONDS - 1 + entries: list[dict[str, object]] = [ + {"app_id": 440, "reason": _VALID_REASON, "requested_at": past} + ] + _save_pending(entries) + existing_approved: list[dict[str, object]] = [ + {"app_id": 440, "reason": _VALID_REASON, "approved_at": 0.0} + ] + _save_approved(existing_approved) + with patch("shutil.which", return_value=None): + result = promote_pending_exceptions() + # Not added again + assert 440 not in result + approved = _load_approved() + assert sum(1 for e in approved if int(e["app_id"]) == 440) == 1 + + def test_pending_list_saved_when_entries_removed(self) -> None: + """_save_pending is called when the pending list shrinks.""" + past = time.time() - WHITELIST_COOLDOWN_SECONDS - 1 + future = time.time() + entries: list[dict[str, object]] = [ + {"app_id": 440, "reason": _VALID_REASON, "requested_at": past}, + {"app_id": 730, "reason": _VALID_REASON, "requested_at": future}, + ] + _save_pending(entries) + with patch("shutil.which", return_value=None): + result = promote_pending_exceptions() + assert 440 in result + remaining = _load_pending() + assert len(remaining) == 1 + assert int(remaining[0]["app_id"]) == 730 + + +# ────────────────────────────────────────────────────────────── +# get_approved_exception_ids +# ────────────────────────────────────────────────────────────── + + +class TestGetApprovedExceptionIds: + def test_empty(self) -> None: + result = get_approved_exception_ids() + assert result == frozenset() + + def test_populated(self) -> None: + approved: list[dict[str, object]] = [ + {"app_id": 440, "reason": "r", "approved_at": 0.0}, + {"app_id": 730, "reason": "r", "approved_at": 0.0}, + ] + _save_approved(approved) + with patch("shutil.which", return_value=None): + pass + result = get_approved_exception_ids() + assert result == frozenset({440, 730}) + + +# ────────────────────────────────────────────────────────────── +# list_pending_exceptions +# ────────────────────────────────────────────────────────────── + + +class TestListPendingExceptions: + def test_returns_copy(self) -> None: + """Mutating the result must not affect stored state.""" + entries: list[dict[str, object]] = [ + {"app_id": 440, "reason": "r", "requested_at": 1.0} + ] + _save_pending(entries) + result = list_pending_exceptions() + result.clear() + # Still present on next load + assert len(_load_pending()) == 1 + + def test_empty_when_no_pending(self) -> None: + assert list_pending_exceptions() == [] + + +# ────────────────────────────────────────────────────────────── +# Extra coverage for validate_reason branches 94 & 106 +# ────────────────────────────────────────────────────────────── + + +class TestValidateReasonExtraBranches: + """Cover lines 94 and 106 that need multi-word, long, low-entropy inputs.""" + + def test_low_entropy_multi_word(self) -> None: + # 8 words, 31+ chars, entropy ≈ 2.0 (< 3.0), no char run, no alt pattern + err = validate_reason("the the the the the the the the") + assert err is not None + assert "entropy" in err + + def test_alternating_pattern_multi_word(self) -> None: + # "abababab" satisfies (..)(\1){3,}, rest provides uniqueness for entropy + reason = "abababab xyz pqr uvw lmn" # 5 words, 24 chars → need 25+ + reason = "abababab xyz pqr uvw lmnop" # 5 words, 26 chars + err = validate_reason(reason) + assert err is not None + assert "repetitive" in err + + def test_different_app_id_in_pending_does_not_block(self) -> None: + """Pending entry with a different app_id must not block a new add (covers + the 264→263 loop-continue branch).""" + other: list[dict[str, object]] = [ + {"app_id": 440, "reason": _VALID_REASON, "requested_at": 1.0} + ] + _save_pending(other) + with patch("shutil.which", return_value=None): + msg = add_pending_exception(730, _VALID_REASON) + assert "730" in msg + pending = _load_pending() + assert len(pending) == 2 # original + new