feat: robotgo improvements in phone focus bluetooth and printer scripts

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-03-09 18:34:15 +01:00
parent f4b77e51dc
commit de98faafc9
17 changed files with 1015 additions and 392 deletions

View File

@ -42,6 +42,13 @@ cd C/lichess_random_engine && make
- `search.c/h` - Alpha-beta search with scoring
- `Makefile` - Build with `make`, clean with `make clean`
### Go Projects (`robotgo_demo/`)
- **robotgo_demo/** - Desktop automation using [go-vgo/robotgo](https://github.com/go-vgo/robotgo)
- `main.go` - Demo: mouse, keyboard, screen, clipboard, window control
- Requires X11 (Arch Linux), built with `go build -o robotgo_demo .`
- See `robotgo_demo/.github/copilot-instructions.md` for full API reference
### Cross-Language Integration
- Python `engine.py` → calls C binary via `subprocess.Popen`

View File

@ -12,9 +12,12 @@
# 7. Hung PipeWire/WirePlumber audio stack
# 8. Auto scan/pair/trust/connect when MAC is provided
# 9. SBC-XQ codec causing dropouts on older adapters
# 10. Stale HCI link state (link tx timeout) requiring btusb reload
# 11. A2DP ServicesResolved stuck at false after connect
# 12. PipeWire bluez audio card not appearing after connection
#
# Usage:
# ./fix_bluetooth.sh # Diagnose and fix all issues
# ./fix_bluetooth.sh # Diagnose and fix + connect JBL Charge 5
# ./fix_bluetooth.sh --interactive # Prompt before each fix
# ./fix_bluetooth.sh <MAC> # Fix + auto-connect to device
# ./fix_bluetooth.sh --interactive <MAC> # Both
@ -30,7 +33,7 @@ source "$SCRIPT_DIR/../lib/common.sh"
parse_interactive_args "$@"
shift "$COMMON_ARGS_SHIFT"
TARGET_MAC="${1:-}"
TARGET_MAC="${1:-F8:5C:7E:0E:50:6B}"
require_root "$@"
@ -225,6 +228,69 @@ _reload_btusb() {
modprobe -r btusb && sleep 1 && modprobe btusb && sleep 2
}
# ---------------------------------------------------------------------------
# Helper: check if A2DP services are resolved for a connected device
# Returns 0 if resolved, 1 otherwise.
# ---------------------------------------------------------------------------
_services_resolved() {
local mac="$1"
local dbus_path="/org/bluez/hci0/dev_${mac//:/_}"
local result
result=$(dbus-send --system --print-reply \
--dest=org.bluez "$dbus_path" \
org.freedesktop.DBus.Properties.Get \
string:"org.bluez.Device1" string:"ServicesResolved" 2>/dev/null || true)
echo "$result" | grep -q "boolean true"
}
# ---------------------------------------------------------------------------
# Helper: full reset cycle — btusb reload + service restarts + reconnect.
# Fixes stale HCI link state ("link tx timeout" / ServicesResolved stuck).
# ---------------------------------------------------------------------------
_full_adapter_reset_and_connect() {
local mac="$1"
log_info "Performing full adapter reset (btusb reload)..."
_btctl disconnect "$mac" >/dev/null 2>&1 || true
sleep 1
modprobe -r btusb && sleep 2 && modprobe btusb && sleep 5
systemctl restart bluetooth.service
sleep 3
_restart_pipewire_stack
sleep 3
log_info "Reconnecting to $mac after adapter reset..."
{ echo "power on"; sleep 1; echo "connect $mac"; sleep 20; } \
| bluetoothctl 2>/dev/null || true
}
# ---------------------------------------------------------------------------
# Helper: verify the Bluetooth audio sink appeared in PipeWire.
# ---------------------------------------------------------------------------
_verify_audio_sink() {
local mac="$1"
local card_name="bluez_card.${mac//:/_}"
if ! has_cmd pactl; then
return 0
fi
# Give PipeWire time to create the audio card
local _attempt
for _attempt in 1 2 3 4 5; do
if pactl list cards short 2>/dev/null | grep -q "$card_name"; then
log_ok "Bluetooth audio card detected in PipeWire."
return 0
fi
sleep 3
done
log_warn "Bluetooth audio card not found in PipeWire after connection."
return 1
}
# ==========================================================================
# 5. Remove stale pairing for target device (if specified)
# ==========================================================================
@ -415,9 +481,7 @@ connect_device() {
{ echo "power on"; sleep 1; echo "connect $TARGET_MAC"; sleep 15; } \
| bluetoothctl 2>/dev/null || true
info=$(_btctl info "$TARGET_MAC" || true)
if echo "$info" | grep -q "Connected: yes"; then
log_ok "Connected to $TARGET_MAC!"
if _check_connection_health "$TARGET_MAC"; then
return 0
fi
@ -458,16 +522,57 @@ connect_device() {
{ echo "power on"; sleep 1; echo "connect $TARGET_MAC"; sleep 15; } \
| bluetoothctl 2>/dev/null || true
# Verify connection
sleep 2
info=$(_btctl info "$TARGET_MAC" || true)
if echo "$info" | grep -q "Connected: yes"; then
log_ok "Successfully connected to $TARGET_MAC!"
else
# Verify connection + services + audio
if _check_connection_health "$TARGET_MAC"; then
return 0
fi
log_error "Connection to $TARGET_MAC failed."
log_info "Try putting the device in pairing mode and re-run."
return 1
}
# ---------------------------------------------------------------------------
# Helper: verify connection is fully healthy (connected + services + audio).
# If connected but services stuck, triggers full adapter reset + retry.
# ---------------------------------------------------------------------------
_check_connection_health() {
local mac="$1"
local info
sleep 2
info=$(_btctl info "$mac" || true)
# Not connected at all
if ! echo "$info" | grep -q "Connected: yes"; then
return 1
fi
# Connected — check if A2DP services resolved
local _attempt
for _attempt in 1 2 3; do
if _services_resolved "$mac"; then
log_ok "Connected to $mac with A2DP services resolved."
_verify_audio_sink "$mac" || true
return 0
fi
sleep 3
done
# Connected but ServicesResolved stuck at false — stale HCI link state.
log_warn "Connected but A2DP services not resolved (stale HCI link state)."
apply_fix "Full adapter reset to fix stale link" \
_full_adapter_reset_and_connect "$mac"
# Verify after reset
sleep 3
if _services_resolved "$mac"; then
log_ok "Connected to $mac with A2DP services resolved after reset."
_verify_audio_sink "$mac" || true
return 0
fi
return 1
}
# ==========================================================================

View File

@ -12,15 +12,16 @@ SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
. "$SCRIPT_DIR/config_secrets.sh"
# --- Radius in meters ---
export RADIUS=250
export RADIUS=150
# --- Hysteresis buffer in meters (prevents rapid toggling at boundary) ---
export HYSTERESIS=50
export HYSTERESIS=30
# --- Location check interval in seconds ---
# When focus mode is ON (at home): check frequently (phone can charge).
# When focus mode is ON (at home): check very frequently for near-instant
# detection of leaving home (phone is charging anyway).
# When focus mode is OFF (away): check less often to save battery.
export CHECK_INTERVAL_FOCUS=30
export CHECK_INTERVAL_FOCUS=10
export CHECK_INTERVAL_NORMAL=120
# --- Log file ---
@ -75,6 +76,31 @@ com.microsoft.office.outlook
com.google.android.gm
ch.protonmail.android
com.microsoft.teams
# --- Manga reader ---
eu.kanade.tachiyomi.sy
"
# ============================================================
# BLOCKED SYSTEM APPS
# System apps that should be disabled in focus mode.
# These are NOT covered by third-party package blocking.
# ============================================================
export BLOCKED_SYSTEM_APPS="
# --- Browsers ---
com.android.chrome
com.chrome.beta
com.chrome.dev
com.chrome.canary
com.sec.android.app.sbrowser
com.opera.browser
com.opera.mini.native
com.brave.browser
com.vivaldi.browser
com.microsoft.emmx
com.kiwibrowser.browser
com.duckduckgo.mobile.android
"
# --- System / essential packages that must NEVER be disabled ---

View File

@ -132,6 +132,22 @@ enable_focus_mode() {
done < "$tmp_pkgs"
rm -f "$tmp_pkgs"
# Also remove explicitly blocked system apps (e.g. browsers)
# Uses pm uninstall --user 0 so they vanish from Settings entirely
local blocked_sys="$STATE_DIR/blocked_sys.txt"
local uninstalled_sys="$STATE_DIR/uninstalled_sys.txt"
echo "$BLOCKED_SYSTEM_APPS" | grep -v '^[[:space:]]*#' | grep -v '^[[:space:]]*$' \
| sed 's/^[[:space:]]*//;s/[[:space:]]*$//' > "$blocked_sys"
: > "$uninstalled_sys"
while IFS= read -r pkg; do
[ -z "$pkg" ] && continue
# Try uninstall; even if already uninstalled, record it for re-install later
pm uninstall -k --user 0 "$pkg" >/dev/null 2>&1
echo "$pkg" >> "$uninstalled_sys"
echo "$pkg" >> "$DISABLED_APPS_FILE"
done < "$blocked_sys"
rm -f "$blocked_sys"
local count
count=$(wc -l < "$DISABLED_APPS_FILE" 2>/dev/null || echo 0)
CURRENT_MODE="focus"
@ -145,6 +161,15 @@ disable_focus_mode() {
local count=0
if [ -f "$DISABLED_APPS_FILE" ] && [ -s "$DISABLED_APPS_FILE" ]; then
# Re-install system apps that were uninstalled for user
if [ -f "$STATE_DIR/uninstalled_sys.txt" ] && [ -s "$STATE_DIR/uninstalled_sys.txt" ]; then
while IFS= read -r pkg; do
[ -z "$pkg" ] && continue
pm install-existing --user 0 "$pkg" >/dev/null 2>&1
done < "$STATE_DIR/uninstalled_sys.txt"
: > "$STATE_DIR/uninstalled_sys.txt"
fi
# Re-enable all disabled apps
while IFS= read -r pkg; do
[ -z "$pkg" ] && continue
pm enable "$pkg" >/dev/null 2>&1 && count=$((count + 1))

View File

@ -1,15 +1,17 @@
"""Check Brother laser printer consumable/maintenance status.
Supports both USB-connected and network printers on Arch Linux.
Requires root (sudo) for USB hardware queries and CUPS management.
USB: Queries via PJL over /dev/usb/lp* (requires root + usblp module).
Falls back to CUPS IPP status when usblp is unavailable (no root needed).
USB: Queries via PJL over /dev/usb/lp* (requires usblp module).
Falls back to USB port status query + CUPS IPP when usblp is unavailable.
Network: Queries via SNMP (requires net-snmp).
Usage:
sudo python3 -m brother_printer # auto-detect USB or network
sudo python3 -m brother_printer <printer_ip> # force network/SNMP mode
sudo python3 brother_printer.py # run directly
sudo python3 -m brother_printer --reset-toner # after replacing toner
sudo python3 -m brother_printer --reset-drum # after replacing drum
"""
from __future__ import annotations
@ -792,10 +794,7 @@ def _ensure_cups_running() -> bool:
"""Make sure CUPS is running, starting it if necessary."""
if _is_cups_scheduler_running():
return True
# CUPS not running — try to start it (needs root)
if os.geteuid() == 0:
return _start_cups()
return False
def _query_usb_via_cups() -> USBResult:
@ -827,8 +826,7 @@ def _query_usb_via_cups() -> USBResult:
reasons = ipp.get("printer-state-reasons", "none")
result.economode = _get_cups_economode(printer_name)
# Try direct USB hardware status query (requires root)
if os.geteuid() == 0:
# Direct USB hardware status query
port_status = _query_usb_port_status_raw()
if port_status is not None:
result.port_status = port_status
@ -858,7 +856,7 @@ def _query_usb_via_cups() -> USBResult:
result.online = "TRUE"
return result
# Non-root or pyusb unavailable: CUPS-only fallback
# pyusb unavailable: CUPS-only fallback
result.status_code = _map_cups_to_status_code(state, reasons)
result.display = ipp.get("printer-state-message", "")
result.online = "TRUE" if state.lower() in {"idle", "processing"} else "FALSE"
@ -1577,12 +1575,9 @@ def _display_cups_fallback_note(result: USBResult) -> None:
)
else:
_out(
f" {DIM}Note: Status obtained via CUPS only"
f" (run with sudo for direct hardware query).{RESET}"
)
_out(
f" {DIM}Detailed toner/drum levels are not available in this"
f" mode.{RESET}"
f" {DIM}Note: pyusb not available; status obtained via"
f" CUPS only. Detailed toner/drum levels are not"
f" available in this mode.{RESET}"
)
@ -1796,10 +1791,6 @@ def _run_network_mode(printer_ip: str) -> None:
def _run_usb_mode(usb_line: str) -> None:
"""Handle USB printer mode."""
_out(f"{CYAN}Found Brother printer on USB: {usb_line}{RESET}")
has_dev = find_usb_printer_dev() is not None
if has_dev and os.geteuid() != 0:
_out(f"{RED}Root access required for USB printer. Re-run with sudo.{RESET}")
sys.exit(1)
display_usb_results(query_usb_pjl())
@ -1827,6 +1818,14 @@ def main(argv: list[str] | None = None) -> None:
_reset_consumable("drum")
return
# Enforce root — needed for USB hardware queries and CUPS management
if os.geteuid() != 0:
_out(
f"{RED}Root access required. Re-run with sudo:{RESET}\n"
f" sudo python3 -m brother_printer {' '.join(args)}".rstrip(),
)
sys.exit(1)
printer_ip = args[0] if args else ""
if printer_ip:

