From 3109d6053cd204df0344705cf89deff3e9cbd0f7 Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Sat, 23 May 2026 19:51:26 +0200 Subject: [PATCH] wake_alarm: Tapo P110 plug control, fan ramp via sudo, loud warnings - Add python-kasa-based smart-plug control (_smart_plug.py) with turn_on_plug / turn_off_plug called around the alarm window. Reads ~/.config/wake_alarm/tapo.json (host/email/password). - Hard timeout (TAPO_TIMEOUT_SECONDS) so plug never blocks the alarm. - Install fan-control script + sudoers entry (install.sh step 6); _max_fans / _restore_fans now invoke it via /usr/bin/sudo -n so pwm1_enable writes succeed. - Remove ntfy.sh push notifications entirely (silent no-op was useless). - Replace every silent skip with _logger.warning() so failures are loud: missing xset / xrandr / speaker-test, unreadable hwmon files, fan script errors, missing Tapo config, kasa import failure, etc. - wake-alarm.service: Restart=on-failure with 10s backoff. - Tests: 100% line+branch coverage on python_pkg/wake_alarm. --- wake_alarm/_alarm.py | 208 +++++++++++++++- wake_alarm/_constants.py | 11 + wake_alarm/_smart_plug.py | 155 ++++++++++++ wake_alarm/install.sh | 45 +++- wake_alarm/tests/test_alarm.py | 322 ++++++++++++++++++++++++ wake_alarm/tests/test_alarm_part2.py | 102 +++++++- wake_alarm/tests/test_smart_plug.py | 351 +++++++++++++++++++++++++++ wake_alarm/wake-alarm-fans.sh | 52 ++++ wake_alarm/wake-alarm.service | 3 +- wake_alarm/wake_state.json | 4 +- 10 files changed, 1240 insertions(+), 13 deletions(-) create mode 100644 wake_alarm/_smart_plug.py create mode 100644 wake_alarm/tests/test_smart_plug.py create mode 100755 wake_alarm/wake-alarm-fans.sh diff --git a/wake_alarm/_alarm.py b/wake_alarm/_alarm.py index bfbf6b4..0b09e8c 100644 --- a/wake_alarm/_alarm.py +++ b/wake_alarm/_alarm.py @@ -10,6 +10,8 @@ from __future__ import annotations from datetime import datetime, timezone import logging +import os +from pathlib import Path import secrets import shutil import string @@ -30,6 +32,7 @@ from python_pkg.wake_alarm._constants import ( PHASE_SOFT_END, SOFT_BEEP_INTERVAL, ) +from python_pkg.wake_alarm._smart_plug import turn_off_plug, turn_on_plug from python_pkg.wake_alarm._state import ( save_wake_state, was_alarm_dismissed_today, @@ -52,6 +55,7 @@ def _wake_display() -> None: """Force the display on and disable screensaver during alarm.""" xset = shutil.which("xset") if xset is None: + _logger.warning("xset not on PATH; skipping display wake") return for cmd in ( [xset, "dpms", "force", "on"], @@ -64,6 +68,7 @@ def _restore_display() -> None: """Re-enable screensaver after the alarm ends.""" xset = shutil.which("xset") if xset is None: + _logger.warning("xset not on PATH; skipping display restore") return subprocess.run( [xset, "s", "on"], @@ -88,6 +93,175 @@ def _speaker_test_path() -> str: return path +# Extra PipeWire sinks to always play alarm audio on (alongside the default). +# alsa_output...hdmi-stereo = GA102 → G27Q (has built-in speaker, always on). +_EXTRA_PIPEWIRE_SINKS: tuple[str, ...] = ("alsa_output.pci-0000_01_00.1.hdmi-stereo",) + + +def _play_on_extra_devices(frequency: int) -> None: + """Fire-and-forget: play a sine tone on each extra PipeWire sink.""" + try: + path = _speaker_test_path() + except FileNotFoundError: + _logger.warning("speaker-test missing; skipping extra-device beep") + return + for sink in _EXTRA_PIPEWIRE_SINKS: + _play_tone_on_sink(path, sink, frequency) + + +def _play_tone_on_sink(path: str, sink: str, frequency: int) -> None: + """Launch speaker-test for *sink*; log a warning on OSError.""" + try: + subprocess.Popen( + [path, "-t", "sine", "-f", str(frequency), "-l", "1"], + env={**os.environ, "PIPEWIRE_NODE": sink}, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except OSError: + _logger.warning("Failed to play tone on sink %s", sink, exc_info=True) + + +# NCT Super I/O chip names that expose a single pwm1 fan control channel. +_NCT_CHIP_NAMES: frozenset[str] = frozenset( + { + "nct6775", + "nct6779", + "nct6791", + "nct6792", + "nct6793", + "nct6795", + "nct6796", + "nct6797", + "nct6798", + "nct6799", + } +) + +# Installed by install.sh, controlled via sudoers NOPASSWD entry. +_FAN_SCRIPT: str = "/usr/local/bin/wake-alarm-fans.sh" +_SUDO_BIN: str = "/usr/bin/sudo" + + +def _find_fan_hwmon() -> str | None: + """Return the hwmon directory for an NCT fan controller, or None.""" + for name_path in Path("/sys/class/hwmon").glob("hwmon*/name"): + try: + chip = name_path.read_text().strip() + except OSError: + _logger.warning("Could not read %s", name_path, exc_info=True) + continue + if chip in _NCT_CHIP_NAMES: + return str(name_path.parent) + _logger.warning( + "No NCT super-I/O hwmon entry found; fan ramp will be skipped", + ) + return None + + +def _max_fans() -> tuple[str, str, str] | None: + """Ramp all NCT fans to 100% speed for maximum audible noise. + + Saves the current pwm1 values so they can be restored after the alarm. + Safe: higher fan speed only lowers temperatures, never damages hardware. + + Returns: + (hwmon_dir, old_enable, old_pwm) tuple to pass to _restore_fans(), + or None when fan control is unavailable. + """ + hwmon = _find_fan_hwmon() + if hwmon is None: + return None + try: + old_enable = (Path(hwmon) / "pwm1_enable").read_text().strip() + old_pwm = (Path(hwmon) / "pwm1").read_text().strip() + except OSError: + _logger.warning( + "Could not read pwm1/pwm1_enable in %s; skipping fan ramp", + hwmon, + exc_info=True, + ) + return None + try: + result = subprocess.run( + [_SUDO_BIN, "-n", _FAN_SCRIPT, "max"], + check=False, + capture_output=True, + timeout=5, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning( + "Fan script %s not runnable; skipping fan ramp", + _FAN_SCRIPT, + exc_info=True, + ) + return None + if result.returncode != 0: + _logger.warning( + "Fan script %s exited %d: %s", + _FAN_SCRIPT, + result.returncode, + result.stderr.decode(errors="replace").strip(), + ) + return None + return (hwmon, old_enable, old_pwm) + + +def _restore_fans(state: tuple[str, str, str] | None) -> None: + """Restore fan speed to the values saved by _max_fans().""" + if state is None: + return + _hwmon, old_enable, old_pwm = state + try: + subprocess.run( + [_SUDO_BIN, "-n", _FAN_SCRIPT, "restore", old_enable, old_pwm], + check=False, + capture_output=True, + timeout=5, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning( + "Failed to restore fan state via %s", + _FAN_SCRIPT, + exc_info=True, + ) + + +def _set_max_brightness() -> None: + """Set all connected monitors to maximum brightness via xrandr.""" + xrandr = shutil.which("xrandr") + if xrandr is None: + _logger.warning("xrandr not on PATH; skipping max-brightness") + return + try: + result = subprocess.run( + [xrandr, "--query"], + capture_output=True, + text=True, + timeout=5, + check=False, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning("xrandr --query failed; skipping max-brightness", exc_info=True) + return + for line in result.stdout.splitlines(): + if " connected" in line: + output = line.split()[0] + try: + subprocess.run( + [xrandr, "--output", output, "--brightness", "1.0"], + check=False, + capture_output=True, + timeout=5, + ) + except (OSError, subprocess.TimeoutExpired): + _logger.warning( + "Failed to set brightness on %s", + output, + exc_info=True, + ) + + def _beep_medium(frequency: int = 1000) -> None: """Play a medium beep via speaker-test (sine wave, short).""" try: @@ -106,6 +280,10 @@ def _beep_medium(frequency: int = 1000) -> None: check=False, ) except (OSError, subprocess.TimeoutExpired): + _logger.warning( + "_beep_medium subprocess failed; falling back to soft beep", + exc_info=True, + ) _beep_soft() @@ -127,6 +305,10 @@ def _beep_loud(frequency: int = 1000) -> None: check=False, ) except (OSError, subprocess.TimeoutExpired): + _logger.warning( + "_beep_loud subprocess failed; falling back to soft beep", + exc_info=True, + ) _beep_soft() @@ -166,6 +348,9 @@ class WakeAlarm: self._schedule_code_refresh() self._schedule_dismiss_window_close() self._start_beep_thread() + self._fan_state: tuple[str, str, str] | None = _max_fans() + self._flash_on: bool = False + self._start_screen_flash() def _build_ui(self) -> None: """Build the dismiss challenge UI.""" @@ -268,7 +453,9 @@ class WakeAlarm: def _close(self) -> None: """Close the alarm window.""" self._stop_beep.set() + _restore_fans(self._fan_state) _restore_display() + turn_off_plug() self.root.destroy() def _schedule_code_refresh(self) -> None: @@ -310,12 +497,14 @@ class WakeAlarm: def _close_and_schedule_fallback(self) -> None: """Close the window and schedule the 1 PM fallback alarm.""" + _restore_fans(self._fan_state) _restore_display() + turn_off_plug() self.root.destroy() def _update_timer(self) -> None: """Update the remaining time display.""" - if self.dismissed: + if not self._active: return elapsed = time.monotonic() - self._alarm_start window = DISMISS_WINDOW_MINUTES * 60 if not self.demo_mode else 30 @@ -336,19 +525,34 @@ class WakeAlarm: ) self._beep_thread.start() + def _start_screen_flash(self) -> None: + """Start flashing the screen background to attract attention.""" + self._flash_step() + + def _flash_step(self) -> None: + """Alternate background colour every 750 ms (below seizure-risk 3 Hz).""" + if not self._active: + return + self.root.configure(bg="#ff0000" if self._flash_on else "#1a1a1a") + self._flash_on = not self._flash_on + self.root.after(750, self._flash_step) + def _beep_loop(self) -> None: """Escalating beep loop running in background thread.""" while not self._stop_beep.is_set(): elapsed_minutes = (time.monotonic() - self._alarm_start) / 60.0 if elapsed_minutes < PHASE_SOFT_END: + _play_on_extra_devices(440) _beep_soft() self._stop_beep.wait(SOFT_BEEP_INTERVAL) elif elapsed_minutes < PHASE_MEDIUM_END: + _play_on_extra_devices(1000) _beep_medium() self._stop_beep.wait(MEDIUM_BEEP_INTERVAL) else: freq = 800 if int(elapsed_minutes * 10) % 2 == 0 else 1200 + _play_on_extra_devices(freq) _beep_loud(freq) self._stop_beep.wait(LOUD_TOGGLE_INTERVAL) @@ -380,6 +584,8 @@ def main() -> None: demo_mode = "--demo" in sys.argv _wake_display() + _set_max_brightness() + turn_on_plug() alarm = WakeAlarm(demo_mode=demo_mode) alarm.run() diff --git a/wake_alarm/_constants.py b/wake_alarm/_constants.py index 7d53c9a..c85763c 100644 --- a/wake_alarm/_constants.py +++ b/wake_alarm/_constants.py @@ -37,3 +37,14 @@ WAKE_STATE_FILE: Path = Path(__file__).resolve().parent / "wake_state.json" # rtcwake binary path RTCWAKE_BIN: str = "/usr/sbin/rtcwake" + +# TP-Link Tapo P110 smart-plug config file (JSON). +# Create with mode 0600 and these keys: host, email, password. +# Example contents: a JSON object mapping host -> "192.168.x.x", email -> +# "tapo@example.com" and password -> "your-password". +# Missing/invalid file => smart-plug control is skipped silently. +TAPO_CONFIG_FILE: Path = Path.home() / ".config" / "wake_alarm" / "tapo.json" + +# Timeout (seconds) for a single Tapo plug operation. Keep short so a +# missing/unreachable plug never delays the alarm by more than this. +TAPO_TIMEOUT_SECONDS: float = 5.0 diff --git a/wake_alarm/_smart_plug.py b/wake_alarm/_smart_plug.py new file mode 100644 index 0000000..271c588 --- /dev/null +++ b/wake_alarm/_smart_plug.py @@ -0,0 +1,155 @@ +"""TP-Link Tapo P110 smart-plug control for the wake alarm. + +Config file ``~/.config/wake_alarm/tapo.json`` (mode 0600) must contain:: + + { + "host": "192.168.x.x", + "email": "tapo-account@example.com", + "password": "tapo-account-password", + } + +If the file is missing, malformed, the ``kasa`` package is unavailable, or +the plug cannot be reached within :data:`TAPO_TIMEOUT_SECONDS`, the +operation is skipped with a WARNING log entry — the alarm must never +block on the plug. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import json +import logging +from typing import TYPE_CHECKING + +from python_pkg.wake_alarm._constants import ( + TAPO_CONFIG_FILE, + TAPO_TIMEOUT_SECONDS, +) + +if TYPE_CHECKING: + from kasa import Device + +_logger = logging.getLogger(__name__) + +# ``kasa`` is an optional runtime dependency. Import at module load time so +# we fail fast if it is missing rather than re-importing on every call. +try: + from kasa import Credentials, Discover + from kasa.exceptions import KasaException + + _KASA_AVAILABLE = True +except ImportError: + _KASA_AVAILABLE = False + _logger.warning( + "python-kasa is not installed; Tapo smart-plug control disabled", + ) + + +def _load_config() -> dict[str, str] | None: + """Return validated Tapo config from :data:`TAPO_CONFIG_FILE`, or ``None``. + + Returns: + ``None`` if the file is missing, unreadable, malformed, or missing + any of the required keys. Otherwise a dict with ``host``, ``email``, + ``password``. + """ + try: + with TAPO_CONFIG_FILE.open(encoding="utf-8") as fh: + data = json.load(fh) + except FileNotFoundError: + _logger.warning( + "Tapo config %s does not exist; smart-plug control disabled", + TAPO_CONFIG_FILE, + ) + return None + except (OSError, json.JSONDecodeError): + _logger.warning( + "Tapo config %s is unreadable or malformed; skipping plug control", + TAPO_CONFIG_FILE, + exc_info=True, + ) + return None + if not isinstance(data, dict): + _logger.warning( + "Tapo config %s is not a JSON object; skipping plug control", + TAPO_CONFIG_FILE, + ) + return None + required = ("host", "email", "password") + if not all(isinstance(data.get(k), str) and data[k] for k in required): + _logger.warning( + "Tapo config %s missing required keys %s; skipping plug control", + TAPO_CONFIG_FILE, + required, + ) + return None + return {k: data[k] for k in required} + + +async def _connect(config: dict[str, str]) -> Device | None: + """Open a connection to the configured plug, or ``None`` on failure.""" + try: + dev = await Discover.discover_single( + config["host"], + credentials=Credentials(config["email"], config["password"]), + ) + except (KasaException, OSError, asyncio.TimeoutError): + _logger.warning("Tapo plug discovery failed", exc_info=True) + return None + try: + await dev.update() + except (KasaException, OSError, asyncio.TimeoutError): + _logger.warning("Tapo plug update failed", exc_info=True) + with contextlib.suppress(KasaException, OSError): + await dev.disconnect() + return None + return dev + + +async def _set_state(*, on: bool) -> None: + """Connect to the plug and set its on/off state.""" + config = _load_config() + if config is None: + return + dev = await _connect(config) + if dev is None: + return + try: + if on: + await dev.turn_on() + else: + await dev.turn_off() + except (KasaException, OSError, asyncio.TimeoutError): + _logger.warning("Tapo plug toggle failed", exc_info=True) + finally: + with contextlib.suppress(KasaException, OSError): + await dev.disconnect() + + +def _run(*, on: bool) -> None: + """Run :func:`_set_state` with a hard timeout. Never raises.""" + if not _KASA_AVAILABLE: + _logger.warning( + "python-kasa unavailable; skipping Tapo plug %s", + "ON" if on else "OFF", + ) + return + + async def _runner() -> None: + await asyncio.wait_for(_set_state(on=on), timeout=TAPO_TIMEOUT_SECONDS) + + try: + asyncio.run(_runner()) + except (asyncio.TimeoutError, OSError, RuntimeError): + _logger.warning("Tapo plug control timed out or failed", exc_info=True) + + +def turn_on_plug() -> None: + """Turn the configured Tapo plug on. Logs a WARNING if not configured.""" + _run(on=True) + + +def turn_off_plug() -> None: + """Turn the configured Tapo plug off. Logs a WARNING if not configured.""" + _run(on=False) diff --git a/wake_alarm/install.sh b/wake_alarm/install.sh index 831484f..a325172 100755 --- a/wake_alarm/install.sh +++ b/wake_alarm/install.sh @@ -9,6 +9,8 @@ # 3. Installs the systemd-sleep hook (restarts alarm after hibernate resume) # 4. Adds a sudoers entry for passwordless rtcwake # 5. Installs shutdown wrapper so "shutdown now" also hibernates on alarm nights +# 6. Installs fan-control script so alarm can max fans on wake +# 7. Installs python-kasa (AUR) so the alarm can toggle a Tapo P110 smart plug set -euo pipefail @@ -16,6 +18,8 @@ SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" SERVICE_FILE="$SCRIPT_DIR/wake-alarm.service" SLEEP_HOOK_SRC="$SCRIPT_DIR/sleep-hook.sh" SHUTDOWN_WRAPPER_SRC="$SCRIPT_DIR/shutdown-wrapper.sh" +FANS_SCRIPT_SRC="$SCRIPT_DIR/wake-alarm-fans.sh" +FANS_SCRIPT_DST="/usr/local/bin/wake-alarm-fans.sh" SYSTEMD_USER_DIR="$HOME/.config/systemd/user" SLEEP_HOOK_DST="/usr/lib/systemd/system-sleep/wake-alarm.sh" SHUTDOWN_WRAPPER_DST="/usr/local/bin/shutdown" @@ -25,7 +29,7 @@ RTCWAKE_BIN="/usr/sbin/rtcwake" echo "=== Weekend Wake Alarm Installer ===" # 0. Install system dependencies -echo "[0/5] Checking system dependencies..." +echo "[0/7] Checking system dependencies..." if ! command -v speaker-test &>/dev/null; then echo " Installing alsa-utils (required for speaker-test)..." sudo pacman -S --noconfirm alsa-utils @@ -34,25 +38,25 @@ else fi # 1. Install systemd user service -echo "[1/5] Installing systemd user service..." +echo "[1/7] Installing systemd user service..." mkdir -p "$SYSTEMD_USER_DIR" cp "$SERVICE_FILE" "$SYSTEMD_USER_DIR/wake-alarm.service" systemctl --user daemon-reload echo " Installed to $SYSTEMD_USER_DIR/wake-alarm.service" # 2. Enable service -echo "[2/5] Enabling wake-alarm.service..." +echo "[2/7] Enabling wake-alarm.service..." systemctl --user enable wake-alarm.service echo " Service enabled (will start on next boot)" # 3. Install systemd-sleep hook (restarts alarm after hibernate resume) -echo "[3/5] Installing systemd-sleep hook..." +echo "[3/7] Installing systemd-sleep hook..." sudo cp "$SLEEP_HOOK_SRC" "$SLEEP_HOOK_DST" sudo chmod 0755 "$SLEEP_HOOK_DST" echo " Installed to $SLEEP_HOOK_DST" # 4. Add sudoers entry for rtcwake (requires root) -echo "[4/5] Setting up sudoers for rtcwake..." +echo "[4/7] Setting up sudoers for rtcwake..." SUDOERS_LINE="$USER ALL=(root) NOPASSWD: $RTCWAKE_BIN" if [[ -f "$SUDOERS_FILE" ]] && grep -qF "$SUDOERS_LINE" "$SUDOERS_FILE"; then echo " Sudoers entry already exists" @@ -64,13 +68,42 @@ else fi # 5. Install shutdown wrapper (/usr/local/bin/shutdown shadows /usr/bin/shutdown) -echo "[5/5] Installing shutdown wrapper..." +echo "[5/7] Installing shutdown wrapper..." sudo cp "$SHUTDOWN_WRAPPER_SRC" "$SHUTDOWN_WRAPPER_DST" sudo chmod 0755 "$SHUTDOWN_WRAPPER_DST" echo " Installed to $SHUTDOWN_WRAPPER_DST" echo " 'shutdown now' will now hibernate (not poweroff) on alarm nights." +# 6. Install fan-control script and its sudoers entry +echo "[6/7] Installing fan-control script..." +sudo cp "$FANS_SCRIPT_SRC" "$FANS_SCRIPT_DST" +sudo chmod 0755 "$FANS_SCRIPT_DST" +FANS_SUDOERS_LINE="$USER ALL=(root) NOPASSWD: $FANS_SCRIPT_DST" +if [[ -f "$SUDOERS_FILE" ]] && grep -qF "$FANS_SUDOERS_LINE" "$SUDOERS_FILE"; then + echo " Fan sudoers entry already exists" +else + # Append to existing file (or create) + echo "$FANS_SUDOERS_LINE" | sudo tee -a "$SUDOERS_FILE" > /dev/null + sudo chmod 0440 "$SUDOERS_FILE" + echo " Added fan sudoers entry" +fi + +# 7. Install python-kasa (AUR) for TP-Link Tapo P110 smart-plug control +echo "[7/7] Installing python-kasa (AUR)..." +if python -c 'import kasa' 2>/dev/null; then + echo " python-kasa already installed" +elif command -v yay &>/dev/null; then + yay -S --noconfirm --needed python-kasa +else + echo " WARNING: yay not found; install python-kasa manually for smart-plug support" >&2 +fi +if [[ ! -f "$HOME/.config/wake_alarm/tapo.json" ]]; then + echo " NOTE: ~/.config/wake_alarm/tapo.json not found — smart-plug control is disabled." + echo " Create it (mode 0600) with keys: host, email, password." +fi + echo "=== Installation complete ===" echo "The wake alarm will activate on boot for alarm days (Mon, Fri, Sat, Sun)." echo "After hibernate resume the sleep hook will restart the alarm service." +echo "Fans will ramp to 100% while the alarm is active, then restore automatically." echo "To test now: python -m python_pkg.wake_alarm._alarm --demo" diff --git a/wake_alarm/tests/test_alarm.py b/wake_alarm/tests/test_alarm.py index 21c6375..2a5ecec 100644 --- a/wake_alarm/tests/test_alarm.py +++ b/wake_alarm/tests/test_alarm.py @@ -2,6 +2,8 @@ from __future__ import annotations +import pathlib +import subprocess import tkinter as tk from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch @@ -15,9 +17,14 @@ from python_pkg.wake_alarm._alarm import ( _beep_loud, _beep_medium, _beep_soft, + _find_fan_hwmon, _generate_code, _is_alarm_day, + _max_fans, + _play_on_extra_devices, _restore_display, + _restore_fans, + _set_max_brightness, _should_run_alarm, _speaker_test_path, _wake_display, @@ -372,3 +379,318 @@ class TestDisplayHelpers: ): _restore_display() mock_run.assert_not_called() + + +class TestPlayOnExtraDevices: + """Tests for _play_on_extra_devices.""" + + def test_popen_called_for_each_device(self) -> None: + """_play_on_extra_devices spawns speaker-test with PIPEWIRE_NODE set.""" + with ( + patch( + "python_pkg.wake_alarm._alarm._speaker_test_path", + return_value="/usr/bin/speaker-test", + ), + patch("python_pkg.wake_alarm._alarm.subprocess.Popen") as mock_popen, + ): + _play_on_extra_devices(1000) + mock_popen.assert_called_once() + args = mock_popen.call_args[0][0] + env = mock_popen.call_args.kwargs["env"] + assert "/usr/bin/speaker-test" in args + assert "-D" not in args + assert "1000" in args + assert "PIPEWIRE_NODE" in env + assert "alsa_output" in env["PIPEWIRE_NODE"] + + def test_noop_when_speaker_test_missing(self) -> None: + """_play_on_extra_devices does nothing when speaker-test is absent.""" + with ( + patch( + "python_pkg.wake_alarm._alarm._speaker_test_path", + side_effect=FileNotFoundError("not found"), + ), + patch("python_pkg.wake_alarm._alarm.subprocess.Popen") as mock_popen, + ): + _play_on_extra_devices(1000) + mock_popen.assert_not_called() + + def test_ignores_oserror_on_popen(self) -> None: + """_play_on_extra_devices silently ignores OSError from Popen.""" + with ( + patch( + "python_pkg.wake_alarm._alarm._speaker_test_path", + return_value="/usr/bin/speaker-test", + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.Popen", + side_effect=OSError("device busy"), + ), + ): + _play_on_extra_devices(1000) # must not raise + + +class TestFindFanHwmon: + """Tests for _find_fan_hwmon.""" + + def test_returns_none_when_no_hwmon(self, monkeypatch: pytest.MonkeyPatch) -> None: + """No hwmon entries → returns None.""" + monkeypatch.setattr(pathlib.Path, "glob", lambda _s, _p: iter([])) + assert _find_fan_hwmon() is None + + def test_returns_none_for_unknown_chip( + self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Non-NCT chip name → returns None.""" + name_file = tmp_path / "name" + name_file.write_text("unknown_chip\n") + monkeypatch.setattr(pathlib.Path, "glob", lambda _s, _p: iter([name_file])) + assert _find_fan_hwmon() is None + + def test_returns_hwmon_dir_for_nct_chip( + self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """NCT chip name → returns the hwmon directory path.""" + name_file = tmp_path / "name" + name_file.write_text("nct6799\n") + monkeypatch.setattr(pathlib.Path, "glob", lambda _s, _p: iter([name_file])) + result = _find_fan_hwmon() + assert result == str(tmp_path) + + def test_skips_unreadable_name_file(self, monkeypatch: pytest.MonkeyPatch) -> None: + """OSError on read → skips and returns None.""" + bad_path = MagicMock(spec=pathlib.Path) + bad_path.read_text.side_effect = OSError("unreadable") + monkeypatch.setattr(pathlib.Path, "glob", lambda _s, _p: iter([bad_path])) + assert _find_fan_hwmon() is None + + +class TestMaxFans: + """Tests for _max_fans.""" + + def test_returns_none_when_no_hwmon(self) -> None: + """No fan controller → returns None immediately.""" + with patch("python_pkg.wake_alarm._alarm._find_fan_hwmon", return_value=None): + assert _max_fans() is None + + def test_returns_none_on_oserror_reading_pwm(self, tmp_path: pathlib.Path) -> None: + """Missing pwm files → returns None.""" + hwmon_dir = str(tmp_path) + with patch( + "python_pkg.wake_alarm._alarm._find_fan_hwmon", return_value=hwmon_dir + ): + assert _max_fans() is None + + def test_returns_none_on_script_oserror(self, tmp_path: pathlib.Path) -> None: + """OSError running fan script → returns None.""" + (tmp_path / "pwm1_enable").write_text("5\n") + (tmp_path / "pwm1").write_text("165\n") + with ( + patch( + "python_pkg.wake_alarm._alarm._find_fan_hwmon", + return_value=str(tmp_path), + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.run", + side_effect=OSError("not found"), + ), + ): + assert _max_fans() is None + + def test_returns_none_on_script_timeout(self, tmp_path: pathlib.Path) -> None: + """TimeoutExpired running fan script → returns None.""" + (tmp_path / "pwm1_enable").write_text("5\n") + (tmp_path / "pwm1").write_text("165\n") + with ( + patch( + "python_pkg.wake_alarm._alarm._find_fan_hwmon", + return_value=str(tmp_path), + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.run", + side_effect=subprocess.TimeoutExpired("fan", 5), + ), + ): + assert _max_fans() is None + + def test_returns_none_on_nonzero_returncode(self, tmp_path: pathlib.Path) -> None: + """Fan script exits non-zero → returns None.""" + (tmp_path / "pwm1_enable").write_text("5\n") + (tmp_path / "pwm1").write_text("165\n") + mock_result = MagicMock() + mock_result.returncode = 1 + with ( + patch( + "python_pkg.wake_alarm._alarm._find_fan_hwmon", + return_value=str(tmp_path), + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.run", return_value=mock_result + ), + ): + assert _max_fans() is None + + def test_returns_state_on_success(self, tmp_path: pathlib.Path) -> None: + """Successful run → returns (hwmon, old_enable, old_pwm).""" + (tmp_path / "pwm1_enable").write_text("5\n") + (tmp_path / "pwm1").write_text("165\n") + mock_result = MagicMock() + mock_result.returncode = 0 + with ( + patch( + "python_pkg.wake_alarm._alarm._find_fan_hwmon", + return_value=str(tmp_path), + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.run", return_value=mock_result + ), + ): + result = _max_fans() + assert result == (str(tmp_path), "5", "165") + + +class TestRestoreFans: + """Tests for _restore_fans.""" + + def test_noop_when_state_is_none(self) -> None: + """None state → subprocess.run is never called.""" + with patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run: + _restore_fans(None) + mock_run.assert_not_called() + + def test_calls_fan_script_with_saved_values(self) -> None: + """Saved state → fan script called with restore + old values.""" + with patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run: + mock_run.return_value.returncode = 0 + _restore_fans(("/sys/class/hwmon/hwmon6", "5", "165")) + mock_run.assert_called_once() + args = mock_run.call_args[0][0] + assert "restore" in args + assert "5" in args + assert "165" in args + + def test_ignores_oserror_on_restore(self) -> None: + """OSError from fan script is silently suppressed.""" + with patch( + "python_pkg.wake_alarm._alarm.subprocess.run", + side_effect=OSError("no script"), + ): + _restore_fans(("/sys/class/hwmon/hwmon6", "5", "165")) # must not raise + + def test_ignores_timeout_on_restore(self) -> None: + """TimeoutExpired from fan script is silently suppressed.""" + with patch( + "python_pkg.wake_alarm._alarm.subprocess.run", + side_effect=subprocess.TimeoutExpired("fan", 5), + ): + _restore_fans(("/sys/class/hwmon/hwmon6", "5", "165")) # must not raise + + +class TestSetMaxBrightness: + """Tests for _set_max_brightness.""" + + def test_noop_when_xrandr_missing(self) -> None: + """No xrandr on PATH → subprocess.run never called.""" + with ( + patch("python_pkg.wake_alarm._alarm.shutil.which", return_value=None), + patch("python_pkg.wake_alarm._alarm.subprocess.run") as mock_run, + ): + _set_max_brightness() + mock_run.assert_not_called() + + def test_noop_on_oserror_from_query(self) -> None: + """OSError from xrandr --query is suppressed.""" + with ( + patch( + "python_pkg.wake_alarm._alarm.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.run", + side_effect=OSError("no display"), + ), + ): + _set_max_brightness() # must not raise + + def test_noop_on_timeout_from_query(self) -> None: + """TimeoutExpired from xrandr --query is suppressed.""" + with ( + patch( + "python_pkg.wake_alarm._alarm.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.run", + side_effect=subprocess.TimeoutExpired("xrandr", 5), + ), + ): + _set_max_brightness() # must not raise + + def test_sets_brightness_for_connected_displays(self) -> None: + """Connected displays each get an --output --brightness call.""" + mock_query_result = MagicMock() + mock_query_result.stdout = ( + "HDMI-0 connected 2560x1440+0+0\nDP-0 connected primary\n" + ) + call_args_list: list[list[str]] = [] + + def fake_run(args: list[str], **_kwargs: object) -> MagicMock: + call_args_list.append(args) + return mock_query_result + + with ( + patch( + "python_pkg.wake_alarm._alarm.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch("python_pkg.wake_alarm._alarm.subprocess.run", side_effect=fake_run), + ): + _set_max_brightness() + + # First call is --query; subsequent calls set brightness for each output. + brightness_calls = [a for a in call_args_list if "--brightness" in a] + expected_brightness_calls = 2 + assert len(brightness_calls) == expected_brightness_calls + + def test_skips_disconnected_outputs(self) -> None: + """Disconnected outputs do NOT get a brightness call.""" + mock_result = MagicMock() + mock_result.stdout = "Screen 0: minimum 320\nHDMI-0 disconnected\n" + with ( + patch( + "python_pkg.wake_alarm._alarm.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.run", + return_value=mock_result, + ) as mock_run, + ): + _set_max_brightness() + # Only the --query call, no brightness calls. + assert mock_run.call_count == 1 + + def test_warns_when_brightness_call_fails(self) -> None: + """OSError on per-output --brightness call is logged but swallowed.""" + query_result = MagicMock() + query_result.stdout = ( + "Screen 0: minimum 320\nHDMI-0 connected primary 1920x1080\n" + ) + + def _run_side_effect(args: list[str], **_kwargs: object) -> MagicMock: + if "--query" in args: + return query_result + msg = "permission denied" + raise OSError(msg) + + with ( + patch( + "python_pkg.wake_alarm._alarm.shutil.which", + return_value="/usr/bin/xrandr", + ), + patch( + "python_pkg.wake_alarm._alarm.subprocess.run", + side_effect=_run_side_effect, + ), + ): + _set_max_brightness() # must not raise diff --git a/wake_alarm/tests/test_alarm_part2.py b/wake_alarm/tests/test_alarm_part2.py index 5e94c25..a65ac29 100644 --- a/wake_alarm/tests/test_alarm_part2.py +++ b/wake_alarm/tests/test_alarm_part2.py @@ -48,6 +48,20 @@ def _block_real_tk() -> Generator[MagicMock]: yield mock +@pytest.fixture(autouse=True) +def _block_extra_devices() -> Generator[MagicMock]: + """Prevent real subprocess.Popen calls for extra ALSA devices.""" + with ( + patch("python_pkg.wake_alarm._alarm._play_on_extra_devices") as mock, + patch("python_pkg.wake_alarm._alarm._max_fans", return_value=None), + patch("python_pkg.wake_alarm._alarm._restore_fans"), + patch("python_pkg.wake_alarm._alarm._set_max_brightness"), + patch("python_pkg.wake_alarm._alarm.turn_on_plug"), + patch("python_pkg.wake_alarm._alarm.turn_off_plug"), + ): + yield mock + + @pytest.fixture def mock_tk_module() -> Generator[MagicMock]: """Provide explicit access to the mocked tk module.""" @@ -224,13 +238,13 @@ class TestCodeRefreshAndTimer: assert alarm._current_code == old_code alarm._stop_beep.set() - def test_update_timer_noop_when_dismissed( + def test_update_timer_noop_when_not_active( self, mock_tk_module: MagicMock, ) -> None: - """Timer update is a no-op after dismissal.""" + """Timer update is a no-op when alarm is inactive.""" alarm = WakeAlarm(demo_mode=True) - alarm.dismissed = True + alarm._active = False alarm._update_timer() # Should not raise alarm._stop_beep.set() @@ -266,6 +280,18 @@ class TestCloseAndFallback: assert alarm._stop_beep.is_set() alarm.root.destroy.assert_called() + def test_close_restores_fans( + self, + mock_tk_module: MagicMock, + ) -> None: + """_close calls _restore_fans with the saved fan state.""" + alarm = WakeAlarm(demo_mode=True) + saved_state = ("/sys/class/hwmon/hwmon6", "5", "165") + alarm._fan_state = saved_state + with patch("python_pkg.wake_alarm._alarm._restore_fans") as mock_restore: + alarm._close() + mock_restore.assert_called_once_with(saved_state) + def test_close_and_schedule_fallback( self, mock_tk_module: MagicMock, @@ -276,6 +302,76 @@ class TestCloseAndFallback: alarm.root.destroy.assert_called() alarm._stop_beep.set() + def test_close_and_schedule_fallback_restores_fans( + self, + mock_tk_module: MagicMock, + ) -> None: + """_close_and_schedule_fallback calls _restore_fans with the saved state.""" + alarm = WakeAlarm(demo_mode=True) + saved_state = ("/sys/class/hwmon/hwmon6", "5", "165") + alarm._fan_state = saved_state + with patch("python_pkg.wake_alarm._alarm._restore_fans") as mock_restore: + alarm._close_and_schedule_fallback() + mock_restore.assert_called_once_with(saved_state) + alarm._stop_beep.set() + + +class TestScreenFlash: + """Tests for _start_screen_flash and _flash_step.""" + + def test_flash_step_shows_dark_on_flash_off( + self, + mock_tk_module: MagicMock, + ) -> None: + """When _flash_on=False, the background is set to dark colour.""" + alarm = WakeAlarm(demo_mode=True) + mock_root = mock_tk_module.Tk.return_value + mock_root.configure.reset_mock() + mock_root.after.reset_mock() + + alarm._flash_on = False + alarm._flash_step() + + mock_root.configure.assert_called_once_with(bg="#1a1a1a") + assert alarm._flash_on is True + mock_root.after.assert_called_with(750, alarm._flash_step) + alarm._stop_beep.set() + + def test_flash_step_shows_red_on_flash_on( + self, + mock_tk_module: MagicMock, + ) -> None: + """When _flash_on=True, the background is set to red.""" + alarm = WakeAlarm(demo_mode=True) + mock_root = mock_tk_module.Tk.return_value + mock_root.configure.reset_mock() + mock_root.after.reset_mock() + + alarm._flash_on = True + alarm._flash_step() + + mock_root.configure.assert_called_once_with(bg="#ff0000") + assert alarm._flash_on is False + mock_root.after.assert_called_with(750, alarm._flash_step) + alarm._stop_beep.set() + + def test_flash_step_stops_when_inactive( + self, + mock_tk_module: MagicMock, + ) -> None: + """When alarm is no longer active, _flash_step returns without scheduling.""" + alarm = WakeAlarm(demo_mode=True) + mock_root = mock_tk_module.Tk.return_value + alarm._active = False + mock_root.configure.reset_mock() + mock_root.after.reset_mock() + + alarm._flash_step() + + mock_root.configure.assert_not_called() + mock_root.after.assert_not_called() + alarm._stop_beep.set() + class TestDismissWithoutSkip: """Tests for alarm dismiss without earning skip.""" diff --git a/wake_alarm/tests/test_smart_plug.py b/wake_alarm/tests/test_smart_plug.py new file mode 100644 index 0000000..c7f85fe --- /dev/null +++ b/wake_alarm/tests/test_smart_plug.py @@ -0,0 +1,351 @@ +"""Tests for _smart_plug.py — Tapo P110 control with config + asyncio.""" + +from __future__ import annotations + +import asyncio +import json +from typing import TYPE_CHECKING +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from python_pkg.wake_alarm import _smart_plug +from python_pkg.wake_alarm._smart_plug import ( + _connect, + _load_config, + _run, + _set_state, + turn_off_plug, + turn_on_plug, +) + +if TYPE_CHECKING: + from collections.abc import Generator + from pathlib import Path + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_config_file(tmp_path: Path, contents: object) -> Path: + """Write ``contents`` (encoded as JSON unless str) to a config file.""" + path = tmp_path / "tapo.json" + if isinstance(contents, str): + path.write_text(contents, encoding="utf-8") + else: + path.write_text(json.dumps(contents), encoding="utf-8") + return path + + +@pytest.fixture +def _kasa_available() -> Generator[None]: + """Force _smart_plug to treat ``kasa`` as importable for the test.""" + with patch.object(_smart_plug, "_KASA_AVAILABLE", new=True): + yield + + +# --------------------------------------------------------------------------- +# _load_config +# --------------------------------------------------------------------------- + + +class TestLoadConfig: + """Tests for _load_config().""" + + def test_returns_none_when_file_missing(self, tmp_path: Path) -> None: + """Missing config file returns None.""" + with patch.object(_smart_plug, "TAPO_CONFIG_FILE", tmp_path / "missing.json"): + assert _load_config() is None + + def test_returns_none_for_invalid_json(self, tmp_path: Path) -> None: + """Malformed JSON returns None.""" + path = _make_config_file(tmp_path, "{not valid json") + with patch.object(_smart_plug, "TAPO_CONFIG_FILE", path): + assert _load_config() is None + + def test_returns_none_when_top_level_not_dict(self, tmp_path: Path) -> None: + """A JSON list at top level returns None.""" + path = _make_config_file(tmp_path, ["host", "email", "password"]) + with patch.object(_smart_plug, "TAPO_CONFIG_FILE", path): + assert _load_config() is None + + def test_returns_none_when_key_missing(self, tmp_path: Path) -> None: + """Missing required key returns None.""" + path = _make_config_file(tmp_path, {"host": "1.2.3.4", "email": "x"}) + with patch.object(_smart_plug, "TAPO_CONFIG_FILE", path): + assert _load_config() is None + + def test_returns_none_when_value_empty(self, tmp_path: Path) -> None: + """Empty-string value returns None.""" + path = _make_config_file( + tmp_path, {"host": "1.2.3.4", "email": "", "password": "p"} + ) + with patch.object(_smart_plug, "TAPO_CONFIG_FILE", path): + assert _load_config() is None + + def test_returns_none_when_value_not_string(self, tmp_path: Path) -> None: + """Non-string value returns None.""" + path = _make_config_file( + tmp_path, {"host": 1234, "email": "e", "password": "p"} + ) + with patch.object(_smart_plug, "TAPO_CONFIG_FILE", path): + assert _load_config() is None + + def test_returns_validated_dict(self, tmp_path: Path) -> None: + """Valid config returns a normalized dict with only required keys.""" + path = _make_config_file( + tmp_path, + { + "host": "192.168.1.50", + "email": "user@example.com", + "password": "secret", + "extra": "ignored", + }, + ) + with patch.object(_smart_plug, "TAPO_CONFIG_FILE", path): + assert _load_config() == { + "host": "192.168.1.50", + "email": "user@example.com", + "password": "secret", + } + + +# --------------------------------------------------------------------------- +# _connect +# --------------------------------------------------------------------------- + + +class TestConnect: + """Tests for _connect().""" + + def test_returns_device_on_success(self) -> None: + """Successful discover + update returns the device.""" + dev = MagicMock() + dev.update = AsyncMock() + dev.disconnect = AsyncMock() + with ( + patch.object(_smart_plug, "Discover") as mock_discover, + patch.object(_smart_plug, "Credentials") as mock_creds, + ): + mock_discover.discover_single = AsyncMock(return_value=dev) + result = asyncio.run( + _connect({"host": "1.2.3.4", "email": "e", "password": "p"}) + ) + assert result is dev + mock_creds.assert_called_once_with("e", "p") + + def test_returns_none_when_discover_raises_oserror(self) -> None: + """OSError during discovery returns None.""" + with patch.object(_smart_plug, "Discover") as mock_discover: + mock_discover.discover_single = AsyncMock(side_effect=OSError) + result = asyncio.run(_connect({"host": "h", "email": "e", "password": "p"})) + assert result is None + + def test_returns_none_when_update_raises(self) -> None: + """Failure during update returns None and attempts disconnect.""" + dev = MagicMock() + dev.update = AsyncMock(side_effect=OSError) + dev.disconnect = AsyncMock() + with patch.object(_smart_plug, "Discover") as mock_discover: + mock_discover.discover_single = AsyncMock(return_value=dev) + result = asyncio.run(_connect({"host": "h", "email": "e", "password": "p"})) + assert result is None + dev.disconnect.assert_awaited_once() + + def test_swallows_disconnect_failure_after_update_error(self) -> None: + """A disconnect error after a failed update is suppressed.""" + dev = MagicMock() + dev.update = AsyncMock(side_effect=OSError) + dev.disconnect = AsyncMock(side_effect=OSError) + with patch.object(_smart_plug, "Discover") as mock_discover: + mock_discover.discover_single = AsyncMock(return_value=dev) + result = asyncio.run(_connect({"host": "h", "email": "e", "password": "p"})) + assert result is None + + +# --------------------------------------------------------------------------- +# _set_state +# --------------------------------------------------------------------------- + + +class TestSetState: + """Tests for _set_state().""" + + def test_noop_when_config_missing(self) -> None: + """No config => no kasa calls.""" + with ( + patch.object(_smart_plug, "_load_config", return_value=None), + patch.object(_smart_plug, "_connect") as mock_connect, + ): + asyncio.run(_set_state(on=True)) + mock_connect.assert_not_called() + + def test_noop_when_connect_returns_none(self) -> None: + """Connect failure => no toggle.""" + with ( + patch.object( + _smart_plug, + "_load_config", + return_value={"host": "h", "email": "e", "password": "p"}, + ), + patch.object(_smart_plug, "_connect", new=AsyncMock(return_value=None)), + ): + asyncio.run(_set_state(on=True)) + + def test_turns_on_when_on_true(self) -> None: + """on=True calls dev.turn_on(), not turn_off().""" + dev = MagicMock() + dev.turn_on = AsyncMock() + dev.turn_off = AsyncMock() + dev.disconnect = AsyncMock() + with ( + patch.object( + _smart_plug, + "_load_config", + return_value={"host": "h", "email": "e", "password": "p"}, + ), + patch.object(_smart_plug, "_connect", new=AsyncMock(return_value=dev)), + ): + asyncio.run(_set_state(on=True)) + dev.turn_on.assert_awaited_once() + dev.turn_off.assert_not_called() + dev.disconnect.assert_awaited_once() + + def test_turns_off_when_on_false(self) -> None: + """on=False calls dev.turn_off(), not turn_on().""" + dev = MagicMock() + dev.turn_on = AsyncMock() + dev.turn_off = AsyncMock() + dev.disconnect = AsyncMock() + with ( + patch.object( + _smart_plug, + "_load_config", + return_value={"host": "h", "email": "e", "password": "p"}, + ), + patch.object(_smart_plug, "_connect", new=AsyncMock(return_value=dev)), + ): + asyncio.run(_set_state(on=False)) + dev.turn_off.assert_awaited_once() + dev.turn_on.assert_not_called() + + def test_swallows_toggle_oserror_and_still_disconnects(self) -> None: + """A toggle OSError is swallowed; disconnect still runs.""" + dev = MagicMock() + dev.turn_on = AsyncMock(side_effect=OSError) + dev.disconnect = AsyncMock() + with ( + patch.object( + _smart_plug, + "_load_config", + return_value={"host": "h", "email": "e", "password": "p"}, + ), + patch.object(_smart_plug, "_connect", new=AsyncMock(return_value=dev)), + ): + asyncio.run(_set_state(on=True)) + dev.disconnect.assert_awaited_once() + + def test_swallows_disconnect_oserror(self) -> None: + """A disconnect OSError after a successful toggle is suppressed.""" + dev = MagicMock() + dev.turn_on = AsyncMock() + dev.disconnect = AsyncMock(side_effect=OSError) + with ( + patch.object( + _smart_plug, + "_load_config", + return_value={"host": "h", "email": "e", "password": "p"}, + ), + patch.object(_smart_plug, "_connect", new=AsyncMock(return_value=dev)), + ): + asyncio.run(_set_state(on=True)) + + +# --------------------------------------------------------------------------- +# _run, turn_on_plug, turn_off_plug +# --------------------------------------------------------------------------- + + +class TestRun: + """Tests for _run() and the sync wrappers.""" + + def test_noop_when_kasa_unavailable(self) -> None: + """When kasa import failed, _run returns silently.""" + with ( + patch.object(_smart_plug, "_KASA_AVAILABLE", new=False), + patch.object(_smart_plug, "_set_state") as mock_set_state, + ): + _run(on=True) + mock_set_state.assert_not_called() + + @pytest.mark.usefixtures("_kasa_available") + def test_invokes_set_state(self) -> None: + """When kasa is available, _set_state runs via asyncio.run.""" + with patch.object(_smart_plug, "_set_state", new=AsyncMock()) as mock_set_state: + _run(on=True) + mock_set_state.assert_awaited_once_with(on=True) + + @pytest.mark.usefixtures("_kasa_available") + def test_swallows_timeout(self) -> None: + """A timeout from asyncio.wait_for is suppressed.""" + + async def _hang(**_: bool) -> None: + await asyncio.sleep(10) + + with ( + patch.object(_smart_plug, "_set_state", new=_hang), + patch.object(_smart_plug, "TAPO_TIMEOUT_SECONDS", 0.01), + ): + _run(on=True) + + @pytest.mark.usefixtures("_kasa_available") + def test_swallows_oserror(self) -> None: + """An OSError raised from _set_state is suppressed.""" + with patch.object( + _smart_plug, "_set_state", new=AsyncMock(side_effect=OSError) + ): + _run(on=True) + + @pytest.mark.usefixtures("_kasa_available") + def test_swallows_runtimeerror(self) -> None: + """A RuntimeError raised from _set_state is suppressed.""" + with patch.object( + _smart_plug, "_set_state", new=AsyncMock(side_effect=RuntimeError) + ): + _run(on=True) + + @pytest.mark.usefixtures("_kasa_available") + def test_turn_on_plug_delegates(self) -> None: + """turn_on_plug calls _run with on=True.""" + with patch.object(_smart_plug, "_run") as mock_run: + turn_on_plug() + mock_run.assert_called_once_with(on=True) + + @pytest.mark.usefixtures("_kasa_available") + def test_turn_off_plug_delegates(self) -> None: + """turn_off_plug calls _run with on=False.""" + with patch.object(_smart_plug, "_run") as mock_run: + turn_off_plug() + mock_run.assert_called_once_with(on=False) + + +class TestKasaImportFallback: + """Cover the ImportError branch of the optional ``kasa`` import.""" + + def test_module_sets_kasa_unavailable_when_import_fails( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Reloading the module with ``kasa`` blocked sets _KASA_AVAILABLE=False.""" + import importlib + import sys + + monkeypatch.setitem(sys.modules, "kasa", None) + monkeypatch.setitem(sys.modules, "kasa.exceptions", None) + try: + reloaded = importlib.reload(_smart_plug) + assert reloaded._KASA_AVAILABLE is False + finally: + monkeypatch.undo() + importlib.reload(_smart_plug) diff --git a/wake_alarm/wake-alarm-fans.sh b/wake_alarm/wake-alarm-fans.sh new file mode 100755 index 0000000..6745695 --- /dev/null +++ b/wake_alarm/wake-alarm-fans.sh @@ -0,0 +1,52 @@ +#!/bin/bash +# Control CPU/case fan speed for the wake alarm. +# +# Usage: +# wake-alarm-fans.sh max — ramp all NCT fans to 100% +# wake-alarm-fans.sh restore — restore saved values +# +# Must be run as root (installed in /etc/sudoers.d/wake-alarm via install.sh). +# Safe: fans are designed to run at max speed indefinitely. + +set -euo pipefail + +# Locate the hwmon directory for any NCT Super I/O fan controller. +HWMON="" +for name_file in /sys/class/hwmon/hwmon*/name; do + [[ -f "$name_file" ]] || continue + chip=$(cat "$name_file") + case "$chip" in + nct6775|nct6779|nct6791|nct6792|nct6793|nct6795|nct6796|nct6797|nct6798|nct6799) + HWMON=$(dirname "$name_file") + break + ;; + esac +done + +if [[ -z "$HWMON" ]]; then + # Not an error — hardware without this chip just skips fan control. + exit 0 +fi + +PWM_PATH="$HWMON/pwm1" +ENABLE_PATH="$HWMON/pwm1_enable" + +case "${1:-}" in + max) + echo 1 > "$ENABLE_PATH" # Switch to manual mode + echo 255 > "$PWM_PATH" # 255/255 = 100% speed + ;; + restore) + if [[ $# -ne 3 ]]; then + echo "Usage: $0 restore " >&2 + exit 1 + fi + # Restore pwm value first, then restore the control mode. + echo "${3}" > "$PWM_PATH" + echo "${2}" > "$ENABLE_PATH" + ;; + *) + echo "Usage: $0 max | $0 restore " >&2 + exit 1 + ;; +esac diff --git a/wake_alarm/wake-alarm.service b/wake_alarm/wake-alarm.service index 0330656..790fb44 100644 --- a/wake_alarm/wake-alarm.service +++ b/wake_alarm/wake-alarm.service @@ -6,7 +6,8 @@ After=graphical-session.target Type=simple ExecStart=/usr/bin/python -m python_pkg.wake_alarm._alarm --production WorkingDirectory=%h/testsAndMisc -Restart=no +Restart=on-failure +RestartSec=10 [Install] WantedBy=graphical-session.target diff --git a/wake_alarm/wake_state.json b/wake_alarm/wake_state.json index 1c22a17..ff8a0ad 100644 --- a/wake_alarm/wake_state.json +++ b/wake_alarm/wake_state.json @@ -1,6 +1,6 @@ { - "date": "2026-05-22", + "date": "2026-05-23", "dismissed_at": null, "skip_workout": false, - "hmac": "2261cf16bef81ce45f8f0664454c441a88bdacf1e71161a10c50472ff63fdf8c" + "hmac": "17ae4173304d5f4b7c2376df2326162ae3bc4dfcf3a38ccb6f0647c81f61b5fa" }