diff --git a/python_pkg/screen_locker/_constants.py b/python_pkg/screen_locker/_constants.py index acc0e58..72ea665 100644 --- a/python_pkg/screen_locker/_constants.py +++ b/python_pkg/screen_locker/_constants.py @@ -11,7 +11,11 @@ ADB_TIMEOUT = 15 STRONGLIFTS_DB_REMOTE = ( "/data/data/com.stronglifts.app/databases/StrongLifts-Database-3" ) +MIN_WORKOUT_DURATION_MINUTES = 50 +MAX_CLOCK_SKEW_SECONDS = 300 # 5 minutes max time skew from NTP SHUTDOWN_CONFIG_FILE = Path("/etc/shutdown-schedule.conf") +# HMAC key for signing workout log entries (root-owned, 0600) +HMAC_KEY_FILE = Path("/etc/workout-locker/hmac.key") # Helper script path (relative to this file) ADJUST_SHUTDOWN_SCRIPT = Path(__file__).resolve().parent / "adjust_shutdown_schedule.sh" # State file to track sick day usage and original config values diff --git a/python_pkg/screen_locker/_log_integrity.py b/python_pkg/screen_locker/_log_integrity.py new file mode 100644 index 0000000..a34c6a2 --- /dev/null +++ b/python_pkg/screen_locker/_log_integrity.py @@ -0,0 +1,78 @@ +"""HMAC-based integrity checking for workout log entries.""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import logging +import secrets + +from python_pkg.screen_locker._constants import HMAC_KEY_FILE + +_logger = logging.getLogger(__name__) + + +def _load_hmac_key() -> bytes | None: + """Load HMAC key from the root-owned key file. + + Returns the key bytes, or None if the file cannot be read. + """ + try: + return HMAC_KEY_FILE.read_bytes().strip() + except OSError: + _logger.warning("Cannot read HMAC key from %s", HMAC_KEY_FILE) + return None + + +def _generate_hmac_key() -> bytes | None: + """Generate a new HMAC key and write it to the key file. + + The key file must be writable (requires root or setup script). + Returns the new key bytes, or None on failure. + """ + key = secrets.token_bytes(32) + try: + HMAC_KEY_FILE.parent.mkdir(parents=True, exist_ok=True) + HMAC_KEY_FILE.write_bytes(key) + except OSError: + _logger.warning("Cannot write HMAC key to %s", HMAC_KEY_FILE) + return None + return key + + +def compute_entry_hmac(entry_data: dict[str, object]) -> str | None: + """Compute HMAC-SHA256 for a workout log entry. + + Args: + entry_data: The log entry dict (without the 'hmac' field). + + Returns: + Hex-encoded HMAC string, or None if the key is unavailable. + """ + key = _load_hmac_key() + if key is None: + return None + payload = json.dumps(entry_data, sort_keys=True, separators=(",", ":")) + return hmac.new(key, payload.encode(), hashlib.sha256).hexdigest() + + +def verify_entry_hmac(entry: dict[str, object]) -> bool: + """Verify HMAC signature of a workout log entry. + + Args: + entry: The full log entry dict including the 'hmac' field. + + Returns: + True if the HMAC is valid, False if invalid or key unavailable. + """ + stored_hmac = entry.get("hmac") + if not isinstance(stored_hmac, str): + return False + key = _load_hmac_key() + if key is None: + return False + entry_without_hmac = {k: v for k, v in entry.items() if k != "hmac"} + payload = json.dumps(entry_without_hmac, sort_keys=True, separators=(",", ":")) + expected = hmac.new(key, payload.encode(), hashlib.sha256).hexdigest() + return hmac.compare_digest(stored_hmac, expected) diff --git a/python_pkg/screen_locker/_phone_verification.py b/python_pkg/screen_locker/_phone_verification.py index 5d158ef..03fa8f7 100644 --- a/python_pkg/screen_locker/_phone_verification.py +++ b/python_pkg/screen_locker/_phone_verification.py @@ -11,8 +11,14 @@ import socket import sqlite3 import subprocess import tempfile +import time -from python_pkg.screen_locker._constants import ADB_TIMEOUT, STRONGLIFTS_DB_REMOTE +from python_pkg.screen_locker._constants import ( + ADB_TIMEOUT, + MIN_WORKOUT_DURATION_MINUTES, + STRONGLIFTS_DB_REMOTE, +) +from python_pkg.screen_locker._time_check import check_clock_skew _logger = logging.getLogger(__name__) @@ -179,25 +185,160 @@ class PhoneVerificationMixin: _logger.warning("Failed to query StrongLifts database") return 0 + def _get_today_workout_duration_minutes(self, db_path: Path) -> float: + """Get the total duration in minutes of today's workouts. + + Args: + db_path: Path to the locally-pulled StrongLifts database. + + Returns: + Total duration in minutes of all workouts started today. + Returns 0.0 on any error or if no workouts found. + """ + try: + conn = sqlite3.connect(str(db_path)) + try: + cursor = conn.execute( + "SELECT SUM((finish - start) / 1000.0 / 60.0) " + "FROM workouts " + "WHERE date(start / 1000, 'unixepoch', 'localtime') " + "= date('now', 'localtime') " + "AND finish > start", + ) + row = cursor.fetchone() + return float(row[0]) if row and row[0] is not None else 0.0 + finally: + conn.close() + except (sqlite3.Error, ValueError, TypeError): + _logger.warning("Failed to query workout duration") + return 0.0 + + def _get_today_exercise_count(self, db_path: Path) -> int: + """Count distinct exercises in today's workouts. + + Uses the StrongLifts 'exercises' table joined with 'workouts' to + verify that actual exercises were logged, not just empty sessions. + + Args: + db_path: Path to the locally-pulled StrongLifts database. + + Returns: + Number of distinct exercises in today's workouts. + Returns 0 on any error. + """ + try: + conn = sqlite3.connect(str(db_path)) + try: + cursor = conn.execute( + "SELECT COUNT(DISTINCT e.exercise) " + "FROM exercises e " + "JOIN workouts w ON e.workout = w.id " + "WHERE date(w.start / 1000, 'unixepoch', 'localtime') " + "= date('now', 'localtime')", + ) + row = cursor.fetchone() + return int(row[0]) if row else 0 + finally: + conn.close() + except (sqlite3.Error, ValueError, TypeError): + _logger.warning("Failed to query exercise count") + return 0 + + def _is_workout_finish_recent(self, db_path: Path) -> bool: + """Check if the latest workout's finish time is recent. + + A fresh workout should have finished within the last few hours. + This prevents using an old pre-prepared database dump. + + Args: + db_path: Path to the locally-pulled StrongLifts database. + + Returns: + True if the latest finish time is within 4 hours of now. + """ + max_age_seconds = 4 * 3600 + try: + conn = sqlite3.connect(str(db_path)) + try: + cursor = conn.execute( + "SELECT MAX(finish) FROM workouts " + "WHERE date(start / 1000, 'unixepoch', 'localtime') " + "= date('now', 'localtime') " + "AND finish > start", + ) + row = cursor.fetchone() + if not row or row[0] is None: + return False + finish_epoch = int(row[0]) / 1000.0 + return (time.time() - finish_epoch) < max_age_seconds + finally: + conn.close() + except (sqlite3.Error, ValueError, TypeError): + _logger.warning("Failed to query workout finish time") + return False + + def _validate_workout_db( + self, + local_db: Path, + ) -> tuple[str, str] | None: + """Validate workout database has a recent, real workout. + + Returns: + A (status, message) tuple if validation fails, or None if OK. + """ + count = self._count_today_workouts(local_db) + if count <= 0: + return "not_verified", "No workout found on phone today" + if not self._is_workout_finish_recent(local_db): + return ( + "stale", + "Workout finish time is too old. Did you actually work out today?", + ) + exercise_count = self._get_today_exercise_count(local_db) + if exercise_count < 1: + return ( + "no_exercises", + "No exercises found in today's workout. " + "Log actual exercises in StrongLifts!", + ) + return None + def _verify_phone_workout(self) -> tuple[str, str]: """Verify workout was recorded in StrongLifts on the phone. Returns: Tuple of (status, message) where status is one of: - - "verified": Workout confirmed on phone. + - "verified": Workout confirmed and >= minimum duration. + - "too_short": Workout found but shorter than minimum. - "not_verified": Phone connected but no workout found. - "no_phone": No phone connected via ADB. - "error": Could not access StrongLifts database. + - "stale": Workout finish time is not recent. + - "no_exercises": Workout has no logged exercises. + - "clock_tampered": System clock skew exceeds threshold. """ + clock_ok, clock_msg = check_clock_skew() + if not clock_ok: + return "clock_tampered", clock_msg if not self._is_phone_connected(): return "no_phone", "No phone connected via ADB" local_db = self._pull_stronglifts_db() if local_db is None: return "error", "StrongLifts database not found on phone" - count = self._count_today_workouts(local_db) - if count > 0: + db_error = self._validate_workout_db(local_db) + if db_error is not None: + return db_error + duration = self._get_today_workout_duration_minutes(local_db) + if duration < MIN_WORKOUT_DURATION_MINUTES: return ( - "verified", - f"Workout verified! ({count} session(s) found on phone)", + "too_short", + f"Workout too short! {duration:.0f} min logged, " + f"need at least {MIN_WORKOUT_DURATION_MINUTES} min.", ) - return "not_verified", "No workout found on phone today" + exercise_count = self._get_today_exercise_count(local_db) + return ( + "verified", + f"Workout verified! ({self._count_today_workouts(local_db)}" + f" session(s), {duration:.0f} min, " + f"{exercise_count} exercise(s))", + ) diff --git a/python_pkg/screen_locker/_time_check.py b/python_pkg/screen_locker/_time_check.py new file mode 100644 index 0000000..f1d0073 --- /dev/null +++ b/python_pkg/screen_locker/_time_check.py @@ -0,0 +1,79 @@ +"""System clock skew detection via NTP.""" + +from __future__ import annotations + +import logging +import socket +import struct +import time + +from python_pkg.screen_locker._constants import MAX_CLOCK_SKEW_SECONDS + +_logger = logging.getLogger(__name__) + +_NTP_EPOCH_OFFSET = 2208988800 # Seconds between 1900-01-01 and 1970-01-01 +_NTP_PORT = 123 +_NTP_TIMEOUT = 5 +_NTP_MIN_PACKET_SIZE = 48 + + +def _query_ntp_offset(server: str = "pool.ntp.org") -> float | None: + """Query an NTP server and return the clock offset in seconds. + + Uses a minimal SNTP (RFC 4330) client-mode request. + + Returns: + Offset in seconds (positive = local clock is ahead), or None on error. + """ + # NTP v3, mode 3 (client), transmit timestamp at bytes 40-47 + packet = b"\x1b" + b"\0" * 47 + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.settimeout(_NTP_TIMEOUT) + t1 = time.time() + sock.sendto(packet, (server, _NTP_PORT)) + data, _ = sock.recvfrom(1024) + t4 = time.time() + except OSError as exc: + _logger.info("NTP query to %s failed: %s", server, exc) + return None + + if len(data) < _NTP_MIN_PACKET_SIZE: + return None + + # Transmit timestamp from server (bytes 40-47) + tx_seconds = struct.unpack("!I", data[40:44])[0] - _NTP_EPOCH_OFFSET + tx_fraction = struct.unpack("!I", data[44:48])[0] / (2**32) + server_time = tx_seconds + tx_fraction + + # Simplified offset: server_time should be close to (t1 + t4) / 2 + local_mid = (t1 + t4) / 2 + return server_time - local_mid + + +def check_clock_skew() -> tuple[bool, str]: + """Check if system clock is within acceptable skew of NTP time. + + Returns: + Tuple of (ok, message). + ok is True if clock is within MAX_CLOCK_SKEW_SECONDS or NTP is unreachable. + When NTP is unreachable, we allow through (fail-open for network issues). + """ + offset = _query_ntp_offset() + if offset is None: + _logger.info("NTP unreachable — allowing through") + return True, "NTP check skipped (server unreachable)" + + abs_offset = abs(offset) + if abs_offset > MAX_CLOCK_SKEW_SECONDS: + direction = "ahead" if offset < 0 else "behind" + _logger.warning( + "Clock skew detected: %.0f seconds %s", + abs_offset, + direction, + ) + return False, ( + f"System clock is {abs_offset:.0f}s {direction} of NTP time. " + f"Max allowed skew: {MAX_CLOCK_SKEW_SECONDS}s." + ) + return True, f"Clock OK (offset: {offset:+.1f}s)" diff --git a/python_pkg/screen_locker/_ui_flows.py b/python_pkg/screen_locker/_ui_flows.py index 96e93d6..489e44b 100644 --- a/python_pkg/screen_locker/_ui_flows.py +++ b/python_pkg/screen_locker/_ui_flows.py @@ -70,6 +70,25 @@ class UIFlowsMixin: self._text("Unlocking...", font_size=18, color="#888888") unlock_delay = 1500 if self.demo_mode else 2000 self.root.after(unlock_delay, self.unlock_screen) + elif status == "too_short": + self._show_retry_and_sick( + f"\u274c {message}\n\n" + "Your workout was too short!\n" + "Actually do the full workout, don't just\n" + "spam through the exercises.", + ) + elif status in ("stale", "no_exercises"): + self._show_retry_and_sick( + f"\u274c {message}\n\n" + "The workout data looks suspicious.\n" + "Make sure you did a real workout today.", + ) + elif status == "clock_tampered": + self._show_retry_and_sick( + f"\u274c {message}\n\n" + "System clock appears to be manipulated.\n" + "Fix your system time and try again.", + ) elif status == "not_verified": self._show_retry_and_sick( f"\u274c {message}\n\n" diff --git a/python_pkg/screen_locker/install_systemd.sh b/python_pkg/screen_locker/install_systemd.sh index 703e3cc..3fd3f83 100755 --- a/python_pkg/screen_locker/install_systemd.sh +++ b/python_pkg/screen_locker/install_systemd.sh @@ -66,6 +66,9 @@ if [ -f "$I3_CONFIG" ] && grep -q "exec.*screen_lock.py" "$I3_CONFIG"; then echo "✓ i3 autostart: INSTALLED" else echo " i3 autostart: not installed" + echo "" + echo "To add i3 startup hook (recommended), add this line to $I3_CONFIG:" + echo " exec --no-startup-id /usr/bin/python3 -m python_pkg.screen_locker.screen_lock --production" fi # Immediately check if today's workout is done; block if not diff --git a/python_pkg/screen_locker/screen_lock.py b/python_pkg/screen_locker/screen_lock.py index 9bbe436..43f5a95 100755 --- a/python_pkg/screen_locker/screen_lock.py +++ b/python_pkg/screen_locker/screen_lock.py @@ -15,27 +15,37 @@ import sys import tkinter as tk from typing import TYPE_CHECKING -if TYPE_CHECKING: - from collections.abc import Callable - from concurrent.futures import Future - from python_pkg.screen_locker._constants import ( + HMAC_KEY_FILE, + MAX_CLOCK_SKEW_SECONDS, + MIN_WORKOUT_DURATION_MINUTES, PHONE_PENALTY_DELAY_DEMO, PHONE_PENALTY_DELAY_PRODUCTION, SICK_LOCKOUT_SECONDS, STRONGLIFTS_DB_REMOTE, ) +from python_pkg.screen_locker._log_integrity import ( + compute_entry_hmac, + verify_entry_hmac, +) +from python_pkg.screen_locker._phone_verification import PhoneVerificationMixin +from python_pkg.screen_locker._shutdown import ShutdownMixin +from python_pkg.screen_locker._ui_flows import UIFlowsMixin + +if TYPE_CHECKING: + from collections.abc import Callable + from concurrent.futures import Future __all__ = [ + "HMAC_KEY_FILE", + "MAX_CLOCK_SKEW_SECONDS", + "MIN_WORKOUT_DURATION_MINUTES", "PHONE_PENALTY_DELAY_DEMO", "PHONE_PENALTY_DELAY_PRODUCTION", "SICK_LOCKOUT_SECONDS", "STRONGLIFTS_DB_REMOTE", "ScreenLocker", ] -from python_pkg.screen_locker._phone_verification import PhoneVerificationMixin -from python_pkg.screen_locker._shutdown import ShutdownMixin -from python_pkg.screen_locker._ui_flows import UIFlowsMixin _logger = logging.getLogger(__name__) @@ -252,7 +262,7 @@ class ScreenLocker( self.root.after(1500, self.close) def has_logged_today(self) -> bool: - """Check if workout has been logged today.""" + """Check if workout has been logged today with valid HMAC.""" if not self.log_file.exists(): return False @@ -263,7 +273,13 @@ class ScreenLocker( return False else: today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") - return today in logs + entry = logs.get(today) + if entry is None: + return False + if not verify_entry_hmac(entry): + _logger.warning("HMAC verification failed for today's log entry") + return False + return True def _load_existing_logs(self) -> dict: """Load existing workout logs from file.""" @@ -276,13 +292,19 @@ class ScreenLocker( return {} def save_workout_log(self) -> None: - """Save workout data to log file.""" + """Save workout data to log file with HMAC signature.""" logs = self._load_existing_logs() today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") - logs[today] = { + entry: dict[str, object] = { "timestamp": datetime.now(tz=timezone.utc).isoformat(), "workout_data": self.workout_data, } + signature = compute_entry_hmac(entry) + if signature is not None: + entry["hmac"] = signature + else: + _logger.warning("HMAC key unavailable — saving unsigned entry") + logs[today] = entry try: with self.log_file.open("w") as f: json.dump(logs, f, indent=2) diff --git a/python_pkg/screen_locker/tests/test_adb_and_phone.py b/python_pkg/screen_locker/tests/test_adb_and_phone.py index d0ec67c..7b79b5d 100644 --- a/python_pkg/screen_locker/tests/test_adb_and_phone.py +++ b/python_pkg/screen_locker/tests/test_adb_and_phone.py @@ -8,6 +8,8 @@ import time from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch +import pytest + from python_pkg.screen_locker.screen_lock import STRONGLIFTS_DB_REMOTE from python_pkg.screen_locker.tests.conftest import create_locker @@ -473,3 +475,329 @@ class TestCountTodayWorkouts: conn.close() assert locker._count_today_workouts(db_file) == 2 + + +class TestGetTodayWorkoutDurationMinutes: + """Tests for _get_today_workout_duration_minutes method.""" + + def test_returns_duration_for_today_workout( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns correct duration for a 60-minute workout.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + now_ms = int(time.time() * 1000) + duration_ms = 60 * 60 * 1000 # 60 minutes + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w1", now_ms, now_ms + duration_ms), + ) + conn.commit() + conn.close() + + result = locker._get_today_workout_duration_minutes(db_file) + assert result == pytest.approx(60.0, abs=1.0) + + def test_returns_zero_for_no_workouts( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns 0.0 when no workouts today.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + yesterday_ms = int((time.time() - 200000) * 1000) + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w1", yesterday_ms, yesterday_ms + 3600000), + ) + conn.commit() + conn.close() + + assert locker._get_today_workout_duration_minutes(db_file) == 0.0 + + def test_sums_multiple_workouts( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test sums durations of multiple workouts today.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + now_ms = int(time.time() * 1000) + # 30 min + 25 min = 55 min total + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w1", now_ms, now_ms + 30 * 60 * 1000), + ) + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w2", now_ms + 31 * 60 * 1000, now_ms + 56 * 60 * 1000), + ) + conn.commit() + conn.close() + + result = locker._get_today_workout_duration_minutes(db_file) + assert result == pytest.approx(55.0, abs=1.0) + + def test_ignores_invalid_finish( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test ignores workouts where finish <= start.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + now_ms = int(time.time() * 1000) + # finish == start (zero duration - should be excluded by WHERE) + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w1", now_ms, now_ms), + ) + conn.commit() + conn.close() + + assert locker._get_today_workout_duration_minutes(db_file) == 0.0 + + def test_invalid_db_returns_zero( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns 0.0 for invalid database file.""" + locker = create_locker(mock_tk, tmp_path) + bad_file = tmp_path / "not_a_db.db" + bad_file.write_text("not a database") + + assert locker._get_today_workout_duration_minutes(bad_file) == 0.0 + + def test_missing_table_returns_zero( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns 0.0 when workouts table doesn't exist.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "empty.db" + conn = sqlite3.connect(str(db_file)) + conn.execute("CREATE TABLE other (id TEXT)") + conn.commit() + conn.close() + + assert locker._get_today_workout_duration_minutes(db_file) == 0.0 + + +class TestGetTodayExerciseCount: + """Tests for _get_today_exercise_count method.""" + + def test_counts_exercises( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test counts distinct exercises in today's workouts.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + conn.execute( + "CREATE TABLE exercises (id TEXT, workout TEXT, exercise TEXT)", + ) + now_ms = int(time.time() * 1000) + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w1", now_ms, now_ms + 3600000), + ) + conn.execute( + "INSERT INTO exercises VALUES (?, ?, ?)", + ("e1", "w1", "squat"), + ) + conn.execute( + "INSERT INTO exercises VALUES (?, ?, ?)", + ("e2", "w1", "bench_press"), + ) + conn.execute( + "INSERT INTO exercises VALUES (?, ?, ?)", + ("e3", "w1", "squat"), + ) + conn.commit() + conn.close() + + assert locker._get_today_exercise_count(db_file) == 2 + + def test_no_exercises_returns_zero( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns 0 when no exercises exist.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + conn.execute( + "CREATE TABLE exercises (id TEXT, workout TEXT, exercise TEXT)", + ) + now_ms = int(time.time() * 1000) + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w1", now_ms, now_ms + 3600000), + ) + conn.commit() + conn.close() + + assert locker._get_today_exercise_count(db_file) == 0 + + def test_invalid_db_returns_zero( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns 0 for invalid database file.""" + locker = create_locker(mock_tk, tmp_path) + bad_file = tmp_path / "bad.db" + bad_file.write_text("not a db") + + assert locker._get_today_exercise_count(bad_file) == 0 + + def test_missing_table_returns_zero_exercises( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns 0 when exercises table doesn't exist.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "empty.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + conn.commit() + conn.close() + + assert locker._get_today_exercise_count(db_file) == 0 + + +class TestIsWorkoutFinishRecent: + """Tests for _is_workout_finish_recent method.""" + + def test_recent_workout_returns_true( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns True for workout that finished recently.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + now_ms = int(time.time() * 1000) + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w1", now_ms - 3600000, now_ms), + ) + conn.commit() + conn.close() + + assert locker._is_workout_finish_recent(db_file) is True + + def test_old_workout_returns_false( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns False for workout that finished >4 hours ago.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + # Finished 5 hours ago (but still "today" in local time) + now_ms = int(time.time() * 1000) + old_finish = now_ms - 5 * 3600 * 1000 + conn.execute( + "INSERT INTO workouts VALUES (?, ?, ?)", + ("w1", old_finish - 3600000, old_finish), + ) + conn.commit() + conn.close() + + assert locker._is_workout_finish_recent(db_file) is False + + def test_no_workouts_returns_false( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns False when no workouts exist.""" + locker = create_locker(mock_tk, tmp_path) + db_file = tmp_path / "sl_test.db" + conn = sqlite3.connect(str(db_file)) + conn.execute( + "CREATE TABLE workouts " + "(id TEXT PRIMARY KEY, start INTEGER, finish INTEGER)", + ) + conn.commit() + conn.close() + + assert locker._is_workout_finish_recent(db_file) is False + + def test_invalid_db_returns_false( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test returns False for invalid database file.""" + locker = create_locker(mock_tk, tmp_path) + bad_file = tmp_path / "bad.db" + bad_file.write_text("not a db") + + assert locker._is_workout_finish_recent(bad_file) is False diff --git a/python_pkg/screen_locker/tests/test_init_and_log.py b/python_pkg/screen_locker/tests/test_init_and_log.py index 58d02ac..a781e69 100644 --- a/python_pkg/screen_locker/tests/test_init_and_log.py +++ b/python_pkg/screen_locker/tests/test_init_and_log.py @@ -6,7 +6,7 @@ from datetime import datetime, timezone import json import tkinter as tk from typing import TYPE_CHECKING, Any -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest @@ -101,14 +101,41 @@ class TestHasLoggedToday: mock_sys_exit: MagicMock, tmp_path: Path, ) -> None: - """Test when today's workout is logged.""" + """Test when today's workout is logged with valid HMAC.""" log_file = tmp_path / "workout_log.json" today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") - log_file.write_text(json.dumps({today: {"workout": "data"}})) + log_file.write_text( + json.dumps({today: {"workout": "data", "hmac": "valid"}}), + ) locker = create_locker(mock_tk, tmp_path) locker.log_file = log_file - assert locker.has_logged_today() is True + with patch( + "python_pkg.screen_locker.screen_lock.verify_entry_hmac", + return_value=True, + ): + assert locker.has_logged_today() is True + + def test_today_logged_invalid_hmac( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test rejects entry when HMAC verification fails.""" + log_file = tmp_path / "workout_log.json" + today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") + log_file.write_text( + json.dumps({today: {"workout": "data", "hmac": "tampered"}}), + ) + + locker = create_locker(mock_tk, tmp_path) + locker.log_file = log_file + with patch( + "python_pkg.screen_locker.screen_lock.verify_entry_hmac", + return_value=False, + ): + assert locker.has_logged_today() is False def test_other_day_logged( self, @@ -134,12 +161,16 @@ class TestSaveWorkoutLog: mock_sys_exit: MagicMock, tmp_path: Path, ) -> None: - """Test saving to a new log file.""" + """Test saving to a new log file includes HMAC.""" log_file = tmp_path / "workout_log.json" locker = create_locker(mock_tk, tmp_path) locker.log_file = log_file locker.workout_data = {"type": "running"} - locker.save_workout_log() + with patch( + "python_pkg.screen_locker.screen_lock.compute_entry_hmac", + return_value="abc123", + ): + locker.save_workout_log() assert log_file.exists() with log_file.open() as f: @@ -147,6 +178,29 @@ class TestSaveWorkoutLog: today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") assert today in data assert data[today]["workout_data"]["type"] == "running" + assert data[today]["hmac"] == "abc123" + + def test_save_to_new_file_no_hmac_key( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test saving without HMAC key produces unsigned entry.""" + log_file = tmp_path / "workout_log.json" + locker = create_locker(mock_tk, tmp_path) + locker.log_file = log_file + locker.workout_data = {"type": "running"} + with patch( + "python_pkg.screen_locker.screen_lock.compute_entry_hmac", + return_value=None, + ): + locker.save_workout_log() + + with log_file.open() as f: + data: dict[str, Any] = json.load(f) + today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") + assert "hmac" not in data[today] def test_save_to_existing_file( self, @@ -161,7 +215,11 @@ class TestSaveWorkoutLog: locker = create_locker(mock_tk, tmp_path) locker.log_file = log_file locker.workout_data = {"type": "strength"} - locker.save_workout_log() + with patch( + "python_pkg.screen_locker.screen_lock.compute_entry_hmac", + return_value="sig", + ): + locker.save_workout_log() with log_file.open() as f: data: dict[str, Any] = json.load(f) @@ -182,7 +240,11 @@ class TestSaveWorkoutLog: locker = create_locker(mock_tk, tmp_path) locker.log_file = log_file locker.workout_data = {"type": "running"} - locker.save_workout_log() + with patch( + "python_pkg.screen_locker.screen_lock.compute_entry_hmac", + return_value="sig", + ): + locker.save_workout_log() with log_file.open() as f: data: dict[str, Any] = json.load(f) @@ -201,8 +263,12 @@ class TestSaveWorkoutLog: locker = create_locker(mock_tk, tmp_path) locker.log_file = log_file locker.workout_data = {"type": "running"} - # Should not raise, just log warning - locker.save_workout_log() + with patch( + "python_pkg.screen_locker.screen_lock.compute_entry_hmac", + return_value="sig", + ): + # Should not raise, just log warning + locker.save_workout_log() class TestRun: diff --git a/python_pkg/screen_locker/tests/test_log_integrity.py b/python_pkg/screen_locker/tests/test_log_integrity.py new file mode 100644 index 0000000..e3fb191 --- /dev/null +++ b/python_pkg/screen_locker/tests/test_log_integrity.py @@ -0,0 +1,152 @@ +"""Tests for _log_integrity HMAC signing and verification.""" + +from __future__ import annotations + +import hashlib +import hmac +import json +from typing import TYPE_CHECKING +from unittest.mock import patch + +from python_pkg.screen_locker._log_integrity import ( + _generate_hmac_key, + _load_hmac_key, + compute_entry_hmac, + verify_entry_hmac, +) + +if TYPE_CHECKING: + from pathlib import Path + + +class TestLoadHmacKey: + """Tests for _load_hmac_key.""" + + def test_loads_key_from_file(self, tmp_path: Path) -> None: + """Test loading HMAC key from existing file.""" + key_file = tmp_path / "hmac.key" + key_file.write_bytes(b"secret_key_bytes") + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + key_file, + ): + result = _load_hmac_key() + assert result == b"secret_key_bytes" + + def test_returns_none_on_missing_file(self, tmp_path: Path) -> None: + """Test returns None when key file doesn't exist.""" + key_file = tmp_path / "nonexistent.key" + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + key_file, + ): + result = _load_hmac_key() + assert result is None + + +class TestGenerateHmacKey: + """Tests for _generate_hmac_key.""" + + def test_generates_and_writes_key(self, tmp_path: Path) -> None: + """Test key generation creates file with 32-byte key.""" + key_file = tmp_path / "subdir" / "hmac.key" + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + key_file, + ): + result = _generate_hmac_key() + assert result is not None + assert len(result) == 32 + assert key_file.read_bytes() == result + + def test_returns_none_on_write_failure(self) -> None: + """Test returns None when file cannot be written.""" + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + ) as mock_path: + mock_path.parent.mkdir.side_effect = OSError("permission denied") + result = _generate_hmac_key() + assert result is None + + +class TestComputeEntryHmac: + """Tests for compute_entry_hmac.""" + + def test_computes_hmac_for_entry(self, tmp_path: Path) -> None: + """Test HMAC computation produces valid hex string.""" + key_file = tmp_path / "hmac.key" + key = b"test_key_12345" + key_file.write_bytes(key) + entry = {"timestamp": "2025-01-01T00:00:00", "workout_data": {"type": "test"}} + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + key_file, + ): + result = compute_entry_hmac(entry) + assert result is not None + # Verify manually + payload = json.dumps(entry, sort_keys=True, separators=(",", ":")) + expected = hmac.new(key, payload.encode(), hashlib.sha256).hexdigest() + assert result == expected + + def test_returns_none_when_no_key(self, tmp_path: Path) -> None: + """Test returns None when key file is missing.""" + key_file = tmp_path / "nonexistent.key" + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + key_file, + ): + result = compute_entry_hmac({"data": "test"}) + assert result is None + + +class TestVerifyEntryHmac: + """Tests for verify_entry_hmac.""" + + def test_valid_hmac(self, tmp_path: Path) -> None: + """Test verification passes with correct HMAC.""" + key_file = tmp_path / "hmac.key" + key = b"verification_key" + key_file.write_bytes(key) + entry_data = {"timestamp": "2025-01-01", "workout_data": {"type": "test"}} + payload = json.dumps(entry_data, sort_keys=True, separators=(",", ":")) + correct_hmac = hmac.new(key, payload.encode(), hashlib.sha256).hexdigest() + entry = {**entry_data, "hmac": correct_hmac} + + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + key_file, + ): + assert verify_entry_hmac(entry) is True + + def test_invalid_hmac(self, tmp_path: Path) -> None: + """Test verification fails with wrong HMAC.""" + key_file = tmp_path / "hmac.key" + key_file.write_bytes(b"verification_key") + entry = {"timestamp": "2025-01-01", "hmac": "wrong_hmac_value"} + + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + key_file, + ): + assert verify_entry_hmac(entry) is False + + def test_missing_hmac_field(self) -> None: + """Test verification fails when entry has no hmac field.""" + entry: dict[str, object] = {"timestamp": "2025-01-01"} + assert verify_entry_hmac(entry) is False + + def test_non_string_hmac_field(self) -> None: + """Test verification fails when hmac field is not a string.""" + entry: dict[str, object] = {"timestamp": "2025-01-01", "hmac": 12345} + assert verify_entry_hmac(entry) is False + + def test_missing_key_file(self, tmp_path: Path) -> None: + """Test verification fails when key file doesn't exist.""" + key_file = tmp_path / "nonexistent.key" + entry = {"timestamp": "2025-01-01", "hmac": "some_hmac"} + with patch( + "python_pkg.screen_locker._log_integrity.HMAC_KEY_FILE", + key_file, + ): + assert verify_entry_hmac(entry) is False diff --git a/python_pkg/screen_locker/tests/test_phone_check_unlock.py b/python_pkg/screen_locker/tests/test_phone_check_unlock.py index 08582ef..7ac871c 100644 --- a/python_pkg/screen_locker/tests/test_phone_check_unlock.py +++ b/python_pkg/screen_locker/tests/test_phone_check_unlock.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from python_pkg.screen_locker.screen_lock import ( PHONE_PENALTY_DELAY_DEMO, @@ -24,34 +24,49 @@ class TestVerifyPhoneWorkout: mock_sys_exit: MagicMock, tmp_path: Path, ) -> None: - """Test workout verified on phone.""" + """Test workout verified on phone with sufficient duration.""" locker = create_locker(mock_tk, tmp_path) object.__setattr__( locker, "_is_phone_connected", - MagicMock( - return_value=True, - ), + MagicMock(return_value=True), ) object.__setattr__( locker, "_pull_stronglifts_db", - MagicMock( - return_value=tmp_path / "sl.db", - ), + MagicMock(return_value=tmp_path / "sl.db"), ) object.__setattr__( locker, "_count_today_workouts", - MagicMock( - return_value=2, - ), + MagicMock(return_value=2), + ) + object.__setattr__( + locker, + "_is_workout_finish_recent", + MagicMock(return_value=True), + ) + object.__setattr__( + locker, + "_get_today_exercise_count", + MagicMock(return_value=3), + ) + object.__setattr__( + locker, + "_get_today_workout_duration_minutes", + MagicMock(return_value=65.0), ) - status, message = locker._verify_phone_workout() + with patch( + "python_pkg.screen_locker._phone_verification.check_clock_skew", + return_value=(True, "Clock OK"), + ): + status, message = locker._verify_phone_workout() assert status == "verified" assert "2 session" in message + assert "65 min" in message + assert "3 exercise" in message def test_not_verified( self, @@ -64,30 +79,77 @@ class TestVerifyPhoneWorkout: object.__setattr__( locker, "_is_phone_connected", - MagicMock( - return_value=True, - ), + MagicMock(return_value=True), ) object.__setattr__( locker, "_pull_stronglifts_db", - MagicMock( - return_value=tmp_path / "sl.db", - ), + MagicMock(return_value=tmp_path / "sl.db"), ) object.__setattr__( locker, "_count_today_workouts", - MagicMock( - return_value=0, - ), + MagicMock(return_value=0), ) - status, message = locker._verify_phone_workout() + with patch( + "python_pkg.screen_locker._phone_verification.check_clock_skew", + return_value=(True, "Clock OK"), + ): + status, message = locker._verify_phone_workout() assert status == "not_verified" assert "No workout" in message + def test_too_short( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test workout found but too short.""" + locker = create_locker(mock_tk, tmp_path) + object.__setattr__( + locker, + "_is_phone_connected", + MagicMock(return_value=True), + ) + object.__setattr__( + locker, + "_pull_stronglifts_db", + MagicMock(return_value=tmp_path / "sl.db"), + ) + object.__setattr__( + locker, + "_count_today_workouts", + MagicMock(return_value=1), + ) + object.__setattr__( + locker, + "_is_workout_finish_recent", + MagicMock(return_value=True), + ) + object.__setattr__( + locker, + "_get_today_exercise_count", + MagicMock(return_value=3), + ) + object.__setattr__( + locker, + "_get_today_workout_duration_minutes", + MagicMock(return_value=25.0), + ) + + with patch( + "python_pkg.screen_locker._phone_verification.check_clock_skew", + return_value=(True, "Clock OK"), + ): + status, message = locker._verify_phone_workout() + + assert status == "too_short" + assert "25 min" in message + assert "50 min" in message + def test_no_phone( self, mock_tk: MagicMock, @@ -99,12 +161,14 @@ class TestVerifyPhoneWorkout: object.__setattr__( locker, "_is_phone_connected", - MagicMock( - return_value=False, - ), + MagicMock(return_value=False), ) - status, _ = locker._verify_phone_workout() + with patch( + "python_pkg.screen_locker._phone_verification.check_clock_skew", + return_value=(True, "Clock OK"), + ): + status, _ = locker._verify_phone_workout() assert status == "no_phone" @@ -119,23 +183,122 @@ class TestVerifyPhoneWorkout: object.__setattr__( locker, "_is_phone_connected", - MagicMock( - return_value=True, - ), + MagicMock(return_value=True), ) object.__setattr__( locker, "_pull_stronglifts_db", - MagicMock( - return_value=None, - ), + MagicMock(return_value=None), ) - status, message = locker._verify_phone_workout() + with patch( + "python_pkg.screen_locker._phone_verification.check_clock_skew", + return_value=(True, "Clock OK"), + ): + status, message = locker._verify_phone_workout() assert status == "error" assert "database" in message.lower() + def test_clock_tampered( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test clock_tampered when NTP check fails.""" + locker = create_locker(mock_tk, tmp_path) + + with patch( + "python_pkg.screen_locker._phone_verification.check_clock_skew", + return_value=(False, "System clock is 600s ahead"), + ): + status, message = locker._verify_phone_workout() + + assert status == "clock_tampered" + assert "600s" in message + + def test_stale_workout( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test stale status when workout finish is not recent.""" + locker = create_locker(mock_tk, tmp_path) + object.__setattr__( + locker, + "_is_phone_connected", + MagicMock(return_value=True), + ) + object.__setattr__( + locker, + "_pull_stronglifts_db", + MagicMock(return_value=tmp_path / "sl.db"), + ) + object.__setattr__( + locker, + "_count_today_workouts", + MagicMock(return_value=1), + ) + object.__setattr__( + locker, + "_is_workout_finish_recent", + MagicMock(return_value=False), + ) + + with patch( + "python_pkg.screen_locker._phone_verification.check_clock_skew", + return_value=(True, "Clock OK"), + ): + status, message = locker._verify_phone_workout() + + assert status == "stale" + assert "old" in message.lower() + + def test_no_exercises( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test no_exercises when workout has no exercise data.""" + locker = create_locker(mock_tk, tmp_path) + object.__setattr__( + locker, + "_is_phone_connected", + MagicMock(return_value=True), + ) + object.__setattr__( + locker, + "_pull_stronglifts_db", + MagicMock(return_value=tmp_path / "sl.db"), + ) + object.__setattr__( + locker, + "_count_today_workouts", + MagicMock(return_value=1), + ) + object.__setattr__( + locker, + "_is_workout_finish_recent", + MagicMock(return_value=True), + ) + object.__setattr__( + locker, + "_get_today_exercise_count", + MagicMock(return_value=0), + ) + + with patch( + "python_pkg.screen_locker._phone_verification.check_clock_skew", + return_value=(True, "Clock OK"), + ): + status, message = locker._verify_phone_workout() + + assert status == "no_exercises" + assert "exercise" in message.lower() + class TestStartPhoneCheck: """Tests for _start_phone_check and _handle_startup_phone_result.""" @@ -197,6 +360,71 @@ class TestStartPhoneCheck: locker._show_retry_and_sick.assert_called_once() + def test_handle_startup_too_short_shows_retry_and_sick( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test too_short result shows retry and sick buttons.""" + locker = create_locker(mock_tk, tmp_path) + object.__setattr__(locker, "_show_retry_and_sick", MagicMock()) + locker._handle_startup_phone_result( + "too_short", "Workout too short! 25 min logged, need at least 50 min." + ) + + locker._show_retry_and_sick.assert_called_once() + call_args = locker._show_retry_and_sick.call_args[0][0] + assert "too short" in call_args.lower() + + def test_handle_startup_stale_shows_retry_and_sick( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test stale result shows retry and sick buttons.""" + locker = create_locker(mock_tk, tmp_path) + object.__setattr__(locker, "_show_retry_and_sick", MagicMock()) + locker._handle_startup_phone_result("stale", "Workout too old") + + locker._show_retry_and_sick.assert_called_once() + call_args = locker._show_retry_and_sick.call_args[0][0] + assert "suspicious" in call_args.lower() + + def test_handle_startup_no_exercises_shows_retry_and_sick( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test no_exercises result shows retry and sick buttons.""" + locker = create_locker(mock_tk, tmp_path) + object.__setattr__(locker, "_show_retry_and_sick", MagicMock()) + locker._handle_startup_phone_result("no_exercises", "No exercises found") + + locker._show_retry_and_sick.assert_called_once() + call_args = locker._show_retry_and_sick.call_args[0][0] + assert "suspicious" in call_args.lower() + + def test_handle_startup_clock_tampered_shows_retry_and_sick( + self, + mock_tk: MagicMock, + mock_sys_exit: MagicMock, + tmp_path: Path, + ) -> None: + """Test clock_tampered result shows retry and sick buttons.""" + locker = create_locker(mock_tk, tmp_path) + object.__setattr__(locker, "_show_retry_and_sick", MagicMock()) + locker._handle_startup_phone_result( + "clock_tampered", + "System clock is 600s ahead", + ) + + locker._show_retry_and_sick.assert_called_once() + call_args = locker._show_retry_and_sick.call_args[0][0] + assert "clock" in call_args.lower() + def test_handle_startup_no_phone_shows_penalty( self, mock_tk: MagicMock, diff --git a/python_pkg/screen_locker/tests/test_time_check.py b/python_pkg/screen_locker/tests/test_time_check.py new file mode 100644 index 0000000..b8474bc --- /dev/null +++ b/python_pkg/screen_locker/tests/test_time_check.py @@ -0,0 +1,108 @@ +"""Tests for _time_check NTP clock skew detection.""" + +from __future__ import annotations + +import struct +import time +from unittest.mock import MagicMock, patch + +from python_pkg.screen_locker._time_check import ( + _NTP_EPOCH_OFFSET, + _query_ntp_offset, + check_clock_skew, +) + + +class TestQueryNtpOffset: + """Tests for _query_ntp_offset.""" + + def test_returns_offset_on_success(self) -> None: + """Test returns float offset when NTP server responds.""" + now = time.time() + # Build a fake NTP response with server time close to now + server_ntp = int(now + _NTP_EPOCH_OFFSET) + fraction = 0 + response = b"\x00" * 40 + struct.pack("!II", server_ntp, fraction) + + mock_socket = MagicMock() + mock_socket.__enter__ = MagicMock(return_value=mock_socket) + mock_socket.__exit__ = MagicMock(return_value=False) + mock_socket.recvfrom.return_value = (response, ("pool.ntp.org", 123)) + + with patch("socket.socket", return_value=mock_socket): + offset = _query_ntp_offset() + + assert offset is not None + assert abs(offset) < 5 # Should be very close to zero + + def test_returns_none_on_oserror(self) -> None: + """Test returns None when socket fails.""" + mock_socket = MagicMock() + mock_socket.__enter__ = MagicMock(return_value=mock_socket) + mock_socket.__exit__ = MagicMock(return_value=False) + mock_socket.sendto.side_effect = OSError("network unreachable") + + with patch("socket.socket", return_value=mock_socket): + offset = _query_ntp_offset() + + assert offset is None + + def test_returns_none_on_short_response(self) -> None: + """Test returns None when NTP response is too short.""" + mock_socket = MagicMock() + mock_socket.__enter__ = MagicMock(return_value=mock_socket) + mock_socket.__exit__ = MagicMock(return_value=False) + mock_socket.recvfrom.return_value = (b"\x00" * 10, ("pool.ntp.org", 123)) + + with patch("socket.socket", return_value=mock_socket): + offset = _query_ntp_offset() + + assert offset is None + + +class TestCheckClockSkew: + """Tests for check_clock_skew.""" + + def test_ok_within_threshold(self) -> None: + """Test returns ok when clock offset is small.""" + with patch( + "python_pkg.screen_locker._time_check._query_ntp_offset", + return_value=2.5, + ): + ok, message = check_clock_skew() + + assert ok is True + assert "OK" in message + + def test_fails_when_skew_exceeds_threshold(self) -> None: + """Test returns failure when clock offset exceeds max.""" + with patch( + "python_pkg.screen_locker._time_check._query_ntp_offset", + return_value=600.0, + ): + ok, message = check_clock_skew() + + assert ok is False + assert "600" in message + + def test_ntp_unreachable_passes(self) -> None: + """Test returns ok when NTP server is unreachable (fail-open).""" + with patch( + "python_pkg.screen_locker._time_check._query_ntp_offset", + return_value=None, + ): + ok, message = check_clock_skew() + + assert ok is True + assert "skipped" in message.lower() + + def test_negative_offset_detected(self) -> None: + """Test detects clock ahead with negative offset.""" + with patch( + "python_pkg.screen_locker._time_check._query_ntp_offset", + return_value=-400.0, + ): + ok, message = check_clock_skew() + + assert ok is False + assert "ahead" in message.lower() diff --git a/python_pkg/screen_locker/workout-locker.service b/python_pkg/screen_locker/workout-locker.service index 2d76902..47e5437 100644 --- a/python_pkg/screen_locker/workout-locker.service +++ b/python_pkg/screen_locker/workout-locker.service @@ -7,9 +7,11 @@ Type=simple WorkingDirectory=/home/kuhy/testsAndMisc Environment=DISPLAY=:0 Environment=PYTHONPATH=/home/kuhy/testsAndMisc -ExecStartPre=/bin/sleep 3 +ExecStartPre=/bin/sleep 1 ExecStart=/usr/bin/python3 -m python_pkg.screen_locker.screen_lock --production -Restart=no +Restart=on-failure +RestartSec=2s +RestartPreventExitStatus=0 User=%u [Install] diff --git a/python_pkg/screen_locker/workout-locker.timer b/python_pkg/screen_locker/workout-locker.timer index 7b57a66..4a01809 100644 --- a/python_pkg/screen_locker/workout-locker.timer +++ b/python_pkg/screen_locker/workout-locker.timer @@ -2,7 +2,7 @@ Description=Periodically check if workout was done today [Timer] -OnBootSec=30s +OnBootSec=5s OnUnitActiveSec=15min Persistent=true