testsAndMisc/python_pkg/steam_backlog_enforcer/enforcer.py
Krzysztof kuhy Rudnicki cec80c0cb0 feat(steam_backlog_enforcer): harden whitelist against circumvention
- Remove skip_app_ids from user-editable Config; callers updated
- Split PROTECTED_APP_IDS: only Steam infra/Proton IDs remain; game
  IDs moved to a new time-locked exception system
- Add _whitelist.py: 24-hour cooldown on new exceptions, entropy-
  checked justification (>= 5 words), append-only audit log,
  chattr +i immutability on enforcement-critical config files
- Add is_protected_app() in game_install.py; used everywhere
  instead of direct PROTECTED_APP_IDS membership checks
- Add 'add-exception' CLI command (cmd_add_exception in main.py)
- Call promote_pending_exceptions() and lock_enforcement_files()
  in each _enforce_loop_iteration
- 590 tests, 100% branch coverage on all steam_backlog_enforcer modules
- Add .worktrees to .gitignore
2026-05-17 20:44:05 +02:00

98 lines
2.7 KiB
Python

"""Enforce that only the assigned game may run."""
from __future__ import annotations
import logging
import os
from pathlib import Path
import shutil
import signal
import subprocess
from python_pkg.steam_backlog_enforcer.game_install import (
is_protected_app,
)
logger = logging.getLogger(__name__)
def get_running_steam_game_pids() -> dict[int, int]:
"""Scan /proc to find running Steam game processes.
Returns: dict mapping PID -> SteamAppId.
"""
running: dict[int, int] = {}
proc = Path("/proc")
for entry in proc.iterdir():
if not entry.name.isdigit():
continue
try:
environ = (entry / "environ").read_bytes()
pairs = environ.split(b"\x00")
for pair in pairs:
if pair.startswith(b"SteamAppId="):
value = pair.split(b"=", 1)[1].decode("utf-8", errors="replace")
if value.isdigit():
running[int(entry.name)] = int(value)
break
except (PermissionError, OSError, ValueError):
continue
return running
def enforce_allowed_game(
allowed_app_id: int | None,
*,
kill_unauthorized: bool = True,
) -> list[tuple[int, int]]:
"""Check running games; optionally kill unauthorized ones.
Returns list of (pid, app_id) that were killed or detected.
"""
if allowed_app_id is None:
return []
running = get_running_steam_game_pids()
violations: list[tuple[int, int]] = []
for pid, app_id in running.items():
if allowed_app_id is not None and app_id == allowed_app_id:
continue
# Skip Steam client itself (app_id 0 or very low IDs).
if app_id == 0:
continue
if is_protected_app(app_id):
continue
violations.append((pid, app_id))
if kill_unauthorized:
kill_process(pid, app_id)
return violations
def kill_process(pid: int, app_id: int) -> None:
"""Kill a process by PID."""
try:
logger.warning("Killing unauthorized game (AppID=%d, PID=%d)", app_id, pid)
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
logger.debug("Process %d already gone.", pid)
except PermissionError:
logger.exception("No permission to kill PID %d.", pid)
def send_notification(title: str, body: str) -> None:
"""Send a desktop notification."""
_notify_send = shutil.which("notify-send") or "/usr/bin/notify-send"
try:
subprocess.run(
[_notify_send, title, body, "--icon=dialog-warning"],
capture_output=True,
timeout=5,
check=False,
)
except (FileNotFoundError, OSError):
logger.debug("notify-send not available.")