diff --git a/wake_alarm/_alarm.py b/wake_alarm/_alarm.py index a305cc6..47834b6 100644 --- a/wake_alarm/_alarm.py +++ b/wake_alarm/_alarm.py @@ -11,30 +11,34 @@ from __future__ import annotations import argparse from datetime import datetime, timezone import logging -import math -import os -from pathlib import Path -import secrets import shutil -import string -import struct import subprocess import sys -import tempfile import threading import time import tkinter as tk -import wave +from python_pkg.wake_alarm._audio import ( + _activate_alarm_audio, + _beep_loud, + _beep_medium, + _beep_soft, + _max_fans, + _play_on_extra_devices, + _restore_alarm_audio, + _restore_fans, + _set_max_brightness, + _warn_if_no_real_sink, +) +from python_pkg.wake_alarm._challenges import ( + _Challenge, + _make_challenge, +) from python_pkg.wake_alarm._constants import ( - ALARM_AUDIO_CARD, - ALARM_AUDIO_PROFILE, - ALARM_AUDIO_SINK, - ALARM_AUDIO_SINK_POLL_SECONDS, - ALARM_AUDIO_SINK_WAIT_SECONDS, ALARM_DAYS, - DISMISS_CODE_LENGTH, DISMISS_CODE_REFRESH_SECONDS, + DISMISS_FLASH_SECONDS, + DISMISS_ROUNDS_REQUIRED, DISMISS_WINDOW_MINUTES, LOUD_TOGGLE_INTERVAL, MEDIUM_BEEP_INTERVAL, @@ -51,11 +55,6 @@ from python_pkg.wake_alarm._state import ( _logger = logging.getLogger(__name__) -def _generate_code() -> str: - """Generate a random numeric dismiss code.""" - return "".join(secrets.choice(string.digits) for _ in range(DISMISS_CODE_LENGTH)) - - def _is_alarm_day() -> bool: """Check if today is an alarm day.""" return datetime.now(tz=timezone.utc).weekday() in ALARM_DAYS @@ -88,347 +87,6 @@ def _restore_display() -> None: ) -def _beep_soft() -> None: - """Play a soft system beep via terminal bell.""" - sys.stdout.write("\a") - sys.stdout.flush() - - -def _speaker_test_path() -> str: - """Resolve absolute path to speaker-test binary.""" - path = shutil.which("speaker-test") - if path is None: - msg = "speaker-test not found on PATH" - raise FileNotFoundError(msg) - return path - - -_TONE_CACHE: dict[int, Path] = {} -_TONE_TIMEOUT_SECONDS: float = 6.0 -_TONE_DURATION_SECONDS: float = 1.5 -_TONE_FRAMERATE: int = 48000 -_TONE_AMPLITUDE: int = 32760 # near s16 max for the loudest sine we can emit - -# Number of back-to-back PC-speaker beeps per _play_tone call. -# pcspkr volume is hardware-fixed, so we lean on repetition + duration to be -# loud enough to actually wake the user. -_PCSPKR_REPEATS: int = 3 -_PCSPKR_GAP_SECONDS: float = 0.12 - -# Motherboard PC speaker exposed by the pcspkr kernel module. -# Writing EV_SND/SND_TONE input_event structs makes it beep — bypasses -# PipeWire/ALSA entirely, so it stays audible even when no real sink exists. -_PCSPKR_DEVICE: str = "/dev/input/by-path/platform-pcspkr-event-spkr" -_PCSPKR_EV_SND: int = 0x12 -_PCSPKR_SND_TONE: int = 0x02 -# struct input_event: timeval (long sec, long usec), u16 type, u16 code, s32 val -_PCSPKR_EVENT_FMT: str = "llHHi" - - -def _beep_pcspkr(frequency: int, duration_seconds: float) -> None: - """Beep the motherboard PC speaker via evdev (audible without any sink). - - Silently no-ops when the device is missing or unwritable so the call is - always safe from the alarm hot path. - """ - try: - # buffering=0 so the write hits the device immediately. - with Path(_PCSPKR_DEVICE).open("wb", buffering=0) as dev: - dev.write( - struct.pack( - _PCSPKR_EVENT_FMT, - 0, - 0, - _PCSPKR_EV_SND, - _PCSPKR_SND_TONE, - int(frequency), - ), - ) - time.sleep(duration_seconds) - dev.write( - struct.pack( - _PCSPKR_EVENT_FMT, - 0, - 0, - _PCSPKR_EV_SND, - _PCSPKR_SND_TONE, - 0, - ), - ) - except OSError: - _logger.warning( - "PC speaker beep at %d Hz failed (device %s)", - frequency, - _PCSPKR_DEVICE, - exc_info=True, - ) - - -def _ensure_tone_wav(frequency: int) -> Path: - """Generate (and cache) a mono 48 kHz sine WAV at *frequency* Hz.""" - cached = _TONE_CACHE.get(frequency) - if cached is not None and cached.exists(): - return cached - path = Path(tempfile.gettempdir()) / f"wake_alarm_tone_{frequency}.wav" - n_frames = int(_TONE_FRAMERATE * _TONE_DURATION_SECONDS) - with wave.open(str(path), "wb") as wav: - wav.setnchannels(1) - wav.setsampwidth(2) - wav.setframerate(_TONE_FRAMERATE) - frames = bytearray() - for i in range(n_frames): - sample = int( - _TONE_AMPLITUDE - * math.sin(2 * math.pi * frequency * i / _TONE_FRAMERATE), - ) - frames.extend(struct.pack(" bool: - """Run *binary* on *wav* with a generous timeout. Return True on success.""" - path = shutil.which(binary) - if path is None: - return False - try: - result = subprocess.run( - [path, str(wav)], - capture_output=True, - timeout=_TONE_TIMEOUT_SECONDS, - check=False, - ) - except (OSError, subprocess.TimeoutExpired): - _logger.warning("%s failed playing %s", binary, wav.name, exc_info=True) - return False - if result.returncode != 0: - _logger.warning( - "%s exited %d for %s: %s", - binary, - result.returncode, - wav.name, - result.stderr.decode(errors="replace").strip()[:200], - ) - return False - return True - - -def _play_tone(frequency: int) -> None: - """Play a sine tone via paplay/aplay/speaker-test, fall back to soft beep. - - Always also beeps the motherboard PC speaker (multiple times) so the - alarm stays loud and audible even when PipeWire only has the auto_null - sink. - """ - for i in range(_PCSPKR_REPEATS): - _beep_pcspkr(frequency, _TONE_DURATION_SECONDS) - if i < _PCSPKR_REPEATS - 1: - time.sleep(_PCSPKR_GAP_SECONDS) - try: - wav = _ensure_tone_wav(frequency) - except OSError: - _logger.warning( - "Could not generate tone WAV at %d Hz; using soft beep", - frequency, - exc_info=True, - ) - _beep_soft() - return - for binary in ("paplay", "aplay"): - if _try_player(binary, wav): - return - try: - subprocess.run( - [ - _speaker_test_path(), - "-t", - "sine", - "-f", - str(frequency), - "-l", - "1", - ], - capture_output=True, - timeout=_TONE_TIMEOUT_SECONDS, - check=False, - ) - except (FileNotFoundError, OSError, subprocess.TimeoutExpired): - _logger.warning( - "All tone players failed at %d Hz; falling back to soft beep", - frequency, - exc_info=True, - ) - _beep_soft() - - -# Extra PipeWire sinks to always play alarm audio on (alongside the default). -# alsa_output...hdmi-stereo = GA102 → G27Q (has built-in speaker, always on). -_EXTRA_PIPEWIRE_SINKS: tuple[str, ...] = ("alsa_output.pci-0000_01_00.1.hdmi-stereo",) - - -def _play_on_extra_devices(frequency: int) -> None: - """Fire-and-forget: play a sine tone on each extra PipeWire sink.""" - try: - path = _speaker_test_path() - except FileNotFoundError: - _logger.warning("speaker-test missing; skipping extra-device beep") - return - for sink in _EXTRA_PIPEWIRE_SINKS: - _play_tone_on_sink(path, sink, frequency) - - -def _play_tone_on_sink(path: str, sink: str, frequency: int) -> None: - """Launch speaker-test for *sink*; log a warning on OSError.""" - try: - subprocess.Popen( - [path, "-t", "sine", "-f", str(frequency), "-l", "1"], - env={**os.environ, "PIPEWIRE_NODE": sink}, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - except OSError: - _logger.warning("Failed to play tone on sink %s", sink, exc_info=True) - - -# NCT Super I/O chip names that expose a single pwm1 fan control channel. -_NCT_CHIP_NAMES: frozenset[str] = frozenset( - { - "nct6775", - "nct6779", - "nct6791", - "nct6792", - "nct6793", - "nct6795", - "nct6796", - "nct6797", - "nct6798", - "nct6799", - } -) - -# Installed by install.sh, controlled via sudoers NOPASSWD entry. -_FAN_SCRIPT: str = "/usr/local/bin/wake-alarm-fans.sh" -_SUDO_BIN: str = "/usr/bin/sudo" - - -def _find_fan_hwmon() -> str | None: - """Return the hwmon directory for an NCT fan controller, or None.""" - for name_path in Path("/sys/class/hwmon").glob("hwmon*/name"): - try: - chip = name_path.read_text().strip() - except OSError: - _logger.warning("Could not read %s", name_path, exc_info=True) - continue - if chip in _NCT_CHIP_NAMES: - return str(name_path.parent) - _logger.warning( - "No NCT super-I/O hwmon entry found; fan ramp will be skipped", - ) - return None - - -def _max_fans() -> bool: - """Ramp every NCT pwm channel to 100% speed via the helper script. - - The helper records prior state under /run/wake-alarm-fans.state so - _restore_fans() can put things back without arguments. Safe: higher fan - speed only lowers temperatures, never damages hardware. - - Returns: - True when the ramp script ran successfully, False otherwise. - """ - if _find_fan_hwmon() is None: - return False - try: - result = subprocess.run( - [_SUDO_BIN, "-n", _FAN_SCRIPT, "max"], - check=False, - capture_output=True, - timeout=5, - ) - except (OSError, subprocess.TimeoutExpired): - _logger.warning( - "Fan script %s not runnable; skipping fan ramp", - _FAN_SCRIPT, - exc_info=True, - ) - return False - if result.returncode != 0: - _logger.warning( - "Fan script %s exited %d: %s", - _FAN_SCRIPT, - result.returncode, - result.stderr.decode(errors="replace").strip(), - ) - return False - return True - - -def _restore_fans(*, active: bool) -> None: - """Restore fan speed if _max_fans() previously succeeded.""" - if not active: - return - try: - subprocess.run( - [_SUDO_BIN, "-n", _FAN_SCRIPT, "restore"], - check=False, - capture_output=True, - timeout=5, - ) - except (OSError, subprocess.TimeoutExpired): - _logger.warning( - "Failed to restore fan state via %s", - _FAN_SCRIPT, - exc_info=True, - ) - - -def _set_max_brightness() -> None: - """Set all connected monitors to maximum brightness via xrandr.""" - xrandr = shutil.which("xrandr") - if xrandr is None: - _logger.warning("xrandr not on PATH; skipping max-brightness") - return - try: - result = subprocess.run( - [xrandr, "--query"], - capture_output=True, - text=True, - timeout=5, - check=False, - ) - except (OSError, subprocess.TimeoutExpired): - _logger.warning("xrandr --query failed; skipping max-brightness", exc_info=True) - return - for line in result.stdout.splitlines(): - if " connected" in line: - output = line.split()[0] - try: - subprocess.run( - [xrandr, "--output", output, "--brightness", "1.0"], - check=False, - capture_output=True, - timeout=5, - ) - except (OSError, subprocess.TimeoutExpired): - _logger.warning( - "Failed to set brightness on %s", - output, - exc_info=True, - ) - - -def _beep_medium(frequency: int = 1000) -> None: - """Play a medium beep (sine tone via paplay/aplay/speaker-test).""" - _play_tone(frequency) - - -def _beep_loud(frequency: int = 1000) -> None: - """Play a loud sine tone via paplay/aplay/speaker-test.""" - _play_tone(frequency) - - class WakeAlarm: """Fullscreen wake alarm with escalating beep and dismiss challenge.""" @@ -465,9 +123,13 @@ class WakeAlarm: self.root.focus_force() self.root.update_idletasks() - self._current_code = _generate_code() + self._current_challenge: _Challenge = _make_challenge() self._skip_earnable: bool = True + self._rounds_completed: int = 0 + self._flash_remaining: int = 0 self._build_ui() + if self._current_challenge.kind == "flash": + self._start_flash_countdown() self._schedule_code_refresh() self._schedule_skip_window_close() self._start_beep_thread() @@ -490,19 +152,30 @@ class WakeAlarm: ) self._title_label.pack(pady=20) + self._round_label = tk.Label( + self._container, + text=f"Round 1 / {DISMISS_ROUNDS_REQUIRED}", + font=("Arial", 24, "bold"), + fg="#ffaa00", + bg="#1a1a1a", + ) + self._round_label.pack(pady=5) + self._info_label = tk.Label( self._container, - text="Type the code below to earn a workout-free day", + text=self._current_challenge.hint, font=("Arial", 18), fg="white", bg="#1a1a1a", ) self._info_label.pack(pady=10) + # Math and sort use a smaller font because their display text is wider. + code_font_size = 48 if self._current_challenge.kind in ("math", "sort") else 72 self._code_label = tk.Label( self._container, - text=self._current_code, - font=("Courier", 72, "bold"), + text=self._current_challenge.display, + font=("Courier", code_font_size, "bold"), fg="#00ff00", bg="#1a1a1a", ) @@ -512,7 +185,7 @@ class WakeAlarm: self._container, font=("Courier", 36), justify="center", - width=DISMISS_CODE_LENGTH + 2, + width=12, ) self._entry.pack(pady=10) self._entry.focus_set() @@ -538,13 +211,64 @@ class WakeAlarm: self._update_timer() def _on_submit(self, _event: object = None) -> None: - """Handle code submission.""" - entered = self._entry.get().strip() - if entered == self._current_code: - self._dismiss_alarm(earned_skip=self._skip_earnable) - else: - self._status_label.configure(text="Wrong code! Try again.") + """Handle challenge submission. + + Normalises input and compares to the current challenge answer. + Requires DISMISS_ROUNDS_REQUIRED correct entries in sequence — each + correct round generates a new random challenge so the user must stay + awake and re-engage each time. + """ + entered = self._entry.get().strip().upper() + if entered != self._current_challenge.answer: + self._status_label.configure(text="Wrong! Try again.") self._entry.delete(0, tk.END) + if self._current_challenge.kind == "flash": + self._code_label.configure( + text=self._current_challenge.display, + fg="#00ff00", + ) + self._start_flash_countdown() + return + self._rounds_completed += 1 + if self._rounds_completed >= DISMISS_ROUNDS_REQUIRED: + self._dismiss_alarm(earned_skip=self._skip_earnable) + return + self._current_challenge = _make_challenge() + self._code_label.configure( + text=self._current_challenge.display, + fg="#00ff00", + ) + self._info_label.configure(text=self._current_challenge.hint) + self._entry.delete(0, tk.END) + next_round = self._rounds_completed + 1 + self._round_label.configure( + text=f"Round {next_round} / {DISMISS_ROUNDS_REQUIRED}", + ) + self._status_label.configure( + text=f"Round {self._rounds_completed} done — keep going!", + ) + if self._current_challenge.kind == "flash": + self._start_flash_countdown() + + def _start_flash_countdown(self) -> None: + """Begin the flash countdown: show code then hide it.""" + self._flash_remaining = DISMISS_FLASH_SECONDS + self._flash_tick() + + def _flash_tick(self) -> None: + """Decrement flash countdown; replace the displayed code with placeholders.""" + if not self._active: + return + if self._flash_remaining > 0: + self._status_label.configure( + text=f"Memorise! Hiding in {self._flash_remaining}s…", + ) + self._flash_remaining -= 1 + self.root.after(1000, self._flash_tick) + else: + hidden = "?" * len(self._current_challenge.display) + self._code_label.configure(text=hidden, fg="#555555") + self._status_label.configure(text="Now type the code from memory!") def _dismiss_alarm(self, *, earned_skip: bool) -> None: """Dismiss the alarm and save state.""" @@ -584,12 +308,22 @@ class WakeAlarm: self.root.destroy() def _schedule_code_refresh(self) -> None: - """Refresh the dismiss code periodically.""" + """Replace the current challenge periodically. + + Ensures the user can't simply wait out a hard challenge type — a new + random challenge is generated every DISMISS_CODE_REFRESH_SECONDS. + """ if not self._active: return - self._current_code = _generate_code() - self._code_label.configure(text=self._current_code) + self._current_challenge = _make_challenge() + self._code_label.configure( + text=self._current_challenge.display, + fg="#00ff00", + ) + self._info_label.configure(text=self._current_challenge.hint) self._entry.delete(0, tk.END) + if self._current_challenge.kind == "flash": + self._start_flash_countdown() ms = DISMISS_CODE_REFRESH_SECONDS * 1000 if not self.demo_mode else 10_000 self.root.after(ms, self._schedule_code_refresh) @@ -688,137 +422,6 @@ def _should_run_alarm() -> bool: return True -def _pactl_path() -> str | None: - """Return the absolute path to pactl, or None when not installed.""" - return shutil.which("pactl") - - -def _alarm_sink_present(pactl: str) -> bool: - """Return True when the dedicated alarm HDMI sink exists in PipeWire.""" - try: - result = subprocess.run( - [pactl, "list", "short", "sinks"], - capture_output=True, - timeout=3, - check=False, - ) - except (OSError, subprocess.TimeoutExpired): - _logger.warning("pactl list sinks failed", exc_info=True) - return False - return ALARM_AUDIO_SINK in result.stdout.decode(errors="replace") - - -def _current_default_sink(pactl: str) -> str | None: - """Return the current default sink name, or None on failure / empty.""" - try: - result = subprocess.run( - [pactl, "get-default-sink"], - capture_output=True, - timeout=3, - check=False, - ) - except (OSError, subprocess.TimeoutExpired): - _logger.warning("pactl get-default-sink failed", exc_info=True) - return None - name = result.stdout.decode(errors="replace").strip() - return name or None - - -def _activate_alarm_audio() -> str | None: - """Force the monitor's HDMI output on and route the alarm to it. - - At wake time the Bluetooth speaker is disconnected and PipeWire only has the - ``auto_null`` sink, so the alarm is silent. This forces the HDMI card - profile on, waits for its sink to appear, makes it the default sink, and - raises it to full volume - empirically the only output audible on this - machine at wake time (the G27Q monitor's built-in speaker). - - Returns: - The previous default sink name (to restore on close), or ``None`` when - the alarm audio sink could not be activated. - """ - pactl = _pactl_path() - if pactl is None: - _logger.warning("pactl not on PATH; cannot activate alarm audio") - return None - subprocess.run( - [pactl, "set-card-profile", ALARM_AUDIO_CARD, ALARM_AUDIO_PROFILE], - capture_output=True, - timeout=3, - check=False, - ) - attempts = max( - 1, - int(ALARM_AUDIO_SINK_WAIT_SECONDS / ALARM_AUDIO_SINK_POLL_SECONDS), - ) - for _ in range(attempts): - if _alarm_sink_present(pactl): - break - time.sleep(ALARM_AUDIO_SINK_POLL_SECONDS) - else: - _logger.warning( - "Alarm audio sink %s did not appear after %.0fs; alarm may be silent", - ALARM_AUDIO_SINK, - ALARM_AUDIO_SINK_WAIT_SECONDS, - ) - return None - old_default = _current_default_sink(pactl) - for cmd in ( - [pactl, "set-default-sink", ALARM_AUDIO_SINK], - [pactl, "set-sink-mute", ALARM_AUDIO_SINK, "0"], - [pactl, "set-sink-volume", ALARM_AUDIO_SINK, "100%"], - ): - subprocess.run(cmd, capture_output=True, timeout=3, check=False) - _logger.warning("Alarm audio routed to %s at 100%%", ALARM_AUDIO_SINK) - return old_default - - -def _restore_alarm_audio(old_default: str | None) -> None: - """Restore the default sink captured by :func:`_activate_alarm_audio`.""" - if old_default is None: - return - pactl = _pactl_path() - if pactl is None: - return - subprocess.run( - [pactl, "set-default-sink", old_default], - capture_output=True, - timeout=3, - check=False, - ) - - -def _warn_if_no_real_sink() -> None: - """Log a loud warning if PipeWire only has the auto_null sink.""" - pactl = _pactl_path() - if pactl is None: - _logger.warning("pactl not on PATH; cannot verify audio sinks") - return - try: - result = subprocess.run( - [pactl, "list", "short", "sinks"], - capture_output=True, - timeout=5, - check=False, - ) - except (OSError, subprocess.TimeoutExpired): - _logger.warning("pactl list sinks failed", exc_info=True) - return - sinks_text = result.stdout.decode(errors="replace").strip() - sink_names = [ - line.split("\t")[1] for line in sinks_text.splitlines() if "\t" in line - ] - real_sinks = [s for s in sink_names if s != "auto_null"] - if not real_sinks: - _logger.warning( - "ONLY auto_null PipeWire sink available \u2014 alarm will be SILENT. " - "Sinks: %s", - sink_names or "", - ) - else: - _logger.info("Audio sinks available: %s", sink_names) - - def _parse_args(argv: list[str]) -> argparse.Namespace: """Parse CLI arguments for the alarm daemon.""" parser = argparse.ArgumentParser(description="Wake alarm daemon.") diff --git a/wake_alarm/_audio.py b/wake_alarm/_audio.py new file mode 100644 index 0000000..54c977a --- /dev/null +++ b/wake_alarm/_audio.py @@ -0,0 +1,493 @@ +"""Audio playback, fan control, and PipeWire sink management for the wake alarm.""" + +from __future__ import annotations + +import logging +import math +import os +from pathlib import Path +import shutil +import struct +import subprocess +import sys +import tempfile +import time +import wave + +from python_pkg.wake_alarm._constants import ( + ALARM_AUDIO_CARD, + ALARM_AUDIO_PROFILE, + ALARM_AUDIO_SINK, + ALARM_AUDIO_SINK_POLL_SECONDS, + ALARM_AUDIO_SINK_WAIT_SECONDS, +) + +_logger = logging.getLogger(__name__) + +_TONE_CACHE: dict[int, Path] = {} +_TONE_TIMEOUT_SECONDS: float = 6.0 +_TONE_DURATION_SECONDS: float = 1.5 +_TONE_FRAMERATE: int = 48000 +_TONE_AMPLITUDE: int = 32760 # near s16 max for the loudest sine we can emit + +# Number of back-to-back PC-speaker beeps per _play_tone call. +# pcspkr volume is hardware-fixed, so we lean on repetition + duration to be +# loud enough to actually wake the user. +_PCSPKR_REPEATS: int = 3 +_PCSPKR_GAP_SECONDS: float = 0.12 + +# Motherboard PC speaker exposed by the pcspkr kernel module. +# Writing EV_SND/SND_TONE input_event structs makes it beep — bypasses +# PipeWire/ALSA entirely, so it stays audible even when no real sink exists. +_PCSPKR_DEVICE: str = "/dev/input/by-path/platform-pcspkr-event-spkr" +_PCSPKR_EV_SND: int = 0x12 +_PCSPKR_SND_TONE: int = 0x02 +# struct input_event: timeval (long sec, long usec), u16 type, u16 code, s32 val +_PCSPKR_EVENT_FMT: str = "llHHi" + +# Extra PipeWire sinks to always play alarm audio on (alongside the default). +# alsa_output...hdmi-stereo = GA102 → G27Q (has built-in speaker, always on). +_EXTRA_PIPEWIRE_SINKS: tuple[str, ...] = ("alsa_output.pci-0000_01_00.1.hdmi-stereo",) + +# NCT Super I/O chip names that expose a single pwm1 fan control channel. +_NCT_CHIP_NAMES: frozenset[str] = frozenset( + { + "nct6775", + "nct6779", + "nct6791", + "nct6792", + "nct6793", + "nct6795", + "nct6796", + "nct6797", + "nct6798", + "nct6799", + } +) + +# Installed by install.sh, controlled via sudoers NOPASSWD entry. +_FAN_SCRIPT: str = "/usr/local/bin/wake-alarm-fans.sh" +_SUDO_BIN: str = "/usr/bin/sudo" + + +def _beep_soft() -> None: + """Play a soft system beep via terminal bell.""" + sys.stdout.write("\a") + sys.stdout.flush() + + +def _speaker_test_path() -> str: + """Resolve absolute path to speaker-test binary.""" + path = shutil.which("speaker-test") + if path is None: + msg = "speaker-test not found on PATH" + raise FileNotFoundError(msg) + return path + + +def _beep_pcspkr(frequency: int, duration_seconds: float) -> None: + """Beep the motherboard PC speaker via evdev (audible without any sink). + + Silently no-ops when the device is missing or unwritable so the call is + always safe from the alarm hot path. + """ + try: + # buffering=0 so the write hits the device immediately. + with Path(_PCSPKR_DEVICE).open("wb", buffering=0) as dev: + dev.write( + struct.pack( + _PCSPKR_EVENT_FMT, + 0, + 0, + _PCSPKR_EV_SND, + _PCSPKR_SND_TONE, + int(frequency), + ), + ) + time.sleep(duration_seconds) + dev.write( + struct.pack( + _PCSPKR_EVENT_FMT, + 0, + 0, + _PCSPKR_EV_SND, + _PCSPKR_SND_TONE, + 0, + ), + ) + except OSError: + _logger.warning( + "PC speaker beep at %d Hz failed (device %s)", + frequency, + _PCSPKR_DEVICE, + exc_info=True, + ) + + +def _ensure_tone_wav(frequency: int) -> Path: + """Generate (and cache) a mono 48 kHz sine WAV at *frequency* Hz.""" + cached = _TONE_CACHE.get(frequency) + if cached is not None and cached.exists(): + return cached + path = Path(tempfile.gettempdir()) / f"wake_alarm_tone_{frequency}.wav" + n_frames = int(_TONE_FRAMERATE * _TONE_DURATION_SECONDS) + with wave.open(str(path), "wb") as wav: + wav.setnchannels(1) + wav.setsampwidth(2) + wav.setframerate(_TONE_FRAMERATE) + frames = bytearray() + for i in range(n_frames): + sample = int( + _TONE_AMPLITUDE + * math.sin(2 * math.pi * frequency * i / _TONE_FRAMERATE), + ) + frames.extend(struct.pack(" bool: + """Run *binary* on *wav* with a generous timeout. Return True on success.""" + path = shutil.which(binary) + if path is None: + return False + try: + result = subprocess.run( + [path, str(wav)], + capture_output=True, + timeout=_TONE_TIMEOUT_SECONDS, + check=False, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning("%s failed playing %s", binary, wav.name, exc_info=True) + return False + if result.returncode != 0: + _logger.warning( + "%s exited %d for %s: %s", + binary, + result.returncode, + wav.name, + result.stderr.decode(errors="replace").strip()[:200], + ) + return False + return True + + +def _play_tone(frequency: int) -> None: + """Play a sine tone via paplay/aplay/speaker-test, fall back to soft beep. + + Always also beeps the motherboard PC speaker (multiple times) so the + alarm stays loud and audible even when PipeWire only has the auto_null + sink. + """ + for i in range(_PCSPKR_REPEATS): + _beep_pcspkr(frequency, _TONE_DURATION_SECONDS) + if i < _PCSPKR_REPEATS - 1: + time.sleep(_PCSPKR_GAP_SECONDS) + try: + wav = _ensure_tone_wav(frequency) + except OSError: + _logger.warning( + "Could not generate tone WAV at %d Hz; using soft beep", + frequency, + exc_info=True, + ) + _beep_soft() + return + for binary in ("paplay", "aplay"): + if _try_player(binary, wav): + return + try: + subprocess.run( + [ + _speaker_test_path(), + "-t", + "sine", + "-f", + str(frequency), + "-l", + "1", + ], + capture_output=True, + timeout=_TONE_TIMEOUT_SECONDS, + check=False, + ) + except (FileNotFoundError, OSError, subprocess.TimeoutExpired): + _logger.warning( + "All tone players failed at %d Hz; falling back to soft beep", + frequency, + exc_info=True, + ) + _beep_soft() + + +def _play_on_extra_devices(frequency: int) -> None: + """Fire-and-forget: play a sine tone on each extra PipeWire sink.""" + try: + path = _speaker_test_path() + except FileNotFoundError: + _logger.warning("speaker-test missing; skipping extra-device beep") + return + for sink in _EXTRA_PIPEWIRE_SINKS: + _play_tone_on_sink(path, sink, frequency) + + +def _play_tone_on_sink(path: str, sink: str, frequency: int) -> None: + """Launch speaker-test for *sink*; log a warning on OSError.""" + try: + subprocess.Popen( + [path, "-t", "sine", "-f", str(frequency), "-l", "1"], + env={**os.environ, "PIPEWIRE_NODE": sink}, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except OSError: + _logger.warning("Failed to play tone on sink %s", sink, exc_info=True) + + +def _find_fan_hwmon() -> str | None: + """Return the hwmon directory for an NCT fan controller, or None.""" + for name_path in Path("/sys/class/hwmon").glob("hwmon*/name"): + try: + chip = name_path.read_text().strip() + except OSError: + _logger.warning("Could not read %s", name_path, exc_info=True) + continue + if chip in _NCT_CHIP_NAMES: + return str(name_path.parent) + _logger.warning( + "No NCT super-I/O hwmon entry found; fan ramp will be skipped", + ) + return None + + +def _max_fans() -> bool: + """Ramp every NCT pwm channel to 100% speed via the helper script. + + The helper records prior state under /run/wake-alarm-fans.state so + _restore_fans() can put things back without arguments. Safe: higher fan + speed only lowers temperatures, never damages hardware. + + Returns: + True when the ramp script ran successfully, False otherwise. + """ + if _find_fan_hwmon() is None: + return False + try: + result = subprocess.run( + [_SUDO_BIN, "-n", _FAN_SCRIPT, "max"], + check=False, + capture_output=True, + timeout=5, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning( + "Fan script %s not runnable; skipping fan ramp", + _FAN_SCRIPT, + exc_info=True, + ) + return False + if result.returncode != 0: + _logger.warning( + "Fan script %s exited %d: %s", + _FAN_SCRIPT, + result.returncode, + result.stderr.decode(errors="replace").strip(), + ) + return False + return True + + +def _restore_fans(*, active: bool) -> None: + """Restore fan speed if _max_fans() previously succeeded.""" + if not active: + return + try: + subprocess.run( + [_SUDO_BIN, "-n", _FAN_SCRIPT, "restore"], + check=False, + capture_output=True, + timeout=5, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning( + "Failed to restore fan state via %s", + _FAN_SCRIPT, + exc_info=True, + ) + + +def _set_max_brightness() -> None: + """Set all connected monitors to maximum brightness via xrandr.""" + xrandr = shutil.which("xrandr") + if xrandr is None: + _logger.warning("xrandr not on PATH; skipping max-brightness") + return + try: + result = subprocess.run( + [xrandr, "--query"], + capture_output=True, + text=True, + timeout=5, + check=False, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning("xrandr --query failed; skipping max-brightness", exc_info=True) + return + for line in result.stdout.splitlines(): + if " connected" in line: + output = line.split()[0] + try: + subprocess.run( + [xrandr, "--output", output, "--brightness", "1.0"], + check=False, + capture_output=True, + timeout=5, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning( + "Failed to set brightness on %s", + output, + exc_info=True, + ) + + +def _beep_medium(frequency: int = 1000) -> None: + """Play a medium beep (sine tone via paplay/aplay/speaker-test).""" + _play_tone(frequency) + + +def _beep_loud(frequency: int = 1000) -> None: + """Play a loud sine tone via paplay/aplay/speaker-test.""" + _play_tone(frequency) + + +def _pactl_path() -> str | None: + """Return the absolute path to pactl, or None when not installed.""" + return shutil.which("pactl") + + +def _alarm_sink_present(pactl: str) -> bool: + """Return True when the dedicated alarm HDMI sink exists in PipeWire.""" + try: + result = subprocess.run( + [pactl, "list", "short", "sinks"], + capture_output=True, + timeout=3, + check=False, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning("pactl list sinks failed", exc_info=True) + return False + return ALARM_AUDIO_SINK in result.stdout.decode(errors="replace") + + +def _current_default_sink(pactl: str) -> str | None: + """Return the current default sink name, or None on failure / empty.""" + try: + result = subprocess.run( + [pactl, "get-default-sink"], + capture_output=True, + timeout=3, + check=False, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning("pactl get-default-sink failed", exc_info=True) + return None + name = result.stdout.decode(errors="replace").strip() + return name or None + + +def _activate_alarm_audio() -> str | None: + """Force the monitor's HDMI output on and route the alarm to it. + + At wake time the Bluetooth speaker is disconnected and PipeWire only has the + ``auto_null`` sink, so the alarm is silent. This forces the HDMI card + profile on, waits for its sink to appear, makes it the default sink, and + raises it to full volume - empirically the only output audible on this + machine at wake time (the G27Q monitor's built-in speaker). + + Returns: + The previous default sink name (to restore on close), or ``None`` when + the alarm audio sink could not be activated. + """ + pactl = _pactl_path() + if pactl is None: + _logger.warning("pactl not on PATH; cannot activate alarm audio") + return None + subprocess.run( + [pactl, "set-card-profile", ALARM_AUDIO_CARD, ALARM_AUDIO_PROFILE], + capture_output=True, + timeout=3, + check=False, + ) + attempts = max( + 1, + int(ALARM_AUDIO_SINK_WAIT_SECONDS / ALARM_AUDIO_SINK_POLL_SECONDS), + ) + for _ in range(attempts): + if _alarm_sink_present(pactl): + break + time.sleep(ALARM_AUDIO_SINK_POLL_SECONDS) + else: + _logger.warning( + "Alarm audio sink %s did not appear after %.0fs; alarm may be silent", + ALARM_AUDIO_SINK, + ALARM_AUDIO_SINK_WAIT_SECONDS, + ) + return None + old_default = _current_default_sink(pactl) + for cmd in ( + [pactl, "set-default-sink", ALARM_AUDIO_SINK], + [pactl, "set-sink-mute", ALARM_AUDIO_SINK, "0"], + [pactl, "set-sink-volume", ALARM_AUDIO_SINK, "100%"], + ): + subprocess.run(cmd, capture_output=True, timeout=3, check=False) + _logger.warning("Alarm audio routed to %s at 100%%", ALARM_AUDIO_SINK) + return old_default + + +def _restore_alarm_audio(old_default: str | None) -> None: + """Restore the default sink captured by :func:`_activate_alarm_audio`.""" + if old_default is None: + return + pactl = _pactl_path() + if pactl is None: + return + subprocess.run( + [pactl, "set-default-sink", old_default], + capture_output=True, + timeout=3, + check=False, + ) + + +def _warn_if_no_real_sink() -> None: + """Log a loud warning if PipeWire only has the auto_null sink.""" + pactl = _pactl_path() + if pactl is None: + _logger.warning("pactl not on PATH; cannot verify audio sinks") + return + try: + result = subprocess.run( + [pactl, "list", "short", "sinks"], + capture_output=True, + timeout=5, + check=False, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning("pactl list sinks failed", exc_info=True) + return + sinks_text = result.stdout.decode(errors="replace").strip() + sink_names = [ + line.split("\t")[1] for line in sinks_text.splitlines() if "\t" in line + ] + real_sinks = [s for s in sink_names if s != "auto_null"] + if not real_sinks: + _logger.warning( + "ONLY auto_null PipeWire sink available — alarm will be SILENT. Sinks: %s", + sink_names or "", + ) + else: + _logger.info("Audio sinks available: %s", sink_names) diff --git a/wake_alarm/_challenges.py b/wake_alarm/_challenges.py new file mode 100644 index 0000000..88cfe93 --- /dev/null +++ b/wake_alarm/_challenges.py @@ -0,0 +1,129 @@ +"""Dismiss-challenge types for the wake alarm. + +Provides three challenge variants: +- math: solve an arithmetic problem +- sort: type shuffled digits in ascending order +- flash: memorise a code before it is hidden +""" + +from __future__ import annotations + +import secrets + +from python_pkg.wake_alarm._constants import DISMISS_CODE_LENGTH, DISMISS_FLASH_SECONDS + +# Uppercase alphanumeric chars with visually ambiguous characters removed: +# O/0 (oh vs zero) and I/1 (capital-i vs one) are excluded so the code is +# legible at a glance, even half-asleep. +_DISMISS_CHARS: str = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" + + +class _Challenge: + """A dismiss challenge presented to the user to prove wakefulness.""" + + def __init__( + self, + *, + kind: str, + display: str, + answer: str, + hint: str, + ) -> None: + """Store challenge parameters. + + Args: + kind: Challenge type — "math", "flash", or "sort". + display: Text shown in the large code label. + answer: Expected typed answer (normalised, upper-case). + hint: Short instruction shown above the code label. + """ + self.kind: str = kind + self.display: str = display + self.answer: str = answer + self.hint: str = hint + + +def _generate_code() -> str: + """Generate a random alphanumeric dismiss code. + + Uses uppercase letters and digits only, with ambiguous characters + (O, I, 0, 1) removed so the displayed code is easy to read at a glance. + """ + return "".join(secrets.choice(_DISMISS_CHARS) for _ in range(DISMISS_CODE_LENGTH)) + + +def _make_math_challenge() -> _Challenge: + """Generate an arithmetic problem the user must solve to dismiss. + + Picks randomly from addition, subtraction, and multiplication. + The user types only the numeric answer — no copying, no autopilot. + """ + op = secrets.choice(("+", "-", "*")) + if op == "+": + a, b = 10 + secrets.randbelow(90), 10 + secrets.randbelow(90) + return _Challenge( + kind="math", + display=f"{a} + {b} = ?", + answer=str(a + b), + hint="Solve and type the answer", + ) + if op == "-": + a = 20 + secrets.randbelow(80) + b = 10 + secrets.randbelow(a - 10) + return _Challenge( + kind="math", + display=f"{a} - {b} = ?", + answer=str(a - b), + hint="Solve and type the answer", + ) + a, b = 12 + secrets.randbelow(14), 3 + secrets.randbelow(7) + return _Challenge( + kind="math", + display=f"{a} * {b} = ?", + answer=str(a * b), + hint="Solve and type the answer", + ) + + +def _make_sort_challenge() -> _Challenge: + """Generate a sort-the-digits challenge. + + Displays six shuffled single digits; the user types them ascending (no spaces). + Requires a brief cognitive effort — fast enough to be fair, slow enough to prove + you are awake. + """ + pool = list(range(1, 10)) + for i in range(len(pool) - 1, 0, -1): + j = secrets.randbelow(i + 1) + pool[i], pool[j] = pool[j], pool[i] + digits = pool[:6] + display = " ".join(str(d) for d in digits) + answer = "".join(str(d) for d in sorted(digits)) + return _Challenge( + kind="sort", + display=display, + answer=answer, + hint="Type digits sorted lowest → highest (no spaces)", + ) + + +def _make_flash_challenge() -> _Challenge: + """Generate a memorise-then-type challenge. + + Shows a code for DISMISS_FLASH_SECONDS, then hides it. + The user must type the code from memory. + """ + code = _generate_code() + return _Challenge( + kind="flash", + display=code, + answer=code, + hint=f"Memorise this code — it disappears in {DISMISS_FLASH_SECONDS}s", + ) + + +def _make_challenge() -> _Challenge: + """Pick a random challenge type and generate an instance.""" + return secrets.choice( + (_make_math_challenge, _make_flash_challenge, _make_sort_challenge), + )() diff --git a/wake_alarm/_constants.py b/wake_alarm/_constants.py index 5cb51ab..7bae93f 100644 --- a/wake_alarm/_constants.py +++ b/wake_alarm/_constants.py @@ -28,7 +28,13 @@ MEDIUM_BEEP_INTERVAL: float = 5.0 LOUD_TOGGLE_INTERVAL: float = 2.0 # Dismiss challenge: length of the random code -DISMISS_CODE_LENGTH: int = 6 +DISMISS_CODE_LENGTH: int = 8 +# Number of correct code entries required to dismiss the alarm. +# Requiring more than one round forces the user to stay awake long enough +# to actually read and type multiple independent codes. +DISMISS_ROUNDS_REQUIRED: int = 2 +# Seconds the code is visible before being hidden in a flash challenge. +DISMISS_FLASH_SECONDS: int = 4 # How often the dismiss code refreshes (seconds) DISMISS_CODE_REFRESH_SECONDS: int = 30 diff --git a/wake_alarm/tests/test_alarm.py b/wake_alarm/tests/test_alarm.py index 7b99761..086b056 100644 --- a/wake_alarm/tests/test_alarm.py +++ b/wake_alarm/tests/test_alarm.py @@ -11,33 +11,27 @@ from unittest.mock import MagicMock, patch import pytest if TYPE_CHECKING: - from collections.abc import Generator, Iterator + from collections.abc import Generator from python_pkg.wake_alarm._alarm import ( - _activate_alarm_audio, - _alarm_sink_present, + _is_alarm_day, + _restore_display, + _should_run_alarm, + _wake_display, +) +from python_pkg.wake_alarm._audio import ( _beep_loud, _beep_medium, - _beep_pcspkr, _beep_soft, - _current_default_sink, - _ensure_tone_wav, _find_fan_hwmon, - _generate_code, - _is_alarm_day, _max_fans, - _parse_args, _play_on_extra_devices, - _play_tone, - _restore_alarm_audio, - _restore_display, _restore_fans, - _set_max_brightness, - _should_run_alarm, _speaker_test_path, - _try_player, - _wake_display, - _warn_if_no_real_sink, +) +from python_pkg.wake_alarm._challenges import ( + _DISMISS_CHARS, + _generate_code, ) from python_pkg.wake_alarm._constants import ( DISMISS_CODE_LENGTH, @@ -92,10 +86,11 @@ class TestGenerateCode: code = _generate_code() assert len(code) == DISMISS_CODE_LENGTH - def test_all_digits(self) -> None: - """Generated code contains only digits.""" + def test_all_alphanumeric(self) -> None: + """Generated code uses only the unambiguous alphanumeric charset.""" + code = _generate_code() - assert code.isdigit() + assert all(c in _DISMISS_CHARS for c in code) def test_different_codes(self) -> None: """Two calls produce different codes (probabilistic, but safe).""" @@ -177,7 +172,7 @@ class TestSpeakerTestPath: def test_returns_path_when_found(self) -> None: """Return full path when speaker-test is available.""" with patch( - "python_pkg.wake_alarm._alarm.shutil.which", + "python_pkg.wake_alarm._audio.shutil.which", return_value="/usr/bin/speaker-test", ): assert _speaker_test_path() == "/usr/bin/speaker-test" @@ -186,7 +181,7 @@ class TestSpeakerTestPath: """Raise FileNotFoundError when speaker-test is missing.""" with ( patch( - "python_pkg.wake_alarm._alarm.shutil.which", + "python_pkg.wake_alarm._audio.shutil.which", return_value=None, ), pytest.raises(FileNotFoundError, match="speaker-test not found"), @@ -199,7 +194,7 @@ class TestBeepFunctions: def test_beep_soft_writes_bell(self) -> None: """_beep_soft writes terminal bell character.""" - with patch("python_pkg.wake_alarm._alarm.sys") as mock_sys: + with patch("python_pkg.wake_alarm._audio.sys") as mock_sys: mock_sys.stdout = MagicMock() _beep_soft() mock_sys.stdout.write.assert_called_once_with("\a") @@ -207,13 +202,13 @@ class TestBeepFunctions: def test_beep_medium_delegates_to_play_tone(self) -> None: """_beep_medium just delegates to _play_tone.""" - with patch("python_pkg.wake_alarm._alarm._play_tone") as mock_play: + with patch("python_pkg.wake_alarm._audio._play_tone") as mock_play: _beep_medium(frequency=800) mock_play.assert_called_once_with(800) def test_beep_loud_delegates_to_play_tone(self) -> None: """_beep_loud just delegates to _play_tone.""" - with patch("python_pkg.wake_alarm._alarm._play_tone") as mock_play: + with patch("python_pkg.wake_alarm._audio._play_tone") as mock_play: _beep_loud(frequency=1200) mock_play.assert_called_once_with(1200) @@ -308,10 +303,10 @@ class TestPlayOnExtraDevices: """_play_on_extra_devices spawns speaker-test with PIPEWIRE_NODE set.""" with ( patch( - "python_pkg.wake_alarm._alarm._speaker_test_path", + "python_pkg.wake_alarm._audio._speaker_test_path", return_value="/usr/bin/speaker-test", ), - patch("python_pkg.wake_alarm._alarm.subprocess.Popen") as mock_popen, + patch("python_pkg.wake_alarm._audio.subprocess.Popen") as mock_popen, ): _play_on_extra_devices(1000) mock_popen.assert_called_once() @@ -327,10 +322,10 @@ class TestPlayOnExtraDevices: """_play_on_extra_devices does nothing when speaker-test is absent.""" with ( patch( - "python_pkg.wake_alarm._alarm._speaker_test_path", + "python_pkg.wake_alarm._audio._speaker_test_path", side_effect=FileNotFoundError("not found"), ), - patch("python_pkg.wake_alarm._alarm.subprocess.Popen") as mock_popen, + patch("python_pkg.wake_alarm._audio.subprocess.Popen") as mock_popen, ): _play_on_extra_devices(1000) mock_popen.assert_not_called() @@ -339,11 +334,11 @@ class TestPlayOnExtraDevices: """_play_on_extra_devices silently ignores OSError from Popen.""" with ( patch( - "python_pkg.wake_alarm._alarm._speaker_test_path", + "python_pkg.wake_alarm._audio._speaker_test_path", return_value="/usr/bin/speaker-test", ), patch( - "python_pkg.wake_alarm._alarm.subprocess.Popen", + "python_pkg.wake_alarm._audio.subprocess.Popen", side_effect=OSError("device busy"), ), ): @@ -390,18 +385,18 @@ class TestMaxFans: def test_returns_false_when_no_hwmon(self) -> None: """No fan controller → returns False immediately.""" - with patch("python_pkg.wake_alarm._alarm._find_fan_hwmon", return_value=None): + with patch("python_pkg.wake_alarm._audio._find_fan_hwmon", return_value=None): assert _max_fans() is False def test_returns_false_on_script_oserror(self, tmp_path: pathlib.Path) -> None: """OSError running fan script → returns False.""" with ( patch( - "python_pkg.wake_alarm._alarm._find_fan_hwmon", + "python_pkg.wake_alarm._audio._find_fan_hwmon", return_value=str(tmp_path), ), patch( - "python_pkg.wake_alarm._alarm.subprocess.run", + "python_pkg.wake_alarm._audio.subprocess.run", side_effect=OSError("not found"), ), ): @@ -411,11 +406,11 @@ class TestMaxFans: """TimeoutExpired running fan script → returns False.""" with ( patch( - "python_pkg.wake_alarm._alarm._find_fan_hwmon", + "python_pkg.wake_alarm._audio._find_fan_hwmon", return_value=str(tmp_path), ), patch( - "python_pkg.wake_alarm._alarm.subprocess.run", + "python_pkg.wake_alarm._audio.subprocess.run", side_effect=subprocess.TimeoutExpired("fan", 5), ), ): @@ -427,11 +422,11 @@ class TestMaxFans: mock_result.returncode = 1 with ( patch( - "python_pkg.wake_alarm._alarm._find_fan_hwmon", + "python_pkg.wake_alarm._audio._find_fan_hwmon", return_value=str(tmp_path), ), patch( - "python_pkg.wake_alarm._alarm.subprocess.run", + "python_pkg.wake_alarm._audio.subprocess.run", return_value=mock_result, ), ): @@ -443,11 +438,11 @@ class TestMaxFans: mock_result.returncode = 0 with ( patch( - "python_pkg.wake_alarm._alarm._find_fan_hwmon", + "python_pkg.wake_alarm._audio._find_fan_hwmon", return_value=str(tmp_path), ), patch( - "python_pkg.wake_alarm._alarm.subprocess.run", + "python_pkg.wake_alarm._audio.subprocess.run", return_value=mock_result, ), ): @@ -459,13 +454,13 @@ class TestRestoreFans: def test_noop_when_inactive(self) -> None: """False state → subprocess.run is never called.""" - with patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run: + with patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run: _restore_fans(active=False) mock_run.assert_not_called() def test_calls_fan_script_restore(self) -> None: """Active state → fan script called with restore (no args).""" - with patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run: + with patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run: mock_run.return_value.returncode = 0 _restore_fans(active=True) mock_run.assert_called_once() @@ -475,7 +470,7 @@ class TestRestoreFans: def test_ignores_oserror_on_restore(self) -> None: """OSError from fan script is silently suppressed.""" with patch( - "python_pkg.wake_alarm._alarm.subprocess.run", + "python_pkg.wake_alarm._audio.subprocess.run", side_effect=OSError("no script"), ): _restore_fans(active=True) # must not raise @@ -483,672 +478,7 @@ class TestRestoreFans: def test_ignores_timeout_on_restore(self) -> None: """TimeoutExpired from fan script is silently suppressed.""" with patch( - "python_pkg.wake_alarm._alarm.subprocess.run", + "python_pkg.wake_alarm._audio.subprocess.run", side_effect=subprocess.TimeoutExpired("fan", 5), ): _restore_fans(active=True) # must not raise - - -class TestSetMaxBrightness: - """Tests for _set_max_brightness.""" - - def test_noop_when_xrandr_missing(self) -> None: - """No xrandr on PATH → subprocess.run never called.""" - with ( - patch("python_pkg.wake_alarm._alarm.shutil.which", return_value=None), - patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run, - ): - _set_max_brightness() - mock_run.assert_not_called() - - def test_noop_on_oserror_from_query(self) -> None: - """OSError from xrandr --query is suppressed.""" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/xrandr", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=OSError("no display"), - ), - ): - _set_max_brightness() # must not raise - - def test_noop_on_timeout_from_query(self) -> None: - """TimeoutExpired from xrandr --query is suppressed.""" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/xrandr", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=subprocess.TimeoutExpired("xrandr", 5), - ), - ): - _set_max_brightness() # must not raise - - def test_sets_brightness_for_connected_displays(self) -> None: - """Connected displays each get an --output --brightness call.""" - mock_query_result = MagicMock() - mock_query_result.stdout = ( - "HDMI-0 connected 2560x1440+0+0\nDP-0 connected primary\n" - ) - call_args_list: list[list[str]] = [] - - def fake_run(args: list[str], **_kwargs: object) -> MagicMock: - call_args_list.append(args) - return mock_query_result - - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/xrandr", - ), - patch("python_pkg.wake_alarm._alarm.subprocess.run", side_effect=fake_run), - ): - _set_max_brightness() - - # First call is --query; subsequent calls set brightness for each output. - brightness_calls = [a for a in call_args_list if "--brightness" in a] - expected_brightness_calls = 2 - assert len(brightness_calls) == expected_brightness_calls - - def test_skips_disconnected_outputs(self) -> None: - """Disconnected outputs do NOT get a brightness call.""" - mock_result = MagicMock() - mock_result.stdout = "Screen 0: minimum 320\nHDMI-0 disconnected\n" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/xrandr", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=mock_result, - ) as mock_run, - ): - _set_max_brightness() - # Only the --query call, no brightness calls. - assert mock_run.call_count == 1 - - def test_warns_when_brightness_call_fails(self) -> None: - """OSError on per-output --brightness call is logged but swallowed.""" - query_result = MagicMock() - query_result.stdout = ( - "Screen 0: minimum 320\nHDMI-0 connected primary 1920x1080\n" - ) - - def _run_side_effect(args: list[str], **_kwargs: object) -> MagicMock: - if "--query" in args: - return query_result - msg = "permission denied" - raise OSError(msg) - - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/xrandr", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=_run_side_effect, - ), - ): - _set_max_brightness() # must not raise - - -class TestEnsureToneWav: - """Tests for _ensure_tone_wav (sine WAV generator + cache).""" - - def test_generates_and_caches(self, tmp_path: pathlib.Path) -> None: - """First call generates the WAV; second call returns the cached path.""" - from python_pkg.wake_alarm import _alarm as alarm_mod - - alarm_mod._TONE_CACHE.clear() - with patch( - "python_pkg.wake_alarm._alarm.tempfile.gettempdir", - return_value=str(tmp_path), - ): - path1 = _ensure_tone_wav(440) - assert path1.exists() - size = path1.stat().st_size - assert size > 0 - # Second call must hit the cache (no regeneration). - with patch("python_pkg.wake_alarm._alarm.wave.open") as mock_open: - path2 = _ensure_tone_wav(440) - mock_open.assert_not_called() - assert path2 == path1 - alarm_mod._TONE_CACHE.clear() - - def test_regenerates_when_cached_file_missing( - self, - tmp_path: pathlib.Path, - ) -> None: - """If the cached file was deleted, regenerate it.""" - from python_pkg.wake_alarm._alarm import _TONE_CACHE - - _TONE_CACHE.clear() - with patch( - "python_pkg.wake_alarm._alarm.tempfile.gettempdir", - return_value=str(tmp_path), - ): - path1 = _ensure_tone_wav(880) - path1.unlink() - path2 = _ensure_tone_wav(880) - assert path2.exists() - _TONE_CACHE.clear() - - -class TestTryPlayer: - """Tests for _try_player.""" - - def test_returns_false_when_binary_missing( - self, - tmp_path: pathlib.Path, - ) -> None: - """Missing binary returns False without raising.""" - wav = tmp_path / "x.wav" - wav.write_bytes(b"\x00") - with patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value=None, - ): - assert _try_player("paplay", wav) is False - - def test_returns_true_on_success(self, tmp_path: pathlib.Path) -> None: - """Zero exit code returns True.""" - wav = tmp_path / "x.wav" - wav.write_bytes(b"\x00") - result = MagicMock() - result.returncode = 0 - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/paplay", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=result, - ), - ): - assert _try_player("paplay", wav) is True - - def test_returns_false_on_nonzero_exit( - self, - tmp_path: pathlib.Path, - ) -> None: - """Non-zero exit code returns False and logs.""" - wav = tmp_path / "x.wav" - wav.write_bytes(b"\x00") - result = MagicMock() - result.returncode = 1 - result.stderr = b"boom" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/paplay", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=result, - ), - ): - assert _try_player("paplay", wav) is False - - def test_returns_false_on_timeout(self, tmp_path: pathlib.Path) -> None: - """TimeoutExpired returns False and logs.""" - wav = tmp_path / "x.wav" - wav.write_bytes(b"\x00") - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/paplay", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=subprocess.TimeoutExpired("paplay", 6), - ), - ): - assert _try_player("paplay", wav) is False - - def test_returns_false_on_oserror(self, tmp_path: pathlib.Path) -> None: - """OSError returns False and logs.""" - wav = tmp_path / "x.wav" - wav.write_bytes(b"\x00") - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/paplay", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=OSError("nope"), - ), - ): - assert _try_player("paplay", wav) is False - - -class TestBeepPcspkr: - """Tests for _beep_pcspkr (evdev PC speaker).""" - - def test_writes_tone_then_zero_to_device(self) -> None: - """Successful path writes start-frequency then stop event.""" - - mock_dev = MagicMock() - mock_open_ctx = MagicMock() - mock_open_ctx.__enter__.return_value = mock_dev - mock_open_ctx.__exit__.return_value = False - with ( - patch( - "python_pkg.wake_alarm._alarm.Path.open", - return_value=mock_open_ctx, - ), - patch("python_pkg.wake_alarm._alarm.time.sleep"), - ): - _beep_pcspkr(1000, 0.05) - # First write carries the frequency, second write carries 0 (stop). - assert mock_dev.write.call_count == 2 - - def test_oserror_is_swallowed(self) -> None: - """OSError opening the device must not raise.""" - - with patch( - "python_pkg.wake_alarm._alarm.Path.open", - side_effect=OSError("no device"), - ): - _beep_pcspkr(1000, 0.05) # must not raise - - -class TestPlayTone: - """Tests for _play_tone.""" - - @pytest.fixture(autouse=True) - def _silence_pcspkr(self) -> Iterator[None]: - """Stop tests from hitting the real /dev/input PC speaker device.""" - with patch("python_pkg.wake_alarm._alarm._beep_pcspkr"): - yield - - def test_paplay_success_short_circuits(self, tmp_path: pathlib.Path) -> None: - """If paplay succeeds, no further players are tried.""" - wav = tmp_path / "tone.wav" - wav.write_bytes(b"\x00") - with ( - patch( - "python_pkg.wake_alarm._alarm._ensure_tone_wav", - return_value=wav, - ), - patch( - "python_pkg.wake_alarm._alarm._try_player", - return_value=True, - ) as mock_try, - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - ) as mock_run, - ): - _play_tone(440) - mock_try.assert_called_once_with("paplay", wav) - mock_run.assert_not_called() - - def test_falls_back_to_aplay_then_speaker_test( - self, - tmp_path: pathlib.Path, - ) -> None: - """paplay+aplay fail → speaker-test is tried.""" - wav = tmp_path / "tone.wav" - wav.write_bytes(b"\x00") - with ( - patch( - "python_pkg.wake_alarm._alarm._ensure_tone_wav", - return_value=wav, - ), - patch( - "python_pkg.wake_alarm._alarm._try_player", - return_value=False, - ), - patch( - "python_pkg.wake_alarm._alarm._speaker_test_path", - return_value="/usr/bin/speaker-test", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - ) as mock_run, - ): - _play_tone(1000) - mock_run.assert_called_once() - args = mock_run.call_args[0][0] - assert "/usr/bin/speaker-test" in args - assert "1000" in args - - def test_soft_beep_when_speaker_test_missing( - self, - tmp_path: pathlib.Path, - ) -> None: - """All players fail → soft beep.""" - wav = tmp_path / "tone.wav" - wav.write_bytes(b"\x00") - with ( - patch( - "python_pkg.wake_alarm._alarm._ensure_tone_wav", - return_value=wav, - ), - patch( - "python_pkg.wake_alarm._alarm._try_player", - return_value=False, - ), - patch( - "python_pkg.wake_alarm._alarm._speaker_test_path", - side_effect=FileNotFoundError("missing"), - ), - patch("python_pkg.wake_alarm._alarm._beep_soft") as mock_soft, - ): - _play_tone(800) - mock_soft.assert_called_once() - - def test_soft_beep_when_speaker_test_times_out( - self, - tmp_path: pathlib.Path, - ) -> None: - """speaker-test TimeoutExpired → soft beep.""" - wav = tmp_path / "tone.wav" - wav.write_bytes(b"\x00") - with ( - patch( - "python_pkg.wake_alarm._alarm._ensure_tone_wav", - return_value=wav, - ), - patch( - "python_pkg.wake_alarm._alarm._try_player", - return_value=False, - ), - patch( - "python_pkg.wake_alarm._alarm._speaker_test_path", - return_value="/usr/bin/speaker-test", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=subprocess.TimeoutExpired("speaker-test", 6), - ), - patch("python_pkg.wake_alarm._alarm._beep_soft") as mock_soft, - ): - _play_tone(800) - mock_soft.assert_called_once() - - def test_soft_beep_when_wav_generation_fails(self) -> None: - """OSError generating WAV → soft beep.""" - with ( - patch( - "python_pkg.wake_alarm._alarm._ensure_tone_wav", - side_effect=OSError("disk full"), - ), - patch("python_pkg.wake_alarm._alarm._beep_soft") as mock_soft, - ): - _play_tone(440) - mock_soft.assert_called_once() - - -class TestWarnIfNoRealSink: - """Tests for _warn_if_no_real_sink.""" - - def test_logs_when_pactl_missing(self) -> None: - """No pactl on PATH → warns and returns.""" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value=None, - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - ) as mock_run, - ): - _warn_if_no_real_sink() - mock_run.assert_not_called() - - def test_warns_when_only_auto_null(self) -> None: - """Only auto_null sink → warning is emitted.""" - result = MagicMock() - result.stdout = b"4319\tauto_null\tPipeWire\tfloat32le 2ch 48000Hz\tIDLE\n" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/pactl", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=result, - ), - patch("python_pkg.wake_alarm._alarm._logger") as mock_log, - ): - _warn_if_no_real_sink() - mock_log.warning.assert_called() - - def test_info_when_real_sink_present(self) -> None: - """A non-auto_null sink → info log, no warning.""" - result = MagicMock() - result.stdout = b"1\talsa_output.pci-0000_01_00.1.hdmi-stereo\tPipeWire\t-\t-\n" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/pactl", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=result, - ), - patch("python_pkg.wake_alarm._alarm._logger") as mock_log, - ): - _warn_if_no_real_sink() - mock_log.info.assert_called() - mock_log.warning.assert_not_called() - - def test_handles_pactl_failure(self) -> None: - """OSError/TimeoutExpired running pactl → warning, no raise.""" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/pactl", - ), - patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=subprocess.TimeoutExpired("pactl", 5), - ), - ): - _warn_if_no_real_sink() # must not raise - - -class TestAlarmSinkPresent: - """Tests for _alarm_sink_present.""" - - def test_true_when_sink_listed(self) -> None: - """Returns True when the alarm sink name appears in pactl output.""" - from python_pkg.wake_alarm._constants import ALARM_AUDIO_SINK - - proc = MagicMock(stdout=ALARM_AUDIO_SINK.encode() + b"\tPipeWire\n") - with patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=proc, - ): - assert _alarm_sink_present("/usr/bin/pactl") is True - - def test_false_when_sink_absent(self) -> None: - """Returns False when the alarm sink is not in pactl output.""" - proc = MagicMock(stdout=b"auto_null\tPipeWire\n") - with patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=proc, - ): - assert _alarm_sink_present("/usr/bin/pactl") is False - - def test_false_on_subprocess_error(self) -> None: - """OSError while listing sinks → False, no raise.""" - with patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=OSError("boom"), - ): - assert _alarm_sink_present("/usr/bin/pactl") is False - - -class TestCurrentDefaultSink: - """Tests for _current_default_sink.""" - - def test_returns_sink_name(self) -> None: - """Returns the trimmed default sink name.""" - proc = MagicMock(stdout=b"jbl_sink\n") - with patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=proc, - ): - assert _current_default_sink("/usr/bin/pactl") == "jbl_sink" - - def test_returns_none_when_empty(self) -> None: - """Empty output → None.""" - proc = MagicMock(stdout=b"\n") - with patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - return_value=proc, - ): - assert _current_default_sink("/usr/bin/pactl") is None - - def test_returns_none_on_error(self) -> None: - """TimeoutExpired → None, no raise.""" - with patch( - "python_pkg.wake_alarm._alarm.subprocess.run", - side_effect=subprocess.TimeoutExpired("pactl", 3), - ): - assert _current_default_sink("/usr/bin/pactl") is None - - -class TestActivateAlarmAudio: - """Tests for _activate_alarm_audio.""" - - def test_returns_none_when_pactl_missing(self) -> None: - """No pactl on PATH → returns None without touching audio.""" - with ( - patch("python_pkg.wake_alarm._alarm.shutil.which", return_value=None), - patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run, - ): - assert _activate_alarm_audio() is None - mock_run.assert_not_called() - - def test_activates_and_returns_old_default(self) -> None: - """Sink present → routes audio there and returns prior default sink.""" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/pactl", - ), - patch( - "python_pkg.wake_alarm._alarm._alarm_sink_present", - return_value=True, - ), - patch( - "python_pkg.wake_alarm._alarm._current_default_sink", - return_value="jbl_sink", - ), - patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run, - ): - result = _activate_alarm_audio() - assert result == "jbl_sink" - cmds = [call.args[0] for call in mock_run.call_args_list] - from python_pkg.wake_alarm._constants import ( - ALARM_AUDIO_CARD, - ALARM_AUDIO_PROFILE, - ALARM_AUDIO_SINK, - ) - - assert [ - "/usr/bin/pactl", - "set-card-profile", - ALARM_AUDIO_CARD, - ALARM_AUDIO_PROFILE, - ] in cmds - assert ["/usr/bin/pactl", "set-default-sink", ALARM_AUDIO_SINK] in cmds - - def test_returns_none_when_sink_never_appears(self) -> None: - """Sink never shows up → returns None after polling (no raise).""" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/pactl", - ), - patch( - "python_pkg.wake_alarm._alarm._alarm_sink_present", - return_value=False, - ), - patch("python_pkg.wake_alarm._alarm.time.sleep") as mock_sleep, - patch("python_pkg.wake_alarm._alarm.subprocess.run"), - ): - assert _activate_alarm_audio() is None - mock_sleep.assert_called() - - def test_waits_then_succeeds(self) -> None: - """Sink absent then present → sleeps once, then routes audio.""" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/pactl", - ), - patch( - "python_pkg.wake_alarm._alarm._alarm_sink_present", - side_effect=[False, True], - ), - patch( - "python_pkg.wake_alarm._alarm._current_default_sink", - return_value="old", - ), - patch("python_pkg.wake_alarm._alarm.time.sleep") as mock_sleep, - patch("python_pkg.wake_alarm._alarm.subprocess.run"), - ): - assert _activate_alarm_audio() == "old" - mock_sleep.assert_called_once() - - -class TestRestoreAlarmAudio: - """Tests for _restore_alarm_audio.""" - - def test_none_is_noop(self) -> None: - """None default → does nothing, no pactl lookup.""" - with patch("python_pkg.wake_alarm._alarm.shutil.which") as mock_which: - _restore_alarm_audio(None) - mock_which.assert_not_called() - - def test_no_pactl_returns_silently(self) -> None: - """Default present but pactl missing → no raise, no run.""" - with ( - patch("python_pkg.wake_alarm._alarm.shutil.which", return_value=None), - patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run, - ): - _restore_alarm_audio("jbl_sink") - mock_run.assert_not_called() - - def test_restores_default_sink(self) -> None: - """Calls set-default-sink with the captured prior default.""" - with ( - patch( - "python_pkg.wake_alarm._alarm.shutil.which", - return_value="/usr/bin/pactl", - ), - patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run, - ): - _restore_alarm_audio("jbl_sink") - cmds = [call.args[0] for call in mock_run.call_args_list] - assert ["/usr/bin/pactl", "set-default-sink", "jbl_sink"] in cmds - - -class TestParseArgs: - """Tests for _parse_args.""" - - def test_default_flags_are_false(self) -> None: - """No CLI args means every flag is False.""" - ns = _parse_args([]) - assert ns.demo is False - assert ns.trigger_now is False - assert ns.production is False - - def test_flags_parse(self) -> None: - """Each flag flips to True when passed.""" - ns = _parse_args(["--production", "--demo", "--trigger-now"]) - assert ns.production is True - assert ns.demo is True - assert ns.trigger_now is True diff --git a/wake_alarm/tests/test_alarm_audio.py b/wake_alarm/tests/test_alarm_audio.py new file mode 100644 index 0000000..dc37875 --- /dev/null +++ b/wake_alarm/tests/test_alarm_audio.py @@ -0,0 +1,420 @@ +"""Tests for _audio.py — audio playback, fan control, and sink management.""" + +from __future__ import annotations + +import subprocess +from typing import TYPE_CHECKING +from unittest.mock import MagicMock, patch + +import pytest + +if TYPE_CHECKING: + from collections.abc import Iterator + import pathlib + +from python_pkg.wake_alarm._audio import ( + _beep_pcspkr, + _ensure_tone_wav, + _play_tone, + _set_max_brightness, + _try_player, +) + + +class TestSetMaxBrightness: + """Tests for _set_max_brightness.""" + + def test_noop_when_xrandr_missing(self) -> None: + """No xrandr on PATH → subprocess.run never called.""" + with ( + patch("python_pkg.wake_alarm._audio.shutil.which", return_value=None), + patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run, + ): + _set_max_brightness() + mock_run.assert_not_called() + + def test_noop_on_oserror_from_query(self) -> None: + """OSError from xrandr --query is suppressed.""" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=OSError("no display"), + ), + ): + _set_max_brightness() # must not raise + + def test_noop_on_timeout_from_query(self) -> None: + """TimeoutExpired from xrandr --query is suppressed.""" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=subprocess.TimeoutExpired("xrandr", 5), + ), + ): + _set_max_brightness() # must not raise + + def test_sets_brightness_for_connected_displays(self) -> None: + """Connected displays each get an --output --brightness call.""" + mock_query_result = MagicMock() + mock_query_result.stdout = ( + "HDMI-0 connected 2560x1440+0+0\nDP-0 connected primary\n" + ) + call_args_list: list[list[str]] = [] + + def fake_run(args: list[str], **_kwargs: object) -> MagicMock: + call_args_list.append(args) + return mock_query_result + + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch("python_pkg.wake_alarm._audio.subprocess.run", side_effect=fake_run), + ): + _set_max_brightness() + + # First call is --query; subsequent calls set brightness for each output. + brightness_calls = [a for a in call_args_list if "--brightness" in a] + expected_brightness_calls = 2 + assert len(brightness_calls) == expected_brightness_calls + + def test_skips_disconnected_outputs(self) -> None: + """Disconnected outputs do NOT get a brightness call.""" + mock_result = MagicMock() + mock_result.stdout = "Screen 0: minimum 320\nHDMI-0 disconnected\n" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=mock_result, + ) as mock_run, + ): + _set_max_brightness() + # Only the --query call, no brightness calls. + assert mock_run.call_count == 1 + + def test_warns_when_brightness_call_fails(self) -> None: + """OSError on per-output --brightness call is logged but swallowed.""" + query_result = MagicMock() + query_result.stdout = ( + "Screen 0: minimum 320\nHDMI-0 connected primary 1920x1080\n" + ) + + def _run_side_effect(args: list[str], **_kwargs: object) -> MagicMock: + if "--query" in args: + return query_result + msg = "permission denied" + raise OSError(msg) + + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=_run_side_effect, + ), + ): + _set_max_brightness() # must not raise + + +class TestEnsureToneWav: + """Tests for _ensure_tone_wav (sine WAV generator + cache).""" + + def test_generates_and_caches(self, tmp_path: pathlib.Path) -> None: + """First call generates the WAV; second call returns the cached path.""" + from python_pkg.wake_alarm import _audio as alarm_mod + + alarm_mod._TONE_CACHE.clear() + with patch( + "python_pkg.wake_alarm._audio.tempfile.gettempdir", + return_value=str(tmp_path), + ): + path1 = _ensure_tone_wav(440) + assert path1.exists() + size = path1.stat().st_size + assert size > 0 + # Second call must hit the cache (no regeneration). + with patch("python_pkg.wake_alarm._audio.wave.open") as mock_open: + path2 = _ensure_tone_wav(440) + mock_open.assert_not_called() + assert path2 == path1 + alarm_mod._TONE_CACHE.clear() + + def test_regenerates_when_cached_file_missing( + self, + tmp_path: pathlib.Path, + ) -> None: + """If the cached file was deleted, regenerate it.""" + from python_pkg.wake_alarm._audio import _TONE_CACHE + + _TONE_CACHE.clear() + with patch( + "python_pkg.wake_alarm._audio.tempfile.gettempdir", + return_value=str(tmp_path), + ): + path1 = _ensure_tone_wav(880) + path1.unlink() + path2 = _ensure_tone_wav(880) + assert path2.exists() + _TONE_CACHE.clear() + + +class TestTryPlayer: + """Tests for _try_player.""" + + def test_returns_false_when_binary_missing( + self, + tmp_path: pathlib.Path, + ) -> None: + """Missing binary returns False without raising.""" + wav = tmp_path / "x.wav" + wav.write_bytes(b"\x00") + with patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value=None, + ): + assert _try_player("paplay", wav) is False + + def test_returns_true_on_success(self, tmp_path: pathlib.Path) -> None: + """Zero exit code returns True.""" + wav = tmp_path / "x.wav" + wav.write_bytes(b"\x00") + result = MagicMock() + result.returncode = 0 + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/paplay", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=result, + ), + ): + assert _try_player("paplay", wav) is True + + def test_returns_false_on_nonzero_exit( + self, + tmp_path: pathlib.Path, + ) -> None: + """Non-zero exit code returns False and logs.""" + wav = tmp_path / "x.wav" + wav.write_bytes(b"\x00") + result = MagicMock() + result.returncode = 1 + result.stderr = b"boom" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/paplay", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=result, + ), + ): + assert _try_player("paplay", wav) is False + + def test_returns_false_on_timeout(self, tmp_path: pathlib.Path) -> None: + """TimeoutExpired returns False and logs.""" + wav = tmp_path / "x.wav" + wav.write_bytes(b"\x00") + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/paplay", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=subprocess.TimeoutExpired("paplay", 6), + ), + ): + assert _try_player("paplay", wav) is False + + def test_returns_false_on_oserror(self, tmp_path: pathlib.Path) -> None: + """OSError returns False and logs.""" + wav = tmp_path / "x.wav" + wav.write_bytes(b"\x00") + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/paplay", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=OSError("nope"), + ), + ): + assert _try_player("paplay", wav) is False + + +class TestBeepPcspkr: + """Tests for _beep_pcspkr (evdev PC speaker).""" + + def test_writes_tone_then_zero_to_device(self) -> None: + """Successful path writes start-frequency then stop event.""" + + mock_dev = MagicMock() + mock_open_ctx = MagicMock() + mock_open_ctx.__enter__.return_value = mock_dev + mock_open_ctx.__exit__.return_value = False + with ( + patch( + "python_pkg.wake_alarm._audio.Path.open", + return_value=mock_open_ctx, + ), + patch("python_pkg.wake_alarm._audio.time.sleep"), + ): + _beep_pcspkr(1000, 0.05) + # First write carries the frequency, second write carries 0 (stop). + assert mock_dev.write.call_count == 2 + + def test_oserror_is_swallowed(self) -> None: + """OSError opening the device must not raise.""" + + with patch( + "python_pkg.wake_alarm._audio.Path.open", + side_effect=OSError("no device"), + ): + _beep_pcspkr(1000, 0.05) # must not raise + + +class TestPlayTone: + """Tests for _play_tone.""" + + @pytest.fixture(autouse=True) + def _silence_pcspkr(self) -> Iterator[None]: + """Stop tests from hitting the real /dev/input PC speaker device.""" + with patch("python_pkg.wake_alarm._audio._beep_pcspkr"): + yield + + def test_paplay_success_short_circuits(self, tmp_path: pathlib.Path) -> None: + """If paplay succeeds, no further players are tried.""" + wav = tmp_path / "tone.wav" + wav.write_bytes(b"\x00") + with ( + patch( + "python_pkg.wake_alarm._audio._ensure_tone_wav", + return_value=wav, + ), + patch( + "python_pkg.wake_alarm._audio._try_player", + return_value=True, + ) as mock_try, + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + ) as mock_run, + ): + _play_tone(440) + mock_try.assert_called_once_with("paplay", wav) + mock_run.assert_not_called() + + def test_falls_back_to_aplay_then_speaker_test( + self, + tmp_path: pathlib.Path, + ) -> None: + """paplay+aplay fail → speaker-test is tried.""" + wav = tmp_path / "tone.wav" + wav.write_bytes(b"\x00") + with ( + patch( + "python_pkg.wake_alarm._audio._ensure_tone_wav", + return_value=wav, + ), + patch( + "python_pkg.wake_alarm._audio._try_player", + return_value=False, + ), + patch( + "python_pkg.wake_alarm._audio._speaker_test_path", + return_value="/usr/bin/speaker-test", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + ) as mock_run, + ): + _play_tone(1000) + mock_run.assert_called_once() + args = mock_run.call_args[0][0] + assert "/usr/bin/speaker-test" in args + assert "1000" in args + + def test_soft_beep_when_speaker_test_missing( + self, + tmp_path: pathlib.Path, + ) -> None: + """All players fail → soft beep.""" + wav = tmp_path / "tone.wav" + wav.write_bytes(b"\x00") + with ( + patch( + "python_pkg.wake_alarm._audio._ensure_tone_wav", + return_value=wav, + ), + patch( + "python_pkg.wake_alarm._audio._try_player", + return_value=False, + ), + patch( + "python_pkg.wake_alarm._audio._speaker_test_path", + side_effect=FileNotFoundError("missing"), + ), + patch("python_pkg.wake_alarm._audio._beep_soft") as mock_soft, + ): + _play_tone(800) + mock_soft.assert_called_once() + + def test_soft_beep_when_speaker_test_times_out( + self, + tmp_path: pathlib.Path, + ) -> None: + """speaker-test TimeoutExpired → soft beep.""" + wav = tmp_path / "tone.wav" + wav.write_bytes(b"\x00") + with ( + patch( + "python_pkg.wake_alarm._audio._ensure_tone_wav", + return_value=wav, + ), + patch( + "python_pkg.wake_alarm._audio._try_player", + return_value=False, + ), + patch( + "python_pkg.wake_alarm._audio._speaker_test_path", + return_value="/usr/bin/speaker-test", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=subprocess.TimeoutExpired("speaker-test", 6), + ), + patch("python_pkg.wake_alarm._audio._beep_soft") as mock_soft, + ): + _play_tone(800) + mock_soft.assert_called_once() + + def test_soft_beep_when_wav_generation_fails(self) -> None: + """OSError generating WAV → soft beep.""" + with ( + patch( + "python_pkg.wake_alarm._audio._ensure_tone_wav", + side_effect=OSError("disk full"), + ), + patch("python_pkg.wake_alarm._audio._beep_soft") as mock_soft, + ): + _play_tone(440) + mock_soft.assert_called_once() diff --git a/wake_alarm/tests/test_alarm_challenges.py b/wake_alarm/tests/test_alarm_challenges.py new file mode 100644 index 0000000..d574268 --- /dev/null +++ b/wake_alarm/tests/test_alarm_challenges.py @@ -0,0 +1,133 @@ +"""Tests for _challenges.py — dismiss challenge generators.""" + +from __future__ import annotations + +from unittest.mock import patch + +from python_pkg.wake_alarm._challenges import ( + _DISMISS_CHARS, + _Challenge, + _make_challenge, + _make_flash_challenge, + _make_math_challenge, + _make_sort_challenge, +) +from python_pkg.wake_alarm._constants import DISMISS_FLASH_SECONDS + + +class TestMakeMathChallenge: + """Tests for _make_math_challenge.""" + + def test_kind_is_math(self) -> None: + """Challenge kind is always 'math'.""" + assert _make_math_challenge().kind == "math" + + def test_answer_is_correct_for_addition(self) -> None: + """Stored answer is numerically correct for addition.""" + with ( + patch("python_pkg.wake_alarm._challenges.secrets.choice", return_value="+"), + patch( + "python_pkg.wake_alarm._challenges.secrets.randbelow", + side_effect=[13, 35], # 10+13=23, 10+35=45 + ), + ): + ch = _make_math_challenge() + assert ch.display == "23 + 45 = ?" + assert ch.answer == "68" + + def test_answer_is_correct_for_subtraction(self) -> None: + """Stored answer is numerically correct for subtraction.""" + with ( + patch("python_pkg.wake_alarm._challenges.secrets.choice", return_value="-"), + patch( + "python_pkg.wake_alarm._challenges.secrets.randbelow", + side_effect=[30, 7], # 20+30=50, 10+7=17 + ), + ): + ch = _make_math_challenge() + assert ch.display == "50 - 17 = ?" + assert ch.answer == "33" + + def test_answer_is_correct_for_multiplication(self) -> None: + """Stored answer is numerically correct for multiplication.""" + with ( + patch("python_pkg.wake_alarm._challenges.secrets.choice", return_value="*"), + patch( + "python_pkg.wake_alarm._challenges.secrets.randbelow", + side_effect=[3, 4], # 12+3=15, 3+4=7 + ), + ): + ch = _make_math_challenge() + assert ch.display == "15 * 7 = ?" + assert ch.answer == "105" + + def test_answer_varies_across_calls(self) -> None: + """Multiple calls produce varied answers (probabilistic).""" + answers = {_make_math_challenge().answer for _ in range(30)} + assert len(answers) > 1 + + +class TestMakeSortChallenge: + """Tests for _make_sort_challenge.""" + + def test_kind_is_sort(self) -> None: + """Challenge kind is always 'sort'.""" + assert _make_sort_challenge().kind == "sort" + + def test_answer_is_sorted_digits(self) -> None: + """Answer equals the digits in display sorted ascending.""" + ch = _make_sort_challenge() + displayed_digits = [int(c) for c in ch.display if c.isdigit()] + expected = "".join(str(d) for d in sorted(displayed_digits)) + assert ch.answer == expected + + def test_display_contains_six_digits(self) -> None: + """Display always contains exactly six digit characters.""" + ch = _make_sort_challenge() + assert len([c for c in ch.display if c.isdigit()]) == 6 + + def test_answer_varies_across_calls(self) -> None: + """Multiple calls produce varied digit sets.""" + answers = {_make_sort_challenge().answer for _ in range(30)} + assert len(answers) > 1 + + +class TestMakeFlashChallenge: + """Tests for _make_flash_challenge.""" + + def test_kind_is_flash(self) -> None: + """Challenge kind is always 'flash'.""" + assert _make_flash_challenge().kind == "flash" + + def test_display_equals_answer(self) -> None: + """Display and answer are identical (the user must recall the full code).""" + ch = _make_flash_challenge() + assert ch.display == ch.answer + + def test_code_uses_dismiss_chars(self) -> None: + """Generated code only contains chars from _DISMISS_CHARS.""" + ch = _make_flash_challenge() + assert all(c in _DISMISS_CHARS for c in ch.answer) + + def test_hint_mentions_flash_seconds(self) -> None: + """Hint text includes the number of visible seconds.""" + ch = _make_flash_challenge() + assert str(DISMISS_FLASH_SECONDS) in ch.hint + + +class TestMakeChallenge: + """Tests for _make_challenge (the random dispatcher).""" + + def test_returns_a_challenge(self) -> None: + """Returns a _Challenge instance with all expected fields populated.""" + ch = _make_challenge() + assert isinstance(ch, _Challenge) + assert ch.kind in ("math", "flash", "sort") + assert ch.display + assert ch.answer + assert ch.hint + + def test_all_types_reachable(self) -> None: + """All three challenge types appear across many calls.""" + kinds = {_make_challenge().kind for _ in range(200)} + assert kinds == {"math", "flash", "sort"} diff --git a/wake_alarm/tests/test_alarm_part2.py b/wake_alarm/tests/test_alarm_part2.py index e525f15..44ac32d 100644 --- a/wake_alarm/tests/test_alarm_part2.py +++ b/wake_alarm/tests/test_alarm_part2.py @@ -15,10 +15,6 @@ from python_pkg.wake_alarm._alarm import ( WakeAlarm, main, ) -from python_pkg.wake_alarm._constants import ( - PHASE_MEDIUM_END, - PHASE_SOFT_END, -) # --------------------------------------------------------------------------- # Helpers (duplicated from part 1 so this file is self-contained) @@ -123,37 +119,77 @@ class TestWakeAlarmInit: class TestWakeAlarmDismiss: """Tests for alarm dismiss logic.""" - def test_correct_code_dismisses( + def test_correct_code_dismisses_after_all_rounds( self, mock_tk_module: MagicMock, ) -> None: - """Entering the correct code dismisses the alarm.""" + """Entering the correct answer for every required round dismisses the alarm.""" + from python_pkg.wake_alarm._constants import DISMISS_ROUNDS_REQUIRED + alarm = WakeAlarm(demo_mode=True) - code = alarm._current_code mock_entry = mock_tk_module.Entry.return_value - mock_entry.get.return_value = code with patch( "python_pkg.wake_alarm._alarm.save_wake_state", ) as mock_save: - alarm._on_submit() + for _ in range(DISMISS_ROUNDS_REQUIRED): + mock_entry.get.return_value = alarm._current_challenge.answer + alarm._on_submit() assert alarm.dismissed is True mock_save.assert_called_once() - call_kwargs = mock_save.call_args[1] - assert call_kwargs["skip_workout"] is True + assert mock_save.call_args[1]["skip_workout"] is True + alarm._stop_beep.set() + + def test_first_round_correct_does_not_dismiss( + self, + mock_tk_module: MagicMock, + ) -> None: + """A single correct entry is not enough — DISMISS_ROUNDS_REQUIRED is 2+.""" + alarm = WakeAlarm(demo_mode=True) + mock_entry = mock_tk_module.Entry.return_value + mock_entry.get.return_value = alarm._current_challenge.answer + + alarm._on_submit() + + assert alarm.dismissed is False + assert alarm._rounds_completed == 1 + alarm._stop_beep.set() + + def test_first_round_correct_non_flash_next_no_countdown( + self, + mock_tk_module: MagicMock, + ) -> None: + """When next challenge is math, no flash countdown is started.""" + from python_pkg.wake_alarm._challenges import _Challenge + + alarm = WakeAlarm(demo_mode=True) + mock_entry = mock_tk_module.Entry.return_value + mock_entry.get.return_value = alarm._current_challenge.answer + next_math = _Challenge(kind="math", display="2 + 2 = ?", answer="4", hint="x") + with patch( + "python_pkg.wake_alarm._alarm._make_challenge", return_value=next_math + ): + alarm._on_submit() + + assert alarm._current_challenge.kind == "math" + assert alarm.dismissed is False alarm._stop_beep.set() def test_wrong_code_does_not_dismiss( self, mock_tk_module: MagicMock, ) -> None: - """Entering the wrong code shows error without dismissing.""" + """Entering the wrong answer shows an error without dismissing.""" + from python_pkg.wake_alarm._alarm import _Challenge + alarm = WakeAlarm(demo_mode=True) + # Use a pinned math challenge so the non-flash wrong-answer branch is covered. + alarm._current_challenge = _Challenge( + kind="math", display="2 + 2 = ?", answer="4", hint="test" + ) mock_entry = mock_tk_module.Entry.return_value - mock_entry.get.return_value = "000000" - # Ensure current code is different - alarm._current_code = "123456" + mock_entry.get.return_value = "99" alarm._on_submit() @@ -201,17 +237,19 @@ class TestWakeAlarmDismiss: self, mock_tk_module: MagicMock, ) -> None: - """Typing the code after the skip window stops the alarm w/o a skip.""" + """Typing all rounds after the skip window stops the alarm without a skip.""" + from python_pkg.wake_alarm._constants import DISMISS_ROUNDS_REQUIRED + alarm = WakeAlarm(demo_mode=True) alarm._skip_earnable = False - code = alarm._current_code mock_entry = mock_tk_module.Entry.return_value - mock_entry.get.return_value = code with patch( "python_pkg.wake_alarm._alarm.save_wake_state", ) as mock_save: - alarm._on_submit() + for _ in range(DISMISS_ROUNDS_REQUIRED): + mock_entry.get.return_value = alarm._current_challenge.answer + alarm._on_submit() assert alarm.dismissed is True assert mock_save.call_args[1]["skip_workout"] is False @@ -278,18 +316,17 @@ class TestMain: class TestCodeRefreshAndTimer: """Tests for code refresh and timer update methods.""" - def test_code_refresh_changes_code( + def test_code_refresh_changes_challenge( self, mock_tk_module: MagicMock, ) -> None: - """Code refresh generates a new code.""" + """Code refresh generates a new challenge each call.""" alarm = WakeAlarm(demo_mode=True) - # Call refresh many times — at least one should differ - codes = set() + displays = set() for _ in range(50): alarm._schedule_code_refresh() - codes.add(alarm._current_code) - assert len(codes) > 1 + displays.add(alarm._current_challenge.display) + assert len(displays) > 1 alarm._stop_beep.set() def test_code_refresh_noop_when_not_active( @@ -299,10 +336,9 @@ class TestCodeRefreshAndTimer: """Code refresh is a no-op when alarm is no longer active.""" alarm = WakeAlarm(demo_mode=True) alarm._active = False - old_code = alarm._current_code + old_challenge = alarm._current_challenge alarm._schedule_code_refresh() - # Code doesn't change because _active=False causes early return - assert alarm._current_code == old_code + assert alarm._current_challenge is old_challenge alarm._stop_beep.set() def test_update_timer_noop_when_not_active( @@ -431,172 +467,3 @@ class TestScreenFlash: mock_root.configure.assert_not_called() mock_root.after.assert_not_called() alarm._stop_beep.set() - - -class TestDismissWithoutSkip: - """Tests for alarm dismiss without earning skip.""" - - def test_dismiss_without_skip_shows_no_skip_message( - self, - mock_tk_module: MagicMock, - ) -> None: - """Dismissing with earned_skip=False shows appropriate message.""" - alarm = WakeAlarm(demo_mode=True) - # Simulate existing child widgets - mock_widget = MagicMock() - alarm._container.winfo_children.return_value = [mock_widget] - - with patch( - "python_pkg.wake_alarm._alarm.save_wake_state", - ) as mock_save: - alarm._dismiss_alarm(earned_skip=False) - - assert alarm.dismissed is True - mock_save.assert_called_once() - call_kwargs = mock_save.call_args[1] - assert call_kwargs["skip_workout"] is False - mock_widget.destroy.assert_called_once() - alarm._stop_beep.set() - - -class TestSkipWindowExpiredMessage: - """Tests for the on-screen message when the skip window expires.""" - - def test_expired_updates_status_label( - self, - mock_tk_module: MagicMock, - ) -> None: - """Expiry updates the status label instead of closing the alarm.""" - del mock_tk_module - alarm = WakeAlarm(demo_mode=True) - - alarm._on_skip_window_expired() - - alarm._status_label.configure.assert_called_with( - text="No workout skip today.", - ) - alarm._stop_beep.set() - - -class TestBeepLoopPhases: - """Tests for different beep loop escalation phases.""" - - def test_medium_phase( - self, - mock_tk_module: MagicMock, - ) -> None: - """Beep loop enters medium phase after PHASE_SOFT_END minutes.""" - alarm = WakeAlarm(demo_mode=True) - # Set alarm start to make elapsed > PHASE_SOFT_END minutes - import time as time_mod - - alarm._alarm_start = time_mod.monotonic() - (PHASE_SOFT_END + 1) * 60 - - call_count = 0 - - def stop_after_one(*_args: object, **_kwargs: object) -> None: - nonlocal call_count - call_count += 1 - if call_count >= 1: - alarm._stop_beep.set() - - with ( - patch( - "python_pkg.wake_alarm._alarm._beep_medium", - side_effect=stop_after_one, - ) as mock_beep, - ): - alarm._beep_loop() - - mock_beep.assert_called() - alarm._stop_beep.set() - - def test_loud_phase( - self, - mock_tk_module: MagicMock, - ) -> None: - """Beep loop enters loud phase after PHASE_MEDIUM_END minutes.""" - alarm = WakeAlarm(demo_mode=True) - import time as time_mod - - alarm._alarm_start = time_mod.monotonic() - (PHASE_MEDIUM_END + 1) * 60 - - call_count = 0 - - def stop_after_one(*_args: object, **_kwargs: object) -> None: - nonlocal call_count - call_count += 1 - if call_count >= 1: - alarm._stop_beep.set() - - with ( - patch( - "python_pkg.wake_alarm._alarm._beep_loud", - side_effect=stop_after_one, - ) as mock_beep, - ): - alarm._beep_loop() - - mock_beep.assert_called() - alarm._stop_beep.set() - - -class TestRunMethod: - """Tests for the run() method.""" - - def test_run_calls_mainloop( - self, - mock_tk_module: MagicMock, - ) -> None: - """run() calls root.mainloop().""" - alarm = WakeAlarm(demo_mode=True) - alarm.run() - alarm.root.mainloop.assert_called_once() - alarm._stop_beep.set() - - -class TestUpdateTimerActive: - """Tests for timer update when alarm is active.""" - - def test_update_timer_shows_skip_window( - self, - mock_tk_module: MagicMock, - ) -> None: - """While the skip is earnable, the timer shows the skip-window count.""" - del mock_tk_module - alarm = WakeAlarm(demo_mode=True) - alarm._update_timer() - text = alarm._timer_label.configure.call_args[1]["text"] - assert text.startswith("Skip window:") - alarm._stop_beep.set() - - def test_update_timer_shows_prompt_after_window( - self, - mock_tk_module: MagicMock, - ) -> None: - """After the window the timer shows the silence prompt and keeps going.""" - import time as time_mod - - alarm = WakeAlarm(demo_mode=True) - # Far in the past so remaining == 0 -> the else branch. - alarm._alarm_start = time_mod.monotonic() - 60 * 60 - alarm.root.after.reset_mock() - alarm._update_timer() - text = alarm._timer_label.configure.call_args[1]["text"] - assert "type the code" in text - # The alarm keeps nagging: it always reschedules while active. - alarm.root.after.assert_called_once() - alarm._stop_beep.set() - - def test_update_timer_noop_when_not_active( - self, - mock_tk_module: MagicMock, - ) -> None: - """Timer update is a no-op once the alarm is no longer active.""" - del mock_tk_module - alarm = WakeAlarm(demo_mode=True) - alarm._active = False - alarm._timer_label.configure.reset_mock() - alarm._update_timer() - alarm._timer_label.configure.assert_not_called() - alarm._stop_beep.set() diff --git a/wake_alarm/tests/test_alarm_part3.py b/wake_alarm/tests/test_alarm_part3.py new file mode 100644 index 0000000..fa54a14 --- /dev/null +++ b/wake_alarm/tests/test_alarm_part3.py @@ -0,0 +1,340 @@ +"""Tests for WakeAlarm — beep loop phases, run, update timer, and flash challenge.""" + +from __future__ import annotations + +import tkinter as tk +from typing import TYPE_CHECKING +from unittest.mock import MagicMock, patch + +import pytest + +if TYPE_CHECKING: + from collections.abc import Generator + +from python_pkg.wake_alarm._alarm import ( + WakeAlarm, +) +from python_pkg.wake_alarm._constants import ( + PHASE_MEDIUM_END, + PHASE_SOFT_END, +) + + +def _make_mock_tk() -> MagicMock: + """Build a MagicMock that stands in for the tkinter module.""" + mock = MagicMock() + mock_root = MagicMock() + mock_root.winfo_screenwidth.return_value = 1920 + mock_root.winfo_screenheight.return_value = 1080 + mock.Tk.return_value = mock_root + mock.Frame.return_value = MagicMock() + mock.Label.return_value = MagicMock() + mock.Entry.return_value = MagicMock() + mock.TclError = tk.TclError + mock.END = tk.END + return mock + + +@pytest.fixture(autouse=True) +def _block_real_tk() -> Generator[MagicMock]: + """Prevent any real Tk windows in tests.""" + mock = _make_mock_tk() + with patch("python_pkg.wake_alarm._alarm.tk", mock): + yield mock + + +@pytest.fixture(autouse=True) +def _block_extra_devices() -> Generator[MagicMock]: + """Prevent real subprocess calls for extra ALSA devices and hardware.""" + with ( + patch("python_pkg.wake_alarm._alarm._play_on_extra_devices") as mock, + patch("python_pkg.wake_alarm._alarm._max_fans", return_value=False), + patch("python_pkg.wake_alarm._alarm._restore_fans"), + patch("python_pkg.wake_alarm._alarm._set_max_brightness"), + patch("python_pkg.wake_alarm._alarm._wake_display"), + patch("python_pkg.wake_alarm._alarm._warn_if_no_real_sink"), + patch("python_pkg.wake_alarm._alarm._activate_alarm_audio", return_value=None), + patch("python_pkg.wake_alarm._alarm._restore_alarm_audio"), + patch("python_pkg.wake_alarm._alarm.turn_on_plug"), + patch("python_pkg.wake_alarm._alarm.turn_off_plug"), + ): + yield mock + + +@pytest.fixture +def mock_tk_module() -> Generator[MagicMock]: + """Provide explicit access to the mocked tk module.""" + mock = _make_mock_tk() + with patch("python_pkg.wake_alarm._alarm.tk", mock): + yield mock + + +class TestBeepLoopPhases: + """Tests for different beep loop escalation phases.""" + + def test_medium_phase( + self, + mock_tk_module: MagicMock, + ) -> None: + """Beep loop enters medium phase after PHASE_SOFT_END minutes.""" + alarm = WakeAlarm(demo_mode=True) + # Set alarm start to make elapsed > PHASE_SOFT_END minutes + import time as time_mod + + alarm._alarm_start = time_mod.monotonic() - (PHASE_SOFT_END + 1) * 60 + + call_count = 0 + + def stop_after_one(*_args: object, **_kwargs: object) -> None: + nonlocal call_count + call_count += 1 + if call_count >= 1: + alarm._stop_beep.set() + + with ( + patch( + "python_pkg.wake_alarm._alarm._beep_medium", + side_effect=stop_after_one, + ) as mock_beep, + ): + alarm._beep_loop() + + mock_beep.assert_called() + alarm._stop_beep.set() + + def test_loud_phase( + self, + mock_tk_module: MagicMock, + ) -> None: + """Beep loop enters loud phase after PHASE_MEDIUM_END minutes.""" + alarm = WakeAlarm(demo_mode=True) + import time as time_mod + + alarm._alarm_start = time_mod.monotonic() - (PHASE_MEDIUM_END + 1) * 60 + + call_count = 0 + + def stop_after_one(*_args: object, **_kwargs: object) -> None: + nonlocal call_count + call_count += 1 + if call_count >= 1: + alarm._stop_beep.set() + + with ( + patch( + "python_pkg.wake_alarm._alarm._beep_loud", + side_effect=stop_after_one, + ) as mock_beep, + ): + alarm._beep_loop() + + mock_beep.assert_called() + alarm._stop_beep.set() + + +class TestRunMethod: + """Tests for the run() method.""" + + def test_run_calls_mainloop( + self, + mock_tk_module: MagicMock, + ) -> None: + """run() calls root.mainloop().""" + alarm = WakeAlarm(demo_mode=True) + alarm.run() + alarm.root.mainloop.assert_called_once() + alarm._stop_beep.set() + + +class TestUpdateTimerActive: + """Tests for timer update when alarm is active.""" + + def test_update_timer_shows_skip_window( + self, + mock_tk_module: MagicMock, + ) -> None: + """While the skip is earnable, the timer shows the skip-window count.""" + del mock_tk_module + alarm = WakeAlarm(demo_mode=True) + alarm._update_timer() + text = alarm._timer_label.configure.call_args[1]["text"] + assert text.startswith("Skip window:") + alarm._stop_beep.set() + + def test_update_timer_shows_prompt_after_window( + self, + mock_tk_module: MagicMock, + ) -> None: + """After the window the timer shows the silence prompt and keeps going.""" + import time as time_mod + + alarm = WakeAlarm(demo_mode=True) + # Far in the past so remaining == 0 -> the else branch. + alarm._alarm_start = time_mod.monotonic() - 60 * 60 + alarm.root.after.reset_mock() + alarm._update_timer() + text = alarm._timer_label.configure.call_args[1]["text"] + assert "type the code" in text + # The alarm keeps nagging: it always reschedules while active. + alarm.root.after.assert_called_once() + alarm._stop_beep.set() + + def test_update_timer_noop_when_not_active( + self, + mock_tk_module: MagicMock, + ) -> None: + """Timer update is a no-op once the alarm is no longer active.""" + del mock_tk_module + alarm = WakeAlarm(demo_mode=True) + alarm._active = False + alarm._timer_label.configure.reset_mock() + alarm._update_timer() + alarm._timer_label.configure.assert_not_called() + alarm._stop_beep.set() + + +class TestFlashChallenge: + """Tests for flash challenge countdown behaviour inside WakeAlarm.""" + + def test_flash_tick_counts_down_and_hides( + self, + mock_tk_module: MagicMock, + ) -> None: + """_flash_tick counts down per second and hides the code at zero.""" + from python_pkg.wake_alarm._alarm import _Challenge + + alarm = WakeAlarm(demo_mode=True) + alarm._current_challenge = _Challenge( + kind="flash", + display="ABCDEFGH", + answer="ABCDEFGH", + hint="Memorise", + ) + alarm._flash_remaining = 2 + alarm._status_label.configure.reset_mock() + + alarm._flash_tick() + assert alarm._flash_remaining == 1 + alarm._status_label.configure.assert_called() + + alarm._flash_tick() + assert alarm._flash_remaining == 0 + + # Final tick hides the code. + alarm._flash_tick() + # _code_label and _status_label share the same mock; inspect all calls. + all_texts = [ + c.kwargs.get("text", "") for c in alarm._code_label.configure.call_args_list + ] + assert any("?" in t for t in all_texts) + alarm._stop_beep.set() + + def test_flash_tick_noop_when_inactive( + self, + mock_tk_module: MagicMock, + ) -> None: + """_flash_tick returns immediately when the alarm is no longer active.""" + alarm = WakeAlarm(demo_mode=True) + alarm._active = False + alarm._flash_remaining = 3 + alarm._status_label.configure.reset_mock() + + alarm._flash_tick() + + alarm._status_label.configure.assert_not_called() + alarm._stop_beep.set() + + def test_wrong_flash_answer_reshows_code( + self, + mock_tk_module: MagicMock, + ) -> None: + """Wrong flash answer restores the code and restarts the countdown.""" + from python_pkg.wake_alarm._alarm import _Challenge + + alarm = WakeAlarm(demo_mode=True) + alarm._current_challenge = _Challenge( + kind="flash", + display="TESTCODE", + answer="TESTCODE", + hint="Memorise", + ) + mock_entry = mock_tk_module.Entry.return_value + mock_entry.get.return_value = "WRONGCODE" + alarm._code_label.configure.reset_mock() + + alarm._on_submit() + + assert alarm.dismissed is False + # Code label should be reconfigured (code shown again + countdown restarted). + alarm._code_label.configure.assert_called() + alarm._stop_beep.set() + + def test_next_round_flash_starts_countdown( + self, + mock_tk_module: MagicMock, + ) -> None: + """When the next-round challenge is flash, the countdown starts immediately.""" + from python_pkg.wake_alarm._alarm import _Challenge + + alarm = WakeAlarm(demo_mode=True) + alarm._current_challenge = _Challenge( + kind="math", display="2 + 2 = ?", answer="4", hint="test" + ) + next_flash = _Challenge( + kind="flash", display="ABCDEFGH", answer="ABCDEFGH", hint="Memorise" + ) + mock_entry = mock_tk_module.Entry.return_value + mock_entry.get.return_value = "4" + + with patch( + "python_pkg.wake_alarm._alarm._make_challenge", return_value=next_flash + ): + alarm._on_submit() + + assert alarm._current_challenge.kind == "flash" + assert alarm.dismissed is False + alarm._stop_beep.set() + + +class TestDismissWithoutSkip: + """Tests for alarm dismiss without earning skip.""" + + def test_dismiss_without_skip_shows_no_skip_message( + self, + mock_tk_module: MagicMock, + ) -> None: + """Dismissing with earned_skip=False shows appropriate message.""" + del mock_tk_module + alarm = WakeAlarm(demo_mode=True) + mock_widget = MagicMock() + alarm._container.winfo_children.return_value = [mock_widget] + + with patch( + "python_pkg.wake_alarm._alarm.save_wake_state", + ) as mock_save: + alarm._dismiss_alarm(earned_skip=False) + + assert alarm.dismissed is True + mock_save.assert_called_once() + assert mock_save.call_args[1]["skip_workout"] is False + mock_widget.destroy.assert_called_once() + alarm._stop_beep.set() + + +class TestSkipWindowExpiredMessage: + """Tests for the on-screen message when the skip window expires.""" + + def test_expired_updates_status_label( + self, + mock_tk_module: MagicMock, + ) -> None: + """Expiry updates the status label instead of closing the alarm.""" + del mock_tk_module + alarm = WakeAlarm(demo_mode=True) + + alarm._on_skip_window_expired() + + alarm._status_label.configure.assert_called_with( + text="No workout skip today.", + ) + alarm._stop_beep.set() diff --git a/wake_alarm/tests/test_alarm_sinks.py b/wake_alarm/tests/test_alarm_sinks.py new file mode 100644 index 0000000..0efb964 --- /dev/null +++ b/wake_alarm/tests/test_alarm_sinks.py @@ -0,0 +1,281 @@ +"""Tests for sink management and parse_args in wake alarm.""" + +from __future__ import annotations + +import subprocess +from unittest.mock import MagicMock, patch + +from python_pkg.wake_alarm._alarm import _parse_args +from python_pkg.wake_alarm._audio import ( + _activate_alarm_audio, + _alarm_sink_present, + _current_default_sink, + _restore_alarm_audio, + _warn_if_no_real_sink, +) + + +class TestWarnIfNoRealSink: + """Tests for _warn_if_no_real_sink.""" + + def test_logs_when_pactl_missing(self) -> None: + """No pactl on PATH → warns and returns.""" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value=None, + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + ) as mock_run, + ): + _warn_if_no_real_sink() + mock_run.assert_not_called() + + def test_warns_when_only_auto_null(self) -> None: + """Only auto_null sink → warning is emitted.""" + result = MagicMock() + result.stdout = b"4319\tauto_null\tPipeWire\tfloat32le 2ch 48000Hz\tIDLE\n" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/pactl", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=result, + ), + patch("python_pkg.wake_alarm._audio._logger") as mock_log, + ): + _warn_if_no_real_sink() + mock_log.warning.assert_called() + + def test_info_when_real_sink_present(self) -> None: + """A non-auto_null sink → info log, no warning.""" + result = MagicMock() + result.stdout = b"1\talsa_output.pci-0000_01_00.1.hdmi-stereo\tPipeWire\t-\t-\n" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/pactl", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=result, + ), + patch("python_pkg.wake_alarm._audio._logger") as mock_log, + ): + _warn_if_no_real_sink() + mock_log.info.assert_called() + mock_log.warning.assert_not_called() + + def test_handles_pactl_failure(self) -> None: + """OSError/TimeoutExpired running pactl → warning, no raise.""" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/pactl", + ), + patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=subprocess.TimeoutExpired("pactl", 5), + ), + ): + _warn_if_no_real_sink() # must not raise + + +class TestAlarmSinkPresent: + """Tests for _alarm_sink_present.""" + + def test_true_when_sink_listed(self) -> None: + """Returns True when the alarm sink name appears in pactl output.""" + from python_pkg.wake_alarm._constants import ALARM_AUDIO_SINK + + proc = MagicMock(stdout=ALARM_AUDIO_SINK.encode() + b"\tPipeWire\n") + with patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=proc, + ): + assert _alarm_sink_present("/usr/bin/pactl") is True + + def test_false_when_sink_absent(self) -> None: + """Returns False when the alarm sink is not in pactl output.""" + proc = MagicMock(stdout=b"auto_null\tPipeWire\n") + with patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=proc, + ): + assert _alarm_sink_present("/usr/bin/pactl") is False + + def test_false_on_subprocess_error(self) -> None: + """OSError while listing sinks → False, no raise.""" + with patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=OSError("boom"), + ): + assert _alarm_sink_present("/usr/bin/pactl") is False + + +class TestCurrentDefaultSink: + """Tests for _current_default_sink.""" + + def test_returns_sink_name(self) -> None: + """Returns the trimmed default sink name.""" + proc = MagicMock(stdout=b"jbl_sink\n") + with patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=proc, + ): + assert _current_default_sink("/usr/bin/pactl") == "jbl_sink" + + def test_returns_none_when_empty(self) -> None: + """Empty output → None.""" + proc = MagicMock(stdout=b"\n") + with patch( + "python_pkg.wake_alarm._audio.subprocess.run", + return_value=proc, + ): + assert _current_default_sink("/usr/bin/pactl") is None + + def test_returns_none_on_error(self) -> None: + """TimeoutExpired → None, no raise.""" + with patch( + "python_pkg.wake_alarm._audio.subprocess.run", + side_effect=subprocess.TimeoutExpired("pactl", 3), + ): + assert _current_default_sink("/usr/bin/pactl") is None + + +class TestActivateAlarmAudio: + """Tests for _activate_alarm_audio.""" + + def test_returns_none_when_pactl_missing(self) -> None: + """No pactl on PATH → returns None without touching audio.""" + with ( + patch("python_pkg.wake_alarm._audio.shutil.which", return_value=None), + patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run, + ): + assert _activate_alarm_audio() is None + mock_run.assert_not_called() + + def test_activates_and_returns_old_default(self) -> None: + """Sink present → routes audio there and returns prior default sink.""" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/pactl", + ), + patch( + "python_pkg.wake_alarm._audio._alarm_sink_present", + return_value=True, + ), + patch( + "python_pkg.wake_alarm._audio._current_default_sink", + return_value="jbl_sink", + ), + patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run, + ): + result = _activate_alarm_audio() + assert result == "jbl_sink" + cmds = [call.args[0] for call in mock_run.call_args_list] + from python_pkg.wake_alarm._constants import ( + ALARM_AUDIO_CARD, + ALARM_AUDIO_PROFILE, + ALARM_AUDIO_SINK, + ) + + assert [ + "/usr/bin/pactl", + "set-card-profile", + ALARM_AUDIO_CARD, + ALARM_AUDIO_PROFILE, + ] in cmds + assert ["/usr/bin/pactl", "set-default-sink", ALARM_AUDIO_SINK] in cmds + + def test_returns_none_when_sink_never_appears(self) -> None: + """Sink never shows up → returns None after polling (no raise).""" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/pactl", + ), + patch( + "python_pkg.wake_alarm._audio._alarm_sink_present", + return_value=False, + ), + patch("python_pkg.wake_alarm._audio.time.sleep") as mock_sleep, + patch("python_pkg.wake_alarm._audio.subprocess.run"), + ): + assert _activate_alarm_audio() is None + mock_sleep.assert_called() + + def test_waits_then_succeeds(self) -> None: + """Sink absent then present → sleeps once, then routes audio.""" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/pactl", + ), + patch( + "python_pkg.wake_alarm._audio._alarm_sink_present", + side_effect=[False, True], + ), + patch( + "python_pkg.wake_alarm._audio._current_default_sink", + return_value="old", + ), + patch("python_pkg.wake_alarm._audio.time.sleep") as mock_sleep, + patch("python_pkg.wake_alarm._audio.subprocess.run"), + ): + assert _activate_alarm_audio() == "old" + mock_sleep.assert_called_once() + + +class TestRestoreAlarmAudio: + """Tests for _restore_alarm_audio.""" + + def test_none_is_noop(self) -> None: + """None default → does nothing, no pactl lookup.""" + with patch("python_pkg.wake_alarm._audio.shutil.which") as mock_which: + _restore_alarm_audio(None) + mock_which.assert_not_called() + + def test_no_pactl_returns_silently(self) -> None: + """Default present but pactl missing → no raise, no run.""" + with ( + patch("python_pkg.wake_alarm._audio.shutil.which", return_value=None), + patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run, + ): + _restore_alarm_audio("jbl_sink") + mock_run.assert_not_called() + + def test_restores_default_sink(self) -> None: + """Calls set-default-sink with the captured prior default.""" + with ( + patch( + "python_pkg.wake_alarm._audio.shutil.which", + return_value="/usr/bin/pactl", + ), + patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run, + ): + _restore_alarm_audio("jbl_sink") + cmds = [call.args[0] for call in mock_run.call_args_list] + assert ["/usr/bin/pactl", "set-default-sink", "jbl_sink"] in cmds + + +class TestParseArgs: + """Tests for _parse_args.""" + + def test_default_flags_are_false(self) -> None: + """No CLI args means every flag is False.""" + ns = _parse_args([]) + assert ns.demo is False + assert ns.trigger_now is False + assert ns.production is False + + def test_flags_parse(self) -> None: + """Each flag flips to True when passed.""" + ns = _parse_args(["--production", "--demo", "--trigger-now"]) + assert ns.production is True + assert ns.demo is True + assert ns.trigger_now is True