refactor: auto-detect wireless ADB device, remove phone_config.txt

- Replace stored phone_config.txt with _get_wireless_serial() which
  parses 'adb devices' and auto-picks the ip:port (wireless) entry
- Replace _scan_phone_port-based reconnect with _try_wireless_reconnect
  that scans local /24 subnet on port 5555 via parallel probing
- Add _get_local_subnet_prefix() using UDP socket trick (8.8.8.8:80)
- Remove PHONE_CONFIG_FILE, _load_phone_config, _save_phone_config,
  _save_connected_device_config, _scan_phone_port
- No config file needed; device is always discovered dynamically
- 112 tests passing
This commit is contained in:
Krzysztof kuhy Rudnicki 2026-02-24 21:19:47 +01:00
parent 78086b1785
commit 7d537c134a
2 changed files with 56 additions and 90 deletions

View File

@ -48,8 +48,6 @@ SHUTDOWN_CONFIG_FILE = Path("/etc/shutdown-schedule.conf")
ADJUST_SHUTDOWN_SCRIPT = Path(__file__).resolve().parent / "adjust_shutdown_schedule.sh" ADJUST_SHUTDOWN_SCRIPT = Path(__file__).resolve().parent / "adjust_shutdown_schedule.sh"
# State file to track sick day usage and original config values # State file to track sick day usage and original config values
SICK_DAY_STATE_FILE = Path(__file__).resolve().parent / "sick_day_state.json" SICK_DAY_STATE_FILE = Path(__file__).resolve().parent / "sick_day_state.json"
# Stores last known phone wireless ADB address (ip:port) for auto-reconnect
PHONE_CONFIG_FILE = Path(__file__).resolve().parent / "phone_config.txt"
_STRENGTH_FIELDS: list[tuple[str, int]] = [ _STRENGTH_FIELDS: list[tuple[str, int]] = [
("Exercises (comma-separated):", 50), ("Exercises (comma-separated):", 50),
@ -947,12 +945,11 @@ class ScreenLocker:
def _run_adb(self, args: list[str]) -> tuple[bool, str]: def _run_adb(self, args: list[str]) -> tuple[bool, str]:
"""Run an ADB command and return success flag and stdout.""" """Run an ADB command and return success flag and stdout."""
adb = shutil.which("adb") or "adb" adb = shutil.which("adb") or "adb"
# When a specific device is configured and the command targets a device # When multiple devices are connected (e.g. USB + wireless), pin to
# (not discovery/connect/disconnect), pin to that serial to avoid # the wireless device's serial to avoid "more than one device" errors.
# "more than one device" errors when USB + wireless are both connected.
_discovery_cmds = {"devices", "connect", "disconnect", "kill-server"} _discovery_cmds = {"devices", "connect", "disconnect", "kill-server"}
serial = ( serial = (
self._load_phone_config() self._get_wireless_serial()
if args and args[0] not in _discovery_cmds if args and args[0] not in _discovery_cmds
else None else None
) )
@ -984,6 +981,21 @@ class ScreenLocker:
return self._run_adb(["shell", "su", "-c", command]) return self._run_adb(["shell", "su", "-c", command])
return self._run_adb(["shell", command]) return self._run_adb(["shell", command])
def _get_wireless_serial(self) -> str | None:
"""Return the serial (ip:port) of the first connected wireless ADB device.
Used to pin ADB commands to the wireless device when multiple devices
(e.g. USB cable + wireless debugging) are simultaneously connected.
"""
success, output = self._run_adb(["devices"])
if not success:
return None
for line in output.strip().split("\n")[1:]:
parts = line.split()
if parts and ":" in parts[0] and "device" in line and "offline" not in line:
return parts[0]
return None
def _has_adb_device(self) -> bool: def _has_adb_device(self) -> bool:
"""Return True if adb devices shows at least one connected device.""" """Return True if adb devices shows at least one connected device."""
success, output = self._run_adb(["devices"]) success, output = self._run_adb(["devices"])
@ -992,89 +1004,46 @@ class ScreenLocker:
lines = output.strip().split("\n")[1:] lines = output.strip().split("\n")[1:]
return any("device" in line and "offline" not in line for line in lines) return any("device" in line and "offline" not in line for line in lines)
def _load_phone_config(self) -> str | None:
"""Load stored phone wireless ADB address (ip:port)."""
if not PHONE_CONFIG_FILE.exists():
return None
try:
return PHONE_CONFIG_FILE.read_text().strip() or None
except OSError:
return None
def _save_phone_config(self, address: str) -> None:
"""Persist a working phone wireless ADB address for future reconnects."""
try:
PHONE_CONFIG_FILE.write_text(address)
_logger.info("Saved phone config: %s", address)
except OSError as e:
_logger.warning("Could not save phone config: %s", e)
def _try_adb_connect(self, address: str) -> bool: def _try_adb_connect(self, address: str) -> bool:
"""Run adb connect to address. Returns True on success.""" """Run adb connect to address. Returns True on success."""
_, output = self._run_adb(["connect", address]) _, output = self._run_adb(["connect", address])
lower = output.lower() lower = output.lower()
return "connected" in lower and "unable" not in lower and "failed" not in lower return "connected" in lower and "unable" not in lower and "failed" not in lower
def _scan_phone_port(self, ip: str) -> int | None: def _get_local_subnet_prefix(self) -> str | None:
"""Scan for an open ADB port on the phone's IP. """Detect the local /24 network prefix (e.g. '192.168.1')."""
with (
Tries port 5555 first (legacy ADB), then scans the typical contextlib.suppress(OSError),
Android 11+ wireless ADB port range in parallel. socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock,
):
Args: sock.connect(("8.8.8.8", 80))
ip: Phone IP address to scan. return ".".join(sock.getsockname()[0].split(".")[:3])
Returns:
Open port number, or None if not found.
"""
def probe(port: int) -> int | None:
with (
contextlib.suppress(OSError),
socket.create_connection((ip, port), timeout=1.0),
):
return port
return None
if probe(5555) is not None:
return 5555
_logger.info("Scanning %s for wireless ADB port (30000-50000)...", ip)
with ThreadPoolExecutor(max_workers=128) as executor:
for future in as_completed(
executor.submit(probe, p) for p in range(30000, 50001)
):
result = future.result()
if result is not None:
return result
return None return None
def _try_wireless_reconnect(self) -> bool: def _try_wireless_reconnect(self) -> bool:
"""Attempt to reconnect to the phone over wireless ADB. """Scan local /24 subnet on port 5555 and attempt ADB connect to phone."""
prefix = self._get_local_subnet_prefix()
Tries the stored ip:port first. If the port has changed (wireless if prefix is None:
debugging restarts assign a new random port), scans the same IP _logger.info("Could not determine local subnet for wireless scan")
for the new port and saves it.
Returns:
True if a device is now connected.
"""
stored = self._load_phone_config()
if stored is None:
_logger.info("No stored phone config — cannot attempt wireless reconnect")
return False return False
if self._try_adb_connect(stored) and self._has_adb_device():
return True def probe(i: int) -> bool:
# Stored port may have changed — scan for the new one ip = f"{prefix}.{i}"
ip = stored.split(":")[0] with (
_logger.info("Stored port failed, scanning %s for new ADB port...", ip) contextlib.suppress(OSError),
port = self._scan_phone_port(ip) socket.create_connection((ip, 5555), timeout=0.5),
if port is None: ):
_logger.info("No open ADB port found on %s", ip) if self._try_adb_connect(f"{ip}:5555"):
return self._has_adb_device()
return False return False
address = f"{ip}:{port}"
if self._try_adb_connect(address) and self._has_adb_device(): _logger.info("Scanning %s.1-254:5555 for phone...", prefix)
self._save_phone_config(address) with ThreadPoolExecutor(max_workers=64) as executor:
return True for future in as_completed(
executor.submit(probe, i) for i in range(1, 255)
):
if future.result():
return True
return False return False
def _is_phone_connected(self) -> bool: def _is_phone_connected(self) -> bool:
@ -1149,24 +1118,12 @@ class ScreenLocker:
return "error", "StrongLifts database not found on phone" return "error", "StrongLifts database not found on phone"
count = self._count_today_workouts(local_db) count = self._count_today_workouts(local_db)
if count > 0: if count > 0:
self._save_connected_device_config()
return ( return (
"verified", "verified",
f"Workout verified! ({count} session(s) found on phone)", f"Workout verified! ({count} session(s) found on phone)",
) )
return "not_verified", "No workout found on phone today" return "not_verified", "No workout found on phone today"
def _save_connected_device_config(self) -> None:
"""Save the address of the currently connected wireless ADB device."""
success, output = self._run_adb(["devices"])
if not success:
return
for line in output.strip().split("\n")[1:]:
parts = line.split()
if parts and ":" in parts[0] and "device" in line and "offline" not in line:
self._save_phone_config(parts[0])
return
def _attempt_unlock(self) -> None: def _attempt_unlock(self) -> None:
"""Unlock screen after workout form submission.""" """Unlock screen after workout form submission."""
self.unlock_screen() self.unlock_screen()

View File

@ -1425,6 +1425,9 @@ class TestIsPhoneConnected:
locker._run_adb = MagicMock( # type: ignore[method-assign] locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(True, "List of devices attached\n\n"), return_value=(True, "List of devices attached\n\n"),
) )
locker._try_wireless_reconnect = MagicMock( # type: ignore[method-assign]
return_value=False,
)
assert locker._is_phone_connected() is False assert locker._is_phone_connected() is False
@ -1442,6 +1445,9 @@ class TestIsPhoneConnected:
"List of devices attached\nABC123\toffline\n\n", "List of devices attached\nABC123\toffline\n\n",
), ),
) )
locker._try_wireless_reconnect = MagicMock( # type: ignore[method-assign]
return_value=False,
)
assert locker._is_phone_connected() is False assert locker._is_phone_connected() is False
@ -1456,6 +1462,9 @@ class TestIsPhoneConnected:
locker._run_adb = MagicMock( # type: ignore[method-assign] locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(False, ""), return_value=(False, ""),
) )
locker._try_wireless_reconnect = MagicMock( # type: ignore[method-assign]
return_value=False,
)
assert locker._is_phone_connected() is False assert locker._is_phone_connected() is False