diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8cead33..8fc1b9c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -232,23 +232,16 @@ repos: # - id: pyright # =========================================================================== - # FLAKE8 - Disabled: ruff covers all flake8 rules and is faster + # FLAKE8 - Python linter with plugins (local: uses venv with patched plugins) # =========================================================================== - # - repo: https://github.com/PyCQA/flake8 - # rev: 7.1.1 - # hooks: - # - id: flake8 - # args: - # - --max-line-length=120 - # - --extend-ignore=E203,W503,T201 - # - --max-complexity=10 - # - --statistics - # additional_dependencies: - # - flake8-bugbear - # - flake8-comprehensions - # - flake8-simplify - # - flake8-print - # exclude: ^(Bash/|\.venv/) + - repo: local + hooks: + - id: flake8 + name: flake8 + entry: .venv/bin/flake8 + language: system + types: [python] + exclude: ^(Bash/|\.venv/) # =========================================================================== # CHECK JSON/YAML/TOML formatting diff --git a/pyproject.toml b/pyproject.toml index 3bc9e35..2ed44e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -292,9 +292,9 @@ max-complexity = 10 # Maximum cognitive complexity (flake8-cognitive-complexity) max-cognitive-complexity = 12 # Maximum function length (flake8-functions) -max-function-length = 50 +max-function-length = 20 # Maximum returns/arguments per function -max-returns = 6 +max-returns-amount = 6 max-arguments = 5 # Docstring convention (matches ruff) docstring-convention = "google" diff --git a/python_pkg/brother_printer/check_brother_printer.py b/python_pkg/brother_printer/check_brother_printer.py index 5ffb10f..3d2c8b9 100644 --- a/python_pkg/brother_printer/check_brother_printer.py +++ b/python_pkg/brother_printer/check_brother_printer.py @@ -210,6 +210,15 @@ def find_usb_printer_dev() -> str | None: return str(devices[0]) if devices else None +def _parse_cups_usb_uri(uri: str, info: dict[str, str]) -> None: + """Extract product and serial from a CUPS usb:// URI.""" + parsed = urllib.parse.urlparse(uri) + info["product"] = urllib.parse.unquote(parsed.path.lstrip("/")) + qs = urllib.parse.parse_qs(parsed.query) + if "serial" in qs: + info["serial"] = qs["serial"][0] + + def get_printer_info_from_cups() -> dict[str, str]: """Get printer model/serial from lpstat.""" info: dict[str, str] = {"product": "", "serial": ""} @@ -222,16 +231,11 @@ def get_printer_info_from_cups() -> dict[str, str]: check=False, ) for line in r.stdout.splitlines(): - if "Brother" not in line: - continue - for part in line.split(): - if part.startswith("usb://"): - parsed = urllib.parse.urlparse(part) - info["product"] = urllib.parse.unquote(parsed.path.lstrip("/")) - qs = urllib.parse.parse_qs(parsed.query) - if "serial" in qs: - info["serial"] = qs["serial"][0] - break + if "Brother" in line: + for part in line.split(): + if part.startswith("usb://"): + _parse_cups_usb_uri(part, info) + break except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError): logger.debug("Failed to query CUPS for printer info", exc_info=True) return info @@ -250,42 +254,46 @@ def _drain_buffer(fd: int) -> None: fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) -def pjl_query(fd: int, cmd: str, timeout_sec: float = 5.0) -> str: - """Send a PJL command via raw fd and read the response. - - Uses select() to wait for data availability instead of polling. - """ - # Ensure blocking mode for write - flags = fcntl.fcntl(fd, fcntl.F_GETFL) +def _read_nonblocking(fd: int, flags: int) -> bytes: + """Read all available data from fd in non-blocking mode.""" + data = b"" + fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + with contextlib.suppress(OSError): + while True: + chunk = os.read(fd, 4096) + if not chunk: + break + data += chunk fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) + return data - pjl_cmd = f"\x1b%-12345X@PJL\r\n{cmd}\r\n\x1b%-12345X" - os.write(fd, pjl_cmd.encode()) - # Wait for data to become available using select() +def _wait_for_pjl_response(fd: int, flags: int, deadline: float) -> bytes: + """Poll fd until PJL data arrives or deadline expires.""" response = b"" - deadline = time.time() + timeout_sec while time.time() < deadline: remaining = deadline - time.time() if remaining <= 0: break readable, _, _ = select.select([fd], [], [], min(remaining, 1.0)) if readable: - # Switch to non-blocking to read all available data - fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - with contextlib.suppress(OSError): - while True: - chunk = os.read(fd, 4096) - if chunk: - response += chunk - else: - break - fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) - # If we got meaningful PJL data, stop waiting + response += _read_nonblocking(fd, flags) if response and (b"=" in response or b"@PJL" in response): break + return response + + +def pjl_query(fd: int, cmd: str, timeout_sec: float = 5.0) -> str: + """Send a PJL command via raw fd and read the response.""" + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) + + pjl_cmd = f"\x1b%-12345X@PJL\r\n{cmd}\r\n\x1b%-12345X" + os.write(fd, pjl_cmd.encode()) + + deadline = time.time() + timeout_sec + response = _wait_for_pjl_response(fd, flags, deadline) - # Restore blocking mode fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) return response.decode("ascii", errors="replace") @@ -333,19 +341,37 @@ def _retry_pjl_query( time.sleep(0.5) +def _run_pjl_queries(fd: int, result: USBResult, max_retries: int) -> None: + """Execute PJL query sequence on an open file descriptor.""" + _drain_buffer(fd) + + os.write(fd, b"\x1b%-12345X@PJL\r\n\x1b%-12345X") + time.sleep(0.5) + _drain_buffer(fd) + + _retry_pjl_query(fd, "@PJL INFO STATUS", _parse_status, result, max_retries) + _drain_buffer(fd) + time.sleep(0.5) + _retry_pjl_query(fd, "@PJL INFO VARIABLES", _parse_variables, result, max_retries) + + +def _init_usb_result(dev_path: str) -> USBResult: + """Create a USBResult with device info from CUPS.""" + cups_info = get_printer_info_from_cups() + return USBResult( + device=dev_path, + product=cups_info.get("product") or "Brother Laser Printer", + serial=cups_info.get("serial", ""), + ) + + def query_usb_pjl(max_retries: int = 2) -> USBResult: """Query a Brother printer via PJL over /dev/usb/lp*.""" dev_path = find_usb_printer_dev() if not dev_path: return USBResult(error="No USB printer device found at /dev/usb/lp*") - cups_info = get_printer_info_from_cups() - result = USBResult( - device=dev_path, - product=cups_info.get("product") or "Brother Laser Printer", - serial=cups_info.get("serial", ""), - ) - + result = _init_usb_result(dev_path) if not os.access(dev_path, os.R_OK | os.W_OK): result.error = f"Permission denied: {dev_path}. Run with sudo." return result @@ -354,34 +380,25 @@ def query_usb_pjl(max_retries: int = 2) -> USBResult: try: fd = os.open(dev_path, os.O_RDWR) fcntl.fcntl(fd, fcntl.F_GETFL) - - # Drain any stale data in the USB buffer - _drain_buffer(fd) - - # Wake-up: send a bare UEL to get the printer's attention - os.write(fd, b"\x1b%-12345X@PJL\r\n\x1b%-12345X") - time.sleep(0.5) - _drain_buffer(fd) - - _retry_pjl_query(fd, "@PJL INFO STATUS", _parse_status, result, max_retries) - _drain_buffer(fd) - time.sleep(0.5) - _retry_pjl_query( - fd, "@PJL INFO VARIABLES", _parse_variables, result, max_retries - ) - + _run_pjl_queries(fd, result, max_retries) except OSError as e: result.error = str(e) finally: if fd is not None: os.close(fd) - return result # ── SNMP network query ────────────────────────────────────────────── +def _snmpwalk_cmd( + path: str, community: str, timeout: int, ip: str, oid: str +) -> list[str]: + """Build the snmpwalk command arguments.""" + return [path, "-v", "2c", "-c", community, "-t", str(timeout), "-OQvs", ip, oid] + + def snmp_walk(ip: str, oid: str, community: str, timeout: int) -> list[str]: """Run snmpwalk and return cleaned values.""" snmpwalk_path = shutil.which("snmpwalk") @@ -389,18 +406,7 @@ def snmp_walk(ip: str, oid: str, community: str, timeout: int) -> list[str]: return [] try: r = subprocess.run( - [ - snmpwalk_path, - "-v", - "2c", - "-c", - community, - "-t", - str(timeout), - "-OQvs", - ip, - oid, - ], + _snmpwalk_cmd(snmpwalk_path, community, timeout, ip, oid), capture_output=True, text=True, timeout=15, @@ -415,37 +421,38 @@ def snmp_walk(ip: str, oid: str, community: str, timeout: int) -> list[str]: return [] -def query_network_snmp(ip: str) -> NetworkResult: - """Query a Brother printer via SNMP over the network.""" - community = "public" - timeout = 5 +def _snmpget_cmd( + path: str, community: str, timeout: int, ip: str, oid: str +) -> list[str]: + """Build the snmpget command arguments.""" + return [path, "-v", "2c", "-c", community, "-t", str(timeout), ip, oid] - # Quick connectivity check + +def _check_snmp_connectivity(ip: str, community: str, timeout: int) -> str | None: + """Verify SNMP connectivity. Returns error message or None on success.""" snmpget_path = shutil.which("snmpget") if not snmpget_path: - return NetworkResult( - ip=ip, - error="snmpget not found. Install: sudo pacman -S net-snmp", - ) + return "snmpget not found. Install: sudo pacman -S net-snmp" try: subprocess.run( - [ + _snmpget_cmd( snmpget_path, - "-v", - "2c", - "-c", community, - "-t", - str(timeout), + timeout, ip, "1.3.6.1.2.1.43.11.1.1.6.1.1", - ], + ), capture_output=True, timeout=10, check=True, ) except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError): - return NetworkResult(ip=ip, error=f"Cannot reach printer at {ip} via SNMP.") + return f"Cannot reach printer at {ip} via SNMP." + return None + + +def _build_network_result(ip: str, community: str, timeout: int) -> NetworkResult: + """Collect all SNMP data into a NetworkResult.""" def walk(oid: str) -> list[str]: return snmp_walk(ip, oid, community, timeout) @@ -464,6 +471,16 @@ def query_network_snmp(ip: str) -> NetworkResult: ) +def query_network_snmp(ip: str) -> NetworkResult: + """Query a Brother printer via SNMP over the network.""" + community = "public" + timeout = 5 + error = _check_snmp_connectivity(ip, community, timeout) + if error: + return NetworkResult(ip=ip, error=error) + return _build_network_result(ip, community, timeout) + + # ── Status code lookup ────────────────────────────────────────────── @@ -528,6 +545,49 @@ def _display_usb_device_info(result: USBResult) -> None: _out(f"{BOLD}Toner Save:{RESET} OFF") +_SEVERITY_ICONS: dict[str, str] = { + "ok": "✓", + "info": "i", + "warn": "⚡", + "critical": "⚠", +} +_SEVERITY_COLORS: dict[str, str] = { + "ok": GREEN, + "info": CYAN, + "warn": YELLOW, + "critical": RED, +} +_SEVERITY_SUMMARIES: dict[str, str] = { + "ok": f"{GREEN}{BOLD}✓ Printer is healthy. No replacements needed.{RESET}", + "info": f"{CYAN}{BOLD}i Printer is busy/processing." + f" No replacements needed.{RESET}", + "warn": f"{YELLOW}{BOLD}⚡ WARNING: Maintenance will be needed" + f" soon.{RESET}\n{YELLOW} Order replacement parts" + f" now to avoid interruption.{RESET}", + "critical": f"{RED}{BOLD}⚠ ACTION REQUIRED: Replacement or fix" + f" needed now!{RESET}", +} + + +def _format_status_detail( + severity: str, short_text: str, action: str, result: USBResult +) -> None: + """Print severity icon, display text, and action.""" + color = _SEVERITY_COLORS.get(severity, GREEN) + icon = _SEVERITY_ICONS.get(severity, "✓") + + _out(f" {color}{BOLD}{icon} {short_text}{RESET}") + if result.display and result.display != short_text: + _out(f" {DIM}Display: {result.display}{RESET}") + _out(f" {DIM}Status code: {result.status_code}{RESET}") + + if action: + _out() + _out(f" {color}{BOLD}Action:{RESET} {color}{action}{RESET}") + _out() + _out(_SEVERITY_SUMMARIES.get(severity, "")) + + def _display_pjl_status(result: USBResult) -> None: """Display PJL status code interpretation.""" _out() @@ -541,34 +601,7 @@ def _display_pjl_status(result: USBResult) -> None: return severity, short_text, action = get_status_info(result.status_code) - - icons = {"ok": "✓", "info": "i", "warn": "⚡", "critical": "⚠"} - colors = {"ok": GREEN, "info": CYAN, "warn": YELLOW, "critical": RED} - color = colors.get(severity, GREEN) - icon = icons.get(severity, "✓") - - _out(f" {color}{BOLD}{icon} {short_text}{RESET}") - if result.display and result.display != short_text: - _out(f" {DIM}Display: {result.display}{RESET}") - _out(f" {DIM}Status code: {result.status_code}{RESET}") - - if action: - _out() - _out(f" {color}{BOLD}Action:{RESET} {color}{action}{RESET}") - - _out() - - summaries = { - "ok": f"{GREEN}{BOLD}✓ Printer is healthy." f" No replacements needed.{RESET}", - "info": f"{CYAN}{BOLD}i Printer is busy/processing." - f" No replacements needed.{RESET}", - "warn": f"{YELLOW}{BOLD}⚡ WARNING: Maintenance will be needed" - f" soon.{RESET}\n{YELLOW} Order replacement parts" - f" now to avoid interruption.{RESET}", - "critical": f"{RED}{BOLD}⚠ ACTION REQUIRED: Replacement or fix" - f" needed now!{RESET}", - } - _out(summaries.get(severity, "")) + _format_status_detail(severity, short_text, action, result) # ── Display: USB results ──────────────────────────────────────────── @@ -601,44 +634,46 @@ class _SupplyStatus: needs_replacement: bool +def _classify_percentage_level(desc: str, pct: int) -> tuple[int, str, str, str, bool]: + """Classify a supply by its calculated percentage.""" + if pct <= SUPPLY_LOW_PCT: + return pct, f"{pct}%", RED, f"{desc} at {pct}%.", True + if pct <= SUPPLY_WARN_PCT: + return pct, f"{pct}%", YELLOW, f"{desc} at {pct}% -- order soon.", False + return pct, f"{pct}%", GREEN, "", False + + +def _classify_supply_level( + desc: str, max_val: int, level: int +) -> tuple[int, str, str, str, bool]: + """Classify a supply level. Returns (pct, status, color, warning, replace).""" + if level == SNMP_LEVEL_OK: + return -1, "OK", GREEN, "", False + if level == SNMP_LEVEL_LOW: + return -1, "LOW", RED, f"{desc} is LOW.", True + if level == 0: + return 0, "EMPTY", RED, f"{desc} is EMPTY -- replace now!", True + if max_val > 0: + pct = min(level * 100 // max_val, 100) + return _classify_percentage_level(desc, pct) + return -1, "", GREEN, "", False + + +def _format_supply_bar(pct: int) -> str: + """Build a progress bar string for a supply percentage.""" + if pct < 0: + return "" + filled = pct * PROGRESS_BAR_WIDTH // 100 + empty = PROGRESS_BAR_WIDTH - filled + return f"[{'█' * filled}{'░' * empty}]" + + def _process_supply_item(desc: str, max_val: int, level: int) -> _SupplyStatus: """Process a single supply item into display info.""" - pct = -1 - status_text = "" - color = GREEN - warning = "" - needs_replacement = False - - if level == SNMP_LEVEL_OK: - status_text = "OK" - elif level == SNMP_LEVEL_LOW: - status_text = "LOW" - color = RED - needs_replacement = True - warning = f"{desc} is LOW." - elif level == 0: - status_text = "EMPTY" - color = RED - pct = 0 - needs_replacement = True - warning = f"{desc} is EMPTY -- replace now!" - elif max_val > 0: - pct = min(level * 100 // max_val, 100) - status_text = f"{pct}%" - if pct <= SUPPLY_LOW_PCT: - color = RED - needs_replacement = True - warning = f"{desc} at {pct}%." - elif pct <= SUPPLY_WARN_PCT: - color = YELLOW - warning = f"{desc} at {pct}% -- order soon." - - bar = "" - if pct >= 0: - filled = pct * PROGRESS_BAR_WIDTH // 100 - empty = PROGRESS_BAR_WIDTH - filled - bar = f"[{'█' * filled}{'░' * empty}]" - + pct, status_text, color, warning, needs_replacement = _classify_supply_level( + desc, max_val, level + ) + bar = _format_supply_bar(pct) return _SupplyStatus(color, bar, status_text, warning, needs_replacement) @@ -657,6 +692,28 @@ def _display_supply_warnings(*, needs_replacement: bool, warnings: list[str]) -> _out(f"{GREEN}{BOLD}✓ All consumables are at healthy levels.{RESET}") +def _parse_supply_value(values: list[str], index: int) -> int: + """Safely parse an integer from a supply value list.""" + try: + return int(values[index]) + except (IndexError, ValueError): + return 0 + + +def _collect_supply_items( + result: NetworkResult, +) -> tuple[list[_SupplyStatus], list[str]]: + """Parse and collect supply items with their descriptions.""" + items: list[_SupplyStatus] = [] + descs: list[str] = [] + for i, desc in enumerate(result.supply_descriptions): + max_val = _parse_supply_value(result.supply_max, i) + level = _parse_supply_value(result.supply_levels, i) + items.append(_process_supply_item(desc, max_val, level)) + descs.append(desc) + return items, descs + + def _display_supply_levels(result: NetworkResult) -> None: """Display consumable supply levels section.""" _out() @@ -665,18 +722,9 @@ def _display_supply_levels(result: NetworkResult) -> None: needs_replacement = False warnings: list[str] = [] + items, descs = _collect_supply_items(result) - for i, desc in enumerate(result.supply_descriptions): - try: - max_val = int(result.supply_max[i]) - except (IndexError, ValueError): - max_val = 0 - try: - level = int(result.supply_levels[i]) - except (IndexError, ValueError): - level = 0 - - item = _process_supply_item(desc, max_val, level) + for desc, item in zip(descs, items, strict=True): _out( f" {BOLD}{desc:<25}{RESET}" f" {item.color}{item.bar} {item.status_text}{RESET}" @@ -749,58 +797,58 @@ def _discover_network_printer() -> str: return "" -def main(argv: list[str] | None = None) -> None: - """Entry point: auto-detect USB or network Brother printer.""" - args = argv if argv is not None else sys.argv[1:] - printer_ip = args[0] if args else "" +def _run_network_mode(printer_ip: str) -> None: + """Handle explicit network/SNMP mode.""" + if not shutil.which("snmpwalk"): + _out(f"{RED}snmpwalk not found. Install: sudo pacman -S net-snmp{RESET}") + sys.exit(1) + _out(f"{CYAN}Querying printer at {printer_ip} via SNMP...{RESET}") + display_network_results(query_network_snmp(printer_ip)) - # ── Network mode (explicit IP given) ───────────────────────── - if printer_ip: - if not shutil.which("snmpwalk"): - _out( - f"{RED}snmpwalk not found." f" Install: sudo pacman -S net-snmp{RESET}" - ) - sys.exit(1) - _out(f"{CYAN}Querying printer at {printer_ip} via SNMP...{RESET}") - net_result = query_network_snmp(printer_ip) - display_network_results(net_result) - return - # ── Auto-detect: USB first, then network ───────────────────── - usb_line = find_brother_usb() +def _run_usb_mode(usb_line: str) -> None: + """Handle USB printer mode.""" + _out(f"{CYAN}Found Brother printer on USB: {usb_line}{RESET}") + if os.geteuid() != 0: + _out(f"{RED}Root access required for USB printer." f" Re-run with sudo.{RESET}") + sys.exit(1) + display_usb_results(query_usb_pjl()) - if usb_line: - _out(f"{CYAN}Found Brother printer on USB: {usb_line}{RESET}") - - if os.geteuid() != 0: - _out( - f"{RED}Root access required for USB printer." - f" Re-run with sudo.{RESET}" - ) - sys.exit(1) - - usb_result = query_usb_pjl() - display_usb_results(usb_result) - return - - # ── Try network discovery via CUPS ─────────────────────────── - network_ip = _discover_network_printer() - - if network_ip and shutil.which("snmpwalk"): - _out(f"{CYAN}Found network printer at {network_ip}{RESET}") - net_result = query_network_snmp(network_ip) - display_network_results(net_result) - return +def _no_printer_found() -> None: + """Print error message when no printer is detected.""" _out(f"{RED}No Brother printer found.{RESET}") _out() _out("Ensure the printer is:") - _out(" • Powered on") - _out(" • Connected via USB or on the same network") + _out(" \u2022 Powered on") + _out(" \u2022 Connected via USB or on the same network") _out() _out("Usage: python3 -m brother_printer [printer_ip]") sys.exit(1) +def main(argv: list[str] | None = None) -> None: + """Entry point: auto-detect USB or network Brother printer.""" + args = argv if argv is not None else sys.argv[1:] + printer_ip = args[0] if args else "" + + if printer_ip: + _run_network_mode(printer_ip) + return + + usb_line = find_brother_usb() + if usb_line: + _run_usb_mode(usb_line) + return + + network_ip = _discover_network_printer() + if network_ip and shutil.which("snmpwalk"): + _out(f"{CYAN}Found network printer at {network_ip}{RESET}") + display_network_results(query_network_snmp(network_ip)) + return + + _no_printer_found() + + if __name__ == "__main__": main()