diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e4a8913..18b3cf1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -64,6 +64,7 @@ repos: - id: ruff args: - --fix + - --unsafe-fixes - --exit-non-zero-on-fix - --show-fixes types_or: [python, pyi] diff --git a/linux_configuration/hosts/install.sh b/linux_configuration/hosts/install.sh index 6004125..a7a7171 100755 --- a/linux_configuration/hosts/install.sh +++ b/linux_configuration/hosts/install.sh @@ -187,6 +187,27 @@ elif ! grep -q '^ReadEtcHosts=yes' "$RESOLVED_CONF" 2>/dev/null; then sudo systemctl restart systemd-resolved fi +# ============================================================================ +# TEMPORARILY DISABLE HOSTS GUARD PROTECTIONS FOR INSTALLATION +# ============================================================================ +# The guard system uses a read-only bind mount and path watcher that prevent +# any writes to /etc/hosts. We must stop them before installing, then restart. +GUARD_SERVICES_STOPPED=0 + +for svc in hosts-bind-mount.service hosts-guard.path; do + if systemctl is-active --quiet "$svc" 2>/dev/null; then + echo "Stopping $svc for installation..." + systemctl stop "$svc" 2>/dev/null || true + GUARD_SERVICES_STOPPED=1 + fi +done + +# If bind mount is still active, unmount it +if findmnt /etc/hosts >/dev/null 2>&1; then + echo "Unmounting read-only bind mount on /etc/hosts..." + umount /etc/hosts 2>/dev/null || mount -o remount,rw,bind /etc/hosts 2>/dev/null || true +fi + # Remove all attributes from /etc/hosts to allow modifications sudo chattr -i -a /etc/hosts 2>/dev/null || true @@ -372,6 +393,11 @@ tee -a /etc/hosts >/dev/null <<'EOF' 0.0.0.0 invidious.io.lol # Steam Store +0.0.0.0 store.steampowered.com +0.0.0.0 checkout.steampowered.com +0.0.0.0 store.akamai.steamstatic.com +0.0.0.0 storefront.steampowered.com +0.0.0.0 store.cloudflare.steamstatic.com # Discord - media allowed # 0.0.0.0 cdn.discordapp.com @@ -610,6 +636,24 @@ sudo chmod 644 /etc/hosts # Make the file immutable and append-only for maximum protection sudo chattr +ia /etc/hosts +# ============================================================================ +# RESTART HOSTS GUARD SERVICES +# ============================================================================ +if [[ $GUARD_SERVICES_STOPPED -eq 1 ]]; then + echo "Restarting hosts guard services..." + # Update the canonical copy so the guard doesn't revert our changes + if [[ -f /usr/local/share/locked-hosts ]]; then + cp /etc/hosts /usr/local/share/locked-hosts + echo " Updated canonical snapshot." + fi + for svc in hosts-bind-mount.service hosts-guard.path; do + if systemctl is-enabled --quiet "$svc" 2>/dev/null; then + systemctl start "$svc" 2>/dev/null || true + echo " Restarted $svc" + fi + done +fi + # ============================================================================ # SAVE CUSTOM ENTRIES STATE FOR FUTURE PROTECTION CHECKS # ============================================================================ diff --git a/phone_focus_mode/config.sh b/phone_focus_mode/config.sh index 9551858..985577e 100755 --- a/phone_focus_mode/config.sh +++ b/phone_focus_mode/config.sh @@ -18,11 +18,10 @@ export RADIUS=250 export HYSTERESIS=50 # --- Location check interval in seconds --- -export CHECK_INTERVAL=60 - -# --- Fail-safe: if location unavailable for this many consecutive checks, -# switch to unrestricted mode to avoid locking user out --- -export MAX_LOCATION_FAILS=5 +# When focus mode is ON (at home): check frequently (phone can charge). +# When focus mode is OFF (away): check less often to save battery. +export CHECK_INTERVAL_FOCUS=30 +export CHECK_INTERVAL_NORMAL=120 # --- Log file --- export LOG_FILE="/data/local/tmp/focus_mode/focus_mode.log" diff --git a/phone_focus_mode/deploy.sh b/phone_focus_mode/deploy.sh index dcd230e..2bee1fb 100755 --- a/phone_focus_mode/deploy.sh +++ b/phone_focus_mode/deploy.sh @@ -54,18 +54,18 @@ check_adb() { check_coords() { local lat lon - lat="$(grep '^HOME_LAT=' "$SCRIPT_DIR/config.sh" | cut -d'"' -f2)" - lon="$(grep '^HOME_LON=' "$SCRIPT_DIR/config.sh" | cut -d'"' -f2)" + lat="$(grep '^.*HOME_LAT=' "$SCRIPT_DIR/config.sh" "$SCRIPT_DIR/config_secrets.sh" 2>/dev/null | tail -1 | cut -d'"' -f2)" + lon="$(grep '^.*HOME_LON=' "$SCRIPT_DIR/config.sh" "$SCRIPT_DIR/config_secrets.sh" 2>/dev/null | tail -1 | cut -d'"' -f2)" + # Allow redacted values locally - real coords live only on the phone if [ "$lat" = "0.000000" ] && [ "$lon" = "0.000000" ]; then - echo "ERROR: You must set your home coordinates in config.sh before deploying!" - echo "" - echo " 1. Find your coords on Google Maps (right-click your apartment)" - echo " 2. Edit phone_focus_mode/config_secrets.sh:" - echo " HOME_LAT=\"-48.876667\"" - echo " HOME_LON=\"-123.393333\"" + echo "ERROR: Home coordinates not set (all zeros). Set them in config_secrets.sh." exit 1 fi - echo " Home location: $lat, $lon" + if [ -z "$lat" ] || [ -z "$lon" ]; then + echo " Home location: (not set locally - will use values on phone)" + else + echo " Home location: $lat, $lon" + fi } check_ip() { @@ -119,40 +119,40 @@ do_deploy() { adb_root "chmod 777 /data/local/tmp/focus_stage" echo "[4/6] Uploading scripts..." - adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/config.sh" "/data/local/tmp/focus_stage/config.sh" - adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/focus_daemon.sh" "/data/local/tmp/focus_stage/focus_daemon.sh" - adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/focus_ctl.sh" "/data/local/tmp/focus_stage/focus_ctl.sh" - adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/magisk_service.sh" "/data/local/tmp/focus_stage/99-focus-mode.sh" + adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/config.sh" "/data/local/tmp/focus_stage/config.sh" + adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/focus_daemon.sh" "/data/local/tmp/focus_stage/focus_daemon.sh" + adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/focus_ctl.sh" "/data/local/tmp/focus_stage/focus_ctl.sh" + adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/magisk_service.sh" "/data/local/tmp/focus_stage/99-focus-mode.sh" + + # Only push config_secrets.sh if phone doesn't already have one + if adb_root "test -f $REMOTE_DIR/config_secrets.sh" 2>/dev/null; then + echo " config_secrets.sh already exists on phone - skipping (preserving real coords)" + else + echo " Pushing config_secrets.sh (first install)..." + adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/config_secrets.sh" "/data/local/tmp/focus_stage/config_secrets.sh" + adb_root "cp /data/local/tmp/focus_stage/config_secrets.sh $REMOTE_DIR/config_secrets.sh" + fi # Move staged files into place with root - adb_root "cp /data/local/tmp/focus_stage/config.sh $REMOTE_DIR/config.sh" - adb_root "cp /data/local/tmp/focus_stage/focus_daemon.sh $REMOTE_DIR/focus_daemon.sh" - adb_root "cp /data/local/tmp/focus_stage/focus_ctl.sh $REMOTE_DIR/focus_ctl.sh" - adb_root "cp /data/local/tmp/focus_stage/99-focus-mode.sh /data/adb/service.d/99-focus-mode.sh" + adb_root "cp /data/local/tmp/focus_stage/config.sh $REMOTE_DIR/config.sh" + adb_root "cp /data/local/tmp/focus_stage/focus_daemon.sh $REMOTE_DIR/focus_daemon.sh" + adb_root "cp /data/local/tmp/focus_stage/focus_ctl.sh $REMOTE_DIR/focus_ctl.sh" + adb_root "cp /data/local/tmp/focus_stage/99-focus-mode.sh /data/adb/service.d/99-focus-mode.sh" adb_root "rm -rf /data/local/tmp/focus_stage" echo "[5/6] Setting permissions..." - adb_root "chmod 755 $REMOTE_DIR/config.sh $REMOTE_DIR/focus_daemon.sh $REMOTE_DIR/focus_ctl.sh" + adb_root "chmod 755 $REMOTE_DIR/config.sh $REMOTE_DIR/focus_daemon.sh $REMOTE_DIR/focus_ctl.sh" || true adb_root "chmod 755 /data/adb/service.d/99-focus-mode.sh" adb_root "touch $REMOTE_DIR/disabled_by_focus.txt" adb_root "touch $REMOTE_DIR/focus_mode.log" echo "[6/6] Starting daemon..." - # Kill existing daemon via pidfile to avoid hitting the ADB shell process - adb_root " - PIDFILE=$REMOTE_DIR/daemon.pid - if [ -f \"\$PIDFILE\" ]; then - OLD_PID=\$(cat \"\$PIDFILE\") - kill -9 \"\$OLD_PID\" 2>/dev/null - rm -f \"\$PIDFILE\" - fi - # Also kill any stray instances - for p in \$(pgrep -f focus_daemon.sh 2>/dev/null); do kill -9 \$p 2>/dev/null; done - sleep 1 - setsid sh $REMOTE_DIR/focus_daemon.sh /dev/null 2>&1 & - echo \$! - " - sleep 3 + # Stop existing daemon, then start fresh + adb_root "kill \$(cat $REMOTE_DIR/daemon.pid 2>/dev/null) 2>/dev/null; true" + sleep 1 + adb_root "rm -f $REMOTE_DIR/daemon.pid" + adb -s "$PHONE_IP:5555" shell su -c 'sh /data/local/tmp/focus_mode/focus_daemon.sh /dev/null 2>/dev/null &' + sleep 4 echo "" echo "=== Deploy complete! ===" diff --git a/phone_focus_mode/focus_daemon.sh b/phone_focus_mode/focus_daemon.sh index 3cd95d6..5e48e77 100755 --- a/phone_focus_mode/focus_daemon.sh +++ b/phone_focus_mode/focus_daemon.sh @@ -66,8 +66,8 @@ init() { CURRENT_MODE="normal" fi - LOCATION_FAIL_COUNT=0 log "Focus mode daemon started (PID=$$, mode=$CURRENT_MODE, home=$HOME_LAT,$HOME_LON, radius=${RADIUS}m)" + log "Intervals: focus=${CHECK_INTERVAL_FOCUS}s normal=${CHECK_INTERVAL_NORMAL}s" } # ---- Location ---- @@ -194,19 +194,18 @@ main() { disable_focus_mode fi - LOCATION_FAIL_COUNT=0 log "Location: $lat,$lon | Distance: ${distance}m | Threshold: ${threshold}m | Mode: $CURRENT_MODE" else - LOCATION_FAIL_COUNT=$((LOCATION_FAIL_COUNT + 1)) - log "Location unavailable (attempt $LOCATION_FAIL_COUNT/$MAX_LOCATION_FAILS)" - - if [ "$LOCATION_FAIL_COUNT" -ge "$MAX_LOCATION_FAILS" ]; then - log "FAIL-SAFE: Location unavailable too long, switching to normal mode" - disable_focus_mode - fi + log "Location unavailable - defaulting to focus mode (restrictions ON)" + enable_focus_mode fi - sleep "$CHECK_INTERVAL" + # Dynamic interval: shorter at home (can charge), longer away (save battery) + if [ "$CURRENT_MODE" = "focus" ]; then + sleep "$CHECK_INTERVAL_FOCUS" + else + sleep "$CHECK_INTERVAL_NORMAL" + fi done } diff --git a/python_pkg/steam_backlog_enforcer/README.md b/python_pkg/steam_backlog_enforcer/README.md new file mode 100644 index 0000000..2ed4598 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/README.md @@ -0,0 +1,55 @@ +# Steam Backlog Enforcer + +Forces you to 100% complete one Steam game at a time before moving on. + +## Features + +- **Achievement tracking**: Picks the next game by shortest HLTB completionist time +- **Store blocking**: Blocks `store.steampowered.com` via `/etc/hosts` +- **Game uninstalling**: Removes all installed games except the assigned one +- **Process enforcement**: Kills unauthorized game processes +- **Tampering detection**: Detects achievement unlocks on non-assigned games +- **HLTB integration**: Estimates completion time with persistent cache + +## Setup + +```bash +python -m python_pkg.steam_backlog_enforcer.main setup +``` + +## Commands + +| Command | Description | +| ----------- | ------------------------------------------ | +| `scan` | Scan library, fetch HLTB data, assign game | +| `check` | Check if assigned game is complete | +| `status` | Show current assignment and blocking | +| `list` | List incomplete games from snapshot | +| `skip` | Skip the currently assigned game | +| `enforce` | Run enforcer (block, uninstall, kill) | +| `unblock` | Remove store blocking | +| `reset` | Reset all state | +| `installed` | List currently installed Steam games | +| `uninstall` | Interactively uninstall non-assigned games | +| `setup` | First-time configuration | + +## Enforce mode + +```bash +sudo python -m python_pkg.steam_backlog_enforcer.main enforce +``` + +This will: + +1. Block Steam store in `/etc/hosts` +2. Uninstall all games except the assigned one +3. Continuously kill any unauthorized game processes + +## Game Uninstall + +Directly removes appmanifest files and game directories from `~/.local/share/Steam/steamapps/`. +Preserves Proton versions and Steam Linux Runtime. + +```bash +python -m python_pkg.steam_backlog_enforcer.main uninstall +``` diff --git a/python_pkg/steam_backlog_enforcer/__init__.py b/python_pkg/steam_backlog_enforcer/__init__.py new file mode 100644 index 0000000..57c2864 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/__init__.py @@ -0,0 +1 @@ +"""Steam Backlog Enforcer - forces you to finish your Steam games.""" diff --git a/python_pkg/steam_backlog_enforcer/config.py b/python_pkg/steam_backlog_enforcer/config.py new file mode 100644 index 0000000..cd670b0 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/config.py @@ -0,0 +1,114 @@ +"""Configuration management for Steam Backlog Enforcer.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +import json +from pathlib import Path +import sys +from typing import Any + +CONFIG_DIR = Path.home() / ".config" / "steam_backlog_enforcer" +CONFIG_FILE = CONFIG_DIR / "config.json" +STATE_FILE = CONFIG_DIR / "state.json" +SNAPSHOT_FILE = CONFIG_DIR / "snapshot.json" +LOG_FILE = CONFIG_DIR / "enforcer.log" + +# Steam store domains to block. +BLOCKED_DOMAINS = [ + "store.steampowered.com", + "checkout.steampowered.com", + "store.akamai.steamstatic.com", + "storefront.steampowered.com", + "store.cloudflare.steamstatic.com", +] + +HOSTS_FILE = Path("/etc/hosts") + + +@dataclass +class Config: + """User configuration.""" + + steam_api_key: str = "" + steam_id: str = "" + skip_app_ids: list[int] = field(default_factory=list) + block_store: bool = True + kill_unauthorized_games: bool = True + uninstall_other_games: bool = True + desktop_notifications: bool = True + + def save(self) -> None: + """Persist config to disk.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + CONFIG_FILE.write_text( + json.dumps(self.__dict__, indent=2) + "\n", encoding="utf-8" + ) + + @classmethod + def load(cls) -> Config: + """Load config from disk, or return defaults.""" + if CONFIG_FILE.exists(): + data = json.loads(CONFIG_FILE.read_text(encoding="utf-8")) + return cls( + **{k: v for k, v in data.items() if k in cls.__dataclass_fields__} + ) + return cls() + + +@dataclass +class State: + """Persistent state across runs.""" + + current_app_id: int | None = None + current_game_name: str = "" + finished_app_ids: list[int] = field(default_factory=list) + + def save(self) -> None: + """Persist state to disk.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + STATE_FILE.write_text( + json.dumps(self.__dict__, indent=2) + "\n", encoding="utf-8" + ) + + @classmethod + def load(cls) -> State: + """Load state from disk, or return defaults.""" + if STATE_FILE.exists(): + data = json.loads(STATE_FILE.read_text(encoding="utf-8")) + return cls( + **{k: v for k, v in data.items() if k in cls.__dataclass_fields__} + ) + return cls() + + +def save_snapshot(data: list[dict[str, Any]]) -> None: + """Save an achievement snapshot to disk.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + SNAPSHOT_FILE.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8") + + +def load_snapshot() -> list[dict[str, Any]] | None: + """Load the cached achievement snapshot, or None if absent.""" + if SNAPSHOT_FILE.exists(): + result: list[dict[str, Any]] = json.loads( + SNAPSHOT_FILE.read_text(encoding="utf-8") + ) + return result + return None + + +def interactive_setup() -> Config: + """Run first-time interactive setup.""" + api_key = input("Enter your Steam Web API key: ").strip() + if not api_key: + sys.exit(1) + + steam_id = input("Enter your Steam64 ID: ").strip() + if not steam_id: + sys.exit(1) + + config = Config(steam_api_key=api_key, steam_id=steam_id) + config.save() + CONFIG_FILE.chmod(0o600) + return config diff --git a/python_pkg/steam_backlog_enforcer/enforcer.py b/python_pkg/steam_backlog_enforcer/enforcer.py new file mode 100644 index 0000000..54d1206 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/enforcer.py @@ -0,0 +1,89 @@ +"""Enforce that only the assigned game may run.""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path +import shutil +import signal +import subprocess + +logger = logging.getLogger(__name__) + + +def get_running_steam_game_pids() -> dict[int, int]: + """Scan /proc to find running Steam game processes. + + Returns: dict mapping PID -> SteamAppId. + """ + running: dict[int, int] = {} + proc = Path("/proc") + + for entry in proc.iterdir(): + if not entry.name.isdigit(): + continue + try: + environ = (entry / "environ").read_bytes() + pairs = environ.split(b"\x00") + for pair in pairs: + if pair.startswith(b"SteamAppId="): + value = pair.split(b"=", 1)[1].decode("utf-8", errors="replace") + if value.isdigit(): + running[int(entry.name)] = int(value) + break + except (PermissionError, OSError, ValueError): + continue + + return running + + +def enforce_allowed_game( + allowed_app_id: int | None, + *, + kill_unauthorized: bool = True, +) -> list[tuple[int, int]]: + """Check running games; optionally kill unauthorized ones. + + Returns list of (pid, app_id) that were killed or detected. + """ + running = get_running_steam_game_pids() + violations: list[tuple[int, int]] = [] + + for pid, app_id in running.items(): + if allowed_app_id is not None and app_id == allowed_app_id: + continue + # Skip Steam client itself (app_id 0 or very low IDs). + if app_id == 0: + continue + + violations.append((pid, app_id)) + if kill_unauthorized: + kill_process(pid, app_id) + + return violations + + +def kill_process(pid: int, app_id: int) -> None: + """Kill a process by PID.""" + try: + logger.warning("Killing unauthorized game (AppID=%d, PID=%d)", app_id, pid) + os.kill(pid, signal.SIGTERM) + except ProcessLookupError: + logger.debug("Process %d already gone.", pid) + except PermissionError: + logger.exception("No permission to kill PID %d.", pid) + + +def send_notification(title: str, body: str) -> None: + """Send a desktop notification.""" + _notify_send = shutil.which("notify-send") or "/usr/bin/notify-send" + try: + subprocess.run( + [_notify_send, title, body, "--icon=dialog-warning"], + capture_output=True, + timeout=5, + check=False, + ) + except (FileNotFoundError, OSError): + logger.debug("notify-send not available.") diff --git a/python_pkg/steam_backlog_enforcer/hltb.py b/python_pkg/steam_backlog_enforcer/hltb.py new file mode 100644 index 0000000..4e5b4b2 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/hltb.py @@ -0,0 +1,125 @@ +"""HowLongToBeat integration for estimating game completion times.""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +import json +import logging + +from howlongtobeatpy import HowLongToBeat + +from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR + +logger = logging.getLogger(__name__) + +HLTB_CACHE_FILE = CONFIG_DIR / "hltb_cache.json" +MAX_CONCURRENT = 30 +MIN_SIMILARITY = 0.5 + + +@dataclass +class HLTBResult: + """Result from a HowLongToBeat lookup.""" + + app_id: int + game_name: str + completionist_hours: float + similarity: float + + +def load_hltb_cache() -> dict[int, float]: + """Load the persistent HLTB cache from disk. + + Returns: dict mapping app_id -> completionist_hours. + """ + if HLTB_CACHE_FILE.exists(): + try: + data = json.loads(HLTB_CACHE_FILE.read_text(encoding="utf-8")) + return {int(k): float(v) for k, v in data.items()} + except (json.JSONDecodeError, ValueError, OSError): + logger.warning("Corrupt HLTB cache, starting fresh.") + return {} + + +def save_hltb_cache(cache: dict[int, float]) -> None: + """Save the HLTB cache to disk.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + try: + HLTB_CACHE_FILE.write_text( + json.dumps({str(k): v for k, v in cache.items()}, indent=2) + "\n", + encoding="utf-8", + ) + except OSError: + logger.exception("Failed to save HLTB cache") + + +async def _search_one( + sem: asyncio.Semaphore, app_id: int, name: str +) -> HLTBResult | None: + """Search HLTB for a single game.""" + async with sem: + try: + results = await HowLongToBeat().async_search(name) + if results: + best = max(results, key=lambda r: r.similarity) + if best.similarity >= MIN_SIMILARITY: + comp = best.completionist + if comp and comp > 0: + return HLTBResult( + app_id=app_id, + game_name=name, + completionist_hours=comp, + similarity=best.similarity, + ) + except (OSError, ValueError, TypeError, AttributeError) as e: + logger.debug("HLTB search failed for '%s': %s", name, e) + return None + + +async def _fetch_batch( + games: list[tuple[int, str]], +) -> list[HLTBResult]: + """Fetch HLTB data for a batch of games concurrently.""" + sem = asyncio.Semaphore(MAX_CONCURRENT) + tasks = [_search_one(sem, app_id, name) for app_id, name in games] + results = await asyncio.gather(*tasks) + return [r for r in results if r is not None] + + +def fetch_hltb_times(games: list[tuple[int, str]]) -> list[HLTBResult]: + """Synchronous wrapper: fetch HLTB times for games.""" + if not games: + return [] + return asyncio.run(_fetch_batch(games)) + + +def fetch_hltb_times_cached( + games: list[tuple[int, str]], +) -> dict[int, float]: + """Fetch HLTB times, using disk cache for already-known games. + + Returns: dict mapping app_id -> completionist_hours. + """ + cache = load_hltb_cache() + uncached = [(app_id, name) for app_id, name in games if app_id not in cache] + + if uncached: + logger.info( + "Fetching HLTB data for %d uncached games (out of %d total)...", + len(uncached), + len(games), + ) + results = fetch_hltb_times(uncached) + for r in results: + cache[r.app_id] = r.completionist_hours + # Also cache misses as -1 so we don't re-fetch them. + found_ids = {r.app_id for r in results} + for app_id, _ in uncached: + if app_id not in found_ids: + cache[app_id] = -1 + save_hltb_cache(cache) + else: + logger.info("All %d games found in HLTB cache.", len(games)) + + return cache diff --git a/python_pkg/steam_backlog_enforcer/install.sh b/python_pkg/steam_backlog_enforcer/install.sh new file mode 100755 index 0000000..78c9762 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/install.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +# Install script for Steam Backlog Enforcer. +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)" + +echo "=== Steam Backlog Enforcer Installer ===" +echo + +# Install Python deps. +echo "Installing Python dependencies..." +pip3 install --break-system-packages requests howlongtobeatpy 2>/dev/null \ + || pip3 install requests howlongtobeatpy + +# Install systemd service (system-level, runs as root). +read -rp "Install systemd enforce service? [y/N] " ans +if [[ "${ans,,}" == "y" ]]; then + if [[ $EUID -ne 0 ]]; then + echo "Error: systemd service install needs root. Re-run with sudo." + exit 1 + fi + + SERVICE_SRC="$SCRIPT_DIR/steam-backlog-enforcer.service" + SERVICE_DST="/etc/systemd/system/steam-backlog-enforcer.service" + + # Set the correct working directory in the service file. + sed "s|WorkingDirectory=.*|WorkingDirectory=$REPO_ROOT|" "$SERVICE_SRC" \ + > "$SERVICE_DST" + + systemctl daemon-reload + systemctl enable steam-backlog-enforcer + echo "Service installed and enabled." + echo " Start now: sudo systemctl start steam-backlog-enforcer" + echo " Check: sudo systemctl status steam-backlog-enforcer" + echo " Logs: sudo journalctl -u steam-backlog-enforcer -f" +fi + +echo +echo "Done! Run manually with:" +echo " sudo python3 -m python_pkg.steam_backlog_enforcer.main enforce" diff --git a/python_pkg/steam_backlog_enforcer/library_hider.py b/python_pkg/steam_backlog_enforcer/library_hider.py new file mode 100644 index 0000000..5284bfb --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/library_hider.py @@ -0,0 +1,354 @@ +"""Hide / unhide games in the Steam library via sharedconfig.vdf. + +Steam stores per-app settings (including the "hidden" flag) in +``userdata//7/remote/sharedconfig.vdf`` under the path: + + UserRoamingConfigStore > Software > Valve > Steam > apps > + +Setting ``"hidden" "1"`` makes the game invisible in the default +library view. This module provides functions to bulk-hide every owned +game *except* the currently assigned one, and to unhide them all when +enforcement is lifted. + +Steam must be restarted (or not running) for the changes to take effect, +because it overwrites the file on exit. +""" + +from __future__ import annotations + +import contextlib +import logging +import os +from pathlib import Path +import pwd +import re +import shutil +import subprocess +from typing import Any + +logger = logging.getLogger(__name__) + +# Steam user-data paths. +_STEAM_DIR = Path.home() / ".local" / "share" / "Steam" +_USERDATA_DIR = _STEAM_DIR / "userdata" +_SHARED_CONFIG_REL = Path("7") / "remote" / "sharedconfig.vdf" + + +# ────────────────────────────────────────────────────────────── +# Minimal VDF parser / writer +# ────────────────────────────────────────────────────────────── + + +def _parse_vdf(text: str) -> dict[str, Any]: + """Parse a Valve VDF text file into nested dicts. + + Only handles the subset used by sharedconfig.vdf (string values and + nested sections). + """ + tokens: list[str] = [] + for m in re.finditer(r'"([^"]*)"|\{|\}', text): + if m.group(1) is not None: + tokens.append(m.group(1)) + else: + tokens.append(m.group(0)) # "{" or "}" + idx = 0 + + def _parse_obj() -> dict[str, Any]: + nonlocal idx + obj: dict[str, Any] = {} + while idx < len(tokens): + token = tokens[idx] + if token == "}": # noqa: S105 + idx += 1 + return obj + # Key. + key = token + idx += 1 + if idx >= len(tokens): + break + # Value: either a string or a nested object. + nxt = tokens[idx] + if nxt == "{": + idx += 1 + obj[key] = _parse_obj() + elif nxt == "}": + # Key without value right before closing brace — skip. + obj[key] = "" + # Don't advance; let the outer loop consume '}'. + else: + obj[key] = nxt + idx += 1 + return obj + + return _parse_obj() + + +def _write_vdf(data: dict[str, Any], indent: int = 0) -> str: + """Serialize a nested dict back to VDF text.""" + lines: list[str] = [] + prefix = "\t" * indent + + for key, value in data.items(): + if isinstance(value, dict): + lines.append(f'{prefix}"{key}"') + lines.append(f"{prefix}{{") + lines.append(_write_vdf(value, indent + 1)) + lines.append(f"{prefix}}}") + else: + lines.append(f'{prefix}"{key}"\t\t"{value}"') + + return "\n".join(lines) + + +# ────────────────────────────────────────────────────────────── +# Discover Steam user IDs on this machine +# ────────────────────────────────────────────────────────────── + + +def _find_user_dirs() -> list[Path]: + """Return paths to all numeric userdata directories except '0'.""" + if not _USERDATA_DIR.is_dir(): + return [] + return [p for p in _USERDATA_DIR.iterdir() if p.name.isdigit() and p.name != "0"] + + +# ────────────────────────────────────────────────────────────── +# Hide / unhide logic +# ────────────────────────────────────────────────────────────── + + +def _get_apps_section( + vdf_data: dict[str, Any], +) -> dict[str, Any] | None: + """Navigate to the ``apps`` dict inside the VDF tree.""" + try: + steam_section = vdf_data["UserRoamingConfigStore"]["Software"]["Valve"]["Steam"] + if "apps" not in steam_section: + steam_section["apps"] = {} + except (KeyError, TypeError): + return None + else: + result: dict[str, Any] = steam_section["apps"] + return result + + +def _hide_games_in_profile( + config_path: Path, + user_dir: Path, + owned_app_ids: list[int], + allowed_app_id: int | None, +) -> int: + """Hide games in a single Steam user profile. + + Args: + config_path: Path to the sharedconfig.vdf file. + user_dir: Path to the user's data directory. + owned_app_ids: List of owned game app IDs. + allowed_app_id: App ID of the game that should remain visible. + + Returns: + Number of games hidden in this profile. + """ + # Back up the original. + backup = config_path.with_suffix(".vdf.bak") + if not backup.exists(): + shutil.copy2(config_path, backup) + + text = config_path.read_text(encoding="utf-8") + vdf_data = _parse_vdf(text) + apps = _get_apps_section(vdf_data) + if apps is None: + logger.warning("Could not find apps section in %s", config_path) + return 0 + + hidden_count = _apply_hide_flags(apps, owned_app_ids, allowed_app_id) + + output = _write_vdf(vdf_data) + "\n" + config_path.write_text(output, encoding="utf-8") + _fix_ownership(config_path, user_dir) + + logger.info("Hidden %d games in profile %s", hidden_count, user_dir.name) + return hidden_count + + +def _apply_hide_flags( + apps: dict[str, Any], + owned_app_ids: list[int], + allowed_app_id: int | None, +) -> int: + """Set hidden flags on all games except the allowed one. + + Args: + apps: The VDF apps section dict. + owned_app_ids: List of owned app IDs. + allowed_app_id: App ID to keep visible. + + Returns: + Number of games newly hidden. + """ + hidden_count = 0 + for app_id in owned_app_ids: + sid = str(app_id) + if app_id == allowed_app_id: + if sid in apps and isinstance(apps[sid], dict): + apps[sid].pop("hidden", None) + continue + + if sid not in apps or not isinstance(apps[sid], dict): + apps[sid] = {} + if apps[sid].get("hidden") != "1": + apps[sid]["hidden"] = "1" + hidden_count += 1 + return hidden_count + + +def hide_other_games( + owned_app_ids: list[int], + allowed_app_id: int | None, +) -> int: + """Hide every owned game except *allowed_app_id* in the Steam library. + + Modifies ``sharedconfig.vdf`` for every local Steam user profile. + Steam must be restarted for changes to take effect. + + Returns the number of games that were hidden. + """ + user_dirs = _find_user_dirs() + if not user_dirs: + logger.warning("No Steam userdata directories found.") + return 0 + + total_hidden = 0 + + for user_dir in user_dirs: + config_path = user_dir / _SHARED_CONFIG_REL + if not config_path.exists(): + logger.debug("No sharedconfig.vdf in %s", user_dir.name) + continue + + total_hidden += _hide_games_in_profile( + config_path, user_dir, owned_app_ids, allowed_app_id + ) + + return total_hidden + + +def unhide_all_games(owned_app_ids: list[int]) -> int: + """Remove the hidden flag from all owned games. + + Returns the number of games that were unhidden. + """ + user_dirs = _find_user_dirs() + total = 0 + + for user_dir in user_dirs: + config_path = user_dir / _SHARED_CONFIG_REL + if not config_path.exists(): + continue + + text = config_path.read_text(encoding="utf-8") + vdf_data = _parse_vdf(text) + apps = _get_apps_section(vdf_data) + if apps is None: + continue + + count = 0 + for app_id in owned_app_ids: + sid = str(app_id) + if sid in apps and isinstance(apps[sid], dict): + if apps[sid].pop("hidden", None) is not None: + count += 1 + # Remove the entry entirely if it's now empty. + if not apps[sid]: + del apps[sid] + + output = _write_vdf(vdf_data) + "\n" + config_path.write_text(output, encoding="utf-8") + _fix_ownership(config_path, user_dir) + + logger.info("Unhidden %d games in profile %s", count, user_dir.name) + total += count + + return total + + +# ────────────────────────────────────────────────────────────── +# Steam restart helper +# ────────────────────────────────────────────────────────────── + + +def restart_steam() -> None: + """Gracefully restart the Steam client. + + Sends ``steam -shutdown``, waits, then launches again with ``-silent``. + """ + real_user = os.environ.get("SUDO_USER") or os.environ.get("USER") + logger.info("Restarting Steam client...") + + # Shut down Steam gracefully. + try: + _run_as_user(["steam", "-shutdown"], real_user) + except FileNotFoundError: + logger.warning("Steam executable not found for restart.") + return + + # Wait for Steam to exit. + import time + + _pgrep = shutil.which("pgrep") or "/usr/bin/pgrep" + for _ in range(30): + result = subprocess.run( + [_pgrep, "-f", "steam.sh"], + capture_output=True, + check=False, + ) + if result.returncode != 0: + break + time.sleep(1) + + # Relaunch silently. + with contextlib.suppress(FileNotFoundError): + _run_as_user(["steam", "-silent"], real_user) + + +def _run_as_user(cmd: list[str], user: str | None) -> None: + """Run a command, dropping to *user* if currently root.""" + if os.geteuid() == 0 and user and user != "root": + try: + pw = pwd.getpwnam(user) + uid = pw.pw_uid + except KeyError: + uid = 1000 + + dbus_default = f"unix:path=/run/user/{uid}/bus" + dbus_addr = os.environ.get("DBUS_SESSION_BUS_ADDRESS", dbus_default) + xauth = os.environ.get("XAUTHORITY", f"/home/{user}/.Xauthority") + full_cmd = [ + "sudo", + "-u", + user, + "env", + f"DISPLAY={os.environ.get('DISPLAY', ':0')}", + f"XAUTHORITY={xauth}", + f"DBUS_SESSION_BUS_ADDRESS={dbus_addr}", + *cmd, + ] + else: + full_cmd = cmd + + subprocess.Popen( + full_cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + +def _fix_ownership(path: Path, user_dir: Path) -> None: + """If running as root, chown the file to the user who owns user_dir.""" + if os.geteuid() != 0: + return + try: + stat = user_dir.stat() + os.chown(path, stat.st_uid, stat.st_gid) + except OSError: + pass diff --git a/python_pkg/steam_backlog_enforcer/main.py b/python_pkg/steam_backlog_enforcer/main.py new file mode 100644 index 0000000..7e0fd0e --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/main.py @@ -0,0 +1,1030 @@ +"""Main CLI for Steam Backlog Enforcer.""" + +from __future__ import annotations + +import contextlib +import logging +import os +from pathlib import Path +import pwd +import re +import shutil +import subprocess +import sys +import time +from typing import Any + +from python_pkg.steam_backlog_enforcer.config import ( + Config, + State, + interactive_setup, + load_snapshot, + save_snapshot, +) +from python_pkg.steam_backlog_enforcer.enforcer import ( + enforce_allowed_game, + send_notification, +) +from python_pkg.steam_backlog_enforcer.hltb import fetch_hltb_times_cached +from python_pkg.steam_backlog_enforcer.library_hider import ( + hide_other_games, + restart_steam, + unhide_all_games, +) +from python_pkg.steam_backlog_enforcer.steam_api import GameInfo, SteamAPIClient +from python_pkg.steam_backlog_enforcer.store_blocker import ( + block_store, + is_store_blocked, + unblock_store, +) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + + +def _echo(msg: str = "", *, end: str = "\n", flush: bool = False) -> None: + """Write user-facing CLI output to stdout. + + Args: + msg: Text to output. + end: String appended after the message. + flush: Whether to flush stdout immediately. + """ + sys.stdout.write(msg + end) + if flush: + sys.stdout.flush() + + +# Steam infrastructure app IDs that should NEVER be uninstalled. +PROTECTED_APP_IDS = { + 228980, # Steamworks Common Redistributables + 1070560, # Steam Linux Runtime 1.0 (scout) + 1391110, # Steam Linux Runtime 2.0 (soldier) + 1628350, # Steam Linux Runtime 3.0 (sniper) + 961940, # Steam Linux Runtime (legacy) + # Proton versions (never uninstall these) + 858280, # Proton 3.7 (Beta) + 930400, # Proton 3.16 (Beta) + 1054830, # Proton 4.2 + 1113280, # Proton 4.11 + 1245040, # Proton 5.0 + 1420170, # Proton 5.13 + 1580130, # Proton 6.3 + 1887720, # Proton 7.0 + 2230260, # Proton 7.0 (alt) + 2348590, # Proton 8.0 + 2805730, # Proton 9.0 + 3201940, # Proton 9.0 (alt) + 3658110, # Proton 10.0 + 2180100, # Proton Hotfix + 1493710, # Proton Experimental + 1161040, # Proton BattlEye Runtime + 1007020, # Proton EasyAntiCheat Runtime +} + +STEAMAPPS_PATH = Path("~/.local/share/Steam/steamapps").expanduser() + +_LIST_DISPLAY_LIMIT = 50 +_MIN_CLI_ARGS = 2 +_TAMPER_CHECK_LIMIT = 3 + + +# ────────────────────────────────────────────────────────────── +# Game install management +# ────────────────────────────────────────────────────────────── + + +def _get_real_user() -> str | None: + """Get the real (non-root) user when running under sudo.""" + return os.environ.get("SUDO_USER") or os.environ.get("USER") + + +def _get_uid_gid_for_user(username: str) -> tuple[int, int]: + """Get (uid, gid) for a username.""" + try: + pw = pwd.getpwnam(username) + except KeyError: + return 1000, 1000 + else: + return pw.pw_uid, pw.pw_gid + + +def is_game_installed(app_id: int) -> bool: + """Check if a game is installed by looking for its appmanifest. + + A manifest with StateFlags != 4 (FullyInstalled) means the game is + still downloading or queued, which still counts as "install triggered". + """ + manifest = STEAMAPPS_PATH / f"appmanifest_{app_id}.acf" + return manifest.exists() + + +def _ensure_steam_running() -> None: + """Start the Steam client if it is not already running.""" + # Check if any steam process is running (main client, not just helpers). + try: + result = subprocess.run( + ["pgrep", "-f", "steam.sh"], # noqa: S607 + capture_output=True, + text=True, + check=False, + ) + if result.returncode == 0: + logger.debug("Steam client already running") + return + except FileNotFoundError: + pass + + real_user = _get_real_user() + logger.info("Starting Steam client...") + + try: + if os.geteuid() == 0 and real_user and real_user != "root": + uid, _ = _get_uid_gid_for_user(real_user) + dbus_default = f"unix:path=/run/user/{uid}/bus" + dbus_addr = os.environ.get("DBUS_SESSION_BUS_ADDRESS", dbus_default) + xauth_default = f"/home/{real_user}/.Xauthority" + xauth = os.environ.get("XAUTHORITY", xauth_default) + cmd = [ + "sudo", + "-u", + real_user, + "env", + f"DISPLAY={os.environ.get('DISPLAY', ':0')}", + f"XAUTHORITY={xauth}", + f"DBUS_SESSION_BUS_ADDRESS={dbus_addr}", + "steam", + "-silent", + ] + else: + cmd = ["steam", "-silent"] + + subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + # Give Steam time to initialize and start scanning manifests. + time.sleep(15) + except FileNotFoundError: + logger.exception("Steam executable not found") + + +def install_game(app_id: int, game_name: str, steam_id: str) -> bool: + """Install a game by writing an appmanifest that triggers Steam's download. + + Creates a minimal appmanifest with StateFlags=1026 (UpdateRequired | + UpdateStarted) in the steamapps directory. The running Steam client + detects the new manifest and automatically queues the download — no + dialog or user interaction required. + + If Steam is not running it will be started in silent mode first. + + Args: + app_id: Steam application ID. + game_name: Human-readable game name. + steam_id: Steam64 ID of the account that owns the game. + + Returns True if the manifest was written successfully. + """ + label = game_name or f"AppID={app_id}" + + if is_game_installed(app_id): + logger.info("Game already installed: %s", label) + return True + + # Build a minimal appmanifest. StateFlags 1026 = UpdateRequired (2) + + # UpdateStarted (1024), which tells Steam "this app needs downloading". + manifest_content = ( + '"AppState"\n' + "{\n" + f'\t"appid"\t\t"{app_id}"\n' + '\t"universe"\t\t"1"\n' + f'\t"name"\t\t"{game_name}"\n' + '\t"StateFlags"\t\t"1026"\n' + f'\t"installdir"\t\t"{game_name}"\n' + '\t"LastUpdated"\t\t"0"\n' + '\t"LastPlayed"\t\t"0"\n' + '\t"SizeOnDisk"\t\t"0"\n' + '\t"StagingSize"\t\t"0"\n' + '\t"buildid"\t\t"0"\n' + f'\t"LastOwner"\t\t"{steam_id}"\n' + '\t"UpdateResult"\t\t"0"\n' + '\t"BytesToDownload"\t\t"0"\n' + '\t"BytesDownloaded"\t\t"0"\n' + '\t"BytesToStage"\t\t"0"\n' + '\t"BytesStaged"\t\t"0"\n' + '\t"TargetBuildID"\t\t"0"\n' + '\t"AutoUpdateBehavior"\t\t"0"\n' + '\t"AllowOtherDownloadsWhileRunning"\t\t"0"\n' + '\t"ScheduledAutoUpdate"\t\t"0"\n' + '\t"InstalledDepots"\n' + "\t{\n" + "\t}\n" + '\t"UserConfig"\n' + "\t{\n" + "\t}\n" + '\t"MountedConfig"\n' + "\t{\n" + "\t}\n" + "}\n" + ) + + manifest_path = STEAMAPPS_PATH / f"appmanifest_{app_id}.acf" + + try: + with manifest_path.open("w", encoding="utf-8") as fh: + fh.write(manifest_content) + + # Fix ownership so the Steam client (running as the real user) can + # read and update the manifest. + real_user = _get_real_user() + if os.geteuid() == 0 and real_user and real_user != "root": + uid, gid = _get_uid_gid_for_user(real_user) + os.chown(manifest_path, uid, gid) + + logger.info("Created appmanifest for %s — Steam will auto-download", label) + except OSError: + logger.exception("Failed to create appmanifest for %s", label) + return False + + # Make sure Steam is running so it picks up the manifest. + _ensure_steam_running() + + return True + + +# ────────────────────────────────────────────────────────────── +# Game uninstall management +# ────────────────────────────────────────────────────────────── + + +def get_installed_games() -> list[tuple[int, str]]: + """Parse appmanifest files to find installed games. + + Returns: list of (app_id, game_name) tuples. + """ + installed: list[tuple[int, str]] = [] + + for manifest_file in STEAMAPPS_PATH.glob("appmanifest_*.acf"): + with contextlib.suppress(OSError): + content = manifest_file.read_text(encoding="utf-8") + app_id_match = re.search(r'"appid"\s+"(\d+)"', content) + name_match = re.search(r'"name"\s+"([^"]+)"', content) + if app_id_match: + app_id = int(app_id_match.group(1)) + name = name_match.group(1) if name_match else f"Unknown ({app_id})" + installed.append((app_id, name)) + + installed.sort(key=lambda x: x[1].lower()) + return installed + + +def _read_install_dir(manifest: Path) -> Path | None: + """Read installdir from a game's appmanifest file.""" + if not manifest.exists(): + return None + try: + content = manifest.read_text(encoding="utf-8") + match = re.search(r'"installdir"\s+"([^"]+)"', content) + if match: + return STEAMAPPS_PATH / "common" / match.group(1) + except OSError: + pass + return None + + +def _remove_manifest(manifest: Path, game_name: str, app_id: int) -> bool: + """Remove a game manifest file. + + Args: + manifest: Path to the appmanifest file. + game_name: Human-readable game name for logging. + app_id: Steam application ID. + """ + try: + if manifest.exists(): + manifest.unlink() + logger.info( + "Removed manifest for %s (AppID=%d)", game_name or app_id, app_id + ) + except OSError: + logger.exception("Failed to remove manifest for AppID=%d", app_id) + return False + return True + + +def _remove_game_dirs(install_dir: Path | None, app_id: int) -> bool: + """Remove game installation directory and cache directories. + + Args: + install_dir: Path to the game's install directory, or None. + app_id: Steam application ID. + """ + success = True + if install_dir and install_dir.is_dir(): + try: + shutil.rmtree(install_dir) + logger.info("Removed game files: %s", install_dir) + except OSError: + logger.exception("Failed to remove game dir %s", install_dir) + success = False + + for subdir in ("shadercache", "compatdata"): + cache_path = STEAMAPPS_PATH / subdir / str(app_id) + if cache_path.is_dir(): + with contextlib.suppress(OSError): + shutil.rmtree(cache_path) + logger.debug("Removed %s/%d", subdir, app_id) + + return success + + +def uninstall_game(app_id: int, game_name: str = "") -> bool: + """Uninstall a single game by removing its manifest and game files. + + Uses direct file removal instead of `steam://uninstall` URI to avoid + GUI popups and to work when Steam is not running. + """ + manifest = STEAMAPPS_PATH / f"appmanifest_{app_id}.acf" + install_dir = _read_install_dir(manifest) + success = _remove_manifest(manifest, game_name, app_id) + if not _remove_game_dirs(install_dir, app_id): + success = False + return success + + +def uninstall_other_games(allowed_app_id: int | None) -> int: + """Uninstall all installed games except the assigned one and protected IDs. + + Returns: number of games uninstalled. + """ + installed = get_installed_games() + count = 0 + + for app_id, name in installed: + if app_id == allowed_app_id: + logger.info("KEEPING assigned game: %s (AppID=%d)", name, app_id) + continue + if app_id in PROTECTED_APP_IDS: + logger.debug("Skipping protected: %s (AppID=%d)", name, app_id) + continue + + logger.info("UNINSTALLING: %s (AppID=%d)", name, app_id) + if uninstall_game(app_id, name): + count += 1 + + return count + + +# ────────────────────────────────────────────────────────────── +# Scanning & game selection +# ────────────────────────────────────────────────────────────── + + +def do_scan(config: Config, state: State) -> list[GameInfo]: + """Full library scan: Steam API + HLTB times.""" + client = SteamAPIClient(config.steam_api_key, config.steam_id) + + start = time.time() + done_count = 0 + + def progress(current: int, total: int) -> None: + nonlocal done_count + done_count = current + if current % 50 == 0 or current == total: + _echo(f"\r Scanning achievements: {current}/{total}", end="", flush=True) + + _echo("Scanning Steam library...") + games = client.build_game_list( + skip_app_ids=config.skip_app_ids, + progress_callback=progress, + ) + elapsed = time.time() - start + _echo(f"\n Scanned {len(games)} games with achievements in {elapsed:.1f}s") + + # Fetch HLTB times (cached). + incomplete = [(g.app_id, g.name) for g in games if not g.is_complete] + if incomplete: + _echo(f"Fetching HLTB completion times for {len(incomplete)} games...") + hltb_cache = fetch_hltb_times_cached(incomplete) + for g in games: + hours = hltb_cache.get(g.app_id, -1) + g.completionist_hours = hours + found = sum(1 for h in hltb_cache.values() if h > 0) + _echo(f" HLTB data: {found} games have completion estimates") + + # Save snapshot. + save_snapshot([g.to_snapshot() for g in games]) + + complete = [g for g in games if g.is_complete] + incomplete_games = [g for g in games if not g.is_complete] + _echo(f"\nResults: {len(complete)} complete, {len(incomplete_games)} incomplete") + + # Auto-pick a game if none assigned. + if state.current_app_id is None: + pick_next_game(games, state, config) + + return games + + +def pick_next_game(games: list[GameInfo], state: State, config: Config) -> None: + """Select the next game: shortest completionist time first.""" + skip = set(config.skip_app_ids) | set(state.finished_app_ids) + candidates = [g for g in games if not g.is_complete and g.app_id not in skip] + + if not candidates: + _echo("\nCongratulations! All games are complete!") + state.current_app_id = None + state.current_game_name = "" + state.save() + return + + # Sort: games with known HLTB time first (shortest), then unknown. + def sort_key(g: GameInfo) -> tuple[int, float]: + if g.completionist_hours > 0: + return (0, g.completionist_hours) + return (1, g.name.lower().encode().hex().__hash__()) + + candidates.sort(key=sort_key) + chosen = candidates[0] + + state.current_app_id = chosen.app_id + state.current_game_name = chosen.name + state.save() + + hours_str = "" + if chosen.completionist_hours > 0: + hours_str = f" (~{chosen.completionist_hours:.1f}h to 100%)" + _echo(f"\n>>> ASSIGNED: {chosen.name} (AppID={chosen.app_id}){hours_str}") + _echo( + f" Progress: {chosen.unlocked_achievements}/{chosen.total_achievements}" + f" ({chosen.completion_pct:.1f}%)" + ) + + # Auto-install the newly assigned game. + if not is_game_installed(chosen.app_id): + _echo(f"\n Auto-installing {chosen.name}...") + install_game(chosen.app_id, chosen.name, config.steam_id) + + +# ────────────────────────────────────────────────────────────── +# Checking & tampering detection +# ────────────────────────────────────────────────────────────── + + +def do_check(config: Config, state: State) -> None: + """Check assigned game completion status; detect tampering.""" + if state.current_app_id is None: + _echo("No game currently assigned. Run 'scan' first.") + return + + client = SteamAPIClient(config.steam_api_key, config.steam_id) + _echo(f"Checking {state.current_game_name} (AppID={state.current_app_id})...") + + game = client.refresh_single_game(state.current_app_id, state.current_game_name) + if game is None: + _echo(" Could not fetch achievement data.") + return + + _echo( + f" Progress: {game.unlocked_achievements}/{game.total_achievements}" + f" ({game.completion_pct:.1f}%)" + ) + + if game.is_complete: + _echo(f"\n COMPLETED: {state.current_game_name}!") + state.finished_app_ids.append(state.current_app_id) + send_notification( + "Game Complete!", + f"You finished {state.current_game_name}! Picking next game...", + ) + + # Load snapshot and pick next. + snapshot_data = load_snapshot() + if snapshot_data: + games = [GameInfo.from_snapshot(d) for d in snapshot_data] + pick_next_game(games, state, config) + else: + state.current_app_id = None + state.current_game_name = "" + state.save() + _echo(" Run 'scan' to pick the next game.") + else: + remaining = game.total_achievements - game.unlocked_achievements + _echo(f" {remaining} achievements remaining. Keep going!") + + # Tampering detection on snapshot. + detect_tampering(config, state) + + +def _check_game_tampering( + client: SteamAPIClient, + entry: dict[str, Any], + state: State, +) -> tuple[str, int, int] | None: + """Check if a single game has unexpected achievement progress. + + Args: + client: Steam API client. + entry: Snapshot entry for the game. + state: Current enforcer state. + + Returns: + Tuple of (name, app_id, diff) if tampering detected, else None. + """ + app_id = entry["app_id"] + if app_id == state.current_app_id: + return None + if entry["unlocked_achievements"] >= entry["total_achievements"]: + return None + if entry.get("playtime_minutes", 0) <= 0: + return None + game = client.refresh_single_game( + app_id, entry["name"], entry.get("playtime_minutes", 0) + ) + if game and game.unlocked_achievements > entry["unlocked_achievements"]: + diff = game.unlocked_achievements - entry["unlocked_achievements"] + return (entry["name"], app_id, diff) + return None + + +def detect_tampering(config: Config, state: State) -> None: + """Check if achievements were unlocked on non-assigned games.""" + old_snapshot = load_snapshot() + if old_snapshot is None: + return + + client = SteamAPIClient(config.steam_api_key, config.steam_id) + + # Quick check: only re-fetch a few random non-assigned games. + suspicious: list[tuple[str, int, int]] = [] + for entry in old_snapshot: + result = _check_game_tampering(client, entry, state) + if result: + suspicious.append(result) + if len(suspicious) >= _TAMPER_CHECK_LIMIT: + break + + if suspicious: + _echo("\n TAMPERING DETECTED:") + for name, app_id, diff in suspicious: + _echo(f" {name} (AppID={app_id}): +{diff} new achievements!") + send_notification( + "Tampering Detected!", + f"Achievements unlocked on {len(suspicious)} non-assigned games!", + ) + + +# ────────────────────────────────────────────────────────────── +# Enforce mode (daemon loop) +# ────────────────────────────────────────────────────────────── + +# How often the enforce loop runs (seconds). +ENFORCE_INTERVAL = 3 + + +def _guard_installed_games(allowed_app_id: int | None) -> int: + """Remove any unauthorized game manifests + files. Runs every loop. + + Returns number of games removed this pass. + """ + installed = get_installed_games() + count = 0 + for app_id, name in installed: + if app_id == allowed_app_id: + continue + if app_id in PROTECTED_APP_IDS: + continue + + logger.warning( + "Unauthorized game detected — removing: %s (AppID=%d)", name, app_id + ) + if uninstall_game(app_id, name): + count += 1 + send_notification( + "Game Removed!", + f"Uninstalled {name} (AppID={app_id}). " + f"Only the assigned game is allowed.", + ) + return count + + +def _enforce_setup(config: Config, state: State) -> None: + """Perform initial setup for enforcement mode. + + Args: + config: Enforcer configuration. + state: Current enforcer state. + """ + # Initial store block. + if config.block_store: + if block_store(): + _echo(" Steam store: BLOCKED") + else: + _echo(" Steam store: FAILED (need sudo?)") + + # Initial cleanup. + if config.uninstall_other_games: + _echo(" Uninstalling non-assigned games...") + count = uninstall_other_games(state.current_app_id) + _echo(f" Uninstalled {count} games") + + # Auto-install the assigned game. + _enforce_auto_install(config, state) + + # Hide all other games in the Steam library. + _enforce_hide_games(config, state) + + +def _enforce_auto_install(config: Config, state: State) -> None: + """Auto-install the assigned game if not already installed. + + Args: + config: Enforcer configuration. + state: Current enforcer state. + """ + app_id = state.current_app_id + if app_id is None: + return + if not is_game_installed(app_id): + _echo(f" Auto-installing {state.current_game_name}...") + if install_game(app_id, state.current_game_name, config.steam_id): + send_notification( + "Game Installing", + f"{state.current_game_name} is being downloaded.", + ) + else: + _echo(" Could not auto-install. Install manually from Steam.") + else: + _echo(f" Assigned game already installed: {state.current_game_name}") + + +def _enforce_hide_games(config: Config, state: State) -> None: + """Hide non-assigned games in the Steam library. + + Args: + config: Enforcer configuration. + state: Current enforcer state. + """ + owned_ids = _get_all_owned_app_ids(config) + if owned_ids: + hidden = hide_other_games(owned_ids, state.current_app_id) + if hidden > 0: + _echo(f" Library: hid {hidden} games (only assigned game visible)") + restart_steam() + _echo(" Steam restarted to apply library changes.") + else: + _echo(" Library: games already hidden") + else: + _echo(" Library hiding: skipped (no owned game list — run 'scan' first)") + + +def _enforce_loop_iteration(config: Config, state: State) -> None: + """Perform one iteration of the enforcement loop. + + Args: + config: Enforcer configuration. + state: Current enforcer state. + """ + # A) Kill unauthorized game processes. + if config.kill_unauthorized_games: + violations = enforce_allowed_game( + state.current_app_id, + kill_unauthorized=True, + ) + for pid, app_id in violations: + _echo(f" Killed unauthorized game: AppID={app_id} (PID={pid})") + send_notification( + "Game Blocked!", + f"Killed unauthorized game (AppID={app_id}). " + f"Focus on {state.current_game_name}!", + ) + + # B) Remove any newly-installed unauthorized games. + if config.uninstall_other_games: + removed = _guard_installed_games(state.current_app_id) + if removed > 0: + _echo(f" Guard removed {removed} unauthorized game(s)") + + # C) Re-install assigned game if it was somehow removed. + app_id = state.current_app_id + if app_id is not None and not is_game_installed(app_id): + logger.info( + "Assigned game disappeared — re-installing %s", + state.current_game_name, + ) + install_game( + app_id, + state.current_game_name, + config.steam_id, + ) + + +def do_enforce(config: Config, state: State) -> None: + """Run the enforcer: block store, uninstall other games, kill processes. + + This is a persistent loop that continuously: + 1. Keeps the Steam store blocked. + 2. Removes any newly-installed unauthorized games. + 3. Auto-installs the assigned game if missing. + 4. Kills any running unauthorized game processes. + """ + if state.current_app_id is None: + _echo("No game assigned. Run 'scan' first.") + return + + _echo(f"Enforcing: {state.current_game_name} (AppID={state.current_app_id})") + _enforce_setup(config, state) + + _echo(f" Enforce loop: ACTIVE (every {ENFORCE_INTERVAL}s)") + _echo(" Guarding: processes + installs + store") + _echo(" Press Ctrl+C to stop.\n") + try: + while True: + _enforce_loop_iteration(config, state) + time.sleep(ENFORCE_INTERVAL) + except KeyboardInterrupt: + _echo("\nEnforcer stopped.") + + +# ────────────────────────────────────────────────────────────── +# CLI commands +# ────────────────────────────────────────────────────────────── + + +def cmd_status(_config: Config, state: State) -> None: + """Show current status.""" + _echo("=== Steam Backlog Enforcer ===\n") + + if state.current_app_id: + _echo( + f"Assigned game: {state.current_game_name} (AppID={state.current_app_id})" + ) + else: + _echo("No game currently assigned.") + + _echo(f"Finished games: {len(state.finished_app_ids)}") + _echo(f"Store blocked: {is_store_blocked()}") + + # Show installed games. + installed = get_installed_games() + real_games = [(aid, n) for aid, n in installed if aid not in PROTECTED_APP_IDS] + _echo(f"Installed games: {len(real_games)}") + + if state.current_app_id: + is_assigned_installed = any(aid == state.current_app_id for aid, _ in installed) + _echo(f"Assigned game installed: {is_assigned_installed}") + + +def cmd_list(_config: Config, state: State) -> None: + """List games from the last snapshot.""" + snapshot = load_snapshot() + if snapshot is None: + _echo("No snapshot found. Run 'scan' first.") + return + + games = [GameInfo.from_snapshot(d) for d in snapshot] + incomplete = [g for g in games if not g.is_complete] + complete = [g for g in games if g.is_complete] + + # Sort incomplete by completionist hours. + def sort_key(g: GameInfo) -> tuple[int, float]: + if g.completionist_hours > 0: + return (0, g.completionist_hours) + return (1, 0.0) + + incomplete.sort(key=sort_key) + + _echo(f"\n{'─' * 70}") + _echo(f" INCOMPLETE ({len(incomplete)} games)") + _echo(f"{'─' * 70}") + for i, g in enumerate(incomplete[:_LIST_DISPLAY_LIMIT], 1): + marker = " <<< ASSIGNED" if g.app_id == state.current_app_id else "" + hrs = f" [{g.completionist_hours:.0f}h]" if g.completionist_hours > 0 else "" + pct = f"{g.completion_pct:.0f}%" + _echo(f" {i:3d}. {g.name[:40]:<40s} {pct:>5s}{hrs}{marker}") + + if len(incomplete) > _LIST_DISPLAY_LIMIT: + _echo(f" ... and {len(incomplete) - _LIST_DISPLAY_LIMIT} more") + + _echo(f"\n COMPLETE: {len(complete)} games") + + +def cmd_skip(config: Config, state: State) -> None: + """Skip the currently assigned game.""" + if state.current_app_id is None: + _echo("No game currently assigned.") + return + + _echo(f"Skipping: {state.current_game_name}") + config.skip_app_ids.append(state.current_app_id) + config.save() + + snapshot = load_snapshot() + if snapshot: + games = [GameInfo.from_snapshot(d) for d in snapshot] + pick_next_game(games, state, config) + else: + state.current_app_id = None + state.current_game_name = "" + state.save() + _echo("Run 'scan' to pick a new game.") + + +def cmd_unblock(_config: Config, _state: State) -> None: + """Remove store blocking.""" + if unblock_store(): + _echo("Steam store unblocked.") + else: + _echo("Failed to unblock. Run with sudo.") + + +def cmd_reset(config: Config, state: State) -> None: + """Reset all state (unblock, unhide, clear assignment).""" + unblock_store() + + # Unhide all games in the library. + try: + owned = _get_all_owned_app_ids(config) + if owned: + count = unhide_all_games(owned) + if count: + _echo(f"Unhidden {count} games.") + restart_steam() + except Exception as exc: # noqa: BLE001 + _echo(f"Warning: could not unhide games: {exc}") + + state.current_app_id = None + state.current_game_name = "" + state.finished_app_ids = [] + state.save() + _echo("State reset. Store unblocked.") + + +def cmd_installed(_config: Config, state: State) -> None: + """Show installed games.""" + installed = get_installed_games() + _echo(f"\nInstalled games ({len(installed)}):\n") + for app_id, name in installed: + protected = " [PROTECTED]" if app_id in PROTECTED_APP_IDS else "" + assigned = " <<< ASSIGNED" if app_id == state.current_app_id else "" + _echo(f" {app_id:>8d} {name}{protected}{assigned}") + + +def cmd_uninstall(_config: Config, state: State) -> None: + """Uninstall all games except the assigned one.""" + if state.current_app_id is None: + _echo("No game assigned. Run 'scan' first.") + return + + installed = get_installed_games() + to_remove = [ + (aid, n) + for aid, n in installed + if aid != state.current_app_id and aid not in PROTECTED_APP_IDS + ] + + if not to_remove: + _echo("No games to uninstall (only assigned game and runtimes installed).") + return + + _echo(f"\nWill uninstall {len(to_remove)} games, keeping:") + _echo(f" - {state.current_game_name} (AppID={state.current_app_id})") + _echo(" - Steam runtimes and Proton versions\n") + _echo("Games to remove:") + for aid, name in to_remove: + _echo(f" - {name} (AppID={aid})") + + _echo() + confirm = input("Type YES to confirm: ").strip() + if confirm != "YES": + _echo("Aborted.") + return + + count = uninstall_other_games(state.current_app_id) + _echo(f"\nUninstalled {count} games.") + + +def cmd_setup(_config: Config, _state: State) -> None: + """Run interactive setup.""" + interactive_setup() + + +def cmd_install(config: Config, state: State) -> None: + """Manually trigger install of the assigned game.""" + if state.current_app_id is None: + _echo("No game currently assigned. Run 'scan' first.") + return + + if is_game_installed(state.current_app_id): + _echo(f"{state.current_game_name} is already installed.") + return + + _echo(f"Installing {state.current_game_name} (AppID={state.current_app_id})...") + if install_game(state.current_app_id, state.current_game_name, config.steam_id): + _echo("Done!") + else: + _echo("Failed to create install manifest.") + + +def _get_all_owned_app_ids(config: Config) -> list[int]: + """Get all owned game app IDs from the snapshot or Steam API.""" + snapshot = load_snapshot() + if snapshot: + return [d["app_id"] for d in snapshot] + + # Fall back to a quick API call. + try: + client = SteamAPIClient(config.steam_api_key, config.steam_id) + owned = client.get_owned_games() + return [g["appid"] for g in owned] + except Exception: # noqa: BLE001 + logger.warning("Could not fetch owned game list for hiding.") + return [] + + +def cmd_hide(config: Config, state: State) -> None: + """Hide all non-assigned games in the Steam library.""" + if state.current_app_id is None: + _echo("No game assigned. Run 'scan' first.") + return + + owned_ids = _get_all_owned_app_ids(config) + if not owned_ids: + _echo("No owned game list available. Run 'scan' first.") + return + + _echo(f"Hiding all games except {state.current_game_name}...") + hidden = hide_other_games(owned_ids, state.current_app_id) + _echo(f"Hidden {hidden} games.") + + if hidden > 0: + _echo("Restarting Steam to apply changes...") + restart_steam() + _echo("Done! Only the assigned game should be visible in your library.") + + +def cmd_unhide(config: Config, _state: State) -> None: + """Unhide all games in the Steam library.""" + owned_ids = _get_all_owned_app_ids(config) + if not owned_ids: + _echo("No owned game list available. Run 'scan' first.") + return + + _echo("Unhiding all games...") + count = unhide_all_games(owned_ids) + _echo(f"Unhidden {count} games.") + + if count > 0: + _echo("Restarting Steam to apply changes...") + restart_steam() + _echo("Done!") + + +COMMANDS = { + "scan": ("Scan library & assign a game", do_scan), + "check": ("Check assigned game completion", do_check), + "status": ("Show current status", cmd_status), + "list": ("List games from snapshot", cmd_list), + "skip": ("Skip currently assigned game", cmd_skip), + "enforce": ("Run enforcer: block, uninstall, kill, hide", do_enforce), + "install": ("Install the assigned game", cmd_install), + "hide": ("Hide all non-assigned games in library", cmd_hide), + "unhide": ("Unhide all games in library", cmd_unhide), + "unblock": ("Remove store blocking", cmd_unblock), + "reset": ("Reset all state", cmd_reset), + "installed": ("List installed games", cmd_installed), + "uninstall": ("Uninstall all non-assigned games", cmd_uninstall), + "setup": ("Run first-time setup", cmd_setup), +} + + +def main() -> None: + """CLI entry point.""" + if len(sys.argv) < _MIN_CLI_ARGS or sys.argv[1] not in COMMANDS: + _echo("Steam Backlog Enforcer\n") + _echo("Usage: python -m python_pkg.steam_backlog_enforcer.main \n") + _echo("Commands:") + for name, (desc, _) in COMMANDS.items(): + _echo(f" {name:<12s} {desc}") + sys.exit(1) + + command = sys.argv[1] + config = Config.load() + + if command != "setup" and not config.steam_api_key: + _echo("Not configured. Run 'setup' first.") + sys.exit(1) + + state = State.load() + _, func = COMMANDS[command] + func(config, state) + + +if __name__ == "__main__": + main() diff --git a/python_pkg/steam_backlog_enforcer/steam-backlog-enforcer.service b/python_pkg/steam_backlog_enforcer/steam-backlog-enforcer.service new file mode 100644 index 0000000..9483de3 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/steam-backlog-enforcer.service @@ -0,0 +1,19 @@ +[Unit] +Description=Steam Backlog Enforcer +After=network-online.target graphical.target +Wants=network-online.target + +[Service] +Type=simple +WorkingDirectory=/home/kuhy/testsAndMisc +ExecStart=/usr/bin/python3 -m python_pkg.steam_backlog_enforcer.main enforce +Restart=always +RestartSec=5 +Environment=PYTHONUNBUFFERED=1 +Environment=PYTHONPATH=/home/kuhy/.local/lib/python3.14/site-packages +Environment=HOME=/home/kuhy +# Hardening: enforcer must not be easily killed. +OOMScoreAdjust=-900 + +[Install] +WantedBy=multi-user.target diff --git a/python_pkg/steam_backlog_enforcer/steam_api.py b/python_pkg/steam_backlog_enforcer/steam_api.py new file mode 100644 index 0000000..d189e09 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/steam_api.py @@ -0,0 +1,276 @@ +"""Steam Web API client for fetching games and achievement data.""" + +from __future__ import annotations + +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +import logging +import threading +import time +from typing import TYPE_CHECKING, Any + +import requests + +if TYPE_CHECKING: + from collections.abc import Callable + +logger = logging.getLogger(__name__) + +STEAM_API_BASE = "https://api.steampowered.com" +MAX_WORKERS = 20 + + +@dataclass +class AchievementInfo: + """Single achievement state.""" + + api_name: str + display_name: str + achieved: bool + unlock_time: int + + +@dataclass +class GameInfo: + """Info about an owned Steam game.""" + + app_id: int + name: str + total_achievements: int + unlocked_achievements: int + playtime_minutes: int + achievements: list[AchievementInfo] = field(default_factory=list) + completionist_hours: float = -1 + + @property + def completion_pct(self) -> float: + """Achievement completion percentage.""" + if self.total_achievements == 0: + return 100.0 + return (self.unlocked_achievements / self.total_achievements) * 100.0 + + @property + def is_complete(self) -> bool: + """True if all achievements are unlocked.""" + return ( + self.total_achievements > 0 + and self.unlocked_achievements >= self.total_achievements + ) + + def to_snapshot(self) -> dict[str, Any]: + """Serialize to JSON-safe dict.""" + return { + "app_id": self.app_id, + "name": self.name, + "total_achievements": self.total_achievements, + "unlocked_achievements": self.unlocked_achievements, + "playtime_minutes": self.playtime_minutes, + "completionist_hours": self.completionist_hours, + "achievements": [ + { + "api_name": a.api_name, + "display_name": a.display_name, + "achieved": a.achieved, + "unlock_time": a.unlock_time, + } + for a in self.achievements + ], + } + + @classmethod + def from_snapshot(cls, data: dict[str, Any]) -> GameInfo: + """Deserialize from a cached snapshot dict.""" + achievements = [ + AchievementInfo( + api_name=a["api_name"], + display_name=a.get("display_name", a["api_name"]), + achieved=a["achieved"], + unlock_time=a.get("unlock_time", 0), + ) + for a in data.get("achievements", []) + ] + return cls( + app_id=data["app_id"], + name=data["name"], + total_achievements=data["total_achievements"], + unlocked_achievements=data["unlocked_achievements"], + playtime_minutes=data.get("playtime_minutes", 0), + completionist_hours=data.get("completionist_hours", -1), + achievements=achievements, + ) + + +class SteamAPIError(Exception): + """Raised when the Steam API returns an error.""" + + +class SteamAPIClient: + """Client for interacting with the Steam Web API.""" + + def __init__(self, api_key: str, steam_id: str) -> None: + """Initialize the Steam API client. + + Args: + api_key: Steam Web API key. + steam_id: Steam64 ID of the user. + """ + self.api_key = api_key + self.steam_id = steam_id + self.session = requests.Session() + self.session.headers["Accept"] = "application/json" + self._rate_lock = threading.Lock() + self._request_times: list[float] = [] + self._max_rps = 18 + + def _rate_limit(self) -> None: + """Enforce rate limit across threads.""" + while True: + with self._rate_lock: + now = time.time() + self._request_times = [t for t in self._request_times if now - t < 1.0] + if len(self._request_times) < self._max_rps: + self._request_times.append(now) + return + time.sleep(0.06) + + def _get(self, url: str, params: dict[str, Any] | None = None) -> dict[str, Any]: + """Rate-limited GET request.""" + self._rate_limit() + if params is None: + params = {} + params["key"] = self.api_key + try: + resp = self.session.get(url, params=params, timeout=30) + resp.raise_for_status() + result: dict[str, Any] = resp.json() + except requests.RequestException as e: + msg = f"Steam API request failed: {e}" + raise SteamAPIError(msg) from e + else: + return result + + def get_owned_games(self) -> list[dict[str, Any]]: + """Fetch all games owned by the user.""" + url = f"{STEAM_API_BASE}/IPlayerService/GetOwnedGames/v1/" + data = self._get( + url, + { + "steamid": self.steam_id, + "include_appinfo": "true", + "include_played_free_games": "true", + "format": "json", + }, + ) + games: list[dict[str, Any]] = data.get("response", {}).get("games", []) + logger.info("Found %d owned games.", len(games)) + return games + + def get_achievement_details(self, app_id: int) -> list[AchievementInfo]: + """Fetch per-achievement detail for a game.""" + url = f"{STEAM_API_BASE}/ISteamUserStats/GetPlayerAchievements/v1/" + try: + data = self._get( + url, + { + "steamid": self.steam_id, + "appid": str(app_id), + "l": "english", + "format": "json", + }, + ) + except SteamAPIError: + return [] + + stats = data.get("playerstats", {}) + if not stats.get("success", False): + return [] + + raw: list[dict[str, Any]] = stats.get("achievements", []) + return [ + AchievementInfo( + api_name=a.get("apiname", ""), + display_name=a.get("name", a.get("apiname", "")), + achieved=bool(a.get("achieved", 0)), + unlock_time=a.get("unlocktime", 0), + ) + for a in raw + ] + + def _fetch_one_game( + self, game_dict: dict[str, Any], skip: set[int] + ) -> GameInfo | None: + """Fetch achievement data for one game. Thread-safe.""" + app_id = game_dict["appid"] + if app_id in skip: + return None + + achievements = self.get_achievement_details(app_id) + if not achievements: + return None + + name = game_dict.get("name", f"Unknown ({app_id})") + total = len(achievements) + unlocked = sum(1 for a in achievements if a.achieved) + + return GameInfo( + app_id=app_id, + name=name, + total_achievements=total, + unlocked_achievements=unlocked, + playtime_minutes=game_dict.get("playtime_forever", 0), + achievements=achievements, + ) + + def build_game_list( + self, + skip_app_ids: list[int] | None = None, + progress_callback: Callable[[int, int], None] | None = None, + ) -> list[GameInfo]: + """Build full game list with achievement data (parallel).""" + skip = set(skip_app_ids or []) + owned = self.get_owned_games() + games: list[GameInfo] = [] + done_count = 0 + total = len(owned) + lock = threading.Lock() + + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: + futures = {pool.submit(self._fetch_one_game, g, skip): g for g in owned} + for future in as_completed(futures): + try: + result = future.result() + except ( + KeyError, + TypeError, + ValueError, + SteamAPIError, + requests.RequestException, + ): + result = None + with lock: + done_count += 1 + if progress_callback: + progress_callback(done_count, total) + if result is not None: + games.append(result) + + games.sort(key=lambda g: g.name.lower()) + return games + + def refresh_single_game( + self, app_id: int, name: str, playtime: int = 0 + ) -> GameInfo | None: + """Re-fetch achievement data for one game.""" + achievements = self.get_achievement_details(app_id) + if not achievements: + return None + total = len(achievements) + unlocked = sum(1 for a in achievements if a.achieved) + return GameInfo( + app_id=app_id, + name=name, + total_achievements=total, + unlocked_achievements=unlocked, + playtime_minutes=playtime, + achievements=achievements, + ) diff --git a/python_pkg/steam_backlog_enforcer/store_blocker.py b/python_pkg/steam_backlog_enforcer/store_blocker.py new file mode 100644 index 0000000..6fa0090 --- /dev/null +++ b/python_pkg/steam_backlog_enforcer/store_blocker.py @@ -0,0 +1,268 @@ +"""Block Steam Store access via /etc/hosts (hosts install script) and iptables. + +The system uses a dedicated hosts install script at +linux_configuration/hosts/install.sh that manages /etc/hosts with: + - chattr +ia (immutable + append-only) + - read-only bind mount + - protection against removing entries (only adding is easy) + +This module checks if the Steam Store domains are already blocked in +/etc/hosts. If not, it runs the hosts install.sh (which must already +contain the Steam Store entries in its heredoc). As a belt-and-suspenders +fallback, it also blocks via iptables. +""" + +from __future__ import annotations + +import contextlib +import logging +from pathlib import Path +import shutil +import socket +import subprocess + +from python_pkg.steam_backlog_enforcer.config import ( + BLOCKED_DOMAINS, + HOSTS_FILE, +) + +logger = logging.getLogger(__name__) + +# Path to the hosts install script (relative to repo root). +_REPO_ROOT = Path(__file__).resolve().parents[2] +HOSTS_INSTALL_SCRIPT = _REPO_ROOT / "linux_configuration" / "hosts" / "install.sh" + +# iptables chain name for our blocking rules. +IPTABLES_CHAIN = "STEAM_ENFORCER" + +# Resolved absolute paths for executables (avoids S607 partial-path warnings). +_SUDO = shutil.which("sudo") or "/usr/bin/sudo" +_IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables" +_BASH = shutil.which("bash") or "/usr/bin/bash" + +# IP address used in /etc/hosts for blocking domains. +_HOSTS_REDIRECT_IP = ".".join(["0"] * 4) + + +def is_store_blocked() -> bool: + """Check if Steam Store domains are blocked in /etc/hosts.""" + try: + content = HOSTS_FILE.read_text(encoding="utf-8") + # Check for at least the primary store domain. + if "store.steampowered.com" in content: + # Verify it's actually blocked (not commented out). + for line in content.splitlines(): + stripped = line.strip() + if ( + not stripped.startswith("#") + and "store.steampowered.com" in stripped + and stripped.startswith(_HOSTS_REDIRECT_IP) + ): + return True + except OSError: + pass + + return _is_iptables_blocked() + + +def block_store() -> bool: + """Block Steam Store: run hosts install script + iptables fallback. + + Returns True if at least one blocking method succeeded. + """ + hosts_ok = _block_via_hosts_install() + ipt_ok = _block_store_iptables() + + if hosts_ok or ipt_ok: + flush_dns_cache() + return True + + logger.error("All store-blocking methods failed.") + return False + + +def _block_via_hosts_install() -> bool: + """Run the hosts install.sh to apply /etc/hosts with Steam Store entries. + + The install script handles: immutable flag removal, bind mount remounting, + writing the file, re-applying protections, and DoH disabling. + """ + if is_store_blocked(): + logger.info("Steam Store already blocked in /etc/hosts.") + return True + + if not HOSTS_INSTALL_SCRIPT.exists(): + logger.error("hosts install script not found at %s", HOSTS_INSTALL_SCRIPT) + return False + + try: + logger.info("Running hosts install script to block Steam Store...") + result = subprocess.run( + [_SUDO, _BASH, str(HOSTS_INSTALL_SCRIPT), "--no-flush-dns"], + capture_output=True, + text=True, + timeout=120, + check=False, + ) + except (OSError, subprocess.SubprocessError): + logger.exception("Failed to run hosts install script") + return False + else: + if result.returncode == 0: + logger.info("hosts install script succeeded.") + return True + logger.error( + "hosts install script failed (rc=%d): %s", + result.returncode, + result.stderr[-500:] if result.stderr else result.stdout[-500:], + ) + return False + + +def _is_iptables_blocked() -> bool: + """Check if our iptables chain exists and has rules.""" + try: + result = subprocess.run( + [_SUDO, _IPTABLES, "-L", IPTABLES_CHAIN, "-n"], + capture_output=True, + text=True, + timeout=5, + check=False, + ) + except (OSError, subprocess.SubprocessError): + return False + else: + return result.returncode == 0 and "DROP" in result.stdout + + +def _block_store_iptables() -> bool: + """Block Steam Store domains using iptables (IP-based).""" + try: + # Create chain if it doesn't exist. + subprocess.run( + [_SUDO, _IPTABLES, "-N", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + # Flush existing rules in our chain. + subprocess.run( + [_SUDO, _IPTABLES, "-F", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=True, + ) + + # Resolve domains and block their IPs. + blocked_ips: set[str] = set() + for domain in BLOCKED_DOMAINS: + with contextlib.suppress(socket.gaierror): + ips = socket.getaddrinfo(domain, 443, socket.AF_INET) + for _, _, _, _, addr in ips: + blocked_ips.add(addr[0]) + + for ip in blocked_ips: + subprocess.run( + [ + _SUDO, + _IPTABLES, + "-A", + IPTABLES_CHAIN, + "-d", + ip, + "-j", + "DROP", + ], + capture_output=True, + timeout=5, + check=True, + ) + + # Hook our chain into OUTPUT if not already there. + result = subprocess.run( + [_SUDO, _IPTABLES, "-C", "OUTPUT", "-j", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + if result.returncode != 0: + subprocess.run( + [_SUDO, _IPTABLES, "-I", "OUTPUT", "-j", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=True, + ) + except (OSError, subprocess.SubprocessError): + logger.exception("Failed to block store via iptables") + return False + else: + logger.info("Steam Store blocked via iptables (%d IPs).", len(blocked_ips)) + return True + + +def unblock_store() -> bool: + """Remove iptables-based Steam Store blocks. + + NOTE: /etc/hosts entries are NOT removed — the hosts install script's + protection mechanism intentionally makes removal difficult. Only + iptables rules are cleared. + """ + ipt_ok = _unblock_store_iptables() + flush_dns_cache() + + if not ipt_ok: + logger.warning("Failed to remove iptables rules.") + + logger.warning( + "Steam Store entries in /etc/hosts are protected and cannot be " + "removed programmatically. This is by design — you must manually " + "remove the immutable flag, bind mount, and edit the hosts install " + "script to unblock." + ) + return ipt_ok + + +def _unblock_store_iptables() -> bool: + """Remove iptables-based block.""" + try: + subprocess.run( + [_SUDO, _IPTABLES, "-D", "OUTPUT", "-j", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + subprocess.run( + [_SUDO, _IPTABLES, "-F", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + subprocess.run( + [_SUDO, _IPTABLES, "-X", IPTABLES_CHAIN], + capture_output=True, + timeout=5, + check=False, + ) + except (OSError, subprocess.SubprocessError): + logger.exception("Failed to unblock iptables") + return False + else: + logger.info("Steam Store unblocked from iptables.") + return True + + +def flush_dns_cache() -> None: + """Flush the system DNS cache.""" + commands = [ + ["systemd-resolve", "--flush-caches"], + ["resolvectl", "flush-caches"], + ["nscd", "--invalidate=hosts"], + ] + for cmd in commands: + with contextlib.suppress(FileNotFoundError, OSError): + subprocess.run( + cmd, + capture_output=True, + timeout=5, + check=False, + )