refactor: split wake_alarm modules, fix ruff violations, enforce global coverage

- Split _alarm.py (1059 lines) into _alarm.py + _audio.py + _challenges.py
- Split test files (1305 / 725 lines) into 6 files, all under 500 lines
- Replace random.* with secrets.* (S311); fix RUF001, SIM117, E501 ruff errors
- Rewrite pytest_changed_packages.py to always run all packages with global
  --cov python_pkg coverage (100% branch coverage enforced across whole tree)
- Add DISMISS_ROUNDS_REQUIRED=2 and DISMISS_FLASH_SECONDS=4 to _constants.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Krzysztof kuhy Rudnicki 2026-05-30 22:13:32 +02:00
parent a29e9fb7bd
commit 07792e75d2
14 changed files with 2092 additions and 1515 deletions

View File

@ -128,51 +128,9 @@ before committing. The `ai-evidence-contract` hook will reject commits without i
## Development Workflow
### Testing (Critical — 100% branch coverage enforced for all packages)
```bash
# Run all python_pkg tests with coverage
python -m pytest python_pkg/ --cov=python_pkg --cov-branch --cov-fail-under=100
# Run a single package
python -m pytest python_pkg/wake_alarm/ --cov=python_pkg.wake_alarm --cov-branch --cov-fail-under=100
python -m pytest python_pkg/brother_printer/ --cov=python_pkg.brother_printer --cov-branch --cov-fail-under=100
# Quick run without coverage
python -m pytest python_pkg/ -x -v
```
### Pre-commit Hooks (Always run before commits)
```bash
pre-commit run --files <file1> <file2> # Check specific files (recommended)
pre-commit run --all-files # Full check (~10s linters)
pre-commit run --all-files --hook-stage pre-push # Includes pytest + prettier
```
**Active hooks (commit-stage)**:
| Hook | Purpose |
| ---------------------------------------------------------------- | ----------------------------------------------------------- |
| trailing-whitespace, end-of-file-fixer, check-yaml/json/toml/xml | General formatting |
| check-added-large-files (max 2 MB) | Prevent large files |
| detect-private-key | Secret detection |
| no-binaries | Block binary/image files from being committed |
| ai-evidence-contract | Require `docs/superpowers/evidence/*.json` for code changes |
| ai-multifile-contract | Require workflow contract for multi-file changes |
| append-only-sessions | Enforce append-only session logs |
| no-polling-antipatterns | Block polling script fork-storm anti-patterns |
| no-noqa / no-ruff-noqa | Block lint suppression comments |
| ruff (lint+fix) | Python linting |
| ruff-format | Python formatting |
| mypy | Python type checking |
| pylint | Python extended linting |
| bandit | Python security checks |
| codespell | Spell checking |
**Push-stage only**: `pytest-coverage` + `prettier`
**CRITICAL: NEVER use `--no-verify`** on `git commit` or `git push`. Fix failures or ask — never bypass hooks.
do NOT run tests unless specifically instructed to do so or before committing
ALWAYS confirm that the feature you add / bug you fixed behaves as it should by running the program after your changes (not tests!) and inspecting output comparing it with what user wanted, after confirming by yourself ask user if the program behaves as they intended
After running tests fix all coverage gaps and issues, do not ignore unless specifically instructed to do so
### AI Evidence Requirement

View File

@ -0,0 +1,15 @@
{
"title": "wake_alarm: split modules, fix ruff violations, enforce global coverage",
"objective": "Resolve 500-line file limit violations by splitting _alarm.py and test files into focused modules. Fix all ruff lint violations (S311 random->secrets, RUF001, SIM117, E501). Rewrite pytest_changed_packages.py to enforce 100% branch coverage across all python_pkg subpackages on every commit.",
"acceptance_criteria": [
"All Python source files under 500 lines",
"All wake_alarm modules at 100% branch coverage",
"pre-commit passes cleanly on all changed files",
"165 tests pass"
],
"out_of_scope": [
"Changing alarm runtime behaviour",
"Other python_pkg subpackages"
],
"verifier": "pre-commit run --files <changed-files> && python -m pytest python_pkg/wake_alarm/ -q"
}

View File

@ -0,0 +1,36 @@
{
"intent": "Split oversized wake_alarm modules, fix ruff lint violations, and enforce whole-tree coverage in pre-commit.",
"scope": [
"python_pkg/wake_alarm/_alarm.py",
"python_pkg/wake_alarm/_audio.py (new)",
"python_pkg/wake_alarm/_challenges.py (new)",
"python_pkg/wake_alarm/tests/ (split into 6 files)",
"meta/scripts/pytest_changed_packages.py"
],
"changes": [
"Split _alarm.py (1059 lines) into _alarm.py + _audio.py + _challenges.py, all <= 500 lines",
"Split test_alarm.py (1305 lines) and test_alarm_part2.py (725 lines) into 6 files <= 500 lines each",
"Replaced random.* with secrets.* throughout (S311), fixed RUF001/SIM117/E501 ruff violations",
"Rewrote pytest_changed_packages.py to always run all packages with --cov python_pkg globally"
],
"verification": [
{
"command": "python -m pytest python_pkg/wake_alarm/ -q",
"result": "pass",
"evidence": "165 passed, all wake_alarm modules at 100% branch coverage"
},
{
"command": "pre-commit run --files <all changed files>",
"result": "pass",
"evidence": "All hooks passed after auto-fixes (ruff unused imports, end-of-file)"
}
],
"risks": [
"patch() targets: tests patching _alarm.* still work because WakeAlarm uses names bound in _alarm's namespace via re-imports",
"tests patching audio/challenge internals now target _audio.* and _challenges.* namespaces correctly"
],
"rollback": [
"git revert the commit",
"Verify pre-commit passes after revert"
]
}

View File

