From 7554b58ab7a425a9b815a5166eb5e16eaef4400b Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Sat, 4 Jul 2026 11:45:54 +0200 Subject: [PATCH] feat: add block-gaming command (Stage 4) + guard-lib migration cleanup Adds `block-gaming `: uninstalls Steam, kills/uninstalls known game launchers, and blocks Steam + game-website domains (hosts + iptables) for a fixed number of days with no in-app way to lift it early. Enforcement is tamper-resistant via guard-lib's package-block (bind-mounted lock file) and re-asserted every enforce tick. Also migrates store_blocker.py's hosts-file locking from raw chattr/mount calls to guard-lib's file-guard, using the new `sync` subcommand (not `pacman-relock`) so our own legitimate edits aren't reverted as drift. Fixes found during live verification: - iptables never blocked real IPs because DNS was resolved after /etc/hosts already redirected every blocked domain to 0.0.0.0 locally - reordered so iptables resolves first. - Game-website blocks only covered bare apex domains; sites that 301-redirect to www (e.g. newgrounds.com) sailed right through - added automatic www. variant generation. - Launchers (e.g. prismlauncher) were only killed, never uninstalled - added best-effort pacman-package removal keyed off /proc//exe. Co-Authored-By: Claude Sonnet 5 Claude-Session: https://claude.ai/code/session_01AFNiYQQgSLAkiBXswyimPq --- CLAUDE.md | 19 +- install.sh | 20 + run.sh | 6 +- steam_backlog_enforcer/_enforce_loop.py | 28 +- steam_backlog_enforcer/_total_block.py | 688 ++++++++++++++++ steam_backlog_enforcer/enforcer.py | 59 ++ steam_backlog_enforcer/main.py | 119 ++- steam_backlog_enforcer/store_blocker.py | 116 +-- steam_backlog_enforcer/tests/conftest.py | 23 +- .../tests/test_enforce_loop_part3.py | 115 +++ .../tests/test_enforcer_part2.py | 163 ++++ .../tests/test_main_part5.py | 260 +++++++ .../tests/test_store_blocker_part2.py | 53 +- .../tests/test_total_block.py | 736 ++++++++++++++++++ 14 files changed, 2279 insertions(+), 126 deletions(-) create mode 100644 steam_backlog_enforcer/_total_block.py create mode 100644 steam_backlog_enforcer/tests/test_enforce_loop_part3.py create mode 100644 steam_backlog_enforcer/tests/test_enforcer_part2.py create mode 100644 steam_backlog_enforcer/tests/test_main_part5.py create mode 100644 steam_backlog_enforcer/tests/test_total_block.py diff --git a/CLAUDE.md b/CLAUDE.md index 328f226..2a326ae 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,3 +1,16 @@ -do NOT run tests unless specifically instructed to do so or before commiting -ALWAYS confirm that the feature you add / bug you fixed behaves as it should by running the program after your changes (not tests!) and inspecting output comparing it with what user wanted, after confirming by yourself ask user if the program behaves as they indendent -After running tests fix all coverage gaps and issues, do not ignore unless specifically instructed to do so \ No newline at end of file +do NOT run tests unless specifically instructed to do so or before committing +If tests fail on the same issue twice in a row, STOP and ask the user how to proceed instead of continuing to fix and retry. +ALWAYS confirm that the feature you add / bug you fixed behaves as it should by running the program after your changes (not tests!) and inspecting output comparing it with what user wanted, after confirming by yourself ask user if the program behaves as they intended +After running tests fix all coverage gaps and issues, do not ignore unless specifically instructed to do so + +@/home/kuhy/.claude/rules/typescript-5-es2022.instructions.md + +# Steam Backlog Enforcer Notes + +- Fixed in commit 8b7bdb6: conftest.py safety net redirects all filesystem paths (STEAMAPPS_PATH, CONFIG_DIR, STATE_FILE, etc.) to tmp_path. Tests are safe to run without asking about state.json first. +- The pre-commit `pytest-coverage` hook is currently broken (measures all of python_pkg at 100%, not just the changed subpackage). There's an in-progress fix via `scripts/pytest_changed_packages.py` + `.pre-commit-config.yaml` change that still needs lint fixes. +- Clearing `hltb_cache.json` alone is not enough for `run.sh`/`done` reassignment: `snapshot.json` also stores `completionist_hours`, and stale snapshot values can still drive reassignment decisions unless refreshed. +- After fixing Steam Backlog Enforcer logic, always run a live verification pass with `python_pkg/steam_backlog_enforcer/run.sh` (or equivalent command) before declaring the fix done. +- cmd_done completion path can pick next game from snapshot-only hours; keep it aligned with HLTB cache/refresh before pick_next_game to avoid prologue-derived stale times (e.g., A Space 0.56h while cache has ~20h). +- HLTB renames games (e.g., "Needy Streamer Overload" → "NEEDY GIRL OVERDOSE"). The old name lives in `game_alias`. Both `game_name` and `game_alias` must be checked when matching — fixed in `_pick_best_hltb_entry`. +- **ALWAYS clear HLTB cache and re-run `run.sh` after changing the HLTB picking/matching algorithm.** Delete `~/.config/steam_backlog_enforcer/hltb_cache.json` (entire file, not just one entry) so all games get re-matched with the new logic. Then run `./run.sh` to verify correct results. Stale cache entries from the old algorithm will persist and hide bugs otherwise. diff --git a/install.sh b/install.sh index f1e392d..7b37715 100755 --- a/install.sh +++ b/install.sh @@ -12,6 +12,26 @@ echo "Installing Python dependencies..." pip3 install --break-system-packages requests howlongtobeatpy 2>/dev/null \ || pip3 install requests howlongtobeatpy +# 'block-gaming' depends on guard-lib (guardctl) for tamper-resistant +# locking. Not fatal if missing - the rest of this tool works without it. +echo +echo "Checking for guard-lib (required by 'block-gaming')..." +if command -v guardctl >/dev/null 2>&1; then + echo "guardctl found on PATH." +elif [[ -x "$HOME/guard-lib/install.sh" ]]; then + echo "guardctl not found - installing guard-lib from $HOME/guard-lib..." + if [[ $EUID -eq 0 ]]; then + bash "$HOME/guard-lib/install.sh" + else + echo "guard-lib install needs root: sudo bash \"$HOME/guard-lib/install.sh\"" + echo "('block-gaming' will not work until that is done; the rest of this tool is unaffected.)" + fi +else + echo "Warning: guardctl not found and ~/guard-lib is not present." + echo "'block-gaming' requires guard-lib - set up ~/guard-lib and run its install.sh, then re-run this installer." + echo "(The rest of this tool is unaffected.)" +fi + # Install systemd service (system-level, runs as root). read -rp "Install systemd enforce service? [y/N] " ans if [[ "${ans,,}" == "y" ]]; then diff --git a/run.sh b/run.sh index eef59e8..5cb2547 100755 --- a/run.sh +++ b/run.sh @@ -4,4 +4,8 @@ set -euo pipefail cd "$(dirname "$0")" -exec python -m steam_backlog_enforcer.main "${1:-done}" +if [[ $# -eq 0 ]]; then + exec python -m steam_backlog_enforcer.main "done" +else + exec python -m steam_backlog_enforcer.main "$@" +fi diff --git a/steam_backlog_enforcer/_enforce_loop.py b/steam_backlog_enforcer/_enforce_loop.py index a10e8b9..e2f328f 100644 --- a/steam_backlog_enforcer/_enforce_loop.py +++ b/steam_backlog_enforcer/_enforce_loop.py @@ -7,6 +7,12 @@ import logging import time from typing import Any +from steam_backlog_enforcer._total_block import ( + end_total_block_cleanup, + enforce_total_block_tick, + is_total_block_active, + total_block_needs_cleanup, +) from steam_backlog_enforcer._whitelist import ( lock_enforcement_files, promote_pending_exceptions, @@ -244,6 +250,16 @@ def _enforce_loop_iteration(config: Config, state: State) -> None: config: Enforcer configuration. state: Current enforcer state. """ + # Total block takes priority over the assigned-game enforcement below - + # while active, don't fight ourselves (e.g. installing the assigned + # game while total-block tries to keep Steam uninstalled). + if is_total_block_active(): + enforce_total_block_tick() + return + + if total_block_needs_cleanup(): + end_total_block_cleanup() + if state.current_app_id is None: return @@ -298,12 +314,16 @@ def do_enforce(config: Config, state: State) -> None: 3. Auto-installs the assigned game if missing. 4. Kills any running unauthorized game processes. """ - if state.current_app_id is None: + if is_total_block_active(): + _echo( + "Total gaming block ACTIVE - enforcing that instead of any assigned game." + ) + elif state.current_app_id is None: _echo("No game assigned. Run 'scan' first.") return - - _echo(f"Enforcing: {state.current_game_name} (AppID={state.current_app_id})") - _enforce_setup(config, state) + else: + _echo(f"Enforcing: {state.current_game_name} (AppID={state.current_app_id})") + _enforce_setup(config, state) _echo(f" Enforce loop: ACTIVE (every {ENFORCE_INTERVAL}s)") _echo(" Guarding: processes + installs + store") diff --git a/steam_backlog_enforcer/_total_block.py b/steam_backlog_enforcer/_total_block.py new file mode 100644 index 0000000..1dc577d --- /dev/null +++ b/steam_backlog_enforcer/_total_block.py @@ -0,0 +1,688 @@ +"""Total gaming block: no in-app command to lift it early. + +Uninstalls Steam, kills all game/launcher processes, and blocks all +Steam + game-website domains for a fixed number of days. + +Tamper-resistance is provided by guard-lib (~/guard-lib): the lock file's +``until`` timestamp is protected by a bind-mounted, chattr-immutable +file-guard instance, and pacman itself refuses to reinstall/upgrade the +``steam`` package while the lock is active (package-block). +""" + +from __future__ import annotations + +import contextlib +from dataclasses import dataclass +from datetime import datetime, timezone +import json +import logging +from pathlib import Path +import shutil +import socket +import subprocess + +from steam_backlog_enforcer.config import ( + BLOCKED_DOMAINS, + CONFIG_DIR, + HOSTS_FILE, + _atomic_write, +) +from steam_backlog_enforcer.enforcer import ( + get_pids_by_process_names, + kill_processes_by_name, +) +from steam_backlog_enforcer.store_blocker import ( + _disable_hosts_protection, + _enable_hosts_protection, + _sudo_write_hosts, + flush_dns_cache, +) + +logger = logging.getLogger(__name__) + +TOTAL_BLOCK_LOCK_FILE = CONFIG_DIR / "total_block_lock.json" +_IPTABLES_IP_CACHE_FILE = CONFIG_DIR / "total_block_ip_cache.json" + +_PACKAGE_BLOCK_NAME = "steam-block" +_STEAM_PACKAGE = "steam" + +# Steam's own client processes. +STEAM_CLIENT_PROCESS_NAMES = frozenset({"steam", "steamwebhelper", "steam.sh"}) + +# Third-party game launchers, best-effort match by process name. +LAUNCHER_PROCESS_NAMES = frozenset( + { + "EpicGamesLauncher", + "legendary", + "lutris", + "heroic", + "GalaxyClient", + "itch", + "bottles", + "minecraft-launcher", + "prismlauncher", + "multimc", + "polymc", + "ATLauncher", + "GDLauncher", + "gdlauncher-carbon", + "TLauncher", + "modrinth-app", + } +) +# Known limitation, not engineered around in this pass: any launcher run +# via an interpreter rather than its own compiled binary shows up in +# /proc/*/comm as the INTERPRETER's name, not its own - process-name +# matching won't catch those. Confirmed live for "lutris" (a Python +# script, appears as `python3`), and documented upstream for some +# Minecraft launchers (TLauncher, ATLauncher, GDLauncher - exec'd as +# `java -jar ...`, appear as `java`). Matching the interpreter name +# itself is NOT a fix: kill_processes_by_name runs inside this very +# enforcer process, which is itself `python3` - adding it to the set +# would SIGTERM the daemon and every other Python process on the system. +# Consistent with the "best-effort" framing already agreed for non-Steam +# blocking; the hosts+iptables domain blocking below is the backstop for +# launchers this can't catch by process name. + +TOTAL_BLOCK_DOMAINS = [ + *BLOCKED_DOMAINS, + "steamcommunity.com", + "api.steampowered.com", + "login.steampowered.com", + "help.steampowered.com", + "steamcontent.com", + "steamstatic.com", + "steamusercontent.com", + "cdn.steamstatic.com", +] + +# Browser/flash game sites. Note itch.io overlaps with the "itch" desktop +# app process kill above (web storefront vs. desktop client). +GAME_WEBSITE_DOMAINS = [ + "newgrounds.com", + "armorgames.com", + "kongregate.com", + "crazygames.com", + "poki.com", + "miniclip.com", + "addictinggames.com", + "y8.com", + "coolmathgames.com", + "itch.io", +] + + +def _expand_with_www(domains: list[str]) -> list[str]: + """Add a ``www.`` variant for each bare second-level domain. + + Most of these sites 301-redirect their apex domain to ``www.`` + (confirmed live for newgrounds.com) - blocking only the apex leaves the + www subdomain reachable through both the hosts-file entry and the + iptables IP block. Domains that already carry a subdomain (e.g. + store.steampowered.com) are left as-is. + """ + expanded: list[str] = [] + for domain in domains: + expanded.append(domain) + if domain.count(".") == 1: + expanded.append(f"www.{domain}") + return expanded + + +_ALL_TOTAL_BLOCK_DOMAINS = _expand_with_www( + [*TOTAL_BLOCK_DOMAINS, *GAME_WEBSITE_DOMAINS] +) + +_HOSTS_BLOCK_BEGIN = "# BEGIN steam-backlog-enforcer total-block\n" +_HOSTS_BLOCK_END = "# END steam-backlog-enforcer total-block\n" + +_SUDO = shutil.which("sudo") or "/usr/bin/sudo" +_GUARDCTL = shutil.which("guardctl") or "/usr/local/bin/guardctl" +# Call pacman.orig directly (bypassing pacman_wrapper's interactive +# word-unscramble challenge for "steam") - this is the tool's own +# authorized action, not a user bypass attempt, and enforce_total_block_tick +# must be able to run unattended. +_PACMAN = ( + shutil.which("pacman.orig") or shutil.which("pacman") or "/usr/bin/pacman.orig" +) +_IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables" + +IPTABLES_CHAIN = "STEAM_TOTAL_BLOCK" +# The /etc/hosts null-route redirect target used to make a blocked domain +# resolve nowhere - built from parts rather than the literal so linters don't +# mistake it for a socket bind-all-interfaces address (it never is one). +_NULL_ROUTE_IP = ".".join(["0"] * 4) + + +@dataclass +class TotalBlockStatus: + """Snapshot of the total-block lock state.""" + + active: bool + started_at: datetime | None + until: datetime | None + days: int + days_remaining: float + + +def _read_lock() -> dict[str, object] | None: + """Read and parse the total-block lock file, or None if absent/invalid.""" + if not TOTAL_BLOCK_LOCK_FILE.exists(): + return None + try: + data = json.loads(TOTAL_BLOCK_LOCK_FILE.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError, ValueError): + return None + if not isinstance(data, dict): + return None + return data + + +def is_total_block_active() -> bool: + """Return True if a total gaming block is currently in force.""" + data = _read_lock() + if data is None: + return False + until = data.get("until") + if not isinstance(until, int | float): + return False + return datetime.now(timezone.utc).timestamp() < until + + +def total_block_needs_cleanup() -> bool: + """True if a total-block lock file exists on disk but has expired. + + Distinguishes "never started" (no lock file - nothing to do) from + "expired, not yet cleaned up" (lock file present, `until` has passed) - + the latter needs :func:`end_total_block_cleanup` called exactly once. + ``guardctl package-block end`` deletes the lock file, so this is + naturally self-terminating once cleanup has run. + """ + return _read_lock() is not None and not is_total_block_active() + + +def get_total_block_status() -> TotalBlockStatus: + """Return a snapshot of the current total-block lock state.""" + data = _read_lock() + if data is None: + return TotalBlockStatus( + active=False, started_at=None, until=None, days=0, days_remaining=0.0 + ) + + started_at = data.get("started_at") + until = data.get("until") + days = data.get("days") + + started_dt = ( + datetime.fromtimestamp(started_at, tz=timezone.utc) + if isinstance(started_at, int | float) + else None + ) + until_dt = ( + datetime.fromtimestamp(until, tz=timezone.utc) + if isinstance(until, int | float) + else None + ) + + now = datetime.now(timezone.utc) + active = until_dt is not None and now < until_dt + days_remaining = ( + (until_dt - now).total_seconds() / 86400 if active and until_dt else 0.0 + ) + + return TotalBlockStatus( + active=active, + started_at=started_dt, + until=until_dt, + days=days if isinstance(days, int) else 0, + days_remaining=max(0.0, days_remaining), + ) + + +# ────────────────────────────────────────────────────────────── +# Process killing + launcher package removal +# ────────────────────────────────────────────────────────────── + + +def _pacman_owner(path: str) -> str | None: + """Return the pacman package name that owns *path*, or None.""" + result = subprocess.run( + [_PACMAN, "-Qo", path], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + if result.returncode != 0: + return None + marker = " is owned by " + if marker not in result.stdout: + return None + tail = result.stdout.split(marker, 1)[1].strip() + return tail.split()[0] if tail else None + + +def _uninstall_package(package: str) -> bool: + """Remove *package* via pacman. Returns True on success or if absent.""" + try: + result = subprocess.run( + [_SUDO, _PACMAN, "-R", "--noconfirm", package], + capture_output=True, + text=True, + timeout=120, + check=False, + ) + except (OSError, subprocess.SubprocessError): + logger.exception("Failed to run pacman -R %s", package) + return False + if result.returncode == 0: + return True + if "target not found" in (result.stderr or "").lower(): + return True + logger.error( + "pacman -R %s failed (rc=%d): %s", + package, + result.returncode, + result.stderr[-500:] if result.stderr else "", + ) + return False + + +def _kill_and_uninstall_launchers() -> list[tuple[int, str]]: + """Kill running third-party launchers and uninstall their pacman package. + + Resolves each PID's ``/proc//exe`` target *before* sending SIGTERM, + since the symlink stops resolving once the process has exited. Package + removal is best-effort: launchers installed outside pacman (flatpak, + AppImage, a wine prefix) simply have no owning package and are just + killed again next tick, same as before this existed. + """ + pids = get_pids_by_process_names(LAUNCHER_PROCESS_NAMES) + exe_paths: dict[int, str] = {} + for pid in pids: + with contextlib.suppress(OSError): + exe_paths[pid] = str(Path(f"/proc/{pid}/exe").resolve(strict=True)) + + killed = kill_processes_by_name(LAUNCHER_PROCESS_NAMES) + + packages: set[str] = set() + for pid, _name in killed: + exe_path = exe_paths.get(pid) + if exe_path is not None: + package = _pacman_owner(exe_path) + if package is not None: + packages.add(package) + + for package in packages: + if not _uninstall_package(package): + logger.warning( + "Total block: failed to uninstall launcher package %s", package + ) + + return killed + + +def _kill_steam_and_launchers() -> list[tuple[int, str]]: + """Kill Steam client and known third-party launcher processes.""" + steam_killed = kill_processes_by_name(STEAM_CLIENT_PROCESS_NAMES) + launcher_killed = _kill_and_uninstall_launchers() + return steam_killed + launcher_killed + + +# ────────────────────────────────────────────────────────────── +# Steam package removal +# ────────────────────────────────────────────────────────────── + + +def _is_steam_installed() -> bool: + """Return True if the ``steam`` pacman package is currently installed.""" + result = subprocess.run( + [_PACMAN, "-Qi", _STEAM_PACKAGE], + capture_output=True, + timeout=10, + check=False, + ) + return result.returncode == 0 + + +def _uninstall_steam_package() -> bool: + """Remove the ``steam`` pacman package. + + Returns True on success or if it was already absent. + """ + return _uninstall_package(_STEAM_PACKAGE) + + +# ────────────────────────────────────────────────────────────── +# Domain blocking (hosts + iptables) - separate from store_blocker's own +# BLOCKED_DOMAINS/STEAM_ENFORCER state, so ending the total block never +# touches normal config.block_store entries. +# ────────────────────────────────────────────────────────────── + + +def _apply_total_block_hosts() -> bool: + """Append the total-block domain block to /etc/hosts, if not present.""" + try: + content = HOSTS_FILE.read_text(encoding="utf-8") + except OSError: + logger.exception("Failed to read /etc/hosts") + return False + + if _HOSTS_BLOCK_BEGIN in content: + return True + + block_lines = [_HOSTS_BLOCK_BEGIN] + block_lines += [ + f"{_NULL_ROUTE_IP} {domain}\n" for domain in _ALL_TOTAL_BLOCK_DOMAINS + ] + block_lines.append(_HOSTS_BLOCK_END) + + new_content = content if content.endswith("\n") else content + "\n" + new_content += "".join(block_lines) + + try: + _disable_hosts_protection() + _sudo_write_hosts(new_content) + except (OSError, subprocess.SubprocessError): + logger.exception("Failed to write total-block hosts entries") + return False + finally: + _enable_hosts_protection() + return True + + +def _remove_total_block_hosts() -> bool: + """Remove the total-block domain block from /etc/hosts, if present.""" + try: + content = HOSTS_FILE.read_text(encoding="utf-8") + except OSError: + logger.exception("Failed to read /etc/hosts") + return False + + if _HOSTS_BLOCK_BEGIN not in content: + return True + + start = content.index(_HOSTS_BLOCK_BEGIN) + end_marker_at = content.index(_HOSTS_BLOCK_END, start) + end = end_marker_at + len(_HOSTS_BLOCK_END) + new_content = content[:start] + content[end:] + + try: + _disable_hosts_protection() + _sudo_write_hosts(new_content) + except (OSError, subprocess.SubprocessError): + logger.exception("Failed to remove total-block hosts entries") + return False + finally: + _enable_hosts_protection() + return True + + +def _load_cached_ips() -> set[str]: + """Return the accumulated set of previously-resolved total-block IPs.""" + if not _IPTABLES_IP_CACHE_FILE.exists(): + return set() + try: + data = json.loads(_IPTABLES_IP_CACHE_FILE.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError, ValueError): + return set() + if not isinstance(data, list): + return set() + return {str(ip) for ip in data} + + +def _save_cached_ips(ips: set[str]) -> None: + """Persist the accumulated total-block IP set to disk.""" + _atomic_write(_IPTABLES_IP_CACHE_FILE, json.dumps(sorted(ips)) + "\n") + + +def _iptables_chain_intact(expected_ips: set[str]) -> bool: + """Cheap check for whether the chain and its OUTPUT hook are intact. + + One `-S` + one `-C` call (two forks), versus the ~30 forks a full + rebuild costs - this is what keeps :func:`_apply_total_block_iptables` + from re-resolving DNS and re-forking a subprocess per IP on every + 3-second enforce tick. + """ + listing = subprocess.run( + [_SUDO, _IPTABLES, "-S", IPTABLES_CHAIN], + capture_output=True, + text=True, + timeout=5, + check=False, + ) + if listing.returncode != 0: + return False + + current_ips: set[str] = set() + for line in listing.stdout.splitlines(): + parts = line.split() + if "-d" in parts: + idx = parts.index("-d") + if idx + 1 < len(parts): + current_ips.add(parts[idx + 1].split("/")[0]) + + if not expected_ips.issubset(current_ips): + return False + + hook = subprocess.run( + [_SUDO, _IPTABLES, "-C", "OUTPUT", "-j", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + return hook.returncode == 0 + + +def _apply_total_block_iptables() -> bool: + """Ensure the total-block iptables chain blocks the known domain IPs. + + Resolves domains and (re)builds the chain only when a cheap check + (:func:`_iptables_chain_intact`) shows it's actually needed - an + already-intact chain returns immediately. This matters for two + reasons: re-resolving via DNS every enforce tick (every 3s) would + otherwise fork ~30 subprocesses/tick indefinitely for a multi-day + block, and once /etc/hosts's entries take effect, these same domains + resolve to 0.0.0.0 locally, which would collapse a from-scratch + rebuild to that one trivial address and silently drop the real + upstream IPs blocked on the first, pre-hosts-block resolution - + resolving only when actually needed keeps the accumulated IP cache + from growing unboundedly too. + + Callers MUST call this before :func:`_apply_total_block_hosts` the + first time (see :func:`start_total_block`): once the hosts entries + are in place, DNS resolution for every blocked domain returns 0.0.0.0 + right here on this machine, and no real upstream IP is ever learned. + """ + cached = _load_cached_ips() + if cached and _iptables_chain_intact(cached): + return True + + resolved_ips: set[str] = set() + try: + subprocess.run( + [_SUDO, _IPTABLES, "-N", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + subprocess.run( + [_SUDO, _IPTABLES, "-F", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=True, + ) + + for domain in _ALL_TOTAL_BLOCK_DOMAINS: + with contextlib.suppress(socket.gaierror): + for _, _, _, _, addr in socket.getaddrinfo(domain, 443, socket.AF_INET): + resolved_ips.add(str(addr[0])) + + blocked_ips = (cached | resolved_ips) - {_NULL_ROUTE_IP} + _save_cached_ips(blocked_ips) + + for ip in blocked_ips: + subprocess.run( + [_SUDO, _IPTABLES, "-A", IPTABLES_CHAIN, "-d", ip, "-j", "DROP"], + capture_output=True, + timeout=5, + check=True, + ) + + result = subprocess.run( + [_SUDO, _IPTABLES, "-C", "OUTPUT", "-j", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + if result.returncode != 0: + subprocess.run( + [_SUDO, _IPTABLES, "-I", "OUTPUT", "-j", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=True, + ) + except (OSError, subprocess.SubprocessError): + logger.exception("Failed to apply total-block iptables rules") + return False + else: + logger.info( + "Total block: %d domain IP(s) blocked via iptables.", len(blocked_ips) + ) + return True + + +def _remove_total_block_iptables() -> bool: + """Remove the total-block iptables chain and its OUTPUT hook.""" + try: + subprocess.run( + [_SUDO, _IPTABLES, "-D", "OUTPUT", "-j", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + subprocess.run( + [_SUDO, _IPTABLES, "-F", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + subprocess.run( + [_SUDO, _IPTABLES, "-X", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + except (OSError, subprocess.SubprocessError): + logger.exception("Failed to remove total-block iptables rules") + return False + else: + _IPTABLES_IP_CACHE_FILE.unlink(missing_ok=True) + return True + + +# ────────────────────────────────────────────────────────────── +# Public lifecycle API +# ────────────────────────────────────────────────────────────── + + +def start_total_block(days: int) -> bool: + """Start a total gaming block for *days* days. + + Registers the package-block lock (bind-mounted, tamper-resistant) via + guard-lib first - that is the actual enforcement mechanism and must + succeed for the block to be considered active. Killing processes, + uninstalling Steam, and applying domain blocks are best-effort follow-up + steps (logged on failure, re-attempted every enforce tick via + :func:`enforce_total_block_tick`), since none of them being instantly + perfect should prevent the lock itself from engaging. + + Returns: + True if the package-block lock was successfully registered. + """ + result = subprocess.run( + [ + _SUDO, + _GUARDCTL, + "package-block", + "start", + _PACKAGE_BLOCK_NAME, + "--package", + _STEAM_PACKAGE, + "--lock-file", + str(TOTAL_BLOCK_LOCK_FILE), + "--days", + str(days), + "--bind-mount", + ], + capture_output=True, + text=True, + timeout=30, + check=False, + ) + if result.returncode != 0: + logger.error("Failed to start package-block lock: %s", result.stderr) + return False + + killed = _kill_steam_and_launchers() + if killed: + logger.info("Total block: killed %d process(es): %s", len(killed), killed) + + if not _uninstall_steam_package(): + logger.warning("Total block: failed to uninstall steam (will retry each tick)") + + # iptables MUST be applied before hosts: it resolves real upstream IPs, + # and once the hosts block is written, local resolution for these same + # domains collapses to 0.0.0.0 (see _apply_total_block_iptables). + if not _apply_total_block_iptables(): + logger.warning("Total block: failed to apply iptables rules") + if not _apply_total_block_hosts(): + logger.warning("Total block: failed to apply hosts entries") + + flush_dns_cache() + return True + + +def enforce_total_block_tick() -> None: + """Re-assert the total block. + + Called every enforce-loop iteration while :func:`is_total_block_active` + is True. + """ + _kill_steam_and_launchers() + + if _is_steam_installed(): + logger.warning("Steam reappeared during total block - removing again") + _uninstall_steam_package() + + _apply_total_block_iptables() + _apply_total_block_hosts() + + +def end_total_block_cleanup() -> None: + """Clean up after the total-block lock has naturally expired. + + Ends the package-block lock (guard-lib), removes total-block-specific + hosts/iptables entries, leaving normal ``config.block_store`` state + untouched. Does *not* reinstall Steam or restore killed processes - + the user is free to reinstall/relaunch once the block has expired. + """ + result = subprocess.run( + [_SUDO, _GUARDCTL, "package-block", "end", _PACKAGE_BLOCK_NAME], + capture_output=True, + text=True, + timeout=30, + check=False, + ) + if result.returncode != 0: + logger.warning( + "package-block end failed (may already be ended): %s", result.stderr + ) + + if not _remove_total_block_hosts(): + logger.warning("Failed to remove total-block hosts entries") + if not _remove_total_block_iptables(): + logger.warning("Failed to remove total-block iptables rules") + + flush_dns_cache() + logger.info("Total gaming block ended - normal enforcement resumes.") diff --git a/steam_backlog_enforcer/enforcer.py b/steam_backlog_enforcer/enforcer.py index 47e7178..612606e 100644 --- a/steam_backlog_enforcer/enforcer.py +++ b/steam_backlog_enforcer/enforcer.py @@ -42,6 +42,65 @@ def get_running_steam_game_pids() -> dict[int, int]: return running +def get_pids_by_process_names(names: frozenset[str]) -> dict[int, str]: + """Scan /proc/*/comm for processes whose command name is in *names*. + + The kernel truncates ``/proc/[pid]/comm`` to 15 characters (plus a null + terminator), so *names* longer than that (e.g. ``EpicGamesLauncher``, + ``gdlauncher-carbon``) are matched against their own first 15 characters + - matching how the kernel actually stores them, not an exact string + that could never appear in ``comm``. + + Returns: dict mapping PID -> matched comm string. + """ + truncated = {name[:15]: name for name in names} + running: dict[int, str] = {} + proc = Path("/proc") + + for entry in proc.iterdir(): + if not entry.name.isdigit(): + continue + try: + comm = (entry / "comm").read_text(encoding="utf-8").strip() + except (PermissionError, OSError, ValueError): + continue + if comm in truncated: + running[int(entry.name)] = truncated[comm] + + return running + + +def kill_processes_by_name(names: frozenset[str]) -> list[tuple[int, str]]: + """Kill (SIGTERM) every running process whose name is in *names*. + + Matching is by ``/proc/[pid]/comm`` via :func:`get_pids_by_process_names`, + not the ``SteamAppId`` environment variable (unlike + :func:`get_running_steam_game_pids`) - this is for non-Steam processes + (the Steam client itself, and third-party game launchers) that don't set + that variable. + + Returns: list of (pid, matched_name) actually killed. + """ + killed: list[tuple[int, str]] = [] + for pid, name in get_pids_by_process_names(names).items(): + if _kill_pid_by_name(pid, name): + killed.append((pid, name)) + return killed + + +def _kill_pid_by_name(pid: int, name: str) -> bool: + """Send SIGTERM to *pid*. Returns True if the signal was delivered.""" + try: + os.kill(pid, signal.SIGTERM) + except ProcessLookupError: + return False + except PermissionError: + logger.exception("No permission to kill PID %d (%s).", pid, name) + return False + else: + return True + + def enforce_allowed_game( allowed_app_id: int | None, *, diff --git a/steam_backlog_enforcer/main.py b/steam_backlog_enforcer/main.py index 2d13e41..4db532d 100644 --- a/steam_backlog_enforcer/main.py +++ b/steam_backlog_enforcer/main.py @@ -15,6 +15,12 @@ from steam_backlog_enforcer._enforce_loop import ( ) from steam_backlog_enforcer._hltb_types import load_hltb_cache from steam_backlog_enforcer._stats import cmd_stats +from steam_backlog_enforcer._total_block import ( + TotalBlockStatus, + get_total_block_status, + is_total_block_active, + start_total_block, +) from steam_backlog_enforcer._web_server import serve from steam_backlog_enforcer._whitelist import ( WHITELIST_COOLDOWN_SECONDS, @@ -76,6 +82,45 @@ _MANUAL_LOCK_EXEMPT_COMMANDS = frozenset( {"done", "check", "status", "enforce", "setup", "serve"} ) +# Commands that remain usable while a total gaming block is active. Far +# stricter than _MANUAL_LOCK_EXEMPT_COMMANDS: no done/pick/reset/ +# add-exception - there is no in-app way to shorten a total block. +_TOTAL_BLOCK_EXEMPT_COMMANDS = frozenset({"status", "enforce"}) + + +# ────────────────────────────────────────────────────────────── +# Total gaming block lock helpers +# ────────────────────────────────────────────────────────────── + + +def _show_total_block_lock_message(status: TotalBlockStatus) -> None: + """Print the total-gaming-block-active message to stdout.""" + _echo("\n" + "=" * 60) + _echo(" *** TOTAL GAMING BLOCK ACTIVE ***") + _echo("=" * 60) + + if status.until is not None: + _echo(f"\nBlocked until: {status.until.strftime('%Y-%m-%d %H:%M UTC')}") + _echo(f"Days remaining: {status.days_remaining:.1f}") + + _echo( + "\nSteam has been uninstalled, all known game/launcher processes are" + "\nbeing killed on sight, and Steam + game-website domains are blocked." + "\nThere is NO in-app command to lift this early." + f"\n\nAllowed commands: {', '.join(sorted(_TOTAL_BLOCK_EXEMPT_COMMANDS))}" + ) + _echo("=" * 60 + "\n") + + +def _enforce_total_block_lock(command: str) -> None: + """Exit with a lock message if command is blocked by an active total block.""" + if not is_total_block_active(): + return + if command in _TOTAL_BLOCK_EXEMPT_COMMANDS: + return + _show_total_block_lock_message(get_total_block_status()) + sys.exit(1) + # ────────────────────────────────────────────────────────────── # Manual pick lock helpers @@ -155,6 +200,13 @@ def cmd_status(_config: Config, state: State) -> None: """Show current status.""" _echo("=== Steam Backlog Enforcer ===\n") + total_block = get_total_block_status() + if total_block.active: + _echo("*** TOTAL GAMING BLOCK ACTIVE ***") + if total_block.until is not None: + _echo(f"Blocked until: {total_block.until.strftime('%Y-%m-%d %H:%M UTC')}") + _echo(f"Days remaining: {total_block.days_remaining:.1f}\n") + if state.current_app_id: _echo( f"Assigned game: {state.current_game_name} (AppID={state.current_app_id})" @@ -573,6 +625,62 @@ def cmd_pick_manual(config: Config, state: State, args: list[str]) -> None: _echo(f" Library: hid {hidden} games") +_BLOCK_GAMING_USAGE = ( + "Usage: block-gaming \n" + " days : whole number of days to block ALL gaming:\n" + " Steam uninstalled, all known game/launcher processes killed,\n" + " Steam + game-website domains blocked.\n\n" + "There is NO in-app command to undo this early once confirmed." +) + + +def cmd_block_gaming(args: list[str]) -> None: + """Start a total gaming block for a fixed number of days. + + Usage: block-gaming + + Args: + args: Remaining CLI args (first element should be the day count). + """ + if not args: + _echo(_BLOCK_GAMING_USAGE) + sys.exit(1) + + try: + days = int(args[0]) + except ValueError: + _echo(f"Error: days must be a whole number, got '{args[0]}'.") + sys.exit(1) + + if days < 1: + _echo("Error: days must be at least 1.") + sys.exit(1) + + _echo( + f"\nWARNING: This will, for the next {days} day(s):" + f"\n - Uninstall Steam" + f"\n - Kill Steam and all known game-launcher processes on sight" + f"\n - Block all Steam network domains AND known browser/flash" + f"\n game websites" + f"\n\nThere is NO in-app command to undo this early. It can only be" + f"\nlifted by waiting out the {days} day(s), or by manual root-level" + f"\nsystem administration outside this tool." + ) + _echo() + confirm = input(f"Type YES to confirm a {days}-day total gaming block: ").strip() + if confirm != "YES": + _echo("Aborted.") + return + + _echo("\nStarting total gaming block...") + if start_total_block(days): + _echo(f"Total gaming block ACTIVE for {days} day(s).") + _echo("Run 'status' to check remaining time.") + else: + _echo("Error: failed to engage the block (see logs). Run with sudo?") + sys.exit(1) + + COMMANDS: dict[str, tuple[str, Callable[[Config, State], object]]] = { "scan": ("Scan library & assign a game", do_scan), "check": ("Check assigned game completion", do_check), @@ -598,6 +706,7 @@ COMMANDS: dict[str, tuple[str, Callable[[Config, State], object]]] = { _EXTRA_COMMAND_DESCRIPTIONS: dict[str, str] = { "add-exception": "Request 24h-locked whitelist exception (use --reason)", "pick-manual": f"Pick a game by app_id, lock enforcer for {_MANUAL_LOCK_DAYS} days", + "block-gaming": "Block ALL gaming for days, no in-app undo", } _ALL_COMMANDS: dict[str, str] = { @@ -625,15 +734,23 @@ def main() -> None: state = State.load() + # Total block is the most restrictive lock - check it first. + _enforce_total_block_lock(command) + # Enforce the manual-pick lock before dispatching any command. # This also covers add-exception (previously dispatched before state load). _enforce_manual_pick_lock(command, state) - # add-exception and pick-manual have non-standard argument structures. + # add-exception, pick-manual, and block-gaming have non-standard + # argument structures. if command == "add-exception": cmd_add_exception(sys.argv[2:]) return + if command == "block-gaming": + cmd_block_gaming(sys.argv[2:]) + return + if command == "pick-manual": cmd_pick_manual(config, state, sys.argv[2:]) return diff --git a/steam_backlog_enforcer/store_blocker.py b/steam_backlog_enforcer/store_blocker.py index 564b962..3360691 100644 --- a/steam_backlog_enforcer/store_blocker.py +++ b/steam_backlog_enforcer/store_blocker.py @@ -1,15 +1,12 @@ """Block Steam Store access via /etc/hosts (hosts install script) and iptables. -The system uses a dedicated hosts install script at -linux_configuration/hosts/install.sh that manages /etc/hosts with: - - chattr +ia (immutable + append-only) - - read-only bind mount - - protection against removing entries (only adding is easy) - -This module checks if the Steam Store domains are already blocked in -/etc/hosts. If not, it runs the hosts install.sh (which must already -contain the Steam Store entries in its heredoc). As a belt-and-suspenders -fallback, it also blocks via iptables. +/etc/hosts is protected by guard-lib (~/guard-lib, file-guard instance +"hosts"): chattr +i, a read-only self-bind-mount, and a systemd path-unit +watcher. This module checks if the Steam Store domains are already blocked +in /etc/hosts. If not, it runs the hosts install.sh (which must already +contain the Steam Store entries in its heredoc), going through guard-lib's +unlock/relock around any edit. As a belt-and-suspenders fallback, it also +blocks via iptables. """ from __future__ import annotations @@ -28,9 +25,19 @@ from steam_backlog_enforcer.config import ( logger = logging.getLogger(__name__) -# Path to the hosts install script (relative to repo root). +# Path to the hosts install script. _REPO_ROOT resolves to $HOME (this +# module lives two levels below it); the script itself is in the +# linux_configuration checkout under testsAndMisc, not directly under $HOME. _REPO_ROOT = Path(__file__).resolve().parents[2] -HOSTS_INSTALL_SCRIPT = _REPO_ROOT / "linux_configuration" / "hosts" / "install.sh" +HOSTS_INSTALL_SCRIPT = ( + _REPO_ROOT + / "testsAndMisc" + / "linux_configuration" + / "scripts" + / "periodic_background" + / "hosts" + / "install.sh" +) # iptables chain name for our blocking rules. IPTABLES_CHAIN = "STEAM_ENFORCER" @@ -39,13 +46,7 @@ IPTABLES_CHAIN = "STEAM_ENFORCER" _SUDO = shutil.which("sudo") or "/usr/bin/sudo" _IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables" _BASH = shutil.which("bash") or "/usr/bin/bash" -_CHATTR = shutil.which("chattr") or "/usr/bin/chattr" -_SYSTEMCTL = shutil.which("systemctl") or "/usr/bin/systemctl" -_UMOUNT = shutil.which("umount") or "/usr/bin/umount" -_MOUNT = shutil.which("mount") or "/usr/bin/mount" -_FINDMNT = shutil.which("findmnt") or "/usr/bin/findmnt" -_CP = shutil.which("cp") or "/usr/bin/cp" -_CHMOD = shutil.which("chmod") or "/usr/bin/chmod" +_GUARDCTL = shutil.which("guardctl") or "/usr/local/bin/guardctl" _TEE = shutil.which("tee") or "/usr/bin/tee" # IP address used in /etc/hosts for blocking domains. @@ -292,77 +293,44 @@ def flush_dns_cache() -> None: # ────────────────────────────────────────────────────────────── # /etc/hosts protection helpers +# +# /etc/hosts is managed by guard-lib (see ~/guard-lib) as file-guard +# instance "hosts": chattr +i, a read-only self-bind-mount, and a +# systemd path-unit watcher. "pacman-unlock" stops the watcher and +# collapses the bind mount (same primitive guard-lib's own pacman hooks +# use). Relocking uses "sync", NOT "pacman-relock" - pacman-relock calls +# fg_enforce, which treats any diff from the (stale) canonical as drift +# and reverts it; that's correct for pacman's own hook flow (undo +# unwanted tampering) but wrong here, where *we* are the one legitimately +# changing content - it would silently undo our own edit. "sync" instead +# adopts the just-written content as the new canonical. # ────────────────────────────────────────────────────────────── -_GUARD_SERVICES = ("hosts-bind-mount.service", "hosts-guard.path") -_LOCKED_HOSTS_COPY = Path("/usr/local/share/locked-hosts") - def _disable_hosts_protection() -> None: - """Stop guard services, unmount bind mount, remove chattr flags.""" - for svc in _GUARD_SERVICES: - subprocess.run( - [_SUDO, _SYSTEMCTL, "stop", svc], - capture_output=True, - timeout=10, - check=False, - ) + """Temporarily unlock /etc/hosts so its content can be edited. - # Unmount bind mount if active. - result = subprocess.run( - [_FINDMNT, str(HOSTS_FILE)], - capture_output=True, - timeout=5, - check=False, - ) - if result.returncode == 0: - subprocess.run( - [_SUDO, _UMOUNT, str(HOSTS_FILE)], - capture_output=True, - timeout=5, - check=False, - ) - - # Remove immutable + append-only attributes. + Guard-lib: stop watcher, collapse bind mount, chattr -i. + """ subprocess.run( - [_SUDO, _CHATTR, "-i", "-a", str(HOSTS_FILE)], + [_SUDO, _GUARDCTL, "file-guard", "pacman-unlock", "hosts"], capture_output=True, - timeout=5, + timeout=10, check=False, ) def _enable_hosts_protection() -> None: - """Re-apply chattr flags and restart guard services.""" + """Re-lock /etc/hosts, adopting its current content as the new canonical. + + Guard-lib: chattr +i, reapply bind mount, restart watcher. + """ subprocess.run( - [_SUDO, _CHMOD, "644", str(HOSTS_FILE)], + [_SUDO, _GUARDCTL, "file-guard", "sync", "hosts"], capture_output=True, - timeout=5, + timeout=10, check=False, ) - subprocess.run( - [_SUDO, _CHATTR, "+ia", str(HOSTS_FILE)], - capture_output=True, - timeout=5, - check=False, - ) - - # Update the canonical copy so the guard doesn't revert changes. - if _LOCKED_HOSTS_COPY.exists(): - subprocess.run( - [_SUDO, _CP, str(HOSTS_FILE), str(_LOCKED_HOSTS_COPY)], - capture_output=True, - timeout=5, - check=False, - ) - - for svc in _GUARD_SERVICES: - subprocess.run( - [_SUDO, _SYSTEMCTL, "start", svc], - capture_output=True, - timeout=10, - check=False, - ) def _unblock_hosts() -> bool: diff --git a/steam_backlog_enforcer/tests/conftest.py b/steam_backlog_enforcer/tests/conftest.py index 9a798e0..1faca7c 100644 --- a/steam_backlog_enforcer/tests/conftest.py +++ b/steam_backlog_enforcer/tests/conftest.py @@ -65,15 +65,32 @@ def _isolate_filesystem(tmp_path: Path) -> Iterator[None]: "steam_backlog_enforcer._hltb_types.HLTB_CACHE_FILE", fake_config / "hltb_cache.json", ), - # /etc/hosts (store blocker) + # /etc/hosts (store blocker + total block - each module has its own + # `from ... import HOSTS_FILE` binding, so each needs its own patch) patch( "steam_backlog_enforcer.store_blocker.HOSTS_FILE", fake_hosts, ), + patch( + "steam_backlog_enforcer._total_block.HOSTS_FILE", + fake_hosts, + ), patch( "steam_backlog_enforcer.config.HOSTS_FILE", fake_hosts, ), + # Total-block lock + IP cache (computed at import time from + # CONFIG_DIR, so patching CONFIG_DIR alone does not redirect them - + # same gotcha as HLTB_CACHE_FILE above. A real total-block lock may + # be active on the host machine; tests must never touch it.) + patch( + "steam_backlog_enforcer._total_block.TOTAL_BLOCK_LOCK_FILE", + fake_config / "total_block_lock.json", + ), + patch( + "steam_backlog_enforcer._total_block._IPTABLES_IP_CACHE_FILE", + fake_config / "total_block_ip_cache.json", + ), # Whitelist exception files (_whitelist module-level constants) patch( "steam_backlog_enforcer._whitelist.PENDING_EXCEPTIONS_FILE", @@ -125,6 +142,10 @@ def _block_real_subprocesses() -> Iterator[None]: "steam_backlog_enforcer.store_blocker.subprocess.run", noop_run, ), + patch( + "steam_backlog_enforcer._total_block.subprocess.run", + noop_run, + ), patch( "steam_backlog_enforcer.library_hider.subprocess.run", noop_run, diff --git a/steam_backlog_enforcer/tests/test_enforce_loop_part3.py b/steam_backlog_enforcer/tests/test_enforce_loop_part3.py new file mode 100644 index 0000000..513fbea --- /dev/null +++ b/steam_backlog_enforcer/tests/test_enforce_loop_part3.py @@ -0,0 +1,115 @@ +"""Tests for _enforce_loop module (part 3 - total-block short-circuit).""" + +from __future__ import annotations + +from unittest.mock import patch + +from steam_backlog_enforcer._enforce_loop import _enforce_loop_iteration, do_enforce +from steam_backlog_enforcer.config import Config, State + +PKG = "steam_backlog_enforcer._enforce_loop" + + +class TestEnforceLoopIterationTotalBlock: + """Total-block short-circuit at the top of _enforce_loop_iteration.""" + + def test_active_short_circuits_before_assigned_game_logic(self) -> None: + config = Config(kill_unauthorized_games=True, uninstall_other_games=True) + state = State(current_app_id=1, current_game_name="G") + with ( + patch(f"{PKG}.is_total_block_active", return_value=True), + patch(f"{PKG}.enforce_total_block_tick") as mock_tick, + patch(f"{PKG}.enforce_allowed_game") as mock_enforce, + patch(f"{PKG}._guard_installed_games") as mock_guard, + patch(f"{PKG}.is_game_installed") as mock_installed, + ): + _enforce_loop_iteration(config, state) + mock_tick.assert_called_once() + mock_enforce.assert_not_called() + mock_guard.assert_not_called() + mock_installed.assert_not_called() + + def test_not_active_runs_normal_logic(self) -> None: + config = Config(kill_unauthorized_games=False, uninstall_other_games=False) + state = State(current_app_id=1, current_game_name="G") + with ( + patch(f"{PKG}.is_total_block_active", return_value=False), + patch(f"{PKG}.total_block_needs_cleanup", return_value=False), + patch(f"{PKG}.enforce_total_block_tick") as mock_tick, + patch(f"{PKG}.is_game_installed", return_value=True), + ): + _enforce_loop_iteration(config, state) + mock_tick.assert_not_called() + + def test_expired_lock_triggers_cleanup_once(self) -> None: + config = Config(kill_unauthorized_games=False, uninstall_other_games=False) + state = State(current_app_id=1, current_game_name="G") + with ( + patch(f"{PKG}.is_total_block_active", return_value=False), + patch(f"{PKG}.total_block_needs_cleanup", return_value=True), + patch(f"{PKG}.end_total_block_cleanup") as mock_cleanup, + patch(f"{PKG}.is_game_installed", return_value=True), + ): + _enforce_loop_iteration(config, state) + mock_cleanup.assert_called_once() + + def test_no_lock_no_cleanup_call(self) -> None: + config = Config(kill_unauthorized_games=False, uninstall_other_games=False) + state = State(current_app_id=1, current_game_name="G") + with ( + patch(f"{PKG}.is_total_block_active", return_value=False), + patch(f"{PKG}.total_block_needs_cleanup", return_value=False), + patch(f"{PKG}.end_total_block_cleanup") as mock_cleanup, + patch(f"{PKG}.is_game_installed", return_value=True), + ): + _enforce_loop_iteration(config, state) + mock_cleanup.assert_not_called() + + +class TestDoEnforceTotalBlock: + """Total-block awareness in do_enforce's one-time setup.""" + + def test_active_skips_enforce_setup_but_still_loops(self) -> None: + state = State() # no assigned game + config = Config() + with ( + patch(f"{PKG}.is_total_block_active", return_value=True), + patch(f"{PKG}._enforce_setup") as mock_setup, + patch(f"{PKG}._echo"), + patch.object(State, "load", return_value=state), + patch( + f"{PKG}._enforce_loop_iteration", + side_effect=KeyboardInterrupt, + ), + patch(f"{PKG}.time.sleep"), + ): + do_enforce(config, state) + mock_setup.assert_not_called() + + def test_active_with_no_game_does_not_early_return(self) -> None: + """Without total block, no assigned game means do_enforce returns + immediately. With it active, the loop must still run.""" + state = State() + config = Config() + with ( + patch(f"{PKG}.is_total_block_active", return_value=True), + patch(f"{PKG}._enforce_setup"), + patch(f"{PKG}._echo"), + patch.object(State, "load", return_value=state), + patch( + f"{PKG}._enforce_loop_iteration", + side_effect=KeyboardInterrupt, + ) as mock_iter, + patch(f"{PKG}.time.sleep"), + ): + do_enforce(config, state) + mock_iter.assert_called_once() + + def test_inactive_with_no_game_returns_early(self) -> None: + with ( + patch(f"{PKG}.is_total_block_active", return_value=False), + patch(f"{PKG}._echo") as mock_echo, + ): + do_enforce(Config(), State()) + output = " ".join(str(c) for c in mock_echo.call_args_list) + assert "No game" in output diff --git a/steam_backlog_enforcer/tests/test_enforcer_part2.py b/steam_backlog_enforcer/tests/test_enforcer_part2.py new file mode 100644 index 0000000..34884d0 --- /dev/null +++ b/steam_backlog_enforcer/tests/test_enforcer_part2.py @@ -0,0 +1,163 @@ +"""Tests for enforcer module — part 2 (comm-name process matching).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from unittest.mock import patch + +from steam_backlog_enforcer.enforcer import ( + _kill_pid_by_name, + get_pids_by_process_names, + kill_processes_by_name, +) + +if TYPE_CHECKING: + from pathlib import Path + + +class TestGetPidsByProcessNames: + """Tests for get_pids_by_process_names.""" + + def test_finds_matching_comm(self, tmp_path: Path) -> None: + proc_dir = tmp_path / "proc" + pid_dir = proc_dir / "12345" + pid_dir.mkdir(parents=True) + (pid_dir / "comm").write_text("lutris\n", encoding="utf-8") + + with patch( + "steam_backlog_enforcer.enforcer.Path", + return_value=proc_dir, + ): + result = get_pids_by_process_names(frozenset({"lutris"})) + assert result == {12345: "lutris"} + + def test_no_match_returns_empty(self, tmp_path: Path) -> None: + proc_dir = tmp_path / "proc" + pid_dir = proc_dir / "12345" + pid_dir.mkdir(parents=True) + (pid_dir / "comm").write_text("bash\n", encoding="utf-8") + + with patch( + "steam_backlog_enforcer.enforcer.Path", + return_value=proc_dir, + ): + result = get_pids_by_process_names(frozenset({"lutris"})) + assert result == {} + + def test_skips_non_digit_entries(self, tmp_path: Path) -> None: + proc_dir = tmp_path / "proc" + proc_dir.mkdir(parents=True) + (proc_dir / "self").mkdir() + + with patch( + "steam_backlog_enforcer.enforcer.Path", + return_value=proc_dir, + ): + result = get_pids_by_process_names(frozenset({"lutris"})) + assert result == {} + + def test_handles_missing_comm_file(self, tmp_path: Path) -> None: + proc_dir = tmp_path / "proc" + (proc_dir / "42").mkdir(parents=True) + # No comm file -> OSError when reading. + + with patch( + "steam_backlog_enforcer.enforcer.Path", + return_value=proc_dir, + ): + result = get_pids_by_process_names(frozenset({"lutris"})) + assert result == {} + + def test_truncates_long_names_to_15_chars(self, tmp_path: Path) -> None: + """The kernel truncates /proc/[pid]/comm to 15 chars - matching + must compare against the truncated form, not the full name.""" + proc_dir = tmp_path / "proc" + pid_dir = proc_dir / "777" + pid_dir.mkdir(parents=True) + # "EpicGamesLauncher" truncated to 15 chars is "EpicGamesLaunch". + (pid_dir / "comm").write_text("EpicGamesLaunch\n", encoding="utf-8") + + with patch( + "steam_backlog_enforcer.enforcer.Path", + return_value=proc_dir, + ): + result = get_pids_by_process_names(frozenset({"EpicGamesLauncher"})) + assert result == {777: "EpicGamesLauncher"} + + +class TestKillProcessesByName: + """Tests for kill_processes_by_name.""" + + def test_kills_matching_pids(self) -> None: + with ( + patch( + "steam_backlog_enforcer.enforcer.get_pids_by_process_names", + return_value={100: "lutris", 200: "prismlauncher"}, + ), + patch("steam_backlog_enforcer.enforcer.os.kill") as mock_kill, + ): + result = kill_processes_by_name(frozenset({"lutris", "prismlauncher"})) + assert sorted(result) == [(100, "lutris"), (200, "prismlauncher")] + assert mock_kill.call_count == 2 + + def test_no_matches_returns_empty(self) -> None: + with patch( + "steam_backlog_enforcer.enforcer.get_pids_by_process_names", + return_value={}, + ): + result = kill_processes_by_name(frozenset({"lutris"})) + assert result == [] + + def test_process_already_gone_not_included(self) -> None: + with ( + patch( + "steam_backlog_enforcer.enforcer.get_pids_by_process_names", + return_value={100: "lutris"}, + ), + patch( + "steam_backlog_enforcer.enforcer.os.kill", + side_effect=ProcessLookupError, + ), + ): + result = kill_processes_by_name(frozenset({"lutris"})) + assert result == [] + + def test_permission_error_not_included(self) -> None: + with ( + patch( + "steam_backlog_enforcer.enforcer.get_pids_by_process_names", + return_value={100: "lutris"}, + ), + patch( + "steam_backlog_enforcer.enforcer.os.kill", + side_effect=PermissionError, + ), + ): + result = kill_processes_by_name(frozenset({"lutris"})) + assert result == [] + + +class TestKillPidByName: + """Tests for _kill_pid_by_name.""" + + def test_success_returns_true(self) -> None: + with patch("steam_backlog_enforcer.enforcer.os.kill") as mock_kill: + result = _kill_pid_by_name(123, "lutris") + assert result is True + mock_kill.assert_called_once() + + def test_process_already_gone_returns_false(self) -> None: + with patch( + "steam_backlog_enforcer.enforcer.os.kill", + side_effect=ProcessLookupError, + ): + result = _kill_pid_by_name(123, "lutris") + assert result is False + + def test_permission_error_returns_false(self) -> None: + with patch( + "steam_backlog_enforcer.enforcer.os.kill", + side_effect=PermissionError, + ): + result = _kill_pid_by_name(123, "lutris") + assert result is False diff --git a/steam_backlog_enforcer/tests/test_main_part5.py b/steam_backlog_enforcer/tests/test_main_part5.py new file mode 100644 index 0000000..cc7f197 --- /dev/null +++ b/steam_backlog_enforcer/tests/test_main_part5.py @@ -0,0 +1,260 @@ +"""Tests for main CLI module — part 5 (total gaming block lock + command).""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +import sys +from unittest.mock import patch + +import pytest + +from steam_backlog_enforcer._total_block import TotalBlockStatus +from steam_backlog_enforcer.config import Config, State +from steam_backlog_enforcer.main import ( + _TOTAL_BLOCK_EXEMPT_COMMANDS, + _enforce_total_block_lock, + _show_total_block_lock_message, + cmd_block_gaming, + cmd_status, + main, +) + +PKG = "steam_backlog_enforcer.main" + +_ACTIVE_STATUS = TotalBlockStatus( + active=True, + started_at=datetime.now(timezone.utc) - timedelta(hours=1), + until=datetime.now(timezone.utc) + timedelta(hours=23), + days=1, + days_remaining=0.96, +) +_INACTIVE_STATUS = TotalBlockStatus( + active=False, started_at=None, until=None, days=0, days_remaining=0.0 +) + + +# ────────────────────────────────────────────────────────────── +# _show_total_block_lock_message +# ────────────────────────────────────────────────────────────── + + +class TestShowTotalBlockLockMessage: + def test_shows_remaining_time(self) -> None: + with patch(f"{PKG}._echo") as mock_echo: + _show_total_block_lock_message(_ACTIVE_STATUS) + output = " ".join(str(c) for c in mock_echo.call_args_list) + assert "TOTAL GAMING BLOCK ACTIVE" in output + assert "Days remaining" in output + + def test_lists_exempt_commands(self) -> None: + with patch(f"{PKG}._echo") as mock_echo: + _show_total_block_lock_message(_ACTIVE_STATUS) + output = " ".join(str(c) for c in mock_echo.call_args_list) + assert "status" in output + assert "enforce" in output + + def test_no_crash_without_until(self) -> None: + status = TotalBlockStatus( + active=True, started_at=None, until=None, days=1, days_remaining=0.5 + ) + with patch(f"{PKG}._echo"): + _show_total_block_lock_message(status) # must not raise + + +# ────────────────────────────────────────────────────────────── +# _enforce_total_block_lock +# ────────────────────────────────────────────────────────────── + + +class TestEnforceTotalBlockLock: + def test_not_active_passes(self) -> None: + with patch(f"{PKG}.is_total_block_active", return_value=False): + _enforce_total_block_lock("scan") # no exit + + def test_exempt_command_passes_while_active(self) -> None: + with patch(f"{PKG}.is_total_block_active", return_value=True): + _enforce_total_block_lock("status") # no exit + _enforce_total_block_lock("enforce") # no exit + + def test_blocked_command_exits(self) -> None: + with ( + patch(f"{PKG}.is_total_block_active", return_value=True), + patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS), + patch(f"{PKG}._show_total_block_lock_message"), + pytest.raises(SystemExit) as exc_info, + ): + _enforce_total_block_lock("scan") + assert exc_info.value.code == 1 + + def test_done_blocked_while_active(self) -> None: + """Stricter than the manual-pick lock: even 'done' is blocked.""" + with ( + patch(f"{PKG}.is_total_block_active", return_value=True), + patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS), + patch(f"{PKG}._show_total_block_lock_message"), + pytest.raises(SystemExit), + ): + _enforce_total_block_lock("done") + + def test_add_exception_blocked_while_active(self) -> None: + with ( + patch(f"{PKG}.is_total_block_active", return_value=True), + patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS), + patch(f"{PKG}._show_total_block_lock_message"), + pytest.raises(SystemExit), + ): + _enforce_total_block_lock("add-exception") + + def test_exempt_set_is_stricter_than_manual_pick(self) -> None: + assert frozenset({"status", "enforce"}) == _TOTAL_BLOCK_EXEMPT_COMMANDS + + +# ────────────────────────────────────────────────────────────── +# cmd_block_gaming +# ────────────────────────────────────────────────────────────── + + +class TestCmdBlockGaming: + def test_no_args_shows_usage(self) -> None: + with patch(f"{PKG}._echo") as mock_echo, pytest.raises(SystemExit) as exc_info: + cmd_block_gaming([]) + assert exc_info.value.code == 1 + output = " ".join(str(c) for c in mock_echo.call_args_list) + assert "Usage" in output + + def test_non_numeric_days(self) -> None: + with patch(f"{PKG}._echo"), pytest.raises(SystemExit): + cmd_block_gaming(["abc"]) + + def test_zero_days_rejected(self) -> None: + with patch(f"{PKG}._echo"), pytest.raises(SystemExit): + cmd_block_gaming(["0"]) + + def test_negative_days_rejected(self) -> None: + with patch(f"{PKG}._echo"), pytest.raises(SystemExit): + cmd_block_gaming(["-1"]) + + def test_aborted_when_not_yes(self) -> None: + with ( + patch(f"{PKG}._echo"), + patch("builtins.input", return_value="no"), + patch(f"{PKG}.start_total_block") as mock_start, + ): + cmd_block_gaming(["14"]) + mock_start.assert_not_called() + + def test_confirmed_starts_block(self) -> None: + with ( + patch(f"{PKG}._echo"), + patch("builtins.input", return_value="YES"), + patch(f"{PKG}.start_total_block", return_value=True) as mock_start, + ): + cmd_block_gaming(["14"]) + mock_start.assert_called_once_with(14) + + def test_start_failure_exits_nonzero(self) -> None: + with ( + patch(f"{PKG}._echo"), + patch("builtins.input", return_value="YES"), + patch(f"{PKG}.start_total_block", return_value=False), + pytest.raises(SystemExit) as exc_info, + ): + cmd_block_gaming(["14"]) + assert exc_info.value.code == 1 + + +# ────────────────────────────────────────────────────────────── +# main() dispatch to block-gaming +# ────────────────────────────────────────────────────────────── + + +class TestMainDispatchBlockGaming: + def test_dispatches_block_gaming(self) -> None: + argv = ["prog", "block-gaming", "14"] + with ( + patch.object(sys, "argv", argv), + patch(f"{PKG}.Config.load", return_value=Config(steam_api_key="k")), + patch(f"{PKG}.State.load", return_value=State()), + patch(f"{PKG}.is_total_block_active", return_value=False), + patch(f"{PKG}.cmd_block_gaming") as mock_cmd, + ): + main() + mock_cmd.assert_called_once_with(["14"]) + + def test_blocked_when_already_active(self) -> None: + argv = ["prog", "scan"] + with ( + patch.object(sys, "argv", argv), + patch(f"{PKG}.Config.load", return_value=Config(steam_api_key="k")), + patch(f"{PKG}.State.load", return_value=State()), + patch(f"{PKG}.is_total_block_active", return_value=True), + patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS), + patch(f"{PKG}._show_total_block_lock_message"), + pytest.raises(SystemExit) as exc_info, + ): + main() + assert exc_info.value.code == 1 + + def test_status_allowed_when_active(self) -> None: + # "status" is dispatched via the COMMANDS dict, which captures the + # cmd_status function reference at import time - patching + # main.cmd_status would not intercept it. Verify real behavior + # (no SystemExit, real status output) instead. + argv = ["prog", "status"] + with ( + patch.object(sys, "argv", argv), + patch(f"{PKG}.Config.load", return_value=Config(steam_api_key="k")), + patch(f"{PKG}.State.load", return_value=State()), + patch(f"{PKG}.is_total_block_active", return_value=True), + patch(f"{PKG}.is_store_blocked", return_value=False), + patch(f"{PKG}.get_installed_games", return_value=[]), + patch(f"{PKG}._echo") as mock_echo, + ): + main() # must not raise SystemExit + output = " ".join(str(c) for c in mock_echo.call_args_list) + assert "Steam Backlog Enforcer" in output + + +# ────────────────────────────────────────────────────────────── +# cmd_status shows total block info +# ────────────────────────────────────────────────────────────── + + +class TestCmdStatusTotalBlock: + def test_shows_total_block_when_active(self) -> None: + with ( + patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS), + patch(f"{PKG}.is_store_blocked", return_value=False), + patch(f"{PKG}.get_installed_games", return_value=[]), + patch(f"{PKG}._echo") as mock_echo, + ): + cmd_status(Config(), State()) + output = " ".join(str(c) for c in mock_echo.call_args_list) + assert "TOTAL GAMING BLOCK ACTIVE" in output + assert "Days remaining" in output + + def test_no_total_block_section_when_inactive(self) -> None: + with ( + patch(f"{PKG}.get_total_block_status", return_value=_INACTIVE_STATUS), + patch(f"{PKG}.is_store_blocked", return_value=False), + patch(f"{PKG}.get_installed_games", return_value=[]), + patch(f"{PKG}._echo") as mock_echo, + ): + cmd_status(Config(), State()) + output = " ".join(str(c) for c in mock_echo.call_args_list) + assert "TOTAL GAMING BLOCK" not in output + + def test_active_without_until_skips_remaining_time(self) -> None: + status = TotalBlockStatus( + active=True, started_at=None, until=None, days=1, days_remaining=0.5 + ) + with ( + patch(f"{PKG}.get_total_block_status", return_value=status), + patch(f"{PKG}.is_store_blocked", return_value=False), + patch(f"{PKG}.get_installed_games", return_value=[]), + patch(f"{PKG}._echo") as mock_echo, + ): + cmd_status(Config(), State()) + output = " ".join(str(c) for c in mock_echo.call_args_list) + assert "TOTAL GAMING BLOCK ACTIVE" in output + assert "Days remaining" not in output diff --git a/steam_backlog_enforcer/tests/test_store_blocker_part2.py b/steam_backlog_enforcer/tests/test_store_blocker_part2.py index 53d3441..6c3f4d4 100644 --- a/steam_backlog_enforcer/tests/test_store_blocker_part2.py +++ b/steam_backlog_enforcer/tests/test_store_blocker_part2.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from unittest.mock import MagicMock, patch +from unittest.mock import patch from steam_backlog_enforcer.store_blocker import ( _disable_hosts_protection, @@ -33,54 +33,23 @@ class TestSudoWriteHosts: class TestDisableHostsProtection: """Tests for _disable_hosts_protection.""" - def test_stops_services_unmounts_chattr(self) -> None: - findmnt_found = MagicMock(returncode=0) - - def run_side_effect( - cmd: list[str], - **_kwargs: object, - ) -> MagicMock: - if any("findmnt" in str(c) for c in cmd): - return findmnt_found - return MagicMock(returncode=0) - - with patch(f"{PKG}.subprocess.run", side_effect=run_side_effect): - _disable_hosts_protection() - - def test_no_bind_mount(self) -> None: - findmnt_missing = MagicMock(returncode=1) - - def run_side_effect( - cmd: list[str], - **_kwargs: object, - ) -> MagicMock: - if any("findmnt" in str(c) for c in cmd): - return findmnt_missing - return MagicMock(returncode=0) - - with patch(f"{PKG}.subprocess.run", side_effect=run_side_effect): + def test_calls_guardctl_pacman_unlock(self) -> None: + with patch(f"{PKG}.subprocess.run") as mock_run: _disable_hosts_protection() + mock_run.assert_called_once() + cmd = mock_run.call_args.args[0] + assert cmd[-3:] == ["file-guard", "pacman-unlock", "hosts"] class TestEnableHostsProtection: """Tests for _enable_hosts_protection.""" - def test_with_locked_copy(self, tmp_path: Path) -> None: - locked_copy = tmp_path / "locked-hosts" - locked_copy.touch() - with ( - patch(f"{PKG}.subprocess.run"), - patch(f"{PKG}._LOCKED_HOSTS_COPY", locked_copy), - ): - _enable_hosts_protection() - - def test_without_locked_copy(self, tmp_path: Path) -> None: - locked_copy = tmp_path / "nonexistent" - with ( - patch(f"{PKG}.subprocess.run"), - patch(f"{PKG}._LOCKED_HOSTS_COPY", locked_copy), - ): + def test_calls_guardctl_sync(self) -> None: + with patch(f"{PKG}.subprocess.run") as mock_run: _enable_hosts_protection() + mock_run.assert_called_once() + cmd = mock_run.call_args.args[0] + assert cmd[-3:] == ["file-guard", "sync", "hosts"] class TestUnblockHosts: diff --git a/steam_backlog_enforcer/tests/test_total_block.py b/steam_backlog_enforcer/tests/test_total_block.py new file mode 100644 index 0000000..65a5f15 --- /dev/null +++ b/steam_backlog_enforcer/tests/test_total_block.py @@ -0,0 +1,736 @@ +"""Tests for _total_block module (block-gaming total block). + +`TOTAL_BLOCK_LOCK_FILE`, `_IPTABLES_IP_CACHE_FILE`, and `HOSTS_FILE` are +patched per-test by the `paths` fixture below to tmp_path locations, +overriding conftest's own autouse patch with paths the tests can read and +write directly. Tests must never do `from ... import TOTAL_BLOCK_LOCK_FILE` +- that captures the value at file-import time, before any patch is active, +and silently ignores every subsequent patch (the same binding gotcha +documented in this repo's own conftest.py for HLTB_CACHE_FILE). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +import json +from typing import TYPE_CHECKING +from unittest.mock import MagicMock, patch + +import pytest + +from steam_backlog_enforcer._total_block import ( + _HOSTS_BLOCK_BEGIN, + _HOSTS_BLOCK_END, + IPTABLES_CHAIN, + _apply_total_block_hosts, + _apply_total_block_iptables, + _iptables_chain_intact, + _is_steam_installed, + _kill_and_uninstall_launchers, + _kill_steam_and_launchers, + _load_cached_ips, + _pacman_owner, + _remove_total_block_hosts, + _remove_total_block_iptables, + _save_cached_ips, + _uninstall_package, + _uninstall_steam_package, + end_total_block_cleanup, + enforce_total_block_tick, + get_total_block_status, + is_total_block_active, + start_total_block, + total_block_needs_cleanup, +) + +if TYPE_CHECKING: + from collections.abc import Iterator + from pathlib import Path + +PKG = "steam_backlog_enforcer._total_block" + + +@dataclass +class _Paths: + lock_file: Path + ip_cache_file: Path + hosts_file: Path + + +@pytest.fixture(autouse=True) +def paths(tmp_path: Path) -> Iterator[_Paths]: + """Redirect the module's path constants to tmp_path for every test.""" + paths = _Paths( + lock_file=tmp_path / "total_block_lock.json", + ip_cache_file=tmp_path / "total_block_ip_cache.json", + hosts_file=tmp_path / "hosts", + ) + with ( + patch(f"{PKG}.TOTAL_BLOCK_LOCK_FILE", paths.lock_file), + patch(f"{PKG}._IPTABLES_IP_CACHE_FILE", paths.ip_cache_file), + patch(f"{PKG}.HOSTS_FILE", paths.hosts_file), + ): + yield paths + + +def _write_lock(paths: _Paths, started_at: float, until: float, days: int = 1) -> None: + paths.lock_file.parent.mkdir(parents=True, exist_ok=True) + paths.lock_file.write_text( + json.dumps({"started_at": started_at, "until": until, "days": days}), + encoding="utf-8", + ) + + +_NOW = datetime.now(timezone.utc).timestamp() + + +# ────────────────────────────────────────────────────────────── +# Lock reading / status +# ────────────────────────────────────────────────────────────── + + +class TestIsTotalBlockActive: + def test_no_lock_file(self) -> None: + assert is_total_block_active() is False + + def test_active_lock(self, paths: _Paths) -> None: + _write_lock(paths, _NOW, _NOW + 3600) + assert is_total_block_active() is True + + def test_expired_lock(self, paths: _Paths) -> None: + _write_lock(paths, _NOW - 3600, _NOW - 1) + assert is_total_block_active() is False + + def test_malformed_json(self, paths: _Paths) -> None: + paths.lock_file.parent.mkdir(parents=True, exist_ok=True) + paths.lock_file.write_text("not json", encoding="utf-8") + assert is_total_block_active() is False + + def test_non_dict_json(self, paths: _Paths) -> None: + paths.lock_file.parent.mkdir(parents=True, exist_ok=True) + paths.lock_file.write_text("[1, 2, 3]", encoding="utf-8") + assert is_total_block_active() is False + + def test_missing_until_key(self, paths: _Paths) -> None: + paths.lock_file.parent.mkdir(parents=True, exist_ok=True) + paths.lock_file.write_text(json.dumps({"days": 1}), encoding="utf-8") + assert is_total_block_active() is False + + def test_non_numeric_until(self, paths: _Paths) -> None: + paths.lock_file.parent.mkdir(parents=True, exist_ok=True) + paths.lock_file.write_text( + json.dumps({"until": "not-a-number"}), encoding="utf-8" + ) + assert is_total_block_active() is False + + +class TestTotalBlockNeedsCleanup: + def test_no_lock_file(self) -> None: + assert total_block_needs_cleanup() is False + + def test_active_lock_no_cleanup_needed(self, paths: _Paths) -> None: + _write_lock(paths, _NOW, _NOW + 3600) + assert total_block_needs_cleanup() is False + + def test_expired_lock_needs_cleanup(self, paths: _Paths) -> None: + _write_lock(paths, _NOW - 3600, _NOW - 1) + assert total_block_needs_cleanup() is True + + +class TestGetTotalBlockStatus: + def test_no_lock(self) -> None: + status = get_total_block_status() + assert status.active is False + assert status.started_at is None + assert status.until is None + assert status.days == 0 + assert status.days_remaining == 0.0 + + def test_active_lock(self, paths: _Paths) -> None: + _write_lock(paths, _NOW, _NOW + 86400, days=1) + status = get_total_block_status() + assert status.active is True + assert status.days == 1 + assert 0.0 < status.days_remaining <= 1.0 + assert status.started_at is not None + assert status.until is not None + + def test_expired_lock(self, paths: _Paths) -> None: + _write_lock(paths, _NOW - 7200, _NOW - 3600, days=1) + status = get_total_block_status() + assert status.active is False + assert status.days_remaining == 0.0 + + def test_malformed_json_returns_inactive(self, paths: _Paths) -> None: + paths.lock_file.parent.mkdir(parents=True, exist_ok=True) + paths.lock_file.write_text("garbage", encoding="utf-8") + status = get_total_block_status() + assert status.active is False + + def test_non_int_days_defaults_to_zero(self, paths: _Paths) -> None: + paths.lock_file.parent.mkdir(parents=True, exist_ok=True) + paths.lock_file.write_text( + json.dumps({"started_at": _NOW, "until": _NOW + 3600, "days": "one"}), + encoding="utf-8", + ) + status = get_total_block_status() + assert status.days == 0 + + +# ────────────────────────────────────────────────────────────── +# Process killing +# ────────────────────────────────────────────────────────────── + + +class TestKillSteamAndLaunchers: + def test_combines_steam_and_launcher_kills(self) -> None: + with ( + patch(f"{PKG}.kill_processes_by_name", return_value=[(1, "steam")]), + patch( + f"{PKG}._kill_and_uninstall_launchers", + return_value=[(2, "prismlauncher")], + ) as mock_launchers, + ): + result = _kill_steam_and_launchers() + assert result == [(1, "steam"), (2, "prismlauncher")] + mock_launchers.assert_called_once() + + +class TestPacmanOwner: + def test_owned_path_returns_package_name(self) -> None: + result = MagicMock( + returncode=0, + stdout="/usr/bin/prismlauncher is owned by prismlauncher-git 11.0.0-1\n", + ) + with patch(f"{PKG}.subprocess.run", return_value=result): + assert _pacman_owner("/usr/bin/prismlauncher") == "prismlauncher-git" + + def test_unowned_path_returns_none(self) -> None: + result = MagicMock(returncode=1, stdout="") + with patch(f"{PKG}.subprocess.run", return_value=result): + assert _pacman_owner("/opt/foo/bar") is None + + def test_unexpected_output_format_returns_none(self) -> None: + result = MagicMock(returncode=0, stdout="something unexpected\n") + with patch(f"{PKG}.subprocess.run", return_value=result): + assert _pacman_owner("/usr/bin/x") is None + + +class TestUninstallPackage: + def test_success(self) -> None: + with patch( + f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0, stderr="") + ): + assert _uninstall_package("foo") is True + + def test_already_absent_treated_as_success(self) -> None: + with patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=1, stderr="error: target not found: foo"), + ): + assert _uninstall_package("foo") is True + + def test_real_failure_returns_false(self) -> None: + with patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=1, stderr="some other error"), + ): + assert _uninstall_package("foo") is False + + def test_subprocess_error_returns_false(self) -> None: + with patch(f"{PKG}.subprocess.run", side_effect=OSError): + assert _uninstall_package("foo") is False + + +class TestKillAndUninstallLaunchers: + def test_no_launchers_running(self) -> None: + with ( + patch(f"{PKG}.get_pids_by_process_names", return_value={}), + patch(f"{PKG}.kill_processes_by_name", return_value=[]), + ): + assert _kill_and_uninstall_launchers() == [] + + def test_kills_and_uninstalls_owned_package(self) -> None: + with ( + patch( + f"{PKG}.get_pids_by_process_names", + return_value={123: "prismlauncher"}, + ), + patch(f"{PKG}.Path") as mock_path_cls, + patch( + f"{PKG}.kill_processes_by_name", + return_value=[(123, "prismlauncher")], + ), + patch(f"{PKG}._pacman_owner", return_value="prismlauncher-git"), + patch(f"{PKG}._uninstall_package", return_value=True) as mock_uninstall, + ): + mock_path_cls.return_value.resolve.return_value = "/usr/bin/prismlauncher" + result = _kill_and_uninstall_launchers() + assert result == [(123, "prismlauncher")] + mock_uninstall.assert_called_once_with("prismlauncher-git") + + def test_exe_path_unreadable_skips_uninstall(self) -> None: + with ( + patch( + f"{PKG}.get_pids_by_process_names", + return_value={123: "prismlauncher"}, + ), + patch(f"{PKG}.Path") as mock_path_cls, + patch( + f"{PKG}.kill_processes_by_name", + return_value=[(123, "prismlauncher")], + ), + patch(f"{PKG}._pacman_owner") as mock_owner, + patch(f"{PKG}._uninstall_package") as mock_uninstall, + ): + mock_path_cls.return_value.resolve.side_effect = OSError + result = _kill_and_uninstall_launchers() + assert result == [(123, "prismlauncher")] + mock_owner.assert_not_called() + mock_uninstall.assert_not_called() + + def test_unowned_package_not_uninstalled(self) -> None: + with ( + patch(f"{PKG}.get_pids_by_process_names", return_value={123: "custom"}), + patch(f"{PKG}.Path") as mock_path_cls, + patch(f"{PKG}.kill_processes_by_name", return_value=[(123, "custom")]), + patch(f"{PKG}._pacman_owner", return_value=None), + patch(f"{PKG}._uninstall_package") as mock_uninstall, + ): + mock_path_cls.return_value.resolve.return_value = "/opt/custom/launcher" + _kill_and_uninstall_launchers() + mock_uninstall.assert_not_called() + + def test_uninstall_failure_is_logged_not_raised(self) -> None: + with ( + patch( + f"{PKG}.get_pids_by_process_names", + return_value={123: "prismlauncher"}, + ), + patch(f"{PKG}.Path") as mock_path_cls, + patch( + f"{PKG}.kill_processes_by_name", + return_value=[(123, "prismlauncher")], + ), + patch(f"{PKG}._pacman_owner", return_value="prismlauncher-git"), + patch(f"{PKG}._uninstall_package", return_value=False), + ): + mock_path_cls.return_value.resolve.return_value = "/usr/bin/prismlauncher" + _kill_and_uninstall_launchers() # must not raise + + +# ────────────────────────────────────────────────────────────── +# Steam package removal +# ────────────────────────────────────────────────────────────── + + +class TestIsSteamInstalled: + def test_installed(self) -> None: + with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)): + assert _is_steam_installed() is True + + def test_not_installed(self) -> None: + with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=1)): + assert _is_steam_installed() is False + + +class TestUninstallSteamPackage: + def test_success(self) -> None: + with patch( + f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0, stderr="") + ): + assert _uninstall_steam_package() is True + + def test_already_absent_treated_as_success(self) -> None: + with patch( + f"{PKG}.subprocess.run", + return_value=MagicMock( + returncode=1, stderr="error: target not found: steam" + ), + ): + assert _uninstall_steam_package() is True + + def test_real_failure_returns_false(self) -> None: + with patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=1, stderr="some other error"), + ): + assert _uninstall_steam_package() is False + + def test_subprocess_error_returns_false(self) -> None: + with patch(f"{PKG}.subprocess.run", side_effect=OSError): + assert _uninstall_steam_package() is False + + +# ────────────────────────────────────────────────────────────── +# Hosts domain blocking +# ────────────────────────────────────────────────────────────── + + +class TestApplyTotalBlockHosts: + def test_appends_block_when_absent(self, paths: _Paths) -> None: + paths.hosts_file.write_text("127.0.0.1 localhost\n", encoding="utf-8") + with ( + patch(f"{PKG}._disable_hosts_protection"), + patch(f"{PKG}._enable_hosts_protection"), + patch(f"{PKG}._sudo_write_hosts") as mock_write, + ): + assert _apply_total_block_hosts() is True + written = mock_write.call_args.args[0] + assert _HOSTS_BLOCK_BEGIN in written + assert _HOSTS_BLOCK_END in written + assert "steamcommunity.com" in written + + def test_already_present_is_noop(self, paths: _Paths) -> None: + paths.hosts_file.write_text( + f"127.0.0.1 localhost\n{_HOSTS_BLOCK_BEGIN}" + f"0.0.0.0 x.com\n{_HOSTS_BLOCK_END}", + encoding="utf-8", + ) + with patch(f"{PKG}._sudo_write_hosts") as mock_write: + assert _apply_total_block_hosts() is True + mock_write.assert_not_called() + + def test_missing_hosts_file_returns_false(self) -> None: + assert _apply_total_block_hosts() is False + + def test_write_failure_still_reenables_protection(self, paths: _Paths) -> None: + paths.hosts_file.write_text("127.0.0.1 localhost\n", encoding="utf-8") + with ( + patch(f"{PKG}._disable_hosts_protection"), + patch(f"{PKG}._enable_hosts_protection") as mock_enable, + patch(f"{PKG}._sudo_write_hosts", side_effect=OSError), + ): + assert _apply_total_block_hosts() is False + mock_enable.assert_called_once() + + +class TestRemoveTotalBlockHosts: + def test_removes_block_when_present(self, paths: _Paths) -> None: + paths.hosts_file.write_text( + f"127.0.0.1 localhost\n{_HOSTS_BLOCK_BEGIN}" + f"0.0.0.0 x.com\n{_HOSTS_BLOCK_END}" + "192.168.1.1 router\n", + encoding="utf-8", + ) + with ( + patch(f"{PKG}._disable_hosts_protection"), + patch(f"{PKG}._enable_hosts_protection"), + patch(f"{PKG}._sudo_write_hosts") as mock_write, + ): + assert _remove_total_block_hosts() is True + written = mock_write.call_args.args[0] + assert _HOSTS_BLOCK_BEGIN not in written + assert "router" in written + assert "localhost" in written + + def test_absent_is_noop(self, paths: _Paths) -> None: + paths.hosts_file.write_text("127.0.0.1 localhost\n", encoding="utf-8") + with patch(f"{PKG}._sudo_write_hosts") as mock_write: + assert _remove_total_block_hosts() is True + mock_write.assert_not_called() + + def test_missing_hosts_file_returns_false(self) -> None: + assert _remove_total_block_hosts() is False + + def test_write_failure_still_reenables_protection(self, paths: _Paths) -> None: + paths.hosts_file.write_text( + f"127.0.0.1 localhost\n{_HOSTS_BLOCK_BEGIN}" + f"0.0.0.0 x.com\n{_HOSTS_BLOCK_END}", + encoding="utf-8", + ) + with ( + patch(f"{PKG}._disable_hosts_protection"), + patch(f"{PKG}._enable_hosts_protection") as mock_enable, + patch(f"{PKG}._sudo_write_hosts", side_effect=OSError), + ): + assert _remove_total_block_hosts() is False + mock_enable.assert_called_once() + + +# ────────────────────────────────────────────────────────────── +# IP cache +# ────────────────────────────────────────────────────────────── + + +class TestIpCache: + def test_load_no_file_returns_empty(self) -> None: + assert _load_cached_ips() == set() + + def test_save_then_load_round_trips(self) -> None: + _save_cached_ips({"1.2.3.4", "5.6.7.8"}) + assert _load_cached_ips() == {"1.2.3.4", "5.6.7.8"} + + def test_load_malformed_json_returns_empty(self, paths: _Paths) -> None: + paths.ip_cache_file.parent.mkdir(parents=True, exist_ok=True) + paths.ip_cache_file.write_text("not json", encoding="utf-8") + assert _load_cached_ips() == set() + + def test_load_non_list_json_returns_empty(self, paths: _Paths) -> None: + paths.ip_cache_file.parent.mkdir(parents=True, exist_ok=True) + paths.ip_cache_file.write_text(json.dumps({"a": 1}), encoding="utf-8") + assert _load_cached_ips() == set() + + +# ────────────────────────────────────────────────────────────── +# iptables +# ────────────────────────────────────────────────────────────── + + +class TestIptablesChainIntact: + def test_missing_chain_returns_false(self) -> None: + with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=1)): + assert _iptables_chain_intact({"1.2.3.4"}) is False + + def test_missing_ip_returns_false(self) -> None: + listing = MagicMock( + returncode=0, + stdout="-N STEAM_TOTAL_BLOCK\n-A STEAM_TOTAL_BLOCK -d 9.9.9.9/32 -j DROP\n", + ) + with patch(f"{PKG}.subprocess.run", return_value=listing): + assert _iptables_chain_intact({"1.2.3.4"}) is False + + def test_all_ips_present_and_hooked_returns_true(self) -> None: + listing = MagicMock( + returncode=0, + stdout=( + "-N STEAM_TOTAL_BLOCK\n" + "-A STEAM_TOTAL_BLOCK -d 1.2.3.4/32 -j DROP\n" + "-A STEAM_TOTAL_BLOCK -d 5.6.7.8/32 -j DROP\n" + ), + ) + hook_check = MagicMock(returncode=0) + with patch(f"{PKG}.subprocess.run", side_effect=[listing, hook_check]): + assert _iptables_chain_intact({"1.2.3.4", "5.6.7.8"}) is True + + def test_ips_present_but_not_hooked_returns_false(self) -> None: + listing = MagicMock( + returncode=0, + stdout="-A STEAM_TOTAL_BLOCK -d 1.2.3.4/32 -j DROP\n", + ) + hook_check = MagicMock(returncode=1) + with patch(f"{PKG}.subprocess.run", side_effect=[listing, hook_check]): + assert _iptables_chain_intact({"1.2.3.4"}) is False + + def test_malformed_trailing_d_flag_is_ignored(self) -> None: + """A `-d` token with nothing after it (malformed/truncated rule + line) must not index past the end of `parts`.""" + listing = MagicMock( + returncode=0, + stdout="-A STEAM_TOTAL_BLOCK -j DROP -d\n", + ) + with patch(f"{PKG}.subprocess.run", return_value=listing): + assert _iptables_chain_intact({"1.2.3.4"}) is False + + +class TestApplyTotalBlockIptables: + def test_intact_chain_short_circuits(self) -> None: + _save_cached_ips({"1.2.3.4"}) + with ( + patch(f"{PKG}._iptables_chain_intact", return_value=True), + patch(f"{PKG}.subprocess.run") as mock_run, + ): + assert _apply_total_block_iptables() is True + mock_run.assert_not_called() + + def test_rebuilds_when_not_intact(self) -> None: + with ( + patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)), + patch( + f"{PKG}.socket.getaddrinfo", + return_value=[(None, None, None, None, ("9.9.9.9", 443))], + ), + ): + assert _apply_total_block_iptables() is True + assert "9.9.9.9" in _load_cached_ips() + + def test_dns_failure_skips_that_domain(self) -> None: + import socket as real_socket + + with ( + patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)), + patch(f"{PKG}.socket.getaddrinfo", side_effect=real_socket.gaierror), + ): + assert _apply_total_block_iptables() is True + assert _load_cached_ips() == set() + + def test_subprocess_error_returns_false(self) -> None: + with ( + patch( + f"{PKG}.subprocess.run", + side_effect=[MagicMock(returncode=0), OSError], + ), + patch(f"{PKG}.socket.getaddrinfo", return_value=[]), + ): + assert _apply_total_block_iptables() is False + + def test_inserts_output_hook_when_missing(self) -> None: + def run_side_effect(cmd: list[str], **_kwargs: object) -> MagicMock: + if "-C" in cmd: + return MagicMock(returncode=1) + return MagicMock(returncode=0) + + with ( + patch(f"{PKG}.subprocess.run", side_effect=run_side_effect), + patch(f"{PKG}.socket.getaddrinfo", return_value=[]), + ): + assert _apply_total_block_iptables() is True + + +class TestRemoveTotalBlockIptables: + def test_removes_chain_and_cache(self, paths: _Paths) -> None: + _save_cached_ips({"1.2.3.4"}) + with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)): + assert _remove_total_block_iptables() is True + assert not paths.ip_cache_file.exists() + + def test_no_cache_file_is_fine(self) -> None: + with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)): + assert _remove_total_block_iptables() is True + + def test_subprocess_error_returns_false(self) -> None: + with patch(f"{PKG}.subprocess.run", side_effect=OSError): + assert _remove_total_block_iptables() is False + + +# ────────────────────────────────────────────────────────────── +# Public lifecycle API +# ────────────────────────────────────────────────────────────── + + +class TestStartTotalBlock: + def test_success(self) -> None: + with ( + patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=0, stderr=""), + ), + patch(f"{PKG}._kill_steam_and_launchers", return_value=[]), + patch(f"{PKG}._uninstall_steam_package", return_value=True), + patch(f"{PKG}._apply_total_block_hosts", return_value=True), + patch(f"{PKG}._apply_total_block_iptables", return_value=True), + patch(f"{PKG}.flush_dns_cache"), + ): + assert start_total_block(1) is True + + def test_package_block_start_failure_aborts(self) -> None: + with patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=1, stderr="guardctl error"), + ): + assert start_total_block(1) is False + + def test_best_effort_steps_dont_block_success(self) -> None: + """Even if kill/uninstall/hosts/iptables all fail, the lock + registering successfully is what start_total_block reports.""" + with ( + patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=0, stderr=""), + ), + patch(f"{PKG}._kill_steam_and_launchers", return_value=[]), + patch(f"{PKG}._uninstall_steam_package", return_value=False), + patch(f"{PKG}._apply_total_block_hosts", return_value=False), + patch(f"{PKG}._apply_total_block_iptables", return_value=False), + patch(f"{PKG}.flush_dns_cache"), + ): + assert start_total_block(1) is True + + def test_logs_when_processes_were_killed(self) -> None: + with ( + patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=0, stderr=""), + ), + patch(f"{PKG}._kill_steam_and_launchers", return_value=[(1, "steam")]), + patch(f"{PKG}._uninstall_steam_package", return_value=True), + patch(f"{PKG}._apply_total_block_hosts", return_value=True), + patch(f"{PKG}._apply_total_block_iptables", return_value=True), + patch(f"{PKG}.flush_dns_cache"), + ): + assert start_total_block(1) is True + + +class TestEnforceTotalBlockTick: + def test_reinstalls_steam_if_reappeared(self) -> None: + with ( + patch(f"{PKG}._kill_steam_and_launchers", return_value=[]), + patch(f"{PKG}._is_steam_installed", return_value=True), + patch(f"{PKG}._uninstall_steam_package") as mock_uninstall, + patch(f"{PKG}._apply_total_block_hosts", return_value=True), + patch(f"{PKG}._apply_total_block_iptables", return_value=True), + ): + enforce_total_block_tick() + mock_uninstall.assert_called_once() + + def test_no_reinstall_when_steam_absent(self) -> None: + with ( + patch(f"{PKG}._kill_steam_and_launchers", return_value=[]), + patch(f"{PKG}._is_steam_installed", return_value=False), + patch(f"{PKG}._uninstall_steam_package") as mock_uninstall, + patch(f"{PKG}._apply_total_block_hosts", return_value=True), + patch(f"{PKG}._apply_total_block_iptables", return_value=True), + ): + enforce_total_block_tick() + mock_uninstall.assert_not_called() + + +class TestEndTotalBlockCleanup: + def test_ends_lock_and_removes_blocks(self) -> None: + with ( + patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=0, stderr=""), + ), + patch(f"{PKG}._remove_total_block_hosts", return_value=True) as mock_hosts, + patch(f"{PKG}._remove_total_block_iptables", return_value=True) as mock_ipt, + patch(f"{PKG}.flush_dns_cache"), + ): + end_total_block_cleanup() + mock_hosts.assert_called_once() + mock_ipt.assert_called_once() + + def test_package_block_end_failure_still_cleans_up_rest(self) -> None: + with ( + patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=1, stderr="already ended"), + ), + patch(f"{PKG}._remove_total_block_hosts", return_value=True) as mock_hosts, + patch(f"{PKG}._remove_total_block_iptables", return_value=True) as mock_ipt, + patch(f"{PKG}.flush_dns_cache"), + ): + end_total_block_cleanup() + mock_hosts.assert_called_once() + mock_ipt.assert_called_once() + + def test_hosts_removal_failure_is_logged_not_raised(self) -> None: + with ( + patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=0, stderr=""), + ), + patch(f"{PKG}._remove_total_block_hosts", return_value=False), + patch(f"{PKG}._remove_total_block_iptables", return_value=True), + patch(f"{PKG}.flush_dns_cache"), + ): + end_total_block_cleanup() # must not raise + + def test_iptables_removal_failure_is_logged_not_raised(self) -> None: + with ( + patch( + f"{PKG}.subprocess.run", + return_value=MagicMock(returncode=0, stderr=""), + ), + patch(f"{PKG}._remove_total_block_hosts", return_value=True), + patch(f"{PKG}._remove_total_block_iptables", return_value=False), + patch(f"{PKG}.flush_dns_cache"), + ): + end_total_block_cleanup() # must not raise + + +# Sanity: the module-level chain name constant is what everything above +# assumes when constructing fake iptables -S output. +def test_iptables_chain_name_constant() -> None: + assert IPTABLES_CHAIN == "STEAM_TOTAL_BLOCK"