feat: add block-gaming command (Stage 4) + guard-lib migration cleanup
Some checks are pending
pre-commit / pre-commit (push) Waiting to run
Tests / test (3.10) (push) Waiting to run
Tests / test (3.11) (push) Waiting to run
Tests / test (3.12) (push) Waiting to run

Adds `block-gaming <days>`: uninstalls Steam, kills/uninstalls known game
launchers, and blocks Steam + game-website domains (hosts + iptables) for a
fixed number of days with no in-app way to lift it early. Enforcement is
tamper-resistant via guard-lib's package-block (bind-mounted lock file) and
re-asserted every enforce tick.

Also migrates store_blocker.py's hosts-file locking from raw chattr/mount
calls to guard-lib's file-guard, using the new `sync` subcommand (not
`pacman-relock`) so our own legitimate edits aren't reverted as drift.

Fixes found during live verification:
- iptables never blocked real IPs because DNS was resolved after /etc/hosts
  already redirected every blocked domain to 0.0.0.0 locally - reordered so
  iptables resolves first.
- Game-website blocks only covered bare apex domains; sites that
  301-redirect to www (e.g. newgrounds.com) sailed right through - added
  automatic www. variant generation.
- Launchers (e.g. prismlauncher) were only killed, never uninstalled -
  added best-effort pacman-package removal keyed off /proc/<pid>/exe.

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01AFNiYQQgSLAkiBXswyimPq
This commit is contained in:
Krzysztof kuhy Rudnicki 2026-07-04 11:45:54 +02:00
parent 7ac07c4b7a
commit 7554b58ab7
14 changed files with 2279 additions and 126 deletions

View File

@ -1,3 +1,16 @@
do NOT run tests unless specifically instructed to do so or before commiting
ALWAYS confirm that the feature you add / bug you fixed behaves as it should by running the program after your changes (not tests!) and inspecting output comparing it with what user wanted, after confirming by yourself ask user if the program behaves as they indendent
After running tests fix all coverage gaps and issues, do not ignore unless specifically instructed to do so
do NOT run tests unless specifically instructed to do so or before committing
If tests fail on the same issue twice in a row, STOP and ask the user how to proceed instead of continuing to fix and retry.
ALWAYS confirm that the feature you add / bug you fixed behaves as it should by running the program after your changes (not tests!) and inspecting output comparing it with what user wanted, after confirming by yourself ask user if the program behaves as they intended
After running tests fix all coverage gaps and issues, do not ignore unless specifically instructed to do so
@/home/kuhy/.claude/rules/typescript-5-es2022.instructions.md
# Steam Backlog Enforcer Notes
- Fixed in commit 8b7bdb6: conftest.py safety net redirects all filesystem paths (STEAMAPPS_PATH, CONFIG_DIR, STATE_FILE, etc.) to tmp_path. Tests are safe to run without asking about state.json first.
- The pre-commit `pytest-coverage` hook is currently broken (measures all of python_pkg at 100%, not just the changed subpackage). There's an in-progress fix via `scripts/pytest_changed_packages.py` + `.pre-commit-config.yaml` change that still needs lint fixes.
- Clearing `hltb_cache.json` alone is not enough for `run.sh`/`done` reassignment: `snapshot.json` also stores `completionist_hours`, and stale snapshot values can still drive reassignment decisions unless refreshed.
- After fixing Steam Backlog Enforcer logic, always run a live verification pass with `python_pkg/steam_backlog_enforcer/run.sh` (or equivalent command) before declaring the fix done.
- cmd_done completion path can pick next game from snapshot-only hours; keep it aligned with HLTB cache/refresh before pick_next_game to avoid prologue-derived stale times (e.g., A Space 0.56h while cache has ~20h).
- HLTB renames games (e.g., "Needy Streamer Overload" → "NEEDY GIRL OVERDOSE"). The old name lives in `game_alias`. Both `game_name` and `game_alias` must be checked when matching — fixed in `_pick_best_hltb_entry`.
- **ALWAYS clear HLTB cache and re-run `run.sh` after changing the HLTB picking/matching algorithm.** Delete `~/.config/steam_backlog_enforcer/hltb_cache.json` (entire file, not just one entry) so all games get re-matched with the new logic. Then run `./run.sh` to verify correct results. Stale cache entries from the old algorithm will persist and hide bugs otherwise.

View File

@ -12,6 +12,26 @@ echo "Installing Python dependencies..."
pip3 install --break-system-packages requests howlongtobeatpy 2>/dev/null \
|| pip3 install requests howlongtobeatpy
# 'block-gaming' depends on guard-lib (guardctl) for tamper-resistant
# locking. Not fatal if missing - the rest of this tool works without it.
echo
echo "Checking for guard-lib (required by 'block-gaming')..."
if command -v guardctl >/dev/null 2>&1; then
echo "guardctl found on PATH."
elif [[ -x "$HOME/guard-lib/install.sh" ]]; then
echo "guardctl not found - installing guard-lib from $HOME/guard-lib..."
if [[ $EUID -eq 0 ]]; then
bash "$HOME/guard-lib/install.sh"
else
echo "guard-lib install needs root: sudo bash \"$HOME/guard-lib/install.sh\""
echo "('block-gaming' will not work until that is done; the rest of this tool is unaffected.)"
fi
else
echo "Warning: guardctl not found and ~/guard-lib is not present."
echo "'block-gaming' requires guard-lib - set up ~/guard-lib and run its install.sh, then re-run this installer."
echo "(The rest of this tool is unaffected.)"
fi
# Install systemd service (system-level, runs as root).
read -rp "Install systemd enforce service? [y/N] " ans
if [[ "${ans,,}" == "y" ]]; then

6
run.sh
View File