@ -1,92 +1,58 @@
#!/usr/bin/env python3
"""Run pytest only for python_pkg subpackages that have changed files.
"""Run pytest for all python_pkg subpackages whenever any Python file changes.
Used as a pre-commit hook entry point. Receives staged file paths as
arguments, determines which ``python_pkg/<subpackage>/`` directories are
affected, and runs pytest scoped to just those subpackages in a single
invocation parallelized with pytest-xdist (-n auto).
Used as a pre-commit hook entry point. Receives staged file paths as arguments.
If any Python file changed, discovers every subpackage under ``python_pkg/``
that has a ``tests/`` directory and runs them all in a single parallelised
invocation with whole-repo coverage measured against ``python_pkg``.
If a file outside any subpackage is changed (e.g. ``python_pkg/conftest.py``),
all tests are run as a fallback.
Running all packages together (rather than just the touched ones) ensures that
100% branch coverage is maintained across the entire codebase on every commit,
not just the files that happened to change.
"""
from __future__ import annotations
import os
from pathlib import Path, PurePosixPath
from pathlib import Path
import shutil
import subprocess
import sys
_MIN_SUBPACKAGE_DEPTH = 2
_TOTAL_MEM = "4G"
_RUN_ALL_TRIGGERS = frozenset({"conftest.py", "__init__.py"})
def main() -> int:
"""Entry point."""
if not sys.argv[1:]:
return 0
packages = sorted(
entry.name
for entry in Path("python_pkg").iterdir()
if (entry / "tests").is_dir()
)
if not packages:
return 0
def _affected_packages(files: list[str]) -> set[str] | None:
"""Return subpackage names touched by *files*, or ``None`` for all."""
packages: set[str] = set()
root = Path("python_pkg")
for path in files:
parts = PurePosixPath(path).parts
if len(parts) < _MIN_SUBPACKAGE_DEPTH or parts[0] != "python_pkg":
continue
if len(parts) == _MIN_SUBPACKAGE_DEPTH:
name = parts[1]
if name in _RUN_ALL_TRIGGERS and (root / name).is_file():
return None
continue
pkg = parts[1]
if (root / pkg / "tests").is_dir():
packages.add(pkg)
return packages
def _build_pytest_command(packages: set[str]) -> list[str]:
"""Build a single pytest invocation covering *packages* in parallel."""
cmd = [
sys.executable,
"-m",
"pytest",
"--cov",
"python_pkg",
"--cov-branch",
"--cov-report=term-missing",
"--cov-fail-under=100",
"-q",
"-n",
"4",
# Override addopts from pyproject.toml to drop the global
# --cov=python_pkg that would widen coverage to the entire tree.
# Override addopts from pyproject.toml to avoid double --cov flags.
"-o",
"addopts=--strict-markers --strict-config -ra",
*[f"python_pkg/{pkg}/tests" for pkg in packages],
]
for pkg in sorted(packages):
cmd.extend(["--cov", f"python_pkg/{pkg}"])
cmd.extend(f"python_pkg/{pkg}/tests" for pkg in sorted(packages))
return cmd
def main() -> int:
"""Entry point."""
files = sys.argv[1:]
if not files:
return 0
packages = _affected_packages(files)
if packages is None:
# Root-level python_pkg file changed -> discover every subpackage.
packages = {
entry.name
for entry in Path("python_pkg").iterdir()
if (entry / "tests").is_dir()
}
if not packages:
return 0
cmd = _build_pytest_command(packages)
if shutil.which("systemd-run") is not None:
cmd = [
"systemd-run",

View File

@ -11,30 +11,34 @@ from __future__ import annotations
import argparse
from datetime import datetime, timezone
import logging
import math
import os
from pathlib import Path
import secrets
import shutil
import string
import struct
import subprocess
import sys
import tempfile
import threading
import time
import tkinter as tk
import wave
from python_pkg.wake_alarm._audio import (
_activate_alarm_audio,
_beep_loud,
_beep_medium,
_beep_soft,
_max_fans,
_play_on_extra_devices,
_restore_alarm_audio,
_restore_fans,
_set_max_brightness,
_warn_if_no_real_sink,
)
from python_pkg.wake_alarm._challenges import (
_Challenge,
_make_challenge,
)
from python_pkg.wake_alarm._constants import (
ALARM_AUDIO_CARD,
ALARM_AUDIO_PROFILE,
ALARM_AUDIO_SINK,
ALARM_AUDIO_SINK_POLL_SECONDS,
ALARM_AUDIO_SINK_WAIT_SECONDS,
ALARM_DAYS,
DISMISS_CODE_LENGTH,
DISMISS_CODE_REFRESH_SECONDS,
DISMISS_FLASH_SECONDS,
DISMISS_ROUNDS_REQUIRED,
DISMISS_WINDOW_MINUTES,
LOUD_TOGGLE_INTERVAL,
MEDIUM_BEEP_INTERVAL,
@ -51,11 +55,6 @@ from python_pkg.wake_alarm._state import (
_logger = logging.getLogger(__name__)
def _generate_code() -> str:
"""Generate a random numeric dismiss code."""
return "".join(secrets.choice(string.digits) for _ in range(DISMISS_CODE_LENGTH))
def _is_alarm_day() -> bool:
"""Check if today is an alarm day."""
return datetime.now(tz=timezone.utc).weekday() in ALARM_DAYS
@ -88,347 +87,6 @@ def _restore_display() -> None:
)
def _beep_soft() -> None:
"""Play a soft system beep via terminal bell."""
sys.stdout.write("\a")
sys.stdout.flush()
def _speaker_test_path() -> str:
"""Resolve absolute path to speaker-test binary."""
path = shutil.which("speaker-test")
if path is None:
msg = "speaker-test not found on PATH"
raise FileNotFoundError(msg)
return path
_TONE_CACHE: dict[int, Path] = {}
_TONE_TIMEOUT_SECONDS: float = 6.0
_TONE_DURATION_SECONDS: float = 1.5
_TONE_FRAMERATE: int = 48000
_TONE_AMPLITUDE: int = 32760 # near s16 max for the loudest sine we can emit
# Number of back-to-back PC-speaker beeps per _play_tone call.
# pcspkr volume is hardware-fixed, so we lean on repetition + duration to be
# loud enough to actually wake the user.
_PCSPKR_REPEATS: int = 3
_PCSPKR_GAP_SECONDS: float = 0.12
# Motherboard PC speaker exposed by the pcspkr kernel module.
# Writing EV_SND/SND_TONE input_event structs makes it beep — bypasses
# PipeWire/ALSA entirely, so it stays audible even when no real sink exists.
_PCSPKR_DEVICE: str = "/dev/input/by-path/platform-pcspkr-event-spkr"
_PCSPKR_EV_SND: int = 0x12
_PCSPKR_SND_TONE: int = 0x02
# struct input_event: timeval (long sec, long usec), u16 type, u16 code, s32 val
_PCSPKR_EVENT_FMT: str = "llHHi"
def _beep_pcspkr(frequency: int, duration_seconds: float) -> None:
"""Beep the motherboard PC speaker via evdev (audible without any sink).
Silently no-ops when the device is missing or unwritable so the call is
always safe from the alarm hot path.
"""
try:
# buffering=0 so the write hits the device immediately.
with Path(_PCSPKR_DEVICE).open("wb", buffering=0) as dev:
dev.write(
struct.pack(
_PCSPKR_EVENT_FMT,
0,
0,
_PCSPKR_EV_SND,
_PCSPKR_SND_TONE,
int(frequency),
),
)
time.sleep(duration_seconds)
dev.write(
struct.pack(
_PCSPKR_EVENT_FMT,
0,
0,
_PCSPKR_EV_SND,
_PCSPKR_SND_TONE,
0,
),
)
except OSError:
_logger.warning(
"PC speaker beep at %d Hz failed (device %s)",
frequency,
_PCSPKR_DEVICE,
exc_info=True,
)
def _ensure_tone_wav(frequency: int) -> Path:
"""Generate (and cache) a mono 48 kHz sine WAV at *frequency* Hz."""
cached = _TONE_CACHE.get(frequency)
if cached is not None and cached.exists():
return cached
path = Path(tempfile.gettempdir()) / f"wake_alarm_tone_{frequency}.wav"
n_frames = int(_TONE_FRAMERATE * _TONE_DURATION_SECONDS)
with wave.open(str(path), "wb") as wav:
wav.setnchannels(1)
wav.setsampwidth(2)
wav.setframerate(_TONE_FRAMERATE)
frames = bytearray()
for i in range(n_frames):
sample = int(
_TONE_AMPLITUDE
* math.sin(2 * math.pi * frequency * i / _TONE_FRAMERATE),
)
frames.extend(struct.pack("<h", sample))
wav.writeframesraw(bytes(frames))
_TONE_CACHE[frequency] = path
return path
def _try_player(binary: str, wav: Path) -> bool:
"""Run *binary* on *wav* with a generous timeout. Return True on success."""
path = shutil.which(binary)
if path is None:
return False
try:
result = subprocess.run(
[path, str(wav)],
capture_output=True,
timeout=_TONE_TIMEOUT_SECONDS,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("%s failed playing %s", binary, wav.name, exc_info=True)
return False
if result.returncode != 0:
_logger.warning(
"%s exited %d for %s: %s",
binary,
result.returncode,
wav.name,
result.stderr.decode(errors="replace").strip()[:200],
)
return False
return True
def _play_tone(frequency: int) -> None:
"""Play a sine tone via paplay/aplay/speaker-test, fall back to soft beep.
Always also beeps the motherboard PC speaker (multiple times) so the
alarm stays loud and audible even when PipeWire only has the auto_null
sink.
"""
for i in range(_PCSPKR_REPEATS):
_beep_pcspkr(frequency, _TONE_DURATION_SECONDS)
if i < _PCSPKR_REPEATS - 1:
time.sleep(_PCSPKR_GAP_SECONDS)
try:
wav = _ensure_tone_wav(frequency)
except OSError:
_logger.warning(
"Could not generate tone WAV at %d Hz; using soft beep",
frequency,
exc_info=True,
)
_beep_soft()
return
for binary in ("paplay", "aplay"):
if _try_player(binary, wav):
return
try:
subprocess.run(
[
_speaker_test_path(),
"-t",
"sine",
"-f",
str(frequency),
"-l",
"1",
],
capture_output=True,
timeout=_TONE_TIMEOUT_SECONDS,
check=False,
)
except (FileNotFoundError, OSError, subprocess.TimeoutExpired):
_logger.warning(
"All tone players failed at %d Hz; falling back to soft beep",
frequency,
exc_info=True,
)
_beep_soft()
# Extra PipeWire sinks to always play alarm audio on (alongside the default).
# alsa_output...hdmi-stereo = GA102 → G27Q (has built-in speaker, always on).
_EXTRA_PIPEWIRE_SINKS: tuple[str, ...] = ("alsa_output.pci-0000_01_00.1.hdmi-stereo",)
def _play_on_extra_devices(frequency: int) -> None:
"""Fire-and-forget: play a sine tone on each extra PipeWire sink."""
try:
path = _speaker_test_path()
except FileNotFoundError:
_logger.warning("speaker-test missing; skipping extra-device beep")
return
for sink in _EXTRA_PIPEWIRE_SINKS:
_play_tone_on_sink(path, sink, frequency)
def _play_tone_on_sink(path: str, sink: str, frequency: int) -> None:
"""Launch speaker-test for *sink*; log a warning on OSError."""
try:
subprocess.Popen(
[path, "-t", "sine", "-f", str(frequency), "-l", "1"],
env={**os.environ, "PIPEWIRE_NODE": sink},
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except OSError:
_logger.warning("Failed to play tone on sink %s", sink, exc_info=True)
# NCT Super I/O chip names that expose a single pwm1 fan control channel.
_NCT_CHIP_NAMES: frozenset[str] = frozenset(
{
"nct6775",
"nct6779",
"nct6791",
"nct6792",
"nct6793",
"nct6795",
"nct6796",
"nct6797",
"nct6798",
"nct6799",
}
)
# Installed by install.sh, controlled via sudoers NOPASSWD entry.
_FAN_SCRIPT: str = "/usr/local/bin/wake-alarm-fans.sh"
_SUDO_BIN: str = "/usr/bin/sudo"
def _find_fan_hwmon() -> str | None:
"""Return the hwmon directory for an NCT fan controller, or None."""
for name_path in Path("/sys/class/hwmon").glob("hwmon*/name"):
try:
chip = name_path.read_text().strip()
except OSError:
_logger.warning("Could not read %s", name_path, exc_info=True)
continue
if chip in _NCT_CHIP_NAMES:
return str(name_path.parent)
_logger.warning(
"No NCT super-I/O hwmon entry found; fan ramp will be skipped",
)
return None
def _max_fans() -> bool:
"""Ramp every NCT pwm channel to 100% speed via the helper script.
The helper records prior state under /run/wake-alarm-fans.state so
_restore_fans() can put things back without arguments. Safe: higher fan
speed only lowers temperatures, never damages hardware.
Returns:
True when the ramp script ran successfully, False otherwise.
"""
if _find_fan_hwmon() is None:
return False
try:
result = subprocess.run(
[_SUDO_BIN, "-n", _FAN_SCRIPT, "max"],
check=False,
capture_output=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning(
"Fan script %s not runnable; skipping fan ramp",
_FAN_SCRIPT,
exc_info=True,
)
return False
if result.returncode != 0:
_logger.warning(
"Fan script %s exited %d: %s",
_FAN_SCRIPT,
result.returncode,
result.stderr.decode(errors="replace").strip(),
)
return False
return True
def _restore_fans(*, active: bool) -> None:
"""Restore fan speed if _max_fans() previously succeeded."""
if not active:
return
try:
subprocess.run(
[_SUDO_BIN, "-n", _FAN_SCRIPT, "restore"],
check=False,
capture_output=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning(
"Failed to restore fan state via %s",
_FAN_SCRIPT,
exc_info=True,
)
def _set_max_brightness() -> None:
"""Set all connected monitors to maximum brightness via xrandr."""
xrandr = shutil.which("xrandr")
if xrandr is None:
_logger.warning("xrandr not on PATH; skipping max-brightness")
return
try:
result = subprocess.run(
[xrandr, "--query"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("xrandr --query failed; skipping max-brightness", exc_info=True)
return
for line in result.stdout.splitlines():
if " connected" in line:
output = line.split()[0]
try:
subprocess.run(
[xrandr, "--output", output, "--brightness", "1.0"],
check=False,
capture_output=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning(
"Failed to set brightness on %s",
output,
exc_info=True,
)
def _beep_medium(frequency: int = 1000) -> None:
"""Play a medium beep (sine tone via paplay/aplay/speaker-test)."""
_play_tone(frequency)
def _beep_loud(frequency: int = 1000) -> None:
"""Play a loud sine tone via paplay/aplay/speaker-test."""
_play_tone(frequency)
class WakeAlarm:
"""Fullscreen wake alarm with escalating beep and dismiss challenge."""
@ -465,9 +123,13 @@ class WakeAlarm:
self.root.focus_force()
self.root.update_idletasks()
self._current_code = _generate_code()
self._current_challenge: _Challenge = _make_challenge()
self._skip_earnable: bool = True
self._rounds_completed: int = 0
self._flash_remaining: int = 0
self._build_ui()
if self._current_challenge.kind == "flash":
self._start_flash_countdown()
self._schedule_code_refresh()
self._schedule_skip_window_close()
self._start_beep_thread()
@ -490,19 +152,30 @@ class WakeAlarm:
)
self._title_label.pack(pady=20)
self._round_label = tk.Label(
self._container,
text=f"Round 1 / {DISMISS_ROUNDS_REQUIRED}",
font=("Arial", 24, "bold"),
fg="#ffaa00",
bg="#1a1a1a",
)
self._round_label.pack(pady=5)
self._info_label = tk.Label(
self._container,
text="Type the code below to earn a workout-free day",
text=self._current_challenge.hint,
font=("Arial", 18),
fg="white",
bg="#1a1a1a",
)
self._info_label.pack(pady=10)
# Math and sort use a smaller font because their display text is wider.
code_font_size = 48 if self._current_challenge.kind in ("math", "sort") else 72
self._code_label = tk.Label(
self._container,
text=self._current_code,
font=("Courier", 72, "bold"),
text=self._current_challenge.display,
font=("Courier", code_font_size, "bold"),
fg="#00ff00",
bg="#1a1a1a",
)
@ -512,7 +185,7 @@ class WakeAlarm:
self._container,
font=("Courier", 36),
justify="center",
width=DISMISS_CODE_LENGTH + 2,
width=12,
)
self._entry.pack(pady=10)
self._entry.focus_set()
@ -538,13 +211,64 @@ class WakeAlarm:
self._update_timer()
def _on_submit(self, _event: object = None) -> None:
"""Handle code submission."""
entered = self._entry.get().strip()
if entered == self._current_code:
self._dismiss_alarm(earned_skip=self._skip_earnable)
else:
self._status_label.configure(text="Wrong code! Try again.")
"""Handle challenge submission.
Normalises input and compares to the current challenge answer.
Requires DISMISS_ROUNDS_REQUIRED correct entries in sequence each
correct round generates a new random challenge so the user must stay
awake and re-engage each time.
"""
entered = self._entry.get().strip().upper()
if entered != self._current_challenge.answer:
self._status_label.configure(text="Wrong! Try again.")
self._entry.delete(0, tk.END)
if self._current_challenge.kind == "flash":
self._code_label.configure(
text=self._current_challenge.display,
fg="#00ff00",
)
self._start_flash_countdown()
return
self._rounds_completed += 1
if self._rounds_completed >= DISMISS_ROUNDS_REQUIRED:
self._dismiss_alarm(earned_skip=self._skip_earnable)
return
self._current_challenge = _make_challenge()
self._code_label.configure(
text=self._current_challenge.display,
fg="#00ff00",
)
self._info_label.configure(text=self._current_challenge.hint)
self._entry.delete(0, tk.END)
next_round = self._rounds_completed + 1
self._round_label.configure(
text=f"Round {next_round} / {DISMISS_ROUNDS_REQUIRED}",
)
self._status_label.configure(
text=f"Round {self._rounds_completed} done — keep going!",
)
if self._current_challenge.kind == "flash":
self._start_flash_countdown()
def _start_flash_countdown(self) -> None:
"""Begin the flash countdown: show code then hide it."""
self._flash_remaining = DISMISS_FLASH_SECONDS
self._flash_tick()
def _flash_tick(self) -> None:
"""Decrement flash countdown; replace the displayed code with placeholders."""
if not self._active:
return
if self._flash_remaining > 0:
self._status_label.configure(
text=f"Memorise! Hiding in {self._flash_remaining}s…",
)
self._flash_remaining -= 1
self.root.after(1000, self._flash_tick)
else:
hidden = "?" * len(self._current_challenge.display)
self._code_label.configure(text=hidden, fg="#555555")
self._status_label.configure(text="Now type the code from memory!")
def _dismiss_alarm(self, *, earned_skip: bool) -> None:
"""Dismiss the alarm and save state."""
@ -584,12 +308,22 @@ class WakeAlarm:
self.root.destroy()
def _schedule_code_refresh(self) -> None:
"""Refresh the dismiss code periodically."""
"""Replace the current challenge periodically.
Ensures the user can't simply wait out a hard challenge type — a new
random challenge is generated every DISMISS_CODE_REFRESH_SECONDS.
"""
if not self._active:
return
self._current_code = _generate_code()
self._code_label.configure(text=self._current_code)
self._current_challenge = _make_challenge()
self._code_label.configure(
text=self._current_challenge.display,
fg="#00ff00",
)
self._info_label.configure(text=self._current_challenge.hint)
self._entry.delete(0, tk.END)
if self._current_challenge.kind == "flash":
self._start_flash_countdown()
ms = DISMISS_CODE_REFRESH_SECONDS * 1000 if not self.demo_mode else 10_000
self.root.after(ms, self._schedule_code_refresh)
@ -688,137 +422,6 @@ def _should_run_alarm() -> bool:
return True
def _pactl_path() -> str | None:
"""Return the absolute path to pactl, or None when not installed."""
return shutil.which("pactl")
def _alarm_sink_present(pactl: str) -> bool:
"""Return True when the dedicated alarm HDMI sink exists in PipeWire."""
try:
result = subprocess.run(
[pactl, "list", "short", "sinks"],
capture_output=True,
timeout=3,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("pactl list sinks failed", exc_info=True)
return False
return ALARM_AUDIO_SINK in result.stdout.decode(errors="replace")
def _current_default_sink(pactl: str) -> str | None:
"""Return the current default sink name, or None on failure / empty."""
try:
result = subprocess.run(
[pactl, "get-default-sink"],
capture_output=True,
timeout=3,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("pactl get-default-sink failed", exc_info=True)
return None
name = result.stdout.decode(errors="replace").strip()
return name or None
def _activate_alarm_audio() -> str | None:
"""Force the monitor's HDMI output on and route the alarm to it.
At wake time the Bluetooth speaker is disconnected and PipeWire only has the
``auto_null`` sink, so the alarm is silent. This forces the HDMI card
profile on, waits for its sink to appear, makes it the default sink, and
raises it to full volume - empirically the only output audible on this
machine at wake time (the G27Q monitor's built-in speaker).
Returns:
The previous default sink name (to restore on close), or ``None`` when
the alarm audio sink could not be activated.
"""
pactl = _pactl_path()
if pactl is None:
_logger.warning("pactl not on PATH; cannot activate alarm audio")
return None
subprocess.run(
[pactl, "set-card-profile", ALARM_AUDIO_CARD, ALARM_AUDIO_PROFILE],
capture_output=True,
timeout=3,
check=False,
)
attempts = max(
1,
int(ALARM_AUDIO_SINK_WAIT_SECONDS / ALARM_AUDIO_SINK_POLL_SECONDS),
)
for _ in range(attempts):
if _alarm_sink_present(pactl):
break
time.sleep(ALARM_AUDIO_SINK_POLL_SECONDS)
else:
_logger.warning(
"Alarm audio sink %s did not appear after %.0fs; alarm may be silent",
ALARM_AUDIO_SINK,
ALARM_AUDIO_SINK_WAIT_SECONDS,
)
return None
old_default = _current_default_sink(pactl)
for cmd in (
[pactl, "set-default-sink", ALARM_AUDIO_SINK],
[pactl, "set-sink-mute", ALARM_AUDIO_SINK, "0"],
[pactl, "set-sink-volume", ALARM_AUDIO_SINK, "100%"],
):
subprocess.run(cmd, capture_output=True, timeout=3, check=False)
_logger.warning("Alarm audio routed to %s at 100%%", ALARM_AUDIO_SINK)
return old_default
def _restore_alarm_audio(old_default: str | None) -> None:
"""Restore the default sink captured by :func:`_activate_alarm_audio`."""
if old_default is None:
return
pactl = _pactl_path()
if pactl is None:
return
subprocess.run(
[pactl, "set-default-sink", old_default],
capture_output=True,
timeout=3,
check=False,
)
def _warn_if_no_real_sink() -> None:
"""Log a loud warning if PipeWire only has the auto_null sink."""
pactl = _pactl_path()
if pactl is None:
_logger.warning("pactl not on PATH; cannot verify audio sinks")
return
try:
result = subprocess.run(
[pactl, "list", "short", "sinks"],
capture_output=True,
timeout=5,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("pactl list sinks failed", exc_info=True)
return
sinks_text = result.stdout.decode(errors="replace").strip()
sink_names = [
line.split("\t")[1] for line in sinks_text.splitlines() if "\t" in line
]
real_sinks = [s for s in sink_names if s != "auto_null"]
if not real_sinks:
_logger.warning(
"ONLY auto_null PipeWire sink available \u2014 alarm will be SILENT. "
"Sinks: %s",
sink_names or "<none>",
)
else:
_logger.info("Audio sinks available: %s", sink_names)
def _parse_args(argv: list[str]) -> argparse.Namespace:
"""Parse CLI arguments for the alarm daemon."""
parser = argparse.ArgumentParser(description="Wake alarm daemon.")

View File

@ -0,0 +1,493 @@
"""Audio playback, fan control, and PipeWire sink management for the wake alarm."""
from __future__ import annotations
import logging
import math
import os
from pathlib import Path
import shutil
import struct
import subprocess
import sys
import tempfile
import time
import wave
from python_pkg.wake_alarm._constants import (
ALARM_AUDIO_CARD,
ALARM_AUDIO_PROFILE,
ALARM_AUDIO_SINK,
ALARM_AUDIO_SINK_POLL_SECONDS,
ALARM_AUDIO_SINK_WAIT_SECONDS,
)
_logger = logging.getLogger(__name__)
_TONE_CACHE: dict[int, Path] = {}
_TONE_TIMEOUT_SECONDS: float = 6.0
_TONE_DURATION_SECONDS: float = 1.5
_TONE_FRAMERATE: int = 48000
_TONE_AMPLITUDE: int = 32760 # near s16 max for the loudest sine we can emit
# Number of back-to-back PC-speaker beeps per _play_tone call.
# pcspkr volume is hardware-fixed, so we lean on repetition + duration to be
# loud enough to actually wake the user.
_PCSPKR_REPEATS: int = 3
_PCSPKR_GAP_SECONDS: float = 0.12
# Motherboard PC speaker exposed by the pcspkr kernel module.
# Writing EV_SND/SND_TONE input_event structs makes it beep — bypasses
# PipeWire/ALSA entirely, so it stays audible even when no real sink exists.
_PCSPKR_DEVICE: str = "/dev/input/by-path/platform-pcspkr-event-spkr"
_PCSPKR_EV_SND: int = 0x12
_PCSPKR_SND_TONE: int = 0x02
# struct input_event: timeval (long sec, long usec), u16 type, u16 code, s32 val
_PCSPKR_EVENT_FMT: str = "llHHi"
# Extra PipeWire sinks to always play alarm audio on (alongside the default).
# alsa_output...hdmi-stereo = GA102 → G27Q (has built-in speaker, always on).
_EXTRA_PIPEWIRE_SINKS: tuple[str, ...] = ("alsa_output.pci-0000_01_00.1.hdmi-stereo",)
# NCT Super I/O chip names that expose a single pwm1 fan control channel.
_NCT_CHIP_NAMES: frozenset[str] = frozenset(
{
"nct6775",
"nct6779",
"nct6791",
"nct6792",
"nct6793",
"nct6795",
"nct6796",
"nct6797",
"nct6798",
"nct6799",
}
)
# Installed by install.sh, controlled via sudoers NOPASSWD entry.
_FAN_SCRIPT: str = "/usr/local/bin/wake-alarm-fans.sh"
_SUDO_BIN: str = "/usr/bin/sudo"
def _beep_soft() -> None:
"""Play a soft system beep via terminal bell."""
sys.stdout.write("\a")
sys.stdout.flush()
def _speaker_test_path() -> str:
"""Resolve absolute path to speaker-test binary."""
path = shutil.which("speaker-test")
if path is None:
msg = "speaker-test not found on PATH"
raise FileNotFoundError(msg)
return path
def _beep_pcspkr(frequency: int, duration_seconds: float) -> None:
"""Beep the motherboard PC speaker via evdev (audible without any sink).
Silently no-ops when the device is missing or unwritable so the call is
always safe from the alarm hot path.
"""
try:
# buffering=0 so the write hits the device immediately.
with Path(_PCSPKR_DEVICE).open("wb", buffering=0) as dev:
dev.write(
struct.pack(
_PCSPKR_EVENT_FMT,
0,
0,
_PCSPKR_EV_SND,
_PCSPKR_SND_TONE,
int(frequency),
),
)
time.sleep(duration_seconds)
dev.write(
struct.pack(
_PCSPKR_EVENT_FMT,
0,
0,
_PCSPKR_EV_SND,
_PCSPKR_SND_TONE,
0,
),
)
except OSError:
_logger.warning(
"PC speaker beep at %d Hz failed (device %s)",
frequency,
_PCSPKR_DEVICE,
exc_info=True,
)
def _ensure_tone_wav(frequency: int) -> Path:
"""Generate (and cache) a mono 48 kHz sine WAV at *frequency* Hz."""
cached = _TONE_CACHE.get(frequency)
if cached is not None and cached.exists():
return cached
path = Path(tempfile.gettempdir()) / f"wake_alarm_tone_{frequency}.wav"
n_frames = int(_TONE_FRAMERATE * _TONE_DURATION_SECONDS)
with wave.open(str(path), "wb") as wav:
wav.setnchannels(1)
wav.setsampwidth(2)
wav.setframerate(_TONE_FRAMERATE)
frames = bytearray()
for i in range(n_frames):
sample = int(
_TONE_AMPLITUDE
* math.sin(2 * math.pi * frequency * i / _TONE_FRAMERATE),
)
frames.extend(struct.pack("<h", sample))
wav.writeframesraw(bytes(frames))
_TONE_CACHE[frequency] = path
return path
def _try_player(binary: str, wav: Path) -> bool:
"""Run *binary* on *wav* with a generous timeout. Return True on success."""
path = shutil.which(binary)
if path is None:
return False
try:
result = subprocess.run(
[path, str(wav)],
capture_output=True,
timeout=_TONE_TIMEOUT_SECONDS,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("%s failed playing %s", binary, wav.name, exc_info=True)
return False
if result.returncode != 0:
_logger.warning(
"%s exited %d for %s: %s",
binary,
result.returncode,
wav.name,
result.stderr.decode(errors="replace").strip()[:200],
)
return False
return True
def _play_tone(frequency: int) -> None:
"""Play a sine tone via paplay/aplay/speaker-test, fall back to soft beep.
Always also beeps the motherboard PC speaker (multiple times) so the
alarm stays loud and audible even when PipeWire only has the auto_null
sink.
"""
for i in range(_PCSPKR_REPEATS):
_beep_pcspkr(frequency, _TONE_DURATION_SECONDS)
if i < _PCSPKR_REPEATS - 1:
time.sleep(_PCSPKR_GAP_SECONDS)
try:
wav = _ensure_tone_wav(frequency)
except OSError:
_logger.warning(
"Could not generate tone WAV at %d Hz; using soft beep",
frequency,
exc_info=True,
)
_beep_soft()
return
for binary in ("paplay", "aplay"):
if _try_player(binary, wav):
return
try:
subprocess.run(
[
_speaker_test_path(),
"-t",
"sine",
"-f",
str(frequency),
"-l",
"1",
],
capture_output=True,
timeout=_TONE_TIMEOUT_SECONDS,
check=False,
)
except (FileNotFoundError, OSError, subprocess.TimeoutExpired):
_logger.warning(
"All tone players failed at %d Hz; falling back to soft beep",
frequency,
exc_info=True,
)
_beep_soft()
def _play_on_extra_devices(frequency: int) -> None:
"""Fire-and-forget: play a sine tone on each extra PipeWire sink."""
try:
path = _speaker_test_path()
except FileNotFoundError:
_logger.warning("speaker-test missing; skipping extra-device beep")
return
for sink in _EXTRA_PIPEWIRE_SINKS:
_play_tone_on_sink(path, sink, frequency)
def _play_tone_on_sink(path: str, sink: str, frequency: int) -> None:
"""Launch speaker-test for *sink*; log a warning on OSError."""
try:
subprocess.Popen(
[path, "-t", "sine", "-f", str(frequency), "-l", "1"],
env={**os.environ, "PIPEWIRE_NODE": sink},
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except OSError:
_logger.warning("Failed to play tone on sink %s", sink, exc_info=True)
def _find_fan_hwmon() -> str | None:
"""Return the hwmon directory for an NCT fan controller, or None."""
for name_path in Path("/sys/class/hwmon").glob("hwmon*/name"):
try:
chip = name_path.read_text().strip()
except OSError:
_logger.warning("Could not read %s", name_path, exc_info=True)
continue
if chip in _NCT_CHIP_NAMES:
return str(name_path.parent)
_logger.warning(
"No NCT super-I/O hwmon entry found; fan ramp will be skipped",
)
return None
def _max_fans() -> bool:
"""Ramp every NCT pwm channel to 100% speed via the helper script.
The helper records prior state under /run/wake-alarm-fans.state so
_restore_fans() can put things back without arguments. Safe: higher fan
speed only lowers temperatures, never damages hardware.
Returns:
True when the ramp script ran successfully, False otherwise.
"""
if _find_fan_hwmon() is None:
return False
try:
result = subprocess.run(
[_SUDO_BIN, "-n", _FAN_SCRIPT, "max"],
check=False,
capture_output=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning(
"Fan script %s not runnable; skipping fan ramp",
_FAN_SCRIPT,
exc_info=True,
)
return False
if result.returncode != 0:
_logger.warning(
"Fan script %s exited %d: %s",
_FAN_SCRIPT,
result.returncode,
result.stderr.decode(errors="replace").strip(),
)
return False
return True
def _restore_fans(*, active: bool) -> None:
"""Restore fan speed if _max_fans() previously succeeded."""
if not active:
return
try:
subprocess.run(
[_SUDO_BIN, "-n", _FAN_SCRIPT, "restore"],
check=False,
capture_output=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning(
"Failed to restore fan state via %s",
_FAN_SCRIPT,
exc_info=True,
)
def _set_max_brightness() -> None:
"""Set all connected monitors to maximum brightness via xrandr."""
xrandr = shutil.which("xrandr")
if xrandr is None:
_logger.warning("xrandr not on PATH; skipping max-brightness")
return
try:
result = subprocess.run(
[xrandr, "--query"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("xrandr --query failed; skipping max-brightness", exc_info=True)
return
for line in result.stdout.splitlines():
if " connected" in line:
output = line.split()[0]
try:
subprocess.run(
[xrandr, "--output", output, "--brightness", "1.0"],
check=False,
capture_output=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning(
"Failed to set brightness on %s",
output,
exc_info=True,
)
def _beep_medium(frequency: int = 1000) -> None:
"""Play a medium beep (sine tone via paplay/aplay/speaker-test)."""
_play_tone(frequency)
def _beep_loud(frequency: int = 1000) -> None:
"""Play a loud sine tone via paplay/aplay/speaker-test."""
_play_tone(frequency)
def _pactl_path() -> str | None:
"""Return the absolute path to pactl, or None when not installed."""
return shutil.which("pactl")
def _alarm_sink_present(pactl: str) -> bool:
"""Return True when the dedicated alarm HDMI sink exists in PipeWire."""
try:
result = subprocess.run(
[pactl, "list", "short", "sinks"],
capture_output=True,
timeout=3,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("pactl list sinks failed", exc_info=True)
return False
return ALARM_AUDIO_SINK in result.stdout.decode(errors="replace")
def _current_default_sink(pactl: str) -> str | None:
"""Return the current default sink name, or None on failure / empty."""
try:
result = subprocess.run(
[pactl, "get-default-sink"],
capture_output=True,
timeout=3,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("pactl get-default-sink failed", exc_info=True)
return None
name = result.stdout.decode(errors="replace").strip()
return name or None
def _activate_alarm_audio() -> str | None:
"""Force the monitor's HDMI output on and route the alarm to it.
At wake time the Bluetooth speaker is disconnected and PipeWire only has the
``auto_null`` sink, so the alarm is silent. This forces the HDMI card
profile on, waits for its sink to appear, makes it the default sink, and
raises it to full volume - empirically the only output audible on this
machine at wake time (the G27Q monitor's built-in speaker).
Returns:
The previous default sink name (to restore on close), or ``None`` when
the alarm audio sink could not be activated.
"""
pactl = _pactl_path()
if pactl is None:
_logger.warning("pactl not on PATH; cannot activate alarm audio")
return None
subprocess.run(
[pactl, "set-card-profile", ALARM_AUDIO_CARD, ALARM_AUDIO_PROFILE],
capture_output=True,
timeout=3,
check=False,
)
attempts = max(
1,
int(ALARM_AUDIO_SINK_WAIT_SECONDS / ALARM_AUDIO_SINK_POLL_SECONDS),
)
for _ in range(attempts):
if _alarm_sink_present(pactl):
break
time.sleep(ALARM_AUDIO_SINK_POLL_SECONDS)
else:
_logger.warning(
"Alarm audio sink %s did not appear after %.0fs; alarm may be silent",
ALARM_AUDIO_SINK,
ALARM_AUDIO_SINK_WAIT_SECONDS,
)
return None
old_default = _current_default_sink(pactl)
for cmd in (
[pactl, "set-default-sink", ALARM_AUDIO_SINK],
[pactl, "set-sink-mute", ALARM_AUDIO_SINK, "0"],
[pactl, "set-sink-volume", ALARM_AUDIO_SINK, "100%"],
):
subprocess.run(cmd, capture_output=True, timeout=3, check=False)
_logger.warning("Alarm audio routed to %s at 100%%", ALARM_AUDIO_SINK)
return old_default
def _restore_alarm_audio(old_default: str | None) -> None:
"""Restore the default sink captured by :func:`_activate_alarm_audio`."""
if old_default is None:
return
pactl = _pactl_path()
if pactl is None:
return
subprocess.run(
[pactl, "set-default-sink", old_default],
capture_output=True,
timeout=3,
check=False,
)
def _warn_if_no_real_sink() -> None:
"""Log a loud warning if PipeWire only has the auto_null sink."""
pactl = _pactl_path()
if pactl is None:
_logger.warning("pactl not on PATH; cannot verify audio sinks")
return
try:
result = subprocess.run(
[pactl, "list", "short", "sinks"],
capture_output=True,
timeout=5,
check=False,
)
except (OSError, subprocess.TimeoutExpired):
_logger.warning("pactl list sinks failed", exc_info=True)
return
sinks_text = result.stdout.decode(errors="replace").strip()
sink_names = [
line.split("\t")[1] for line in sinks_text.splitlines() if "\t" in line
]
real_sinks = [s for s in sink_names if s != "auto_null"]
if not real_sinks:
_logger.warning(
"ONLY auto_null PipeWire sink available — alarm will be SILENT. Sinks: %s",
sink_names or "<none>",
)
else:
_logger.info("Audio sinks available: %s", sink_names)

View File

@ -0,0 +1,129 @@
"""Dismiss-challenge types for the wake alarm.
Provides three challenge variants:
- math: solve an arithmetic problem
- sort: type shuffled digits in ascending order
- flash: memorise a code before it is hidden
"""
from __future__ import annotations
import secrets
from python_pkg.wake_alarm._constants import DISMISS_CODE_LENGTH, DISMISS_FLASH_SECONDS
# Uppercase alphanumeric chars with visually ambiguous characters removed:
# O/0 (oh vs zero) and I/1 (capital-i vs one) are excluded so the code is
# legible at a glance, even half-asleep.
_DISMISS_CHARS: str = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
class _Challenge:
"""A dismiss challenge presented to the user to prove wakefulness."""
def __init__(
self,
*,
kind: str,
display: str,
answer: str,
hint: str,
) -> None:
"""Store challenge parameters.
Args:
kind: Challenge type "math", "flash", or "sort".
display: Text shown in the large code label.
answer: Expected typed answer (normalised, upper-case).
hint: Short instruction shown above the code label.
"""
self.kind: str = kind
self.display: str = display
self.answer: str = answer
self.hint: str = hint
def _generate_code() -> str:
"""Generate a random alphanumeric dismiss code.
Uses uppercase letters and digits only, with ambiguous characters
(O, I, 0, 1) removed so the displayed code is easy to read at a glance.
"""
return "".join(secrets.choice(_DISMISS_CHARS) for _ in range(DISMISS_CODE_LENGTH))
def _make_math_challenge() -> _Challenge:
"""Generate an arithmetic problem the user must solve to dismiss.
Picks randomly from addition, subtraction, and multiplication.
The user types only the numeric answer no copying, no autopilot.
"""
op = secrets.choice(("+", "-", "*"))
if op == "+":
a, b = 10 + secrets.randbelow(90), 10 + secrets.randbelow(90)
return _Challenge(
kind="math",
display=f"{a} + {b} = ?",
answer=str(a + b),
hint="Solve and type the answer",
)
if op == "-":
a = 20 + secrets.randbelow(80)
b = 10 + secrets.randbelow(a - 10)
return _Challenge(
kind="math",
display=f"{a} - {b} = ?",
answer=str(a - b),
hint="Solve and type the answer",
)
a, b = 12 + secrets.randbelow(14), 3 + secrets.randbelow(7)
return _Challenge(
kind="math",
display=f"{a} * {b} = ?",
answer=str(a * b),
hint="Solve and type the answer",
)
def _make_sort_challenge() -> _Challenge:
"""Generate a sort-the-digits challenge.
Displays six shuffled single digits; the user types them ascending (no spaces).
Requires a brief cognitive effort fast enough to be fair, slow enough to prove
you are awake.
"""
pool = list(range(1, 10))
for i in range(len(pool) - 1, 0, -1):
j = secrets.randbelow(i + 1)
pool[i], pool[j] = pool[j], pool[i]
digits = pool[:6]
display = " ".join(str(d) for d in digits)
answer = "".join(str(d) for d in sorted(digits))
return _Challenge(
kind="sort",
display=display,
answer=answer,
hint="Type digits sorted lowest → highest (no spaces)",
)
def _make_flash_challenge() -> _Challenge:
"""Generate a memorise-then-type challenge.
Shows a code for DISMISS_FLASH_SECONDS, then hides it.
The user must type the code from memory.
"""
code = _generate_code()
return _Challenge(
kind="flash",
display=code,
answer=code,
hint=f"Memorise this code — it disappears in {DISMISS_FLASH_SECONDS}s",
)
def _make_challenge() -> _Challenge:
"""Pick a random challenge type and generate an instance."""
return secrets.choice(
(_make_math_challenge, _make_flash_challenge, _make_sort_challenge),
)()

View File

@ -28,7 +28,13 @@ MEDIUM_BEEP_INTERVAL: float = 5.0
LOUD_TOGGLE_INTERVAL: float = 2.0
# Dismiss challenge: length of the random code
DISMISS_CODE_LENGTH: int = 6
DISMISS_CODE_LENGTH: int = 8
# Number of correct code entries required to dismiss the alarm.
# Requiring more than one round forces the user to stay awake long enough
# to actually read and type multiple independent codes.
DISMISS_ROUNDS_REQUIRED: int = 2
# Seconds the code is visible before being hidden in a flash challenge.
DISMISS_FLASH_SECONDS: int = 4
# How often the dismiss code refreshes (seconds)
DISMISS_CODE_REFRESH_SECONDS: int = 30

View File

@ -11,33 +11,27 @@ from unittest.mock import MagicMock, patch
import pytest
if TYPE_CHECKING:
from collections.abc import Generator, Iterator
from collections.abc import Generator
from python_pkg.wake_alarm._alarm import (
_activate_alarm_audio,
_alarm_sink_present,
_is_alarm_day,
_restore_display,
_should_run_alarm,
_wake_display,
)
from python_pkg.wake_alarm._audio import (
_beep_loud,
_beep_medium,
_beep_pcspkr,
_beep_soft,
_current_default_sink,
_ensure_tone_wav,
_find_fan_hwmon,
_generate_code,
_is_alarm_day,
_max_fans,
_parse_args,
_play_on_extra_devices,
_play_tone,
_restore_alarm_audio,
_restore_display,
_restore_fans,
_set_max_brightness,
_should_run_alarm,
_speaker_test_path,
_try_player,
_wake_display,
_warn_if_no_real_sink,
)
from python_pkg.wake_alarm._challenges import (
_DISMISS_CHARS,
_generate_code,
)
from python_pkg.wake_alarm._constants import (
DISMISS_CODE_LENGTH,
@ -92,10 +86,11 @@ class TestGenerateCode:
code = _generate_code()
assert len(code) == DISMISS_CODE_LENGTH
def test_all_digits(self) -> None:
"""Generated code contains only digits."""
def test_all_alphanumeric(self) -> None:
"""Generated code uses only the unambiguous alphanumeric charset."""
code = _generate_code()
assert code.isdigit()
assert all(c in _DISMISS_CHARS for c in code)
def test_different_codes(self) -> None:
"""Two calls produce different codes (probabilistic, but safe)."""
@ -177,7 +172,7 @@ class TestSpeakerTestPath:
def test_returns_path_when_found(self) -> None:
"""Return full path when speaker-test is available."""
with patch(
"python_pkg.wake_alarm._alarm.shutil.which",
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/speaker-test",
):
assert _speaker_test_path() == "/usr/bin/speaker-test"
@ -186,7 +181,7 @@ class TestSpeakerTestPath:
"""Raise FileNotFoundError when speaker-test is missing."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
"python_pkg.wake_alarm._audio.shutil.which",
return_value=None,
),
pytest.raises(FileNotFoundError, match="speaker-test not found"),
@ -199,7 +194,7 @@ class TestBeepFunctions:
def test_beep_soft_writes_bell(self) -> None:
"""_beep_soft writes terminal bell character."""
with patch("python_pkg.wake_alarm._alarm.sys") as mock_sys:
with patch("python_pkg.wake_alarm._audio.sys") as mock_sys:
mock_sys.stdout = MagicMock()
_beep_soft()
mock_sys.stdout.write.assert_called_once_with("\a")
@ -207,13 +202,13 @@ class TestBeepFunctions:
def test_beep_medium_delegates_to_play_tone(self) -> None:
"""_beep_medium just delegates to _play_tone."""
with patch("python_pkg.wake_alarm._alarm._play_tone") as mock_play:
with patch("python_pkg.wake_alarm._audio._play_tone") as mock_play:
_beep_medium(frequency=800)
mock_play.assert_called_once_with(800)
def test_beep_loud_delegates_to_play_tone(self) -> None:
"""_beep_loud just delegates to _play_tone."""
with patch("python_pkg.wake_alarm._alarm._play_tone") as mock_play:
with patch("python_pkg.wake_alarm._audio._play_tone") as mock_play:
_beep_loud(frequency=1200)
mock_play.assert_called_once_with(1200)
@ -308,10 +303,10 @@ class TestPlayOnExtraDevices:
"""_play_on_extra_devices spawns speaker-test with PIPEWIRE_NODE set."""
with (
patch(
"python_pkg.wake_alarm._alarm._speaker_test_path",
"python_pkg.wake_alarm._audio._speaker_test_path",
return_value="/usr/bin/speaker-test",
),
patch("python_pkg.wake_alarm._alarm.subprocess.Popen") as mock_popen,
patch("python_pkg.wake_alarm._audio.subprocess.Popen") as mock_popen,
):
_play_on_extra_devices(1000)
mock_popen.assert_called_once()
@ -327,10 +322,10 @@ class TestPlayOnExtraDevices:
"""_play_on_extra_devices does nothing when speaker-test is absent."""
with (
patch(
"python_pkg.wake_alarm._alarm._speaker_test_path",
"python_pkg.wake_alarm._audio._speaker_test_path",
side_effect=FileNotFoundError("not found"),
),
patch("python_pkg.wake_alarm._alarm.subprocess.Popen") as mock_popen,
patch("python_pkg.wake_alarm._audio.subprocess.Popen") as mock_popen,
):
_play_on_extra_devices(1000)
mock_popen.assert_not_called()
@ -339,11 +334,11 @@ class TestPlayOnExtraDevices:
"""_play_on_extra_devices silently ignores OSError from Popen."""
with (
patch(
"python_pkg.wake_alarm._alarm._speaker_test_path",
"python_pkg.wake_alarm._audio._speaker_test_path",
return_value="/usr/bin/speaker-test",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.Popen",
"python_pkg.wake_alarm._audio.subprocess.Popen",
side_effect=OSError("device busy"),
),
):
@ -390,18 +385,18 @@ class TestMaxFans:
def test_returns_false_when_no_hwmon(self) -> None:
"""No fan controller → returns False immediately."""
with patch("python_pkg.wake_alarm._alarm._find_fan_hwmon", return_value=None):
with patch("python_pkg.wake_alarm._audio._find_fan_hwmon", return_value=None):
assert _max_fans() is False
def test_returns_false_on_script_oserror(self, tmp_path: pathlib.Path) -> None:
"""OSError running fan script → returns False."""
with (
patch(
"python_pkg.wake_alarm._alarm._find_fan_hwmon",
"python_pkg.wake_alarm._audio._find_fan_hwmon",
return_value=str(tmp_path),
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=OSError("not found"),
),
):
@ -411,11 +406,11 @@ class TestMaxFans:
"""TimeoutExpired running fan script → returns False."""
with (
patch(
"python_pkg.wake_alarm._alarm._find_fan_hwmon",
"python_pkg.wake_alarm._audio._find_fan_hwmon",
return_value=str(tmp_path),
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=subprocess.TimeoutExpired("fan", 5),
),
):
@ -427,11 +422,11 @@ class TestMaxFans:
mock_result.returncode = 1
with (
patch(
"python_pkg.wake_alarm._alarm._find_fan_hwmon",
"python_pkg.wake_alarm._audio._find_fan_hwmon",
return_value=str(tmp_path),
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=mock_result,
),
):
@ -443,11 +438,11 @@ class TestMaxFans:
mock_result.returncode = 0
with (
patch(
"python_pkg.wake_alarm._alarm._find_fan_hwmon",
"python_pkg.wake_alarm._audio._find_fan_hwmon",
return_value=str(tmp_path),
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=mock_result,
),
):
@ -459,13 +454,13 @@ class TestRestoreFans:
def test_noop_when_inactive(self) -> None:
"""False state → subprocess.run is never called."""
with patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run:
with patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run:
_restore_fans(active=False)
mock_run.assert_not_called()
def test_calls_fan_script_restore(self) -> None:
"""Active state → fan script called with restore (no args)."""
with patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run:
with patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run:
mock_run.return_value.returncode = 0
_restore_fans(active=True)
mock_run.assert_called_once()
@ -475,7 +470,7 @@ class TestRestoreFans:
def test_ignores_oserror_on_restore(self) -> None:
"""OSError from fan script is silently suppressed."""
with patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=OSError("no script"),
):
_restore_fans(active=True) # must not raise
@ -483,672 +478,7 @@ class TestRestoreFans:
def test_ignores_timeout_on_restore(self) -> None:
"""TimeoutExpired from fan script is silently suppressed."""
with patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=subprocess.TimeoutExpired("fan", 5),
):
_restore_fans(active=True) # must not raise
class TestSetMaxBrightness:
"""Tests for _set_max_brightness."""
def test_noop_when_xrandr_missing(self) -> None:
"""No xrandr on PATH → subprocess.run never called."""
with (
patch("python_pkg.wake_alarm._alarm.shutil.which", return_value=None),
patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run,
):
_set_max_brightness()
mock_run.assert_not_called()
def test_noop_on_oserror_from_query(self) -> None:
"""OSError from xrandr --query is suppressed."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/xrandr",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=OSError("no display"),
),
):
_set_max_brightness() # must not raise
def test_noop_on_timeout_from_query(self) -> None:
"""TimeoutExpired from xrandr --query is suppressed."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/xrandr",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=subprocess.TimeoutExpired("xrandr", 5),
),
):
_set_max_brightness() # must not raise
def test_sets_brightness_for_connected_displays(self) -> None:
"""Connected displays each get an --output --brightness call."""
mock_query_result = MagicMock()
mock_query_result.stdout = (
"HDMI-0 connected 2560x1440+0+0\nDP-0 connected primary\n"
)
call_args_list: list[list[str]] = []
def fake_run(args: list[str], **_kwargs: object) -> MagicMock:
call_args_list.append(args)
return mock_query_result
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/xrandr",
),
patch("python_pkg.wake_alarm._alarm.subprocess.run", side_effect=fake_run),
):
_set_max_brightness()
# First call is --query; subsequent calls set brightness for each output.
brightness_calls = [a for a in call_args_list if "--brightness" in a]
expected_brightness_calls = 2
assert len(brightness_calls) == expected_brightness_calls
def test_skips_disconnected_outputs(self) -> None:
"""Disconnected outputs do NOT get a brightness call."""
mock_result = MagicMock()
mock_result.stdout = "Screen 0: minimum 320\nHDMI-0 disconnected\n"
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/xrandr",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=mock_result,
) as mock_run,
):
_set_max_brightness()
# Only the --query call, no brightness calls.
assert mock_run.call_count == 1
def test_warns_when_brightness_call_fails(self) -> None:
"""OSError on per-output --brightness call is logged but swallowed."""
query_result = MagicMock()
query_result.stdout = (
"Screen 0: minimum 320\nHDMI-0 connected primary 1920x1080\n"
)
def _run_side_effect(args: list[str], **_kwargs: object) -> MagicMock:
if "--query" in args:
return query_result
msg = "permission denied"
raise OSError(msg)
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/xrandr",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=_run_side_effect,
),
):
_set_max_brightness() # must not raise
class TestEnsureToneWav:
"""Tests for _ensure_tone_wav (sine WAV generator + cache)."""
def test_generates_and_caches(self, tmp_path: pathlib.Path) -> None:
"""First call generates the WAV; second call returns the cached path."""
from python_pkg.wake_alarm import _alarm as alarm_mod
alarm_mod._TONE_CACHE.clear()
with patch(
"python_pkg.wake_alarm._alarm.tempfile.gettempdir",
return_value=str(tmp_path),
):
path1 = _ensure_tone_wav(440)
assert path1.exists()
size = path1.stat().st_size
assert size > 0
# Second call must hit the cache (no regeneration).
with patch("python_pkg.wake_alarm._alarm.wave.open") as mock_open:
path2 = _ensure_tone_wav(440)
mock_open.assert_not_called()
assert path2 == path1
alarm_mod._TONE_CACHE.clear()
def test_regenerates_when_cached_file_missing(
self,
tmp_path: pathlib.Path,
) -> None:
"""If the cached file was deleted, regenerate it."""
from python_pkg.wake_alarm._alarm import _TONE_CACHE
_TONE_CACHE.clear()
with patch(
"python_pkg.wake_alarm._alarm.tempfile.gettempdir",
return_value=str(tmp_path),
):
path1 = _ensure_tone_wav(880)
path1.unlink()
path2 = _ensure_tone_wav(880)
assert path2.exists()
_TONE_CACHE.clear()
class TestTryPlayer:
"""Tests for _try_player."""
def test_returns_false_when_binary_missing(
self,
tmp_path: pathlib.Path,
) -> None:
"""Missing binary returns False without raising."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
with patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value=None,
):
assert _try_player("paplay", wav) is False
def test_returns_true_on_success(self, tmp_path: pathlib.Path) -> None:
"""Zero exit code returns True."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
result = MagicMock()
result.returncode = 0
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/paplay",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=result,
),
):
assert _try_player("paplay", wav) is True
def test_returns_false_on_nonzero_exit(
self,
tmp_path: pathlib.Path,
) -> None:
"""Non-zero exit code returns False and logs."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
result = MagicMock()
result.returncode = 1
result.stderr = b"boom"
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/paplay",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=result,
),
):
assert _try_player("paplay", wav) is False
def test_returns_false_on_timeout(self, tmp_path: pathlib.Path) -> None:
"""TimeoutExpired returns False and logs."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/paplay",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=subprocess.TimeoutExpired("paplay", 6),
),
):
assert _try_player("paplay", wav) is False
def test_returns_false_on_oserror(self, tmp_path: pathlib.Path) -> None:
"""OSError returns False and logs."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/paplay",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=OSError("nope"),
),
):
assert _try_player("paplay", wav) is False
class TestBeepPcspkr:
"""Tests for _beep_pcspkr (evdev PC speaker)."""
def test_writes_tone_then_zero_to_device(self) -> None:
"""Successful path writes start-frequency then stop event."""
mock_dev = MagicMock()
mock_open_ctx = MagicMock()
mock_open_ctx.__enter__.return_value = mock_dev
mock_open_ctx.__exit__.return_value = False
with (
patch(
"python_pkg.wake_alarm._alarm.Path.open",
return_value=mock_open_ctx,
),
patch("python_pkg.wake_alarm._alarm.time.sleep"),
):
_beep_pcspkr(1000, 0.05)
# First write carries the frequency, second write carries 0 (stop).
assert mock_dev.write.call_count == 2
def test_oserror_is_swallowed(self) -> None:
"""OSError opening the device must not raise."""
with patch(
"python_pkg.wake_alarm._alarm.Path.open",
side_effect=OSError("no device"),
):
_beep_pcspkr(1000, 0.05) # must not raise
class TestPlayTone:
"""Tests for _play_tone."""
@pytest.fixture(autouse=True)
def _silence_pcspkr(self) -> Iterator[None]:
"""Stop tests from hitting the real /dev/input PC speaker device."""
with patch("python_pkg.wake_alarm._alarm._beep_pcspkr"):
yield
def test_paplay_success_short_circuits(self, tmp_path: pathlib.Path) -> None:
"""If paplay succeeds, no further players are tried."""
wav = tmp_path / "tone.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._alarm._ensure_tone_wav",
return_value=wav,
),
patch(
"python_pkg.wake_alarm._alarm._try_player",
return_value=True,
) as mock_try,
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
) as mock_run,
):
_play_tone(440)
mock_try.assert_called_once_with("paplay", wav)
mock_run.assert_not_called()
def test_falls_back_to_aplay_then_speaker_test(
self,
tmp_path: pathlib.Path,
) -> None:
"""paplay+aplay fail → speaker-test is tried."""
wav = tmp_path / "tone.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._alarm._ensure_tone_wav",
return_value=wav,
),
patch(
"python_pkg.wake_alarm._alarm._try_player",
return_value=False,
),
patch(
"python_pkg.wake_alarm._alarm._speaker_test_path",
return_value="/usr/bin/speaker-test",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
) as mock_run,
):
_play_tone(1000)
mock_run.assert_called_once()
args = mock_run.call_args[0][0]
assert "/usr/bin/speaker-test" in args
assert "1000" in args
def test_soft_beep_when_speaker_test_missing(
self,
tmp_path: pathlib.Path,
) -> None:
"""All players fail → soft beep."""
wav = tmp_path / "tone.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._alarm._ensure_tone_wav",
return_value=wav,
),
patch(
"python_pkg.wake_alarm._alarm._try_player",
return_value=False,
),
patch(
"python_pkg.wake_alarm._alarm._speaker_test_path",
side_effect=FileNotFoundError("missing"),
),
patch("python_pkg.wake_alarm._alarm._beep_soft") as mock_soft,
):
_play_tone(800)
mock_soft.assert_called_once()
def test_soft_beep_when_speaker_test_times_out(
self,
tmp_path: pathlib.Path,
) -> None:
"""speaker-test TimeoutExpired → soft beep."""
wav = tmp_path / "tone.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._alarm._ensure_tone_wav",
return_value=wav,
),
patch(
"python_pkg.wake_alarm._alarm._try_player",
return_value=False,
),
patch(
"python_pkg.wake_alarm._alarm._speaker_test_path",
return_value="/usr/bin/speaker-test",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=subprocess.TimeoutExpired("speaker-test", 6),
),
patch("python_pkg.wake_alarm._alarm._beep_soft") as mock_soft,
):
_play_tone(800)
mock_soft.assert_called_once()
def test_soft_beep_when_wav_generation_fails(self) -> None:
"""OSError generating WAV → soft beep."""
with (
patch(
"python_pkg.wake_alarm._alarm._ensure_tone_wav",
side_effect=OSError("disk full"),
),
patch("python_pkg.wake_alarm._alarm._beep_soft") as mock_soft,
):
_play_tone(440)
mock_soft.assert_called_once()
class TestWarnIfNoRealSink:
"""Tests for _warn_if_no_real_sink."""
def test_logs_when_pactl_missing(self) -> None:
"""No pactl on PATH → warns and returns."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value=None,
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
) as mock_run,
):
_warn_if_no_real_sink()
mock_run.assert_not_called()
def test_warns_when_only_auto_null(self) -> None:
"""Only auto_null sink → warning is emitted."""
result = MagicMock()
result.stdout = b"4319\tauto_null\tPipeWire\tfloat32le 2ch 48000Hz\tIDLE\n"
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=result,
),
patch("python_pkg.wake_alarm._alarm._logger") as mock_log,
):
_warn_if_no_real_sink()
mock_log.warning.assert_called()
def test_info_when_real_sink_present(self) -> None:
"""A non-auto_null sink → info log, no warning."""
result = MagicMock()
result.stdout = b"1\talsa_output.pci-0000_01_00.1.hdmi-stereo\tPipeWire\t-\t-\n"
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=result,
),
patch("python_pkg.wake_alarm._alarm._logger") as mock_log,
):
_warn_if_no_real_sink()
mock_log.info.assert_called()
mock_log.warning.assert_not_called()
def test_handles_pactl_failure(self) -> None:
"""OSError/TimeoutExpired running pactl → warning, no raise."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=subprocess.TimeoutExpired("pactl", 5),
),
):
_warn_if_no_real_sink() # must not raise
class TestAlarmSinkPresent:
"""Tests for _alarm_sink_present."""
def test_true_when_sink_listed(self) -> None:
"""Returns True when the alarm sink name appears in pactl output."""
from python_pkg.wake_alarm._constants import ALARM_AUDIO_SINK
proc = MagicMock(stdout=ALARM_AUDIO_SINK.encode() + b"\tPipeWire\n")
with patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=proc,
):
assert _alarm_sink_present("/usr/bin/pactl") is True
def test_false_when_sink_absent(self) -> None:
"""Returns False when the alarm sink is not in pactl output."""
proc = MagicMock(stdout=b"auto_null\tPipeWire\n")
with patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=proc,
):
assert _alarm_sink_present("/usr/bin/pactl") is False
def test_false_on_subprocess_error(self) -> None:
"""OSError while listing sinks → False, no raise."""
with patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=OSError("boom"),
):
assert _alarm_sink_present("/usr/bin/pactl") is False
class TestCurrentDefaultSink:
"""Tests for _current_default_sink."""
def test_returns_sink_name(self) -> None:
"""Returns the trimmed default sink name."""
proc = MagicMock(stdout=b"jbl_sink\n")
with patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=proc,
):
assert _current_default_sink("/usr/bin/pactl") == "jbl_sink"
def test_returns_none_when_empty(self) -> None:
"""Empty output → None."""
proc = MagicMock(stdout=b"\n")
with patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
return_value=proc,
):
assert _current_default_sink("/usr/bin/pactl") is None
def test_returns_none_on_error(self) -> None:
"""TimeoutExpired → None, no raise."""
with patch(
"python_pkg.wake_alarm._alarm.subprocess.run",
side_effect=subprocess.TimeoutExpired("pactl", 3),
):
assert _current_default_sink("/usr/bin/pactl") is None
class TestActivateAlarmAudio:
"""Tests for _activate_alarm_audio."""
def test_returns_none_when_pactl_missing(self) -> None:
"""No pactl on PATH → returns None without touching audio."""
with (
patch("python_pkg.wake_alarm._alarm.shutil.which", return_value=None),
patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run,
):
assert _activate_alarm_audio() is None
mock_run.assert_not_called()
def test_activates_and_returns_old_default(self) -> None:
"""Sink present → routes audio there and returns prior default sink."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._alarm._alarm_sink_present",
return_value=True,
),
patch(
"python_pkg.wake_alarm._alarm._current_default_sink",
return_value="jbl_sink",
),
patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run,
):
result = _activate_alarm_audio()
assert result == "jbl_sink"
cmds = [call.args[0] for call in mock_run.call_args_list]
from python_pkg.wake_alarm._constants import (
ALARM_AUDIO_CARD,
ALARM_AUDIO_PROFILE,
ALARM_AUDIO_SINK,
)
assert [
"/usr/bin/pactl",
"set-card-profile",
ALARM_AUDIO_CARD,
ALARM_AUDIO_PROFILE,
] in cmds
assert ["/usr/bin/pactl", "set-default-sink", ALARM_AUDIO_SINK] in cmds
def test_returns_none_when_sink_never_appears(self) -> None:
"""Sink never shows up → returns None after polling (no raise)."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._alarm._alarm_sink_present",
return_value=False,
),
patch("python_pkg.wake_alarm._alarm.time.sleep") as mock_sleep,
patch("python_pkg.wake_alarm._alarm.subprocess.run"),
):
assert _activate_alarm_audio() is None
mock_sleep.assert_called()
def test_waits_then_succeeds(self) -> None:
"""Sink absent then present → sleeps once, then routes audio."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._alarm._alarm_sink_present",
side_effect=[False, True],
),
patch(
"python_pkg.wake_alarm._alarm._current_default_sink",
return_value="old",
),
patch("python_pkg.wake_alarm._alarm.time.sleep") as mock_sleep,
patch("python_pkg.wake_alarm._alarm.subprocess.run"),
):
assert _activate_alarm_audio() == "old"
mock_sleep.assert_called_once()
class TestRestoreAlarmAudio:
"""Tests for _restore_alarm_audio."""
def test_none_is_noop(self) -> None:
"""None default → does nothing, no pactl lookup."""
with patch("python_pkg.wake_alarm._alarm.shutil.which") as mock_which:
_restore_alarm_audio(None)
mock_which.assert_not_called()
def test_no_pactl_returns_silently(self) -> None:
"""Default present but pactl missing → no raise, no run."""
with (
patch("python_pkg.wake_alarm._alarm.shutil.which", return_value=None),
patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run,
):
_restore_alarm_audio("jbl_sink")
mock_run.assert_not_called()
def test_restores_default_sink(self) -> None:
"""Calls set-default-sink with the captured prior default."""
with (
patch(
"python_pkg.wake_alarm._alarm.shutil.which",
return_value="/usr/bin/pactl",
),
patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run,
):
_restore_alarm_audio("jbl_sink")
cmds = [call.args[0] for call in mock_run.call_args_list]
assert ["/usr/bin/pactl", "set-default-sink", "jbl_sink"] in cmds
class TestParseArgs:
"""Tests for _parse_args."""
def test_default_flags_are_false(self) -> None:
"""No CLI args means every flag is False."""
ns = _parse_args([])
assert ns.demo is False
assert ns.trigger_now is False
assert ns.production is False
def test_flags_parse(self) -> None:
"""Each flag flips to True when passed."""
ns = _parse_args(["--production", "--demo", "--trigger-now"])
assert ns.production is True
assert ns.demo is True
assert ns.trigger_now is True