View File

@ -3,14 +3,11 @@ set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# No pip dependencies — script only uses stdlib (+pyusb for fallback info)
# Requires root for USB access (PJL via usblp or port status via pyusb)
# Requires root for USB hardware queries and CUPS management
# Usage: ./run.sh # auto-detect
# ./run.sh <printer_ip> # network/SNMP mode
# Use sudo when a Brother printer is on USB (for /dev/usb/lp* or pyusb hw query)
if ls /dev/usb/lp* &>/dev/null || lsusb 2>/dev/null | grep -qi "04f9.*brother"; then
echo "Note: sudo may prompt for your password (required for USB printer access)."
sudo python3 "$SCRIPT_DIR/check_brother_printer.py" "$@"
else
python3 "$SCRIPT_DIR/check_brother_printer.py" "$@"
if [[ $EUID -ne 0 ]]; then
exec sudo python3 "$SCRIPT_DIR/check_brother_printer.py" "$@"
fi
exec python3 "$SCRIPT_DIR/check_brother_printer.py" "$@"

View File

@ -1,115 +1,211 @@
"""Hide / unhide games in the Steam library via sharedconfig.vdf.
"""Hide / unhide games in the Steam library via Chrome DevTools Protocol.
Steam stores per-app settings (including the "hidden" flag) in
``userdata/<userid>/7/remote/sharedconfig.vdf`` under the path:
Modern Steam clients (2023+) use an internal ``collectionStore`` JS
object running inside the CEF (Chromium Embedded Framework) browser.
Game collections (including "hidden") are synced to Steam Cloud and
can only be reliably modified through this API.
UserRoamingConfigStore > Software > Valve > Steam > apps > <appid>
This module connects to Steam's ``SharedJSContext`` page over CDP
(Chrome DevTools Protocol) on a local debug port and evaluates
JavaScript to call ``collectionStore.SetAppsAsHidden()``.
Setting ``"hidden" "1"`` makes the game invisible in the default
library view. This module provides functions to bulk-hide every owned
game *except* the currently assigned one, and to unhide them all when
enforcement is lifted.
Steam must be restarted (or not running) for the changes to take effect,
because it overwrites the file on exit.
Steam must be running with ``-cef-enable-debugging`` and
``-devtools-port=<PORT>`` for this to work. If it isn't, the module
will shut Steam down and relaunch it with the required flags.
"""
from __future__ import annotations
import contextlib
import asyncio
import json
import logging
import os
from pathlib import Path
import pwd
import re
import shutil
import subprocess
from typing import Any
import time
import urllib.request
import websockets
logger = logging.getLogger(__name__)
# Steam user-data paths.
_STEAM_DIR = Path.home() / ".local" / "share" / "Steam"
_USERDATA_DIR = _STEAM_DIR / "userdata"
_SHARED_CONFIG_REL = Path("7") / "remote" / "sharedconfig.vdf"
_CDP_PORT = 8080
_CDP_TIMEOUT = 30
_STEAM_STARTUP_WAIT = 45
# ──────────────────────────────────────────────────────────────
# Minimal VDF parser / writer
# CDP (Chrome DevTools Protocol) helpers
# ──────────────────────────────────────────────────────────────
def _parse_vdf(text: str) -> dict[str, Any]:
"""Parse a Valve VDF text file into nested dicts.
def _get_shared_js_ws_url() -> str | None:
"""Query the CDP HTTP endpoint and return the SharedJSContext WS URL."""
url = f"http://127.0.0.1:{_CDP_PORT}/json"
try:
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
targets = json.loads(resp.read())
except (OSError, ValueError):
return None
Only handles the subset used by sharedconfig.vdf (string values and
nested sections).
for target in targets:
if target.get("title") == "SharedJSContext":
ws_url: str = target["webSocketDebuggerUrl"]
return ws_url
return None
async def _evaluate_js_async(ws_url: str, expression: str) -> dict:
"""Connect to a CDP WebSocket target and evaluate *expression*."""
async with websockets.connect(ws_url) as ws:
msg = json.dumps(
{
"id": 1,
"method": "Runtime.evaluate",
"params": {
"expression": expression,
"returnByValue": True,
"awaitPromise": True,
},
}
)
await ws.send(msg)
resp = await asyncio.wait_for(ws.recv(), timeout=_CDP_TIMEOUT)
return json.loads(resp)
def _evaluate_js(expression: str) -> dict:
"""Synchronous wrapper around :func:`_evaluate_js_async`."""
ws_url = _get_shared_js_ws_url()
if ws_url is None:
msg = "SharedJSContext not found on CDP port"
raise RuntimeError(msg)
return asyncio.run(_evaluate_js_async(ws_url, expression))
def _cdp_result_value(result: dict) -> str:
"""Extract the return value from a CDP Runtime.evaluate response."""
inner = result.get("result", {}).get("result", {})
if "exceptionDetails" in result.get("result", {}):
desc = inner.get("description", "Unknown JS error")
msg = f"JS evaluation error: {desc}"
raise RuntimeError(msg)
value: str = inner.get("value", "")
return value
# ──────────────────────────────────────────────────────────────
# Ensure Steam is running with devtools port
# ──────────────────────────────────────────────────────────────
def _is_steam_running() -> bool:
"""Check whether any Steam process is alive."""
pgrep = shutil.which("pgrep") or "/usr/bin/pgrep"
result = subprocess.run(
[pgrep, "-x", "steam"],
capture_output=True,
check=False,
)
return result.returncode == 0
def _steam_has_debug_port() -> bool:
"""Check whether steamwebhelper is listening on the CDP port."""
return _get_shared_js_ws_url() is not None
def _wait_for_cdp_ready() -> bool:
"""Wait up to *_STEAM_STARTUP_WAIT* seconds for CDP to become ready."""
for _ in range(_STEAM_STARTUP_WAIT):
if _get_shared_js_ws_url() is not None:
return True
time.sleep(1)
return False
def _wait_for_collections_ready() -> bool:
"""Wait until ``collectionStore`` is fully initialised.
Right after Steam starts, the CDP port may be open but the
internal collection data hasn't loaded yet. Poll a lightweight
JS check until ``GetCollection`` stops throwing.
"""
tokens: list[str] = []
for m in re.finditer(r'"([^"]*)"|\{|\}', text):
if m.group(1) is not None:
tokens.append(m.group(1))
else:
tokens.append(m.group(0)) # "{" or "}"
idx = 0
def _parse_obj() -> dict[str, Any]:
nonlocal idx
obj: dict[str, Any] = {}
while idx < len(tokens):
token = tokens[idx]
if token == "}": # noqa: S105
idx += 1
return obj
# Key.
key = token
idx += 1
if idx >= len(tokens):
break
# Value: either a string or a nested object.
nxt = tokens[idx]
if nxt == "{":
idx += 1
obj[key] = _parse_obj()
elif nxt == "}":
# Key without value right before closing brace — skip.
obj[key] = ""
# Don't advance; let the outer loop consume '}'.
else:
obj[key] = nxt
idx += 1
return obj
return _parse_obj()
js = (
"(() => { try { collectionStore.GetCollection('hidden');"
" return 'ok'; } catch(e) { return 'not_ready'; } })()"
)
for _ in range(_STEAM_STARTUP_WAIT):
try:
result = _evaluate_js(js)
if _cdp_result_value(result) == "ok":
return True
except RuntimeError:
pass
time.sleep(1)
return False
def _write_vdf(data: dict[str, Any], indent: int = 0) -> str:
"""Serialize a nested dict back to VDF text."""
lines: list[str] = []
prefix = "\t" * indent
def _shutdown_steam() -> None:
"""Send ``steam -shutdown`` and wait for the process to exit."""
real_user = os.environ.get("SUDO_USER") or os.environ.get("USER")
try:
_run_as_user(["steam", "-shutdown"], real_user)
except FileNotFoundError:
return
for key, value in data.items():
if isinstance(value, dict):
lines.append(f'{prefix}"{key}"')
lines.append(f"{prefix}{{")
lines.append(_write_vdf(value, indent + 1))
lines.append(f"{prefix}}}")
else:
lines.append(f'{prefix}"{key}"\t\t"{value}"')
return "\n".join(lines)
pgrep = shutil.which("pgrep") or "/usr/bin/pgrep"
for _ in range(30):
result = subprocess.run(
[pgrep, "-x", "steam"],
capture_output=True,
check=False,
)
if result.returncode != 0:
return
time.sleep(1)
# ──────────────────────────────────────────────────────────────
# Discover Steam user IDs on this machine
# ──────────────────────────────────────────────────────────────
def _launch_steam_with_debug() -> None:
"""Launch Steam with CEF debugging enabled."""
real_user = os.environ.get("SUDO_USER") or os.environ.get("USER")
_run_as_user(
[
"steam",
"-cef-enable-debugging",
f"-devtools-port={_CDP_PORT}",
"-silent",
],
real_user,
)
def _find_user_dirs() -> list[Path]:
"""Return paths to all numeric userdata directories except '0'."""
if not _USERDATA_DIR.is_dir():
return []
return [p for p in _USERDATA_DIR.iterdir() if p.name.isdigit() and p.name != "0"]
def ensure_steam_debug_port() -> None:
"""Make sure Steam is running with the CDP debug port open.
If Steam is running without the port, it is restarted.
If Steam is not running, it is launched.
"""
if _steam_has_debug_port():
logger.debug("Steam CDP port already available.")
return
logger.info("Steam CDP port not available — (re)starting Steam...")
if _is_steam_running():
_shutdown_steam()
_launch_steam_with_debug()
if not _wait_for_cdp_ready():
msg = "Timed out waiting for Steam CDP port to become ready"
raise RuntimeError(msg)
logger.info("Steam CDP port ready.")
if not _wait_for_collections_ready():
msg = "Timed out waiting for Steam collections to initialise"
raise RuntimeError(msg)
logger.info("Steam collection store ready.")
# ──────────────────────────────────────────────────────────────
@ -117,159 +213,79 @@ def _find_user_dirs() -> list[Path]:
# ──────────────────────────────────────────────────────────────
def _get_apps_section(
vdf_data: dict[str, Any],
) -> dict[str, Any] | None:
"""Navigate to the ``apps`` dict inside the VDF tree."""
try:
steam_section = vdf_data["UserRoamingConfigStore"]["Software"]["Valve"]["Steam"]
if "apps" not in steam_section:
steam_section["apps"] = {}
except (KeyError, TypeError):
return None
else:
result: dict[str, Any] = steam_section["apps"]
return result
def _hide_games_in_profile(
config_path: Path,
user_dir: Path,
owned_app_ids: list[int],
allowed_app_id: int | None,
) -> int:
"""Hide games in a single Steam user profile.
Args:
config_path: Path to the sharedconfig.vdf file.
user_dir: Path to the user's data directory.
owned_app_ids: List of owned game app IDs.
allowed_app_id: App ID of the game that should remain visible.
Returns:
Number of games hidden in this profile.
"""
# Back up the original.
backup = config_path.with_suffix(".vdf.bak")
if not backup.exists():
shutil.copy2(config_path, backup)
text = config_path.read_text(encoding="utf-8")
vdf_data = _parse_vdf(text)
apps = _get_apps_section(vdf_data)
if apps is None:
logger.warning("Could not find apps section in %s", config_path)
return 0
hidden_count = _apply_hide_flags(apps, owned_app_ids, allowed_app_id)
output = _write_vdf(vdf_data) + "\n"
config_path.write_text(output, encoding="utf-8")
_fix_ownership(config_path, user_dir)
logger.info("Hidden %d games in profile %s", hidden_count, user_dir.name)
return hidden_count
def _apply_hide_flags(
apps: dict[str, Any],
owned_app_ids: list[int],
allowed_app_id: int | None,
) -> int:
"""Set hidden flags on all games except the allowed one.
Args:
apps: The VDF apps section dict.
owned_app_ids: List of owned app IDs.
allowed_app_id: App ID to keep visible.
Returns:
Number of games newly hidden.
"""
hidden_count = 0
for app_id in owned_app_ids:
sid = str(app_id)
if app_id == allowed_app_id:
if sid in apps and isinstance(apps[sid], dict):
apps[sid].pop("hidden", None)
continue
if sid not in apps or not isinstance(apps[sid], dict):
apps[sid] = {}
if apps[sid].get("hidden") != "1":
apps[sid]["hidden"] = "1"
hidden_count += 1
return hidden_count
def hide_other_games(
owned_app_ids: list[int],
allowed_app_id: int | None,
) -> int:
"""Hide every owned game except *allowed_app_id* in the Steam library.
Modifies ``sharedconfig.vdf`` for every local Steam user profile.
Steam must be restarted for changes to take effect.
Uses the Chrome DevTools Protocol to call
``collectionStore.SetAppsAsHidden()`` in Steam's JS context.
Changes take effect immediately no restart required.
Returns the number of games that were hidden.
Returns the number of games newly hidden.
"""
user_dirs = _find_user_dirs()
if not user_dirs:
logger.warning("No Steam userdata directories found.")
ensure_steam_debug_port()
hide_ids = sorted(aid for aid in owned_app_ids if aid != allowed_app_id)
if not hide_ids:
return 0
total_hidden = 0
ids_json = json.dumps(hide_ids)
js = f"""
(() => {{
const toHide = {ids_json};
const already = new Set();
const hidden = collectionStore.GetCollection('hidden');
if (hidden && hidden.allApps) {{
for (const app of hidden.allApps) already.add(app.appid);
}}
const newIds = toHide.filter(id => !already.has(id));
if (newIds.length > 0) {{
collectionStore.SetAppsAsHidden(newIds, true);
}}
// Unhide the allowed game if it was hidden.
const allowedId = {allowed_app_id if allowed_app_id is not None else 'null'};
if (allowedId !== null && collectionStore.BIsHidden(allowedId)) {{
collectionStore.SetAppsAsHidden([allowedId], false);
}}
return JSON.stringify({{ newlyHidden: newIds.length }});
}})()
"""
for user_dir in user_dirs:
config_path = user_dir / _SHARED_CONFIG_REL
if not config_path.exists():
logger.debug("No sharedconfig.vdf in %s", user_dir.name)
continue
total_hidden += _hide_games_in_profile(
config_path, user_dir, owned_app_ids, allowed_app_id
)
return total_hidden
result = _evaluate_js(js)
value = _cdp_result_value(result)
parsed = json.loads(value)
count: int = parsed["newlyHidden"]
logger.info("Hidden %d new games via CDP.", count)
return count
def unhide_all_games(owned_app_ids: list[int]) -> int:
"""Remove the hidden flag from all owned games.
"""Remove all games from the hidden collection.
Returns the number of games that were unhidden.
"""
user_dirs = _find_user_dirs()
total = 0
ensure_steam_debug_port()
for user_dir in user_dirs:
config_path = user_dir / _SHARED_CONFIG_REL
if not config_path.exists():
continue
json.dumps(sorted(owned_app_ids))
js = """
(() => {
const hidden = collectionStore.GetCollection('hidden');
if (!hidden || !hidden.allApps) return JSON.stringify({ count: 0 });
const hiddenIds = hidden.allApps.map(a => a.appid);
if (hiddenIds.length === 0) return JSON.stringify({ count: 0 });
collectionStore.SetAppsAsHidden(hiddenIds, false);
return JSON.stringify({ count: hiddenIds.length });
})()
"""
text = config_path.read_text(encoding="utf-8")
vdf_data = _parse_vdf(text)
apps = _get_apps_section(vdf_data)
if apps is None:
continue
count = 0
for app_id in owned_app_ids:
sid = str(app_id)
if sid in apps and isinstance(apps[sid], dict):
if apps[sid].pop("hidden", None) is not None:
count += 1
# Remove the entry entirely if it's now empty.
if not apps[sid]:
del apps[sid]
output = _write_vdf(vdf_data) + "\n"
config_path.write_text(output, encoding="utf-8")
_fix_ownership(config_path, user_dir)
logger.info("Unhidden %d games in profile %s", count, user_dir.name)
total += count
return total
result = _evaluate_js(js)
value = _cdp_result_value(result)
parsed = json.loads(value)
count: int = parsed["count"]
logger.info("Unhidden %d games via CDP.", count)
return count
# ──────────────────────────────────────────────────────────────
@ -278,37 +294,15 @@ def unhide_all_games(owned_app_ids: list[int]) -> int:
def restart_steam() -> None:
"""Gracefully restart the Steam client.
"""Gracefully restart the Steam client with CEF debugging enabled."""
logger.info("Restarting Steam client with debug port...")
_shutdown_steam()
_launch_steam_with_debug()
Sends ``steam -shutdown``, waits, then launches again with ``-silent``.
"""
real_user = os.environ.get("SUDO_USER") or os.environ.get("USER")
logger.info("Restarting Steam client...")
# Shut down Steam gracefully.
try:
_run_as_user(["steam", "-shutdown"], real_user)
except FileNotFoundError:
logger.warning("Steam executable not found for restart.")
return
# Wait for Steam to exit.
import time
_pgrep = shutil.which("pgrep") or "/usr/bin/pgrep"
for _ in range(30):
result = subprocess.run(
[_pgrep, "-f", "steam.sh"],
capture_output=True,
check=False,
)
if result.returncode != 0:
break
time.sleep(1)
# Relaunch silently.
with contextlib.suppress(FileNotFoundError):
_run_as_user(["steam", "-silent"], real_user)
if not _wait_for_cdp_ready():
logger.warning("Steam restarted but CDP port not ready.")
else:
logger.info("Steam restarted with CDP port ready.")
def _run_as_user(cmd: list[str], user: str | None) -> None:
@ -341,14 +335,3 @@ def _run_as_user(cmd: list[str], user: str | None) -> None:
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def _fix_ownership(path: Path, user_dir: Path) -> None:
"""If running as root, chown the file to the user who owns user_dir."""
if os.geteuid() != 0:
return
try:
stat = user_dir.stat()
os.chown(path, stat.st_uid, stat.st_gid)
except OSError:
pass

View File

@ -27,7 +27,6 @@ from python_pkg.steam_backlog_enforcer.enforcer import (
)
from python_pkg.steam_backlog_enforcer.hltb import (
fetch_hltb_times_cached,
get_hltb_submit_url,
)
from python_pkg.steam_backlog_enforcer.library_hider import (
hide_other_games,
@ -755,8 +754,6 @@ def _enforce_hide_games(config: Config, state: State) -> None:
hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0:
_echo(f" Library: hid {hidden} games (only assigned game visible)")
restart_steam()
_echo(" Steam restarted to apply library changes.")
else:
_echo(" Library: games already hidden")
else:
@ -923,6 +920,32 @@ def cmd_unblock(_config: Config, _state: State) -> None:
_echo("Failed to unblock. Run with sudo.")
def cmd_buy_dlc(config: Config, state: State) -> None:
"""Temporarily unblock the store so the user can buy DLC."""
if state.current_app_id is None:
_echo("No game currently assigned.")
return
_echo(f"Current game: {state.current_game_name} (AppID={state.current_app_id})")
_echo("Unblocking Steam store for DLC purchase...")
if not unblock_store():
_echo("Failed to unblock store. Run with sudo.")
return
_echo("\nStore UNBLOCKED — buy your DLC now.")
_echo("Press Enter when you're done to re-block the store...")
input()
if config.block_store:
if block_store():
_echo("Store re-blocked. Restarting Steam to clear DNS cache...")
restart_steam()
_echo("Done.")
else:
_echo("Warning: failed to re-block store.")
def cmd_reset(config: Config, state: State) -> None:
"""Reset all state (unblock, unhide, clear assignment)."""
unblock_store()
@ -934,7 +957,6 @@ def cmd_reset(config: Config, state: State) -> None:
count = unhide_all_games(owned)
if count:
_echo(f"Unhidden {count} games.")
restart_steam()
except Exception as exc: # noqa: BLE001
_echo(f"Warning: could not unhide games: {exc}")
@ -1043,8 +1065,6 @@ def cmd_hide(config: Config, state: State) -> None:
_echo(f"Hidden {hidden} games.")
if hidden > 0:
_echo("Restarting Steam to apply changes...")
restart_steam()
_echo("Done! Only the assigned game should be visible in your library.")
@ -1060,49 +1080,18 @@ def cmd_unhide(config: Config, _state: State) -> None:
_echo(f"Unhidden {count} games.")
if count > 0:
_echo("Restarting Steam to apply changes...")
restart_steam()
_echo("Done!")
def _open_hltb_submit_page(
game_name: str,
app_id: int,
snapshot_data: list[dict[str, Any]] | None,
) -> None:
"""Show playtime and open the HLTB submit page in the browser."""
playtime_minutes = 0
if snapshot_data:
for entry in snapshot_data:
if entry.get("app_id") == app_id:
playtime_minutes = entry.get("playtime_minutes", 0)
break
playtime_h = playtime_minutes / 60
_echo(f"\n Steam playtime: {playtime_h:.1f} hours")
_echo(" Looking up game on HowLongToBeat...")
submit_url = get_hltb_submit_url(game_name)
if submit_url:
_echo(f" HLTB submit page: {submit_url}")
_echo(" Opening in browser (log in & submit your time)...")
import webbrowser
webbrowser.open(submit_url)
else:
_echo(" Could not find game on HLTB (submit manually).")
def cmd_done(config: Config, state: State) -> None:
"""Check completion, open HLTB submit, pick next game, uninstall & hide.
"""Check completion, pick next game, uninstall & hide.
All-in-one command for after finishing a game:
1. Verify 100% achievements on Steam.
2. Show playtime and open HLTB submit page in browser.
3. Pick the next game (shortest HLTB 100% time).
4. Uninstall all non-assigned games.
5. Hide all non-assigned games in the Steam library.
6. Install the newly assigned game.
2. Pick the next game (shortest HLTB 100% time).
3. Uninstall all non-assigned games.
4. Hide all non-assigned games in the Steam library.
5. Install the newly assigned game.
"""
if state.current_app_id is None:
_echo("No game currently assigned. Run 'scan' first.")
@ -1132,11 +1121,8 @@ def cmd_done(config: Config, state: State) -> None:
_echo(f"\n COMPLETED: {game_name}!")
state.finished_app_ids.append(app_id)
# ── Step 2: HLTB submit ──
# ── Step 2: Pick next game ──
snapshot_data = load_snapshot()
_open_hltb_submit_page(game_name, app_id, snapshot_data)
# ── Step 3: Pick next game ──
_echo("\nPicking next game...")
if not snapshot_data:
_echo(" No snapshot found. Run 'scan' first.")
@ -1152,14 +1138,12 @@ def cmd_done(config: Config, state: State) -> None:
_echo(" No more games to assign!")
return
# ── Step 4: Hide non-assigned games in library ──
# ── Step 3: Hide non-assigned games in library ──
owned_ids = _get_all_owned_app_ids(config)
if owned_ids:
hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0:
_echo(f"\n Library: hid {hidden} games")
restart_steam()
_echo(" Steam restarted to apply library changes.")
send_notification(
"Game Complete!",
@ -1179,6 +1163,7 @@ COMMANDS = {
"hide": ("Hide all non-assigned games in library", cmd_hide),
"unhide": ("Unhide all games in library", cmd_unhide),
"unblock": ("Remove store blocking", cmd_unblock),
"buy-dlc": ("Temporarily unblock store to buy DLC", cmd_buy_dlc),
"reset": ("Reset all state", cmd_reset),
"installed": ("List installed games", cmd_installed),
"uninstall": ("Uninstall all non-assigned games", cmd_uninstall),

View File

@ -1,7 +1,7 @@
#!/usr/bin/env bash
# Quick launcher for the "done" workflow:
# check completion → open HLTB → pick next game → uninstall & hide others
# Launcher for the Steam Backlog Enforcer.
# Usage: ./run.sh [command] (defaults to "done" if no command given)
set -euo pipefail
cd "$(dirname "$0")/../.."
exec python -m python_pkg.steam_backlog_enforcer.main "done"
exec python -m python_pkg.steam_backlog_enforcer.main "${1:-done}"

View File

@ -39,11 +39,30 @@ IPTABLES_CHAIN = "STEAM_ENFORCER"
_SUDO = shutil.which("sudo") or "/usr/bin/sudo"
_IPTABLES = shutil.which("iptables") or "/usr/sbin/iptables"
_BASH = shutil.which("bash") or "/usr/bin/bash"
_CHATTR = shutil.which("chattr") or "/usr/bin/chattr"
_SYSTEMCTL = shutil.which("systemctl") or "/usr/bin/systemctl"
_UMOUNT = shutil.which("umount") or "/usr/bin/umount"
_MOUNT = shutil.which("mount") or "/usr/bin/mount"
_FINDMNT = shutil.which("findmnt") or "/usr/bin/findmnt"
_CP = shutil.which("cp") or "/usr/bin/cp"
_CHMOD = shutil.which("chmod") or "/usr/bin/chmod"
_TEE = shutil.which("tee") or "/usr/bin/tee"
# IP address used in /etc/hosts for blocking domains.
_HOSTS_REDIRECT_IP = ".".join(["0"] * 4)
def _sudo_write_hosts(content: str) -> None:
"""Write *content* to /etc/hosts via ``sudo tee``."""
subprocess.run(
[_SUDO, _TEE, str(HOSTS_FILE)],
input=content.encode(),
stdout=subprocess.DEVNULL,
timeout=10,
check=True,
)
def is_store_blocked() -> bool:
"""Check if Steam Store domains are blocked in /etc/hosts."""
try:
@ -66,10 +85,21 @@ def is_store_blocked() -> bool:
def block_store() -> bool:
"""Block Steam Store: run hosts install script + iptables fallback.
"""Block Steam Store: uncomment hosts entries, or run install script.
Returns True if at least one blocking method succeeded.
"""
if is_store_blocked():
logger.info("Steam Store already blocked in /etc/hosts.")
return True
# Try quick re-block (uncomment lines) first.
if _reblock_hosts() and is_store_blocked():
_block_store_iptables()
flush_dns_cache()
return True
# Fall back to the full hosts install script.
hosts_ok = _block_via_hosts_install()
ipt_ok = _block_store_iptables()
@ -201,25 +231,17 @@ def _block_store_iptables() -> bool:
def unblock_store() -> bool:
"""Remove iptables-based Steam Store blocks.
NOTE: /etc/hosts entries are NOT removed the hosts install script's
protection mechanism intentionally makes removal difficult. Only
iptables rules are cleared.
"""
"""Remove Steam Store blocks from both iptables and /etc/hosts."""
ipt_ok = _unblock_store_iptables()
hosts_ok = _unblock_hosts()
flush_dns_cache()
if not ipt_ok:
logger.warning("Failed to remove iptables rules.")
if not hosts_ok:
logger.warning("Failed to remove /etc/hosts entries.")
logger.warning(
"Steam Store entries in /etc/hosts are protected and cannot be "
"removed programmatically. This is by design — you must manually "
"remove the immutable flag, bind mount, and edit the hosts install "
"script to unblock."
)
return ipt_ok
return ipt_ok or hosts_ok
def _unblock_store_iptables() -> bool:
@ -266,3 +288,144 @@ def flush_dns_cache() -> None:
timeout=5,
check=False,
)
# ──────────────────────────────────────────────────────────────
# /etc/hosts protection helpers
# ──────────────────────────────────────────────────────────────
_GUARD_SERVICES = ("hosts-bind-mount.service", "hosts-guard.path")
_LOCKED_HOSTS_COPY = Path("/usr/local/share/locked-hosts")
def _disable_hosts_protection() -> None:
"""Stop guard services, unmount bind mount, remove chattr flags."""
for svc in _GUARD_SERVICES:
subprocess.run(
[_SUDO, _SYSTEMCTL, "stop", svc],
capture_output=True,
timeout=10,
check=False,
)
# Unmount bind mount if active.
result = subprocess.run(
[_FINDMNT, str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
if result.returncode == 0:
subprocess.run(
[_SUDO, _UMOUNT, str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
# Remove immutable + append-only attributes.
subprocess.run(
[_SUDO, _CHATTR, "-i", "-a", str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
def _enable_hosts_protection() -> None:
"""Re-apply chattr flags and restart guard services."""
subprocess.run(
[_SUDO, _CHMOD, "644", str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
subprocess.run(
[_SUDO, _CHATTR, "+ia", str(HOSTS_FILE)],
capture_output=True,
timeout=5,
check=False,
)
# Update the canonical copy so the guard doesn't revert changes.
if _LOCKED_HOSTS_COPY.exists():
subprocess.run(
[_SUDO, _CP, str(HOSTS_FILE), str(_LOCKED_HOSTS_COPY)],
capture_output=True,
timeout=5,
check=False,
)
for svc in _GUARD_SERVICES:
subprocess.run(
[_SUDO, _SYSTEMCTL, "start", svc],
capture_output=True,
timeout=10,
check=False,
)
def _unblock_hosts() -> bool:
"""Comment out Steam Store entries in /etc/hosts."""
if not is_store_blocked():
logger.info("Steam Store not blocked in /etc/hosts, nothing to do.")
return True
try:
_disable_hosts_protection()
content = HOSTS_FILE.read_text(encoding="utf-8")
new_lines = []
changed = False
for line in content.splitlines(keepends=True):
stripped = line.strip()
if (
not stripped.startswith("#")
and stripped.startswith(_HOSTS_REDIRECT_IP)
and any(d in stripped for d in BLOCKED_DOMAINS)
):
new_lines.append(f"# {line}" if line.endswith("\n") else f"# {line}\n")
changed = True
else:
new_lines.append(line)
if changed:
_sudo_write_hosts("".join(new_lines))
logger.info("Commented out Steam Store entries in /etc/hosts.")
_enable_hosts_protection()
except OSError:
logger.exception("Failed to modify /etc/hosts")
return False
else:
return True
def _reblock_hosts() -> bool:
"""Uncomment Steam Store entries in /etc/hosts."""
try:
_disable_hosts_protection()
content = HOSTS_FILE.read_text(encoding="utf-8")
new_lines = []
changed = False
for line in content.splitlines(keepends=True):
stripped = line.strip()
if stripped.startswith("# ") and any(
d in stripped for d in BLOCKED_DOMAINS
):
# Remove the '# ' prefix.
uncommented = line.replace("# ", "", 1)
new_lines.append(uncommented)
changed = True
else:
new_lines.append(line)
if changed:
_sudo_write_hosts("".join(new_lines))
logger.info("Re-enabled Steam Store entries in /etc/hosts.")
_enable_hosts_protection()
except OSError:
logger.exception("Failed to modify /etc/hosts")
return False
else:
return True

View File

@ -15,3 +15,4 @@ pytest>=7.0
python-chess>=1.999
requests>=2.0
selenium>=4.0
websockets>=13.0

View File

@ -0,0 +1,143 @@
# Copilot Instructions for robotgo_demo
## Overview
`robotgo_demo/` is a Go project using [go-vgo/robotgo](https://github.com/go-vgo/robotgo) for desktop automation on **Arch Linux, X11, i3 window manager**. Robotgo provides cross-platform control of mouse, keyboard, screen, clipboard, and window management.
## System Requirements (Arch Linux / X11)
All dependencies must be installed before building:
```bash
sudo pacman -S --needed gcc go libxtst libx11 libxkbcommon libxkbcommon-x11 \
libpng xsel xclip libxcb
```
These provide:
- **GCC + Go** — compiler toolchain (robotgo uses cgo)
- **libxtst, libx11, libxcb** — X11 display and XTest extension
- **libxkbcommon, libxkbcommon-x11** — keyboard hook support (gohook)
- **libpng** — bitmap/screenshot capture
- **xsel, xclip** — clipboard read/write
## Build & Run
```bash
cd robotgo_demo
go build -o robotgo_demo .
./robotgo_demo
```
To update the dependency:
```bash
go get -u github.com/go-vgo/robotgo
go mod tidy
```
## Robotgo API Quick Reference
Use `import "github.com/go-vgo/robotgo"` in Go files. Key APIs:
### Mouse
```go
robotgo.Move(x, y) // instant move
robotgo.MoveSmooth(x, y) // human-like smooth move
robotgo.Click("left") // click (left/right/center)
robotgo.DragSmooth(x, y) // drag to position
robotgo.ScrollDir(n, "up") // scroll direction
x, y := robotgo.Location() // current mouse position
```
### Keyboard
```go
robotgo.Type("text") // type a string
robotgo.KeyTap("enter") // tap a single key
robotgo.KeyTap("a", "ctrl") // key combo (ctrl+a)
robotgo.KeyToggle("shift") // hold key down
robotgo.KeyToggle("shift", "up") // release key
```
### Screen
```go
sx, sy := robotgo.GetScreenSize() // screen dimensions
color := robotgo.GetPixelColor(x, y) // hex color at pixel
bit := robotgo.CaptureScreen(x, y, w, h) // capture region
defer robotgo.FreeBitmap(bit) // always free bitmaps
img := robotgo.ToImage(bit) // convert to Go image
```
### Clipboard
```go
robotgo.WriteAll("text") // write to clipboard
text, err := robotgo.ReadAll() // read from clipboard
```
### Window
```go
title := robotgo.GetTitle() // active window title
pids, err := robotgo.FindIds("firefox") // find PIDs by name
robotgo.ActivePid(pid) // focus window by PID
robotgo.ActiveName("firefox") // focus window by name
```
### Event Hooks (via gohook)
```go
import hook "github.com/robotn/gohook"
hook.Register(hook.KeyDown, []string{"q", "ctrl"}, func(e hook.Event) {
hook.End()
})
s := hook.Start()
<-hook.Process(s)
```
### Timing
```go
robotgo.Sleep(1) // sleep 1 second
robotgo.MilliSleep(500) // sleep 500ms
robotgo.MouseSleep = 300 // default delay between mouse ops
robotgo.KeySleep = 100 // default delay between key ops
```
## Key Docs & Links
- [Full API docs](https://github.com/go-vgo/robotgo/blob/master/docs/doc.md)
- [Key name reference](https://github.com/go-vgo/robotgo/blob/master/docs/keys.md)
- [Examples](https://github.com/go-vgo/robotgo/blob/master/examples)
- [GoDoc](https://pkg.go.dev/github.com/go-vgo/robotgo)
## Conventions
- Always `defer robotgo.FreeBitmap(bit)` after `CaptureScreen` to avoid memory leaks.
- Use `MoveSmooth` over `Move` when simulating human interaction.
- Add `time.Sleep` or `robotgo.MilliSleep` before typing to give the user time to focus the target window.
- Robotgo operates on X11 only — this project does **not** support Wayland.
- The compiled binary is git-ignored; always build from source.
## Gotchas
- **cgo required**: robotgo uses C bindings. `CGO_ENABLED=1` must be set (it's the default on native builds).
- **X11 only**: will not work under Wayland. Ensure `$DISPLAY` is set.
- **Root not needed**: runs as a regular user with X11 access.
- **Bitmap capture** may fail if no X display is available (e.g., headless/SSH without X forwarding).
- **`png.h: No such file or directory`**: install `libpng` (`sudo pacman -S libpng`).
## Project Structure
```
robotgo_demo/
├── .gitignore # ignores compiled binary
├── go.mod # Go module (robotgo_demo)
├── go.sum # dependency checksums
├── main.go # demo: mouse, keyboard, screen, clipboard, window
└── README.md # user-facing docs
```

1
robotgo_demo/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
robotgo_demo

31
robotgo_demo/README.md Normal file
View File

@ -0,0 +1,31 @@
# robotgo_demo
A simple demo of [go-vgo/robotgo](https://github.com/go-vgo/robotgo) — desktop
automation for Go (mouse, keyboard, screen, clipboard, window management).
## Requirements (Arch Linux, X11)
```bash
sudo pacman -S --needed gcc go libxtst libx11 libxkbcommon libxkbcommon-x11 \
libpng xsel xclip libxcb
```
## Build & Run
```bash
cd robotgo_demo
go build -o robotgo_demo .
./robotgo_demo
```
**Note:** The program types text after a 2-second delay — focus a text editor
before running.
## What it does
1. Prints screen resolution
2. Reads and moves the mouse to screen center
3. Reads the pixel color at center
4. Writes/reads the clipboard
5. Types "Hello World!" into the focused window
6. Prints the active window title

32
robotgo_demo/go.mod Normal file
View File

@ -0,0 +1,32 @@
module robotgo_demo
go 1.26.1
require github.com/go-vgo/robotgo v1.0.1
require (
github.com/dblohm7/wingoes v0.0.0-20250822163801-6d8e6105c62d // indirect
github.com/ebitengine/purego v0.9.1 // indirect
github.com/gen2brain/shm v0.1.1 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/godbus/dbus/v5 v5.2.0 // indirect
github.com/jezek/xgb v1.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 // indirect
github.com/otiai10/gosseract/v2 v2.4.1 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/robotn/xgb v0.10.0 // indirect
github.com/robotn/xgbutil v0.10.0 // indirect
github.com/shirou/gopsutil/v4 v4.25.10 // indirect
github.com/tailscale/win v0.0.0-20250627215312-f4da2b8ee071 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/vcaesar/gops v0.41.0 // indirect
github.com/vcaesar/imgo v0.41.0 // indirect
github.com/vcaesar/keycode v0.10.1 // indirect
github.com/vcaesar/screenshot v0.11.1 // indirect
github.com/vcaesar/tt v0.20.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect
golang.org/x/image v0.33.0 // indirect
golang.org/x/sys v0.38.0 // indirect
)

73
robotgo_demo/go.sum Normal file
View File

@ -0,0 +1,73 @@
github.com/BurntSushi/freetype-go v0.0.0-20160129220410-b763ddbfe298/go.mod h1:D+QujdIlUNfa0igpNMk6UIvlb6C252URs4yupRUV4lQ=
github.com/BurntSushi/graphics-go v0.0.0-20160129215708-b43f31a4a966/go.mod h1:Mid70uvE93zn9wgF92A/r5ixgnvX8Lh68fxp9KQBaI0=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dblohm7/wingoes v0.0.0-20250822163801-6d8e6105c62d h1:QRKpU+9ZBDs62LyBfwhZkJdB5DJX2Sm3p4kUh7l1aA0=
github.com/dblohm7/wingoes v0.0.0-20250822163801-6d8e6105c62d/go.mod h1:SUxUaAK/0UG5lYyZR1L1nC4AaYYvSSYTWQSH3FPcxKU=
github.com/ebitengine/purego v0.9.1 h1:a/k2f2HQU3Pi399RPW1MOaZyhKJL9w/xFpKAg4q1s0A=
github.com/ebitengine/purego v0.9.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/gen2brain/shm v0.1.1 h1:1cTVA5qcsUFixnDHl14TmRoxgfWEEZlTezpUj1vm5uQ=
github.com/gen2brain/shm v0.1.1/go.mod h1:UgIcVtvmOu+aCJpqJX7GOtiN7X2ct+TKLg4RTxwPIUA=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/go-vgo/robotgo v1.0.1 h1:4dS+dXSMPRt+VmvG4QZPlH9BNG9Jfywq4q0YjSiFN0A=
github.com/go-vgo/robotgo v1.0.1/go.mod h1:NcSL/tqNqkpWJ3rmT6YSDUVhQKZwyRsaanDMO4qkT5I=
github.com/godbus/dbus/v5 v5.2.0 h1:3WexO+U+yg9T70v9FdHr9kCxYlazaAXUhx2VMkbfax8=
github.com/godbus/dbus/v5 v5.2.0/go.mod h1:3AAv2+hPq5rdnr5txxxRwiGjPXamgoIHgz9FPBfOp3c=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/jezek/xgb v1.2.0 h1:LzgkD11wOrPnxXEqo588cnjUt4NwMHrFh/tgajo50Q0=
github.com/jezek/xgb v1.2.0/go.mod h1:nrhwO0FX/enq75I7Y7G8iN1ubpSGZEiA3v9e9GyRFlk=
github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 h1:PwQumkgq4/acIiZhtifTV5OUqqiP82UAl0h87xj/l9k=
github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/otiai10/gosseract/v2 v2.4.1 h1:G8AyBpXEeSlcq8TI85LH/pM5SXk8Djy2GEXisgyblRw=
github.com/otiai10/gosseract/v2 v2.4.1/go.mod h1:1gNWP4Hgr2o7yqWfs6r5bZxAatjOIdqWxJLWsTsembk=
github.com/otiai10/mint v1.6.3 h1:87qsV/aw1F5as1eH1zS/yqHY85ANKVMgkDrf9rcxbQs=
github.com/otiai10/mint v1.6.3/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/robotn/xgb v0.0.0-20190912153532-2cb92d044934/go.mod h1:SxQhJskUJ4rleVU44YvnrdvxQr0tKy5SRSigBrCgyyQ=
github.com/robotn/xgb v0.10.0 h1:O3kFbIwtwZ3pgLbp1h5slCQ4OpY8BdwugJLrUe6GPIM=
github.com/robotn/xgb v0.10.0/go.mod h1:SxQhJskUJ4rleVU44YvnrdvxQr0tKy5SRSigBrCgyyQ=
github.com/robotn/xgbutil v0.10.0 h1:gvf7mGQqCWQ68aHRtCxgdewRk+/KAJui6l3MJQQRCKw=
github.com/robotn/xgbutil v0.10.0/go.mod h1:svkDXUDQjUiWzLrA0OZgHc4lbOts3C+uRfP6/yjwYnU=
github.com/shirou/gopsutil/v4 v4.25.10 h1:at8lk/5T1OgtuCp+AwrDofFRjnvosn0nkN2OLQ6g8tA=
github.com/shirou/gopsutil/v4 v4.25.10/go.mod h1:+kSwyC8DRUD9XXEHCAFjK+0nuArFJM0lva+StQAcskM=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tailscale/win v0.0.0-20250627215312-f4da2b8ee071 h1:qo7kOhoN5DHioXNlFytBzIoA5glW6lsb8YqV0lP3IyE=
github.com/tailscale/win v0.0.0-20250627215312-f4da2b8ee071/go.mod h1:aMd4yDHLjbOuYP6fMxj1d9ACDQlSWwYztcpybGHCQc8=
github.com/tc-hib/winres v0.2.1 h1:YDE0FiP0VmtRaDn7+aaChp1KiF4owBiJa5l964l5ujA=
github.com/tc-hib/winres v0.2.1/go.mod h1:C/JaNhH3KBvhNKVbvdlDWkbMDO9H4fKKDaN7/07SSuk=
github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA=
github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI=
github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw=
github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ=
github.com/vcaesar/gops v0.41.0 h1:FG748Jyw3FOuZnbzSgB+CQSx2e5LbLCPWV2JU1brFdc=
github.com/vcaesar/gops v0.41.0/go.mod h1:/3048L7Rj7QjQKTSB+kKc7hDm63YhTWy5QJ10TCP37A=
github.com/vcaesar/imgo v0.41.0 h1:kNLYGrThXhB9Dd6IwFmfPnxq9P6yat2g7dpPjr7OWO8=
github.com/vcaesar/imgo v0.41.0/go.mod h1:/LGOge8etlzaVu/7l+UfhJxR6QqaoX5yeuzGIMfWb4I=
github.com/vcaesar/keycode v0.10.1 h1:0DesGmMAPWpYTCYddOFiCMKCDKgNnwiQa2QXindVUHw=
github.com/vcaesar/keycode v0.10.1/go.mod h1:JNlY7xbKsh+LAGfY2j4M3znVrGEm5W1R8s/Uv6BJcfQ=
github.com/vcaesar/screenshot v0.11.1 h1:GgPuN89XC4Yh38dLx4quPlSo3YiWWhwIria/j3LtrqU=
github.com/vcaesar/screenshot v0.11.1/go.mod h1:gJNwHBiP1v1v7i8TQ4yV1XJtcyn2I/OJL7OziVQkwjs=
github.com/vcaesar/tt v0.20.1 h1:D/jUeeVCNbq3ad8M7hhtB3J9x5RZ6I1n1eZ0BJp7M+4=
github.com/vcaesar/tt v0.20.1/go.mod h1:cH2+AwGAJm19Wa6xvEa+0r+sXDJBT0QgNQey6mwqLeU=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
golang.org/x/image v0.33.0 h1:LXRZRnv1+zGd5XBUVRFmYEphyyKJjQjCRiOuAP3sZfQ=
golang.org/x/image v0.33.0/go.mod h1:DD3OsTYT9chzuzTQt+zMcOlBHgfoKQb1gry8p76Y1sc=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

52
robotgo_demo/main.go Normal file
View File

@ -0,0 +1,52 @@
// Package main demonstrates basic robotgo capabilities:
// mouse control, keyboard typing, screen info, and clipboard.
package main
import (
"fmt"
"time"
"github.com/go-vgo/robotgo"
)
func main() {
// --- Screen info ---
sx, sy := robotgo.GetScreenSize()
fmt.Printf("Screen size: %d x %d\n", sx, sy)
// --- Mouse: read position, move, click ---
x, y := robotgo.Location()
fmt.Printf("Current mouse position: (%d, %d)\n", x, y)
targetX, targetY := sx/2, sy/2
fmt.Printf("Moving mouse to center (%d, %d)...\n", targetX, targetY)
robotgo.MoveSmooth(targetX, targetY)
time.Sleep(500 * time.Millisecond)
nx, ny := robotgo.Location()
fmt.Printf("Mouse is now at: (%d, %d)\n", nx, ny)
// --- Pixel color at center ---
color := robotgo.GetPixelColor(targetX, targetY)
fmt.Printf("Pixel color at center: #%s\n", color)
// --- Clipboard ---
robotgo.WriteAll("Hello from robotgo!")
text, err := robotgo.ReadAll()
if err != nil {
fmt.Println("Clipboard read error:", err)
} else {
fmt.Printf("Clipboard contents: %q\n", text)
}
// --- Keyboard: type into the currently focused window ---
fmt.Println("Typing 'Hello World!' in 2 seconds (focus a text editor)...")
time.Sleep(2 * time.Second)
robotgo.Type("Hello World!")
// --- Window: get active window title ---
title := robotgo.GetTitle()
fmt.Printf("Active window title: %q\n", title)
fmt.Println("Done!")
}