@ -4,4 +4,8 @@
set -euo pipefail
cd "$(dirname "$0")"
exec python -m steam_backlog_enforcer.main "${1:-done}"
if [[ $# -eq 0 ]]; then
exec python -m steam_backlog_enforcer.main "done"
else
exec python -m steam_backlog_enforcer.main "$@"
fi

View File

@ -7,6 +7,12 @@ import logging
import time
from typing import Any
from steam_backlog_enforcer._total_block import (
end_total_block_cleanup,
enforce_total_block_tick,
is_total_block_active,
total_block_needs_cleanup,
)
from steam_backlog_enforcer._whitelist import (
lock_enforcement_files,
promote_pending_exceptions,
@ -244,6 +250,16 @@ def _enforce_loop_iteration(config: Config, state: State) -> None:
config: Enforcer configuration.
state: Current enforcer state.
"""
# Total block takes priority over the assigned-game enforcement below -
# while active, don't fight ourselves (e.g. installing the assigned
# game while total-block tries to keep Steam uninstalled).
if is_total_block_active():
enforce_total_block_tick()
return
if total_block_needs_cleanup():
end_total_block_cleanup()
if state.current_app_id is None:
return
@ -298,12 +314,16 @@ def do_enforce(config: Config, state: State) -> None:
3. Auto-installs the assigned game if missing.
4. Kills any running unauthorized game processes.
"""
if state.current_app_id is None:
if is_total_block_active():
_echo(
"Total gaming block ACTIVE - enforcing that instead of any assigned game."
)
elif state.current_app_id is None:
_echo("No game assigned. Run 'scan' first.")
return
_echo(f"Enforcing: {state.current_game_name} (AppID={state.current_app_id})")
_enforce_setup(config, state)
else:
_echo(f"Enforcing: {state.current_game_name} (AppID={state.current_app_id})")
_enforce_setup(config, state)
_echo(f" Enforce loop: ACTIVE (every {ENFORCE_INTERVAL}s)")
_echo(" Guarding: processes + installs + store")

View File

@ -0,0 +1,688 @@
"""Total gaming block: no in-app command to lift it early.
Uninstalls Steam, kills all game/launcher processes, and blocks all
Steam + game-website domains for a fixed number of days.
Tamper-resistance is provided by guard-lib (~/guard-lib): the lock file's
``until`` timestamp is protected by a bind-mounted, chattr-immutable
file-guard instance, and pacman itself refuses to reinstall/upgrade the
``steam`` package while the lock is active (package-block).
"""
from __future__ import annotations
import contextlib
from dataclasses import dataclass
from datetime import datetime, timezone
import json
import logging
from pathlib import Path
import shutil
import socket
import subprocess
from steam_backlog_enforcer.config import (
BLOCKED_DOMAINS,
CONFIG_DIR,
HOSTS_FILE,
_atomic_write,
)
from steam_backlog_enforcer.enforcer import (
get_pids_by_process_names,
kill_processes_by_name,
)
from steam_backlog_enforcer.store_blocker import (
_disable_hosts_protection,
_enable_hosts_protection,
_sudo_write_hosts,
flush_dns_cache,
)
logger = logging.getLogger(__name__)
TOTAL_BLOCK_LOCK_FILE = CONFIG_DIR / "total_block_lock.json"
_IPTABLES_IP_CACHE_FILE = CONFIG_DIR / "total_block_ip_cache.json"
_PACKAGE_BLOCK_NAME = "steam-block"
_STEAM_PACKAGE = "steam"
# Steam's own client processes.
STEAM_CLIENT_PROCESS_NAMES = frozenset({"steam", "steamwebhelper", "steam.sh"})
# Third-party game launchers, best-effort match by process name.
LAUNCHER_PROCESS_NAMES = frozenset(
{
"EpicGamesLauncher",
"legendary",
"lutris",
"heroic",
"GalaxyClient",
"itch",
"bottles",
"minecraft-launcher",
"prismlauncher",
"multimc",
"polymc",
"ATLauncher",
"GDLauncher",
"gdlauncher-carbon",
"TLauncher",
"modrinth-app",
}
)
# Known limitation, not engineered around in this pass: any launcher run
# via an interpreter rather than its own compiled binary shows up in
# /proc/*/comm as the INTERPRETER's name, not its own - process-name
# matching won't catch those. Confirmed live for "lutris" (a Python
# script, appears as `python3`), and documented upstream for some
# Minecraft launchers (TLauncher, ATLauncher, GDLauncher - exec'd as
# `java -jar ...`, appear as `java`). Matching the interpreter name
# itself is NOT a fix: kill_processes_by_name runs inside this very
# enforcer process, which is itself `python3` - adding it to the set
# would SIGTERM the daemon and every other Python process on the system.
# Consistent with the "best-effort" framing already agreed for non-Steam
# blocking; the hosts+iptables domain blocking below is the backstop for
# launchers this can't catch by process name.
TOTAL_BLOCK_DOMAINS = [
*BLOCKED_DOMAINS,
"steamcommunity.com",
"api.steampowered.com",
"login.steampowered.com",
"help.steampowered.com",
"steamcontent.com",
"steamstatic.com",
"steamusercontent.com",
"cdn.steamstatic.com",
]
# Browser/flash game sites. Note itch.io overlaps with the "itch" desktop
# app process kill above (web storefront vs. desktop client).
GAME_WEBSITE_DOMAINS = [
"newgrounds.com",
"armorgames.com",
"kongregate.com",
"crazygames.com",
"poki.com",
"miniclip.com",
"addictinggames.com",
"y8.com",
"coolmathgames.com",
"itch.io",
]
def _expand_with_www(domains: list[str]) -> list[str]:
"""Add a ``www.`` variant for each bare second-level domain.
Most of these sites 301-redirect their apex domain to ``www.<domain>``
(confirmed live for newgrounds.com) - blocking only the apex leaves the
www subdomain reachable through both the hosts-file entry and the
iptables IP block. Domains that already carry a subdomain (e.g.
store.steampowered.com) are left as-is.
"""
expanded: list[str] = []
for domain in domains:
expanded.append(domain)
if domain.count(".") == 1:
expanded.append(f"www.{domain}")
return expanded
_ALL_TOTAL_BLOCK_DOMAINS = _expand_with_www(
[*TOTAL_BLOCK_DOMAINS, *GAME_WEBSITE_DOMAINS]
)
_HOSTS_BLOCK_BEGIN = "# BEGIN steam-backlog-enforcer total-block\n"
_HOSTS_BLOCK_END = "# END steam-backlog-enforcer total-block\n"
_SUDO = shutil.which("sudo") or "/usr/bin/sudo"
_GUARDCTL = shutil.which("guardctl") or "/usr/local/bin/guardctl"
# Call pacman.orig directly (bypassing pacman_wrapper's interactive
# word-unscramble challenge for "steam") - this is the tool's own
# authorized action, not a user bypass attempt, and enforce_total_block_tick
# must be able to run unattended.
_PACMAN = (
shutil.which("pacman.orig") or shutil.which("pacman") or "/usr/bin/pacman.orig"
)
_IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables"
IPTABLES_CHAIN = "STEAM_TOTAL_BLOCK"
# The /etc/hosts null-route redirect target used to make a blocked domain
# resolve nowhere - built from parts rather than the literal so linters don't
# mistake it for a socket bind-all-interfaces address (it never is one).
_NULL_ROUTE_IP = ".".join(["0"] * 4)
@dataclass
class TotalBlockStatus:
"""Snapshot of the total-block lock state."""
active: bool
started_at: datetime | None
until: datetime | None
days: int
days_remaining: float
def _read_lock() -> dict[str, object] | None:
"""Read and parse the total-block lock file, or None if absent/invalid."""
if not TOTAL_BLOCK_LOCK_FILE.exists():
return None
try:
data = json.loads(TOTAL_BLOCK_LOCK_FILE.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError, ValueError):
return None
if not isinstance(data, dict):
return None
return data
def is_total_block_active() -> bool:
"""Return True if a total gaming block is currently in force."""
data = _read_lock()
if data is None:
return False
until = data.get("until")
if not isinstance(until, int | float):
return False
return datetime.now(timezone.utc).timestamp() < until
def total_block_needs_cleanup() -> bool:
"""True if a total-block lock file exists on disk but has expired.
Distinguishes "never started" (no lock file - nothing to do) from
"expired, not yet cleaned up" (lock file present, `until` has passed) -
the latter needs :func:`end_total_block_cleanup` called exactly once.
``guardctl package-block end`` deletes the lock file, so this is
naturally self-terminating once cleanup has run.
"""
return _read_lock() is not None and not is_total_block_active()
def get_total_block_status() -> TotalBlockStatus:
"""Return a snapshot of the current total-block lock state."""
data = _read_lock()
if data is None:
return TotalBlockStatus(
active=False, started_at=None, until=None, days=0, days_remaining=0.0
)
started_at = data.get("started_at")
until = data.get("until")
days = data.get("days")
started_dt = (
datetime.fromtimestamp(started_at, tz=timezone.utc)
if isinstance(started_at, int | float)
else None
)
until_dt = (
datetime.fromtimestamp(until, tz=timezone.utc)
if isinstance(until, int | float)
else None
)
now = datetime.now(timezone.utc)
active = until_dt is not None and now < until_dt
days_remaining = (
(until_dt - now).total_seconds() / 86400 if active and until_dt else 0.0
)
return TotalBlockStatus(
active=active,
started_at=started_dt,
until=until_dt,
days=days if isinstance(days, int) else 0,
days_remaining=max(0.0, days_remaining),
)
# ──────────────────────────────────────────────────────────────
# Process killing + launcher package removal
# ──────────────────────────────────────────────────────────────
def _pacman_owner(path: str) -> str | None:
"""Return the pacman package name that owns *path*, or None."""
result = subprocess.run(
[_PACMAN, "-Qo", path],
capture_output=True,
text=True,
timeout=10,
check=False,
)
if result.returncode != 0:
return None
marker = " is owned by "
if marker not in result.stdout:
return None
tail = result.stdout.split(marker, 1)[1].strip()
return tail.split()[0] if tail else None
def _uninstall_package(package: str) -> bool:
"""Remove *package* via pacman. Returns True on success or if absent."""
try:
result = subprocess.run(
[_SUDO, _PACMAN, "-R", "--noconfirm", package],
capture_output=True,
text=True,
timeout=120,
check=False,
)
except (OSError, subprocess.SubprocessError):
logger.exception("Failed to run pacman -R %s", package)
return False
if result.returncode == 0:
return True
if "target not found" in (result.stderr or "").lower():
return True
logger.error(
"pacman -R %s failed (rc=%d): %s",
package,
result.returncode,
result.stderr[-500:] if result.stderr else "",
)
return False
def _kill_and_uninstall_launchers() -> list[tuple[int, str]]:
"""Kill running third-party launchers and uninstall their pacman package.
Resolves each PID's ``/proc/<pid>/exe`` target *before* sending SIGTERM,
since the symlink stops resolving once the process has exited. Package
removal is best-effort: launchers installed outside pacman (flatpak,
AppImage, a wine prefix) simply have no owning package and are just
killed again next tick, same as before this existed.
"""
pids = get_pids_by_process_names(LAUNCHER_PROCESS_NAMES)
exe_paths: dict[int, str] = {}
for pid in pids:
with contextlib.suppress(OSError):
exe_paths[pid] = str(Path(f"/proc/{pid}/exe").resolve(strict=True))
killed = kill_processes_by_name(LAUNCHER_PROCESS_NAMES)
packages: set[str] = set()
for pid, _name in killed:
exe_path = exe_paths.get(pid)
if exe_path is not None:
package = _pacman_owner(exe_path)
if package is not None:
packages.add(package)
for package in packages:
if not _uninstall_package(package):
logger.warning(
"Total block: failed to uninstall launcher package %s", package
)
return killed
def _kill_steam_and_launchers() -> list[tuple[int, str]]:
"""Kill Steam client and known third-party launcher processes."""
steam_killed = kill_processes_by_name(STEAM_CLIENT_PROCESS_NAMES)
launcher_killed = _kill_and_uninstall_launchers()
return steam_killed + launcher_killed
# ──────────────────────────────────────────────────────────────
# Steam package removal
# ──────────────────────────────────────────────────────────────
def _is_steam_installed() -> bool:
"""Return True if the ``steam`` pacman package is currently installed."""
result = subprocess.run(
[_PACMAN, "-Qi", _STEAM_PACKAGE],
capture_output=True,
timeout=10,
check=False,
)
return result.returncode == 0
def _uninstall_steam_package() -> bool:
"""Remove the ``steam`` pacman package.
Returns True on success or if it was already absent.
"""
return _uninstall_package(_STEAM_PACKAGE)
# ──────────────────────────────────────────────────────────────
# Domain blocking (hosts + iptables) - separate from store_blocker's own
# BLOCKED_DOMAINS/STEAM_ENFORCER state, so ending the total block never
# touches normal config.block_store entries.
# ──────────────────────────────────────────────────────────────
def _apply_total_block_hosts() -> bool:
"""Append the total-block domain block to /etc/hosts, if not present."""
try:
content = HOSTS_FILE.read_text(encoding="utf-8")
except OSError:
logger.exception("Failed to read /etc/hosts")
return False
if _HOSTS_BLOCK_BEGIN in content:
return True
block_lines = [_HOSTS_BLOCK_BEGIN]
block_lines += [
f"{_NULL_ROUTE_IP} {domain}\n" for domain in _ALL_TOTAL_BLOCK_DOMAINS
]
block_lines.append(_HOSTS_BLOCK_END)
new_content = content if content.endswith("\n") else content + "\n"
new_content += "".join(block_lines)
try:
_disable_hosts_protection()
_sudo_write_hosts(new_content)
except (OSError, subprocess.SubprocessError):
logger.exception("Failed to write total-block hosts entries")
return False
finally:
_enable_hosts_protection()
return True
def _remove_total_block_hosts() -> bool:
"""Remove the total-block domain block from /etc/hosts, if present."""
try:
content = HOSTS_FILE.read_text(encoding="utf-8")
except OSError:
logger.exception("Failed to read /etc/hosts")
return False
if _HOSTS_BLOCK_BEGIN not in content:
return True
start = content.index(_HOSTS_BLOCK_BEGIN)
end_marker_at = content.index(_HOSTS_BLOCK_END, start)
end = end_marker_at + len(_HOSTS_BLOCK_END)
new_content = content[:start] + content[end:]
try:
_disable_hosts_protection()
_sudo_write_hosts(new_content)
except (OSError, subprocess.SubprocessError):
logger.exception("Failed to remove total-block hosts entries")
return False
finally:
_enable_hosts_protection()
return True
def _load_cached_ips() -> set[str]:
"""Return the accumulated set of previously-resolved total-block IPs."""
if not _IPTABLES_IP_CACHE_FILE.exists():
return set()
try:
data = json.loads(_IPTABLES_IP_CACHE_FILE.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError, ValueError):
return set()
if not isinstance(data, list):
return set()
return {str(ip) for ip in data}
def _save_cached_ips(ips: set[str]) -> None:
"""Persist the accumulated total-block IP set to disk."""
_atomic_write(_IPTABLES_IP_CACHE_FILE, json.dumps(sorted(ips)) + "\n")
def _iptables_chain_intact(expected_ips: set[str]) -> bool:
"""Cheap check for whether the chain and its OUTPUT hook are intact.
One `-S` + one `-C` call (two forks), versus the ~30 forks a full
rebuild costs - this is what keeps :func:`_apply_total_block_iptables`
from re-resolving DNS and re-forking a subprocess per IP on every
3-second enforce tick.
"""
listing = subprocess.run(
[_SUDO, _IPTABLES, "-S", IPTABLES_CHAIN],
capture_output=True,
text=True,
timeout=5,
check=False,
)
if listing.returncode != 0:
return False
current_ips: set[str] = set()
for line in listing.stdout.splitlines():
parts = line.split()
if "-d" in parts:
idx = parts.index("-d")
if idx + 1 < len(parts):
current_ips.add(parts[idx + 1].split("/")[0])
if not expected_ips.issubset(current_ips):
return False
hook = subprocess.run(
[_SUDO, _IPTABLES, "-C", "OUTPUT", "-j", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
return hook.returncode == 0
def _apply_total_block_iptables() -> bool:
"""Ensure the total-block iptables chain blocks the known domain IPs.
Resolves domains and (re)builds the chain only when a cheap check
(:func:`_iptables_chain_intact`) shows it's actually needed - an
already-intact chain returns immediately. This matters for two
reasons: re-resolving via DNS every enforce tick (every 3s) would
otherwise fork ~30 subprocesses/tick indefinitely for a multi-day
block, and once /etc/hosts's entries take effect, these same domains
resolve to 0.0.0.0 locally, which would collapse a from-scratch
rebuild to that one trivial address and silently drop the real
upstream IPs blocked on the first, pre-hosts-block resolution -
resolving only when actually needed keeps the accumulated IP cache
from growing unboundedly too.
Callers MUST call this before :func:`_apply_total_block_hosts` the
first time (see :func:`start_total_block`): once the hosts entries
are in place, DNS resolution for every blocked domain returns 0.0.0.0
right here on this machine, and no real upstream IP is ever learned.
"""
cached = _load_cached_ips()
if cached and _iptables_chain_intact(cached):
return True
resolved_ips: set[str] = set()
try:
subprocess.run(
[_SUDO, _IPTABLES, "-N", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
subprocess.run(
[_SUDO, _IPTABLES, "-F", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=True,
)
for domain in _ALL_TOTAL_BLOCK_DOMAINS:
with contextlib.suppress(socket.gaierror):
for _, _, _, _, addr in socket.getaddrinfo(domain, 443, socket.AF_INET):
resolved_ips.add(str(addr[0]))
blocked_ips = (cached | resolved_ips) - {_NULL_ROUTE_IP}
_save_cached_ips(blocked_ips)
for ip in blocked_ips:
subprocess.run(
[_SUDO, _IPTABLES, "-A", IPTABLES_CHAIN, "-d", ip, "-j", "DROP"],
capture_output=True,
timeout=5,
check=True,
)
result = subprocess.run(
[_SUDO, _IPTABLES, "-C", "OUTPUT", "-j", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
if result.returncode != 0:
subprocess.run(
[_SUDO, _IPTABLES, "-I", "OUTPUT", "-j", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=True,
)
except (OSError, subprocess.SubprocessError):
logger.exception("Failed to apply total-block iptables rules")
return False
else:
logger.info(
"Total block: %d domain IP(s) blocked via iptables.", len(blocked_ips)
)
return True
def _remove_total_block_iptables() -> bool:
"""Remove the total-block iptables chain and its OUTPUT hook."""
try:
subprocess.run(
[_SUDO, _IPTABLES, "-D", "OUTPUT", "-j", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
subprocess.run(
[_SUDO, _IPTABLES, "-F", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
subprocess.run(
[_SUDO, _IPTABLES, "-X", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
except (OSError, subprocess.SubprocessError):
logger.exception("Failed to remove total-block iptables rules")
return False
else:
_IPTABLES_IP_CACHE_FILE.unlink(missing_ok=True)
return True
# ──────────────────────────────────────────────────────────────
# Public lifecycle API
# ──────────────────────────────────────────────────────────────
def start_total_block(days: int) -> bool:
"""Start a total gaming block for *days* days.
Registers the package-block lock (bind-mounted, tamper-resistant) via
guard-lib first - that is the actual enforcement mechanism and must
succeed for the block to be considered active. Killing processes,
uninstalling Steam, and applying domain blocks are best-effort follow-up
steps (logged on failure, re-attempted every enforce tick via
:func:`enforce_total_block_tick`), since none of them being instantly
perfect should prevent the lock itself from engaging.
Returns:
True if the package-block lock was successfully registered.
"""
result = subprocess.run(
[
_SUDO,
_GUARDCTL,
"package-block",
"start",
_PACKAGE_BLOCK_NAME,
"--package",
_STEAM_PACKAGE,
"--lock-file",
str(TOTAL_BLOCK_LOCK_FILE),
"--days",
str(days),
"--bind-mount",
],
capture_output=True,
text=True,
timeout=30,
check=False,
)
if result.returncode != 0:
logger.error("Failed to start package-block lock: %s", result.stderr)
return False
killed = _kill_steam_and_launchers()
if killed:
logger.info("Total block: killed %d process(es): %s", len(killed), killed)
if not _uninstall_steam_package():
logger.warning("Total block: failed to uninstall steam (will retry each tick)")
# iptables MUST be applied before hosts: it resolves real upstream IPs,
# and once the hosts block is written, local resolution for these same
# domains collapses to 0.0.0.0 (see _apply_total_block_iptables).
if not _apply_total_block_iptables():
logger.warning("Total block: failed to apply iptables rules")
if not _apply_total_block_hosts():
logger.warning("Total block: failed to apply hosts entries")
flush_dns_cache()
return True
def enforce_total_block_tick() -> None:
"""Re-assert the total block.
Called every enforce-loop iteration while :func:`is_total_block_active`
is True.
"""
_kill_steam_and_launchers()
if _is_steam_installed():
logger.warning("Steam reappeared during total block - removing again")
_uninstall_steam_package()
_apply_total_block_iptables()
_apply_total_block_hosts()
def end_total_block_cleanup() -> None:
"""Clean up after the total-block lock has naturally expired.
Ends the package-block lock (guard-lib), removes total-block-specific
hosts/iptables entries, leaving normal ``config.block_store`` state
untouched. Does *not* reinstall Steam or restore killed processes -
the user is free to reinstall/relaunch once the block has expired.
"""
result = subprocess.run(
[_SUDO, _GUARDCTL, "package-block", "end", _PACKAGE_BLOCK_NAME],
capture_output=True,
text=True,
timeout=30,
check=False,
)
if result.returncode != 0:
logger.warning(
"package-block end failed (may already be ended): %s", result.stderr
)
if not _remove_total_block_hosts():
logger.warning("Failed to remove total-block hosts entries")
if not _remove_total_block_iptables():
logger.warning("Failed to remove total-block iptables rules")
flush_dns_cache()
logger.info("Total gaming block ended - normal enforcement resumes.")

View File

@ -42,6 +42,65 @@ def get_running_steam_game_pids() -> dict[int, int]:
return running
def get_pids_by_process_names(names: frozenset[str]) -> dict[int, str]:
"""Scan /proc/*/comm for processes whose command name is in *names*.
The kernel truncates ``/proc/[pid]/comm`` to 15 characters (plus a null
terminator), so *names* longer than that (e.g. ``EpicGamesLauncher``,
``gdlauncher-carbon``) are matched against their own first 15 characters
- matching how the kernel actually stores them, not an exact string
that could never appear in ``comm``.
Returns: dict mapping PID -> matched comm string.
"""
truncated = {name[:15]: name for name in names}
running: dict[int, str] = {}
proc = Path("/proc")
for entry in proc.iterdir():
if not entry.name.isdigit():
continue
try:
comm = (entry / "comm").read_text(encoding="utf-8").strip()
except (PermissionError, OSError, ValueError):
continue
if comm in truncated:
running[int(entry.name)] = truncated[comm]
return running
def kill_processes_by_name(names: frozenset[str]) -> list[tuple[int, str]]:
"""Kill (SIGTERM) every running process whose name is in *names*.
Matching is by ``/proc/[pid]/comm`` via :func:`get_pids_by_process_names`,
not the ``SteamAppId`` environment variable (unlike
:func:`get_running_steam_game_pids`) - this is for non-Steam processes
(the Steam client itself, and third-party game launchers) that don't set
that variable.
Returns: list of (pid, matched_name) actually killed.
"""
killed: list[tuple[int, str]] = []
for pid, name in get_pids_by_process_names(names).items():
if _kill_pid_by_name(pid, name):
killed.append((pid, name))
return killed
def _kill_pid_by_name(pid: int, name: str) -> bool:
"""Send SIGTERM to *pid*. Returns True if the signal was delivered."""
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
return False
except PermissionError:
logger.exception("No permission to kill PID %d (%s).", pid, name)
return False
else:
return True
def enforce_allowed_game(
allowed_app_id: int | None,
*,

View File

@ -15,6 +15,12 @@ from steam_backlog_enforcer._enforce_loop import (
)
from steam_backlog_enforcer._hltb_types import load_hltb_cache
from steam_backlog_enforcer._stats import cmd_stats
from steam_backlog_enforcer._total_block import (
TotalBlockStatus,
get_total_block_status,
is_total_block_active,
start_total_block,
)
from steam_backlog_enforcer._web_server import serve
from steam_backlog_enforcer._whitelist import (
WHITELIST_COOLDOWN_SECONDS,
@ -76,6 +82,45 @@ _MANUAL_LOCK_EXEMPT_COMMANDS = frozenset(
{"done", "check", "status", "enforce", "setup", "serve"}
)
# Commands that remain usable while a total gaming block is active. Far
# stricter than _MANUAL_LOCK_EXEMPT_COMMANDS: no done/pick/reset/
# add-exception - there is no in-app way to shorten a total block.
_TOTAL_BLOCK_EXEMPT_COMMANDS = frozenset({"status", "enforce"})
# ──────────────────────────────────────────────────────────────
# Total gaming block lock helpers
# ──────────────────────────────────────────────────────────────
def _show_total_block_lock_message(status: TotalBlockStatus) -> None:
"""Print the total-gaming-block-active message to stdout."""
_echo("\n" + "=" * 60)
_echo(" *** TOTAL GAMING BLOCK ACTIVE ***")
_echo("=" * 60)
if status.until is not None:
_echo(f"\nBlocked until: {status.until.strftime('%Y-%m-%d %H:%M UTC')}")
_echo(f"Days remaining: {status.days_remaining:.1f}")
_echo(
"\nSteam has been uninstalled, all known game/launcher processes are"
"\nbeing killed on sight, and Steam + game-website domains are blocked."
"\nThere is NO in-app command to lift this early."
f"\n\nAllowed commands: {', '.join(sorted(_TOTAL_BLOCK_EXEMPT_COMMANDS))}"
)
_echo("=" * 60 + "\n")
def _enforce_total_block_lock(command: str) -> None:
"""Exit with a lock message if command is blocked by an active total block."""
if not is_total_block_active():
return
if command in _TOTAL_BLOCK_EXEMPT_COMMANDS:
return
_show_total_block_lock_message(get_total_block_status())
sys.exit(1)
# ──────────────────────────────────────────────────────────────
# Manual pick lock helpers
@ -155,6 +200,13 @@ def cmd_status(_config: Config, state: State) -> None:
"""Show current status."""
_echo("=== Steam Backlog Enforcer ===\n")
total_block = get_total_block_status()
if total_block.active:
_echo("*** TOTAL GAMING BLOCK ACTIVE ***")
if total_block.until is not None:
_echo(f"Blocked until: {total_block.until.strftime('%Y-%m-%d %H:%M UTC')}")
_echo(f"Days remaining: {total_block.days_remaining:.1f}\n")
if state.current_app_id:
_echo(
f"Assigned game: {state.current_game_name} (AppID={state.current_app_id})"
@ -573,6 +625,62 @@ def cmd_pick_manual(config: Config, state: State, args: list[str]) -> None:
_echo(f" Library: hid {hidden} games")
_BLOCK_GAMING_USAGE = (
"Usage: block-gaming <days>\n"
" days : whole number of days to block ALL gaming:\n"
" Steam uninstalled, all known game/launcher processes killed,\n"
" Steam + game-website domains blocked.\n\n"
"There is NO in-app command to undo this early once confirmed."
)
def cmd_block_gaming(args: list[str]) -> None:
"""Start a total gaming block for a fixed number of days.
Usage: block-gaming <days>
Args:
args: Remaining CLI args (first element should be the day count).
"""
if not args:
_echo(_BLOCK_GAMING_USAGE)
sys.exit(1)
try:
days = int(args[0])
except ValueError:
_echo(f"Error: days must be a whole number, got '{args[0]}'.")
sys.exit(1)
if days < 1:
_echo("Error: days must be at least 1.")
sys.exit(1)
_echo(
f"\nWARNING: This will, for the next {days} day(s):"
f"\n - Uninstall Steam"
f"\n - Kill Steam and all known game-launcher processes on sight"
f"\n - Block all Steam network domains AND known browser/flash"
f"\n game websites"
f"\n\nThere is NO in-app command to undo this early. It can only be"
f"\nlifted by waiting out the {days} day(s), or by manual root-level"
f"\nsystem administration outside this tool."
)
_echo()
confirm = input(f"Type YES to confirm a {days}-day total gaming block: ").strip()
if confirm != "YES":
_echo("Aborted.")
return
_echo("\nStarting total gaming block...")
if start_total_block(days):
_echo(f"Total gaming block ACTIVE for {days} day(s).")
_echo("Run 'status' to check remaining time.")
else:
_echo("Error: failed to engage the block (see logs). Run with sudo?")
sys.exit(1)
COMMANDS: dict[str, tuple[str, Callable[[Config, State], object]]] = {
"scan": ("Scan library & assign a game", do_scan),
"check": ("Check assigned game completion", do_check),
@ -598,6 +706,7 @@ COMMANDS: dict[str, tuple[str, Callable[[Config, State], object]]] = {
_EXTRA_COMMAND_DESCRIPTIONS: dict[str, str] = {
"add-exception": "Request 24h-locked whitelist exception (use --reason)",
"pick-manual": f"Pick a game by app_id, lock enforcer for {_MANUAL_LOCK_DAYS} days",
"block-gaming": "Block ALL gaming for <days> days, no in-app undo",
}
_ALL_COMMANDS: dict[str, str] = {
@ -625,15 +734,23 @@ def main() -> None:
state = State.load()
# Total block is the most restrictive lock - check it first.
_enforce_total_block_lock(command)
# Enforce the manual-pick lock before dispatching any command.
# This also covers add-exception (previously dispatched before state load).
_enforce_manual_pick_lock(command, state)
# add-exception and pick-manual have non-standard argument structures.
# add-exception, pick-manual, and block-gaming have non-standard
# argument structures.
if command == "add-exception":
cmd_add_exception(sys.argv[2:])
return
if command == "block-gaming":
cmd_block_gaming(sys.argv[2:])
return
if command == "pick-manual":
cmd_pick_manual(config, state, sys.argv[2:])
return

View File

@ -1,15 +1,12 @@
"""Block Steam Store access via /etc/hosts (hosts install script) and iptables.
The system uses a dedicated hosts install script at
linux_configuration/hosts/install.sh that manages /etc/hosts with:
- chattr +ia (immutable + append-only)
- read-only bind mount
- protection against removing entries (only adding is easy)
This module checks if the Steam Store domains are already blocked in
/etc/hosts. If not, it runs the hosts install.sh (which must already
contain the Steam Store entries in its heredoc). As a belt-and-suspenders
fallback, it also blocks via iptables.
/etc/hosts is protected by guard-lib (~/guard-lib, file-guard instance
"hosts"): chattr +i, a read-only self-bind-mount, and a systemd path-unit
watcher. This module checks if the Steam Store domains are already blocked
in /etc/hosts. If not, it runs the hosts install.sh (which must already
contain the Steam Store entries in its heredoc), going through guard-lib's
unlock/relock around any edit. As a belt-and-suspenders fallback, it also
blocks via iptables.
"""
from __future__ import annotations
@ -28,9 +25,19 @@ from steam_backlog_enforcer.config import (
logger = logging.getLogger(__name__)
# Path to the hosts install script (relative to repo root).
# Path to the hosts install script. _REPO_ROOT resolves to $HOME (this
# module lives two levels below it); the script itself is in the
# linux_configuration checkout under testsAndMisc, not directly under $HOME.
_REPO_ROOT = Path(__file__).resolve().parents[2]
HOSTS_INSTALL_SCRIPT = _REPO_ROOT / "linux_configuration" / "hosts" / "install.sh"
HOSTS_INSTALL_SCRIPT = (
_REPO_ROOT
/ "testsAndMisc"
/ "linux_configuration"
/ "scripts"
/ "periodic_background"
/ "hosts"
/ "install.sh"
)
# iptables chain name for our blocking rules.
IPTABLES_CHAIN = "STEAM_ENFORCER"
@ -39,13 +46,7 @@ IPTABLES_CHAIN = "STEAM_ENFORCER"
_SUDO = shutil.which("sudo") or "/usr/bin/sudo"
_IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables"
_BASH = shutil.which("bash") or "/usr/bin/bash"
_CHATTR = shutil.which("chattr") or "/usr/bin/chattr"
_SYSTEMCTL = shutil.which("systemctl") or "/usr/bin/systemctl"
_UMOUNT = shutil.which("umount") or "/usr/bin/umount"
_MOUNT = shutil.which("mount") or "/usr/bin/mount"
_FINDMNT = shutil.which("findmnt") or "/usr/bin/findmnt"
_CP = shutil.which("cp") or "/usr/bin/cp"
_CHMOD = shutil.which("chmod") or "/usr/bin/chmod"
_GUARDCTL = shutil.which("guardctl") or "/usr/local/bin/guardctl"
_TEE = shutil.which("tee") or "/usr/bin/tee"
# IP address used in /etc/hosts for blocking domains.
@ -292,77 +293,44 @@ def flush_dns_cache() -> None:
# ──────────────────────────────────────────────────────────────
# /etc/hosts protection helpers
#
# /etc/hosts is managed by guard-lib (see ~/guard-lib) as file-guard
# instance "hosts": chattr +i, a read-only self-bind-mount, and a
# systemd path-unit watcher. "pacman-unlock" stops the watcher and
# collapses the bind mount (same primitive guard-lib's own pacman hooks
# use). Relocking uses "sync", NOT "pacman-relock" - pacman-relock calls
# fg_enforce, which treats any diff from the (stale) canonical as drift
# and reverts it; that's correct for pacman's own hook flow (undo
# unwanted tampering) but wrong here, where *we* are the one legitimately
# changing content - it would silently undo our own edit. "sync" instead
# adopts the just-written content as the new canonical.
# ──────────────────────────────────────────────────────────────
_GUARD_SERVICES = ("hosts-bind-mount.service", "hosts-guard.path")
_LOCKED_HOSTS_COPY = Path("/usr/local/share/locked-hosts")
def _disable_hosts_protection() -> None:
"""Stop guard services, unmount bind mount, remove chattr flags."""
for svc in _GUARD_SERVICES:
subprocess.run(
[_SUDO, _SYSTEMCTL, "stop", svc],
capture_output=True,
timeout=10,
check=False,
)
"""Temporarily unlock /etc/hosts so its content can be edited.
# Unmount bind mount if active.
result = subprocess.run(
[_FINDMNT, str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
if result.returncode == 0:
subprocess.run(
[_SUDO, _UMOUNT, str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
# Remove immutable + append-only attributes.
Guard-lib: stop watcher, collapse bind mount, chattr -i.
"""
subprocess.run(
[_SUDO, _CHATTR, "-i", "-a", str(HOSTS_FILE)],
[_SUDO, _GUARDCTL, "file-guard", "pacman-unlock", "hosts"],
capture_output=True,
timeout=5,
timeout=10,
check=False,
)
def _enable_hosts_protection() -> None:
"""Re-apply chattr flags and restart guard services."""
"""Re-lock /etc/hosts, adopting its current content as the new canonical.
Guard-lib: chattr +i, reapply bind mount, restart watcher.
"""
subprocess.run(
[_SUDO, _CHMOD, "644", str(HOSTS_FILE)],
[_SUDO, _GUARDCTL, "file-guard", "sync", "hosts"],
capture_output=True,
timeout=5,
timeout=10,
check=False,
)
subprocess.run(
[_SUDO, _CHATTR, "+ia", str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
# Update the canonical copy so the guard doesn't revert changes.
if _LOCKED_HOSTS_COPY.exists():
subprocess.run(
[_SUDO, _CP, str(HOSTS_FILE), str(_LOCKED_HOSTS_COPY)],
capture_output=True,
timeout=5,
check=False,
)
for svc in _GUARD_SERVICES:
subprocess.run(
[_SUDO, _SYSTEMCTL, "start", svc],
capture_output=True,
timeout=10,
check=False,
)
def _unblock_hosts() -> bool:

View File

@ -65,15 +65,32 @@ def _isolate_filesystem(tmp_path: Path) -> Iterator[None]:
"steam_backlog_enforcer._hltb_types.HLTB_CACHE_FILE",
fake_config / "hltb_cache.json",
),
# /etc/hosts (store blocker)
# /etc/hosts (store blocker + total block - each module has its own
# `from ... import HOSTS_FILE` binding, so each needs its own patch)
patch(
"steam_backlog_enforcer.store_blocker.HOSTS_FILE",
fake_hosts,
),
patch(
"steam_backlog_enforcer._total_block.HOSTS_FILE",
fake_hosts,
),
patch(
"steam_backlog_enforcer.config.HOSTS_FILE",
fake_hosts,
),
# Total-block lock + IP cache (computed at import time from
# CONFIG_DIR, so patching CONFIG_DIR alone does not redirect them -
# same gotcha as HLTB_CACHE_FILE above. A real total-block lock may
# be active on the host machine; tests must never touch it.)
patch(
"steam_backlog_enforcer._total_block.TOTAL_BLOCK_LOCK_FILE",
fake_config / "total_block_lock.json",
),
patch(
"steam_backlog_enforcer._total_block._IPTABLES_IP_CACHE_FILE",
fake_config / "total_block_ip_cache.json",
),
# Whitelist exception files (_whitelist module-level constants)
patch(
"steam_backlog_enforcer._whitelist.PENDING_EXCEPTIONS_FILE",
@ -125,6 +142,10 @@ def _block_real_subprocesses() -> Iterator[None]:
"steam_backlog_enforcer.store_blocker.subprocess.run",
noop_run,
),
patch(
"steam_backlog_enforcer._total_block.subprocess.run",
noop_run,
),
patch(
"steam_backlog_enforcer.library_hider.subprocess.run",
noop_run,

View File

@ -0,0 +1,115 @@
"""Tests for _enforce_loop module (part 3 - total-block short-circuit)."""
from __future__ import annotations
from unittest.mock import patch
from steam_backlog_enforcer._enforce_loop import _enforce_loop_iteration, do_enforce
from steam_backlog_enforcer.config import Config, State
PKG = "steam_backlog_enforcer._enforce_loop"
class TestEnforceLoopIterationTotalBlock:
"""Total-block short-circuit at the top of _enforce_loop_iteration."""
def test_active_short_circuits_before_assigned_game_logic(self) -> None:
config = Config(kill_unauthorized_games=True, uninstall_other_games=True)
state = State(current_app_id=1, current_game_name="G")
with (
patch(f"{PKG}.is_total_block_active", return_value=True),
patch(f"{PKG}.enforce_total_block_tick") as mock_tick,
patch(f"{PKG}.enforce_allowed_game") as mock_enforce,
patch(f"{PKG}._guard_installed_games") as mock_guard,
patch(f"{PKG}.is_game_installed") as mock_installed,
):
_enforce_loop_iteration(config, state)
mock_tick.assert_called_once()
mock_enforce.assert_not_called()
mock_guard.assert_not_called()
mock_installed.assert_not_called()
def test_not_active_runs_normal_logic(self) -> None:
config = Config(kill_unauthorized_games=False, uninstall_other_games=False)
state = State(current_app_id=1, current_game_name="G")
with (
patch(f"{PKG}.is_total_block_active", return_value=False),
patch(f"{PKG}.total_block_needs_cleanup", return_value=False),
patch(f"{PKG}.enforce_total_block_tick") as mock_tick,
patch(f"{PKG}.is_game_installed", return_value=True),
):
_enforce_loop_iteration(config, state)
mock_tick.assert_not_called()
def test_expired_lock_triggers_cleanup_once(self) -> None:
config = Config(kill_unauthorized_games=False, uninstall_other_games=False)
state = State(current_app_id=1, current_game_name="G")
with (
patch(f"{PKG}.is_total_block_active", return_value=False),
patch(f"{PKG}.total_block_needs_cleanup", return_value=True),
patch(f"{PKG}.end_total_block_cleanup") as mock_cleanup,
patch(f"{PKG}.is_game_installed", return_value=True),
):
_enforce_loop_iteration(config, state)
mock_cleanup.assert_called_once()
def test_no_lock_no_cleanup_call(self) -> None:
config = Config(kill_unauthorized_games=False, uninstall_other_games=False)
state = State(current_app_id=1, current_game_name="G")
with (
patch(f"{PKG}.is_total_block_active", return_value=False),
patch(f"{PKG}.total_block_needs_cleanup", return_value=False),
patch(f"{PKG}.end_total_block_cleanup") as mock_cleanup,
patch(f"{PKG}.is_game_installed", return_value=True),
):
_enforce_loop_iteration(config, state)
mock_cleanup.assert_not_called()
class TestDoEnforceTotalBlock:
"""Total-block awareness in do_enforce's one-time setup."""
def test_active_skips_enforce_setup_but_still_loops(self) -> None:
state = State() # no assigned game
config = Config()
with (
patch(f"{PKG}.is_total_block_active", return_value=True),
patch(f"{PKG}._enforce_setup") as mock_setup,
patch(f"{PKG}._echo"),
patch.object(State, "load", return_value=state),
patch(
f"{PKG}._enforce_loop_iteration",
side_effect=KeyboardInterrupt,
),
patch(f"{PKG}.time.sleep"),
):
do_enforce(config, state)
mock_setup.assert_not_called()
def test_active_with_no_game_does_not_early_return(self) -> None:
"""Without total block, no assigned game means do_enforce returns
immediately. With it active, the loop must still run."""
state = State()
config = Config()
with (
patch(f"{PKG}.is_total_block_active", return_value=True),
patch(f"{PKG}._enforce_setup"),
patch(f"{PKG}._echo"),
patch.object(State, "load", return_value=state),
patch(
f"{PKG}._enforce_loop_iteration",
side_effect=KeyboardInterrupt,
) as mock_iter,
patch(f"{PKG}.time.sleep"),
):
do_enforce(config, state)
mock_iter.assert_called_once()
def test_inactive_with_no_game_returns_early(self) -> None:
with (
patch(f"{PKG}.is_total_block_active", return_value=False),
patch(f"{PKG}._echo") as mock_echo,
):
do_enforce(Config(), State())
output = " ".join(str(c) for c in mock_echo.call_args_list)
assert "No game" in output

View File

@ -0,0 +1,163 @@
"""Tests for enforcer module — part 2 (comm-name process matching)."""
from __future__ import annotations
from typing import TYPE_CHECKING
from unittest.mock import patch
from steam_backlog_enforcer.enforcer import (
_kill_pid_by_name,
get_pids_by_process_names,
kill_processes_by_name,
)
if TYPE_CHECKING:
from pathlib import Path
class TestGetPidsByProcessNames:
"""Tests for get_pids_by_process_names."""
def test_finds_matching_comm(self, tmp_path: Path) -> None:
proc_dir = tmp_path / "proc"
pid_dir = proc_dir / "12345"
pid_dir.mkdir(parents=True)
(pid_dir / "comm").write_text("lutris\n", encoding="utf-8")
with patch(
"steam_backlog_enforcer.enforcer.Path",
return_value=proc_dir,
):
result = get_pids_by_process_names(frozenset({"lutris"}))
assert result == {12345: "lutris"}
def test_no_match_returns_empty(self, tmp_path: Path) -> None:
proc_dir = tmp_path / "proc"
pid_dir = proc_dir / "12345"
pid_dir.mkdir(parents=True)
(pid_dir / "comm").write_text("bash\n", encoding="utf-8")
with patch(
"steam_backlog_enforcer.enforcer.Path",
return_value=proc_dir,
):
result = get_pids_by_process_names(frozenset({"lutris"}))
assert result == {}
def test_skips_non_digit_entries(self, tmp_path: Path) -> None:
proc_dir = tmp_path / "proc"
proc_dir.mkdir(parents=True)
(proc_dir / "self").mkdir()
with patch(
"steam_backlog_enforcer.enforcer.Path",
return_value=proc_dir,
):
result = get_pids_by_process_names(frozenset({"lutris"}))
assert result == {}
def test_handles_missing_comm_file(self, tmp_path: Path) -> None:
proc_dir = tmp_path / "proc"
(proc_dir / "42").mkdir(parents=True)
# No comm file -> OSError when reading.
with patch(
"steam_backlog_enforcer.enforcer.Path",
return_value=proc_dir,
):
result = get_pids_by_process_names(frozenset({"lutris"}))
assert result == {}
def test_truncates_long_names_to_15_chars(self, tmp_path: Path) -> None:
"""The kernel truncates /proc/[pid]/comm to 15 chars - matching
must compare against the truncated form, not the full name."""
proc_dir = tmp_path / "proc"
pid_dir = proc_dir / "777"
pid_dir.mkdir(parents=True)
# "EpicGamesLauncher" truncated to 15 chars is "EpicGamesLaunch".
(pid_dir / "comm").write_text("EpicGamesLaunch\n", encoding="utf-8")
with patch(
"steam_backlog_enforcer.enforcer.Path",
return_value=proc_dir,
):
result = get_pids_by_process_names(frozenset({"EpicGamesLauncher"}))
assert result == {777: "EpicGamesLauncher"}
class TestKillProcessesByName:
"""Tests for kill_processes_by_name."""
def test_kills_matching_pids(self) -> None:
with (
patch(
"steam_backlog_enforcer.enforcer.get_pids_by_process_names",
return_value={100: "lutris", 200: "prismlauncher"},
),
patch("steam_backlog_enforcer.enforcer.os.kill") as mock_kill,
):
result = kill_processes_by_name(frozenset({"lutris", "prismlauncher"}))
assert sorted(result) == [(100, "lutris"), (200, "prismlauncher")]
assert mock_kill.call_count == 2
def test_no_matches_returns_empty(self) -> None:
with patch(
"steam_backlog_enforcer.enforcer.get_pids_by_process_names",
return_value={},
):
result = kill_processes_by_name(frozenset({"lutris"}))
assert result == []
def test_process_already_gone_not_included(self) -> None:
with (
patch(
"steam_backlog_enforcer.enforcer.get_pids_by_process_names",
return_value={100: "lutris"},
),
patch(
"steam_backlog_enforcer.enforcer.os.kill",
side_effect=ProcessLookupError,
),
):
result = kill_processes_by_name(frozenset({"lutris"}))
assert result == []
def test_permission_error_not_included(self) -> None:
with (
patch(
"steam_backlog_enforcer.enforcer.get_pids_by_process_names",
return_value={100: "lutris"},
),
patch(
"steam_backlog_enforcer.enforcer.os.kill",
side_effect=PermissionError,
),
):
result = kill_processes_by_name(frozenset({"lutris"}))
assert result == []
class TestKillPidByName:
"""Tests for _kill_pid_by_name."""
def test_success_returns_true(self) -> None:
with patch("steam_backlog_enforcer.enforcer.os.kill") as mock_kill:
result = _kill_pid_by_name(123, "lutris")
assert result is True
mock_kill.assert_called_once()
def test_process_already_gone_returns_false(self) -> None:
with patch(
"steam_backlog_enforcer.enforcer.os.kill",
side_effect=ProcessLookupError,
):
result = _kill_pid_by_name(123, "lutris")
assert result is False
def test_permission_error_returns_false(self) -> None:
with patch(
"steam_backlog_enforcer.enforcer.os.kill",
side_effect=PermissionError,
):
result = _kill_pid_by_name(123, "lutris")
assert result is False

View File

@ -0,0 +1,260 @@
"""Tests for main CLI module — part 5 (total gaming block lock + command)."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import sys
from unittest.mock import patch
import pytest
from steam_backlog_enforcer._total_block import TotalBlockStatus
from steam_backlog_enforcer.config import Config, State
from steam_backlog_enforcer.main import (
_TOTAL_BLOCK_EXEMPT_COMMANDS,
_enforce_total_block_lock,
_show_total_block_lock_message,
cmd_block_gaming,
cmd_status,
main,
)
PKG = "steam_backlog_enforcer.main"
_ACTIVE_STATUS = TotalBlockStatus(
active=True,
started_at=datetime.now(timezone.utc) - timedelta(hours=1),
until=datetime.now(timezone.utc) + timedelta(hours=23),
days=1,
days_remaining=0.96,
)
_INACTIVE_STATUS = TotalBlockStatus(
active=False, started_at=None, until=None, days=0, days_remaining=0.0
)
# ──────────────────────────────────────────────────────────────
# _show_total_block_lock_message
# ──────────────────────────────────────────────────────────────
class TestShowTotalBlockLockMessage:
def test_shows_remaining_time(self) -> None:
with patch(f"{PKG}._echo") as mock_echo:
_show_total_block_lock_message(_ACTIVE_STATUS)
output = " ".join(str(c) for c in mock_echo.call_args_list)
assert "TOTAL GAMING BLOCK ACTIVE" in output
assert "Days remaining" in output
def test_lists_exempt_commands(self) -> None:
with patch(f"{PKG}._echo") as mock_echo:
_show_total_block_lock_message(_ACTIVE_STATUS)
output = " ".join(str(c) for c in mock_echo.call_args_list)
assert "status" in output
assert "enforce" in output
def test_no_crash_without_until(self) -> None:
status = TotalBlockStatus(
active=True, started_at=None, until=None, days=1, days_remaining=0.5
)
with patch(f"{PKG}._echo"):
_show_total_block_lock_message(status) # must not raise
# ──────────────────────────────────────────────────────────────
# _enforce_total_block_lock
# ──────────────────────────────────────────────────────────────
class TestEnforceTotalBlockLock:
def test_not_active_passes(self) -> None:
with patch(f"{PKG}.is_total_block_active", return_value=False):
_enforce_total_block_lock("scan") # no exit
def test_exempt_command_passes_while_active(self) -> None:
with patch(f"{PKG}.is_total_block_active", return_value=True):
_enforce_total_block_lock("status") # no exit
_enforce_total_block_lock("enforce") # no exit
def test_blocked_command_exits(self) -> None:
with (
patch(f"{PKG}.is_total_block_active", return_value=True),
patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS),
patch(f"{PKG}._show_total_block_lock_message"),
pytest.raises(SystemExit) as exc_info,
):
_enforce_total_block_lock("scan")
assert exc_info.value.code == 1
def test_done_blocked_while_active(self) -> None:
"""Stricter than the manual-pick lock: even 'done' is blocked."""
with (
patch(f"{PKG}.is_total_block_active", return_value=True),
patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS),
patch(f"{PKG}._show_total_block_lock_message"),
pytest.raises(SystemExit),
):
_enforce_total_block_lock("done")
def test_add_exception_blocked_while_active(self) -> None:
with (
patch(f"{PKG}.is_total_block_active", return_value=True),
patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS),
patch(f"{PKG}._show_total_block_lock_message"),
pytest.raises(SystemExit),
):
_enforce_total_block_lock("add-exception")
def test_exempt_set_is_stricter_than_manual_pick(self) -> None:
assert frozenset({"status", "enforce"}) == _TOTAL_BLOCK_EXEMPT_COMMANDS
# ──────────────────────────────────────────────────────────────
# cmd_block_gaming
# ──────────────────────────────────────────────────────────────
class TestCmdBlockGaming:
def test_no_args_shows_usage(self) -> None:
with patch(f"{PKG}._echo") as mock_echo, pytest.raises(SystemExit) as exc_info:
cmd_block_gaming([])
assert exc_info.value.code == 1
output = " ".join(str(c) for c in mock_echo.call_args_list)
assert "Usage" in output
def test_non_numeric_days(self) -> None:
with patch(f"{PKG}._echo"), pytest.raises(SystemExit):
cmd_block_gaming(["abc"])
def test_zero_days_rejected(self) -> None:
with patch(f"{PKG}._echo"), pytest.raises(SystemExit):
cmd_block_gaming(["0"])
def test_negative_days_rejected(self) -> None:
with patch(f"{PKG}._echo"), pytest.raises(SystemExit):
cmd_block_gaming(["-1"])
def test_aborted_when_not_yes(self) -> None:
with (
patch(f"{PKG}._echo"),
patch("builtins.input", return_value="no"),
patch(f"{PKG}.start_total_block") as mock_start,
):
cmd_block_gaming(["14"])
mock_start.assert_not_called()
def test_confirmed_starts_block(self) -> None:
with (
patch(f"{PKG}._echo"),
patch("builtins.input", return_value="YES"),
patch(f"{PKG}.start_total_block", return_value=True) as mock_start,
):
cmd_block_gaming(["14"])
mock_start.assert_called_once_with(14)
def test_start_failure_exits_nonzero(self) -> None:
with (
patch(f"{PKG}._echo"),
patch("builtins.input", return_value="YES"),
patch(f"{PKG}.start_total_block", return_value=False),
pytest.raises(SystemExit) as exc_info,
):
cmd_block_gaming(["14"])
assert exc_info.value.code == 1
# ──────────────────────────────────────────────────────────────
# main() dispatch to block-gaming
# ──────────────────────────────────────────────────────────────
class TestMainDispatchBlockGaming:
def test_dispatches_block_gaming(self) -> None:
argv = ["prog", "block-gaming", "14"]
with (
patch.object(sys, "argv", argv),
patch(f"{PKG}.Config.load", return_value=Config(steam_api_key="k")),
patch(f"{PKG}.State.load", return_value=State()),
patch(f"{PKG}.is_total_block_active", return_value=False),
patch(f"{PKG}.cmd_block_gaming") as mock_cmd,
):
main()
mock_cmd.assert_called_once_with(["14"])
def test_blocked_when_already_active(self) -> None:
argv = ["prog", "scan"]
with (
patch.object(sys, "argv", argv),
patch(f"{PKG}.Config.load", return_value=Config(steam_api_key="k")),
patch(f"{PKG}.State.load", return_value=State()),
patch(f"{PKG}.is_total_block_active", return_value=True),
patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS),
patch(f"{PKG}._show_total_block_lock_message"),
pytest.raises(SystemExit) as exc_info,
):
main()
assert exc_info.value.code == 1
def test_status_allowed_when_active(self) -> None:
# "status" is dispatched via the COMMANDS dict, which captures the
# cmd_status function reference at import time - patching
# main.cmd_status would not intercept it. Verify real behavior
# (no SystemExit, real status output) instead.
argv = ["prog", "status"]
with (
patch.object(sys, "argv", argv),
patch(f"{PKG}.Config.load", return_value=Config(steam_api_key="k")),
patch(f"{PKG}.State.load", return_value=State()),
patch(f"{PKG}.is_total_block_active", return_value=True),
patch(f"{PKG}.is_store_blocked", return_value=False),
patch(f"{PKG}.get_installed_games", return_value=[]),
patch(f"{PKG}._echo") as mock_echo,
):
main() # must not raise SystemExit
output = " ".join(str(c) for c in mock_echo.call_args_list)
assert "Steam Backlog Enforcer" in output
# ──────────────────────────────────────────────────────────────
# cmd_status shows total block info
# ──────────────────────────────────────────────────────────────
class TestCmdStatusTotalBlock:
def test_shows_total_block_when_active(self) -> None:
with (
patch(f"{PKG}.get_total_block_status", return_value=_ACTIVE_STATUS),
patch(f"{PKG}.is_store_blocked", return_value=False),
patch(f"{PKG}.get_installed_games", return_value=[]),
patch(f"{PKG}._echo") as mock_echo,
):
cmd_status(Config(), State())
output = " ".join(str(c) for c in mock_echo.call_args_list)
assert "TOTAL GAMING BLOCK ACTIVE" in output
assert "Days remaining" in output
def test_no_total_block_section_when_inactive(self) -> None:
with (
patch(f"{PKG}.get_total_block_status", return_value=_INACTIVE_STATUS),
patch(f"{PKG}.is_store_blocked", return_value=False),
patch(f"{PKG}.get_installed_games", return_value=[]),
patch(f"{PKG}._echo") as mock_echo,
):
cmd_status(Config(), State())
output = " ".join(str(c) for c in mock_echo.call_args_list)
assert "TOTAL GAMING BLOCK" not in output
def test_active_without_until_skips_remaining_time(self) -> None:
status = TotalBlockStatus(
active=True, started_at=None, until=None, days=1, days_remaining=0.5
)
with (
patch(f"{PKG}.get_total_block_status", return_value=status),
patch(f"{PKG}.is_store_blocked", return_value=False),
patch(f"{PKG}.get_installed_games", return_value=[]),
patch(f"{PKG}._echo") as mock_echo,
):
cmd_status(Config(), State())
output = " ".join(str(c) for c in mock_echo.call_args_list)
assert "TOTAL GAMING BLOCK ACTIVE" in output
assert "Days remaining" not in output

View File

@ -3,7 +3,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch
from unittest.mock import patch
from steam_backlog_enforcer.store_blocker import (
_disable_hosts_protection,
@ -33,54 +33,23 @@ class TestSudoWriteHosts:
class TestDisableHostsProtection:
"""Tests for _disable_hosts_protection."""
def test_stops_services_unmounts_chattr(self) -> None:
findmnt_found = MagicMock(returncode=0)
def run_side_effect(
cmd: list[str],
**_kwargs: object,
) -> MagicMock:
if any("findmnt" in str(c) for c in cmd):
return findmnt_found
return MagicMock(returncode=0)
with patch(f"{PKG}.subprocess.run", side_effect=run_side_effect):
_disable_hosts_protection()
def test_no_bind_mount(self) -> None:
findmnt_missing = MagicMock(returncode=1)
def run_side_effect(
cmd: list[str],
**_kwargs: object,
) -> MagicMock:
if any("findmnt" in str(c) for c in cmd):
return findmnt_missing
return MagicMock(returncode=0)
with patch(f"{PKG}.subprocess.run", side_effect=run_side_effect):
def test_calls_guardctl_pacman_unlock(self) -> None:
with patch(f"{PKG}.subprocess.run") as mock_run:
_disable_hosts_protection()
mock_run.assert_called_once()
cmd = mock_run.call_args.args[0]
assert cmd[-3:] == ["file-guard", "pacman-unlock", "hosts"]
class TestEnableHostsProtection:
"""Tests for _enable_hosts_protection."""
def test_with_locked_copy(self, tmp_path: Path) -> None:
locked_copy = tmp_path / "locked-hosts"
locked_copy.touch()
with (
patch(f"{PKG}.subprocess.run"),
patch(f"{PKG}._LOCKED_HOSTS_COPY", locked_copy),
):
_enable_hosts_protection()
def test_without_locked_copy(self, tmp_path: Path) -> None:
locked_copy = tmp_path / "nonexistent"
with (
patch(f"{PKG}.subprocess.run"),
patch(f"{PKG}._LOCKED_HOSTS_COPY", locked_copy),
):
def test_calls_guardctl_sync(self) -> None:
with patch(f"{PKG}.subprocess.run") as mock_run:
_enable_hosts_protection()
mock_run.assert_called_once()
cmd = mock_run.call_args.args[0]
assert cmd[-3:] == ["file-guard", "sync", "hosts"]
class TestUnblockHosts:

View File

@ -0,0 +1,736 @@
"""Tests for _total_block module (block-gaming total block).
`TOTAL_BLOCK_LOCK_FILE`, `_IPTABLES_IP_CACHE_FILE`, and `HOSTS_FILE` are
patched per-test by the `paths` fixture below to tmp_path locations,
overriding conftest's own autouse patch with paths the tests can read and
write directly. Tests must never do `from ... import TOTAL_BLOCK_LOCK_FILE`
- that captures the value at file-import time, before any patch is active,
and silently ignores every subsequent patch (the same binding gotcha
documented in this repo's own conftest.py for HLTB_CACHE_FILE).
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import json
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch
import pytest
from steam_backlog_enforcer._total_block import (
_HOSTS_BLOCK_BEGIN,
_HOSTS_BLOCK_END,
IPTABLES_CHAIN,
_apply_total_block_hosts,
_apply_total_block_iptables,
_iptables_chain_intact,
_is_steam_installed,
_kill_and_uninstall_launchers,
_kill_steam_and_launchers,
_load_cached_ips,
_pacman_owner,
_remove_total_block_hosts,
_remove_total_block_iptables,
_save_cached_ips,
_uninstall_package,
_uninstall_steam_package,
end_total_block_cleanup,
enforce_total_block_tick,
get_total_block_status,
is_total_block_active,
start_total_block,
total_block_needs_cleanup,
)
if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path
PKG = "steam_backlog_enforcer._total_block"
@dataclass
class _Paths:
lock_file: Path
ip_cache_file: Path
hosts_file: Path
@pytest.fixture(autouse=True)
def paths(tmp_path: Path) -> Iterator[_Paths]:
"""Redirect the module's path constants to tmp_path for every test."""
paths = _Paths(
lock_file=tmp_path / "total_block_lock.json",
ip_cache_file=tmp_path / "total_block_ip_cache.json",
hosts_file=tmp_path / "hosts",
)
with (
patch(f"{PKG}.TOTAL_BLOCK_LOCK_FILE", paths.lock_file),
patch(f"{PKG}._IPTABLES_IP_CACHE_FILE", paths.ip_cache_file),
patch(f"{PKG}.HOSTS_FILE", paths.hosts_file),
):
yield paths
def _write_lock(paths: _Paths, started_at: float, until: float, days: int = 1) -> None:
paths.lock_file.parent.mkdir(parents=True, exist_ok=True)
paths.lock_file.write_text(
json.dumps({"started_at": started_at, "until": until, "days": days}),
encoding="utf-8",
)
_NOW = datetime.now(timezone.utc).timestamp()
# ──────────────────────────────────────────────────────────────
# Lock reading / status
# ──────────────────────────────────────────────────────────────
class TestIsTotalBlockActive:
def test_no_lock_file(self) -> None:
assert is_total_block_active() is False
def test_active_lock(self, paths: _Paths) -> None:
_write_lock(paths, _NOW, _NOW + 3600)
assert is_total_block_active() is True
def test_expired_lock(self, paths: _Paths) -> None:
_write_lock(paths, _NOW - 3600, _NOW - 1)
assert is_total_block_active() is False
def test_malformed_json(self, paths: _Paths) -> None:
paths.lock_file.parent.mkdir(parents=True, exist_ok=True)
paths.lock_file.write_text("not json", encoding="utf-8")
assert is_total_block_active() is False
def test_non_dict_json(self, paths: _Paths) -> None:
paths.lock_file.parent.mkdir(parents=True, exist_ok=True)
paths.lock_file.write_text("[1, 2, 3]", encoding="utf-8")
assert is_total_block_active() is False
def test_missing_until_key(self, paths: _Paths) -> None:
paths.lock_file.parent.mkdir(parents=True, exist_ok=True)
paths.lock_file.write_text(json.dumps({"days": 1}), encoding="utf-8")
assert is_total_block_active() is False
def test_non_numeric_until(self, paths: _Paths) -> None:
paths.lock_file.parent.mkdir(parents=True, exist_ok=True)
paths.lock_file.write_text(
json.dumps({"until": "not-a-number"}), encoding="utf-8"
)
assert is_total_block_active() is False
class TestTotalBlockNeedsCleanup:
def test_no_lock_file(self) -> None:
assert total_block_needs_cleanup() is False
def test_active_lock_no_cleanup_needed(self, paths: _Paths) -> None:
_write_lock(paths, _NOW, _NOW + 3600)
assert total_block_needs_cleanup() is False
def test_expired_lock_needs_cleanup(self, paths: _Paths) -> None:
_write_lock(paths, _NOW - 3600, _NOW - 1)
assert total_block_needs_cleanup() is True
class TestGetTotalBlockStatus:
def test_no_lock(self) -> None:
status = get_total_block_status()
assert status.active is False
assert status.started_at is None
assert status.until is None
assert status.days == 0
assert status.days_remaining == 0.0
def test_active_lock(self, paths: _Paths) -> None:
_write_lock(paths, _NOW, _NOW + 86400, days=1)
status = get_total_block_status()
assert status.active is True
assert status.days == 1
assert 0.0 < status.days_remaining <= 1.0
assert status.started_at is not None
assert status.until is not None
def test_expired_lock(self, paths: _Paths) -> None:
_write_lock(paths, _NOW - 7200, _NOW - 3600, days=1)
status = get_total_block_status()
assert status.active is False
assert status.days_remaining == 0.0
def test_malformed_json_returns_inactive(self, paths: _Paths) -> None:
paths.lock_file.parent.mkdir(parents=True, exist_ok=True)
paths.lock_file.write_text("garbage", encoding="utf-8")
status = get_total_block_status()
assert status.active is False
def test_non_int_days_defaults_to_zero(self, paths: _Paths) -> None:
paths.lock_file.parent.mkdir(parents=True, exist_ok=True)
paths.lock_file.write_text(
json.dumps({"started_at": _NOW, "until": _NOW + 3600, "days": "one"}),
encoding="utf-8",
)
status = get_total_block_status()
assert status.days == 0
# ──────────────────────────────────────────────────────────────
# Process killing
# ──────────────────────────────────────────────────────────────
class TestKillSteamAndLaunchers:
def test_combines_steam_and_launcher_kills(self) -> None:
with (
patch(f"{PKG}.kill_processes_by_name", return_value=[(1, "steam")]),
patch(
f"{PKG}._kill_and_uninstall_launchers",
return_value=[(2, "prismlauncher")],
) as mock_launchers,
):
result = _kill_steam_and_launchers()
assert result == [(1, "steam"), (2, "prismlauncher")]
mock_launchers.assert_called_once()
class TestPacmanOwner:
def test_owned_path_returns_package_name(self) -> None:
result = MagicMock(
returncode=0,
stdout="/usr/bin/prismlauncher is owned by prismlauncher-git 11.0.0-1\n",
)
with patch(f"{PKG}.subprocess.run", return_value=result):
assert _pacman_owner("/usr/bin/prismlauncher") == "prismlauncher-git"
def test_unowned_path_returns_none(self) -> None:
result = MagicMock(returncode=1, stdout="")
with patch(f"{PKG}.subprocess.run", return_value=result):
assert _pacman_owner("/opt/foo/bar") is None
def test_unexpected_output_format_returns_none(self) -> None:
result = MagicMock(returncode=0, stdout="something unexpected\n")
with patch(f"{PKG}.subprocess.run", return_value=result):
assert _pacman_owner("/usr/bin/x") is None
class TestUninstallPackage:
def test_success(self) -> None:
with patch(
f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0, stderr="")
):
assert _uninstall_package("foo") is True
def test_already_absent_treated_as_success(self) -> None:
with patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=1, stderr="error: target not found: foo"),
):
assert _uninstall_package("foo") is True
def test_real_failure_returns_false(self) -> None:
with patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=1, stderr="some other error"),
):
assert _uninstall_package("foo") is False
def test_subprocess_error_returns_false(self) -> None:
with patch(f"{PKG}.subprocess.run", side_effect=OSError):
assert _uninstall_package("foo") is False
class TestKillAndUninstallLaunchers:
def test_no_launchers_running(self) -> None:
with (
patch(f"{PKG}.get_pids_by_process_names", return_value={}),
patch(f"{PKG}.kill_processes_by_name", return_value=[]),
):
assert _kill_and_uninstall_launchers() == []
def test_kills_and_uninstalls_owned_package(self) -> None:
with (
patch(
f"{PKG}.get_pids_by_process_names",
return_value={123: "prismlauncher"},
),
patch(f"{PKG}.Path") as mock_path_cls,
patch(
f"{PKG}.kill_processes_by_name",
return_value=[(123, "prismlauncher")],
),
patch(f"{PKG}._pacman_owner", return_value="prismlauncher-git"),
patch(f"{PKG}._uninstall_package", return_value=True) as mock_uninstall,
):
mock_path_cls.return_value.resolve.return_value = "/usr/bin/prismlauncher"
result = _kill_and_uninstall_launchers()
assert result == [(123, "prismlauncher")]
mock_uninstall.assert_called_once_with("prismlauncher-git")
def test_exe_path_unreadable_skips_uninstall(self) -> None:
with (
patch(
f"{PKG}.get_pids_by_process_names",
return_value={123: "prismlauncher"},
),
patch(f"{PKG}.Path") as mock_path_cls,
patch(
f"{PKG}.kill_processes_by_name",
return_value=[(123, "prismlauncher")],
),
patch(f"{PKG}._pacman_owner") as mock_owner,
patch(f"{PKG}._uninstall_package") as mock_uninstall,
):
mock_path_cls.return_value.resolve.side_effect = OSError
result = _kill_and_uninstall_launchers()
assert result == [(123, "prismlauncher")]
mock_owner.assert_not_called()
mock_uninstall.assert_not_called()
def test_unowned_package_not_uninstalled(self) -> None:
with (
patch(f"{PKG}.get_pids_by_process_names", return_value={123: "custom"}),
patch(f"{PKG}.Path") as mock_path_cls,
patch(f"{PKG}.kill_processes_by_name", return_value=[(123, "custom")]),
patch(f"{PKG}._pacman_owner", return_value=None),
patch(f"{PKG}._uninstall_package") as mock_uninstall,
):
mock_path_cls.return_value.resolve.return_value = "/opt/custom/launcher"
_kill_and_uninstall_launchers()
mock_uninstall.assert_not_called()
def test_uninstall_failure_is_logged_not_raised(self) -> None:
with (
patch(
f"{PKG}.get_pids_by_process_names",
return_value={123: "prismlauncher"},
),
patch(f"{PKG}.Path") as mock_path_cls,
patch(
f"{PKG}.kill_processes_by_name",
return_value=[(123, "prismlauncher")],
),
patch(f"{PKG}._pacman_owner", return_value="prismlauncher-git"),
patch(f"{PKG}._uninstall_package", return_value=False),
):
mock_path_cls.return_value.resolve.return_value = "/usr/bin/prismlauncher"
_kill_and_uninstall_launchers() # must not raise
# ──────────────────────────────────────────────────────────────
# Steam package removal
# ──────────────────────────────────────────────────────────────
class TestIsSteamInstalled:
def test_installed(self) -> None:
with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)):
assert _is_steam_installed() is True
def test_not_installed(self) -> None:
with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=1)):
assert _is_steam_installed() is False
class TestUninstallSteamPackage:
def test_success(self) -> None:
with patch(
f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0, stderr="")
):
assert _uninstall_steam_package() is True
def test_already_absent_treated_as_success(self) -> None:
with patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(
returncode=1, stderr="error: target not found: steam"
),
):
assert _uninstall_steam_package() is True
def test_real_failure_returns_false(self) -> None:
with patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=1, stderr="some other error"),
):
assert _uninstall_steam_package() is False
def test_subprocess_error_returns_false(self) -> None:
with patch(f"{PKG}.subprocess.run", side_effect=OSError):
assert _uninstall_steam_package() is False
# ──────────────────────────────────────────────────────────────
# Hosts domain blocking
# ──────────────────────────────────────────────────────────────
class TestApplyTotalBlockHosts:
def test_appends_block_when_absent(self, paths: _Paths) -> None:
paths.hosts_file.write_text("127.0.0.1 localhost\n", encoding="utf-8")
with (
patch(f"{PKG}._disable_hosts_protection"),
patch(f"{PKG}._enable_hosts_protection"),
patch(f"{PKG}._sudo_write_hosts") as mock_write,
):
assert _apply_total_block_hosts() is True
written = mock_write.call_args.args[0]
assert _HOSTS_BLOCK_BEGIN in written
assert _HOSTS_BLOCK_END in written
assert "steamcommunity.com" in written
def test_already_present_is_noop(self, paths: _Paths) -> None:
paths.hosts_file.write_text(
f"127.0.0.1 localhost\n{_HOSTS_BLOCK_BEGIN}"
f"0.0.0.0 x.com\n{_HOSTS_BLOCK_END}",
encoding="utf-8",
)
with patch(f"{PKG}._sudo_write_hosts") as mock_write:
assert _apply_total_block_hosts() is True
mock_write.assert_not_called()
def test_missing_hosts_file_returns_false(self) -> None:
assert _apply_total_block_hosts() is False
def test_write_failure_still_reenables_protection(self, paths: _Paths) -> None:
paths.hosts_file.write_text("127.0.0.1 localhost\n", encoding="utf-8")
with (
patch(f"{PKG}._disable_hosts_protection"),
patch(f"{PKG}._enable_hosts_protection") as mock_enable,
patch(f"{PKG}._sudo_write_hosts", side_effect=OSError),
):
assert _apply_total_block_hosts() is False
mock_enable.assert_called_once()
class TestRemoveTotalBlockHosts:
def test_removes_block_when_present(self, paths: _Paths) -> None:
paths.hosts_file.write_text(
f"127.0.0.1 localhost\n{_HOSTS_BLOCK_BEGIN}"
f"0.0.0.0 x.com\n{_HOSTS_BLOCK_END}"
"192.168.1.1 router\n",
encoding="utf-8",
)
with (
patch(f"{PKG}._disable_hosts_protection"),
patch(f"{PKG}._enable_hosts_protection"),
patch(f"{PKG}._sudo_write_hosts") as mock_write,
):
assert _remove_total_block_hosts() is True
written = mock_write.call_args.args[0]
assert _HOSTS_BLOCK_BEGIN not in written
assert "router" in written
assert "localhost" in written
def test_absent_is_noop(self, paths: _Paths) -> None:
paths.hosts_file.write_text("127.0.0.1 localhost\n", encoding="utf-8")
with patch(f"{PKG}._sudo_write_hosts") as mock_write:
assert _remove_total_block_hosts() is True
mock_write.assert_not_called()
def test_missing_hosts_file_returns_false(self) -> None:
assert _remove_total_block_hosts() is False
def test_write_failure_still_reenables_protection(self, paths: _Paths) -> None:
paths.hosts_file.write_text(
f"127.0.0.1 localhost\n{_HOSTS_BLOCK_BEGIN}"
f"0.0.0.0 x.com\n{_HOSTS_BLOCK_END}",
encoding="utf-8",
)
with (
patch(f"{PKG}._disable_hosts_protection"),
patch(f"{PKG}._enable_hosts_protection") as mock_enable,
patch(f"{PKG}._sudo_write_hosts", side_effect=OSError),
):
assert _remove_total_block_hosts() is False
mock_enable.assert_called_once()
# ──────────────────────────────────────────────────────────────
# IP cache
# ──────────────────────────────────────────────────────────────
class TestIpCache:
def test_load_no_file_returns_empty(self) -> None:
assert _load_cached_ips() == set()
def test_save_then_load_round_trips(self) -> None:
_save_cached_ips({"1.2.3.4", "5.6.7.8"})
assert _load_cached_ips() == {"1.2.3.4", "5.6.7.8"}
def test_load_malformed_json_returns_empty(self, paths: _Paths) -> None:
paths.ip_cache_file.parent.mkdir(parents=True, exist_ok=True)
paths.ip_cache_file.write_text("not json", encoding="utf-8")
assert _load_cached_ips() == set()
def test_load_non_list_json_returns_empty(self, paths: _Paths) -> None:
paths.ip_cache_file.parent.mkdir(parents=True, exist_ok=True)
paths.ip_cache_file.write_text(json.dumps({"a": 1}), encoding="utf-8")
assert _load_cached_ips() == set()
# ──────────────────────────────────────────────────────────────
# iptables
# ──────────────────────────────────────────────────────────────
class TestIptablesChainIntact:
def test_missing_chain_returns_false(self) -> None:
with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=1)):
assert _iptables_chain_intact({"1.2.3.4"}) is False
def test_missing_ip_returns_false(self) -> None:
listing = MagicMock(
returncode=0,
stdout="-N STEAM_TOTAL_BLOCK\n-A STEAM_TOTAL_BLOCK -d 9.9.9.9/32 -j DROP\n",
)
with patch(f"{PKG}.subprocess.run", return_value=listing):
assert _iptables_chain_intact({"1.2.3.4"}) is False
def test_all_ips_present_and_hooked_returns_true(self) -> None:
listing = MagicMock(
returncode=0,
stdout=(
"-N STEAM_TOTAL_BLOCK\n"
"-A STEAM_TOTAL_BLOCK -d 1.2.3.4/32 -j DROP\n"
"-A STEAM_TOTAL_BLOCK -d 5.6.7.8/32 -j DROP\n"
),
)
hook_check = MagicMock(returncode=0)
with patch(f"{PKG}.subprocess.run", side_effect=[listing, hook_check]):
assert _iptables_chain_intact({"1.2.3.4", "5.6.7.8"}) is True
def test_ips_present_but_not_hooked_returns_false(self) -> None:
listing = MagicMock(
returncode=0,
stdout="-A STEAM_TOTAL_BLOCK -d 1.2.3.4/32 -j DROP\n",
)
hook_check = MagicMock(returncode=1)
with patch(f"{PKG}.subprocess.run", side_effect=[listing, hook_check]):
assert _iptables_chain_intact({"1.2.3.4"}) is False
def test_malformed_trailing_d_flag_is_ignored(self) -> None:
"""A `-d` token with nothing after it (malformed/truncated rule
line) must not index past the end of `parts`."""
listing = MagicMock(
returncode=0,
stdout="-A STEAM_TOTAL_BLOCK -j DROP -d\n",
)
with patch(f"{PKG}.subprocess.run", return_value=listing):
assert _iptables_chain_intact({"1.2.3.4"}) is False
class TestApplyTotalBlockIptables:
def test_intact_chain_short_circuits(self) -> None:
_save_cached_ips({"1.2.3.4"})
with (
patch(f"{PKG}._iptables_chain_intact", return_value=True),
patch(f"{PKG}.subprocess.run") as mock_run,
):
assert _apply_total_block_iptables() is True
mock_run.assert_not_called()
def test_rebuilds_when_not_intact(self) -> None:
with (
patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)),
patch(
f"{PKG}.socket.getaddrinfo",
return_value=[(None, None, None, None, ("9.9.9.9", 443))],
),
):
assert _apply_total_block_iptables() is True
assert "9.9.9.9" in _load_cached_ips()
def test_dns_failure_skips_that_domain(self) -> None:
import socket as real_socket
with (
patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)),
patch(f"{PKG}.socket.getaddrinfo", side_effect=real_socket.gaierror),
):
assert _apply_total_block_iptables() is True
assert _load_cached_ips() == set()
def test_subprocess_error_returns_false(self) -> None:
with (
patch(
f"{PKG}.subprocess.run",
side_effect=[MagicMock(returncode=0), OSError],
),
patch(f"{PKG}.socket.getaddrinfo", return_value=[]),
):
assert _apply_total_block_iptables() is False
def test_inserts_output_hook_when_missing(self) -> None:
def run_side_effect(cmd: list[str], **_kwargs: object) -> MagicMock:
if "-C" in cmd:
return MagicMock(returncode=1)
return MagicMock(returncode=0)
with (
patch(f"{PKG}.subprocess.run", side_effect=run_side_effect),
patch(f"{PKG}.socket.getaddrinfo", return_value=[]),
):
assert _apply_total_block_iptables() is True
class TestRemoveTotalBlockIptables:
def test_removes_chain_and_cache(self, paths: _Paths) -> None:
_save_cached_ips({"1.2.3.4"})
with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)):
assert _remove_total_block_iptables() is True
assert not paths.ip_cache_file.exists()
def test_no_cache_file_is_fine(self) -> None:
with patch(f"{PKG}.subprocess.run", return_value=MagicMock(returncode=0)):
assert _remove_total_block_iptables() is True
def test_subprocess_error_returns_false(self) -> None:
with patch(f"{PKG}.subprocess.run", side_effect=OSError):
assert _remove_total_block_iptables() is False
# ──────────────────────────────────────────────────────────────
# Public lifecycle API
# ──────────────────────────────────────────────────────────────
class TestStartTotalBlock:
def test_success(self) -> None:
with (
patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=0, stderr=""),
),
patch(f"{PKG}._kill_steam_and_launchers", return_value=[]),
patch(f"{PKG}._uninstall_steam_package", return_value=True),
patch(f"{PKG}._apply_total_block_hosts", return_value=True),
patch(f"{PKG}._apply_total_block_iptables", return_value=True),
patch(f"{PKG}.flush_dns_cache"),
):
assert start_total_block(1) is True
def test_package_block_start_failure_aborts(self) -> None:
with patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=1, stderr="guardctl error"),
):
assert start_total_block(1) is False
def test_best_effort_steps_dont_block_success(self) -> None:
"""Even if kill/uninstall/hosts/iptables all fail, the lock
registering successfully is what start_total_block reports."""
with (
patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=0, stderr=""),
),
patch(f"{PKG}._kill_steam_and_launchers", return_value=[]),
patch(f"{PKG}._uninstall_steam_package", return_value=False),
patch(f"{PKG}._apply_total_block_hosts", return_value=False),
patch(f"{PKG}._apply_total_block_iptables", return_value=False),
patch(f"{PKG}.flush_dns_cache"),
):
assert start_total_block(1) is True
def test_logs_when_processes_were_killed(self) -> None:
with (
patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=0, stderr=""),
),
patch(f"{PKG}._kill_steam_and_launchers", return_value=[(1, "steam")]),
patch(f"{PKG}._uninstall_steam_package", return_value=True),
patch(f"{PKG}._apply_total_block_hosts", return_value=True),
patch(f"{PKG}._apply_total_block_iptables", return_value=True),
patch(f"{PKG}.flush_dns_cache"),
):
assert start_total_block(1) is True
class TestEnforceTotalBlockTick:
def test_reinstalls_steam_if_reappeared(self) -> None:
with (
patch(f"{PKG}._kill_steam_and_launchers", return_value=[]),
patch(f"{PKG}._is_steam_installed", return_value=True),
patch(f"{PKG}._uninstall_steam_package") as mock_uninstall,
patch(f"{PKG}._apply_total_block_hosts", return_value=True),
patch(f"{PKG}._apply_total_block_iptables", return_value=True),
):
enforce_total_block_tick()
mock_uninstall.assert_called_once()
def test_no_reinstall_when_steam_absent(self) -> None:
with (
patch(f"{PKG}._kill_steam_and_launchers", return_value=[]),
patch(f"{PKG}._is_steam_installed", return_value=False),
patch(f"{PKG}._uninstall_steam_package") as mock_uninstall,
patch(f"{PKG}._apply_total_block_hosts", return_value=True),
patch(f"{PKG}._apply_total_block_iptables", return_value=True),
):
enforce_total_block_tick()
mock_uninstall.assert_not_called()
class TestEndTotalBlockCleanup:
def test_ends_lock_and_removes_blocks(self) -> None:
with (
patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=0, stderr=""),
),
patch(f"{PKG}._remove_total_block_hosts", return_value=True) as mock_hosts,
patch(f"{PKG}._remove_total_block_iptables", return_value=True) as mock_ipt,
patch(f"{PKG}.flush_dns_cache"),
):
end_total_block_cleanup()
mock_hosts.assert_called_once()
mock_ipt.assert_called_once()
def test_package_block_end_failure_still_cleans_up_rest(self) -> None:
with (
patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=1, stderr="already ended"),
),
patch(f"{PKG}._remove_total_block_hosts", return_value=True) as mock_hosts,
patch(f"{PKG}._remove_total_block_iptables", return_value=True) as mock_ipt,
patch(f"{PKG}.flush_dns_cache"),
):
end_total_block_cleanup()
mock_hosts.assert_called_once()
mock_ipt.assert_called_once()
def test_hosts_removal_failure_is_logged_not_raised(self) -> None:
with (
patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=0, stderr=""),
),
patch(f"{PKG}._remove_total_block_hosts", return_value=False),
patch(f"{PKG}._remove_total_block_iptables", return_value=True),
patch(f"{PKG}.flush_dns_cache"),
):
end_total_block_cleanup() # must not raise
def test_iptables_removal_failure_is_logged_not_raised(self) -> None:
with (
patch(
f"{PKG}.subprocess.run",
return_value=MagicMock(returncode=0, stderr=""),
),
patch(f"{PKG}._remove_total_block_hosts", return_value=True),
patch(f"{PKG}._remove_total_block_iptables", return_value=False),
patch(f"{PKG}.flush_dns_cache"),
):
end_total_block_cleanup() # must not raise
# Sanity: the module-level chain name constant is what everything above
# assumes when constructing fake iptables -S output.
def test_iptables_chain_name_constant() -> None:
assert IPTABLES_CHAIN == "STEAM_TOTAL_BLOCK"