View File

@ -0,0 +1,420 @@
"""Tests for _audio.py — audio playback, fan control, and sink management."""
from __future__ import annotations
import subprocess
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch
import pytest
if TYPE_CHECKING:
from collections.abc import Iterator
import pathlib
from python_pkg.wake_alarm._audio import (
_beep_pcspkr,
_ensure_tone_wav,
_play_tone,
_set_max_brightness,
_try_player,
)
class TestSetMaxBrightness:
"""Tests for _set_max_brightness."""
def test_noop_when_xrandr_missing(self) -> None:
"""No xrandr on PATH → subprocess.run never called."""
with (
patch("python_pkg.wake_alarm._audio.shutil.which", return_value=None),
patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run,
):
_set_max_brightness()
mock_run.assert_not_called()
def test_noop_on_oserror_from_query(self) -> None:
"""OSError from xrandr --query is suppressed."""
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/xrandr",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=OSError("no display"),
),
):
_set_max_brightness() # must not raise
def test_noop_on_timeout_from_query(self) -> None:
"""TimeoutExpired from xrandr --query is suppressed."""
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/xrandr",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=subprocess.TimeoutExpired("xrandr", 5),
),
):
_set_max_brightness() # must not raise
def test_sets_brightness_for_connected_displays(self) -> None:
"""Connected displays each get an --output --brightness call."""
mock_query_result = MagicMock()
mock_query_result.stdout = (
"HDMI-0 connected 2560x1440+0+0\nDP-0 connected primary\n"
)
call_args_list: list[list[str]] = []
def fake_run(args: list[str], **_kwargs: object) -> MagicMock:
call_args_list.append(args)
return mock_query_result
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/xrandr",
),
patch("python_pkg.wake_alarm._audio.subprocess.run", side_effect=fake_run),
):
_set_max_brightness()
# First call is --query; subsequent calls set brightness for each output.
brightness_calls = [a for a in call_args_list if "--brightness" in a]
expected_brightness_calls = 2
assert len(brightness_calls) == expected_brightness_calls
def test_skips_disconnected_outputs(self) -> None:
"""Disconnected outputs do NOT get a brightness call."""
mock_result = MagicMock()
mock_result.stdout = "Screen 0: minimum 320\nHDMI-0 disconnected\n"
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/xrandr",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=mock_result,
) as mock_run,
):
_set_max_brightness()
# Only the --query call, no brightness calls.
assert mock_run.call_count == 1
def test_warns_when_brightness_call_fails(self) -> None:
"""OSError on per-output --brightness call is logged but swallowed."""
query_result = MagicMock()
query_result.stdout = (
"Screen 0: minimum 320\nHDMI-0 connected primary 1920x1080\n"
)
def _run_side_effect(args: list[str], **_kwargs: object) -> MagicMock:
if "--query" in args:
return query_result
msg = "permission denied"
raise OSError(msg)
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/xrandr",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=_run_side_effect,
),
):
_set_max_brightness() # must not raise
class TestEnsureToneWav:
"""Tests for _ensure_tone_wav (sine WAV generator + cache)."""
def test_generates_and_caches(self, tmp_path: pathlib.Path) -> None:
"""First call generates the WAV; second call returns the cached path."""
from python_pkg.wake_alarm import _audio as alarm_mod
alarm_mod._TONE_CACHE.clear()
with patch(
"python_pkg.wake_alarm._audio.tempfile.gettempdir",
return_value=str(tmp_path),
):
path1 = _ensure_tone_wav(440)
assert path1.exists()
size = path1.stat().st_size
assert size > 0
# Second call must hit the cache (no regeneration).
with patch("python_pkg.wake_alarm._audio.wave.open") as mock_open:
path2 = _ensure_tone_wav(440)
mock_open.assert_not_called()
assert path2 == path1
alarm_mod._TONE_CACHE.clear()
def test_regenerates_when_cached_file_missing(
self,
tmp_path: pathlib.Path,
) -> None:
"""If the cached file was deleted, regenerate it."""
from python_pkg.wake_alarm._audio import _TONE_CACHE
_TONE_CACHE.clear()
with patch(
"python_pkg.wake_alarm._audio.tempfile.gettempdir",
return_value=str(tmp_path),
):
path1 = _ensure_tone_wav(880)
path1.unlink()
path2 = _ensure_tone_wav(880)
assert path2.exists()
_TONE_CACHE.clear()
class TestTryPlayer:
"""Tests for _try_player."""
def test_returns_false_when_binary_missing(
self,
tmp_path: pathlib.Path,
) -> None:
"""Missing binary returns False without raising."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
with patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value=None,
):
assert _try_player("paplay", wav) is False
def test_returns_true_on_success(self, tmp_path: pathlib.Path) -> None:
"""Zero exit code returns True."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
result = MagicMock()
result.returncode = 0
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/paplay",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=result,
),
):
assert _try_player("paplay", wav) is True
def test_returns_false_on_nonzero_exit(
self,
tmp_path: pathlib.Path,
) -> None:
"""Non-zero exit code returns False and logs."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
result = MagicMock()
result.returncode = 1
result.stderr = b"boom"
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/paplay",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=result,
),
):
assert _try_player("paplay", wav) is False
def test_returns_false_on_timeout(self, tmp_path: pathlib.Path) -> None:
"""TimeoutExpired returns False and logs."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/paplay",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=subprocess.TimeoutExpired("paplay", 6),
),
):
assert _try_player("paplay", wav) is False
def test_returns_false_on_oserror(self, tmp_path: pathlib.Path) -> None:
"""OSError returns False and logs."""
wav = tmp_path / "x.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/paplay",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=OSError("nope"),
),
):
assert _try_player("paplay", wav) is False
class TestBeepPcspkr:
"""Tests for _beep_pcspkr (evdev PC speaker)."""
def test_writes_tone_then_zero_to_device(self) -> None:
"""Successful path writes start-frequency then stop event."""
mock_dev = MagicMock()
mock_open_ctx = MagicMock()
mock_open_ctx.__enter__.return_value = mock_dev
mock_open_ctx.__exit__.return_value = False
with (
patch(
"python_pkg.wake_alarm._audio.Path.open",
return_value=mock_open_ctx,
),
patch("python_pkg.wake_alarm._audio.time.sleep"),
):
_beep_pcspkr(1000, 0.05)
# First write carries the frequency, second write carries 0 (stop).
assert mock_dev.write.call_count == 2
def test_oserror_is_swallowed(self) -> None:
"""OSError opening the device must not raise."""
with patch(
"python_pkg.wake_alarm._audio.Path.open",
side_effect=OSError("no device"),
):
_beep_pcspkr(1000, 0.05) # must not raise
class TestPlayTone:
"""Tests for _play_tone."""
@pytest.fixture(autouse=True)
def _silence_pcspkr(self) -> Iterator[None]:
"""Stop tests from hitting the real /dev/input PC speaker device."""
with patch("python_pkg.wake_alarm._audio._beep_pcspkr"):
yield
def test_paplay_success_short_circuits(self, tmp_path: pathlib.Path) -> None:
"""If paplay succeeds, no further players are tried."""
wav = tmp_path / "tone.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._audio._ensure_tone_wav",
return_value=wav,
),
patch(
"python_pkg.wake_alarm._audio._try_player",
return_value=True,
) as mock_try,
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
) as mock_run,
):
_play_tone(440)
mock_try.assert_called_once_with("paplay", wav)
mock_run.assert_not_called()
def test_falls_back_to_aplay_then_speaker_test(
self,
tmp_path: pathlib.Path,
) -> None:
"""paplay+aplay fail → speaker-test is tried."""
wav = tmp_path / "tone.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._audio._ensure_tone_wav",
return_value=wav,
),
patch(
"python_pkg.wake_alarm._audio._try_player",
return_value=False,
),
patch(
"python_pkg.wake_alarm._audio._speaker_test_path",
return_value="/usr/bin/speaker-test",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
) as mock_run,
):
_play_tone(1000)
mock_run.assert_called_once()
args = mock_run.call_args[0][0]
assert "/usr/bin/speaker-test" in args
assert "1000" in args
def test_soft_beep_when_speaker_test_missing(
self,
tmp_path: pathlib.Path,
) -> None:
"""All players fail → soft beep."""
wav = tmp_path / "tone.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._audio._ensure_tone_wav",
return_value=wav,
),
patch(
"python_pkg.wake_alarm._audio._try_player",
return_value=False,
),
patch(
"python_pkg.wake_alarm._audio._speaker_test_path",
side_effect=FileNotFoundError("missing"),
),
patch("python_pkg.wake_alarm._audio._beep_soft") as mock_soft,
):
_play_tone(800)
mock_soft.assert_called_once()
def test_soft_beep_when_speaker_test_times_out(
self,
tmp_path: pathlib.Path,
) -> None:
"""speaker-test TimeoutExpired → soft beep."""
wav = tmp_path / "tone.wav"
wav.write_bytes(b"\x00")
with (
patch(
"python_pkg.wake_alarm._audio._ensure_tone_wav",
return_value=wav,
),
patch(
"python_pkg.wake_alarm._audio._try_player",
return_value=False,
),
patch(
"python_pkg.wake_alarm._audio._speaker_test_path",
return_value="/usr/bin/speaker-test",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=subprocess.TimeoutExpired("speaker-test", 6),
),
patch("python_pkg.wake_alarm._audio._beep_soft") as mock_soft,
):
_play_tone(800)
mock_soft.assert_called_once()
def test_soft_beep_when_wav_generation_fails(self) -> None:
"""OSError generating WAV → soft beep."""
with (
patch(
"python_pkg.wake_alarm._audio._ensure_tone_wav",
side_effect=OSError("disk full"),
),
patch("python_pkg.wake_alarm._audio._beep_soft") as mock_soft,
):
_play_tone(440)
mock_soft.assert_called_once()

