testsAndMisc/linux_configuration/scripts/digital_wellbeing/focus_mode_daemon.py

472 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""Focus Mode Daemon - Steam/Browser Mutual Exclusion.
This daemon monitors running processes and enforces mutual exclusion between
Steam (gaming) and web browsers. Whichever starts first "wins" and the other
category is blocked/killed.
Run as a systemd user service for continuous monitoring.
"""
from __future__ import annotations
import argparse
import contextlib
from datetime import datetime, timedelta, timezone
import logging
from pathlib import Path
import shutil
import signal
import subprocess
import sys
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from types import FrameType
logger = logging.getLogger(__name__)
# Configuration
STATE_DIR = Path.home() / ".local" / "state" / "focus-mode"
LOG_FILE = STATE_DIR / "focus-mode.log"
WHITELIST_FILE = STATE_DIR / "whitelist"
POLL_INTERVAL = 2 # seconds between process checks
DEFAULT_WHITELIST_MINUTES = 5
# Process patterns
STEAM_PATTERNS = frozenset(
[
"steam",
"steamwebhelper",
"steam_ocompati", # Proton compatibility tool
]
)
# Games often have steam_app_ prefix in process name
STEAM_GAME_PREFIX = "steam_app_"
BROWSER_PATTERNS = frozenset(
[
"firefox",
"firefox-esr",
"librewolf",
"chromium",
"chrome",
"google-chrome",
"brave",
"vivaldi",
"opera",
"microsoft-edge",
"ungoogled-chromium",
"thorium",
]
)
# Electron apps that should NOT be treated as browsers
# These use Chromium under the hood but are not web browsers
ELECTRON_IGNORE = frozenset(
[
"electron",
"code", # VS Code
"chrome_crashpad", # Crashpad handler used by all Electron apps
]
)
# Patterns to ignore (browser helpers that aren't the main browser)
IGNORE_PATTERNS = frozenset(
[
"crashhandler",
"update",
"helper",
"crashpad",
]
)
def log(message: str) -> None:
"""Log message with timestamp."""
timestamp = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
log_line = f"{timestamp} - {message}"
logger.info("%s", log_line)
with contextlib.suppress(OSError):
STATE_DIR.mkdir(parents=True, exist_ok=True)
with LOG_FILE.open("a") as f:
f.write(log_line + "\n")
def is_whitelist_active() -> bool:
"""Check if the browser whitelist is currently active."""
try:
if not WHITELIST_FILE.exists():
return False
expiry_str = WHITELIST_FILE.read_text().strip()
expiry = datetime.fromisoformat(expiry_str)
if datetime.now(tz=timezone.utc) < expiry:
return True
# Expired - clean up
WHITELIST_FILE.unlink(missing_ok=True)
log("Browser whitelist expired")
notify(
"\U0001f6ab Whitelist Expired",
"Browser whitelist ended. Browsers are blocked again.",
"normal",
)
except (ValueError, OSError) as exc:
log(f"Error reading whitelist: {exc}")
with contextlib.suppress(OSError):
WHITELIST_FILE.unlink(missing_ok=True)
return False
def activate_whitelist(minutes: int = DEFAULT_WHITELIST_MINUTES) -> None:
"""Activate the browser whitelist for the given number of minutes."""
STATE_DIR.mkdir(parents=True, exist_ok=True)
expiry = datetime.now(tz=timezone.utc) + timedelta(minutes=minutes)
WHITELIST_FILE.write_text(expiry.isoformat() + "\n")
expiry_str = expiry.strftime("%H:%M:%S")
log(f"Browser whitelist activated for {minutes} minutes (expires {expiry_str} UTC)")
notify(
"\U0001f513 Browser Whitelist Active",
f"Browsers allowed for {minutes} minutes (auth/verification).",
"normal",
)
def cancel_whitelist() -> None:
"""Cancel the browser whitelist."""
if WHITELIST_FILE.exists():
WHITELIST_FILE.unlink(missing_ok=True)
log("Browser whitelist cancelled")
notify(
"\U0001f6ab Whitelist Cancelled",
"Browser whitelist removed. Browsers are blocked again.",
"normal",
)
else:
log("No active whitelist to cancel")
def notify(title: str, message: str, urgency: str = "normal") -> None:
"""Send desktop notification."""
notify_send = shutil.which("notify-send")
if notify_send is None:
return
with contextlib.suppress(OSError, subprocess.SubprocessError):
subprocess.run(
[notify_send, "-u", urgency, title, message],
capture_output=True,
timeout=5,
check=False,
)
def get_running_processes() -> set[str]:
"""Get set of currently running process names by reading /proc directly.
Reads /proc/*/comm to avoid forking a subprocess on every poll cycle.
"""
processes: set[str] = set()
try:
for comm_path in Path("/proc").glob("*/comm"):
with contextlib.suppress(OSError):
proc_name = comm_path.read_text().strip().lower()
if proc_name:
processes.add(proc_name)
except OSError as exc:
log(f"Error reading /proc: {exc}")
return processes
def is_steam_running(processes: set[str]) -> bool:
"""Check if Steam or any Steam game is running."""
for proc in processes:
if proc in STEAM_PATTERNS:
return True
if proc.startswith(STEAM_GAME_PREFIX):
return True
return False
def is_browser_running(processes: set[str]) -> bool:
"""Check if any browser is running."""
for proc in processes:
if proc in ELECTRON_IGNORE:
continue
if any(ign in proc for ign in IGNORE_PATTERNS):
continue
if proc in BROWSER_PATTERNS:
return True
return False
def _run_pkill(pattern: str, *, force: bool = False) -> None:
"""Run pkill with the given pattern."""
pkill_bin = shutil.which("pkill")
if pkill_bin is None:
return
cmd = [pkill_bin]
if force:
cmd.append("-9")
cmd.extend(["-f", pattern])
with contextlib.suppress(OSError, subprocess.SubprocessError):
subprocess.run(cmd, capture_output=True, timeout=5, check=False)
def kill_steam() -> None:
"""Kill all Steam-related processes."""
log("Killing Steam processes...")
notify(
"\U0001f3ae Gaming Blocked",
"Browser is active. Closing Steam.",
"critical",
)
_run_pkill("steam")
time.sleep(2)
_run_pkill("steam", force=True)
def kill_browsers() -> None:
"""Kill all browser processes."""
log("Killing browser processes...")
notify(
"\U0001f310 Browsers Blocked",
"Steam is active. Closing browsers.",
"critical",
)
for browser in BROWSER_PATTERNS:
_run_pkill(browser)
time.sleep(2)
for browser in BROWSER_PATTERNS:
_run_pkill(browser, force=True)
class FocusMode:
"""Tracks current focus mode and enforces mutual exclusion."""
def __init__(self) -> None:
"""Initialize focus mode as inactive."""
self.current_mode: str | None = None
self.mode_start_time: datetime | None = None
def _enter_mode(self, mode: str, msg: str, notification: str) -> None:
"""Enter a new focus mode."""
log(msg)
self.current_mode = mode
self.mode_start_time = datetime.now(tz=timezone.utc)
notify(*notification.split("|", 1))
def _handle_no_mode(
self,
*,
steam_running: bool,
browser_running: bool,
) -> None:
"""Handle updates when no mode is active."""
if steam_running and browser_running:
log("Both Steam and browsers detected at startup - entering GAMING mode")
self.current_mode = "gaming"
self.mode_start_time = datetime.now(tz=timezone.utc)
kill_browsers()
elif steam_running:
self._enter_mode(
"gaming",
"Steam detected - entering GAMING mode",
"\U0001f3ae Gaming Mode|Steam detected. Browsers are now blocked.",
)
elif browser_running:
self._enter_mode(
"browsing",
"Browser detected - entering BROWSING mode",
"\U0001f310 Browsing Mode|Browser detected. Steam is now blocked.",
)
def _handle_gaming(
self,
*,
steam_running: bool,
browser_running: bool,
) -> None:
"""Handle updates in gaming mode."""
if not steam_running:
log("Steam closed - exiting GAMING mode")
self.current_mode = None
self.mode_start_time = None
notify(
"\U0001f3ae Gaming Mode Ended",
"You can now use browsers.",
"normal",
)
elif browser_running:
if is_whitelist_active():
return
log("Browser detected during GAMING mode - killing browsers")
kill_browsers()
def _handle_browsing(
self,
*,
steam_running: bool,
browser_running: bool,
) -> None:
"""Handle updates in browsing mode."""
if not browser_running:
log("Browsers closed - exiting BROWSING mode")
self.current_mode = None
self.mode_start_time = None
notify(
"\U0001f310 Browsing Mode Ended",
"You can now use Steam.",
"normal",
)
elif steam_running:
log("Steam detected during BROWSING mode - killing Steam")
kill_steam()
def update(self, processes: set[str]) -> None:
"""Update focus mode based on running processes."""
steam_running = is_steam_running(processes)
browser_running = is_browser_running(processes)
if self.current_mode is None:
self._handle_no_mode(
steam_running=steam_running,
browser_running=browser_running,
)
elif self.current_mode == "gaming":
self._handle_gaming(
steam_running=steam_running,
browser_running=browser_running,
)
elif self.current_mode == "browsing":
self._handle_browsing(
steam_running=steam_running,
browser_running=browser_running,
)
def get_status(self) -> str:
"""Get current status string."""
if self.current_mode is None:
return "No active focus mode"
duration = ""
if self.mode_start_time:
elapsed = datetime.now(tz=timezone.utc) - self.mode_start_time
minutes = int(elapsed.total_seconds() // 60)
duration = f" (active for {minutes}m)"
if self.current_mode == "gaming":
return f"\U0001f3ae GAMING mode{duration} - browsers blocked"
return f"\U0001f310 BROWSING mode{duration} - Steam blocked"
def write_status(focus: FocusMode) -> None:
"""Write current status to state file for external queries."""
with contextlib.suppress(OSError):
STATE_DIR.mkdir(parents=True, exist_ok=True)
status_file = STATE_DIR / "status"
with status_file.open("w") as f:
f.write(focus.get_status() + "\n")
f.write(f"mode={focus.current_mode or 'none'}\n")
f.write(f"whitelist={'active' if is_whitelist_active() else 'inactive'}\n")
def _parse_args() -> tuple[str, int]:
"""Parse command-line arguments.
Returns a (command, minutes) tuple where command is one of
'daemon', 'whitelist', or 'cancel-whitelist'.
"""
parser = argparse.ArgumentParser(
description="Focus Mode Daemon - Steam/Browser mutual exclusion",
)
sub = parser.add_subparsers(dest="command")
wl = sub.add_parser(
"whitelist",
help=f"Allow browsers temporarily ({DEFAULT_WHITELIST_MINUTES}m default)",
)
wl.add_argument(
"minutes",
nargs="?",
type=int,
default=DEFAULT_WHITELIST_MINUTES,
help="Duration in minutes (default: %(default)s)",
)
sub.add_parser("cancel-whitelist", help="Cancel active browser whitelist")
sub.add_parser("status", help="Show whitelist status")
args = parser.parse_args()
command = args.command or "daemon"
minutes = getattr(args, "minutes", DEFAULT_WHITELIST_MINUTES)
return command, minutes
def _print_whitelist_status() -> None:
"""Print current whitelist status to stdout."""
if not WHITELIST_FILE.exists():
return
try:
expiry_str = WHITELIST_FILE.read_text().strip()
expiry = datetime.fromisoformat(expiry_str)
now = datetime.now(tz=timezone.utc)
if now < expiry:
remaining = expiry - now
int(remaining.total_seconds() // 60)
int(remaining.total_seconds() % 60)
else:
pass
except (ValueError, OSError):
pass
def main() -> None:
"""Run the main daemon loop or handle CLI subcommands."""
logging.basicConfig(format="%(message)s", level=logging.INFO)
command, minutes = _parse_args()
if command == "whitelist":
activate_whitelist(minutes)
return
if command == "cancel-whitelist":
cancel_whitelist()
return
if command == "status":
_print_whitelist_status()
return
log("Focus Mode Daemon starting...")
def handle_signal(signum: int, _frame: FrameType | None) -> None:
"""Handle termination signals."""
log(f"Received signal {signum} - shutting down")
sys.exit(0)
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
focus = FocusMode()
while True:
try:
processes = get_running_processes()
focus.update(processes)
write_status(focus)
except (
OSError,
subprocess.SubprocessError,
) as exc:
log(f"Error in main loop: {exc}")
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
main()