feat: steam 100 percent extension

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-03-02 20:29:32 +01:00
parent 253b327b72
commit e76c2c68ee
16 changed files with 2463 additions and 48 deletions

View File

@ -64,6 +64,7 @@ repos:
- id: ruff
args:
- --fix
- --unsafe-fixes
- --exit-non-zero-on-fix
- --show-fixes
types_or: [python, pyi]

View File

@ -187,6 +187,27 @@ elif ! grep -q '^ReadEtcHosts=yes' "$RESOLVED_CONF" 2>/dev/null; then
sudo systemctl restart systemd-resolved
fi
# ============================================================================
# TEMPORARILY DISABLE HOSTS GUARD PROTECTIONS FOR INSTALLATION
# ============================================================================
# The guard system uses a read-only bind mount and path watcher that prevent
# any writes to /etc/hosts. We must stop them before installing, then restart.
GUARD_SERVICES_STOPPED=0
for svc in hosts-bind-mount.service hosts-guard.path; do
if systemctl is-active --quiet "$svc" 2>/dev/null; then
echo "Stopping $svc for installation..."
systemctl stop "$svc" 2>/dev/null || true
GUARD_SERVICES_STOPPED=1
fi
done
# If bind mount is still active, unmount it
if findmnt /etc/hosts >/dev/null 2>&1; then
echo "Unmounting read-only bind mount on /etc/hosts..."
umount /etc/hosts 2>/dev/null || mount -o remount,rw,bind /etc/hosts 2>/dev/null || true
fi
# Remove all attributes from /etc/hosts to allow modifications
sudo chattr -i -a /etc/hosts 2>/dev/null || true
@ -372,6 +393,11 @@ tee -a /etc/hosts >/dev/null <<'EOF'
0.0.0.0 invidious.io.lol
# Steam Store
0.0.0.0 store.steampowered.com
0.0.0.0 checkout.steampowered.com
0.0.0.0 store.akamai.steamstatic.com
0.0.0.0 storefront.steampowered.com
0.0.0.0 store.cloudflare.steamstatic.com
# Discord - media allowed
# 0.0.0.0 cdn.discordapp.com
@ -610,6 +636,24 @@ sudo chmod 644 /etc/hosts
# Make the file immutable and append-only for maximum protection
sudo chattr +ia /etc/hosts
# ============================================================================
# RESTART HOSTS GUARD SERVICES
# ============================================================================
if [[ $GUARD_SERVICES_STOPPED -eq 1 ]]; then
echo "Restarting hosts guard services..."
# Update the canonical copy so the guard doesn't revert our changes
if [[ -f /usr/local/share/locked-hosts ]]; then
cp /etc/hosts /usr/local/share/locked-hosts
echo " Updated canonical snapshot."
fi
for svc in hosts-bind-mount.service hosts-guard.path; do
if systemctl is-enabled --quiet "$svc" 2>/dev/null; then
systemctl start "$svc" 2>/dev/null || true
echo " Restarted $svc"
fi
done
fi
# ============================================================================
# SAVE CUSTOM ENTRIES STATE FOR FUTURE PROTECTION CHECKS
# ============================================================================

View File

@ -18,11 +18,10 @@ export RADIUS=250
export HYSTERESIS=50
# --- Location check interval in seconds ---
export CHECK_INTERVAL=60
# --- Fail-safe: if location unavailable for this many consecutive checks,
# switch to unrestricted mode to avoid locking user out ---
export MAX_LOCATION_FAILS=5
# When focus mode is ON (at home): check frequently (phone can charge).
# When focus mode is OFF (away): check less often to save battery.
export CHECK_INTERVAL_FOCUS=30
export CHECK_INTERVAL_NORMAL=120
# --- Log file ---
export LOG_FILE="/data/local/tmp/focus_mode/focus_mode.log"

View File

