diff --git a/steam_backlog_enforcer/library_hider.py b/steam_backlog_enforcer/library_hider.py index 5284bfb..a58c69a 100644 --- a/steam_backlog_enforcer/library_hider.py +++ b/steam_backlog_enforcer/library_hider.py @@ -1,115 +1,211 @@ -"""Hide / unhide games in the Steam library via sharedconfig.vdf. +"""Hide / unhide games in the Steam library via Chrome DevTools Protocol. -Steam stores per-app settings (including the "hidden" flag) in -``userdata//7/remote/sharedconfig.vdf`` under the path: +Modern Steam clients (2023+) use an internal ``collectionStore`` JS +object running inside the CEF (Chromium Embedded Framework) browser. +Game collections (including "hidden") are synced to Steam Cloud and +can only be reliably modified through this API. - UserRoamingConfigStore > Software > Valve > Steam > apps > +This module connects to Steam's ``SharedJSContext`` page over CDP +(Chrome DevTools Protocol) on a local debug port and evaluates +JavaScript to call ``collectionStore.SetAppsAsHidden()``. -Setting ``"hidden" "1"`` makes the game invisible in the default -library view. This module provides functions to bulk-hide every owned -game *except* the currently assigned one, and to unhide them all when -enforcement is lifted. - -Steam must be restarted (or not running) for the changes to take effect, -because it overwrites the file on exit. +Steam must be running with ``-cef-enable-debugging`` and +``-devtools-port=`` for this to work. If it isn't, the module +will shut Steam down and relaunch it with the required flags. """ from __future__ import annotations -import contextlib +import asyncio +import json import logging import os -from pathlib import Path import pwd -import re import shutil import subprocess -from typing import Any +import time +import urllib.request + +import websockets logger = logging.getLogger(__name__) -# Steam user-data paths. -_STEAM_DIR = Path.home() / ".local" / "share" / "Steam" -_USERDATA_DIR = _STEAM_DIR / "userdata" -_SHARED_CONFIG_REL = Path("7") / "remote" / "sharedconfig.vdf" +_CDP_PORT = 8080 +_CDP_TIMEOUT = 30 +_STEAM_STARTUP_WAIT = 45 # ────────────────────────────────────────────────────────────── -# Minimal VDF parser / writer +# CDP (Chrome DevTools Protocol) helpers # ────────────────────────────────────────────────────────────── -def _parse_vdf(text: str) -> dict[str, Any]: - """Parse a Valve VDF text file into nested dicts. +def _get_shared_js_ws_url() -> str | None: + """Query the CDP HTTP endpoint and return the SharedJSContext WS URL.""" + url = f"http://127.0.0.1:{_CDP_PORT}/json" + try: + with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310 + targets = json.loads(resp.read()) + except (OSError, ValueError): + return None - Only handles the subset used by sharedconfig.vdf (string values and - nested sections). + for target in targets: + if target.get("title") == "SharedJSContext": + ws_url: str = target["webSocketDebuggerUrl"] + return ws_url + return None + + +async def _evaluate_js_async(ws_url: str, expression: str) -> dict: + """Connect to a CDP WebSocket target and evaluate *expression*.""" + async with websockets.connect(ws_url) as ws: + msg = json.dumps( + { + "id": 1, + "method": "Runtime.evaluate", + "params": { + "expression": expression, + "returnByValue": True, + "awaitPromise": True, + }, + } + ) + await ws.send(msg) + resp = await asyncio.wait_for(ws.recv(), timeout=_CDP_TIMEOUT) + return json.loads(resp) + + +def _evaluate_js(expression: str) -> dict: + """Synchronous wrapper around :func:`_evaluate_js_async`.""" + ws_url = _get_shared_js_ws_url() + if ws_url is None: + msg = "SharedJSContext not found on CDP port" + raise RuntimeError(msg) + return asyncio.run(_evaluate_js_async(ws_url, expression)) + + +def _cdp_result_value(result: dict) -> str: + """Extract the return value from a CDP Runtime.evaluate response.""" + inner = result.get("result", {}).get("result", {}) + if "exceptionDetails" in result.get("result", {}): + desc = inner.get("description", "Unknown JS error") + msg = f"JS evaluation error: {desc}" + raise RuntimeError(msg) + value: str = inner.get("value", "") + return value + + +# ────────────────────────────────────────────────────────────── +# Ensure Steam is running with devtools port +# ────────────────────────────────────────────────────────────── + + +def _is_steam_running() -> bool: + """Check whether any Steam process is alive.""" + pgrep = shutil.which("pgrep") or "/usr/bin/pgrep" + result = subprocess.run( + [pgrep, "-x", "steam"], + capture_output=True, + check=False, + ) + return result.returncode == 0 + + +def _steam_has_debug_port() -> bool: + """Check whether steamwebhelper is listening on the CDP port.""" + return _get_shared_js_ws_url() is not None + + +def _wait_for_cdp_ready() -> bool: + """Wait up to *_STEAM_STARTUP_WAIT* seconds for CDP to become ready.""" + for _ in range(_STEAM_STARTUP_WAIT): + if _get_shared_js_ws_url() is not None: + return True + time.sleep(1) + return False + + +def _wait_for_collections_ready() -> bool: + """Wait until ``collectionStore`` is fully initialised. + + Right after Steam starts, the CDP port may be open but the + internal collection data hasn't loaded yet. Poll a lightweight + JS check until ``GetCollection`` stops throwing. """ - tokens: list[str] = [] - for m in re.finditer(r'"([^"]*)"|\{|\}', text): - if m.group(1) is not None: - tokens.append(m.group(1)) - else: - tokens.append(m.group(0)) # "{" or "}" - idx = 0 - - def _parse_obj() -> dict[str, Any]: - nonlocal idx - obj: dict[str, Any] = {} - while idx < len(tokens): - token = tokens[idx] - if token == "}": # noqa: S105 - idx += 1 - return obj - # Key. - key = token - idx += 1 - if idx >= len(tokens): - break - # Value: either a string or a nested object. - nxt = tokens[idx] - if nxt == "{": - idx += 1 - obj[key] = _parse_obj() - elif nxt == "}": - # Key without value right before closing brace — skip. - obj[key] = "" - # Don't advance; let the outer loop consume '}'. - else: - obj[key] = nxt - idx += 1 - return obj - - return _parse_obj() + js = ( + "(() => { try { collectionStore.GetCollection('hidden');" + " return 'ok'; } catch(e) { return 'not_ready'; } })()" + ) + for _ in range(_STEAM_STARTUP_WAIT): + try: + result = _evaluate_js(js) + if _cdp_result_value(result) == "ok": + return True + except RuntimeError: + pass + time.sleep(1) + return False -def _write_vdf(data: dict[str, Any], indent: int = 0) -> str: - """Serialize a nested dict back to VDF text.""" - lines: list[str] = [] - prefix = "\t" * indent +def _shutdown_steam() -> None: + """Send ``steam -shutdown`` and wait for the process to exit.""" + real_user = os.environ.get("SUDO_USER") or os.environ.get("USER") + try: + _run_as_user(["steam", "-shutdown"], real_user) + except FileNotFoundError: + return - for key, value in data.items(): - if isinstance(value, dict): - lines.append(f'{prefix}"{key}"') - lines.append(f"{prefix}{{") - lines.append(_write_vdf(value, indent + 1)) - lines.append(f"{prefix}}}") - else: - lines.append(f'{prefix}"{key}"\t\t"{value}"') - - return "\n".join(lines) + pgrep = shutil.which("pgrep") or "/usr/bin/pgrep" + for _ in range(30): + result = subprocess.run( + [pgrep, "-x", "steam"], + capture_output=True, + check=False, + ) + if result.returncode != 0: + return + time.sleep(1) -# ────────────────────────────────────────────────────────────── -# Discover Steam user IDs on this machine -# ────────────────────────────────────────────────────────────── +def _launch_steam_with_debug() -> None: + """Launch Steam with CEF debugging enabled.""" + real_user = os.environ.get("SUDO_USER") or os.environ.get("USER") + _run_as_user( + [ + "steam", + "-cef-enable-debugging", + f"-devtools-port={_CDP_PORT}", + "-silent", + ], + real_user, + ) -def _find_user_dirs() -> list[Path]: - """Return paths to all numeric userdata directories except '0'.""" - if not _USERDATA_DIR.is_dir(): - return [] - return [p for p in _USERDATA_DIR.iterdir() if p.name.isdigit() and p.name != "0"] +def ensure_steam_debug_port() -> None: + """Make sure Steam is running with the CDP debug port open. + + If Steam is running without the port, it is restarted. + If Steam is not running, it is launched. + """ + if _steam_has_debug_port(): + logger.debug("Steam CDP port already available.") + return + + logger.info("Steam CDP port not available — (re)starting Steam...") + if _is_steam_running(): + _shutdown_steam() + + _launch_steam_with_debug() + + if not _wait_for_cdp_ready(): + msg = "Timed out waiting for Steam CDP port to become ready" + raise RuntimeError(msg) + logger.info("Steam CDP port ready.") + + if not _wait_for_collections_ready(): + msg = "Timed out waiting for Steam collections to initialise" + raise RuntimeError(msg) + logger.info("Steam collection store ready.") # ────────────────────────────────────────────────────────────── @@ -117,159 +213,79 @@ def _find_user_dirs() -> list[Path]: # ────────────────────────────────────────────────────────────── -def _get_apps_section( - vdf_data: dict[str, Any], -) -> dict[str, Any] | None: - """Navigate to the ``apps`` dict inside the VDF tree.""" - try: - steam_section = vdf_data["UserRoamingConfigStore"]["Software"]["Valve"]["Steam"] - if "apps" not in steam_section: - steam_section["apps"] = {} - except (KeyError, TypeError): - return None - else: - result: dict[str, Any] = steam_section["apps"] - return result - - -def _hide_games_in_profile( - config_path: Path, - user_dir: Path, - owned_app_ids: list[int], - allowed_app_id: int | None, -) -> int: - """Hide games in a single Steam user profile. - - Args: - config_path: Path to the sharedconfig.vdf file. - user_dir: Path to the user's data directory. - owned_app_ids: List of owned game app IDs. - allowed_app_id: App ID of the game that should remain visible. - - Returns: - Number of games hidden in this profile. - """ - # Back up the original. - backup = config_path.with_suffix(".vdf.bak") - if not backup.exists(): - shutil.copy2(config_path, backup) - - text = config_path.read_text(encoding="utf-8") - vdf_data = _parse_vdf(text) - apps = _get_apps_section(vdf_data) - if apps is None: - logger.warning("Could not find apps section in %s", config_path) - return 0 - - hidden_count = _apply_hide_flags(apps, owned_app_ids, allowed_app_id) - - output = _write_vdf(vdf_data) + "\n" - config_path.write_text(output, encoding="utf-8") - _fix_ownership(config_path, user_dir) - - logger.info("Hidden %d games in profile %s", hidden_count, user_dir.name) - return hidden_count - - -def _apply_hide_flags( - apps: dict[str, Any], - owned_app_ids: list[int], - allowed_app_id: int | None, -) -> int: - """Set hidden flags on all games except the allowed one. - - Args: - apps: The VDF apps section dict. - owned_app_ids: List of owned app IDs. - allowed_app_id: App ID to keep visible. - - Returns: - Number of games newly hidden. - """ - hidden_count = 0 - for app_id in owned_app_ids: - sid = str(app_id) - if app_id == allowed_app_id: - if sid in apps and isinstance(apps[sid], dict): - apps[sid].pop("hidden", None) - continue - - if sid not in apps or not isinstance(apps[sid], dict): - apps[sid] = {} - if apps[sid].get("hidden") != "1": - apps[sid]["hidden"] = "1" - hidden_count += 1 - return hidden_count - - def hide_other_games( owned_app_ids: list[int], allowed_app_id: int | None, ) -> int: """Hide every owned game except *allowed_app_id* in the Steam library. - Modifies ``sharedconfig.vdf`` for every local Steam user profile. - Steam must be restarted for changes to take effect. + Uses the Chrome DevTools Protocol to call + ``collectionStore.SetAppsAsHidden()`` in Steam's JS context. + Changes take effect immediately — no restart required. - Returns the number of games that were hidden. + Returns the number of games newly hidden. """ - user_dirs = _find_user_dirs() - if not user_dirs: - logger.warning("No Steam userdata directories found.") + ensure_steam_debug_port() + + hide_ids = sorted(aid for aid in owned_app_ids if aid != allowed_app_id) + if not hide_ids: return 0 - total_hidden = 0 + ids_json = json.dumps(hide_ids) + js = f""" + (() => {{ + const toHide = {ids_json}; + const already = new Set(); + const hidden = collectionStore.GetCollection('hidden'); + if (hidden && hidden.allApps) {{ + for (const app of hidden.allApps) already.add(app.appid); + }} + const newIds = toHide.filter(id => !already.has(id)); + if (newIds.length > 0) {{ + collectionStore.SetAppsAsHidden(newIds, true); + }} + // Unhide the allowed game if it was hidden. + const allowedId = {allowed_app_id if allowed_app_id is not None else 'null'}; + if (allowedId !== null && collectionStore.BIsHidden(allowedId)) {{ + collectionStore.SetAppsAsHidden([allowedId], false); + }} + return JSON.stringify({{ newlyHidden: newIds.length }}); + }})() + """ - for user_dir in user_dirs: - config_path = user_dir / _SHARED_CONFIG_REL - if not config_path.exists(): - logger.debug("No sharedconfig.vdf in %s", user_dir.name) - continue - - total_hidden += _hide_games_in_profile( - config_path, user_dir, owned_app_ids, allowed_app_id - ) - - return total_hidden + result = _evaluate_js(js) + value = _cdp_result_value(result) + parsed = json.loads(value) + count: int = parsed["newlyHidden"] + logger.info("Hidden %d new games via CDP.", count) + return count def unhide_all_games(owned_app_ids: list[int]) -> int: - """Remove the hidden flag from all owned games. + """Remove all games from the hidden collection. Returns the number of games that were unhidden. """ - user_dirs = _find_user_dirs() - total = 0 + ensure_steam_debug_port() - for user_dir in user_dirs: - config_path = user_dir / _SHARED_CONFIG_REL - if not config_path.exists(): - continue + json.dumps(sorted(owned_app_ids)) + js = """ + (() => { + const hidden = collectionStore.GetCollection('hidden'); + if (!hidden || !hidden.allApps) return JSON.stringify({ count: 0 }); + const hiddenIds = hidden.allApps.map(a => a.appid); + if (hiddenIds.length === 0) return JSON.stringify({ count: 0 }); + collectionStore.SetAppsAsHidden(hiddenIds, false); + return JSON.stringify({ count: hiddenIds.length }); + })() + """ - text = config_path.read_text(encoding="utf-8") - vdf_data = _parse_vdf(text) - apps = _get_apps_section(vdf_data) - if apps is None: - continue - - count = 0 - for app_id in owned_app_ids: - sid = str(app_id) - if sid in apps and isinstance(apps[sid], dict): - if apps[sid].pop("hidden", None) is not None: - count += 1 - # Remove the entry entirely if it's now empty. - if not apps[sid]: - del apps[sid] - - output = _write_vdf(vdf_data) + "\n" - config_path.write_text(output, encoding="utf-8") - _fix_ownership(config_path, user_dir) - - logger.info("Unhidden %d games in profile %s", count, user_dir.name) - total += count - - return total + result = _evaluate_js(js) + value = _cdp_result_value(result) + parsed = json.loads(value) + count: int = parsed["count"] + logger.info("Unhidden %d games via CDP.", count) + return count # ────────────────────────────────────────────────────────────── @@ -278,37 +294,15 @@ def unhide_all_games(owned_app_ids: list[int]) -> int: def restart_steam() -> None: - """Gracefully restart the Steam client. + """Gracefully restart the Steam client with CEF debugging enabled.""" + logger.info("Restarting Steam client with debug port...") + _shutdown_steam() + _launch_steam_with_debug() - Sends ``steam -shutdown``, waits, then launches again with ``-silent``. - """ - real_user = os.environ.get("SUDO_USER") or os.environ.get("USER") - logger.info("Restarting Steam client...") - - # Shut down Steam gracefully. - try: - _run_as_user(["steam", "-shutdown"], real_user) - except FileNotFoundError: - logger.warning("Steam executable not found for restart.") - return - - # Wait for Steam to exit. - import time - - _pgrep = shutil.which("pgrep") or "/usr/bin/pgrep" - for _ in range(30): - result = subprocess.run( - [_pgrep, "-f", "steam.sh"], - capture_output=True, - check=False, - ) - if result.returncode != 0: - break - time.sleep(1) - - # Relaunch silently. - with contextlib.suppress(FileNotFoundError): - _run_as_user(["steam", "-silent"], real_user) + if not _wait_for_cdp_ready(): + logger.warning("Steam restarted but CDP port not ready.") + else: + logger.info("Steam restarted with CDP port ready.") def _run_as_user(cmd: list[str], user: str | None) -> None: @@ -341,14 +335,3 @@ def _run_as_user(cmd: list[str], user: str | None) -> None: stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) - - -def _fix_ownership(path: Path, user_dir: Path) -> None: - """If running as root, chown the file to the user who owns user_dir.""" - if os.geteuid() != 0: - return - try: - stat = user_dir.stat() - os.chown(path, stat.st_uid, stat.st_gid) - except OSError: - pass diff --git a/steam_backlog_enforcer/main.py b/steam_backlog_enforcer/main.py index 12af073..c00b1c2 100644 --- a/steam_backlog_enforcer/main.py +++ b/steam_backlog_enforcer/main.py @@ -27,7 +27,6 @@ from python_pkg.steam_backlog_enforcer.enforcer import ( ) from python_pkg.steam_backlog_enforcer.hltb import ( fetch_hltb_times_cached, - get_hltb_submit_url, ) from python_pkg.steam_backlog_enforcer.library_hider import ( hide_other_games, @@ -755,8 +754,6 @@ def _enforce_hide_games(config: Config, state: State) -> None: hidden = hide_other_games(owned_ids, state.current_app_id) if hidden > 0: _echo(f" Library: hid {hidden} games (only assigned game visible)") - restart_steam() - _echo(" Steam restarted to apply library changes.") else: _echo(" Library: games already hidden") else: @@ -923,6 +920,32 @@ def cmd_unblock(_config: Config, _state: State) -> None: _echo("Failed to unblock. Run with sudo.") +def cmd_buy_dlc(config: Config, state: State) -> None: + """Temporarily unblock the store so the user can buy DLC.""" + if state.current_app_id is None: + _echo("No game currently assigned.") + return + + _echo(f"Current game: {state.current_game_name} (AppID={state.current_app_id})") + _echo("Unblocking Steam store for DLC purchase...") + + if not unblock_store(): + _echo("Failed to unblock store. Run with sudo.") + return + + _echo("\nStore UNBLOCKED — buy your DLC now.") + _echo("Press Enter when you're done to re-block the store...") + input() + + if config.block_store: + if block_store(): + _echo("Store re-blocked. Restarting Steam to clear DNS cache...") + restart_steam() + _echo("Done.") + else: + _echo("Warning: failed to re-block store.") + + def cmd_reset(config: Config, state: State) -> None: """Reset all state (unblock, unhide, clear assignment).""" unblock_store() @@ -934,7 +957,6 @@ def cmd_reset(config: Config, state: State) -> None: count = unhide_all_games(owned) if count: _echo(f"Unhidden {count} games.") - restart_steam() except Exception as exc: # noqa: BLE001 _echo(f"Warning: could not unhide games: {exc}") @@ -1043,8 +1065,6 @@ def cmd_hide(config: Config, state: State) -> None: _echo(f"Hidden {hidden} games.") if hidden > 0: - _echo("Restarting Steam to apply changes...") - restart_steam() _echo("Done! Only the assigned game should be visible in your library.") @@ -1060,49 +1080,18 @@ def cmd_unhide(config: Config, _state: State) -> None: _echo(f"Unhidden {count} games.") if count > 0: - _echo("Restarting Steam to apply changes...") - restart_steam() _echo("Done!") -def _open_hltb_submit_page( - game_name: str, - app_id: int, - snapshot_data: list[dict[str, Any]] | None, -) -> None: - """Show playtime and open the HLTB submit page in the browser.""" - playtime_minutes = 0 - if snapshot_data: - for entry in snapshot_data: - if entry.get("app_id") == app_id: - playtime_minutes = entry.get("playtime_minutes", 0) - break - - playtime_h = playtime_minutes / 60 - _echo(f"\n Steam playtime: {playtime_h:.1f} hours") - - _echo(" Looking up game on HowLongToBeat...") - submit_url = get_hltb_submit_url(game_name) - if submit_url: - _echo(f" HLTB submit page: {submit_url}") - _echo(" Opening in browser (log in & submit your time)...") - import webbrowser - - webbrowser.open(submit_url) - else: - _echo(" Could not find game on HLTB (submit manually).") - - def cmd_done(config: Config, state: State) -> None: - """Check completion, open HLTB submit, pick next game, uninstall & hide. + """Check completion, pick next game, uninstall & hide. All-in-one command for after finishing a game: 1. Verify 100% achievements on Steam. - 2. Show playtime and open HLTB submit page in browser. - 3. Pick the next game (shortest HLTB 100% time). - 4. Uninstall all non-assigned games. - 5. Hide all non-assigned games in the Steam library. - 6. Install the newly assigned game. + 2. Pick the next game (shortest HLTB 100% time). + 3. Uninstall all non-assigned games. + 4. Hide all non-assigned games in the Steam library. + 5. Install the newly assigned game. """ if state.current_app_id is None: _echo("No game currently assigned. Run 'scan' first.") @@ -1132,11 +1121,8 @@ def cmd_done(config: Config, state: State) -> None: _echo(f"\n COMPLETED: {game_name}!") state.finished_app_ids.append(app_id) - # ── Step 2: HLTB submit ── + # ── Step 2: Pick next game ── snapshot_data = load_snapshot() - _open_hltb_submit_page(game_name, app_id, snapshot_data) - - # ── Step 3: Pick next game ── _echo("\nPicking next game...") if not snapshot_data: _echo(" No snapshot found. Run 'scan' first.") @@ -1152,14 +1138,12 @@ def cmd_done(config: Config, state: State) -> None: _echo(" No more games to assign!") return - # ── Step 4: Hide non-assigned games in library ── + # ── Step 3: Hide non-assigned games in library ── owned_ids = _get_all_owned_app_ids(config) if owned_ids: hidden = hide_other_games(owned_ids, state.current_app_id) if hidden > 0: _echo(f"\n Library: hid {hidden} games") - restart_steam() - _echo(" Steam restarted to apply library changes.") send_notification( "Game Complete!", @@ -1179,6 +1163,7 @@ COMMANDS = { "hide": ("Hide all non-assigned games in library", cmd_hide), "unhide": ("Unhide all games in library", cmd_unhide), "unblock": ("Remove store blocking", cmd_unblock), + "buy-dlc": ("Temporarily unblock store to buy DLC", cmd_buy_dlc), "reset": ("Reset all state", cmd_reset), "installed": ("List installed games", cmd_installed), "uninstall": ("Uninstall all non-assigned games", cmd_uninstall), diff --git a/steam_backlog_enforcer/run.sh b/steam_backlog_enforcer/run.sh index b99c6b4..46d9efc 100755 --- a/steam_backlog_enforcer/run.sh +++ b/steam_backlog_enforcer/run.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash -# Quick launcher for the "done" workflow: -# check completion → open HLTB → pick next game → uninstall & hide others +# Launcher for the Steam Backlog Enforcer. +# Usage: ./run.sh [command] (defaults to "done" if no command given) set -euo pipefail cd "$(dirname "$0")/../.." -exec python -m python_pkg.steam_backlog_enforcer.main "done" +exec python -m python_pkg.steam_backlog_enforcer.main "${1:-done}" diff --git a/steam_backlog_enforcer/store_blocker.py b/steam_backlog_enforcer/store_blocker.py index 6fa0090..827321d 100644 --- a/steam_backlog_enforcer/store_blocker.py +++ b/steam_backlog_enforcer/store_blocker.py @@ -39,11 +39,30 @@ IPTABLES_CHAIN = "STEAM_ENFORCER" _SUDO = shutil.which("sudo") or "/usr/bin/sudo" _IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables" _BASH = shutil.which("bash") or "/usr/bin/bash" +_CHATTR = shutil.which("chattr") or "/usr/bin/chattr" +_SYSTEMCTL = shutil.which("systemctl") or "/usr/bin/systemctl" +_UMOUNT = shutil.which("umount") or "/usr/bin/umount" +_MOUNT = shutil.which("mount") or "/usr/bin/mount" +_FINDMNT = shutil.which("findmnt") or "/usr/bin/findmnt" +_CP = shutil.which("cp") or "/usr/bin/cp" +_CHMOD = shutil.which("chmod") or "/usr/bin/chmod" +_TEE = shutil.which("tee") or "/usr/bin/tee" # IP address used in /etc/hosts for blocking domains. _HOSTS_REDIRECT_IP = ".".join(["0"] * 4) +def _sudo_write_hosts(content: str) -> None: + """Write *content* to /etc/hosts via ``sudo tee``.""" + subprocess.run( + [_SUDO, _TEE, str(HOSTS_FILE)], + input=content.encode(), + stdout=subprocess.DEVNULL, + timeout=10, + check=True, + ) + + def is_store_blocked() -> bool: """Check if Steam Store domains are blocked in /etc/hosts.""" try: @@ -66,10 +85,21 @@ def is_store_blocked() -> bool: def block_store() -> bool: - """Block Steam Store: run hosts install script + iptables fallback. + """Block Steam Store: uncomment hosts entries, or run install script. Returns True if at least one blocking method succeeded. """ + if is_store_blocked(): + logger.info("Steam Store already blocked in /etc/hosts.") + return True + + # Try quick re-block (uncomment lines) first. + if _reblock_hosts() and is_store_blocked(): + _block_store_iptables() + flush_dns_cache() + return True + + # Fall back to the full hosts install script. hosts_ok = _block_via_hosts_install() ipt_ok = _block_store_iptables() @@ -201,25 +231,17 @@ def _block_store_iptables() -> bool: def unblock_store() -> bool: - """Remove iptables-based Steam Store blocks. - - NOTE: /etc/hosts entries are NOT removed — the hosts install script's - protection mechanism intentionally makes removal difficult. Only - iptables rules are cleared. - """ + """Remove Steam Store blocks from both iptables and /etc/hosts.""" ipt_ok = _unblock_store_iptables() + hosts_ok = _unblock_hosts() flush_dns_cache() if not ipt_ok: logger.warning("Failed to remove iptables rules.") + if not hosts_ok: + logger.warning("Failed to remove /etc/hosts entries.") - logger.warning( - "Steam Store entries in /etc/hosts are protected and cannot be " - "removed programmatically. This is by design — you must manually " - "remove the immutable flag, bind mount, and edit the hosts install " - "script to unblock." - ) - return ipt_ok + return ipt_ok or hosts_ok def _unblock_store_iptables() -> bool: @@ -266,3 +288,144 @@ def flush_dns_cache() -> None: timeout=5, check=False, ) + + +# ────────────────────────────────────────────────────────────── +# /etc/hosts protection helpers +# ────────────────────────────────────────────────────────────── + +_GUARD_SERVICES = ("hosts-bind-mount.service", "hosts-guard.path") +_LOCKED_HOSTS_COPY = Path("/usr/local/share/locked-hosts") + + +def _disable_hosts_protection() -> None: + """Stop guard services, unmount bind mount, remove chattr flags.""" + for svc in _GUARD_SERVICES: + subprocess.run( + [_SUDO, _SYSTEMCTL, "stop", svc], + capture_output=True, + timeout=10, + check=False, + ) + + # Unmount bind mount if active. + result = subprocess.run( + [_FINDMNT, str(HOSTS_FILE)], + capture_output=True, + timeout=5, + check=False, + ) + if result.returncode == 0: + subprocess.run( + [_SUDO, _UMOUNT, str(HOSTS_FILE)], + capture_output=True, + timeout=5, + check=False, + ) + + # Remove immutable + append-only attributes. + subprocess.run( + [_SUDO, _CHATTR, "-i", "-a", str(HOSTS_FILE)], + capture_output=True, + timeout=5, + check=False, + ) + + +def _enable_hosts_protection() -> None: + """Re-apply chattr flags and restart guard services.""" + subprocess.run( + [_SUDO, _CHMOD, "644", str(HOSTS_FILE)], + capture_output=True, + timeout=5, + check=False, + ) + subprocess.run( + [_SUDO, _CHATTR, "+ia", str(HOSTS_FILE)], + capture_output=True, + timeout=5, + check=False, + ) + + # Update the canonical copy so the guard doesn't revert changes. + if _LOCKED_HOSTS_COPY.exists(): + subprocess.run( + [_SUDO, _CP, str(HOSTS_FILE), str(_LOCKED_HOSTS_COPY)], + capture_output=True, + timeout=5, + check=False, + ) + + for svc in _GUARD_SERVICES: + subprocess.run( + [_SUDO, _SYSTEMCTL, "start", svc], + capture_output=True, + timeout=10, + check=False, + ) + + +def _unblock_hosts() -> bool: + """Comment out Steam Store entries in /etc/hosts.""" + if not is_store_blocked(): + logger.info("Steam Store not blocked in /etc/hosts, nothing to do.") + return True + + try: + _disable_hosts_protection() + content = HOSTS_FILE.read_text(encoding="utf-8") + new_lines = [] + changed = False + for line in content.splitlines(keepends=True): + stripped = line.strip() + if ( + not stripped.startswith("#") + and stripped.startswith(_HOSTS_REDIRECT_IP) + and any(d in stripped for d in BLOCKED_DOMAINS) + ): + new_lines.append(f"# {line}" if line.endswith("\n") else f"# {line}\n") + changed = True + else: + new_lines.append(line) + + if changed: + _sudo_write_hosts("".join(new_lines)) + logger.info("Commented out Steam Store entries in /etc/hosts.") + + _enable_hosts_protection() + except OSError: + logger.exception("Failed to modify /etc/hosts") + return False + else: + return True + + +def _reblock_hosts() -> bool: + """Uncomment Steam Store entries in /etc/hosts.""" + try: + _disable_hosts_protection() + content = HOSTS_FILE.read_text(encoding="utf-8") + new_lines = [] + changed = False + for line in content.splitlines(keepends=True): + stripped = line.strip() + if stripped.startswith("# ") and any( + d in stripped for d in BLOCKED_DOMAINS + ): + # Remove the '# ' prefix. + uncommented = line.replace("# ", "", 1) + new_lines.append(uncommented) + changed = True + else: + new_lines.append(line) + + if changed: + _sudo_write_hosts("".join(new_lines)) + logger.info("Re-enabled Steam Store entries in /etc/hosts.") + + _enable_hosts_protection() + except OSError: + logger.exception("Failed to modify /etc/hosts") + return False + else: + return True