feat: screen locker made even stronger

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-02-23 22:50:42 +01:00
parent 2a8d61088b
commit 192c91094e
3 changed files with 960 additions and 54 deletions

View File

@ -136,6 +136,7 @@ get_running_file() {
}
# Clean up stale running state (process no longer running)
# Uses process-name matching so Electron apps that fork don't appear stale.
cleanup_stale_running_state() {
local app="$1"
local running_file
@ -145,21 +146,29 @@ cleanup_stale_running_state() {
return 0
fi
local pid
pid=$(awk '{print $1}' "$running_file" 2>/dev/null || echo "")
local real_binary="${REAL_BINARIES[$app]}"
if [[ -z $pid ]]; then
rm -f "$running_file"
return 0
fi
# Check if process is still running
if ! kill -0 "$pid" 2>/dev/null; then
log_message "CLEANUP: Stale running state for $app (PID $pid no longer exists)"
# Check if any process matching the real binary is still running
if ! is_app_running "$real_binary"; then
log_message "CLEANUP: Stale running state for $app (no matching processes found)"
rm -f "$running_file"
fi
}
# Check if app is running by process name (handles Electron apps that fork)
is_app_running() {
local real_binary="$1"
pgrep -f "$real_binary" >/dev/null 2>&1
}
# Kill all processes matching the real binary path
kill_app() {
local real_binary="$1"
pkill -f "$real_binary" 2>/dev/null || true
sleep 2
pkill -9 -f "$real_binary" 2>/dev/null || true
}
# Launch app with auto-close timer
launch_with_timer() {
local app="$1"
@ -174,11 +183,15 @@ launch_with_timer() {
"$real_binary" "$@" &
local app_pid=$!
# Give Electron apps time to fork before we start polling
sleep 2
# Record state
echo "$app_pid $(date +%s)" >"$running_file"
log_message "LAUNCHED: $app with PID $app_pid (auto-close in ${AUTO_CLOSE_TIMEOUT_MINUTES}m)"
# Spawn the auto-close daemon in a completely detached subshell
# Uses process-name matching so it works for Electron apps that fork on launch
(
# Detach from terminal
exec </dev/null >/dev/null 2>&1
@ -186,8 +199,8 @@ launch_with_timer() {
# Wait for warning time
sleep "$warning_seconds"
# Check if still running before warning
if kill -0 "$app_pid" 2>/dev/null; then
# Check if still running before warning (by process name, not PID)
if is_app_running "$real_binary"; then
# Send warning notification
notify-send -u critical -t 30000 "$app Closing Soon" \
"Session will end in ${AUTO_CLOSE_WARNING_MINUTES} minutes. Save your work!" 2>/dev/null || true
@ -200,39 +213,32 @@ launch_with_timer() {
# Wait remaining time
sleep $((AUTO_CLOSE_WARNING_MINUTES * 60))
# Check if still running
if kill -0 "$app_pid" 2>/dev/null; then
# Check if still running (by process name)
if is_app_running "$real_binary"; then
# Send final notification
notify-send -u critical -t 5000 "🚫 $app Session Ended" \
"Time's up! Closing $app now." 2>/dev/null || true
# Graceful kill first
kill "$app_pid" 2>/dev/null || true
# Kill all matching processes (handles forked Electron children)
kill_app "$real_binary"
# Wait a moment for graceful shutdown
sleep 2
# Force kill if still running
if kill -0 "$app_pid" 2>/dev/null; then
kill -9 "$app_pid" 2>/dev/null || true
fi
echo "$(date '+%Y-%m-%d %H:%M:%S') - AUTO-CLOSED: $app (PID $app_pid) after ${AUTO_CLOSE_TIMEOUT_MINUTES}m" >>"$LOG_FILE" 2>/dev/null || true
echo "$(date '+%Y-%m-%d %H:%M:%S') - AUTO-CLOSED: $app after ${AUTO_CLOSE_TIMEOUT_MINUTES}m" >>"$LOG_FILE" 2>/dev/null || true
fi
rm -f "$running_file" 2>/dev/null || true
) &
disown
# Wait for the app to exit (keeps wrapper process alive while app is running)
wait "$app_pid" 2>/dev/null || true
local exit_code=$?
# Wait for the app to exit by polling process name.
# Electron apps fork immediately so waiting on $app_pid would return too soon.
while is_app_running "$real_binary"; do
sleep 5
done
# Clean up running state
rm -f "$running_file" 2>/dev/null || true
log_message "EXITED: $app (PID $app_pid) with code $exit_code"
return $exit_code
log_message "EXITED: $app"
}
# Main wrapper function - called when wrapping app launches
@ -269,10 +275,17 @@ install_wrapper() {
local wrapper_path="${APPS[$app]}"
local real_binary="${REAL_BINARIES[$app]}"
# Check if already wrapped
# Check if already wrapped: .orig must exist AND current file must be our wrapper
if [[ -f "${wrapper_path}.orig" ]]; then
echo "$app already wrapped"
return 0
if grep -q "block-compulsive-opening" "$wrapper_path" 2>/dev/null; then
echo "$app already wrapped"
return 0
else
# .orig exists but wrapper was overwritten (e.g. by package update)
echo "$app wrapper was overwritten, re-installing..."
rm -f "${wrapper_path}.orig"
# Fall through to re-install
fi
fi
# Check if wrapper location exists (file or symlink)
@ -428,15 +441,10 @@ rewrap_quiet() {
for app in "${!APPS[@]}"; do
local wrapper_path="${APPS[$app]}"
# Check if wrapper was overwritten (no longer our wrapper script)
if [[ -f $wrapper_path ]] && ! grep -q "block-compulsive-opening" "$wrapper_path" 2>/dev/null; then
# Wrapper was overwritten by package update
log_message "REWRAP: $app wrapper was overwritten, re-installing"
# Remove old .orig if exists (it's now stale)
# Re-wrap if wrapper is missing or was overwritten by a package update
if [[ ! -f "${wrapper_path}.orig" ]] || ! grep -q "block-compulsive-opening" "$wrapper_path" 2>/dev/null; then
log_message "REWRAP: $app wrapper missing or overwritten, re-installing"
rm -f "${wrapper_path}.orig"
# Re-install wrapper
install_wrapper "$app" >>"$LOG_FILE" 2>&1 || true
fi
done

View File

@ -11,8 +11,11 @@ from datetime import datetime, timezone
import json
import logging
from pathlib import Path
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import tkinter as tk
from typing import TYPE_CHECKING
@ -30,6 +33,14 @@ MAX_SETS = 20
MAX_REPS = 100
MAX_WEIGHT_KG = 500
SICK_LOCKOUT_SECONDS = 120 # 2 minutes wait when sick
SUBMIT_DELAY_DEMO = 30
SUBMIT_DELAY_PRODUCTION = 180
PHONE_PENALTY_DELAY_DEMO = 10
PHONE_PENALTY_DELAY_PRODUCTION = 600
ADB_TIMEOUT = 15
STRONGLIFTS_DB_REMOTE = (
"/data/data/com.stronglifts.app/databases/StrongLifts-Database-3"
)
SHUTDOWN_CONFIG_FILE = Path("/etc/shutdown-schedule.conf")
# Helper script path (relative to this file)
ADJUST_SHUTDOWN_SCRIPT = Path(__file__).resolve().parent / "adjust_shutdown_schedule.sh"
@ -65,7 +76,7 @@ class ScreenLocker:
self._setup_demo_close_button()
self.container = tk.Frame(self.root, bg="#1a1a1a")
self.container.place(relx=0.5, rely=0.5, anchor="center")
self.ask_workout_done()
self._start_phone_check()
self._grab_input()
def _setup_window(self) -> None:
@ -232,7 +243,9 @@ class ScreenLocker:
self.timer_label = self._text("", font_size=16, color="#ffaa00")
self.submit_btn = self._disabled_submit_button()
self._back_button(back_command)
self.submit_unlock_time = 30
self.submit_unlock_time = (
SUBMIT_DELAY_DEMO if self.demo_mode else SUBMIT_DELAY_PRODUCTION
)
self.entries_to_check = entries
self.submit_command = verify_command
self.update_submit_timer()
@ -259,6 +272,52 @@ class ScreenLocker:
command=self.ask_if_sick,
).pack(side="left", padx=20)
def _start_phone_check(self) -> None:
"""Check phone for today's workout immediately at startup."""
self.clear_container()
self._label("Checking phone...", font_size=36, color="#ffaa00", pady=30)
self._text("Looking for today's workout in StrongLifts...", font_size=18)
self.root.after(100, self._handle_startup_phone_result)
def _handle_startup_phone_result(self) -> None:
"""Route to appropriate screen based on startup phone check result."""
status, message = self._verify_phone_workout()
if status == "verified":
self.clear_container()
self._label(
"\u2713 Workout Verified!", font_size=36, color="#00cc00", pady=20
)
self._text(message, color="#88ff88")
self._text("\nLog the details below to unlock.", font_size=18)
self.root.after(1500, self.ask_workout_type)
elif status == "not_verified":
self.clear_container()
self._label("No Workout Found", font_size=36, color="#ff4444", pady=20)
self._text(
f"\u274c {message}\n\n"
"StrongLifts shows no workout today.\n"
"Go do your workout first!",
color="#ffaa00",
)
frame = self._button_row()
self._button(
frame,
"TRY AGAIN",
bg="#0066cc",
command=self._start_phone_check,
width=12,
).pack(side="left", padx=10)
self._button(
frame,
"I'm sick",
bg="#cc6600",
command=self.ask_if_sick,
width=12,
).pack(side="left", padx=10)
else:
# no_phone or error — penalty timer, then proceed to logging form
self._show_phone_penalty(message, on_done=self.ask_workout_done)
def ask_if_sick(self) -> None:
"""Display sick day question dialog."""
self.clear_container()
@ -694,7 +753,7 @@ class ScreenLocker:
self.workout_data["distance_km"] = str(distance)
self.workout_data["time_minutes"] = str(time_mins)
self.workout_data["pace_min_per_km"] = str(pace)
self.unlock_screen()
self._attempt_unlock()
# ------------------------------------------------------------------
# Strength workout
@ -852,8 +911,165 @@ class ScreenLocker:
self.show_error(total_err)
return
self._store_strength_data(exercises, sets, reps, weights, total_weight)
self._attempt_unlock()
# ------------------------------------------------------------------
# Phone workout verification via ADB + StrongLifts DB
# ------------------------------------------------------------------
def _run_adb(self, args: list[str]) -> tuple[bool, str]:
"""Run an ADB command and return success flag and stdout."""
adb = shutil.which("adb") or "adb"
try:
result = subprocess.run(
[adb, *args],
capture_output=True,
text=True,
timeout=ADB_TIMEOUT,
check=False,
)
except (FileNotFoundError, OSError) as exc:
_logger.warning("ADB not available: %s", exc)
return False, ""
except subprocess.TimeoutExpired:
_logger.warning("ADB command timed out: %s", args)
return False, ""
return result.returncode == 0, result.stdout
def _adb_shell(
self,
command: str,
*,
root: bool = False,
) -> tuple[bool, str]:
"""Run a shell command on the connected Android device."""
if root:
return self._run_adb(["shell", "su", "-c", command])
return self._run_adb(["shell", command])
def _is_phone_connected(self) -> bool:
"""Check if an Android device is connected via ADB."""
success, output = self._run_adb(["devices"])
if not success:
return False
lines = output.strip().split("\n")[1:]
return any("device" in line and "offline" not in line for line in lines)
def _pull_stronglifts_db(self) -> Path | None:
"""Pull StrongLifts database from phone to a local temp file.
Returns:
Path to the local copy, or None on failure.
"""
tmp = Path(tempfile.gettempdir()) / "stronglifts_check.db"
success, _ = self._adb_shell(
f"cat '{STRONGLIFTS_DB_REMOTE}' > /sdcard/_sl_tmp.db",
root=True,
)
if not success:
return None
ok, _ = self._run_adb(["pull", "/sdcard/_sl_tmp.db", str(tmp)])
if not ok:
return None
return tmp
def _count_today_workouts(self, db_path: Path) -> int:
"""Count today's workouts in a local copy of StrongLifts DB.
Args:
db_path: Path to the locally-pulled StrongLifts database.
Returns:
Number of workouts started today (local time).
"""
try:
conn = sqlite3.connect(str(db_path))
try:
cursor = conn.execute(
"SELECT COUNT(*) FROM workouts "
"WHERE date(start / 1000, 'unixepoch', 'localtime') "
"= date('now', 'localtime')",
)
row = cursor.fetchone()
return int(row[0]) if row else 0
finally:
conn.close()
except (sqlite3.Error, ValueError, TypeError):
_logger.warning("Failed to query StrongLifts database")
return 0
def _verify_phone_workout(self) -> tuple[str, str]:
"""Verify workout was recorded in StrongLifts on the phone.
Returns:
Tuple of (status, message) where status is one of:
- "verified": Workout confirmed on phone.
- "not_verified": Phone connected but no workout found.
- "no_phone": No phone connected via ADB.
- "error": Could not access StrongLifts database.
"""
if not self._is_phone_connected():
return "no_phone", "No phone connected via ADB"
local_db = self._pull_stronglifts_db()
if local_db is None:
return "error", "StrongLifts database not found on phone"
count = self._count_today_workouts(local_db)
if count > 0:
return (
"verified",
f"Workout verified! ({count} session(s) found on phone)",
)
return "not_verified", "No workout found on phone today"
def _attempt_unlock(self) -> None:
"""Unlock screen after workout form submission."""
self.unlock_screen()
def _show_phone_penalty(
self, message: str, *, on_done: Callable[[], None] | None = None
) -> None:
"""Show penalty countdown when phone verification is unavailable."""
self.clear_container()
self._phone_penalty_done_fn: Callable[[], None] = (
on_done if on_done is not None else self.unlock_screen
)
delay = (
PHONE_PENALTY_DELAY_DEMO
if self.demo_mode
else PHONE_PENALTY_DELAY_PRODUCTION
)
self._label(
"Cannot Verify Workout",
font_size=36,
color="#ff8800",
pady=20,
)
self._text(message, color="#ffaa00")
self._text(
"Connect phone via ADB to skip this wait,\n"
"or wait for the penalty timer.\n\n"
"Note: Phone must be rooted and StrongLifts installed.",
font_size=18,
)
self.phone_penalty_remaining = delay
self.phone_penalty_label = self._label(
str(delay),
font_size=80,
pady=20,
)
self._update_phone_penalty()
def _update_phone_penalty(self) -> None:
"""Update phone penalty countdown."""
if self.phone_penalty_remaining > 0:
self.phone_penalty_label.config(
text=str(self.phone_penalty_remaining),
)
self.phone_penalty_remaining -= 1
self.root.after(1000, self._update_phone_penalty)
else:
self._phone_penalty_done_fn()
# ------------------------------------------------------------------
# Submit timer and entry checking
# ------------------------------------------------------------------

View File

@ -13,6 +13,8 @@ from __future__ import annotations
from datetime import datetime, timezone
import json
from pathlib import Path
import sqlite3
import subprocess
import tkinter as tk
from typing import TYPE_CHECKING, Any, NamedTuple
from unittest.mock import MagicMock, patch
@ -27,6 +29,11 @@ from python_pkg.screen_locker.screen_lock import (
MAX_TIME_MINUTES,
MAX_WEIGHT_KG,
MIN_EXERCISE_NAME_LEN,
PHONE_PENALTY_DELAY_DEMO,
PHONE_PENALTY_DELAY_PRODUCTION,
STRONGLIFTS_DB_REMOTE,
SUBMIT_DELAY_DEMO,
SUBMIT_DELAY_PRODUCTION,
ScreenLocker,
)
@ -365,16 +372,16 @@ class TestVerifyRunningData:
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test valid running data unlocks screen."""
"""Test valid running data triggers unlock attempt."""
locker = create_locker(mock_tk, tmp_path)
setup_running_entries(locker, RunningData("5", "25", "5"))
locker.log_file = tmp_path / "workout_log.json"
locker.workout_data = {"type": "running"}
locker.unlock_screen = MagicMock() # type: ignore[method-assign]
locker._attempt_unlock = MagicMock() # type: ignore[method-assign]
locker.verify_running_data()
locker.unlock_screen.assert_called_once()
locker._attempt_unlock.assert_called_once()
def test_invalid_distance_zero(
self,
@ -515,16 +522,16 @@ class TestVerifyStrengthData:
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test valid strength data unlocks screen."""
"""Test valid strength data triggers unlock attempt."""
locker = create_locker(mock_tk, tmp_path)
setup_strength_entries(locker, StrengthData("Squat", "3", "10", "50", "1500"))
locker.log_file = tmp_path / "workout_log.json"
locker.workout_data = {"type": "strength"}
locker.unlock_screen = MagicMock() # type: ignore[method-assign]
locker._attempt_unlock = MagicMock() # type: ignore[method-assign]
locker.verify_strength_data()
locker.unlock_screen.assert_called_once()
locker._attempt_unlock.assert_called_once()
def test_valid_multiple_exercises(
self,
@ -540,11 +547,11 @@ class TestVerifyStrengthData:
)
locker.log_file = tmp_path / "workout_log.json"
locker.workout_data = {"type": "strength"}
locker.unlock_screen = MagicMock() # type: ignore[method-assign]
locker._attempt_unlock = MagicMock() # type: ignore[method-assign]
locker.verify_strength_data()
locker.unlock_screen.assert_called_once()
locker._attempt_unlock.assert_called_once()
def test_mismatched_list_lengths(
self,
@ -1081,7 +1088,7 @@ class TestAskRunningDetails:
locker.ask_running_details()
assert locker.submit_unlock_time == 30
assert locker.submit_unlock_time == SUBMIT_DELAY_DEMO
locker.update_submit_timer.assert_called_once()
@ -1138,9 +1145,24 @@ class TestAskStrengthDetails:
locker.ask_strength_details()
assert locker.submit_unlock_time == 30
assert locker.submit_unlock_time == SUBMIT_DELAY_DEMO
locker.update_submit_timer.assert_called_once()
def test_ask_strength_details_production_timer(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test production mode uses longer submit delay."""
locker = create_locker(mock_tk, tmp_path, demo_mode=False)
locker.clear_container = MagicMock() # type: ignore[method-assign]
locker.update_submit_timer = MagicMock() # type: ignore[method-assign]
locker.ask_strength_details()
assert locker.submit_unlock_time == SUBMIT_DELAY_PRODUCTION
class TestAskWorkoutDone:
"""Tests for ask_workout_done method."""
@ -1239,6 +1261,666 @@ class TestAdjustShutdownTimeLater:
assert result is False
class TestRunAdb:
"""Tests for _run_adb ADB command execution."""
def test_run_adb_success(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test successful ADB command."""
locker = create_locker(mock_tk, tmp_path)
mock_result = MagicMock(returncode=0, stdout="ok\n")
with patch(
"python_pkg.screen_locker.screen_lock.subprocess.run",
return_value=mock_result,
) as mock_run:
success, output = locker._run_adb(["devices"])
assert success is True
assert output == "ok\n"
mock_run.assert_called_once()
def test_run_adb_failure(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test failed ADB command."""
locker = create_locker(mock_tk, tmp_path)
mock_result = MagicMock(returncode=1, stdout="")
with patch(
"python_pkg.screen_locker.screen_lock.subprocess.run",
return_value=mock_result,
):
success, output = locker._run_adb(["devices"])
assert success is False
def test_run_adb_not_found(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test ADB binary not found."""
locker = create_locker(mock_tk, tmp_path)
with patch(
"python_pkg.screen_locker.screen_lock.subprocess.run",
side_effect=FileNotFoundError("adb not found"),
):
success, output = locker._run_adb(["devices"])
assert success is False
assert output == ""
def test_run_adb_oserror(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test ADB OSError."""
locker = create_locker(mock_tk, tmp_path)
with patch(
"python_pkg.screen_locker.screen_lock.subprocess.run",
side_effect=OSError("permission denied"),
):
success, output = locker._run_adb(["devices"])
assert success is False
assert output == ""
def test_run_adb_timeout(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test ADB command timeout."""
locker = create_locker(mock_tk, tmp_path)
with patch(
"python_pkg.screen_locker.screen_lock.subprocess.run",
side_effect=subprocess.TimeoutExpired("adb", 15),
):
success, output = locker._run_adb(["devices"])
assert success is False
assert output == ""
class TestAdbShell:
"""Tests for _adb_shell method."""
def test_adb_shell_no_root(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test ADB shell without root."""
locker = create_locker(mock_tk, tmp_path)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(True, "output"),
)
success, output = locker._adb_shell("ls /sdcard")
locker._run_adb.assert_called_once_with(["shell", "ls /sdcard"])
assert success is True
assert output == "output"
def test_adb_shell_with_root(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test ADB shell with root."""
locker = create_locker(mock_tk, tmp_path)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(True, "output"),
)
success, output = locker._adb_shell("ls /data", root=True)
locker._run_adb.assert_called_once_with(
["shell", "su", "-c", "ls /data"],
)
assert success is True
class TestIsPhoneConnected:
"""Tests for _is_phone_connected method."""
def test_phone_connected(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test phone detected as connected."""
locker = create_locker(mock_tk, tmp_path)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(
True,
"List of devices attached\nABC123\tdevice\n\n",
),
)
assert locker._is_phone_connected() is True
def test_phone_not_connected(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test no phone connected."""
locker = create_locker(mock_tk, tmp_path)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(True, "List of devices attached\n\n"),
)
assert locker._is_phone_connected() is False
def test_phone_offline(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test phone connected but offline."""
locker = create_locker(mock_tk, tmp_path)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(
True,
"List of devices attached\nABC123\toffline\n\n",
),
)
assert locker._is_phone_connected() is False
def test_adb_command_fails(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test ADB command failure."""
locker = create_locker(mock_tk, tmp_path)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(False, ""),
)
assert locker._is_phone_connected() is False
class TestFindHealthConnectDb:
"""Tests for _pull_stronglifts_db method."""
def test_db_pulled_successfully(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test StrongLifts DB pulled from device."""
locker = create_locker(mock_tk, tmp_path)
locker._adb_shell = MagicMock( # type: ignore[method-assign]
return_value=(True, ""),
)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(True, ""),
)
result = locker._pull_stronglifts_db()
assert result is not None
locker._adb_shell.assert_called_once()
locker._run_adb.assert_called_once()
call_args = locker._run_adb.call_args[0][0]
assert call_args[0] == "pull"
def test_db_cat_fails(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test returns None when cat command fails."""
locker = create_locker(mock_tk, tmp_path)
locker._adb_shell = MagicMock( # type: ignore[method-assign]
return_value=(False, ""),
)
assert locker._pull_stronglifts_db() is None
def test_db_pull_fails(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test returns None when adb pull fails."""
locker = create_locker(mock_tk, tmp_path)
locker._adb_shell = MagicMock( # type: ignore[method-assign]
return_value=(True, ""),
)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(False, ""),
)
assert locker._pull_stronglifts_db() is None
def test_db_uses_correct_remote_path(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test uses the correct StrongLifts DB remote path."""
locker = create_locker(mock_tk, tmp_path)
locker._adb_shell = MagicMock( # type: ignore[method-assign]
return_value=(True, ""),
)
locker._run_adb = MagicMock( # type: ignore[method-assign]
return_value=(True, ""),
)
locker._pull_stronglifts_db()
shell_cmd = locker._adb_shell.call_args[0][0]
assert STRONGLIFTS_DB_REMOTE in shell_cmd
class TestCountTodayWorkouts:
"""Tests for _count_today_workouts method."""
def test_workouts_found_today(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test workouts found today."""
locker = create_locker(mock_tk, tmp_path)
db_file = tmp_path / "sl_test.db"
conn = sqlite3.connect(str(db_file))
conn.execute(
"CREATE TABLE workouts "
"(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)",
)
# Insert a workout with today's timestamp (ms)
import time
now_ms = int(time.time() * 1000)
conn.execute(
"INSERT INTO workouts VALUES (?, ?, ?)",
("w1", now_ms, now_ms + 3600000),
)
conn.commit()
conn.close()
assert locker._count_today_workouts(db_file) == 1
def test_no_workouts_today(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test no workouts today."""
locker = create_locker(mock_tk, tmp_path)
db_file = tmp_path / "sl_test.db"
conn = sqlite3.connect(str(db_file))
conn.execute(
"CREATE TABLE workouts "
"(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)",
)
# Insert a workout from yesterday (24h+ ago)
import time
yesterday_ms = int((time.time() - 200000) * 1000)
conn.execute(
"INSERT INTO workouts VALUES (?, ?, ?)",
("w1", yesterday_ms, yesterday_ms + 3600000),
)
conn.commit()
conn.close()
assert locker._count_today_workouts(db_file) == 0
def test_invalid_db_returns_zero(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test returns 0 for invalid database file."""
locker = create_locker(mock_tk, tmp_path)
bad_file = tmp_path / "not_a_db.db"
bad_file.write_text("not a database")
assert locker._count_today_workouts(bad_file) == 0
def test_missing_table_returns_zero(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test returns 0 when workouts table doesn't exist."""
locker = create_locker(mock_tk, tmp_path)
db_file = tmp_path / "empty.db"
conn = sqlite3.connect(str(db_file))
conn.execute("CREATE TABLE other (id TEXT)")
conn.commit()
conn.close()
assert locker._count_today_workouts(db_file) == 0
def test_multiple_workouts_today(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test counts multiple workouts today correctly."""
locker = create_locker(mock_tk, tmp_path)
db_file = tmp_path / "sl_test.db"
conn = sqlite3.connect(str(db_file))
conn.execute(
"CREATE TABLE workouts "
"(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)",
)
import time
now_ms = int(time.time() * 1000)
conn.execute(
"INSERT INTO workouts VALUES (?, ?, ?)",
("w1", now_ms, now_ms + 3600000),
)
conn.execute(
"INSERT INTO workouts VALUES (?, ?, ?)",
("w2", now_ms + 100000, now_ms + 3700000),
)
conn.commit()
conn.close()
assert locker._count_today_workouts(db_file) == 2
class TestVerifyPhoneWorkout:
"""Tests for _verify_phone_workout method."""
def test_verified(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test workout verified on phone."""
locker = create_locker(mock_tk, tmp_path)
locker._is_phone_connected = MagicMock( # type: ignore[method-assign]
return_value=True,
)
locker._pull_stronglifts_db = MagicMock( # type: ignore[method-assign]
return_value=tmp_path / "sl.db",
)
locker._count_today_workouts = MagicMock( # type: ignore[method-assign]
return_value=2,
)
status, message = locker._verify_phone_workout()
assert status == "verified"
assert "2 session" in message
def test_not_verified(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test no workout found on phone."""
locker = create_locker(mock_tk, tmp_path)
locker._is_phone_connected = MagicMock( # type: ignore[method-assign]
return_value=True,
)
locker._pull_stronglifts_db = MagicMock( # type: ignore[method-assign]
return_value=tmp_path / "sl.db",
)
locker._count_today_workouts = MagicMock( # type: ignore[method-assign]
return_value=0,
)
status, message = locker._verify_phone_workout()
assert status == "not_verified"
assert "No workout" in message
def test_no_phone(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test no phone connected."""
locker = create_locker(mock_tk, tmp_path)
locker._is_phone_connected = MagicMock( # type: ignore[method-assign]
return_value=False,
)
status, _ = locker._verify_phone_workout()
assert status == "no_phone"
def test_error_no_db(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test error when StrongLifts DB cannot be pulled."""
locker = create_locker(mock_tk, tmp_path)
locker._is_phone_connected = MagicMock( # type: ignore[method-assign]
return_value=True,
)
locker._pull_stronglifts_db = MagicMock( # type: ignore[method-assign]
return_value=None,
)
status, message = locker._verify_phone_workout()
assert status == "error"
assert "database" in message.lower()
class TestStartPhoneCheck:
"""Tests for _start_phone_check and _handle_startup_phone_result."""
def test_start_phone_check_shows_checking_screen(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test _start_phone_check shows checking message and schedules check."""
locker = create_locker(mock_tk, tmp_path)
locker.clear_container = MagicMock() # type: ignore[method-assign]
locker._start_phone_check()
locker.clear_container.assert_called()
locker.root.after.assert_called() # type: ignore[attr-defined]
def test_handle_startup_verified_shows_success_then_form(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test verified result shows success and schedules form."""
locker = create_locker(mock_tk, tmp_path)
locker.clear_container = MagicMock() # type: ignore[method-assign]
locker._verify_phone_workout = MagicMock( # type: ignore[method-assign]
return_value=("verified", "Workout verified! (1 session)"),
)
locker._handle_startup_phone_result()
locker.clear_container.assert_called()
locker.root.after.assert_called() # type: ignore[attr-defined]
def test_handle_startup_not_verified_shows_block(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test not_verified result shows blocking screen with buttons."""
locker = create_locker(mock_tk, tmp_path)
locker.clear_container = MagicMock() # type: ignore[method-assign]
locker._verify_phone_workout = MagicMock( # type: ignore[method-assign]
return_value=("not_verified", "No workout found on phone today"),
)
locker._handle_startup_phone_result()
locker.clear_container.assert_called()
def test_handle_startup_no_phone_shows_penalty(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test no_phone result triggers penalty with ask_workout_done as callback."""
locker = create_locker(mock_tk, tmp_path)
locker._verify_phone_workout = MagicMock( # type: ignore[method-assign]
return_value=("no_phone", "No phone"),
)
locker._show_phone_penalty = MagicMock() # type: ignore[method-assign]
locker._handle_startup_phone_result()
locker._show_phone_penalty.assert_called_once()
_, kwargs = locker._show_phone_penalty.call_args
assert kwargs["on_done"] == locker.ask_workout_done
def test_handle_startup_error_shows_penalty(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test error result triggers penalty with ask_workout_done as callback."""
locker = create_locker(mock_tk, tmp_path)
locker._verify_phone_workout = MagicMock( # type: ignore[method-assign]
return_value=("error", "DB not found"),
)
locker._show_phone_penalty = MagicMock() # type: ignore[method-assign]
locker._handle_startup_phone_result()
locker._show_phone_penalty.assert_called_once()
_, kwargs = locker._show_phone_penalty.call_args
assert kwargs["on_done"] == locker.ask_workout_done
class TestAttemptUnlock:
"""Tests for _attempt_unlock method."""
def test_attempt_unlock_calls_unlock_screen(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test _attempt_unlock calls unlock_screen directly."""
locker = create_locker(mock_tk, tmp_path)
locker.log_file = tmp_path / "workout_log.json"
locker.workout_data = {"type": "strength"}
locker.unlock_screen = MagicMock() # type: ignore[method-assign]
locker._attempt_unlock()
locker.unlock_screen.assert_called_once()
class TestShowPhonePenalty:
"""Tests for _show_phone_penalty and _update_phone_penalty methods."""
def test_show_phone_penalty_demo_delay(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test demo mode uses short penalty delay."""
locker = create_locker(mock_tk, tmp_path, demo_mode=True)
locker.clear_container = MagicMock() # type: ignore[method-assign]
locker._show_phone_penalty("test message")
# _update_phone_penalty is called once, decrementing by 1
assert locker.phone_penalty_remaining == PHONE_PENALTY_DELAY_DEMO - 1
def test_show_phone_penalty_production_delay(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test production mode uses long penalty delay."""
locker = create_locker(mock_tk, tmp_path, demo_mode=False)
locker.clear_container = MagicMock() # type: ignore[method-assign]
locker._show_phone_penalty("test message")
assert locker.phone_penalty_remaining == PHONE_PENALTY_DELAY_PRODUCTION - 1
def test_update_phone_penalty_countdown(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test phone penalty countdown decrements."""
locker = create_locker(mock_tk, tmp_path)
locker.phone_penalty_remaining = 5
locker.phone_penalty_label = MagicMock()
locker._update_phone_penalty()
assert locker.phone_penalty_remaining == 4
locker.phone_penalty_label.config.assert_called_once_with(text="5")
locker.root.after.assert_called() # type: ignore[attr-defined]
def test_update_phone_penalty_at_zero(
self,
mock_tk: MagicMock,
mock_sys_exit: MagicMock, # noqa: ARG002
tmp_path: Path,
) -> None:
"""Test phone penalty unlocks when timer reaches zero."""
locker = create_locker(mock_tk, tmp_path)
locker.log_file = tmp_path / "workout_log.json"
locker.workout_data = {"type": "strength"}
locker.phone_penalty_remaining = 0
locker.phone_penalty_label = MagicMock()
locker.unlock_screen = MagicMock() # type: ignore[method-assign]
locker._phone_penalty_done_fn = locker.unlock_screen # type: ignore[attr-defined]
locker._update_phone_penalty()
locker.unlock_screen.assert_called_once()
class TestUnlockScreenShutdownAdjustment:
"""Tests for unlock_screen shutdown time adjustment."""