mirror of
https://github.com/kuhyx/testsAndMisc.git
synced 2026-07-04 15:43:06 +02:00
Split diet_guard/_gatelock.py, wake_alarm/_alarm.py, and the usage_report.py/_usage_report_parsing.py pair into focused sub-modules so every Python file is <= 500 lines, satisfying test_file_length.py. Install python-kasa into .venv (declared in requirements but missing after the 3.13->3.14 venv upgrade), fixing 8 failing smart_plug tests and restoring 100% coverage. Also includes prior in-progress work from the working tree: the wake_alarm Progress/View/Hardware field-grouping refactor, brother_printer query module + tests, diet_guard foodbank/state/cli updates, new shared coerce/logging_setup helpers, morning_routine orchestrator tweaks, dwm window-manager config, gaming scripts, and misc maintenance/digital-wellbeing script updates. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
197 lines
6.2 KiB
Python
197 lines
6.2 KiB
Python
"""USB printer discovery and PJL query functions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import fcntl
|
|
import importlib
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
import select
|
|
import shutil
|
|
import time
|
|
from typing import TYPE_CHECKING
|
|
|
|
from python_pkg.brother_printer._query import (
|
|
printer_info_from_cups,
|
|
run_command_text,
|
|
)
|
|
from python_pkg.brother_printer.data_classes import USBResult
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Callable
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ── USB printer discovery ────────────────────────────────────────────
|
|
|
|
|
|
def find_brother_usb() -> str:
|
|
"""Look for any Brother printer on USB via lsusb. Returns the info line."""
|
|
if not shutil.which("lsusb"):
|
|
return ""
|
|
for line in run_command_text(["/usr/bin/lsusb"]).splitlines():
|
|
if "04f9:" in line.lower():
|
|
return line.split(": ", 1)[1] if ": " in line else line
|
|
return ""
|
|
|
|
|
|
def find_usb_printer_dev() -> str | None:
|
|
"""Find /dev/usb/lp* device for the Brother printer."""
|
|
devices = sorted(Path("/dev/usb").glob("lp*"))
|
|
return str(devices[0]) if devices else None
|
|
|
|
|
|
# ── PJL over USB ─────────────────────────────────────────────────────
|
|
|
|
|
|
def _drain_buffer(fd: int) -> None:
|
|
"""Read and discard any stale data from the USB buffer."""
|
|
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
|
|
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
|
|
with contextlib.suppress(OSError):
|
|
while os.read(fd, 4096):
|
|
pass
|
|
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
|
|
|
|
|
|
def _read_nonblocking(fd: int, flags: int) -> bytes:
|
|
"""Read all available data from fd in non-blocking mode."""
|
|
data = b""
|
|
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
|
|
with contextlib.suppress(OSError):
|
|
while True:
|
|
chunk = os.read(fd, 4096)
|
|
if not chunk:
|
|
break
|
|
data += chunk
|
|
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
|
|
return data
|
|
|
|
|
|
def _wait_for_pjl_response(fd: int, flags: int, deadline: float) -> bytes:
|
|
"""Poll fd until PJL data arrives or deadline expires."""
|
|
response = b""
|
|
while time.time() < deadline:
|
|
remaining = deadline - time.time()
|
|
if remaining <= 0:
|
|
break
|
|
readable, _, _ = select.select([fd], [], [], min(remaining, 1.0))
|
|
if readable:
|
|
response += _read_nonblocking(fd, flags)
|
|
if response and (b"=" in response or b"@PJL" in response):
|
|
break
|
|
return response
|
|
|
|
|
|
def pjl_query(fd: int, cmd: str, timeout_sec: float = 5.0) -> str:
|
|
"""Send a PJL command via raw fd and read the response."""
|
|
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
|
|
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
|
|
|
|
pjl_cmd = f"\x1b%-12345X@PJL\r\n{cmd}\r\n\x1b%-12345X"
|
|
os.write(fd, pjl_cmd.encode())
|
|
|
|
deadline = time.time() + timeout_sec
|
|
response = _wait_for_pjl_response(fd, flags, deadline)
|
|
|
|
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
|
|
return response.decode("ascii", errors="replace")
|
|
|
|
|
|
def _parse_status(resp: str, result: USBResult) -> bool:
|
|
"""Parse STATUS response into result. Returns True if code was found."""
|
|
found = False
|
|
for raw_line in resp.splitlines():
|
|
stripped = raw_line.strip()
|
|
if stripped.startswith("CODE="):
|
|
result.status_code = stripped.split("=", 1)[1]
|
|
found = True
|
|
elif stripped.startswith("DISPLAY="):
|
|
result.display = stripped.split("=", 1)[1].strip().strip('"').strip()
|
|
elif stripped.startswith("ONLINE="):
|
|
result.online = stripped.split("=", 1)[1]
|
|
return found
|
|
|
|
|
|
def _parse_variables(resp: str, result: USBResult) -> bool:
|
|
"""Parse VARIABLES response into result. Returns True if data found."""
|
|
found = False
|
|
for raw_line in resp.splitlines():
|
|
stripped = raw_line.strip()
|
|
if stripped.startswith("ECONOMODE="):
|
|
result.economode = stripped.split("=", 1)[1].split()[0]
|
|
found = True
|
|
return found
|
|
|
|
|
|
def _retry_pjl_query(
|
|
fd: int,
|
|
cmd: str,
|
|
parser: Callable[[str, USBResult], bool],
|
|
result: USBResult,
|
|
max_retries: int,
|
|
) -> None:
|
|
"""Send a PJL query with retries, draining between attempts."""
|
|
for attempt in range(max_retries + 1):
|
|
resp = pjl_query(fd, cmd)
|
|
if parser(resp, result):
|
|
break
|
|
if attempt < max_retries:
|
|
_drain_buffer(fd)
|
|
time.sleep(0.5)
|
|
|
|
|
|
def _run_pjl_queries(fd: int, result: USBResult, max_retries: int) -> None:
|
|
"""Execute PJL query sequence on an open file descriptor."""
|
|
_drain_buffer(fd)
|
|
|
|
os.write(fd, b"\x1b%-12345X@PJL\r\n\x1b%-12345X")
|
|
time.sleep(0.5)
|
|
_drain_buffer(fd)
|
|
|
|
_retry_pjl_query(fd, "@PJL INFO STATUS", _parse_status, result, max_retries)
|
|
_drain_buffer(fd)
|
|
time.sleep(0.5)
|
|
_retry_pjl_query(fd, "@PJL INFO VARIABLES", _parse_variables, result, max_retries)
|
|
|
|
|
|
def _init_usb_result(dev_path: str) -> USBResult:
|
|
"""Create a USBResult with device info from CUPS."""
|
|
cups_info = printer_info_from_cups()
|
|
return USBResult(
|
|
device=dev_path,
|
|
product=cups_info.get("product") or "Brother Laser Printer",
|
|
serial=cups_info.get("serial", ""),
|
|
)
|
|
|
|
|
|
def query_usb_pjl(max_retries: int = 2) -> USBResult:
|
|
"""Query a Brother printer via PJL over /dev/usb/lp*."""
|
|
dev_path = find_usb_printer_dev()
|
|
if not dev_path:
|
|
cups_service = importlib.import_module(
|
|
"python_pkg.brother_printer.cups_service",
|
|
)
|
|
return cups_service.query_usb_via_cups()
|
|
|
|
result = _init_usb_result(dev_path)
|
|
if not os.access(dev_path, os.R_OK | os.W_OK):
|
|
result.error = f"Permission denied: {dev_path}. Run with sudo."
|
|
return result
|
|
|
|
fd: int | None = None
|
|
try:
|
|
fd = os.open(dev_path, os.O_RDWR)
|
|
fcntl.fcntl(fd, fcntl.F_GETFL)
|
|
_run_pjl_queries(fd, result, max_retries)
|
|
except OSError as e:
|
|
result.error = str(e)
|
|
finally:
|
|
if fd is not None:
|
|
os.close(fd)
|
|
return result
|