View File

@ -0,0 +1,133 @@
"""Tests for _challenges.py — dismiss challenge generators."""
from __future__ import annotations
from unittest.mock import patch
from python_pkg.wake_alarm._challenges import (
_DISMISS_CHARS,
_Challenge,
_make_challenge,
_make_flash_challenge,
_make_math_challenge,
_make_sort_challenge,
)
from python_pkg.wake_alarm._constants import DISMISS_FLASH_SECONDS
class TestMakeMathChallenge:
"""Tests for _make_math_challenge."""
def test_kind_is_math(self) -> None:
"""Challenge kind is always 'math'."""
assert _make_math_challenge().kind == "math"
def test_answer_is_correct_for_addition(self) -> None:
"""Stored answer is numerically correct for addition."""
with (
patch("python_pkg.wake_alarm._challenges.secrets.choice", return_value="+"),
patch(
"python_pkg.wake_alarm._challenges.secrets.randbelow",
side_effect=[13, 35], # 10+13=23, 10+35=45
),
):
ch = _make_math_challenge()
assert ch.display == "23 + 45 = ?"
assert ch.answer == "68"
def test_answer_is_correct_for_subtraction(self) -> None:
"""Stored answer is numerically correct for subtraction."""
with (
patch("python_pkg.wake_alarm._challenges.secrets.choice", return_value="-"),
patch(
"python_pkg.wake_alarm._challenges.secrets.randbelow",
side_effect=[30, 7], # 20+30=50, 10+7=17
),
):
ch = _make_math_challenge()
assert ch.display == "50 - 17 = ?"
assert ch.answer == "33"
def test_answer_is_correct_for_multiplication(self) -> None:
"""Stored answer is numerically correct for multiplication."""
with (
patch("python_pkg.wake_alarm._challenges.secrets.choice", return_value="*"),
patch(
"python_pkg.wake_alarm._challenges.secrets.randbelow",
side_effect=[3, 4], # 12+3=15, 3+4=7
),
):
ch = _make_math_challenge()
assert ch.display == "15 * 7 = ?"
assert ch.answer == "105"
def test_answer_varies_across_calls(self) -> None:
"""Multiple calls produce varied answers (probabilistic)."""
answers = {_make_math_challenge().answer for _ in range(30)}
assert len(answers) > 1
class TestMakeSortChallenge:
"""Tests for _make_sort_challenge."""
def test_kind_is_sort(self) -> None:
"""Challenge kind is always 'sort'."""
assert _make_sort_challenge().kind == "sort"
def test_answer_is_sorted_digits(self) -> None:
"""Answer equals the digits in display sorted ascending."""
ch = _make_sort_challenge()
displayed_digits = [int(c) for c in ch.display if c.isdigit()]
expected = "".join(str(d) for d in sorted(displayed_digits))
assert ch.answer == expected
def test_display_contains_six_digits(self) -> None:
"""Display always contains exactly six digit characters."""
ch = _make_sort_challenge()
assert len([c for c in ch.display if c.isdigit()]) == 6
def test_answer_varies_across_calls(self) -> None:
"""Multiple calls produce varied digit sets."""
answers = {_make_sort_challenge().answer for _ in range(30)}
assert len(answers) > 1
class TestMakeFlashChallenge:
"""Tests for _make_flash_challenge."""
def test_kind_is_flash(self) -> None:
"""Challenge kind is always 'flash'."""
assert _make_flash_challenge().kind == "flash"
def test_display_equals_answer(self) -> None:
"""Display and answer are identical (the user must recall the full code)."""
ch = _make_flash_challenge()
assert ch.display == ch.answer
def test_code_uses_dismiss_chars(self) -> None:
"""Generated code only contains chars from _DISMISS_CHARS."""
ch = _make_flash_challenge()
assert all(c in _DISMISS_CHARS for c in ch.answer)
def test_hint_mentions_flash_seconds(self) -> None:
"""Hint text includes the number of visible seconds."""
ch = _make_flash_challenge()
assert str(DISMISS_FLASH_SECONDS) in ch.hint
class TestMakeChallenge:
"""Tests for _make_challenge (the random dispatcher)."""
def test_returns_a_challenge(self) -> None:
"""Returns a _Challenge instance with all expected fields populated."""
ch = _make_challenge()
assert isinstance(ch, _Challenge)
assert ch.kind in ("math", "flash", "sort")
assert ch.display
assert ch.answer
assert ch.hint
def test_all_types_reachable(self) -> None:
"""All three challenge types appear across many calls."""
kinds = {_make_challenge().kind for _ in range(200)}
assert kinds == {"math", "flash", "sort"}

