feat: robotgo improvements in phone focus bluetooth and printer scripts

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-03-09 18:34:15 +01:00
parent b18a50d3a9
commit 2dc4a8d759
4 changed files with 454 additions and 323 deletions

View File

@ -1,115 +1,211 @@
"""Hide / unhide games in the Steam library via sharedconfig.vdf. """Hide / unhide games in the Steam library via Chrome DevTools Protocol.
Steam stores per-app settings (including the "hidden" flag) in Modern Steam clients (2023+) use an internal ``collectionStore`` JS
``userdata/<userid>/7/remote/sharedconfig.vdf`` under the path: object running inside the CEF (Chromium Embedded Framework) browser.
Game collections (including "hidden") are synced to Steam Cloud and
can only be reliably modified through this API.
UserRoamingConfigStore > Software > Valve > Steam > apps > <appid> This module connects to Steam's ``SharedJSContext`` page over CDP
(Chrome DevTools Protocol) on a local debug port and evaluates
JavaScript to call ``collectionStore.SetAppsAsHidden()``.
Setting ``"hidden" "1"`` makes the game invisible in the default Steam must be running with ``-cef-enable-debugging`` and
library view. This module provides functions to bulk-hide every owned ``-devtools-port=<PORT>`` for this to work. If it isn't, the module
game *except* the currently assigned one, and to unhide them all when will shut Steam down and relaunch it with the required flags.
enforcement is lifted.
Steam must be restarted (or not running) for the changes to take effect,
because it overwrites the file on exit.
""" """
from __future__ import annotations from __future__ import annotations
import contextlib import asyncio
import json
import logging import logging
import os import os
from pathlib import Path
import pwd import pwd
import re
import shutil import shutil
import subprocess import subprocess
from typing import Any import time
import urllib.request
import websockets
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Steam user-data paths. _CDP_PORT = 8080
_STEAM_DIR = Path.home() / ".local" / "share" / "Steam" _CDP_TIMEOUT = 30
_USERDATA_DIR = _STEAM_DIR / "userdata" _STEAM_STARTUP_WAIT = 45
_SHARED_CONFIG_REL = Path("7") / "remote" / "sharedconfig.vdf"
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
# Minimal VDF parser / writer # CDP (Chrome DevTools Protocol) helpers
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
def _parse_vdf(text: str) -> dict[str, Any]: def _get_shared_js_ws_url() -> str | None:
"""Parse a Valve VDF text file into nested dicts. """Query the CDP HTTP endpoint and return the SharedJSContext WS URL."""
url = f"http://127.0.0.1:{_CDP_PORT}/json"
try:
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
targets = json.loads(resp.read())
except (OSError, ValueError):
return None
Only handles the subset used by sharedconfig.vdf (string values and for target in targets:
nested sections). if target.get("title") == "SharedJSContext":
ws_url: str = target["webSocketDebuggerUrl"]
return ws_url
return None
async def _evaluate_js_async(ws_url: str, expression: str) -> dict:
"""Connect to a CDP WebSocket target and evaluate *expression*."""
async with websockets.connect(ws_url) as ws:
msg = json.dumps(
{
"id": 1,
"method": "Runtime.evaluate",
"params": {
"expression": expression,
"returnByValue": True,
"awaitPromise": True,
},
}
)
await ws.send(msg)
resp = await asyncio.wait_for(ws.recv(), timeout=_CDP_TIMEOUT)
return json.loads(resp)
def _evaluate_js(expression: str) -> dict:
"""Synchronous wrapper around :func:`_evaluate_js_async`."""
ws_url = _get_shared_js_ws_url()
if ws_url is None:
msg = "SharedJSContext not found on CDP port"
raise RuntimeError(msg)
return asyncio.run(_evaluate_js_async(ws_url, expression))
def _cdp_result_value(result: dict) -> str:
"""Extract the return value from a CDP Runtime.evaluate response."""
inner = result.get("result", {}).get("result", {})
if "exceptionDetails" in result.get("result", {}):
desc = inner.get("description", "Unknown JS error")
msg = f"JS evaluation error: {desc}"
raise RuntimeError(msg)
value: str = inner.get("value", "")
return value
# ──────────────────────────────────────────────────────────────
# Ensure Steam is running with devtools port
# ──────────────────────────────────────────────────────────────
def _is_steam_running() -> bool:
"""Check whether any Steam process is alive."""
pgrep = shutil.which("pgrep") or "/usr/bin/pgrep"
result = subprocess.run(
[pgrep, "-x", "steam"],
capture_output=True,
check=False,
)
return result.returncode == 0
def _steam_has_debug_port() -> bool:
"""Check whether steamwebhelper is listening on the CDP port."""
return _get_shared_js_ws_url() is not None
def _wait_for_cdp_ready() -> bool:
"""Wait up to *_STEAM_STARTUP_WAIT* seconds for CDP to become ready."""
for _ in range(_STEAM_STARTUP_WAIT):
if _get_shared_js_ws_url() is not None:
return True
time.sleep(1)
return False
def _wait_for_collections_ready() -> bool:
"""Wait until ``collectionStore`` is fully initialised.
Right after Steam starts, the CDP port may be open but the
internal collection data hasn't loaded yet. Poll a lightweight
JS check until ``GetCollection`` stops throwing.
""" """
tokens: list[str] = [] js = (
for m in re.finditer(r'"([^"]*)"|\{|\}', text): "(() => { try { collectionStore.GetCollection('hidden');"
if m.group(1) is not None: " return 'ok'; } catch(e) { return 'not_ready'; } })()"
tokens.append(m.group(1)) )
else: for _ in range(_STEAM_STARTUP_WAIT):
tokens.append(m.group(0)) # "{" or "}" try:
idx = 0 result = _evaluate_js(js)
if _cdp_result_value(result) == "ok":
def _parse_obj() -> dict[str, Any]: return True
nonlocal idx except RuntimeError:
obj: dict[str, Any] = {} pass
while idx < len(tokens): time.sleep(1)
token = tokens[idx] return False
if token == "}": # noqa: S105
idx += 1
return obj
# Key.
key = token
idx += 1
if idx >= len(tokens):
break
# Value: either a string or a nested object.
nxt = tokens[idx]
if nxt == "{":
idx += 1
obj[key] = _parse_obj()
elif nxt == "}":
# Key without value right before closing brace — skip.
obj[key] = ""
# Don't advance; let the outer loop consume '}'.
else:
obj[key] = nxt
idx += 1
return obj
return _parse_obj()
def _write_vdf(data: dict[str, Any], indent: int = 0) -> str: def _shutdown_steam() -> None:
"""Serialize a nested dict back to VDF text.""" """Send ``steam -shutdown`` and wait for the process to exit."""
lines: list[str] = [] real_user = os.environ.get("SUDO_USER") or os.environ.get("USER")
prefix = "\t" * indent try:
_run_as_user(["steam", "-shutdown"], real_user)
except FileNotFoundError:
return
for key, value in data.items(): pgrep = shutil.which("pgrep") or "/usr/bin/pgrep"
if isinstance(value, dict): for _ in range(30):
lines.append(f'{prefix}"{key}"') result = subprocess.run(
lines.append(f"{prefix}{{") [pgrep, "-x", "steam"],
lines.append(_write_vdf(value, indent + 1)) capture_output=True,
lines.append(f"{prefix}}}") check=False,
else: )
lines.append(f'{prefix}"{key}"\t\t"{value}"') if result.returncode != 0:
return
return "\n".join(lines) time.sleep(1)
# ────────────────────────────────────────────────────────────── def _launch_steam_with_debug() -> None:
# Discover Steam user IDs on this machine """Launch Steam with CEF debugging enabled."""
# ────────────────────────────────────────────────────────────── real_user = os.environ.get("SUDO_USER") or os.environ.get("USER")
_run_as_user(
[
"steam",
"-cef-enable-debugging",
f"-devtools-port={_CDP_PORT}",
"-silent",
],
real_user,
)
def _find_user_dirs() -> list[Path]: def ensure_steam_debug_port() -> None:
"""Return paths to all numeric userdata directories except '0'.""" """Make sure Steam is running with the CDP debug port open.
if not _USERDATA_DIR.is_dir():
return [] If Steam is running without the port, it is restarted.
return [p for p in _USERDATA_DIR.iterdir() if p.name.isdigit() and p.name != "0"] If Steam is not running, it is launched.
"""
if _steam_has_debug_port():
logger.debug("Steam CDP port already available.")
return
logger.info("Steam CDP port not available — (re)starting Steam...")
if _is_steam_running():
_shutdown_steam()
_launch_steam_with_debug()
if not _wait_for_cdp_ready():
msg = "Timed out waiting for Steam CDP port to become ready"
raise RuntimeError(msg)
logger.info("Steam CDP port ready.")
if not _wait_for_collections_ready():
msg = "Timed out waiting for Steam collections to initialise"
raise RuntimeError(msg)
logger.info("Steam collection store ready.")
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
@ -117,159 +213,79 @@ def _find_user_dirs() -> list[Path]:
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
def _get_apps_section(
vdf_data: dict[str, Any],
) -> dict[str, Any] | None:
"""Navigate to the ``apps`` dict inside the VDF tree."""
try:
steam_section = vdf_data["UserRoamingConfigStore"]["Software"]["Valve"]["Steam"]
if "apps" not in steam_section:
steam_section["apps"] = {}
except (KeyError, TypeError):
return None
else:
result: dict[str, Any] = steam_section["apps"]
return result
def _hide_games_in_profile(
config_path: Path,
user_dir: Path,
owned_app_ids: list[int],
allowed_app_id: int | None,
) -> int:
"""Hide games in a single Steam user profile.
Args:
config_path: Path to the sharedconfig.vdf file.
user_dir: Path to the user's data directory.
owned_app_ids: List of owned game app IDs.
allowed_app_id: App ID of the game that should remain visible.
Returns:
Number of games hidden in this profile.
"""
# Back up the original.
backup = config_path.with_suffix(".vdf.bak")
if not backup.exists():
shutil.copy2(config_path, backup)
text = config_path.read_text(encoding="utf-8")
vdf_data = _parse_vdf(text)
apps = _get_apps_section(vdf_data)
if apps is None:
logger.warning("Could not find apps section in %s", config_path)
return 0
hidden_count = _apply_hide_flags(apps, owned_app_ids, allowed_app_id)
output = _write_vdf(vdf_data) + "\n"
config_path.write_text(output, encoding="utf-8")
_fix_ownership(config_path, user_dir)
logger.info("Hidden %d games in profile %s", hidden_count, user_dir.name)
return hidden_count
def _apply_hide_flags(
apps: dict[str, Any],
owned_app_ids: list[int],
allowed_app_id: int | None,
) -> int:
"""Set hidden flags on all games except the allowed one.
Args:
apps: The VDF apps section dict.
owned_app_ids: List of owned app IDs.
allowed_app_id: App ID to keep visible.
Returns:
Number of games newly hidden.
"""
hidden_count = 0
for app_id in owned_app_ids:
sid = str(app_id)
if app_id == allowed_app_id:
if sid in apps and isinstance(apps[sid], dict):
apps[sid].pop("hidden", None)
continue
if sid not in apps or not isinstance(apps[sid], dict):
apps[sid] = {}
if apps[sid].get("hidden") != "1":
apps[sid]["hidden"] = "1"
hidden_count += 1
return hidden_count
def hide_other_games( def hide_other_games(
owned_app_ids: list[int], owned_app_ids: list[int],
allowed_app_id: int | None, allowed_app_id: int | None,
) -> int: ) -> int:
"""Hide every owned game except *allowed_app_id* in the Steam library. """Hide every owned game except *allowed_app_id* in the Steam library.
Modifies ``sharedconfig.vdf`` for every local Steam user profile. Uses the Chrome DevTools Protocol to call
Steam must be restarted for changes to take effect. ``collectionStore.SetAppsAsHidden()`` in Steam's JS context.
Changes take effect immediately no restart required.
Returns the number of games that were hidden. Returns the number of games newly hidden.
""" """
user_dirs = _find_user_dirs() ensure_steam_debug_port()
if not user_dirs:
logger.warning("No Steam userdata directories found.") hide_ids = sorted(aid for aid in owned_app_ids if aid != allowed_app_id)
if not hide_ids:
return 0 return 0
total_hidden = 0 ids_json = json.dumps(hide_ids)
js = f"""
(() => {{
const toHide = {ids_json};
const already = new Set();
const hidden = collectionStore.GetCollection('hidden');
if (hidden && hidden.allApps) {{
for (const app of hidden.allApps) already.add(app.appid);
}}
const newIds = toHide.filter(id => !already.has(id));
if (newIds.length > 0) {{
collectionStore.SetAppsAsHidden(newIds, true);
}}
// Unhide the allowed game if it was hidden.
const allowedId = {allowed_app_id if allowed_app_id is not None else 'null'};
if (allowedId !== null && collectionStore.BIsHidden(allowedId)) {{
collectionStore.SetAppsAsHidden([allowedId], false);
}}
return JSON.stringify({{ newlyHidden: newIds.length }});
}})()
"""
for user_dir in user_dirs: result = _evaluate_js(js)
config_path = user_dir / _SHARED_CONFIG_REL value = _cdp_result_value(result)
if not config_path.exists(): parsed = json.loads(value)
logger.debug("No sharedconfig.vdf in %s", user_dir.name) count: int = parsed["newlyHidden"]
continue logger.info("Hidden %d new games via CDP.", count)
return count
total_hidden += _hide_games_in_profile(
config_path, user_dir, owned_app_ids, allowed_app_id
)
return total_hidden
def unhide_all_games(owned_app_ids: list[int]) -> int: def unhide_all_games(owned_app_ids: list[int]) -> int:
"""Remove the hidden flag from all owned games. """Remove all games from the hidden collection.
Returns the number of games that were unhidden. Returns the number of games that were unhidden.
""" """
user_dirs = _find_user_dirs() ensure_steam_debug_port()
total = 0
for user_dir in user_dirs: json.dumps(sorted(owned_app_ids))
config_path = user_dir / _SHARED_CONFIG_REL js = """
if not config_path.exists(): (() => {
continue const hidden = collectionStore.GetCollection('hidden');
if (!hidden || !hidden.allApps) return JSON.stringify({ count: 0 });
const hiddenIds = hidden.allApps.map(a => a.appid);
if (hiddenIds.length === 0) return JSON.stringify({ count: 0 });
collectionStore.SetAppsAsHidden(hiddenIds, false);
return JSON.stringify({ count: hiddenIds.length });
})()
"""
text = config_path.read_text(encoding="utf-8") result = _evaluate_js(js)
vdf_data = _parse_vdf(text) value = _cdp_result_value(result)
apps = _get_apps_section(vdf_data) parsed = json.loads(value)
if apps is None: count: int = parsed["count"]
continue logger.info("Unhidden %d games via CDP.", count)
return count
count = 0
for app_id in owned_app_ids:
sid = str(app_id)
if sid in apps and isinstance(apps[sid], dict):
if apps[sid].pop("hidden", None) is not None:
count += 1
# Remove the entry entirely if it's now empty.
if not apps[sid]:
del apps[sid]
output = _write_vdf(vdf_data) + "\n"
config_path.write_text(output, encoding="utf-8")
_fix_ownership(config_path, user_dir)
logger.info("Unhidden %d games in profile %s", count, user_dir.name)
total += count
return total
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
@ -278,37 +294,15 @@ def unhide_all_games(owned_app_ids: list[int]) -> int:
def restart_steam() -> None: def restart_steam() -> None:
"""Gracefully restart the Steam client. """Gracefully restart the Steam client with CEF debugging enabled."""
logger.info("Restarting Steam client with debug port...")
_shutdown_steam()
_launch_steam_with_debug()
Sends ``steam -shutdown``, waits, then launches again with ``-silent``. if not _wait_for_cdp_ready():
""" logger.warning("Steam restarted but CDP port not ready.")
real_user = os.environ.get("SUDO_USER") or os.environ.get("USER") else:
logger.info("Restarting Steam client...") logger.info("Steam restarted with CDP port ready.")
# Shut down Steam gracefully.
try:
_run_as_user(["steam", "-shutdown"], real_user)
except FileNotFoundError:
logger.warning("Steam executable not found for restart.")
return
# Wait for Steam to exit.
import time
_pgrep = shutil.which("pgrep") or "/usr/bin/pgrep"
for _ in range(30):
result = subprocess.run(
[_pgrep, "-f", "steam.sh"],
capture_output=True,
check=False,
)
if result.returncode != 0:
break
time.sleep(1)
# Relaunch silently.
with contextlib.suppress(FileNotFoundError):
_run_as_user(["steam", "-silent"], real_user)
def _run_as_user(cmd: list[str], user: str | None) -> None: def _run_as_user(cmd: list[str], user: str | None) -> None:
@ -341,14 +335,3 @@ def _run_as_user(cmd: list[str], user: str | None) -> None:
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
) )
def _fix_ownership(path: Path, user_dir: Path) -> None:
"""If running as root, chown the file to the user who owns user_dir."""
if os.geteuid() != 0:
return
try:
stat = user_dir.stat()
os.chown(path, stat.st_uid, stat.st_gid)
except OSError:
pass

