testsAndMisc/python_pkg/brother_printer/usb_query.py

236 lines
7.5 KiB
Python
Raw Normal View History

"""USB printer discovery and PJL query functions."""
from __future__ import annotations
import contextlib
import fcntl
import importlib
import os
from pathlib import Path
import select
import shutil
import subprocess
import time
from typing import TYPE_CHECKING
import urllib.parse
from python_pkg.brother_printer.data_classes import USBResult
if TYPE_CHECKING:
from collections.abc import Callable
import logging
logger = logging.getLogger(__name__)
# ── USB printer discovery ────────────────────────────────────────────
def find_brother_usb() -> str:
"""Look for any Brother printer on USB via lsusb. Returns the info line."""
if not shutil.which("lsusb"):
return ""
try:
r = subprocess.run(
["/usr/bin/lsusb"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
for line in r.stdout.splitlines():
if "04f9:" in line.lower():
return line.split(": ", 1)[1] if ": " in line else line
except (subprocess.TimeoutExpired, OSError):
pass
return ""
def find_usb_printer_dev() -> str | None:
"""Find /dev/usb/lp* device for the Brother printer."""
devices = sorted(Path("/dev/usb").glob("lp*"))
return str(devices[0]) if devices else None
def _parse_cups_usb_uri(uri: str, info: dict[str, str]) -> None:
"""Extract product and serial from a CUPS usb:// URI."""
parsed = urllib.parse.urlparse(uri)
info["product"] = urllib.parse.unquote(parsed.path.lstrip("/"))
qs = urllib.parse.parse_qs(parsed.query)
if "serial" in qs:
info["serial"] = qs["serial"][0]
def get_printer_info_from_cups() -> dict[str, str]:
"""Get printer model/serial from lpstat."""
info: dict[str, str] = {"product": "", "serial": ""}
try:
r = subprocess.run(
["/usr/bin/lpstat", "-v"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
for line in r.stdout.splitlines():
if "Brother" in line:
for part in line.split():
if part.startswith("usb://"):
_parse_cups_usb_uri(part, info)
break
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
logger.debug("Failed to query CUPS for printer info", exc_info=True)
return info
# ── PJL over USB ─────────────────────────────────────────────────────
def _drain_buffer(fd: int) -> None:
"""Read and discard any stale data from the USB buffer."""
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
with contextlib.suppress(OSError):
while os.read(fd, 4096):
pass
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
def _read_nonblocking(fd: int, flags: int) -> bytes:
"""Read all available data from fd in non-blocking mode."""
data = b""
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
with contextlib.suppress(OSError):
while True:
chunk = os.read(fd, 4096)
if not chunk:
break
data += chunk
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
return data
def _wait_for_pjl_response(fd: int, flags: int, deadline: float) -> bytes:
"""Poll fd until PJL data arrives or deadline expires."""
response = b""
while time.time() < deadline:
remaining = deadline - time.time()
if remaining <= 0:
break
readable, _, _ = select.select([fd], [], [], min(remaining, 1.0))
if readable:
response += _read_nonblocking(fd, flags)
if response and (b"=" in response or b"@PJL" in response):
break
return response
def pjl_query(fd: int, cmd: str, timeout_sec: float = 5.0) -> str:
"""Send a PJL command via raw fd and read the response."""
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
pjl_cmd = f"\x1b%-12345X@PJL\r\n{cmd}\r\n\x1b%-12345X"
os.write(fd, pjl_cmd.encode())
deadline = time.time() + timeout_sec
response = _wait_for_pjl_response(fd, flags, deadline)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
return response.decode("ascii", errors="replace")
def _parse_status(resp: str, result: USBResult) -> bool:
"""Parse STATUS response into result. Returns True if code was found."""
found = False
for raw_line in resp.splitlines():
stripped = raw_line.strip()
if stripped.startswith("CODE="):
result.status_code = stripped.split("=", 1)[1]
found = True
elif stripped.startswith("DISPLAY="):
result.display = stripped.split("=", 1)[1].strip().strip('"').strip()
elif stripped.startswith("ONLINE="):
result.online = stripped.split("=", 1)[1]
return found
def _parse_variables(resp: str, result: USBResult) -> bool:
"""Parse VARIABLES response into result. Returns True if data found."""
found = False
for raw_line in resp.splitlines():
stripped = raw_line.strip()
if stripped.startswith("ECONOMODE="):
result.economode = stripped.split("=", 1)[1].split()[0]
found = True
return found
def _retry_pjl_query(
fd: int,
cmd: str,
parser: Callable[[str, USBResult], bool],
result: USBResult,
max_retries: int,
) -> None:
"""Send a PJL query with retries, draining between attempts."""
for attempt in range(max_retries + 1):
resp = pjl_query(fd, cmd)
if parser(resp, result):
break
if attempt < max_retries:
_drain_buffer(fd)
time.sleep(0.5)
def _run_pjl_queries(fd: int, result: USBResult, max_retries: int) -> None:
"""Execute PJL query sequence on an open file descriptor."""
_drain_buffer(fd)
os.write(fd, b"\x1b%-12345X@PJL\r\n\x1b%-12345X")
time.sleep(0.5)
_drain_buffer(fd)
_retry_pjl_query(fd, "@PJL INFO STATUS", _parse_status, result, max_retries)
_drain_buffer(fd)
time.sleep(0.5)
_retry_pjl_query(fd, "@PJL INFO VARIABLES", _parse_variables, result, max_retries)
def _init_usb_result(dev_path: str) -> USBResult:
"""Create a USBResult with device info from CUPS."""
cups_info = get_printer_info_from_cups()
return USBResult(
device=dev_path,
product=cups_info.get("product") or "Brother Laser Printer",
serial=cups_info.get("serial", ""),
)
def query_usb_pjl(max_retries: int = 2) -> USBResult:
"""Query a Brother printer via PJL over /dev/usb/lp*."""
dev_path = find_usb_printer_dev()
if not dev_path:
cups_service = importlib.import_module(
"python_pkg.brother_printer.cups_service",
)
return cups_service.query_usb_via_cups()
result = _init_usb_result(dev_path)
if not os.access(dev_path, os.R_OK | os.W_OK):
result.error = f"Permission denied: {dev_path}. Run with sudo."
return result
fd: int | None = None
try:
fd = os.open(dev_path, os.O_RDWR)
fcntl.fcntl(fd, fcntl.F_GETFL)
_run_pjl_queries(fd, result, max_retries)
except OSError as e:
result.error = str(e)
finally:
if fd is not None:
os.close(fd)
return result