@ -54,18 +54,18 @@ check_adb() {
check_coords() {
local lat lon
lat="$(grep '^HOME_LAT=' "$SCRIPT_DIR/config.sh" | cut -d'"' -f2)"
lon="$(grep '^HOME_LON=' "$SCRIPT_DIR/config.sh" | cut -d'"' -f2)"
lat="$(grep '^.*HOME_LAT=' "$SCRIPT_DIR/config.sh" "$SCRIPT_DIR/config_secrets.sh" 2>/dev/null | tail -1 | cut -d'"' -f2)"
lon="$(grep '^.*HOME_LON=' "$SCRIPT_DIR/config.sh" "$SCRIPT_DIR/config_secrets.sh" 2>/dev/null | tail -1 | cut -d'"' -f2)"
# Allow redacted values locally - real coords live only on the phone
if [ "$lat" = "0.000000" ] && [ "$lon" = "0.000000" ]; then
echo "ERROR: You must set your home coordinates in config.sh before deploying!"
echo ""
echo " 1. Find your coords on Google Maps (right-click your apartment)"
echo " 2. Edit phone_focus_mode/config_secrets.sh:"
echo " HOME_LAT=\"-48.876667\""
echo " HOME_LON=\"-123.393333\""
echo "ERROR: Home coordinates not set (all zeros). Set them in config_secrets.sh."
exit 1
fi
echo " Home location: $lat, $lon"
if [ -z "$lat" ] || [ -z "$lon" ]; then
echo " Home location: (not set locally - will use values on phone)"
else
echo " Home location: $lat, $lon"
fi
}
check_ip() {
@ -119,40 +119,40 @@ do_deploy() {
adb_root "chmod 777 /data/local/tmp/focus_stage"
echo "[4/6] Uploading scripts..."
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/config.sh" "/data/local/tmp/focus_stage/config.sh"
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/focus_daemon.sh" "/data/local/tmp/focus_stage/focus_daemon.sh"
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/focus_ctl.sh" "/data/local/tmp/focus_stage/focus_ctl.sh"
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/magisk_service.sh" "/data/local/tmp/focus_stage/99-focus-mode.sh"
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/config.sh" "/data/local/tmp/focus_stage/config.sh"
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/focus_daemon.sh" "/data/local/tmp/focus_stage/focus_daemon.sh"
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/focus_ctl.sh" "/data/local/tmp/focus_stage/focus_ctl.sh"
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/magisk_service.sh" "/data/local/tmp/focus_stage/99-focus-mode.sh"
# Only push config_secrets.sh if phone doesn't already have one
if adb_root "test -f $REMOTE_DIR/config_secrets.sh" 2>/dev/null; then
echo " config_secrets.sh already exists on phone - skipping (preserving real coords)"
else
echo " Pushing config_secrets.sh (first install)..."
adb -s "$PHONE_IP:5555" push "$SCRIPT_DIR/config_secrets.sh" "/data/local/tmp/focus_stage/config_secrets.sh"
adb_root "cp /data/local/tmp/focus_stage/config_secrets.sh $REMOTE_DIR/config_secrets.sh"
fi
# Move staged files into place with root
adb_root "cp /data/local/tmp/focus_stage/config.sh $REMOTE_DIR/config.sh"
adb_root "cp /data/local/tmp/focus_stage/focus_daemon.sh $REMOTE_DIR/focus_daemon.sh"
adb_root "cp /data/local/tmp/focus_stage/focus_ctl.sh $REMOTE_DIR/focus_ctl.sh"
adb_root "cp /data/local/tmp/focus_stage/99-focus-mode.sh /data/adb/service.d/99-focus-mode.sh"
adb_root "cp /data/local/tmp/focus_stage/config.sh $REMOTE_DIR/config.sh"
adb_root "cp /data/local/tmp/focus_stage/focus_daemon.sh $REMOTE_DIR/focus_daemon.sh"
adb_root "cp /data/local/tmp/focus_stage/focus_ctl.sh $REMOTE_DIR/focus_ctl.sh"
adb_root "cp /data/local/tmp/focus_stage/99-focus-mode.sh /data/adb/service.d/99-focus-mode.sh"
adb_root "rm -rf /data/local/tmp/focus_stage"
echo "[5/6] Setting permissions..."
adb_root "chmod 755 $REMOTE_DIR/config.sh $REMOTE_DIR/focus_daemon.sh $REMOTE_DIR/focus_ctl.sh"
adb_root "chmod 755 $REMOTE_DIR/config.sh $REMOTE_DIR/focus_daemon.sh $REMOTE_DIR/focus_ctl.sh" || true
adb_root "chmod 755 /data/adb/service.d/99-focus-mode.sh"
adb_root "touch $REMOTE_DIR/disabled_by_focus.txt"
adb_root "touch $REMOTE_DIR/focus_mode.log"
echo "[6/6] Starting daemon..."
# Kill existing daemon via pidfile to avoid hitting the ADB shell process
adb_root "
PIDFILE=$REMOTE_DIR/daemon.pid
if [ -f \"\$PIDFILE\" ]; then
OLD_PID=\$(cat \"\$PIDFILE\")
kill -9 \"\$OLD_PID\" 2>/dev/null
rm -f \"\$PIDFILE\"
fi
# Also kill any stray instances
for p in \$(pgrep -f focus_daemon.sh 2>/dev/null); do kill -9 \$p 2>/dev/null; done
sleep 1
setsid sh $REMOTE_DIR/focus_daemon.sh </dev/null >/dev/null 2>&1 &
echo \$!
"
sleep 3
# Stop existing daemon, then start fresh
adb_root "kill \$(cat $REMOTE_DIR/daemon.pid 2>/dev/null) 2>/dev/null; true"
sleep 1
adb_root "rm -f $REMOTE_DIR/daemon.pid"
adb -s "$PHONE_IP:5555" shell su -c 'sh /data/local/tmp/focus_mode/focus_daemon.sh </dev/null >/dev/null 2>/dev/null &'
sleep 4
echo ""
echo "=== Deploy complete! ==="

View File

@ -66,8 +66,8 @@ init() {
CURRENT_MODE="normal"
fi
LOCATION_FAIL_COUNT=0
log "Focus mode daemon started (PID=$$, mode=$CURRENT_MODE, home=$HOME_LAT,$HOME_LON, radius=${RADIUS}m)"
log "Intervals: focus=${CHECK_INTERVAL_FOCUS}s normal=${CHECK_INTERVAL_NORMAL}s"
}
# ---- Location ----
@ -194,19 +194,18 @@ main() {
disable_focus_mode
fi
LOCATION_FAIL_COUNT=0
log "Location: $lat,$lon | Distance: ${distance}m | Threshold: ${threshold}m | Mode: $CURRENT_MODE"
else
LOCATION_FAIL_COUNT=$((LOCATION_FAIL_COUNT + 1))
log "Location unavailable (attempt $LOCATION_FAIL_COUNT/$MAX_LOCATION_FAILS)"
if [ "$LOCATION_FAIL_COUNT" -ge "$MAX_LOCATION_FAILS" ]; then
log "FAIL-SAFE: Location unavailable too long, switching to normal mode"
disable_focus_mode
fi
log "Location unavailable - defaulting to focus mode (restrictions ON)"
enable_focus_mode
fi
sleep "$CHECK_INTERVAL"
# Dynamic interval: shorter at home (can charge), longer away (save battery)
if [ "$CURRENT_MODE" = "focus" ]; then
sleep "$CHECK_INTERVAL_FOCUS"
else
sleep "$CHECK_INTERVAL_NORMAL"
fi
done
}

View File

@ -0,0 +1,55 @@
# Steam Backlog Enforcer
Forces you to 100% complete one Steam game at a time before moving on.
## Features
- **Achievement tracking**: Picks the next game by shortest HLTB completionist time
- **Store blocking**: Blocks `store.steampowered.com` via `/etc/hosts`
- **Game uninstalling**: Removes all installed games except the assigned one
- **Process enforcement**: Kills unauthorized game processes
- **Tampering detection**: Detects achievement unlocks on non-assigned games
- **HLTB integration**: Estimates completion time with persistent cache
## Setup
```bash
python -m python_pkg.steam_backlog_enforcer.main setup
```
## Commands
| Command | Description |
| ----------- | ------------------------------------------ |
| `scan` | Scan library, fetch HLTB data, assign game |
| `check` | Check if assigned game is complete |
| `status` | Show current assignment and blocking |
| `list` | List incomplete games from snapshot |
| `skip` | Skip the currently assigned game |
| `enforce` | Run enforcer (block, uninstall, kill) |
| `unblock` | Remove store blocking |
| `reset` | Reset all state |
| `installed` | List currently installed Steam games |
| `uninstall` | Interactively uninstall non-assigned games |
| `setup` | First-time configuration |
## Enforce mode
```bash
sudo python -m python_pkg.steam_backlog_enforcer.main enforce
```
This will:
1. Block Steam store in `/etc/hosts`
2. Uninstall all games except the assigned one
3. Continuously kill any unauthorized game processes
## Game Uninstall
Directly removes appmanifest files and game directories from `~/.local/share/Steam/steamapps/`.
Preserves Proton versions and Steam Linux Runtime.
```bash
python -m python_pkg.steam_backlog_enforcer.main uninstall
```

View File

@ -0,0 +1 @@
"""Steam Backlog Enforcer - forces you to finish your Steam games."""

View File

@ -0,0 +1,114 @@
"""Configuration management for Steam Backlog Enforcer."""
from __future__ import annotations
from dataclasses import dataclass, field
import json
from pathlib import Path
import sys
from typing import Any
CONFIG_DIR = Path.home() / ".config" / "steam_backlog_enforcer"
CONFIG_FILE = CONFIG_DIR / "config.json"
STATE_FILE = CONFIG_DIR / "state.json"
SNAPSHOT_FILE = CONFIG_DIR / "snapshot.json"
LOG_FILE = CONFIG_DIR / "enforcer.log"
# Steam store domains to block.
BLOCKED_DOMAINS = [
"store.steampowered.com",
"checkout.steampowered.com",
"store.akamai.steamstatic.com",
"storefront.steampowered.com",
"store.cloudflare.steamstatic.com",
]
HOSTS_FILE = Path("/etc/hosts")
@dataclass
class Config:
"""User configuration."""
steam_api_key: str = ""
steam_id: str = ""
skip_app_ids: list[int] = field(default_factory=list)
block_store: bool = True
kill_unauthorized_games: bool = True
uninstall_other_games: bool = True
desktop_notifications: bool = True
def save(self) -> None:
"""Persist config to disk."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
CONFIG_FILE.write_text(
json.dumps(self.__dict__, indent=2) + "\n", encoding="utf-8"
)
@classmethod
def load(cls) -> Config:
"""Load config from disk, or return defaults."""
if CONFIG_FILE.exists():
data = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
return cls(
**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}
)
return cls()
@dataclass
class State:
"""Persistent state across runs."""
current_app_id: int | None = None
current_game_name: str = ""
finished_app_ids: list[int] = field(default_factory=list)
def save(self) -> None:
"""Persist state to disk."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(
json.dumps(self.__dict__, indent=2) + "\n", encoding="utf-8"
)
@classmethod
def load(cls) -> State:
"""Load state from disk, or return defaults."""
if STATE_FILE.exists():
data = json.loads(STATE_FILE.read_text(encoding="utf-8"))
return cls(
**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}
)
return cls()
def save_snapshot(data: list[dict[str, Any]]) -> None:
"""Save an achievement snapshot to disk."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
SNAPSHOT_FILE.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
def load_snapshot() -> list[dict[str, Any]] | None:
"""Load the cached achievement snapshot, or None if absent."""
if SNAPSHOT_FILE.exists():
result: list[dict[str, Any]] = json.loads(
SNAPSHOT_FILE.read_text(encoding="utf-8")
)
return result
return None
def interactive_setup() -> Config:
"""Run first-time interactive setup."""
api_key = input("Enter your Steam Web API key: ").strip()
if not api_key:
sys.exit(1)
steam_id = input("Enter your Steam64 ID: ").strip()
if not steam_id:
sys.exit(1)
config = Config(steam_api_key=api_key, steam_id=steam_id)
config.save()
CONFIG_FILE.chmod(0o600)
return config

View File

@ -0,0 +1,89 @@
"""Enforce that only the assigned game may run."""
from __future__ import annotations
import logging
import os
from pathlib import Path
import shutil
import signal
import subprocess
logger = logging.getLogger(__name__)
def get_running_steam_game_pids() -> dict[int, int]:
"""Scan /proc to find running Steam game processes.
Returns: dict mapping PID -> SteamAppId.
"""
running: dict[int, int] = {}
proc = Path("/proc")
for entry in proc.iterdir():
if not entry.name.isdigit():
continue
try:
environ = (entry / "environ").read_bytes()
pairs = environ.split(b"\x00")
for pair in pairs:
if pair.startswith(b"SteamAppId="):
value = pair.split(b"=", 1)[1].decode("utf-8", errors="replace")
if value.isdigit():
running[int(entry.name)] = int(value)
break
except (PermissionError, OSError, ValueError):
continue
return running
def enforce_allowed_game(
allowed_app_id: int | None,
*,
kill_unauthorized: bool = True,
) -> list[tuple[int, int]]:
"""Check running games; optionally kill unauthorized ones.
Returns list of (pid, app_id) that were killed or detected.
"""
running = get_running_steam_game_pids()
violations: list[tuple[int, int]] = []
for pid, app_id in running.items():
if allowed_app_id is not None and app_id == allowed_app_id:
continue
# Skip Steam client itself (app_id 0 or very low IDs).
if app_id == 0:
continue
violations.append((pid, app_id))
if kill_unauthorized:
kill_process(pid, app_id)
return violations
def kill_process(pid: int, app_id: int) -> None:
"""Kill a process by PID."""
try:
logger.warning("Killing unauthorized game (AppID=%d, PID=%d)", app_id, pid)
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
logger.debug("Process %d already gone.", pid)
except PermissionError:
logger.exception("No permission to kill PID %d.", pid)
def send_notification(title: str, body: str) -> None:
"""Send a desktop notification."""
_notify_send = shutil.which("notify-send") or "/usr/bin/notify-send"
try:
subprocess.run(
[_notify_send, title, body, "--icon=dialog-warning"],
capture_output=True,
timeout=5,
check=False,
)
except (FileNotFoundError, OSError):
logger.debug("notify-send not available.")

View File

@ -0,0 +1,125 @@
"""HowLongToBeat integration for estimating game completion times."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import json
import logging
from howlongtobeatpy import HowLongToBeat
from python_pkg.steam_backlog_enforcer.config import CONFIG_DIR
logger = logging.getLogger(__name__)
HLTB_CACHE_FILE = CONFIG_DIR / "hltb_cache.json"
MAX_CONCURRENT = 30
MIN_SIMILARITY = 0.5
@dataclass
class HLTBResult:
"""Result from a HowLongToBeat lookup."""
app_id: int
game_name: str
completionist_hours: float
similarity: float
def load_hltb_cache() -> dict[int, float]:
"""Load the persistent HLTB cache from disk.
Returns: dict mapping app_id -> completionist_hours.
"""
if HLTB_CACHE_FILE.exists():
try:
data = json.loads(HLTB_CACHE_FILE.read_text(encoding="utf-8"))
return {int(k): float(v) for k, v in data.items()}
except (json.JSONDecodeError, ValueError, OSError):
logger.warning("Corrupt HLTB cache, starting fresh.")
return {}
def save_hltb_cache(cache: dict[int, float]) -> None:
"""Save the HLTB cache to disk."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
try:
HLTB_CACHE_FILE.write_text(
json.dumps({str(k): v for k, v in cache.items()}, indent=2) + "\n",
encoding="utf-8",
)
except OSError:
logger.exception("Failed to save HLTB cache")
async def _search_one(
sem: asyncio.Semaphore, app_id: int, name: str
) -> HLTBResult | None:
"""Search HLTB for a single game."""
async with sem:
try:
results = await HowLongToBeat().async_search(name)
if results:
best = max(results, key=lambda r: r.similarity)
if best.similarity >= MIN_SIMILARITY:
comp = best.completionist
if comp and comp > 0:
return HLTBResult(
app_id=app_id,
game_name=name,
completionist_hours=comp,
similarity=best.similarity,
)
except (OSError, ValueError, TypeError, AttributeError) as e:
logger.debug("HLTB search failed for '%s': %s", name, e)
return None
async def _fetch_batch(
games: list[tuple[int, str]],
) -> list[HLTBResult]:
"""Fetch HLTB data for a batch of games concurrently."""
sem = asyncio.Semaphore(MAX_CONCURRENT)
tasks = [_search_one(sem, app_id, name) for app_id, name in games]
results = await asyncio.gather(*tasks)
return [r for r in results if r is not None]
def fetch_hltb_times(games: list[tuple[int, str]]) -> list[HLTBResult]:
"""Synchronous wrapper: fetch HLTB times for games."""
if not games:
return []
return asyncio.run(_fetch_batch(games))
def fetch_hltb_times_cached(
games: list[tuple[int, str]],
) -> dict[int, float]:
"""Fetch HLTB times, using disk cache for already-known games.
Returns: dict mapping app_id -> completionist_hours.
"""
cache = load_hltb_cache()
uncached = [(app_id, name) for app_id, name in games if app_id not in cache]
if uncached:
logger.info(
"Fetching HLTB data for %d uncached games (out of %d total)...",
len(uncached),
len(games),
)
results = fetch_hltb_times(uncached)
for r in results:
cache[r.app_id] = r.completionist_hours
# Also cache misses as -1 so we don't re-fetch them.
found_ids = {r.app_id for r in results}
for app_id, _ in uncached:
if app_id not in found_ids:
cache[app_id] = -1
save_hltb_cache(cache)
else:
logger.info("All %d games found in HLTB cache.", len(games))
return cache

View File

@ -0,0 +1,41 @@
#!/usr/bin/env bash
# Install script for Steam Backlog Enforcer.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)"
echo "=== Steam Backlog Enforcer Installer ==="
echo
# Install Python deps.
echo "Installing Python dependencies..."
pip3 install --break-system-packages requests howlongtobeatpy 2>/dev/null \
|| pip3 install requests howlongtobeatpy
# Install systemd service (system-level, runs as root).
read -rp "Install systemd enforce service? [y/N] " ans
if [[ "${ans,,}" == "y" ]]; then
if [[ $EUID -ne 0 ]]; then
echo "Error: systemd service install needs root. Re-run with sudo."
exit 1
fi
SERVICE_SRC="$SCRIPT_DIR/steam-backlog-enforcer.service"
SERVICE_DST="/etc/systemd/system/steam-backlog-enforcer.service"
# Set the correct working directory in the service file.
sed "s|WorkingDirectory=.*|WorkingDirectory=$REPO_ROOT|" "$SERVICE_SRC" \
> "$SERVICE_DST"
systemctl daemon-reload
systemctl enable steam-backlog-enforcer
echo "Service installed and enabled."
echo " Start now: sudo systemctl start steam-backlog-enforcer"
echo " Check: sudo systemctl status steam-backlog-enforcer"
echo " Logs: sudo journalctl -u steam-backlog-enforcer -f"
fi
echo
echo "Done! Run manually with:"
echo " sudo python3 -m python_pkg.steam_backlog_enforcer.main enforce"

View File

@ -0,0 +1,354 @@
"""Hide / unhide games in the Steam library via sharedconfig.vdf.
Steam stores per-app settings (including the "hidden" flag) in
``userdata/<userid>/7/remote/sharedconfig.vdf`` under the path:
UserRoamingConfigStore > Software > Valve > Steam > apps > <appid>
Setting ``"hidden" "1"`` makes the game invisible in the default
library view. This module provides functions to bulk-hide every owned
game *except* the currently assigned one, and to unhide them all when
enforcement is lifted.
Steam must be restarted (or not running) for the changes to take effect,
because it overwrites the file on exit.
"""
from __future__ import annotations
import contextlib
import logging
import os
from pathlib import Path
import pwd
import re
import shutil
import subprocess
from typing import Any
logger = logging.getLogger(__name__)
# Steam user-data paths.
_STEAM_DIR = Path.home() / ".local" / "share" / "Steam"
_USERDATA_DIR = _STEAM_DIR / "userdata"
_SHARED_CONFIG_REL = Path("7") / "remote" / "sharedconfig.vdf"
# ──────────────────────────────────────────────────────────────
# Minimal VDF parser / writer
# ──────────────────────────────────────────────────────────────
def _parse_vdf(text: str) -> dict[str, Any]:
"""Parse a Valve VDF text file into nested dicts.
Only handles the subset used by sharedconfig.vdf (string values and
nested sections).
"""
tokens: list[str] = []
for m in re.finditer(r'"([^"]*)"|\{|\}', text):
if m.group(1) is not None:
tokens.append(m.group(1))
else:
tokens.append(m.group(0)) # "{" or "}"
idx = 0
def _parse_obj() -> dict[str, Any]:
nonlocal idx
obj: dict[str, Any] = {}
while idx < len(tokens):
token = tokens[idx]
if token == "}": # noqa: S105
idx += 1
return obj
# Key.
key = token
idx += 1
if idx >= len(tokens):
break
# Value: either a string or a nested object.
nxt = tokens[idx]
if nxt == "{":
idx += 1
obj[key] = _parse_obj()
elif nxt == "}":
# Key without value right before closing brace — skip.
obj[key] = ""
# Don't advance; let the outer loop consume '}'.
else:
obj[key] = nxt
idx += 1
return obj
return _parse_obj()
def _write_vdf(data: dict[str, Any], indent: int = 0) -> str:
"""Serialize a nested dict back to VDF text."""
lines: list[str] = []
prefix = "\t" * indent
for key, value in data.items():
if isinstance(value, dict):
lines.append(f'{prefix}"{key}"')
lines.append(f"{prefix}{{")
lines.append(_write_vdf(value, indent + 1))
lines.append(f"{prefix}}}")
else:
lines.append(f'{prefix}"{key}"\t\t"{value}"')
return "\n".join(lines)
# ──────────────────────────────────────────────────────────────
# Discover Steam user IDs on this machine
# ──────────────────────────────────────────────────────────────
def _find_user_dirs() -> list[Path]:
"""Return paths to all numeric userdata directories except '0'."""
if not _USERDATA_DIR.is_dir():
return []
return [p for p in _USERDATA_DIR.iterdir() if p.name.isdigit() and p.name != "0"]
# ──────────────────────────────────────────────────────────────
# Hide / unhide logic
# ──────────────────────────────────────────────────────────────
def _get_apps_section(
vdf_data: dict[str, Any],
) -> dict[str, Any] | None:
"""Navigate to the ``apps`` dict inside the VDF tree."""
try:
steam_section = vdf_data["UserRoamingConfigStore"]["Software"]["Valve"]["Steam"]
if "apps" not in steam_section:
steam_section["apps"] = {}
except (KeyError, TypeError):
return None
else:
result: dict[str, Any] = steam_section["apps"]
return result
def _hide_games_in_profile(
config_path: Path,
user_dir: Path,
owned_app_ids: list[int],
allowed_app_id: int | None,
) -> int:
"""Hide games in a single Steam user profile.
Args:
config_path: Path to the sharedconfig.vdf file.
user_dir: Path to the user's data directory.
owned_app_ids: List of owned game app IDs.
allowed_app_id: App ID of the game that should remain visible.
Returns:
Number of games hidden in this profile.
"""
# Back up the original.
backup = config_path.with_suffix(".vdf.bak")
if not backup.exists():
shutil.copy2(config_path, backup)
text = config_path.read_text(encoding="utf-8")
vdf_data = _parse_vdf(text)
apps = _get_apps_section(vdf_data)
if apps is None:
logger.warning("Could not find apps section in %s", config_path)
return 0
hidden_count = _apply_hide_flags(apps, owned_app_ids, allowed_app_id)
output = _write_vdf(vdf_data) + "\n"
config_path.write_text(output, encoding="utf-8")
_fix_ownership(config_path, user_dir)
logger.info("Hidden %d games in profile %s", hidden_count, user_dir.name)
return hidden_count
def _apply_hide_flags(
apps: dict[str, Any],
owned_app_ids: list[int],
allowed_app_id: int | None,
) -> int:
"""Set hidden flags on all games except the allowed one.
Args:
apps: The VDF apps section dict.
owned_app_ids: List of owned app IDs.
allowed_app_id: App ID to keep visible.
Returns:
Number of games newly hidden.
"""
hidden_count = 0
for app_id in owned_app_ids:
sid = str(app_id)
if app_id == allowed_app_id:
if sid in apps and isinstance(apps[sid], dict):
apps[sid].pop("hidden", None)
continue
if sid not in apps or not isinstance(apps[sid], dict):
apps[sid] = {}
if apps[sid].get("hidden") != "1":
apps[sid]["hidden"] = "1"
hidden_count += 1
return hidden_count
def hide_other_games(
owned_app_ids: list[int],
allowed_app_id: int | None,
) -> int:
"""Hide every owned game except *allowed_app_id* in the Steam library.
Modifies ``sharedconfig.vdf`` for every local Steam user profile.
Steam must be restarted for changes to take effect.
Returns the number of games that were hidden.
"""
user_dirs = _find_user_dirs()
if not user_dirs:
logger.warning("No Steam userdata directories found.")
return 0
total_hidden = 0
for user_dir in user_dirs:
config_path = user_dir / _SHARED_CONFIG_REL
if not config_path.exists():
logger.debug("No sharedconfig.vdf in %s", user_dir.name)
continue
total_hidden += _hide_games_in_profile(
config_path, user_dir, owned_app_ids, allowed_app_id
)
return total_hidden
def unhide_all_games(owned_app_ids: list[int]) -> int:
"""Remove the hidden flag from all owned games.
Returns the number of games that were unhidden.
"""
user_dirs = _find_user_dirs()
total = 0
for user_dir in user_dirs:
config_path = user_dir / _SHARED_CONFIG_REL
if not config_path.exists():
continue
text = config_path.read_text(encoding="utf-8")
vdf_data = _parse_vdf(text)
apps = _get_apps_section(vdf_data)
if apps is None:
continue
count = 0
for app_id in owned_app_ids:
sid = str(app_id)
if sid in apps and isinstance(apps[sid], dict):
if apps[sid].pop("hidden", None) is not None:
count += 1
# Remove the entry entirely if it's now empty.
if not apps[sid]:
del apps[sid]
output = _write_vdf(vdf_data) + "\n"
config_path.write_text(output, encoding="utf-8")
_fix_ownership(config_path, user_dir)
logger.info("Unhidden %d games in profile %s", count, user_dir.name)
total += count
return total
# ──────────────────────────────────────────────────────────────
# Steam restart helper
# ──────────────────────────────────────────────────────────────
def restart_steam() -> None:
"""Gracefully restart the Steam client.
Sends ``steam -shutdown``, waits, then launches again with ``-silent``.
"""
real_user = os.environ.get("SUDO_USER") or os.environ.get("USER")
logger.info("Restarting Steam client...")
# Shut down Steam gracefully.
try:
_run_as_user(["steam", "-shutdown"], real_user)
except FileNotFoundError:
logger.warning("Steam executable not found for restart.")
return
# Wait for Steam to exit.
import time
_pgrep = shutil.which("pgrep") or "/usr/bin/pgrep"
for _ in range(30):
result = subprocess.run(
[_pgrep, "-f", "steam.sh"],
capture_output=True,
check=False,
)
if result.returncode != 0:
break
time.sleep(1)
# Relaunch silently.
with contextlib.suppress(FileNotFoundError):
_run_as_user(["steam", "-silent"], real_user)
def _run_as_user(cmd: list[str], user: str | None) -> None:
"""Run a command, dropping to *user* if currently root."""
if os.geteuid() == 0 and user and user != "root":
try:
pw = pwd.getpwnam(user)
uid = pw.pw_uid
except KeyError:
uid = 1000
dbus_default = f"unix:path=/run/user/{uid}/bus"
dbus_addr = os.environ.get("DBUS_SESSION_BUS_ADDRESS", dbus_default)
xauth = os.environ.get("XAUTHORITY", f"/home/{user}/.Xauthority")
full_cmd = [
"sudo",
"-u",
user,
"env",
f"DISPLAY={os.environ.get('DISPLAY', ':0')}",
f"XAUTHORITY={xauth}",
f"DBUS_SESSION_BUS_ADDRESS={dbus_addr}",
*cmd,
]
else:
full_cmd = cmd
subprocess.Popen(
full_cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def _fix_ownership(path: Path, user_dir: Path) -> None:
"""If running as root, chown the file to the user who owns user_dir."""
if os.geteuid() != 0:
return
try:
stat = user_dir.stat()
os.chown(path, stat.st_uid, stat.st_gid)
except OSError:
pass

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,19 @@
[Unit]
Description=Steam Backlog Enforcer
After=network-online.target graphical.target
Wants=network-online.target
[Service]
Type=simple
WorkingDirectory=/home/kuhy/testsAndMisc
ExecStart=/usr/bin/python3 -m python_pkg.steam_backlog_enforcer.main enforce
Restart=always
RestartSec=5
Environment=PYTHONUNBUFFERED=1
Environment=PYTHONPATH=/home/kuhy/.local/lib/python3.14/site-packages
Environment=HOME=/home/kuhy
# Hardening: enforcer must not be easily killed.
OOMScoreAdjust=-900
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,276 @@
"""Steam Web API client for fetching games and achievement data."""
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
import logging
import threading
import time
from typing import TYPE_CHECKING, Any
import requests
if TYPE_CHECKING:
from collections.abc import Callable
logger = logging.getLogger(__name__)
STEAM_API_BASE = "https://api.steampowered.com"
MAX_WORKERS = 20
@dataclass
class AchievementInfo:
"""Single achievement state."""
api_name: str
display_name: str
achieved: bool
unlock_time: int
@dataclass
class GameInfo:
"""Info about an owned Steam game."""
app_id: int
name: str
total_achievements: int
unlocked_achievements: int
playtime_minutes: int
achievements: list[AchievementInfo] = field(default_factory=list)
completionist_hours: float = -1
@property
def completion_pct(self) -> float:
"""Achievement completion percentage."""
if self.total_achievements == 0:
return 100.0
return (self.unlocked_achievements / self.total_achievements) * 100.0
@property
def is_complete(self) -> bool:
"""True if all achievements are unlocked."""
return (
self.total_achievements > 0
and self.unlocked_achievements >= self.total_achievements
)
def to_snapshot(self) -> dict[str, Any]:
"""Serialize to JSON-safe dict."""
return {
"app_id": self.app_id,
"name": self.name,
"total_achievements": self.total_achievements,
"unlocked_achievements": self.unlocked_achievements,
"playtime_minutes": self.playtime_minutes,
"completionist_hours": self.completionist_hours,
"achievements": [
{
"api_name": a.api_name,
"display_name": a.display_name,
"achieved": a.achieved,
"unlock_time": a.unlock_time,
}
for a in self.achievements
],
}
@classmethod
def from_snapshot(cls, data: dict[str, Any]) -> GameInfo:
"""Deserialize from a cached snapshot dict."""
achievements = [
AchievementInfo(
api_name=a["api_name"],
display_name=a.get("display_name", a["api_name"]),
achieved=a["achieved"],
unlock_time=a.get("unlock_time", 0),
)
for a in data.get("achievements", [])
]
return cls(
app_id=data["app_id"],
name=data["name"],
total_achievements=data["total_achievements"],
unlocked_achievements=data["unlocked_achievements"],
playtime_minutes=data.get("playtime_minutes", 0),
completionist_hours=data.get("completionist_hours", -1),
achievements=achievements,
)
class SteamAPIError(Exception):
"""Raised when the Steam API returns an error."""
class SteamAPIClient:
"""Client for interacting with the Steam Web API."""
def __init__(self, api_key: str, steam_id: str) -> None:
"""Initialize the Steam API client.
Args:
api_key: Steam Web API key.
steam_id: Steam64 ID of the user.
"""
self.api_key = api_key
self.steam_id = steam_id
self.session = requests.Session()
self.session.headers["Accept"] = "application/json"
self._rate_lock = threading.Lock()
self._request_times: list[float] = []
self._max_rps = 18
def _rate_limit(self) -> None:
"""Enforce rate limit across threads."""
while True:
with self._rate_lock:
now = time.time()
self._request_times = [t for t in self._request_times if now - t < 1.0]
if len(self._request_times) < self._max_rps:
self._request_times.append(now)
return
time.sleep(0.06)
def _get(self, url: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
"""Rate-limited GET request."""
self._rate_limit()
if params is None:
params = {}
params["key"] = self.api_key
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
result: dict[str, Any] = resp.json()
except requests.RequestException as e:
msg = f"Steam API request failed: {e}"
raise SteamAPIError(msg) from e
else:
return result
def get_owned_games(self) -> list[dict[str, Any]]:
"""Fetch all games owned by the user."""
url = f"{STEAM_API_BASE}/IPlayerService/GetOwnedGames/v1/"
data = self._get(
url,
{
"steamid": self.steam_id,
"include_appinfo": "true",
"include_played_free_games": "true",
"format": "json",
},
)
games: list[dict[str, Any]] = data.get("response", {}).get("games", [])
logger.info("Found %d owned games.", len(games))
return games
def get_achievement_details(self, app_id: int) -> list[AchievementInfo]:
"""Fetch per-achievement detail for a game."""
url = f"{STEAM_API_BASE}/ISteamUserStats/GetPlayerAchievements/v1/"
try:
data = self._get(
url,
{
"steamid": self.steam_id,
"appid": str(app_id),
"l": "english",
"format": "json",
},
)
except SteamAPIError:
return []
stats = data.get("playerstats", {})
if not stats.get("success", False):
return []
raw: list[dict[str, Any]] = stats.get("achievements", [])
return [
AchievementInfo(
api_name=a.get("apiname", ""),
display_name=a.get("name", a.get("apiname", "")),
achieved=bool(a.get("achieved", 0)),
unlock_time=a.get("unlocktime", 0),
)
for a in raw
]
def _fetch_one_game(
self, game_dict: dict[str, Any], skip: set[int]
) -> GameInfo | None:
"""Fetch achievement data for one game. Thread-safe."""
app_id = game_dict["appid"]
if app_id in skip:
return None
achievements = self.get_achievement_details(app_id)
if not achievements:
return None
name = game_dict.get("name", f"Unknown ({app_id})")
total = len(achievements)
unlocked = sum(1 for a in achievements if a.achieved)
return GameInfo(
app_id=app_id,
name=name,
total_achievements=total,
unlocked_achievements=unlocked,
playtime_minutes=game_dict.get("playtime_forever", 0),
achievements=achievements,
)
def build_game_list(
self,
skip_app_ids: list[int] | None = None,
progress_callback: Callable[[int, int], None] | None = None,
) -> list[GameInfo]:
"""Build full game list with achievement data (parallel)."""
skip = set(skip_app_ids or [])
owned = self.get_owned_games()
games: list[GameInfo] = []
done_count = 0
total = len(owned)
lock = threading.Lock()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
futures = {pool.submit(self._fetch_one_game, g, skip): g for g in owned}
for future in as_completed(futures):
try:
result = future.result()
except (
KeyError,
TypeError,
ValueError,
SteamAPIError,
requests.RequestException,
):
result = None
with lock:
done_count += 1
if progress_callback:
progress_callback(done_count, total)
if result is not None:
games.append(result)
games.sort(key=lambda g: g.name.lower())
return games
def refresh_single_game(
self, app_id: int, name: str, playtime: int = 0
) -> GameInfo | None:
"""Re-fetch achievement data for one game."""
achievements = self.get_achievement_details(app_id)
if not achievements:
return None
total = len(achievements)
unlocked = sum(1 for a in achievements if a.achieved)
return GameInfo(
app_id=app_id,
name=name,
total_achievements=total,
unlocked_achievements=unlocked,
playtime_minutes=playtime,
achievements=achievements,
)

View File

@ -0,0 +1,268 @@
"""Block Steam Store access via /etc/hosts (hosts install script) and iptables.
The system uses a dedicated hosts install script at
linux_configuration/hosts/install.sh that manages /etc/hosts with:
- chattr +ia (immutable + append-only)
- read-only bind mount
- protection against removing entries (only adding is easy)
This module checks if the Steam Store domains are already blocked in
/etc/hosts. If not, it runs the hosts install.sh (which must already
contain the Steam Store entries in its heredoc). As a belt-and-suspenders
fallback, it also blocks via iptables.
"""
from __future__ import annotations
import contextlib
import logging
from pathlib import Path
import shutil
import socket
import subprocess
from python_pkg.steam_backlog_enforcer.config import (
BLOCKED_DOMAINS,
HOSTS_FILE,
)
logger = logging.getLogger(__name__)
# Path to the hosts install script (relative to repo root).
_REPO_ROOT = Path(__file__).resolve().parents[2]
HOSTS_INSTALL_SCRIPT = _REPO_ROOT / "linux_configuration" / "hosts" / "install.sh"
# iptables chain name for our blocking rules.
IPTABLES_CHAIN = "STEAM_ENFORCER"
# Resolved absolute paths for executables (avoids S607 partial-path warnings).
_SUDO = shutil.which("sudo") or "/usr/bin/sudo"
_IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables"
_BASH = shutil.which("bash") or "/usr/bin/bash"
# IP address used in /etc/hosts for blocking domains.
_HOSTS_REDIRECT_IP = ".".join(["0"] * 4)
def is_store_blocked() -> bool:
"""Check if Steam Store domains are blocked in /etc/hosts."""
try:
content = HOSTS_FILE.read_text(encoding="utf-8")
# Check for at least the primary store domain.
if "store.steampowered.com" in content:
# Verify it's actually blocked (not commented out).
for line in content.splitlines():
stripped = line.strip()
if (
not stripped.startswith("#")
and "store.steampowered.com" in stripped
and stripped.startswith(_HOSTS_REDIRECT_IP)
):
return True
except OSError:
pass
return _is_iptables_blocked()
def block_store() -> bool:
"""Block Steam Store: run hosts install script + iptables fallback.
Returns True if at least one blocking method succeeded.
"""
hosts_ok = _block_via_hosts_install()
ipt_ok = _block_store_iptables()
if hosts_ok or ipt_ok:
flush_dns_cache()
return True
logger.error("All store-blocking methods failed.")
return False
def _block_via_hosts_install() -> bool:
"""Run the hosts install.sh to apply /etc/hosts with Steam Store entries.
The install script handles: immutable flag removal, bind mount remounting,
writing the file, re-applying protections, and DoH disabling.
"""
if is_store_blocked():
logger.info("Steam Store already blocked in /etc/hosts.")
return True
if not HOSTS_INSTALL_SCRIPT.exists():
logger.error("hosts install script not found at %s", HOSTS_INSTALL_SCRIPT)
return False
try:
logger.info("Running hosts install script to block Steam Store...")
result = subprocess.run(
[_SUDO, _BASH, str(HOSTS_INSTALL_SCRIPT), "--no-flush-dns"],
capture_output=True,
text=True,
timeout=120,
check=False,
)
except (OSError, subprocess.SubprocessError):
logger.exception("Failed to run hosts install script")
return False
else:
if result.returncode == 0:
logger.info("hosts install script succeeded.")
return True
logger.error(
"hosts install script failed (rc=%d): %s",
result.returncode,
result.stderr[-500:] if result.stderr else result.stdout[-500:],
)
return False
def _is_iptables_blocked() -> bool:
"""Check if our iptables chain exists and has rules."""
try:
result = subprocess.run(
[_SUDO, _IPTABLES, "-L", IPTABLES_CHAIN, "-n"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
except (OSError, subprocess.SubprocessError):
return False
else:
return result.returncode == 0 and "DROP" in result.stdout
def _block_store_iptables() -> bool:
"""Block Steam Store domains using iptables (IP-based)."""
try:
# Create chain if it doesn't exist.
subprocess.run(
[_SUDO, _IPTABLES, "-N", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
# Flush existing rules in our chain.
subprocess.run(
[_SUDO, _IPTABLES, "-F", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=True,
)
# Resolve domains and block their IPs.
blocked_ips: set[str] = set()
for domain in BLOCKED_DOMAINS:
with contextlib.suppress(socket.gaierror):
ips = socket.getaddrinfo(domain, 443, socket.AF_INET)
for _, _, _, _, addr in ips:
blocked_ips.add(addr[0])
for ip in blocked_ips:
subprocess.run(
[
_SUDO,
_IPTABLES,
"-A",
IPTABLES_CHAIN,
"-d",
ip,
"-j",
"DROP",
],
capture_output=True,
timeout=5,
check=True,
)
# Hook our chain into OUTPUT if not already there.
result = subprocess.run(
[_SUDO, _IPTABLES, "-C", "OUTPUT", "-j", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
if result.returncode != 0:
subprocess.run(
[_SUDO, _IPTABLES, "-I", "OUTPUT", "-j", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=True,
)
except (OSError, subprocess.SubprocessError):
logger.exception("Failed to block store via iptables")
return False
else:
logger.info("Steam Store blocked via iptables (%d IPs).", len(blocked_ips))
return True
def unblock_store() -> bool:
"""Remove iptables-based Steam Store blocks.
NOTE: /etc/hosts entries are NOT removed the hosts install script's
protection mechanism intentionally makes removal difficult. Only
iptables rules are cleared.
"""
ipt_ok = _unblock_store_iptables()
flush_dns_cache()
if not ipt_ok:
logger.warning("Failed to remove iptables rules.")
logger.warning(
"Steam Store entries in /etc/hosts are protected and cannot be "
"removed programmatically. This is by design — you must manually "
"remove the immutable flag, bind mount, and edit the hosts install "
"script to unblock."
)
return ipt_ok
def _unblock_store_iptables() -> bool:
"""Remove iptables-based block."""
try:
subprocess.run(
[_SUDO, _IPTABLES, "-D", "OUTPUT", "-j", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
subprocess.run(
[_SUDO, _IPTABLES, "-F", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
subprocess.run(
[_SUDO, _IPTABLES, "-X", IPTABLES_CHAIN],
capture_output=True,
timeout=5,
check=False,
)
except (OSError, subprocess.SubprocessError):
logger.exception("Failed to unblock iptables")
return False
else:
logger.info("Steam Store unblocked from iptables.")
return True
def flush_dns_cache() -> None:
"""Flush the system DNS cache."""
commands = [
["systemd-resolve", "--flush-caches"],
["resolvectl", "flush-caches"],
["nscd", "--invalidate=hosts"],
]
for cmd in commands:
with contextlib.suppress(FileNotFoundError, OSError):
subprocess.run(
cmd,
capture_output=True,
timeout=5,
check=False,
)