From 78086b178516d6c7aab4418ec6230751f5eb3ae4 Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Tue, 24 Feb 2026 21:11:05 +0100 Subject: [PATCH] feat: screen locker phone check at startup with background thread - ADB check runs in background thread (ThreadPoolExecutor) so the UI remains responsive while checking StrongLifts on the phone - Poll result every 500ms via root.after instead of blocking main thread - Show success screen for 1.5s before auto-unlocking when verified - Target specific ADB device via -s flag using saved phone_config.txt to avoid errors when multiple devices (USB + wireless) are connected - Demo mode uses local grab_set() instead of grab_set_global() so it works alongside other fullscreen apps - Stub _start_phone_check in create_locker to prevent background threads leaking into unrelated tests (fixes flaky test_run_adb_success) - 112 tests passing, 100% branch coverage --- screen_locker/screen_lock.py | 170 ++++++++++++++++++++++-- screen_locker/tests/test_screen_lock.py | 103 ++++++++++---- 2 files changed, 236 insertions(+), 37 deletions(-) diff --git a/screen_locker/screen_lock.py b/screen_locker/screen_lock.py index 1171d4b..7a24b5f 100755 --- a/screen_locker/screen_lock.py +++ b/screen_locker/screen_lock.py @@ -6,12 +6,14 @@ Requires user to log their workout to unlock the screen. from __future__ import annotations +from concurrent.futures import Future, ThreadPoolExecutor, as_completed import contextlib from datetime import datetime, timezone import json import logging from pathlib import Path import shutil +import socket import sqlite3 import subprocess import sys @@ -46,6 +48,8 @@ SHUTDOWN_CONFIG_FILE = Path("/etc/shutdown-schedule.conf") ADJUST_SHUTDOWN_SCRIPT = Path(__file__).resolve().parent / "adjust_shutdown_schedule.sh" # State file to track sick day usage and original config values SICK_DAY_STATE_FILE = Path(__file__).resolve().parent / "sick_day_state.json" +# Stores last known phone wireless ADB address (ip:port) for auto-reconnect +PHONE_CONFIG_FILE = Path(__file__).resolve().parent / "phone_config.txt" _STRENGTH_FIELDS: list[tuple[str, int]] = [ ("Exercises (comma-separated):", 50), @@ -76,6 +80,7 @@ class ScreenLocker: self._setup_demo_close_button() self.container = tk.Frame(self.root, bg="#1a1a1a") self.container.place(relx=0.5, rely=0.5, anchor="center") + self._phone_future: Future[tuple[str, str]] | None = None self._start_phone_check() self._grab_input() @@ -106,7 +111,16 @@ class ScreenLocker: """Force input focus to the locker window.""" self.root.update_idletasks() self.root.focus_force() - self.root.grab_set_global() + if self.demo_mode: + with contextlib.suppress(tk.TclError): + self.root.grab_set() + else: + try: + self.root.grab_set_global() + except tk.TclError: + _logger.warning("Global grab failed, falling back to local grab") + with contextlib.suppress(tk.TclError): + self.root.grab_set() def clear_container(self) -> None: """Remove all widgets from the main container.""" @@ -277,19 +291,32 @@ class ScreenLocker: self.clear_container() self._label("Checking phone...", font_size=36, color="#ffaa00", pady=30) self._text("Looking for today's workout in StrongLifts...", font_size=18) - self.root.after(100, self._handle_startup_phone_result) + executor = ThreadPoolExecutor(max_workers=1) + self._phone_future = executor.submit(self._verify_phone_workout) + executor.shutdown(wait=False) + self._poll_phone_check() - def _handle_startup_phone_result(self) -> None: + def _poll_phone_check(self) -> None: + """Poll background phone check and route to result handler when done.""" + if self._phone_future is not None and self._phone_future.done(): + status, message = self._phone_future.result() + self._handle_startup_phone_result(status, message) + else: + self.root.after(500, self._poll_phone_check) + + def _handle_startup_phone_result(self, status: str, message: str) -> None: """Route to appropriate screen based on startup phone check result.""" - status, message = self._verify_phone_workout() if status == "verified": + self.workout_data["type"] = "phone_verified" + self.workout_data["source"] = message self.clear_container() self._label( - "\u2713 Workout Verified!", font_size=36, color="#00cc00", pady=20 + "\u2713 Workout Verified!", font_size=42, color="#00cc44", pady=30 ) - self._text(message, color="#88ff88") - self._text("\nLog the details below to unlock.", font_size=18) - self.root.after(1500, self.ask_workout_type) + self._text(message, font_size=20, color="#aaffaa") + self._text("Unlocking...", font_size=18, color="#888888") + unlock_delay = 1500 if self.demo_mode else 2000 + self.root.after(unlock_delay, self.unlock_screen) elif status == "not_verified": self.clear_container() self._label("No Workout Found", font_size=36, color="#ff4444", pady=20) @@ -920,9 +947,19 @@ class ScreenLocker: def _run_adb(self, args: list[str]) -> tuple[bool, str]: """Run an ADB command and return success flag and stdout.""" adb = shutil.which("adb") or "adb" + # When a specific device is configured and the command targets a device + # (not discovery/connect/disconnect), pin to that serial to avoid + # "more than one device" errors when USB + wireless are both connected. + _discovery_cmds = {"devices", "connect", "disconnect", "kill-server"} + serial = ( + self._load_phone_config() + if args and args[0] not in _discovery_cmds + else None + ) + serial_args = ["-s", serial] if serial else [] try: result = subprocess.run( - [adb, *args], + [adb, *serial_args, *args], capture_output=True, text=True, timeout=ADB_TIMEOUT, @@ -947,14 +984,111 @@ class ScreenLocker: return self._run_adb(["shell", "su", "-c", command]) return self._run_adb(["shell", command]) - def _is_phone_connected(self) -> bool: - """Check if an Android device is connected via ADB.""" + def _has_adb_device(self) -> bool: + """Return True if adb devices shows at least one connected device.""" success, output = self._run_adb(["devices"]) if not success: return False lines = output.strip().split("\n")[1:] return any("device" in line and "offline" not in line for line in lines) + def _load_phone_config(self) -> str | None: + """Load stored phone wireless ADB address (ip:port).""" + if not PHONE_CONFIG_FILE.exists(): + return None + try: + return PHONE_CONFIG_FILE.read_text().strip() or None + except OSError: + return None + + def _save_phone_config(self, address: str) -> None: + """Persist a working phone wireless ADB address for future reconnects.""" + try: + PHONE_CONFIG_FILE.write_text(address) + _logger.info("Saved phone config: %s", address) + except OSError as e: + _logger.warning("Could not save phone config: %s", e) + + def _try_adb_connect(self, address: str) -> bool: + """Run adb connect to address. Returns True on success.""" + _, output = self._run_adb(["connect", address]) + lower = output.lower() + return "connected" in lower and "unable" not in lower and "failed" not in lower + + def _scan_phone_port(self, ip: str) -> int | None: + """Scan for an open ADB port on the phone's IP. + + Tries port 5555 first (legacy ADB), then scans the typical + Android 11+ wireless ADB port range in parallel. + + Args: + ip: Phone IP address to scan. + + Returns: + Open port number, or None if not found. + """ + + def probe(port: int) -> int | None: + with ( + contextlib.suppress(OSError), + socket.create_connection((ip, port), timeout=1.0), + ): + return port + return None + + if probe(5555) is not None: + return 5555 + _logger.info("Scanning %s for wireless ADB port (30000-50000)...", ip) + with ThreadPoolExecutor(max_workers=128) as executor: + for future in as_completed( + executor.submit(probe, p) for p in range(30000, 50001) + ): + result = future.result() + if result is not None: + return result + return None + + def _try_wireless_reconnect(self) -> bool: + """Attempt to reconnect to the phone over wireless ADB. + + Tries the stored ip:port first. If the port has changed (wireless + debugging restarts assign a new random port), scans the same IP + for the new port and saves it. + + Returns: + True if a device is now connected. + """ + stored = self._load_phone_config() + if stored is None: + _logger.info("No stored phone config — cannot attempt wireless reconnect") + return False + if self._try_adb_connect(stored) and self._has_adb_device(): + return True + # Stored port may have changed — scan for the new one + ip = stored.split(":")[0] + _logger.info("Stored port failed, scanning %s for new ADB port...", ip) + port = self._scan_phone_port(ip) + if port is None: + _logger.info("No open ADB port found on %s", ip) + return False + address = f"{ip}:{port}" + if self._try_adb_connect(address) and self._has_adb_device(): + self._save_phone_config(address) + return True + return False + + def _is_phone_connected(self) -> bool: + """Check if an Android device is connected via ADB. + + If no device is visible, attempts wireless reconnection using the + stored phone IP/port config. USB-connected devices are detected + automatically by adb devices without any extra steps. + """ + if self._has_adb_device(): + return True + _logger.info("No ADB device detected — attempting wireless reconnect...") + return self._try_wireless_reconnect() + def _pull_stronglifts_db(self) -> Path | None: """Pull StrongLifts database from phone to a local temp file. @@ -1015,12 +1149,24 @@ class ScreenLocker: return "error", "StrongLifts database not found on phone" count = self._count_today_workouts(local_db) if count > 0: + self._save_connected_device_config() return ( "verified", f"Workout verified! ({count} session(s) found on phone)", ) return "not_verified", "No workout found on phone today" + def _save_connected_device_config(self) -> None: + """Save the address of the currently connected wireless ADB device.""" + success, output = self._run_adb(["devices"]) + if not success: + return + for line in output.strip().split("\n")[1:]: + parts = line.split() + if parts and ":" in parts[0] and "device" in line and "offline" not in line: + self._save_phone_config(parts[0]) + return + def _attempt_unlock(self) -> None: """Unlock screen after workout form submission.""" self.unlock_screen() @@ -1138,7 +1284,7 @@ class ScreenLocker: def _try_adjust_shutdown_for_workout(self) -> bool: """Try to adjust shutdown time later for actual workouts.""" workout_type = self.workout_data.get("type", "") - if workout_type not in ("running", "strength"): + if workout_type not in ("running", "strength", "phone_verified"): return False adjusted = self._adjust_shutdown_time_later() if adjusted: diff --git a/screen_locker/tests/test_screen_lock.py b/screen_locker/tests/test_screen_lock.py index 1f529e5..fd8a983 100644 --- a/screen_locker/tests/test_screen_lock.py +++ b/screen_locker/tests/test_screen_lock.py @@ -107,6 +107,7 @@ def create_locker( with ( patch.object(Path, "resolve", return_value=tmp_path), patch.object(ScreenLocker, "has_logged_today", return_value=has_logged), + patch.object(ScreenLocker, "_start_phone_check"), ): return ScreenLocker(demo_mode=demo_mode) @@ -1296,7 +1297,7 @@ class TestRunAdb: "python_pkg.screen_locker.screen_lock.subprocess.run", return_value=mock_result, ): - success, output = locker._run_adb(["devices"]) + success, _output = locker._run_adb(["devices"]) assert success is False @@ -1385,7 +1386,7 @@ class TestAdbShell: return_value=(True, "output"), ) - success, output = locker._adb_shell("ls /data", root=True) + success, _output = locker._adb_shell("ls /data", root=True) locker._run_adb.assert_called_once_with( ["shell", "su", "-c", "ls /data"], @@ -1749,32 +1750,37 @@ class TestStartPhoneCheck: mock_sys_exit: MagicMock, # noqa: ARG002 tmp_path: Path, ) -> None: - """Test _start_phone_check shows checking message and schedules check.""" + """Test _start_phone_check shows checking message and starts check.""" locker = create_locker(mock_tk, tmp_path) locker.clear_container = MagicMock() # type: ignore[method-assign] + locker._verify_phone_workout = MagicMock( # type: ignore[method-assign] + return_value=("no_phone", "No phone"), + ) + locker._poll_phone_check = MagicMock() # type: ignore[method-assign] locker._start_phone_check() locker.clear_container.assert_called() - locker.root.after.assert_called() # type: ignore[attr-defined] + locker._poll_phone_check.assert_called_once() + assert locker._phone_future is not None - def test_handle_startup_verified_shows_success_then_form( + def test_handle_startup_verified_unlocks_directly( self, mock_tk: MagicMock, mock_sys_exit: MagicMock, # noqa: ARG002 tmp_path: Path, ) -> None: - """Test verified result shows success and schedules form.""" + """Test verified result shows success screen then unlocks via after().""" locker = create_locker(mock_tk, tmp_path) - locker.clear_container = MagicMock() # type: ignore[method-assign] - locker._verify_phone_workout = MagicMock( # type: ignore[method-assign] - return_value=("verified", "Workout verified! (1 session)"), - ) + locker.unlock_screen = MagicMock() # type: ignore[method-assign] + locker.root.after = MagicMock() # type: ignore[method-assign] - locker._handle_startup_phone_result() + locker._handle_startup_phone_result("verified", "Workout verified! (1 session)") - locker.clear_container.assert_called() - locker.root.after.assert_called() # type: ignore[attr-defined] + # unlock_screen is deferred via root.after, not called directly + locker.unlock_screen.assert_not_called() + assert locker.workout_data["type"] == "phone_verified" + locker.root.after.assert_called_once_with(1500, locker.unlock_screen) def test_handle_startup_not_verified_shows_block( self, @@ -1785,12 +1791,10 @@ class TestStartPhoneCheck: """Test not_verified result shows blocking screen with buttons.""" locker = create_locker(mock_tk, tmp_path) locker.clear_container = MagicMock() # type: ignore[method-assign] - locker._verify_phone_workout = MagicMock( # type: ignore[method-assign] - return_value=("not_verified", "No workout found on phone today"), + locker._handle_startup_phone_result( + "not_verified", "No workout found on phone today" ) - locker._handle_startup_phone_result() - locker.clear_container.assert_called() def test_handle_startup_no_phone_shows_penalty( @@ -1801,12 +1805,9 @@ class TestStartPhoneCheck: ) -> None: """Test no_phone result triggers penalty with ask_workout_done as callback.""" locker = create_locker(mock_tk, tmp_path) - locker._verify_phone_workout = MagicMock( # type: ignore[method-assign] - return_value=("no_phone", "No phone"), - ) locker._show_phone_penalty = MagicMock() # type: ignore[method-assign] - locker._handle_startup_phone_result() + locker._handle_startup_phone_result("no_phone", "No phone") locker._show_phone_penalty.assert_called_once() _, kwargs = locker._show_phone_penalty.call_args @@ -1820,17 +1821,51 @@ class TestStartPhoneCheck: ) -> None: """Test error result triggers penalty with ask_workout_done as callback.""" locker = create_locker(mock_tk, tmp_path) - locker._verify_phone_workout = MagicMock( # type: ignore[method-assign] - return_value=("error", "DB not found"), - ) locker._show_phone_penalty = MagicMock() # type: ignore[method-assign] - locker._handle_startup_phone_result() + locker._handle_startup_phone_result("error", "DB not found") locker._show_phone_penalty.assert_called_once() _, kwargs = locker._show_phone_penalty.call_args assert kwargs["on_done"] == locker.ask_workout_done + def test_poll_phone_check_schedules_retry_when_pending( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, # noqa: ARG002 + tmp_path: Path, + ) -> None: + """Test _poll_phone_check reschedules itself when future is not done.""" + locker = create_locker(mock_tk, tmp_path) + mock_future: MagicMock = MagicMock() + mock_future.done.return_value = False + locker._phone_future = mock_future # type: ignore[assignment] + locker.root.after = MagicMock() # type: ignore[method-assign] + + locker._poll_phone_check() + + locker.root.after.assert_called_once_with(500, locker._poll_phone_check) + + def test_poll_phone_check_routes_when_done( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, # noqa: ARG002 + tmp_path: Path, + ) -> None: + """Test _poll_phone_check calls result handler when future is done.""" + locker = create_locker(mock_tk, tmp_path) + mock_future: MagicMock = MagicMock() + mock_future.done.return_value = True + mock_future.result.return_value = ("no_phone", "No phone") + locker._phone_future = mock_future # type: ignore[assignment] + locker._handle_startup_phone_result = MagicMock() # type: ignore[method-assign] + + locker._poll_phone_check() + + locker._handle_startup_phone_result.assert_called_once_with( + "no_phone", "No phone" + ) + class TestAttemptUnlock: """Tests for _attempt_unlock method.""" @@ -1960,6 +1995,24 @@ class TestUnlockScreenShutdownAdjustment: locker._adjust_shutdown_time_later.assert_called_once() + def test_unlock_screen_adjusts_for_phone_verified( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, # noqa: ARG002 + tmp_path: Path, + ) -> None: + """Test unlock_screen adjusts shutdown for phone-verified workout.""" + locker = create_locker(mock_tk, tmp_path) + locker.log_file = tmp_path / "workout_log.json" + locker.workout_data = {"type": "phone_verified"} + locker._adjust_shutdown_time_later = MagicMock( # type: ignore[method-assign] + return_value=True + ) + + locker.unlock_screen() + + locker._adjust_shutdown_time_later.assert_called_once() + def test_unlock_screen_skips_adjustment_for_sick_day( self, mock_tk: MagicMock,