View File

@ -15,10 +15,6 @@ from python_pkg.wake_alarm._alarm import (
WakeAlarm,
main,
)
from python_pkg.wake_alarm._constants import (
PHASE_MEDIUM_END,
PHASE_SOFT_END,
)
# ---------------------------------------------------------------------------
# Helpers (duplicated from part 1 so this file is self-contained)
@ -123,37 +119,77 @@ class TestWakeAlarmInit:
class TestWakeAlarmDismiss:
"""Tests for alarm dismiss logic."""
def test_correct_code_dismisses(
def test_correct_code_dismisses_after_all_rounds(
self,
mock_tk_module: MagicMock,
) -> None:
"""Entering the correct code dismisses the alarm."""
"""Entering the correct answer for every required round dismisses the alarm."""
from python_pkg.wake_alarm._constants import DISMISS_ROUNDS_REQUIRED
alarm = WakeAlarm(demo_mode=True)
code = alarm._current_code
mock_entry = mock_tk_module.Entry.return_value
mock_entry.get.return_value = code
with patch(
"python_pkg.wake_alarm._alarm.save_wake_state",
) as mock_save:
alarm._on_submit()
for _ in range(DISMISS_ROUNDS_REQUIRED):
mock_entry.get.return_value = alarm._current_challenge.answer
alarm._on_submit()
assert alarm.dismissed is True
mock_save.assert_called_once()
call_kwargs = mock_save.call_args[1]
assert call_kwargs["skip_workout"] is True
assert mock_save.call_args[1]["skip_workout"] is True
alarm._stop_beep.set()
def test_first_round_correct_does_not_dismiss(
self,
mock_tk_module: MagicMock,
) -> None:
"""A single correct entry is not enough — DISMISS_ROUNDS_REQUIRED is 2+."""
alarm = WakeAlarm(demo_mode=True)
mock_entry = mock_tk_module.Entry.return_value
mock_entry.get.return_value = alarm._current_challenge.answer
alarm._on_submit()
assert alarm.dismissed is False
assert alarm._rounds_completed == 1
alarm._stop_beep.set()
def test_first_round_correct_non_flash_next_no_countdown(
self,
mock_tk_module: MagicMock,
) -> None:
"""When next challenge is math, no flash countdown is started."""
from python_pkg.wake_alarm._challenges import _Challenge
alarm = WakeAlarm(demo_mode=True)
mock_entry = mock_tk_module.Entry.return_value
mock_entry.get.return_value = alarm._current_challenge.answer
next_math = _Challenge(kind="math", display="2 + 2 = ?", answer="4", hint="x")
with patch(
"python_pkg.wake_alarm._alarm._make_challenge", return_value=next_math
):
alarm._on_submit()
assert alarm._current_challenge.kind == "math"
assert alarm.dismissed is False
alarm._stop_beep.set()
def test_wrong_code_does_not_dismiss(
self,
mock_tk_module: MagicMock,
) -> None:
"""Entering the wrong code shows error without dismissing."""
"""Entering the wrong answer shows an error without dismissing."""
from python_pkg.wake_alarm._alarm import _Challenge
alarm = WakeAlarm(demo_mode=True)
# Use a pinned math challenge so the non-flash wrong-answer branch is covered.
alarm._current_challenge = _Challenge(
kind="math", display="2 + 2 = ?", answer="4", hint="test"
)
mock_entry = mock_tk_module.Entry.return_value
mock_entry.get.return_value = "000000"
# Ensure current code is different
alarm._current_code = "123456"
mock_entry.get.return_value = "99"
alarm._on_submit()
@ -201,17 +237,19 @@ class TestWakeAlarmDismiss:
self,
mock_tk_module: MagicMock,
) -> None:
"""Typing the code after the skip window stops the alarm w/o a skip."""
"""Typing all rounds after the skip window stops the alarm without a skip."""
from python_pkg.wake_alarm._constants import DISMISS_ROUNDS_REQUIRED
alarm = WakeAlarm(demo_mode=True)
alarm._skip_earnable = False
code = alarm._current_code
mock_entry = mock_tk_module.Entry.return_value
mock_entry.get.return_value = code
with patch(
"python_pkg.wake_alarm._alarm.save_wake_state",
) as mock_save:
alarm._on_submit()
for _ in range(DISMISS_ROUNDS_REQUIRED):
mock_entry.get.return_value = alarm._current_challenge.answer
alarm._on_submit()
assert alarm.dismissed is True
assert mock_save.call_args[1]["skip_workout"] is False
@ -278,18 +316,17 @@ class TestMain:
class TestCodeRefreshAndTimer:
"""Tests for code refresh and timer update methods."""
def test_code_refresh_changes_code(
def test_code_refresh_changes_challenge(
self,
mock_tk_module: MagicMock,
) -> None:
"""Code refresh generates a new code."""
"""Code refresh generates a new challenge each call."""
alarm = WakeAlarm(demo_mode=True)
# Call refresh many times — at least one should differ
codes = set()
displays = set()
for _ in range(50):
alarm._schedule_code_refresh()
codes.add(alarm._current_code)
assert len(codes) > 1
displays.add(alarm._current_challenge.display)
assert len(displays) > 1
alarm._stop_beep.set()
def test_code_refresh_noop_when_not_active(
@ -299,10 +336,9 @@ class TestCodeRefreshAndTimer:
"""Code refresh is a no-op when alarm is no longer active."""
alarm = WakeAlarm(demo_mode=True)
alarm._active = False
old_code = alarm._current_code
old_challenge = alarm._current_challenge
alarm._schedule_code_refresh()
# Code doesn't change because _active=False causes early return
assert alarm._current_code == old_code
assert alarm._current_challenge is old_challenge
alarm._stop_beep.set()
def test_update_timer_noop_when_not_active(
@ -431,172 +467,3 @@ class TestScreenFlash:
mock_root.configure.assert_not_called()
mock_root.after.assert_not_called()
alarm._stop_beep.set()
class TestDismissWithoutSkip:
"""Tests for alarm dismiss without earning skip."""
def test_dismiss_without_skip_shows_no_skip_message(
self,
mock_tk_module: MagicMock,
) -> None:
"""Dismissing with earned_skip=False shows appropriate message."""
alarm = WakeAlarm(demo_mode=True)
# Simulate existing child widgets
mock_widget = MagicMock()
alarm._container.winfo_children.return_value = [mock_widget]
with patch(
"python_pkg.wake_alarm._alarm.save_wake_state",
) as mock_save:
alarm._dismiss_alarm(earned_skip=False)
assert alarm.dismissed is True
mock_save.assert_called_once()
call_kwargs = mock_save.call_args[1]
assert call_kwargs["skip_workout"] is False
mock_widget.destroy.assert_called_once()
alarm._stop_beep.set()
class TestSkipWindowExpiredMessage:
"""Tests for the on-screen message when the skip window expires."""
def test_expired_updates_status_label(
self,
mock_tk_module: MagicMock,
) -> None:
"""Expiry updates the status label instead of closing the alarm."""
del mock_tk_module
alarm = WakeAlarm(demo_mode=True)
alarm._on_skip_window_expired()
alarm._status_label.configure.assert_called_with(
text="No workout skip today.",
)
alarm._stop_beep.set()
class TestBeepLoopPhases:
"""Tests for different beep loop escalation phases."""
def test_medium_phase(
self,
mock_tk_module: MagicMock,
) -> None:
"""Beep loop enters medium phase after PHASE_SOFT_END minutes."""
alarm = WakeAlarm(demo_mode=True)
# Set alarm start to make elapsed > PHASE_SOFT_END minutes
import time as time_mod
alarm._alarm_start = time_mod.monotonic() - (PHASE_SOFT_END + 1) * 60
call_count = 0
def stop_after_one(*_args: object, **_kwargs: object) -> None:
nonlocal call_count
call_count += 1
if call_count >= 1:
alarm._stop_beep.set()
with (
patch(
"python_pkg.wake_alarm._alarm._beep_medium",
side_effect=stop_after_one,
) as mock_beep,
):
alarm._beep_loop()
mock_beep.assert_called()
alarm._stop_beep.set()
def test_loud_phase(
self,
mock_tk_module: MagicMock,
) -> None:
"""Beep loop enters loud phase after PHASE_MEDIUM_END minutes."""
alarm = WakeAlarm(demo_mode=True)
import time as time_mod
alarm._alarm_start = time_mod.monotonic() - (PHASE_MEDIUM_END + 1) * 60
call_count = 0
def stop_after_one(*_args: object, **_kwargs: object) -> None:
nonlocal call_count
call_count += 1
if call_count >= 1:
alarm._stop_beep.set()
with (
patch(
"python_pkg.wake_alarm._alarm._beep_loud",
side_effect=stop_after_one,
) as mock_beep,
):
alarm._beep_loop()
mock_beep.assert_called()
alarm._stop_beep.set()
class TestRunMethod:
"""Tests for the run() method."""
def test_run_calls_mainloop(
self,
mock_tk_module: MagicMock,
) -> None:
"""run() calls root.mainloop()."""
alarm = WakeAlarm(demo_mode=True)
alarm.run()
alarm.root.mainloop.assert_called_once()
alarm._stop_beep.set()
class TestUpdateTimerActive:
"""Tests for timer update when alarm is active."""
def test_update_timer_shows_skip_window(
self,
mock_tk_module: MagicMock,
) -> None:
"""While the skip is earnable, the timer shows the skip-window count."""
del mock_tk_module
alarm = WakeAlarm(demo_mode=True)
alarm._update_timer()
text = alarm._timer_label.configure.call_args[1]["text"]
assert text.startswith("Skip window:")
alarm._stop_beep.set()
def test_update_timer_shows_prompt_after_window(
self,
mock_tk_module: MagicMock,
) -> None:
"""After the window the timer shows the silence prompt and keeps going."""
import time as time_mod
alarm = WakeAlarm(demo_mode=True)
# Far in the past so remaining == 0 -> the else branch.
alarm._alarm_start = time_mod.monotonic() - 60 * 60
alarm.root.after.reset_mock()
alarm._update_timer()
text = alarm._timer_label.configure.call_args[1]["text"]
assert "type the code" in text
# The alarm keeps nagging: it always reschedules while active.
alarm.root.after.assert_called_once()
alarm._stop_beep.set()
def test_update_timer_noop_when_not_active(
self,
mock_tk_module: MagicMock,
) -> None:
"""Timer update is a no-op once the alarm is no longer active."""
del mock_tk_module
alarm = WakeAlarm(demo_mode=True)
alarm._active = False
alarm._timer_label.configure.reset_mock()
alarm._update_timer()
alarm._timer_label.configure.assert_not_called()
alarm._stop_beep.set()

View File

@ -0,0 +1,340 @@
"""Tests for WakeAlarm — beep loop phases, run, update timer, and flash challenge."""
from __future__ import annotations
import tkinter as tk
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch
import pytest
if TYPE_CHECKING:
from collections.abc import Generator
from python_pkg.wake_alarm._alarm import (
WakeAlarm,
)
from python_pkg.wake_alarm._constants import (
PHASE_MEDIUM_END,
PHASE_SOFT_END,
)
def _make_mock_tk() -> MagicMock:
"""Build a MagicMock that stands in for the tkinter module."""
mock = MagicMock()
mock_root = MagicMock()
mock_root.winfo_screenwidth.return_value = 1920
mock_root.winfo_screenheight.return_value = 1080
mock.Tk.return_value = mock_root
mock.Frame.return_value = MagicMock()
mock.Label.return_value = MagicMock()
mock.Entry.return_value = MagicMock()
mock.TclError = tk.TclError
mock.END = tk.END
return mock
@pytest.fixture(autouse=True)
def _block_real_tk() -> Generator[MagicMock]:
"""Prevent any real Tk windows in tests."""
mock = _make_mock_tk()
with patch("python_pkg.wake_alarm._alarm.tk", mock):
yield mock
@pytest.fixture(autouse=True)
def _block_extra_devices() -> Generator[MagicMock]:
"""Prevent real subprocess calls for extra ALSA devices and hardware."""
with (
patch("python_pkg.wake_alarm._alarm._play_on_extra_devices") as mock,
patch("python_pkg.wake_alarm._alarm._max_fans", return_value=False),
patch("python_pkg.wake_alarm._alarm._restore_fans"),
patch("python_pkg.wake_alarm._alarm._set_max_brightness"),
patch("python_pkg.wake_alarm._alarm._wake_display"),
patch("python_pkg.wake_alarm._alarm._warn_if_no_real_sink"),
patch("python_pkg.wake_alarm._alarm._activate_alarm_audio", return_value=None),
patch("python_pkg.wake_alarm._alarm._restore_alarm_audio"),
patch("python_pkg.wake_alarm._alarm.turn_on_plug"),
patch("python_pkg.wake_alarm._alarm.turn_off_plug"),
):
yield mock
@pytest.fixture
def mock_tk_module() -> Generator[MagicMock]:
"""Provide explicit access to the mocked tk module."""
mock = _make_mock_tk()
with patch("python_pkg.wake_alarm._alarm.tk", mock):
yield mock
class TestBeepLoopPhases:
"""Tests for different beep loop escalation phases."""
def test_medium_phase(
self,
mock_tk_module: MagicMock,
) -> None:
"""Beep loop enters medium phase after PHASE_SOFT_END minutes."""
alarm = WakeAlarm(demo_mode=True)
# Set alarm start to make elapsed > PHASE_SOFT_END minutes
import time as time_mod
alarm._alarm_start = time_mod.monotonic() - (PHASE_SOFT_END + 1) * 60
call_count = 0
def stop_after_one(*_args: object, **_kwargs: object) -> None:
nonlocal call_count
call_count += 1
if call_count >= 1:
alarm._stop_beep.set()
with (
patch(
"python_pkg.wake_alarm._alarm._beep_medium",
side_effect=stop_after_one,
) as mock_beep,
):
alarm._beep_loop()
mock_beep.assert_called()
alarm._stop_beep.set()
def test_loud_phase(
self,
mock_tk_module: MagicMock,
) -> None:
"""Beep loop enters loud phase after PHASE_MEDIUM_END minutes."""
alarm = WakeAlarm(demo_mode=True)
import time as time_mod
alarm._alarm_start = time_mod.monotonic() - (PHASE_MEDIUM_END + 1) * 60
call_count = 0
def stop_after_one(*_args: object, **_kwargs: object) -> None:
nonlocal call_count
call_count += 1
if call_count >= 1:
alarm._stop_beep.set()
with (
patch(
"python_pkg.wake_alarm._alarm._beep_loud",
side_effect=stop_after_one,
) as mock_beep,
):
alarm._beep_loop()
mock_beep.assert_called()
alarm._stop_beep.set()
class TestRunMethod:
"""Tests for the run() method."""
def test_run_calls_mainloop(
self,
mock_tk_module: MagicMock,
) -> None:
"""run() calls root.mainloop()."""
alarm = WakeAlarm(demo_mode=True)
alarm.run()
alarm.root.mainloop.assert_called_once()
alarm._stop_beep.set()
class TestUpdateTimerActive:
"""Tests for timer update when alarm is active."""
def test_update_timer_shows_skip_window(
self,
mock_tk_module: MagicMock,
) -> None:
"""While the skip is earnable, the timer shows the skip-window count."""
del mock_tk_module
alarm = WakeAlarm(demo_mode=True)
alarm._update_timer()
text = alarm._timer_label.configure.call_args[1]["text"]
assert text.startswith("Skip window:")
alarm._stop_beep.set()
def test_update_timer_shows_prompt_after_window(
self,
mock_tk_module: MagicMock,
) -> None:
"""After the window the timer shows the silence prompt and keeps going."""
import time as time_mod
alarm = WakeAlarm(demo_mode=True)
# Far in the past so remaining == 0 -> the else branch.
alarm._alarm_start = time_mod.monotonic() - 60 * 60
alarm.root.after.reset_mock()
alarm._update_timer()
text = alarm._timer_label.configure.call_args[1]["text"]
assert "type the code" in text
# The alarm keeps nagging: it always reschedules while active.
alarm.root.after.assert_called_once()
alarm._stop_beep.set()
def test_update_timer_noop_when_not_active(
self,
mock_tk_module: MagicMock,
) -> None:
"""Timer update is a no-op once the alarm is no longer active."""
del mock_tk_module
alarm = WakeAlarm(demo_mode=True)
alarm._active = False
alarm._timer_label.configure.reset_mock()
alarm._update_timer()
alarm._timer_label.configure.assert_not_called()
alarm._stop_beep.set()
class TestFlashChallenge:
"""Tests for flash challenge countdown behaviour inside WakeAlarm."""
def test_flash_tick_counts_down_and_hides(
self,
mock_tk_module: MagicMock,
) -> None:
"""_flash_tick counts down per second and hides the code at zero."""
from python_pkg.wake_alarm._alarm import _Challenge
alarm = WakeAlarm(demo_mode=True)
alarm._current_challenge = _Challenge(
kind="flash",
display="ABCDEFGH",
answer="ABCDEFGH",
hint="Memorise",
)
alarm._flash_remaining = 2
alarm._status_label.configure.reset_mock()
alarm._flash_tick()
assert alarm._flash_remaining == 1
alarm._status_label.configure.assert_called()
alarm._flash_tick()
assert alarm._flash_remaining == 0
# Final tick hides the code.
alarm._flash_tick()
# _code_label and _status_label share the same mock; inspect all calls.
all_texts = [
c.kwargs.get("text", "") for c in alarm._code_label.configure.call_args_list
]
assert any("?" in t for t in all_texts)
alarm._stop_beep.set()
def test_flash_tick_noop_when_inactive(
self,
mock_tk_module: MagicMock,
) -> None:
"""_flash_tick returns immediately when the alarm is no longer active."""
alarm = WakeAlarm(demo_mode=True)
alarm._active = False
alarm._flash_remaining = 3
alarm._status_label.configure.reset_mock()
alarm._flash_tick()
alarm._status_label.configure.assert_not_called()
alarm._stop_beep.set()
def test_wrong_flash_answer_reshows_code(
self,
mock_tk_module: MagicMock,
) -> None:
"""Wrong flash answer restores the code and restarts the countdown."""
from python_pkg.wake_alarm._alarm import _Challenge
alarm = WakeAlarm(demo_mode=True)
alarm._current_challenge = _Challenge(
kind="flash",
display="TESTCODE",
answer="TESTCODE",
hint="Memorise",
)
mock_entry = mock_tk_module.Entry.return_value
mock_entry.get.return_value = "WRONGCODE"
alarm._code_label.configure.reset_mock()
alarm._on_submit()
assert alarm.dismissed is False
# Code label should be reconfigured (code shown again + countdown restarted).
alarm._code_label.configure.assert_called()
alarm._stop_beep.set()
def test_next_round_flash_starts_countdown(
self,
mock_tk_module: MagicMock,
) -> None:
"""When the next-round challenge is flash, the countdown starts immediately."""
from python_pkg.wake_alarm._alarm import _Challenge
alarm = WakeAlarm(demo_mode=True)
alarm._current_challenge = _Challenge(
kind="math", display="2 + 2 = ?", answer="4", hint="test"
)
next_flash = _Challenge(
kind="flash", display="ABCDEFGH", answer="ABCDEFGH", hint="Memorise"
)
mock_entry = mock_tk_module.Entry.return_value
mock_entry.get.return_value = "4"
with patch(
"python_pkg.wake_alarm._alarm._make_challenge", return_value=next_flash
):
alarm._on_submit()
assert alarm._current_challenge.kind == "flash"
assert alarm.dismissed is False
alarm._stop_beep.set()
class TestDismissWithoutSkip:
"""Tests for alarm dismiss without earning skip."""
def test_dismiss_without_skip_shows_no_skip_message(
self,
mock_tk_module: MagicMock,
) -> None:
"""Dismissing with earned_skip=False shows appropriate message."""
del mock_tk_module
alarm = WakeAlarm(demo_mode=True)
mock_widget = MagicMock()
alarm._container.winfo_children.return_value = [mock_widget]
with patch(
"python_pkg.wake_alarm._alarm.save_wake_state",
) as mock_save:
alarm._dismiss_alarm(earned_skip=False)
assert alarm.dismissed is True
mock_save.assert_called_once()
assert mock_save.call_args[1]["skip_workout"] is False
mock_widget.destroy.assert_called_once()
alarm._stop_beep.set()
class TestSkipWindowExpiredMessage:
"""Tests for the on-screen message when the skip window expires."""
def test_expired_updates_status_label(
self,
mock_tk_module: MagicMock,
) -> None:
"""Expiry updates the status label instead of closing the alarm."""
del mock_tk_module
alarm = WakeAlarm(demo_mode=True)
alarm._on_skip_window_expired()
alarm._status_label.configure.assert_called_with(
text="No workout skip today.",
)
alarm._stop_beep.set()

View File

@ -0,0 +1,281 @@
"""Tests for sink management and parse_args in wake alarm."""
from __future__ import annotations
import subprocess
from unittest.mock import MagicMock, patch
from python_pkg.wake_alarm._alarm import _parse_args
from python_pkg.wake_alarm._audio import (
_activate_alarm_audio,
_alarm_sink_present,
_current_default_sink,
_restore_alarm_audio,
_warn_if_no_real_sink,
)
class TestWarnIfNoRealSink:
"""Tests for _warn_if_no_real_sink."""
def test_logs_when_pactl_missing(self) -> None:
"""No pactl on PATH → warns and returns."""
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value=None,
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
) as mock_run,
):
_warn_if_no_real_sink()
mock_run.assert_not_called()
def test_warns_when_only_auto_null(self) -> None:
"""Only auto_null sink → warning is emitted."""
result = MagicMock()
result.stdout = b"4319\tauto_null\tPipeWire\tfloat32le 2ch 48000Hz\tIDLE\n"
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=result,
),
patch("python_pkg.wake_alarm._audio._logger") as mock_log,
):
_warn_if_no_real_sink()
mock_log.warning.assert_called()
def test_info_when_real_sink_present(self) -> None:
"""A non-auto_null sink → info log, no warning."""
result = MagicMock()
result.stdout = b"1\talsa_output.pci-0000_01_00.1.hdmi-stereo\tPipeWire\t-\t-\n"
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=result,
),
patch("python_pkg.wake_alarm._audio._logger") as mock_log,
):
_warn_if_no_real_sink()
mock_log.info.assert_called()
mock_log.warning.assert_not_called()
def test_handles_pactl_failure(self) -> None:
"""OSError/TimeoutExpired running pactl → warning, no raise."""
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=subprocess.TimeoutExpired("pactl", 5),
),
):
_warn_if_no_real_sink() # must not raise
class TestAlarmSinkPresent:
"""Tests for _alarm_sink_present."""
def test_true_when_sink_listed(self) -> None:
"""Returns True when the alarm sink name appears in pactl output."""
from python_pkg.wake_alarm._constants import ALARM_AUDIO_SINK
proc = MagicMock(stdout=ALARM_AUDIO_SINK.encode() + b"\tPipeWire\n")
with patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=proc,
):
assert _alarm_sink_present("/usr/bin/pactl") is True
def test_false_when_sink_absent(self) -> None:
"""Returns False when the alarm sink is not in pactl output."""
proc = MagicMock(stdout=b"auto_null\tPipeWire\n")
with patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=proc,
):
assert _alarm_sink_present("/usr/bin/pactl") is False
def test_false_on_subprocess_error(self) -> None:
"""OSError while listing sinks → False, no raise."""
with patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=OSError("boom"),
):
assert _alarm_sink_present("/usr/bin/pactl") is False
class TestCurrentDefaultSink:
"""Tests for _current_default_sink."""
def test_returns_sink_name(self) -> None:
"""Returns the trimmed default sink name."""
proc = MagicMock(stdout=b"jbl_sink\n")
with patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=proc,
):
assert _current_default_sink("/usr/bin/pactl") == "jbl_sink"
def test_returns_none_when_empty(self) -> None:
"""Empty output → None."""
proc = MagicMock(stdout=b"\n")
with patch(
"python_pkg.wake_alarm._audio.subprocess.run",
return_value=proc,
):
assert _current_default_sink("/usr/bin/pactl") is None
def test_returns_none_on_error(self) -> None:
"""TimeoutExpired → None, no raise."""
with patch(
"python_pkg.wake_alarm._audio.subprocess.run",
side_effect=subprocess.TimeoutExpired("pactl", 3),
):
assert _current_default_sink("/usr/bin/pactl") is None
class TestActivateAlarmAudio:
"""Tests for _activate_alarm_audio."""
def test_returns_none_when_pactl_missing(self) -> None:
"""No pactl on PATH → returns None without touching audio."""
with (
patch("python_pkg.wake_alarm._audio.shutil.which", return_value=None),
patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run,
):
assert _activate_alarm_audio() is None
mock_run.assert_not_called()
def test_activates_and_returns_old_default(self) -> None:
"""Sink present → routes audio there and returns prior default sink."""
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._audio._alarm_sink_present",
return_value=True,
),
patch(
"python_pkg.wake_alarm._audio._current_default_sink",
return_value="jbl_sink",
),
patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run,
):
result = _activate_alarm_audio()
assert result == "jbl_sink"
cmds = [call.args[0] for call in mock_run.call_args_list]
from python_pkg.wake_alarm._constants import (
ALARM_AUDIO_CARD,
ALARM_AUDIO_PROFILE,
ALARM_AUDIO_SINK,
)
assert [
"/usr/bin/pactl",
"set-card-profile",
ALARM_AUDIO_CARD,
ALARM_AUDIO_PROFILE,
] in cmds
assert ["/usr/bin/pactl", "set-default-sink", ALARM_AUDIO_SINK] in cmds
def test_returns_none_when_sink_never_appears(self) -> None:
"""Sink never shows up → returns None after polling (no raise)."""
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._audio._alarm_sink_present",
return_value=False,
),
patch("python_pkg.wake_alarm._audio.time.sleep") as mock_sleep,
patch("python_pkg.wake_alarm._audio.subprocess.run"),
):
assert _activate_alarm_audio() is None
mock_sleep.assert_called()
def test_waits_then_succeeds(self) -> None:
"""Sink absent then present → sleeps once, then routes audio."""
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/pactl",
),
patch(
"python_pkg.wake_alarm._audio._alarm_sink_present",
side_effect=[False, True],
),
patch(
"python_pkg.wake_alarm._audio._current_default_sink",
return_value="old",
),
patch("python_pkg.wake_alarm._audio.time.sleep") as mock_sleep,
patch("python_pkg.wake_alarm._audio.subprocess.run"),
):
assert _activate_alarm_audio() == "old"
mock_sleep.assert_called_once()
class TestRestoreAlarmAudio:
"""Tests for _restore_alarm_audio."""
def test_none_is_noop(self) -> None:
"""None default → does nothing, no pactl lookup."""
with patch("python_pkg.wake_alarm._audio.shutil.which") as mock_which:
_restore_alarm_audio(None)
mock_which.assert_not_called()
def test_no_pactl_returns_silently(self) -> None:
"""Default present but pactl missing → no raise, no run."""
with (
patch("python_pkg.wake_alarm._audio.shutil.which", return_value=None),
patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run,
):
_restore_alarm_audio("jbl_sink")
mock_run.assert_not_called()
def test_restores_default_sink(self) -> None:
"""Calls set-default-sink with the captured prior default."""
with (
patch(
"python_pkg.wake_alarm._audio.shutil.which",
return_value="/usr/bin/pactl",
),
patch("python_pkg.wake_alarm._audio.subprocess.run") as mock_run,
):
_restore_alarm_audio("jbl_sink")
cmds = [call.args[0] for call in mock_run.call_args_list]
assert ["/usr/bin/pactl", "set-default-sink", "jbl_sink"] in cmds
class TestParseArgs:
"""Tests for _parse_args."""
def test_default_flags_are_false(self) -> None:
"""No CLI args means every flag is False."""
ns = _parse_args([])
assert ns.demo is False
assert ns.trigger_now is False
assert ns.production is False
def test_flags_parse(self) -> None:
"""Each flag flips to True when passed."""
ns = _parse_args(["--production", "--demo", "--trigger-now"])
assert ns.production is True
assert ns.demo is True
assert ns.trigger_now is True