From 7d537c134a32fba34b7705abe2846546cbd49fb8 Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Tue, 24 Feb 2026 21:19:47 +0100 Subject: [PATCH] refactor: auto-detect wireless ADB device, remove phone_config.txt - Replace stored phone_config.txt with _get_wireless_serial() which parses 'adb devices' and auto-picks the ip:port (wireless) entry - Replace _scan_phone_port-based reconnect with _try_wireless_reconnect that scans local /24 subnet on port 5555 via parallel probing - Add _get_local_subnet_prefix() using UDP socket trick (8.8.8.8:80) - Remove PHONE_CONFIG_FILE, _load_phone_config, _save_phone_config, _save_connected_device_config, _scan_phone_port - No config file needed; device is always discovered dynamically - 112 tests passing --- screen_locker/screen_lock.py | 137 ++++++++---------------- screen_locker/tests/test_screen_lock.py | 9 ++ 2 files changed, 56 insertions(+), 90 deletions(-) diff --git a/screen_locker/screen_lock.py b/screen_locker/screen_lock.py index 7a24b5f..f217786 100755 --- a/screen_locker/screen_lock.py +++ b/screen_locker/screen_lock.py @@ -48,8 +48,6 @@ SHUTDOWN_CONFIG_FILE = Path("/etc/shutdown-schedule.conf") ADJUST_SHUTDOWN_SCRIPT = Path(__file__).resolve().parent / "adjust_shutdown_schedule.sh" # State file to track sick day usage and original config values SICK_DAY_STATE_FILE = Path(__file__).resolve().parent / "sick_day_state.json" -# Stores last known phone wireless ADB address (ip:port) for auto-reconnect -PHONE_CONFIG_FILE = Path(__file__).resolve().parent / "phone_config.txt" _STRENGTH_FIELDS: list[tuple[str, int]] = [ ("Exercises (comma-separated):", 50), @@ -947,12 +945,11 @@ class ScreenLocker: def _run_adb(self, args: list[str]) -> tuple[bool, str]: """Run an ADB command and return success flag and stdout.""" adb = shutil.which("adb") or "adb" - # When a specific device is configured and the command targets a device - # (not discovery/connect/disconnect), pin to that serial to avoid - # "more than one device" errors when USB + wireless are both connected. + # When multiple devices are connected (e.g. USB + wireless), pin to + # the wireless device's serial to avoid "more than one device" errors. _discovery_cmds = {"devices", "connect", "disconnect", "kill-server"} serial = ( - self._load_phone_config() + self._get_wireless_serial() if args and args[0] not in _discovery_cmds else None ) @@ -984,6 +981,21 @@ class ScreenLocker: return self._run_adb(["shell", "su", "-c", command]) return self._run_adb(["shell", command]) + def _get_wireless_serial(self) -> str | None: + """Return the serial (ip:port) of the first connected wireless ADB device. + + Used to pin ADB commands to the wireless device when multiple devices + (e.g. USB cable + wireless debugging) are simultaneously connected. + """ + success, output = self._run_adb(["devices"]) + if not success: + return None + for line in output.strip().split("\n")[1:]: + parts = line.split() + if parts and ":" in parts[0] and "device" in line and "offline" not in line: + return parts[0] + return None + def _has_adb_device(self) -> bool: """Return True if adb devices shows at least one connected device.""" success, output = self._run_adb(["devices"]) @@ -992,89 +1004,46 @@ class ScreenLocker: lines = output.strip().split("\n")[1:] return any("device" in line and "offline" not in line for line in lines) - def _load_phone_config(self) -> str | None: - """Load stored phone wireless ADB address (ip:port).""" - if not PHONE_CONFIG_FILE.exists(): - return None - try: - return PHONE_CONFIG_FILE.read_text().strip() or None - except OSError: - return None - - def _save_phone_config(self, address: str) -> None: - """Persist a working phone wireless ADB address for future reconnects.""" - try: - PHONE_CONFIG_FILE.write_text(address) - _logger.info("Saved phone config: %s", address) - except OSError as e: - _logger.warning("Could not save phone config: %s", e) - def _try_adb_connect(self, address: str) -> bool: """Run adb connect to address. Returns True on success.""" _, output = self._run_adb(["connect", address]) lower = output.lower() return "connected" in lower and "unable" not in lower and "failed" not in lower - def _scan_phone_port(self, ip: str) -> int | None: - """Scan for an open ADB port on the phone's IP. - - Tries port 5555 first (legacy ADB), then scans the typical - Android 11+ wireless ADB port range in parallel. - - Args: - ip: Phone IP address to scan. - - Returns: - Open port number, or None if not found. - """ - - def probe(port: int) -> int | None: - with ( - contextlib.suppress(OSError), - socket.create_connection((ip, port), timeout=1.0), - ): - return port - return None - - if probe(5555) is not None: - return 5555 - _logger.info("Scanning %s for wireless ADB port (30000-50000)...", ip) - with ThreadPoolExecutor(max_workers=128) as executor: - for future in as_completed( - executor.submit(probe, p) for p in range(30000, 50001) - ): - result = future.result() - if result is not None: - return result + def _get_local_subnet_prefix(self) -> str | None: + """Detect the local /24 network prefix (e.g. '192.168.1').""" + with ( + contextlib.suppress(OSError), + socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock, + ): + sock.connect(("8.8.8.8", 80)) + return ".".join(sock.getsockname()[0].split(".")[:3]) return None def _try_wireless_reconnect(self) -> bool: - """Attempt to reconnect to the phone over wireless ADB. - - Tries the stored ip:port first. If the port has changed (wireless - debugging restarts assign a new random port), scans the same IP - for the new port and saves it. - - Returns: - True if a device is now connected. - """ - stored = self._load_phone_config() - if stored is None: - _logger.info("No stored phone config — cannot attempt wireless reconnect") + """Scan local /24 subnet on port 5555 and attempt ADB connect to phone.""" + prefix = self._get_local_subnet_prefix() + if prefix is None: + _logger.info("Could not determine local subnet for wireless scan") return False - if self._try_adb_connect(stored) and self._has_adb_device(): - return True - # Stored port may have changed — scan for the new one - ip = stored.split(":")[0] - _logger.info("Stored port failed, scanning %s for new ADB port...", ip) - port = self._scan_phone_port(ip) - if port is None: - _logger.info("No open ADB port found on %s", ip) + + def probe(i: int) -> bool: + ip = f"{prefix}.{i}" + with ( + contextlib.suppress(OSError), + socket.create_connection((ip, 5555), timeout=0.5), + ): + if self._try_adb_connect(f"{ip}:5555"): + return self._has_adb_device() return False - address = f"{ip}:{port}" - if self._try_adb_connect(address) and self._has_adb_device(): - self._save_phone_config(address) - return True + + _logger.info("Scanning %s.1-254:5555 for phone...", prefix) + with ThreadPoolExecutor(max_workers=64) as executor: + for future in as_completed( + executor.submit(probe, i) for i in range(1, 255) + ): + if future.result(): + return True return False def _is_phone_connected(self) -> bool: @@ -1149,24 +1118,12 @@ class ScreenLocker: return "error", "StrongLifts database not found on phone" count = self._count_today_workouts(local_db) if count > 0: - self._save_connected_device_config() return ( "verified", f"Workout verified! ({count} session(s) found on phone)", ) return "not_verified", "No workout found on phone today" - def _save_connected_device_config(self) -> None: - """Save the address of the currently connected wireless ADB device.""" - success, output = self._run_adb(["devices"]) - if not success: - return - for line in output.strip().split("\n")[1:]: - parts = line.split() - if parts and ":" in parts[0] and "device" in line and "offline" not in line: - self._save_phone_config(parts[0]) - return - def _attempt_unlock(self) -> None: """Unlock screen after workout form submission.""" self.unlock_screen() diff --git a/screen_locker/tests/test_screen_lock.py b/screen_locker/tests/test_screen_lock.py index fd8a983..c1b8272 100644 --- a/screen_locker/tests/test_screen_lock.py +++ b/screen_locker/tests/test_screen_lock.py @@ -1425,6 +1425,9 @@ class TestIsPhoneConnected: locker._run_adb = MagicMock( # type: ignore[method-assign] return_value=(True, "List of devices attached\n\n"), ) + locker._try_wireless_reconnect = MagicMock( # type: ignore[method-assign] + return_value=False, + ) assert locker._is_phone_connected() is False @@ -1442,6 +1445,9 @@ class TestIsPhoneConnected: "List of devices attached\nABC123\toffline\n\n", ), ) + locker._try_wireless_reconnect = MagicMock( # type: ignore[method-assign] + return_value=False, + ) assert locker._is_phone_connected() is False @@ -1456,6 +1462,9 @@ class TestIsPhoneConnected: locker._run_adb = MagicMock( # type: ignore[method-assign] return_value=(False, ""), ) + locker._try_wireless_reconnect = MagicMock( # type: ignore[method-assign] + return_value=False, + ) assert locker._is_phone_connected() is False