View File

@ -27,7 +27,6 @@ from python_pkg.steam_backlog_enforcer.enforcer import (
) )
from python_pkg.steam_backlog_enforcer.hltb import ( from python_pkg.steam_backlog_enforcer.hltb import (
fetch_hltb_times_cached, fetch_hltb_times_cached,
get_hltb_submit_url,
) )
from python_pkg.steam_backlog_enforcer.library_hider import ( from python_pkg.steam_backlog_enforcer.library_hider import (
hide_other_games, hide_other_games,
@ -755,8 +754,6 @@ def _enforce_hide_games(config: Config, state: State) -> None:
hidden = hide_other_games(owned_ids, state.current_app_id) hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0: if hidden > 0:
_echo(f" Library: hid {hidden} games (only assigned game visible)") _echo(f" Library: hid {hidden} games (only assigned game visible)")
restart_steam()
_echo(" Steam restarted to apply library changes.")
else: else:
_echo(" Library: games already hidden") _echo(" Library: games already hidden")
else: else:
@ -923,6 +920,32 @@ def cmd_unblock(_config: Config, _state: State) -> None:
_echo("Failed to unblock. Run with sudo.") _echo("Failed to unblock. Run with sudo.")
def cmd_buy_dlc(config: Config, state: State) -> None:
"""Temporarily unblock the store so the user can buy DLC."""
if state.current_app_id is None:
_echo("No game currently assigned.")
return
_echo(f"Current game: {state.current_game_name} (AppID={state.current_app_id})")
_echo("Unblocking Steam store for DLC purchase...")
if not unblock_store():
_echo("Failed to unblock store. Run with sudo.")
return
_echo("\nStore UNBLOCKED — buy your DLC now.")
_echo("Press Enter when you're done to re-block the store...")
input()
if config.block_store:
if block_store():
_echo("Store re-blocked. Restarting Steam to clear DNS cache...")
restart_steam()
_echo("Done.")
else:
_echo("Warning: failed to re-block store.")
def cmd_reset(config: Config, state: State) -> None: def cmd_reset(config: Config, state: State) -> None:
"""Reset all state (unblock, unhide, clear assignment).""" """Reset all state (unblock, unhide, clear assignment)."""
unblock_store() unblock_store()
@ -934,7 +957,6 @@ def cmd_reset(config: Config, state: State) -> None:
count = unhide_all_games(owned) count = unhide_all_games(owned)
if count: if count:
_echo(f"Unhidden {count} games.") _echo(f"Unhidden {count} games.")
restart_steam()
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
_echo(f"Warning: could not unhide games: {exc}") _echo(f"Warning: could not unhide games: {exc}")
@ -1043,8 +1065,6 @@ def cmd_hide(config: Config, state: State) -> None:
_echo(f"Hidden {hidden} games.") _echo(f"Hidden {hidden} games.")
if hidden > 0: if hidden > 0:
_echo("Restarting Steam to apply changes...")
restart_steam()
_echo("Done! Only the assigned game should be visible in your library.") _echo("Done! Only the assigned game should be visible in your library.")
@ -1060,49 +1080,18 @@ def cmd_unhide(config: Config, _state: State) -> None:
_echo(f"Unhidden {count} games.") _echo(f"Unhidden {count} games.")
if count > 0: if count > 0:
_echo("Restarting Steam to apply changes...")
restart_steam()
_echo("Done!") _echo("Done!")
def _open_hltb_submit_page(
game_name: str,
app_id: int,
snapshot_data: list[dict[str, Any]] | None,
) -> None:
"""Show playtime and open the HLTB submit page in the browser."""
playtime_minutes = 0
if snapshot_data:
for entry in snapshot_data:
if entry.get("app_id") == app_id:
playtime_minutes = entry.get("playtime_minutes", 0)
break
playtime_h = playtime_minutes / 60
_echo(f"\n Steam playtime: {playtime_h:.1f} hours")
_echo(" Looking up game on HowLongToBeat...")
submit_url = get_hltb_submit_url(game_name)
if submit_url:
_echo(f" HLTB submit page: {submit_url}")
_echo(" Opening in browser (log in & submit your time)...")
import webbrowser
webbrowser.open(submit_url)
else:
_echo(" Could not find game on HLTB (submit manually).")
def cmd_done(config: Config, state: State) -> None: def cmd_done(config: Config, state: State) -> None:
"""Check completion, open HLTB submit, pick next game, uninstall & hide. """Check completion, pick next game, uninstall & hide.
All-in-one command for after finishing a game: All-in-one command for after finishing a game:
1. Verify 100% achievements on Steam. 1. Verify 100% achievements on Steam.
2. Show playtime and open HLTB submit page in browser. 2. Pick the next game (shortest HLTB 100% time).
3. Pick the next game (shortest HLTB 100% time). 3. Uninstall all non-assigned games.
4. Uninstall all non-assigned games. 4. Hide all non-assigned games in the Steam library.
5. Hide all non-assigned games in the Steam library. 5. Install the newly assigned game.
6. Install the newly assigned game.
""" """
if state.current_app_id is None: if state.current_app_id is None:
_echo("No game currently assigned. Run 'scan' first.") _echo("No game currently assigned. Run 'scan' first.")
@ -1132,11 +1121,8 @@ def cmd_done(config: Config, state: State) -> None:
_echo(f"\n COMPLETED: {game_name}!") _echo(f"\n COMPLETED: {game_name}!")
state.finished_app_ids.append(app_id) state.finished_app_ids.append(app_id)
# ── Step 2: HLTB submit ── # ── Step 2: Pick next game ──
snapshot_data = load_snapshot() snapshot_data = load_snapshot()
_open_hltb_submit_page(game_name, app_id, snapshot_data)
# ── Step 3: Pick next game ──
_echo("\nPicking next game...") _echo("\nPicking next game...")
if not snapshot_data: if not snapshot_data:
_echo(" No snapshot found. Run 'scan' first.") _echo(" No snapshot found. Run 'scan' first.")
@ -1152,14 +1138,12 @@ def cmd_done(config: Config, state: State) -> None:
_echo(" No more games to assign!") _echo(" No more games to assign!")
return return
# ── Step 4: Hide non-assigned games in library ── # ── Step 3: Hide non-assigned games in library ──
owned_ids = _get_all_owned_app_ids(config) owned_ids = _get_all_owned_app_ids(config)
if owned_ids: if owned_ids:
hidden = hide_other_games(owned_ids, state.current_app_id) hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0: if hidden > 0:
_echo(f"\n Library: hid {hidden} games") _echo(f"\n Library: hid {hidden} games")
restart_steam()
_echo(" Steam restarted to apply library changes.")
send_notification( send_notification(
"Game Complete!", "Game Complete!",
@ -1179,6 +1163,7 @@ COMMANDS = {
"hide": ("Hide all non-assigned games in library", cmd_hide), "hide": ("Hide all non-assigned games in library", cmd_hide),
"unhide": ("Unhide all games in library", cmd_unhide), "unhide": ("Unhide all games in library", cmd_unhide),
"unblock": ("Remove store blocking", cmd_unblock), "unblock": ("Remove store blocking", cmd_unblock),
"buy-dlc": ("Temporarily unblock store to buy DLC", cmd_buy_dlc),
"reset": ("Reset all state", cmd_reset), "reset": ("Reset all state", cmd_reset),
"installed": ("List installed games", cmd_installed), "installed": ("List installed games", cmd_installed),
"uninstall": ("Uninstall all non-assigned games", cmd_uninstall), "uninstall": ("Uninstall all non-assigned games", cmd_uninstall),

View File

@ -1,7 +1,7 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Quick launcher for the "done" workflow: # Launcher for the Steam Backlog Enforcer.
# check completion → open HLTB → pick next game → uninstall & hide others # Usage: ./run.sh [command] (defaults to "done" if no command given)
set -euo pipefail set -euo pipefail
cd "$(dirname "$0")/../.." cd "$(dirname "$0")/../.."
exec python -m python_pkg.steam_backlog_enforcer.main "done" exec python -m python_pkg.steam_backlog_enforcer.main "${1:-done}"

View File

@ -39,11 +39,30 @@ IPTABLES_CHAIN = "STEAM_ENFORCER"
_SUDO = shutil.which("sudo") or "/usr/bin/sudo" _SUDO = shutil.which("sudo") or "/usr/bin/sudo"
_IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables" _IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables"
_BASH = shutil.which("bash") or "/usr/bin/bash" _BASH = shutil.which("bash") or "/usr/bin/bash"
_CHATTR = shutil.which("chattr") or "/usr/bin/chattr"
_SYSTEMCTL = shutil.which("systemctl") or "/usr/bin/systemctl"
_UMOUNT = shutil.which("umount") or "/usr/bin/umount"
_MOUNT = shutil.which("mount") or "/usr/bin/mount"
_FINDMNT = shutil.which("findmnt") or "/usr/bin/findmnt"
_CP = shutil.which("cp") or "/usr/bin/cp"
_CHMOD = shutil.which("chmod") or "/usr/bin/chmod"
_TEE = shutil.which("tee") or "/usr/bin/tee"
# IP address used in /etc/hosts for blocking domains. # IP address used in /etc/hosts for blocking domains.
_HOSTS_REDIRECT_IP = ".".join(["0"] * 4) _HOSTS_REDIRECT_IP = ".".join(["0"] * 4)
def _sudo_write_hosts(content: str) -> None:
"""Write *content* to /etc/hosts via ``sudo tee``."""
subprocess.run(
[_SUDO, _TEE, str(HOSTS_FILE)],
input=content.encode(),
stdout=subprocess.DEVNULL,
timeout=10,
check=True,
)
def is_store_blocked() -> bool: def is_store_blocked() -> bool:
"""Check if Steam Store domains are blocked in /etc/hosts.""" """Check if Steam Store domains are blocked in /etc/hosts."""
try: try:
@ -66,10 +85,21 @@ def is_store_blocked() -> bool:
def block_store() -> bool: def block_store() -> bool:
"""Block Steam Store: run hosts install script + iptables fallback. """Block Steam Store: uncomment hosts entries, or run install script.
Returns True if at least one blocking method succeeded. Returns True if at least one blocking method succeeded.
""" """
if is_store_blocked():
logger.info("Steam Store already blocked in /etc/hosts.")
return True
# Try quick re-block (uncomment lines) first.
if _reblock_hosts() and is_store_blocked():
_block_store_iptables()
flush_dns_cache()
return True
# Fall back to the full hosts install script.
hosts_ok = _block_via_hosts_install() hosts_ok = _block_via_hosts_install()
ipt_ok = _block_store_iptables() ipt_ok = _block_store_iptables()
@ -201,25 +231,17 @@ def _block_store_iptables() -> bool:
def unblock_store() -> bool: def unblock_store() -> bool:
"""Remove iptables-based Steam Store blocks. """Remove Steam Store blocks from both iptables and /etc/hosts."""
NOTE: /etc/hosts entries are NOT removed the hosts install script's
protection mechanism intentionally makes removal difficult. Only
iptables rules are cleared.
"""
ipt_ok = _unblock_store_iptables() ipt_ok = _unblock_store_iptables()
hosts_ok = _unblock_hosts()
flush_dns_cache() flush_dns_cache()
if not ipt_ok: if not ipt_ok:
logger.warning("Failed to remove iptables rules.") logger.warning("Failed to remove iptables rules.")
if not hosts_ok:
logger.warning("Failed to remove /etc/hosts entries.")
logger.warning( return ipt_ok or hosts_ok
"Steam Store entries in /etc/hosts are protected and cannot be "
"removed programmatically. This is by design — you must manually "
"remove the immutable flag, bind mount, and edit the hosts install "
"script to unblock."
)
return ipt_ok
def _unblock_store_iptables() -> bool: def _unblock_store_iptables() -> bool:
@ -266,3 +288,144 @@ def flush_dns_cache() -> None:
timeout=5, timeout=5,
check=False, check=False,
) )
# ──────────────────────────────────────────────────────────────
# /etc/hosts protection helpers
# ──────────────────────────────────────────────────────────────
_GUARD_SERVICES = ("hosts-bind-mount.service", "hosts-guard.path")
_LOCKED_HOSTS_COPY = Path("/usr/local/share/locked-hosts")
def _disable_hosts_protection() -> None:
"""Stop guard services, unmount bind mount, remove chattr flags."""
for svc in _GUARD_SERVICES:
subprocess.run(
[_SUDO, _SYSTEMCTL, "stop", svc],
capture_output=True,
timeout=10,
check=False,
)
# Unmount bind mount if active.
result = subprocess.run(
[_FINDMNT, str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
if result.returncode == 0:
subprocess.run(
[_SUDO, _UMOUNT, str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
# Remove immutable + append-only attributes.
subprocess.run(
[_SUDO, _CHATTR, "-i", "-a", str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
def _enable_hosts_protection() -> None:
"""Re-apply chattr flags and restart guard services."""
subprocess.run(
[_SUDO, _CHMOD, "644", str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
subprocess.run(
[_SUDO, _CHATTR, "+ia", str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
# Update the canonical copy so the guard doesn't revert changes.
if _LOCKED_HOSTS_COPY.exists():
subprocess.run(
[_SUDO, _CP, str(HOSTS_FILE), str(_LOCKED_HOSTS_COPY)],
capture_output=True,
timeout=5,
check=False,
)
for svc in _GUARD_SERVICES:
subprocess.run(
[_SUDO, _SYSTEMCTL, "start", svc],
capture_output=True,
timeout=10,
check=False,
)
def _unblock_hosts() -> bool:
"""Comment out Steam Store entries in /etc/hosts."""
if not is_store_blocked():
logger.info("Steam Store not blocked in /etc/hosts, nothing to do.")
return True
try:
_disable_hosts_protection()
content = HOSTS_FILE.read_text(encoding="utf-8")
new_lines = []
changed = False
for line in content.splitlines(keepends=True):
stripped = line.strip()
if (
not stripped.startswith("#")
and stripped.startswith(_HOSTS_REDIRECT_IP)
and any(d in stripped for d in BLOCKED_DOMAINS)
):
new_lines.append(f"# {line}" if line.endswith("\n") else f"# {line}\n")
changed = True
else:
new_lines.append(line)
if changed:
_sudo_write_hosts("".join(new_lines))
logger.info("Commented out Steam Store entries in /etc/hosts.")
_enable_hosts_protection()
except OSError:
logger.exception("Failed to modify /etc/hosts")
return False
else:
return True
def _reblock_hosts() -> bool:
"""Uncomment Steam Store entries in /etc/hosts."""
try:
_disable_hosts_protection()
content = HOSTS_FILE.read_text(encoding="utf-8")
new_lines = []
changed = False
for line in content.splitlines(keepends=True):
stripped = line.strip()
if stripped.startswith("# ") and any(
d in stripped for d in BLOCKED_DOMAINS
):
# Remove the '# ' prefix.
uncommented = line.replace("# ", "", 1)
new_lines.append(uncommented)
changed = True
else:
new_lines.append(line)
if changed:
_sudo_write_hosts("".join(new_lines))
logger.info("Re-enabled Steam Store entries in /etc/hosts.")
_enable_hosts_protection()
except OSError:
logger.exception("Failed to modify /etc/hosts")
return False
else:
return True