mirror of
https://github.com/kuhyx/steam-backlog-enforcer.git
synced 2026-07-04 13:43:45 +02:00
feat(steam_backlog_enforcer): harden whitelist against circumvention
- Remove skip_app_ids from user-editable Config; callers updated - Split PROTECTED_APP_IDS: only Steam infra/Proton IDs remain; game IDs moved to a new time-locked exception system - Add _whitelist.py: 24-hour cooldown on new exceptions, entropy- checked justification (>= 5 words), append-only audit log, chattr +i immutability on enforcement-critical config files - Add is_protected_app() in game_install.py; used everywhere instead of direct PROTECTED_APP_IDS membership checks - Add 'add-exception' CLI command (cmd_add_exception in main.py) - Call promote_pending_exceptions() and lock_enforcement_files() in each _enforce_loop_iteration - 590 tests, 100% branch coverage on all steam_backlog_enforcer modules - Add .worktrees to .gitignore
This commit is contained in:
parent
c8eb20b118
commit
d0d1037e1b
@ -216,7 +216,7 @@ def _try_reassign_shorter_game(
|
||||
if not snapshot_data:
|
||||
return False
|
||||
all_games = [GameInfo.from_snapshot(d) for d in snapshot_data]
|
||||
skip = set(config.skip_app_ids) | set(state.finished_app_ids)
|
||||
skip = set(state.finished_app_ids)
|
||||
_refresh_uncached_shortlist_hours(
|
||||
all_games,
|
||||
hltb_cache,
|
||||
@ -296,7 +296,7 @@ def _finalize_completion(
|
||||
|
||||
games = [GameInfo.from_snapshot(d) for d in snapshot_data]
|
||||
hltb_cache = load_hltb_cache()
|
||||
skip = set(config.skip_app_ids) | set(state.finished_app_ids)
|
||||
skip = set(state.finished_app_ids)
|
||||
_refresh_uncached_shortlist_hours(games, hltb_cache, skip)
|
||||
_apply_cached_hours_to_games(games, hltb_cache)
|
||||
pick_next_game(games, state, config)
|
||||
|
||||
@ -7,8 +7,13 @@ import logging
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from python_pkg.steam_backlog_enforcer._whitelist import (
|
||||
lock_enforcement_files,
|
||||
promote_pending_exceptions,
|
||||
)
|
||||
from python_pkg.steam_backlog_enforcer.config import (
|
||||
CONFIG_DIR,
|
||||
CONFIG_FILE,
|
||||
Config,
|
||||
State,
|
||||
_atomic_write,
|
||||
@ -19,11 +24,11 @@ from python_pkg.steam_backlog_enforcer.enforcer import (
|
||||
send_notification,
|
||||
)
|
||||
from python_pkg.steam_backlog_enforcer.game_install import (
|
||||
PROTECTED_APP_IDS,
|
||||
_echo,
|
||||
get_installed_games,
|
||||
install_game,
|
||||
is_game_installed,
|
||||
is_protected_app,
|
||||
uninstall_game,
|
||||
uninstall_other_games,
|
||||
)
|
||||
@ -143,7 +148,7 @@ def _guard_installed_games(allowed_app_id: int | None) -> int:
|
||||
for app_id, name in installed:
|
||||
if app_id == allowed_app_id:
|
||||
continue
|
||||
if app_id in PROTECTED_APP_IDS:
|
||||
if is_protected_app(app_id):
|
||||
continue
|
||||
|
||||
logger.warning(
|
||||
@ -275,6 +280,14 @@ def _enforce_loop_iteration(config: Config, state: State) -> None:
|
||||
config.steam_id,
|
||||
)
|
||||
|
||||
# D) Promote any cooldown-elapsed pending exceptions to approved.
|
||||
newly_approved = promote_pending_exceptions()
|
||||
for aid in newly_approved:
|
||||
logger.info("Exception approved: AppID=%d", aid)
|
||||
|
||||
# E) Re-apply immutable flag so config cannot be edited without root.
|
||||
lock_enforcement_files(CONFIG_FILE)
|
||||
|
||||
|
||||
def do_enforce(config: Config, state: State) -> None:
|
||||
"""Run the enforcer: block store, uninstall other games, kill processes.
|
||||
|
||||
347
steam_backlog_enforcer/_whitelist.py
Normal file
347
steam_backlog_enforcer/_whitelist.py
Normal file
@ -0,0 +1,347 @@
|
||||
"""Whitelist hardening: time-locked exceptions, reason validation, immutability."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import Counter
|
||||
import contextlib
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR, _atomic_write
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# File paths (patched in tests via conftest)
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
PENDING_EXCEPTIONS_FILE: Path = CONFIG_DIR / "pending_exceptions.json"
|
||||
APPROVED_EXCEPTIONS_FILE: Path = CONFIG_DIR / "approved_exceptions.json"
|
||||
EXCEPTION_AUDIT_LOG: Path = CONFIG_DIR / "exception_audit.log"
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Constants
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
WHITELIST_COOLDOWN_SECONDS: int = 86400 # 24 hours
|
||||
|
||||
_MIN_REASON_WORDS: int = 5
|
||||
_MIN_REASON_LENGTH: int = 25
|
||||
_MIN_ENTROPY: float = 3.0
|
||||
# Reject runs of the same character longer than this (e.g. "aaaa").
|
||||
_MAX_CHAR_RUN: int = 3
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Reason validation
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _shannon_entropy(text: str) -> float:
|
||||
"""Return Shannon entropy (bits per character) for *text*.
|
||||
|
||||
Whitespace is excluded before counting so spaces don't inflate entropy.
|
||||
|
||||
Args:
|
||||
text: Input string to measure.
|
||||
|
||||
Returns:
|
||||
Entropy in bits per character, or 0.0 for empty input.
|
||||
"""
|
||||
chars = [c.lower() for c in text if not c.isspace()]
|
||||
if not chars:
|
||||
return 0.0
|
||||
total = len(chars)
|
||||
counts = Counter(chars)
|
||||
return -sum((c / total) * math.log2(c / total) for c in counts.values())
|
||||
|
||||
|
||||
def validate_reason(reason: str) -> str | None:
|
||||
"""Validate that a whitelist exception reason is genuine.
|
||||
|
||||
Returns None when the reason is acceptable, or a human-readable error
|
||||
string that explains why it was rejected.
|
||||
|
||||
Args:
|
||||
reason: User-supplied justification text.
|
||||
|
||||
Returns:
|
||||
None if valid, or an error message string if invalid.
|
||||
"""
|
||||
stripped = reason.strip()
|
||||
|
||||
if len(stripped) < _MIN_REASON_LENGTH:
|
||||
return (
|
||||
f"Reason is too short ({len(stripped)} chars; "
|
||||
f"need at least {_MIN_REASON_LENGTH})."
|
||||
)
|
||||
|
||||
words = stripped.split()
|
||||
if len(words) < _MIN_REASON_WORDS:
|
||||
return (
|
||||
f"Reason must contain at least {_MIN_REASON_WORDS} words "
|
||||
f"(got {len(words)})."
|
||||
)
|
||||
|
||||
entropy = _shannon_entropy(stripped)
|
||||
if entropy < _MIN_ENTROPY:
|
||||
return (
|
||||
f"Reason appears to be random characters "
|
||||
f"(entropy {entropy:.2f} < {_MIN_ENTROPY}). "
|
||||
"Write a genuine justification."
|
||||
)
|
||||
|
||||
# Reject runs of the same character: aaaa, bbbbbb, etc.
|
||||
if re.search(r"(.)\1{3,}", stripped, re.IGNORECASE):
|
||||
return "Reason contains repeated characters. Write a genuine justification."
|
||||
|
||||
# Reject simple two-character alternating patterns: ababab, asasas, etc.
|
||||
if re.search(r"(..)(\1){3,}", stripped, re.IGNORECASE):
|
||||
return "Reason contains repetitive patterns. Write a genuine justification."
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Immutability helpers
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _try_set_immutable(path: Path, *, immutable: bool) -> None:
|
||||
"""Silently attempt to set or clear the immutable flag on *path*.
|
||||
|
||||
This is a best-effort operation — it fails silently if chattr is not
|
||||
available, the process lacks the required capability, or the filesystem
|
||||
does not support the flag.
|
||||
|
||||
Args:
|
||||
path: File to modify.
|
||||
immutable: True to set +i, False to clear -i.
|
||||
"""
|
||||
if not path.exists():
|
||||
return
|
||||
chattr = shutil.which("chattr")
|
||||
if chattr is None:
|
||||
return
|
||||
flag = "+i" if immutable else "-i"
|
||||
with contextlib.suppress(OSError, subprocess.TimeoutExpired):
|
||||
subprocess.run(
|
||||
[chattr, flag, str(path)],
|
||||
capture_output=True,
|
||||
check=False,
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
|
||||
def lock_enforcement_files(config_file: Path) -> None:
|
||||
"""Apply chattr +i to enforcement-critical config files.
|
||||
|
||||
Called at the end of each enforce-loop iteration. Requires that the
|
||||
daemon is running as root (or has CAP_LINUX_IMMUTABLE).
|
||||
|
||||
Args:
|
||||
config_file: Path to the main config.json.
|
||||
"""
|
||||
_try_set_immutable(config_file, immutable=True)
|
||||
_try_set_immutable(APPROVED_EXCEPTIONS_FILE, immutable=True)
|
||||
|
||||
|
||||
def unlock_for_write(path: Path) -> None:
|
||||
"""Clear the immutable flag before writing *path*.
|
||||
|
||||
Args:
|
||||
path: File to unlock.
|
||||
"""
|
||||
_try_set_immutable(path, immutable=False)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Persistence helpers
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _load_pending() -> list[dict[str, object]]:
|
||||
"""Load pending exception entries from disk."""
|
||||
if not PENDING_EXCEPTIONS_FILE.exists():
|
||||
return []
|
||||
try:
|
||||
data: object = json.loads(PENDING_EXCEPTIONS_FILE.read_text(encoding="utf-8"))
|
||||
if isinstance(data, list):
|
||||
return cast("list[dict[str, object]]", data)
|
||||
except (json.JSONDecodeError, OSError, ValueError):
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def _save_pending(entries: list[dict[str, object]]) -> None:
|
||||
"""Persist pending exception entries to disk."""
|
||||
_atomic_write(PENDING_EXCEPTIONS_FILE, json.dumps(entries, indent=2) + "\n")
|
||||
|
||||
|
||||
def _load_approved() -> list[dict[str, object]]:
|
||||
"""Load approved exception entries from disk."""
|
||||
if not APPROVED_EXCEPTIONS_FILE.exists():
|
||||
return []
|
||||
try:
|
||||
data: object = json.loads(APPROVED_EXCEPTIONS_FILE.read_text(encoding="utf-8"))
|
||||
if isinstance(data, list):
|
||||
return cast("list[dict[str, object]]", data)
|
||||
except (json.JSONDecodeError, OSError, ValueError):
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def _save_approved(entries: list[dict[str, object]]) -> None:
|
||||
"""Persist approved exception entries to disk."""
|
||||
unlock_for_write(APPROVED_EXCEPTIONS_FILE)
|
||||
_atomic_write(APPROVED_EXCEPTIONS_FILE, json.dumps(entries, indent=2) + "\n")
|
||||
_try_set_immutable(APPROVED_EXCEPTIONS_FILE, immutable=True)
|
||||
|
||||
|
||||
def _append_audit_log(app_id: int, reason: str, event: str) -> None:
|
||||
"""Append one line to the append-only audit log.
|
||||
|
||||
Each line has the format::
|
||||
|
||||
ISO-TIMESTAMP | EVENT | app_id=NNN | reason='...'
|
||||
|
||||
Args:
|
||||
app_id: Steam application ID involved.
|
||||
reason: Justification text supplied by the user.
|
||||
event: Short event label such as ``REQUESTED`` or ``APPROVED``.
|
||||
"""
|
||||
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
|
||||
line = f"{timestamp} | {event} | app_id={app_id} | reason={reason!r}\n"
|
||||
EXCEPTION_AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True)
|
||||
with EXCEPTION_AUDIT_LOG.open("a", encoding="utf-8") as fh:
|
||||
fh.write(line)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Public API
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def add_pending_exception(app_id: int, reason: str) -> str:
|
||||
"""Request a new whitelist exception for *app_id*.
|
||||
|
||||
The entry becomes active only after ``WHITELIST_COOLDOWN_SECONDS`` have
|
||||
elapsed (24 h by default). Returns a human-readable status message.
|
||||
|
||||
Args:
|
||||
app_id: Steam application ID to add.
|
||||
reason: Validated justification text (must pass :func:`validate_reason`).
|
||||
|
||||
Returns:
|
||||
Human-readable confirmation or remaining-cooldown message.
|
||||
|
||||
Raises:
|
||||
ValueError: If the reason fails validation or the ID is already approved.
|
||||
"""
|
||||
err = validate_reason(reason)
|
||||
if err is not None:
|
||||
raise ValueError(err)
|
||||
|
||||
approved = _load_approved()
|
||||
if any(int(e["app_id"]) == app_id for e in approved):
|
||||
msg = f"AppID {app_id} is already in the approved exceptions list."
|
||||
raise ValueError(msg)
|
||||
|
||||
pending = _load_pending()
|
||||
for entry in pending:
|
||||
if int(entry["app_id"]) == app_id:
|
||||
elapsed = time.time() - float(entry["requested_at"])
|
||||
remaining = WHITELIST_COOLDOWN_SECONDS - elapsed
|
||||
if remaining > 0:
|
||||
hours = int(remaining // 3600)
|
||||
mins = int((remaining % 3600) // 60)
|
||||
return (
|
||||
f"AppID {app_id} is already pending; approves in {hours}h {mins}m."
|
||||
)
|
||||
# Cooldown already elapsed for this pending entry — promote now.
|
||||
break
|
||||
|
||||
entry_new: dict[str, object] = {
|
||||
"app_id": app_id,
|
||||
"reason": reason,
|
||||
"requested_at": time.time(),
|
||||
}
|
||||
pending.append(entry_new)
|
||||
_save_pending(pending)
|
||||
_append_audit_log(app_id, reason, "REQUESTED")
|
||||
|
||||
hours = WHITELIST_COOLDOWN_SECONDS // 3600
|
||||
return (
|
||||
f"Exception requested for AppID {app_id}. "
|
||||
f"Will become active in {hours}h. Reason logged."
|
||||
)
|
||||
|
||||
|
||||
def promote_pending_exceptions() -> list[int]:
|
||||
"""Move cooldown-elapsed pending entries to the approved list.
|
||||
|
||||
Called by the enforce daemon on each loop iteration. Returns the list
|
||||
of app IDs that were promoted this call.
|
||||
|
||||
Returns:
|
||||
List of newly approved app IDs (may be empty).
|
||||
"""
|
||||
pending = _load_pending()
|
||||
now = time.time()
|
||||
still_pending: list[dict[str, object]] = []
|
||||
newly_approved: list[int] = []
|
||||
|
||||
for entry in pending:
|
||||
elapsed = now - float(entry["requested_at"])
|
||||
if elapsed >= WHITELIST_COOLDOWN_SECONDS:
|
||||
app_id = int(entry["app_id"])
|
||||
approved = _load_approved()
|
||||
if not any(int(e["app_id"]) == app_id for e in approved):
|
||||
approved.append(
|
||||
{
|
||||
"app_id": app_id,
|
||||
"reason": entry["reason"],
|
||||
"approved_at": now,
|
||||
}
|
||||
)
|
||||
_save_approved(approved)
|
||||
_append_audit_log(app_id, str(entry["reason"]), "APPROVED")
|
||||
newly_approved.append(app_id)
|
||||
else:
|
||||
still_pending.append(entry)
|
||||
|
||||
if len(still_pending) != len(pending):
|
||||
_save_pending(still_pending)
|
||||
|
||||
return newly_approved
|
||||
|
||||
|
||||
def get_approved_exception_ids() -> frozenset[int]:
|
||||
"""Return the frozenset of currently approved exception app IDs.
|
||||
|
||||
Does NOT trigger promotion — call :func:`promote_pending_exceptions`
|
||||
explicitly when timely promotion is required (e.g. the enforce loop).
|
||||
|
||||
Returns:
|
||||
Frozenset of approved app IDs.
|
||||
"""
|
||||
approved = _load_approved()
|
||||
return frozenset(int(e["app_id"]) for e in approved)
|
||||
|
||||
|
||||
def list_pending_exceptions() -> list[dict[str, object]]:
|
||||
"""Return a copy of the current pending exception list.
|
||||
|
||||
Returns:
|
||||
List of pending exception dicts with keys app_id, reason, requested_at.
|
||||
"""
|
||||
return list(_load_pending())
|
||||
@ -55,7 +55,6 @@ class Config:
|
||||
|
||||
steam_api_key: str = ""
|
||||
steam_id: str = ""
|
||||
skip_app_ids: list[int] = field(default_factory=list)
|
||||
block_store: bool = True
|
||||
kill_unauthorized_games: bool = True
|
||||
uninstall_other_games: bool = True
|
||||
|
||||
@ -9,7 +9,9 @@ import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
|
||||
from python_pkg.steam_backlog_enforcer.game_install import PROTECTED_APP_IDS
|
||||
from python_pkg.steam_backlog_enforcer.game_install import (
|
||||
is_protected_app,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -60,7 +62,7 @@ def enforce_allowed_game(
|
||||
# Skip Steam client itself (app_id 0 or very low IDs).
|
||||
if app_id == 0:
|
||||
continue
|
||||
if app_id in PROTECTED_APP_IDS:
|
||||
if is_protected_app(app_id):
|
||||
continue
|
||||
|
||||
violations.append((pid, app_id))
|
||||
|
||||
@ -13,6 +13,8 @@ import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from python_pkg.steam_backlog_enforcer._whitelist import get_approved_exception_ids
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Real Steam directory — used as a safety check to block destructive
|
||||
@ -80,20 +82,6 @@ PROTECTED_APP_IDS = {
|
||||
1493710, # Proton Experimental
|
||||
1161040, # Proton BattlEye Runtime
|
||||
1007020, # Proton EasyAntiCheat Runtime
|
||||
# Games allowed to be installed anytime
|
||||
3949040, # RV There Yet?
|
||||
2252570,
|
||||
220200,
|
||||
3527290, # Peak
|
||||
1331550,
|
||||
8930,
|
||||
1158310,
|
||||
440,
|
||||
1142710,
|
||||
1410710,
|
||||
10500,
|
||||
813780,
|
||||
489830,
|
||||
}
|
||||
|
||||
STEAMAPPS_PATH = Path("~/.local/share/Steam/steamapps").expanduser()
|
||||
@ -417,7 +405,7 @@ def uninstall_other_games(allowed_app_id: int | None) -> int:
|
||||
if app_id == allowed_app_id:
|
||||
logger.info("KEEPING assigned game: %s (AppID=%d)", name, app_id)
|
||||
continue
|
||||
if app_id in PROTECTED_APP_IDS:
|
||||
if is_protected_app(app_id):
|
||||
logger.debug("Skipping protected: %s (AppID=%d)", name, app_id)
|
||||
continue
|
||||
|
||||
@ -426,3 +414,18 @@ def uninstall_other_games(allowed_app_id: int | None) -> int:
|
||||
count += 1
|
||||
|
||||
return count
|
||||
|
||||
|
||||
def is_protected_app(app_id: int) -> bool:
|
||||
"""Return True if *app_id* must never be uninstalled.
|
||||
|
||||
Combines the hardcoded Steam infrastructure set with any app IDs that
|
||||
have been approved via the time-locked exception mechanism.
|
||||
|
||||
Args:
|
||||
app_id: Steam application ID to check.
|
||||
|
||||
Returns:
|
||||
True if the app should be left alone by the enforcer.
|
||||
"""
|
||||
return app_id in PROTECTED_APP_IDS or app_id in get_approved_exception_ids()
|
||||
|
||||
@ -4,12 +4,20 @@ from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from python_pkg.steam_backlog_enforcer._cmd_done import cmd_done
|
||||
from python_pkg.steam_backlog_enforcer._enforce_loop import (
|
||||
do_enforce,
|
||||
get_all_owned_app_ids,
|
||||
)
|
||||
from python_pkg.steam_backlog_enforcer._whitelist import (
|
||||
WHITELIST_COOLDOWN_SECONDS,
|
||||
add_pending_exception,
|
||||
list_pending_exceptions,
|
||||
validate_reason,
|
||||
)
|
||||
from python_pkg.steam_backlog_enforcer.config import (
|
||||
Config,
|
||||
State,
|
||||
@ -17,11 +25,11 @@ from python_pkg.steam_backlog_enforcer.config import (
|
||||
load_snapshot,
|
||||
)
|
||||
from python_pkg.steam_backlog_enforcer.game_install import (
|
||||
PROTECTED_APP_IDS,
|
||||
_echo,
|
||||
get_installed_games,
|
||||
install_game,
|
||||
is_game_installed,
|
||||
is_protected_app,
|
||||
uninstall_other_games,
|
||||
)
|
||||
from python_pkg.steam_backlog_enforcer.library_hider import (
|
||||
@ -40,6 +48,9 @@ from python_pkg.steam_backlog_enforcer.store_blocker import (
|
||||
unblock_store,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Callable
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
@ -72,7 +83,7 @@ def cmd_status(_config: Config, state: State) -> None:
|
||||
|
||||
# Show installed games.
|
||||
installed = get_installed_games()
|
||||
real_games = [(aid, n) for aid, n in installed if aid not in PROTECTED_APP_IDS]
|
||||
real_games = [(aid, n) for aid, n in installed if not is_protected_app(aid)]
|
||||
_echo(f"Installed games: {len(real_games)}")
|
||||
|
||||
if state.current_app_id:
|
||||
@ -174,7 +185,7 @@ def cmd_installed(_config: Config, state: State) -> None:
|
||||
installed = get_installed_games()
|
||||
_echo(f"\nInstalled games ({len(installed)}):\n")
|
||||
for app_id, name in installed:
|
||||
protected = " [PROTECTED]" if app_id in PROTECTED_APP_IDS else ""
|
||||
protected = " [PROTECTED]" if is_protected_app(app_id) else ""
|
||||
assigned = " <<< ASSIGNED" if app_id == state.current_app_id else ""
|
||||
_echo(f" {app_id:>8d} {name}{protected}{assigned}")
|
||||
|
||||
@ -189,7 +200,7 @@ def cmd_uninstall(_config: Config, state: State) -> None:
|
||||
to_remove = [
|
||||
(aid, n)
|
||||
for aid, n in installed
|
||||
if aid != state.current_app_id and aid not in PROTECTED_APP_IDS
|
||||
if aid != state.current_app_id and not is_protected_app(aid)
|
||||
]
|
||||
|
||||
if not to_remove:
|
||||
@ -218,6 +229,76 @@ def cmd_setup(_config: Config, _state: State) -> None:
|
||||
interactive_setup()
|
||||
|
||||
|
||||
_MIN_ADD_EXCEPTION_ARGS = 3
|
||||
_ADD_EXCEPTION_USAGE = (
|
||||
'Usage: add-exception <app_id> --reason "<justification>"\n'
|
||||
" app_id : numeric Steam application ID\n"
|
||||
" --reason : genuine justification (>= 5 words)\n\n"
|
||||
"Example:\n"
|
||||
" add-exception 440 --reason "
|
||||
'"TF2 is needed for a community event this weekend"\n\n'
|
||||
f"Exceptions become active after a {WHITELIST_COOLDOWN_SECONDS // 3600}h "
|
||||
"cooldown."
|
||||
)
|
||||
|
||||
|
||||
def cmd_add_exception(args: list[str]) -> None:
|
||||
"""Request a time-locked whitelist exception.
|
||||
|
||||
Usage: add-exception <app_id> --reason "<text>"
|
||||
|
||||
The exception becomes active after a 24-hour cooldown. The reason must be
|
||||
a genuine justification of at least 5 words with sufficient entropy.
|
||||
|
||||
Args:
|
||||
args: CLI argument list after the command name.
|
||||
"""
|
||||
if len(args) < _MIN_ADD_EXCEPTION_ARGS or "--reason" not in args:
|
||||
_echo(_ADD_EXCEPTION_USAGE)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
app_id = int(args[0])
|
||||
except ValueError:
|
||||
_echo(f"Error: app_id must be a number, got '{args[0]}'.")
|
||||
sys.exit(1)
|
||||
|
||||
reason_idx = args.index("--reason")
|
||||
reason_parts = args[reason_idx + 1 :]
|
||||
if not reason_parts:
|
||||
_echo("Error: --reason requires a value.")
|
||||
sys.exit(1)
|
||||
reason = " ".join(reason_parts)
|
||||
|
||||
# Show validation feedback before attempting to add.
|
||||
err = validate_reason(reason)
|
||||
if err is not None:
|
||||
_echo(f"Invalid reason: {err}")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
msg = add_pending_exception(app_id, reason)
|
||||
except ValueError as exc:
|
||||
_echo(f"Error: {exc}")
|
||||
sys.exit(1)
|
||||
|
||||
_echo(msg)
|
||||
|
||||
# Show current pending list.
|
||||
pending = list_pending_exceptions()
|
||||
if pending:
|
||||
_echo(f"\nPending exceptions ({len(pending)}):")
|
||||
now = time.time()
|
||||
for entry in pending:
|
||||
aid = int(entry["app_id"])
|
||||
elapsed = now - float(entry["requested_at"])
|
||||
remaining = max(0.0, WHITELIST_COOLDOWN_SECONDS - elapsed)
|
||||
hrs = int(remaining // 3600)
|
||||
mins = int((remaining % 3600) // 60)
|
||||
status = "ready" if remaining == 0.0 else f"approves in {hrs}h {mins}m"
|
||||
_echo(f" AppID={aid} [{status}]")
|
||||
|
||||
|
||||
def cmd_install(config: Config, state: State) -> None:
|
||||
"""Manually trigger install of the assigned game."""
|
||||
if state.current_app_id is None:
|
||||
@ -274,7 +355,7 @@ def cmd_unhide(config: Config, _state: State) -> None:
|
||||
_echo("Done!")
|
||||
|
||||
|
||||
COMMANDS = {
|
||||
COMMANDS: dict[str, tuple[str, Callable[[Config, State], object]]] = {
|
||||
"scan": ("Scan library & assign a game", do_scan),
|
||||
"check": ("Check assigned game completion", do_check),
|
||||
"status": ("Show current status", cmd_status),
|
||||
@ -292,18 +373,33 @@ COMMANDS = {
|
||||
"done": ("Finish game, open HLTB, pick next", cmd_done),
|
||||
}
|
||||
|
||||
# Extra commands with non-standard arg handling (shown in help but not in COMMANDS).
|
||||
_EXTRA_COMMAND_DESCRIPTIONS: dict[str, str] = {
|
||||
"add-exception": "Request 24h-locked whitelist exception (use --reason)",
|
||||
}
|
||||
|
||||
_ALL_COMMANDS: dict[str, str] = {
|
||||
name: desc for name, (desc, _) in COMMANDS.items()
|
||||
} | _EXTRA_COMMAND_DESCRIPTIONS
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""CLI entry point."""
|
||||
if len(sys.argv) < _MIN_CLI_ARGS or sys.argv[1] not in COMMANDS:
|
||||
if len(sys.argv) < _MIN_CLI_ARGS or sys.argv[1] not in _ALL_COMMANDS:
|
||||
_echo("Steam Backlog Enforcer\n")
|
||||
_echo("Usage: python -m python_pkg.steam_backlog_enforcer.main <command>\n")
|
||||
_echo("Commands:")
|
||||
for name, (desc, _) in COMMANDS.items():
|
||||
_echo(f" {name:<12s} {desc}")
|
||||
for name, desc in _ALL_COMMANDS.items():
|
||||
_echo(f" {name:<14s} {desc}")
|
||||
sys.exit(1)
|
||||
|
||||
command = sys.argv[1]
|
||||
|
||||
# add-exception has its own argument structure; handle before config load.
|
||||
if command == "add-exception":
|
||||
cmd_add_exception(sys.argv[2:])
|
||||
return
|
||||
|
||||
config = Config.load()
|
||||
|
||||
if command != "setup" and not config.steam_api_key:
|
||||
|
||||
@ -65,7 +65,6 @@ def do_scan(config: Config, state: State) -> list[GameInfo]:
|
||||
|
||||
_echo("Scanning Steam library...")
|
||||
games = client.build_game_list(
|
||||
skip_app_ids=config.skip_app_ids,
|
||||
progress_callback=progress,
|
||||
)
|
||||
elapsed = time.time() - start
|
||||
@ -169,7 +168,7 @@ def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None:
|
||||
Games with silver-or-worse ProtonDB ratings (or gold trending
|
||||
downward) are automatically skipped as unplayable on Linux.
|
||||
"""
|
||||
skip = set(config.skip_app_ids) | set(state.finished_app_ids)
|
||||
skip = set(state.finished_app_ids)
|
||||
candidates = [g for g in games if not g.is_complete and g.app_id not in skip]
|
||||
|
||||
if not candidates:
|
||||
|
||||
@ -207,13 +207,9 @@ class SteamAPIClient:
|
||||
for a in raw
|
||||
]
|
||||
|
||||
def _fetch_one_game(
|
||||
self, game_dict: dict[str, Any], skip: set[int]
|
||||
) -> GameInfo | None:
|
||||
def _fetch_one_game(self, game_dict: dict[str, Any]) -> GameInfo | None:
|
||||
"""Fetch achievement data for one game. Thread-safe."""
|
||||
app_id = game_dict["appid"]
|
||||
if app_id in skip:
|
||||
return None
|
||||
|
||||
achievements = self.get_achievement_details(app_id)
|
||||
if not achievements:
|
||||
@ -234,11 +230,9 @@ class SteamAPIClient:
|
||||
|
||||
def build_game_list(
|
||||
self,
|
||||
skip_app_ids: list[int] | None = None,
|
||||
progress_callback: Callable[[int, int], None] | None = None,
|
||||
) -> list[GameInfo]:
|
||||
"""Build full game list with achievement data (parallel)."""
|
||||
skip = set(skip_app_ids or [])
|
||||
owned = self.get_owned_games()
|
||||
games: list[GameInfo] = []
|
||||
done_count = 0
|
||||
@ -246,7 +240,7 @@ class SteamAPIClient:
|
||||
lock = threading.Lock()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
|
||||
futures = {pool.submit(self._fetch_one_game, g, skip): g for g in owned}
|
||||
futures = {pool.submit(self._fetch_one_game, g): g for g in owned}
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
result = future.result()
|
||||
|
||||
@ -74,6 +74,25 @@ def _isolate_filesystem(tmp_path: Path) -> Iterator[None]:
|
||||
"python_pkg.steam_backlog_enforcer.config.HOSTS_FILE",
|
||||
fake_hosts,
|
||||
),
|
||||
# Whitelist exception files (_whitelist module-level constants)
|
||||
patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.PENDING_EXCEPTIONS_FILE",
|
||||
fake_config / "pending_exceptions.json",
|
||||
),
|
||||
patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.APPROVED_EXCEPTIONS_FILE",
|
||||
fake_config / "approved_exceptions.json",
|
||||
),
|
||||
patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.EXCEPTION_AUDIT_LOG",
|
||||
fake_config / "exception_audit.log",
|
||||
),
|
||||
# _enforce_loop imports CONFIG_FILE directly; patch the local binding so
|
||||
# lock_enforcement_files() uses the tmp path instead of the real one.
|
||||
patch(
|
||||
"python_pkg.steam_backlog_enforcer._enforce_loop.CONFIG_FILE",
|
||||
fake_config / "config.json",
|
||||
),
|
||||
):
|
||||
yield
|
||||
|
||||
|
||||
@ -71,7 +71,6 @@ class TestConfig:
|
||||
cfg = Config()
|
||||
assert cfg.steam_api_key == ""
|
||||
assert cfg.steam_id == ""
|
||||
assert cfg.skip_app_ids == []
|
||||
assert cfg.block_store is True
|
||||
assert cfg.kill_unauthorized_games is True
|
||||
assert cfg.uninstall_other_games is True
|
||||
|
||||
@ -240,7 +240,7 @@ class TestGuardInstalledGames:
|
||||
f"{PKG}.get_installed_games",
|
||||
return_value=[(228980, "Runtime")],
|
||||
),
|
||||
patch(f"{PKG}.PROTECTED_APP_IDS", {228980}),
|
||||
patch(f"{PKG}.is_protected_app", side_effect=lambda aid: aid == 228980),
|
||||
):
|
||||
assert _guard_installed_games(440) == 0
|
||||
|
||||
@ -462,6 +462,22 @@ class TestEnforceLoopIteration:
|
||||
mock_guard.assert_not_called()
|
||||
mock_installed.assert_not_called()
|
||||
|
||||
def test_promotes_newly_approved_exceptions(self) -> None:
|
||||
"""Loop body at line 286 executes when promote returns non-empty list."""
|
||||
config = Config(
|
||||
kill_unauthorized_games=False,
|
||||
uninstall_other_games=False,
|
||||
)
|
||||
state = State(current_app_id=1, current_game_name="G")
|
||||
with (
|
||||
patch(f"{PKG}.is_game_installed", return_value=True),
|
||||
patch(
|
||||
f"{PKG}.promote_pending_exceptions",
|
||||
return_value=[440],
|
||||
),
|
||||
):
|
||||
_enforce_loop_iteration(config, state)
|
||||
|
||||
|
||||
class TestDoEnforce:
|
||||
"""Tests for do_enforce."""
|
||||
|
||||
@ -141,8 +141,8 @@ class TestEnforceAllowedGame:
|
||||
return_value={100: 1331550, 200: 440},
|
||||
),
|
||||
patch(
|
||||
"python_pkg.steam_backlog_enforcer.enforcer.PROTECTED_APP_IDS",
|
||||
{1331550},
|
||||
"python_pkg.steam_backlog_enforcer.enforcer.is_protected_app",
|
||||
side_effect=lambda aid: aid == 1331550,
|
||||
),
|
||||
patch(
|
||||
"python_pkg.steam_backlog_enforcer.enforcer.kill_process"
|
||||
|
||||
@ -2,11 +2,17 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from python_pkg.steam_backlog_enforcer._whitelist import WHITELIST_COOLDOWN_SECONDS
|
||||
from python_pkg.steam_backlog_enforcer.config import Config, State
|
||||
from python_pkg.steam_backlog_enforcer.main import (
|
||||
cmd_add_exception,
|
||||
cmd_buy_dlc,
|
||||
cmd_hide,
|
||||
cmd_install,
|
||||
@ -18,6 +24,7 @@ from python_pkg.steam_backlog_enforcer.main import (
|
||||
cmd_unblock,
|
||||
cmd_unhide,
|
||||
cmd_uninstall,
|
||||
main,
|
||||
)
|
||||
|
||||
PKG = "python_pkg.steam_backlog_enforcer.main"
|
||||
@ -225,7 +232,7 @@ class TestCmdInstalled:
|
||||
f"{PKG}.get_installed_games",
|
||||
return_value=[(440, "TF2"), (228980, "RT")],
|
||||
),
|
||||
patch(f"{PKG}.PROTECTED_APP_IDS", {228980}),
|
||||
patch(f"{PKG}.is_protected_app", side_effect=lambda aid: aid == 228980),
|
||||
patch(f"{PKG}._echo"),
|
||||
):
|
||||
cmd_installed(Config(), State(current_app_id=440))
|
||||
@ -377,3 +384,113 @@ class TestCmdUnhide:
|
||||
patch(f"{PKG}._echo"),
|
||||
):
|
||||
cmd_unhide(Config(), State())
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# cmd_add_exception
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
_VALID_REASON = "I need this game installed for a work presentation this week."
|
||||
|
||||
|
||||
class TestCmdAddException:
|
||||
def test_no_args_prints_usage_and_exits(self) -> None:
|
||||
with (
|
||||
patch(f"{PKG}._echo"),
|
||||
pytest.raises(SystemExit, match="1"),
|
||||
):
|
||||
cmd_add_exception([])
|
||||
|
||||
def test_missing_reason_flag_exits(self) -> None:
|
||||
with (
|
||||
patch(f"{PKG}._echo"),
|
||||
pytest.raises(SystemExit, match="1"),
|
||||
):
|
||||
cmd_add_exception(["440", "no", "flag"])
|
||||
|
||||
def test_non_numeric_app_id_exits(self) -> None:
|
||||
with (
|
||||
patch(f"{PKG}._echo"),
|
||||
pytest.raises(SystemExit, match="1"),
|
||||
):
|
||||
cmd_add_exception(["notanumber", "--reason", _VALID_REASON])
|
||||
|
||||
def test_reason_flag_with_no_value_exits(self) -> None:
|
||||
with (
|
||||
patch(f"{PKG}._echo"),
|
||||
pytest.raises(SystemExit, match="1"),
|
||||
):
|
||||
cmd_add_exception(["440", "--reason"])
|
||||
|
||||
def test_reason_flag_last_position_with_no_value_exits(self) -> None:
|
||||
# 3 args passes the len/flag guard but --reason is last so reason_parts=[]
|
||||
with (
|
||||
patch(f"{PKG}._echo"),
|
||||
pytest.raises(SystemExit, match="1"),
|
||||
):
|
||||
cmd_add_exception(["440", "extra", "--reason"])
|
||||
|
||||
def test_invalid_reason_exits(self) -> None:
|
||||
with (
|
||||
patch(f"{PKG}._echo"),
|
||||
pytest.raises(SystemExit, match="1"),
|
||||
):
|
||||
cmd_add_exception(["440", "--reason", "too short"])
|
||||
|
||||
def test_add_pending_exception_raises_value_error(self) -> None:
|
||||
with (
|
||||
patch(f"{PKG}._echo"),
|
||||
patch(
|
||||
f"{PKG}.add_pending_exception",
|
||||
side_effect=ValueError("already approved"),
|
||||
),
|
||||
pytest.raises(SystemExit, match="1"),
|
||||
):
|
||||
cmd_add_exception(["440", "--reason", _VALID_REASON])
|
||||
|
||||
def test_happy_path_no_pending(self) -> None:
|
||||
with (
|
||||
patch(f"{PKG}._echo") as mock_echo,
|
||||
patch(
|
||||
f"{PKG}.add_pending_exception",
|
||||
return_value="Exception requested for AppID 440.",
|
||||
),
|
||||
patch(f"{PKG}.list_pending_exceptions", return_value=[]),
|
||||
):
|
||||
cmd_add_exception(["440", "--reason", _VALID_REASON])
|
||||
mock_echo.assert_called()
|
||||
|
||||
def test_happy_path_with_pending_list(self) -> None:
|
||||
now = time.time()
|
||||
pending = [
|
||||
{"app_id": 440, "requested_at": now - WHITELIST_COOLDOWN_SECONDS - 1},
|
||||
{"app_id": 730, "requested_at": now},
|
||||
]
|
||||
with (
|
||||
patch(f"{PKG}._echo") as mock_echo,
|
||||
patch(
|
||||
f"{PKG}.add_pending_exception",
|
||||
return_value="Exception requested for AppID 440.",
|
||||
),
|
||||
patch(f"{PKG}.list_pending_exceptions", return_value=pending),
|
||||
):
|
||||
cmd_add_exception(["440", "--reason", _VALID_REASON])
|
||||
# At least the "Pending exceptions" line should be echoed
|
||||
calls = [str(c) for c in mock_echo.call_args_list]
|
||||
assert any("Pending" in s for s in calls)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# main() dispatch to add-exception
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMainDispatchAddException:
|
||||
def test_dispatches_add_exception(self) -> None:
|
||||
argv = ["prog", "add-exception", "440", "--reason", _VALID_REASON]
|
||||
with (
|
||||
patch.object(sys, "argv", argv),
|
||||
patch(f"{PKG}.cmd_add_exception") as mock_cmd,
|
||||
):
|
||||
main()
|
||||
mock_cmd.assert_called_once_with(["440", "--reason", _VALID_REASON])
|
||||
|
||||
@ -50,7 +50,6 @@ class TestDoScan:
|
||||
mock_client = MagicMock()
|
||||
|
||||
def build_game_list(
|
||||
skip_app_ids: object = None,
|
||||
progress_callback: Callable[..., object] | None = None,
|
||||
) -> list[GameInfo]:
|
||||
# Trigger progress callback to cover those lines.
|
||||
@ -93,7 +92,6 @@ class TestDoScan:
|
||||
mock_client = MagicMock()
|
||||
|
||||
def build_game_list(
|
||||
skip_app_ids: object = None,
|
||||
progress_callback: Callable[..., object] | None = None,
|
||||
) -> list[GameInfo]:
|
||||
if progress_callback:
|
||||
|
||||
@ -257,20 +257,14 @@ class TestSteamAPIClient:
|
||||
with patch.object(client, "get_achievement_details", return_value=[ach]):
|
||||
result = client._fetch_one_game(
|
||||
{"appid": 440, "name": "TF2", "playtime_forever": 60},
|
||||
set(),
|
||||
)
|
||||
assert result is not None
|
||||
assert result.app_id == 440
|
||||
|
||||
def test_fetch_one_game_skipped(self) -> None:
|
||||
client = SteamAPIClient("key", "id")
|
||||
result = client._fetch_one_game({"appid": 440}, {440})
|
||||
assert result is None
|
||||
|
||||
def test_fetch_one_game_no_achievements(self) -> None:
|
||||
client = SteamAPIClient("key", "id")
|
||||
with patch.object(client, "get_achievement_details", return_value=[]):
|
||||
result = client._fetch_one_game({"appid": 440}, set())
|
||||
result = client._fetch_one_game({"appid": 440})
|
||||
assert result is None
|
||||
|
||||
def test_build_game_list(self) -> None:
|
||||
@ -293,14 +287,18 @@ class TestSteamAPIClient:
|
||||
assert len(games) == 1
|
||||
assert len(progress_calls) > 0
|
||||
|
||||
def test_build_game_list_with_skip(self) -> None:
|
||||
def test_build_game_list_no_achievements_excluded(self) -> None:
|
||||
"""Games without achievements are excluded from results."""
|
||||
client = SteamAPIClient("key", "id")
|
||||
with patch.object(
|
||||
with (
|
||||
patch.object(
|
||||
client,
|
||||
"get_owned_games",
|
||||
return_value=[{"appid": 440, "name": "TF2"}],
|
||||
),
|
||||
patch.object(client, "get_achievement_details", return_value=[]),
|
||||
):
|
||||
games = client.build_game_list(skip_app_ids=[440])
|
||||
games = client.build_game_list()
|
||||
assert games == []
|
||||
|
||||
def test_build_game_list_exception_in_future(self) -> None:
|
||||
|
||||
496
steam_backlog_enforcer/tests/test_whitelist.py
Normal file
496
steam_backlog_enforcer/tests/test_whitelist.py
Normal file
@ -0,0 +1,496 @@
|
||||
"""Tests for _whitelist.py: time-locked exceptions, reason validation, chattr."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from python_pkg.steam_backlog_enforcer._whitelist import (
|
||||
WHITELIST_COOLDOWN_SECONDS,
|
||||
_append_audit_log,
|
||||
_load_approved,
|
||||
_load_pending,
|
||||
_save_approved,
|
||||
_save_pending,
|
||||
_shannon_entropy,
|
||||
_try_set_immutable,
|
||||
add_pending_exception,
|
||||
get_approved_exception_ids,
|
||||
list_pending_exceptions,
|
||||
lock_enforcement_files,
|
||||
promote_pending_exceptions,
|
||||
unlock_for_write,
|
||||
validate_reason,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Helpers
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
_VALID_REASON = "I need this game installed for a work presentation this week."
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Shannon entropy
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestShannonEntropy:
|
||||
def test_empty_string(self) -> None:
|
||||
assert _shannon_entropy("") == 0.0
|
||||
|
||||
def test_all_whitespace(self) -> None:
|
||||
assert _shannon_entropy(" ") == 0.0
|
||||
|
||||
def test_single_char(self) -> None:
|
||||
# one unique char → entropy = 0
|
||||
assert _shannon_entropy("aaaa") == 0.0
|
||||
|
||||
def test_high_entropy(self) -> None:
|
||||
# natural English sentence has decent entropy
|
||||
assert _shannon_entropy("The quick brown fox jumps") > 3.0
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# validate_reason
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestValidateReason:
|
||||
def test_valid_reason_returns_none(self) -> None:
|
||||
assert validate_reason(_VALID_REASON) is None
|
||||
|
||||
def test_too_short(self) -> None:
|
||||
err = validate_reason("short")
|
||||
assert err is not None
|
||||
assert "too short" in err
|
||||
|
||||
def test_too_few_words(self) -> None:
|
||||
# 25+ chars but only 4 words
|
||||
err = validate_reason("word1 word2 word3 word4xxx")
|
||||
assert err is not None
|
||||
assert "words" in err
|
||||
|
||||
def test_low_entropy_rejected(self) -> None:
|
||||
# repeating 'ab' has low entropy
|
||||
err = validate_reason("ababababababababababababababab")
|
||||
assert err is not None
|
||||
# could be caught by entropy or alternating-pattern check
|
||||
assert err is not None
|
||||
|
||||
def test_char_run_rejected(self) -> None:
|
||||
err = validate_reason("I neeeeed this game to play it")
|
||||
assert err is not None
|
||||
assert "repeated characters" in err
|
||||
|
||||
def test_alternating_pattern_rejected(self) -> None:
|
||||
# "ababababab..." repeated many times
|
||||
err = validate_reason("abababababababababababababababababababababab")
|
||||
assert err is not None
|
||||
assert "repetitive" in err or "random" in err or err is not None
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# chattr helpers
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestTrySetImmutable:
|
||||
def test_file_does_not_exist(self, tmp_path: Path) -> None:
|
||||
# Should silently do nothing when the file doesn't exist
|
||||
_try_set_immutable(tmp_path / "nonexistent.txt", immutable=True)
|
||||
|
||||
def test_chattr_not_available(self, tmp_path: Path) -> None:
|
||||
target = tmp_path / "file.txt"
|
||||
target.write_text("data", encoding="utf-8")
|
||||
with patch("shutil.which", return_value=None):
|
||||
_try_set_immutable(target, immutable=True) # no-op, no crash
|
||||
|
||||
def test_chattr_called_set(self, tmp_path: Path) -> None:
|
||||
target = tmp_path / "file.txt"
|
||||
target.write_text("data", encoding="utf-8")
|
||||
fake_chattr = tmp_path / "chattr"
|
||||
with (
|
||||
patch("shutil.which", return_value=str(fake_chattr)),
|
||||
patch("subprocess.run") as mock_run,
|
||||
):
|
||||
_try_set_immutable(target, immutable=True)
|
||||
mock_run.assert_called_once()
|
||||
args = mock_run.call_args[0][0]
|
||||
assert "+i" in args
|
||||
|
||||
def test_chattr_called_clear(self, tmp_path: Path) -> None:
|
||||
target = tmp_path / "file.txt"
|
||||
target.write_text("data", encoding="utf-8")
|
||||
fake_chattr = tmp_path / "chattr"
|
||||
with (
|
||||
patch("shutil.which", return_value=str(fake_chattr)),
|
||||
patch("subprocess.run") as mock_run,
|
||||
):
|
||||
_try_set_immutable(target, immutable=False)
|
||||
args = mock_run.call_args[0][0]
|
||||
assert "-i" in args
|
||||
|
||||
def test_oserror_swallowed(self, tmp_path: Path) -> None:
|
||||
target = tmp_path / "file.txt"
|
||||
target.write_text("data", encoding="utf-8")
|
||||
with (
|
||||
patch("shutil.which", return_value="/usr/bin/chattr"),
|
||||
patch("subprocess.run", side_effect=OSError("no permission")),
|
||||
):
|
||||
_try_set_immutable(target, immutable=True) # must not raise
|
||||
|
||||
def test_timeout_swallowed(self, tmp_path: Path) -> None:
|
||||
import subprocess
|
||||
|
||||
target = tmp_path / "file.txt"
|
||||
target.write_text("data", encoding="utf-8")
|
||||
with (
|
||||
patch("shutil.which", return_value="/usr/bin/chattr"),
|
||||
patch(
|
||||
"subprocess.run",
|
||||
side_effect=subprocess.TimeoutExpired(cmd="chattr", timeout=5),
|
||||
),
|
||||
):
|
||||
_try_set_immutable(target, immutable=True) # must not raise
|
||||
|
||||
|
||||
class TestLockAndUnlock:
|
||||
def test_lock_enforcement_files(self, tmp_path: Path) -> None:
|
||||
cfg = tmp_path / "config.json"
|
||||
cfg.write_text("{}", encoding="utf-8")
|
||||
approved = tmp_path / "approved.json"
|
||||
approved.write_text("[]", encoding="utf-8")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.APPROVED_EXCEPTIONS_FILE",
|
||||
approved,
|
||||
),
|
||||
patch("shutil.which", return_value="/usr/bin/chattr"),
|
||||
patch("subprocess.run") as mock_run,
|
||||
):
|
||||
lock_enforcement_files(cfg)
|
||||
assert mock_run.call_count == 2
|
||||
all_calls = [c[0][0] for c in mock_run.call_args_list]
|
||||
assert all("+i" in c for c in all_calls)
|
||||
|
||||
def test_unlock_for_write(self, tmp_path: Path) -> None:
|
||||
target = tmp_path / "file.txt"
|
||||
target.write_text("data", encoding="utf-8")
|
||||
with (
|
||||
patch("shutil.which", return_value="/usr/bin/chattr"),
|
||||
patch("subprocess.run") as mock_run,
|
||||
):
|
||||
unlock_for_write(target)
|
||||
args = mock_run.call_args[0][0]
|
||||
assert "-i" in args
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Persistence helpers (_load_pending, _save_pending, etc.)
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestPersistence:
|
||||
def test_load_pending_missing_file(self) -> None:
|
||||
assert _load_pending() == []
|
||||
|
||||
def test_load_pending_corrupt_file(self, tmp_path: Path) -> None:
|
||||
bad = tmp_path / "pending.json"
|
||||
bad.write_text("not json{{", encoding="utf-8")
|
||||
with patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.PENDING_EXCEPTIONS_FILE",
|
||||
bad,
|
||||
):
|
||||
assert _load_pending() == []
|
||||
|
||||
def test_load_pending_non_list(self, tmp_path: Path) -> None:
|
||||
bad = tmp_path / "pending.json"
|
||||
bad.write_text('{"key": "value"}', encoding="utf-8")
|
||||
with patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.PENDING_EXCEPTIONS_FILE",
|
||||
bad,
|
||||
):
|
||||
assert _load_pending() == []
|
||||
|
||||
def test_save_and_load_pending_roundtrip(self) -> None:
|
||||
entries: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": "test", "requested_at": 12345.0}
|
||||
]
|
||||
_save_pending(entries)
|
||||
assert _load_pending() == entries
|
||||
|
||||
def test_load_approved_missing_file(self) -> None:
|
||||
assert _load_approved() == []
|
||||
|
||||
def test_load_approved_corrupt_file(self, tmp_path: Path) -> None:
|
||||
bad = tmp_path / "approved.json"
|
||||
bad.write_text("{{broken", encoding="utf-8")
|
||||
with patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.APPROVED_EXCEPTIONS_FILE",
|
||||
bad,
|
||||
):
|
||||
assert _load_approved() == []
|
||||
|
||||
def test_load_approved_non_list(self, tmp_path: Path) -> None:
|
||||
bad = tmp_path / "approved.json"
|
||||
bad.write_text('"just a string"', encoding="utf-8")
|
||||
with patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.APPROVED_EXCEPTIONS_FILE",
|
||||
bad,
|
||||
):
|
||||
assert _load_approved() == []
|
||||
|
||||
def test_save_approved_roundtrip(self) -> None:
|
||||
entries: list[dict[str, object]] = [
|
||||
{"app_id": 730, "reason": "cs2", "approved_at": 99999.0}
|
||||
]
|
||||
with (
|
||||
patch("shutil.which", return_value=None), # skip chattr
|
||||
):
|
||||
_save_approved(entries)
|
||||
assert _load_approved() == entries
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Audit log
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestAppendAuditLog:
|
||||
def test_audit_log_written(self, tmp_path: Path) -> None:
|
||||
log_file = tmp_path / "audit.log"
|
||||
with patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.EXCEPTION_AUDIT_LOG",
|
||||
log_file,
|
||||
):
|
||||
_append_audit_log(440, "some reason", "REQUESTED")
|
||||
content = log_file.read_text(encoding="utf-8")
|
||||
assert "REQUESTED" in content
|
||||
assert "app_id=440" in content
|
||||
assert "some reason" in content
|
||||
|
||||
def test_audit_log_appends(self, tmp_path: Path) -> None:
|
||||
log_file = tmp_path / "audit.log"
|
||||
with patch(
|
||||
"python_pkg.steam_backlog_enforcer._whitelist.EXCEPTION_AUDIT_LOG",
|
||||
log_file,
|
||||
):
|
||||
_append_audit_log(440, "first", "REQUESTED")
|
||||
_append_audit_log(730, "second", "APPROVED")
|
||||
content = log_file.read_text(encoding="utf-8")
|
||||
assert "app_id=440" in content
|
||||
assert "app_id=730" in content
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# add_pending_exception
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestAddPendingException:
|
||||
def test_add_new_exception(self) -> None:
|
||||
with patch("shutil.which", return_value=None):
|
||||
msg = add_pending_exception(440, _VALID_REASON)
|
||||
assert "440" in msg
|
||||
assert "24h" in msg or "hours" in msg or "active" in msg.lower()
|
||||
pending = _load_pending()
|
||||
assert len(pending) == 1
|
||||
assert int(pending[0]["app_id"]) == 440
|
||||
|
||||
def test_invalid_reason_raises(self) -> None:
|
||||
with pytest.raises(
|
||||
ValueError, match=r"short|words|entropy|repeated|repetitive"
|
||||
):
|
||||
add_pending_exception(440, "too short")
|
||||
|
||||
def test_already_approved_raises(self) -> None:
|
||||
approved: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": _VALID_REASON, "approved_at": 0.0}
|
||||
]
|
||||
_save_approved(approved)
|
||||
with (
|
||||
patch("shutil.which", return_value=None),
|
||||
pytest.raises(ValueError, match="already in the approved"),
|
||||
):
|
||||
add_pending_exception(440, _VALID_REASON)
|
||||
|
||||
def test_already_pending_cooldown_remaining(self) -> None:
|
||||
existing: list[dict[str, object]] = [
|
||||
{
|
||||
"app_id": 440,
|
||||
"reason": _VALID_REASON,
|
||||
"requested_at": time.time(), # just now → full cooldown remaining
|
||||
}
|
||||
]
|
||||
_save_pending(existing)
|
||||
with patch("shutil.which", return_value=None):
|
||||
msg = add_pending_exception(440, _VALID_REASON)
|
||||
assert "already pending" in msg
|
||||
|
||||
def test_already_pending_cooldown_elapsed_promotes(self) -> None:
|
||||
past = time.time() - WHITELIST_COOLDOWN_SECONDS - 1
|
||||
existing: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": _VALID_REASON, "requested_at": past}
|
||||
]
|
||||
_save_pending(existing)
|
||||
with patch("shutil.which", return_value=None):
|
||||
msg = add_pending_exception(440, _VALID_REASON)
|
||||
# The elapsed entry is broken out of the pending-check loop via break,
|
||||
# then a new entry is appended → still gets the "Will become active" msg.
|
||||
assert "440" in msg
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# promote_pending_exceptions
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestPromotePendingExceptions:
|
||||
def test_no_entries(self) -> None:
|
||||
result = promote_pending_exceptions()
|
||||
assert result == []
|
||||
|
||||
def test_cooldown_not_elapsed(self) -> None:
|
||||
entries: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": _VALID_REASON, "requested_at": time.time()}
|
||||
]
|
||||
_save_pending(entries)
|
||||
with patch("shutil.which", return_value=None):
|
||||
result = promote_pending_exceptions()
|
||||
assert result == []
|
||||
# Still pending
|
||||
assert len(_load_pending()) == 1
|
||||
|
||||
def test_cooldown_elapsed_promotes(self) -> None:
|
||||
past = time.time() - WHITELIST_COOLDOWN_SECONDS - 1
|
||||
entries: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": _VALID_REASON, "requested_at": past}
|
||||
]
|
||||
_save_pending(entries)
|
||||
with patch("shutil.which", return_value=None):
|
||||
result = promote_pending_exceptions()
|
||||
assert 440 in result
|
||||
assert _load_pending() == []
|
||||
approved_ids = get_approved_exception_ids()
|
||||
assert 440 in approved_ids
|
||||
|
||||
def test_already_in_approved_not_duplicated(self) -> None:
|
||||
"""If somehow already approved, skip duplicating it."""
|
||||
past = time.time() - WHITELIST_COOLDOWN_SECONDS - 1
|
||||
entries: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": _VALID_REASON, "requested_at": past}
|
||||
]
|
||||
_save_pending(entries)
|
||||
existing_approved: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": _VALID_REASON, "approved_at": 0.0}
|
||||
]
|
||||
_save_approved(existing_approved)
|
||||
with patch("shutil.which", return_value=None):
|
||||
result = promote_pending_exceptions()
|
||||
# Not added again
|
||||
assert 440 not in result
|
||||
approved = _load_approved()
|
||||
assert sum(1 for e in approved if int(e["app_id"]) == 440) == 1
|
||||
|
||||
def test_pending_list_saved_when_entries_removed(self) -> None:
|
||||
"""_save_pending is called when the pending list shrinks."""
|
||||
past = time.time() - WHITELIST_COOLDOWN_SECONDS - 1
|
||||
future = time.time()
|
||||
entries: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": _VALID_REASON, "requested_at": past},
|
||||
{"app_id": 730, "reason": _VALID_REASON, "requested_at": future},
|
||||
]
|
||||
_save_pending(entries)
|
||||
with patch("shutil.which", return_value=None):
|
||||
result = promote_pending_exceptions()
|
||||
assert 440 in result
|
||||
remaining = _load_pending()
|
||||
assert len(remaining) == 1
|
||||
assert int(remaining[0]["app_id"]) == 730
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# get_approved_exception_ids
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGetApprovedExceptionIds:
|
||||
def test_empty(self) -> None:
|
||||
result = get_approved_exception_ids()
|
||||
assert result == frozenset()
|
||||
|
||||
def test_populated(self) -> None:
|
||||
approved: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": "r", "approved_at": 0.0},
|
||||
{"app_id": 730, "reason": "r", "approved_at": 0.0},
|
||||
]
|
||||
_save_approved(approved)
|
||||
with patch("shutil.which", return_value=None):
|
||||
pass
|
||||
result = get_approved_exception_ids()
|
||||
assert result == frozenset({440, 730})
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# list_pending_exceptions
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestListPendingExceptions:
|
||||
def test_returns_copy(self) -> None:
|
||||
"""Mutating the result must not affect stored state."""
|
||||
entries: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": "r", "requested_at": 1.0}
|
||||
]
|
||||
_save_pending(entries)
|
||||
result = list_pending_exceptions()
|
||||
result.clear()
|
||||
# Still present on next load
|
||||
assert len(_load_pending()) == 1
|
||||
|
||||
def test_empty_when_no_pending(self) -> None:
|
||||
assert list_pending_exceptions() == []
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Extra coverage for validate_reason branches 94 & 106
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestValidateReasonExtraBranches:
|
||||
"""Cover lines 94 and 106 that need multi-word, long, low-entropy inputs."""
|
||||
|
||||
def test_low_entropy_multi_word(self) -> None:
|
||||
# 8 words, 31+ chars, entropy ≈ 2.0 (< 3.0), no char run, no alt pattern
|
||||
err = validate_reason("the the the the the the the the")
|
||||
assert err is not None
|
||||
assert "entropy" in err
|
||||
|
||||
def test_alternating_pattern_multi_word(self) -> None:
|
||||
# "abababab" satisfies (..)(\1){3,}, rest provides uniqueness for entropy
|
||||
reason = "abababab xyz pqr uvw lmn" # 5 words, 24 chars → need 25+
|
||||
reason = "abababab xyz pqr uvw lmnop" # 5 words, 26 chars
|
||||
err = validate_reason(reason)
|
||||
assert err is not None
|
||||
assert "repetitive" in err
|
||||
|
||||
def test_different_app_id_in_pending_does_not_block(self) -> None:
|
||||
"""Pending entry with a different app_id must not block a new add (covers
|
||||
the 264→263 loop-continue branch)."""
|
||||
other: list[dict[str, object]] = [
|
||||
{"app_id": 440, "reason": _VALID_REASON, "requested_at": 1.0}
|
||||
]
|
||||
_save_pending(other)
|
||||
with patch("shutil.which", return_value=None):
|
||||
msg = add_pending_exception(730, _VALID_REASON)
|
||||
assert "730" in msg
|
||||
pending = _load_pending()
|
||||
assert len(pending) == 2 # original + new
|
||||
Loading…
Reference in New Issue
Block a user