feat: added flake8 max funciton length

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-02-06 21:39:15 +01:00
parent 029a99aaba
commit 11427631cd
3 changed files with 261 additions and 220 deletions

View File

@ -232,23 +232,16 @@ repos:
# - id: pyright
# ===========================================================================
# FLAKE8 - Disabled: ruff covers all flake8 rules and is faster
# FLAKE8 - Python linter with plugins (local: uses venv with patched plugins)
# ===========================================================================
# - repo: https://github.com/PyCQA/flake8
# rev: 7.1.1
# hooks:
# - id: flake8
# args:
# - --max-line-length=120
# - --extend-ignore=E203,W503,T201
# - --max-complexity=10
# - --statistics
# additional_dependencies:
# - flake8-bugbear
# - flake8-comprehensions
# - flake8-simplify
# - flake8-print
# exclude: ^(Bash/|\.venv/)
- repo: local
hooks:
- id: flake8
name: flake8
entry: .venv/bin/flake8
language: system
types: [python]
exclude: ^(Bash/|\.venv/)
# ===========================================================================
# CHECK JSON/YAML/TOML formatting

View File

@ -292,9 +292,9 @@ max-complexity = 10
# Maximum cognitive complexity (flake8-cognitive-complexity)
max-cognitive-complexity = 12
# Maximum function length (flake8-functions)
max-function-length = 50
max-function-length = 20
# Maximum returns/arguments per function
max-returns = 6
max-returns-amount = 6
max-arguments = 5
# Docstring convention (matches ruff)
docstring-convention = "google"

View File

@ -210,6 +210,15 @@ def find_usb_printer_dev() -> str | None:
return str(devices[0]) if devices else None
def _parse_cups_usb_uri(uri: str, info: dict[str, str]) -> None:
"""Extract product and serial from a CUPS usb:// URI."""
parsed = urllib.parse.urlparse(uri)
info["product"] = urllib.parse.unquote(parsed.path.lstrip("/"))
qs = urllib.parse.parse_qs(parsed.query)
if "serial" in qs:
info["serial"] = qs["serial"][0]
def get_printer_info_from_cups() -> dict[str, str]:
"""Get printer model/serial from lpstat."""
info: dict[str, str] = {"product": "", "serial": ""}
@ -222,16 +231,11 @@ def get_printer_info_from_cups() -> dict[str, str]:
check=False,
)
for line in r.stdout.splitlines():
if "Brother" not in line:
continue
for part in line.split():
if part.startswith("usb://"):
parsed = urllib.parse.urlparse(part)
info["product"] = urllib.parse.unquote(parsed.path.lstrip("/"))
qs = urllib.parse.parse_qs(parsed.query)
if "serial" in qs:
info["serial"] = qs["serial"][0]
break
if "Brother" in line:
for part in line.split():
if part.startswith("usb://"):
_parse_cups_usb_uri(part, info)
break
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
logger.debug("Failed to query CUPS for printer info", exc_info=True)
return info
@ -250,42 +254,46 @@ def _drain_buffer(fd: int) -> None:
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
def pjl_query(fd: int, cmd: str, timeout_sec: float = 5.0) -> str:
"""Send a PJL command via raw fd and read the response.
Uses select() to wait for data availability instead of polling.
"""
# Ensure blocking mode for write
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
def _read_nonblocking(fd: int, flags: int) -> bytes:
"""Read all available data from fd in non-blocking mode."""
data = b""
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
with contextlib.suppress(OSError):
while True:
chunk = os.read(fd, 4096)
if not chunk:
break
data += chunk
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
return data
pjl_cmd = f"\x1b%-12345X@PJL\r\n{cmd}\r\n\x1b%-12345X"
os.write(fd, pjl_cmd.encode())
# Wait for data to become available using select()
def _wait_for_pjl_response(fd: int, flags: int, deadline: float) -> bytes:
"""Poll fd until PJL data arrives or deadline expires."""
response = b""
deadline = time.time() + timeout_sec
while time.time() < deadline:
remaining = deadline - time.time()
if remaining <= 0:
break
readable, _, _ = select.select([fd], [], [], min(remaining, 1.0))
if readable:
# Switch to non-blocking to read all available data
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
with contextlib.suppress(OSError):
while True:
chunk = os.read(fd, 4096)
if chunk:
response += chunk
else:
break
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
# If we got meaningful PJL data, stop waiting
response += _read_nonblocking(fd, flags)
if response and (b"=" in response or b"@PJL" in response):
break
return response
def pjl_query(fd: int, cmd: str, timeout_sec: float = 5.0) -> str:
"""Send a PJL command via raw fd and read the response."""
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
pjl_cmd = f"\x1b%-12345X@PJL\r\n{cmd}\r\n\x1b%-12345X"
os.write(fd, pjl_cmd.encode())
deadline = time.time() + timeout_sec
response = _wait_for_pjl_response(fd, flags, deadline)
# Restore blocking mode
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
return response.decode("ascii", errors="replace")
@ -333,19 +341,37 @@ def _retry_pjl_query(
time.sleep(0.5)
def _run_pjl_queries(fd: int, result: USBResult, max_retries: int) -> None:
"""Execute PJL query sequence on an open file descriptor."""
_drain_buffer(fd)
os.write(fd, b"\x1b%-12345X@PJL\r\n\x1b%-12345X")
time.sleep(0.5)
_drain_buffer(fd)
_retry_pjl_query(fd, "@PJL INFO STATUS", _parse_status, result, max_retries)
_drain_buffer(fd)
time.sleep(0.5)
_retry_pjl_query(fd, "@PJL INFO VARIABLES", _parse_variables, result, max_retries)
def _init_usb_result(dev_path: str) -> USBResult:
"""Create a USBResult with device info from CUPS."""
cups_info = get_printer_info_from_cups()
return USBResult(
device=dev_path,
product=cups_info.get("product") or "Brother Laser Printer",
serial=cups_info.get("serial", ""),
)
def query_usb_pjl(max_retries: int = 2) -> USBResult:
"""Query a Brother printer via PJL over /dev/usb/lp*."""
dev_path = find_usb_printer_dev()
if not dev_path:
return USBResult(error="No USB printer device found at /dev/usb/lp*")
cups_info = get_printer_info_from_cups()
result = USBResult(
device=dev_path,
product=cups_info.get("product") or "Brother Laser Printer",
serial=cups_info.get("serial", ""),
)
result = _init_usb_result(dev_path)
if not os.access(dev_path, os.R_OK | os.W_OK):
result.error = f"Permission denied: {dev_path}. Run with sudo."
return result
@ -354,34 +380,25 @@ def query_usb_pjl(max_retries: int = 2) -> USBResult:
try:
fd = os.open(dev_path, os.O_RDWR)
fcntl.fcntl(fd, fcntl.F_GETFL)
# Drain any stale data in the USB buffer
_drain_buffer(fd)
# Wake-up: send a bare UEL to get the printer's attention
os.write(fd, b"\x1b%-12345X@PJL\r\n\x1b%-12345X")
time.sleep(0.5)
_drain_buffer(fd)
_retry_pjl_query(fd, "@PJL INFO STATUS", _parse_status, result, max_retries)
_drain_buffer(fd)
time.sleep(0.5)
_retry_pjl_query(
fd, "@PJL INFO VARIABLES", _parse_variables, result, max_retries
)
_run_pjl_queries(fd, result, max_retries)
except OSError as e:
result.error = str(e)
finally:
if fd is not None:
os.close(fd)
return result
# ── SNMP network query ──────────────────────────────────────────────
def _snmpwalk_cmd(
path: str, community: str, timeout: int, ip: str, oid: str
) -> list[str]:
"""Build the snmpwalk command arguments."""
return [path, "-v", "2c", "-c", community, "-t", str(timeout), "-OQvs", ip, oid]
def snmp_walk(ip: str, oid: str, community: str, timeout: int) -> list[str]:
"""Run snmpwalk and return cleaned values."""
snmpwalk_path = shutil.which("snmpwalk")
@ -389,18 +406,7 @@ def snmp_walk(ip: str, oid: str, community: str, timeout: int) -> list[str]:
return []
try:
r = subprocess.run(
[
snmpwalk_path,
"-v",
"2c",
"-c",
community,
"-t",
str(timeout),
"-OQvs",
ip,
oid,
],
_snmpwalk_cmd(snmpwalk_path, community, timeout, ip, oid),
capture_output=True,
text=True,
timeout=15,
@ -415,37 +421,38 @@ def snmp_walk(ip: str, oid: str, community: str, timeout: int) -> list[str]:
return []
def query_network_snmp(ip: str) -> NetworkResult:
"""Query a Brother printer via SNMP over the network."""
community = "public"
timeout = 5
def _snmpget_cmd(
path: str, community: str, timeout: int, ip: str, oid: str
) -> list[str]:
"""Build the snmpget command arguments."""
return [path, "-v", "2c", "-c", community, "-t", str(timeout), ip, oid]
# Quick connectivity check
def _check_snmp_connectivity(ip: str, community: str, timeout: int) -> str | None:
"""Verify SNMP connectivity. Returns error message or None on success."""
snmpget_path = shutil.which("snmpget")
if not snmpget_path:
return NetworkResult(
ip=ip,
error="snmpget not found. Install: sudo pacman -S net-snmp",
)
return "snmpget not found. Install: sudo pacman -S net-snmp"
try:
subprocess.run(
[
_snmpget_cmd(
snmpget_path,
"-v",
"2c",
"-c",
community,
"-t",
str(timeout),
timeout,
ip,
"1.3.6.1.2.1.43.11.1.1.6.1.1",
],
),
capture_output=True,
timeout=10,
check=True,
)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError):
return NetworkResult(ip=ip, error=f"Cannot reach printer at {ip} via SNMP.")
return f"Cannot reach printer at {ip} via SNMP."
return None
def _build_network_result(ip: str, community: str, timeout: int) -> NetworkResult:
"""Collect all SNMP data into a NetworkResult."""
def walk(oid: str) -> list[str]:
return snmp_walk(ip, oid, community, timeout)
@ -464,6 +471,16 @@ def query_network_snmp(ip: str) -> NetworkResult:
)
def query_network_snmp(ip: str) -> NetworkResult:
"""Query a Brother printer via SNMP over the network."""
community = "public"
timeout = 5
error = _check_snmp_connectivity(ip, community, timeout)
if error:
return NetworkResult(ip=ip, error=error)
return _build_network_result(ip, community, timeout)
# ── Status code lookup ──────────────────────────────────────────────
@ -528,6 +545,49 @@ def _display_usb_device_info(result: USBResult) -> None:
_out(f"{BOLD}Toner Save:{RESET} OFF")
_SEVERITY_ICONS: dict[str, str] = {
"ok": "",
"info": "i",
"warn": "",
"critical": "",
}
_SEVERITY_COLORS: dict[str, str] = {
"ok": GREEN,
"info": CYAN,
"warn": YELLOW,
"critical": RED,
}
_SEVERITY_SUMMARIES: dict[str, str] = {
"ok": f"{GREEN}{BOLD}✓ Printer is healthy. No replacements needed.{RESET}",
"info": f"{CYAN}{BOLD}i Printer is busy/processing."
f" No replacements needed.{RESET}",
"warn": f"{YELLOW}{BOLD}⚡ WARNING: Maintenance will be needed"
f" soon.{RESET}\n{YELLOW} Order replacement parts"
f" now to avoid interruption.{RESET}",
"critical": f"{RED}{BOLD}⚠ ACTION REQUIRED: Replacement or fix"
f" needed now!{RESET}",
}
def _format_status_detail(
severity: str, short_text: str, action: str, result: USBResult
) -> None:
"""Print severity icon, display text, and action."""
color = _SEVERITY_COLORS.get(severity, GREEN)
icon = _SEVERITY_ICONS.get(severity, "")
_out(f" {color}{BOLD}{icon} {short_text}{RESET}")
if result.display and result.display != short_text:
_out(f" {DIM}Display: {result.display}{RESET}")
_out(f" {DIM}Status code: {result.status_code}{RESET}")
if action:
_out()
_out(f" {color}{BOLD}Action:{RESET} {color}{action}{RESET}")
_out()
_out(_SEVERITY_SUMMARIES.get(severity, ""))
def _display_pjl_status(result: USBResult) -> None:
"""Display PJL status code interpretation."""
_out()
@ -541,34 +601,7 @@ def _display_pjl_status(result: USBResult) -> None:
return
severity, short_text, action = get_status_info(result.status_code)
icons = {"ok": "", "info": "i", "warn": "", "critical": ""}
colors = {"ok": GREEN, "info": CYAN, "warn": YELLOW, "critical": RED}
color = colors.get(severity, GREEN)
icon = icons.get(severity, "")
_out(f" {color}{BOLD}{icon} {short_text}{RESET}")
if result.display and result.display != short_text:
_out(f" {DIM}Display: {result.display}{RESET}")
_out(f" {DIM}Status code: {result.status_code}{RESET}")
if action:
_out()
_out(f" {color}{BOLD}Action:{RESET} {color}{action}{RESET}")
_out()
summaries = {
"ok": f"{GREEN}{BOLD}✓ Printer is healthy." f" No replacements needed.{RESET}",
"info": f"{CYAN}{BOLD}i Printer is busy/processing."
f" No replacements needed.{RESET}",
"warn": f"{YELLOW}{BOLD}⚡ WARNING: Maintenance will be needed"
f" soon.{RESET}\n{YELLOW} Order replacement parts"
f" now to avoid interruption.{RESET}",
"critical": f"{RED}{BOLD}⚠ ACTION REQUIRED: Replacement or fix"
f" needed now!{RESET}",
}
_out(summaries.get(severity, ""))
_format_status_detail(severity, short_text, action, result)
# ── Display: USB results ────────────────────────────────────────────
@ -601,44 +634,46 @@ class _SupplyStatus:
needs_replacement: bool
def _classify_percentage_level(desc: str, pct: int) -> tuple[int, str, str, str, bool]:
"""Classify a supply by its calculated percentage."""
if pct <= SUPPLY_LOW_PCT:
return pct, f"{pct}%", RED, f"{desc} at {pct}%.", True
if pct <= SUPPLY_WARN_PCT:
return pct, f"{pct}%", YELLOW, f"{desc} at {pct}% -- order soon.", False
return pct, f"{pct}%", GREEN, "", False
def _classify_supply_level(
desc: str, max_val: int, level: int
) -> tuple[int, str, str, str, bool]:
"""Classify a supply level. Returns (pct, status, color, warning, replace)."""
if level == SNMP_LEVEL_OK:
return -1, "OK", GREEN, "", False
if level == SNMP_LEVEL_LOW:
return -1, "LOW", RED, f"{desc} is LOW.", True
if level == 0:
return 0, "EMPTY", RED, f"{desc} is EMPTY -- replace now!", True
if max_val > 0:
pct = min(level * 100 // max_val, 100)
return _classify_percentage_level(desc, pct)
return -1, "", GREEN, "", False
def _format_supply_bar(pct: int) -> str:
"""Build a progress bar string for a supply percentage."""
if pct < 0:
return ""
filled = pct * PROGRESS_BAR_WIDTH // 100
empty = PROGRESS_BAR_WIDTH - filled
return f"[{'' * filled}{'' * empty}]"
def _process_supply_item(desc: str, max_val: int, level: int) -> _SupplyStatus:
"""Process a single supply item into display info."""
pct = -1
status_text = ""
color = GREEN
warning = ""
needs_replacement = False
if level == SNMP_LEVEL_OK:
status_text = "OK"
elif level == SNMP_LEVEL_LOW:
status_text = "LOW"
color = RED
needs_replacement = True
warning = f"{desc} is LOW."
elif level == 0:
status_text = "EMPTY"
color = RED
pct = 0
needs_replacement = True
warning = f"{desc} is EMPTY -- replace now!"
elif max_val > 0:
pct = min(level * 100 // max_val, 100)
status_text = f"{pct}%"
if pct <= SUPPLY_LOW_PCT:
color = RED
needs_replacement = True
warning = f"{desc} at {pct}%."
elif pct <= SUPPLY_WARN_PCT:
color = YELLOW
warning = f"{desc} at {pct}% -- order soon."
bar = ""
if pct >= 0:
filled = pct * PROGRESS_BAR_WIDTH // 100
empty = PROGRESS_BAR_WIDTH - filled
bar = f"[{'' * filled}{'' * empty}]"
pct, status_text, color, warning, needs_replacement = _classify_supply_level(
desc, max_val, level
)
bar = _format_supply_bar(pct)
return _SupplyStatus(color, bar, status_text, warning, needs_replacement)
@ -657,6 +692,28 @@ def _display_supply_warnings(*, needs_replacement: bool, warnings: list[str]) ->
_out(f"{GREEN}{BOLD}✓ All consumables are at healthy levels.{RESET}")
def _parse_supply_value(values: list[str], index: int) -> int:
"""Safely parse an integer from a supply value list."""
try:
return int(values[index])
except (IndexError, ValueError):
return 0
def _collect_supply_items(
result: NetworkResult,
) -> tuple[list[_SupplyStatus], list[str]]:
"""Parse and collect supply items with their descriptions."""
items: list[_SupplyStatus] = []
descs: list[str] = []
for i, desc in enumerate(result.supply_descriptions):
max_val = _parse_supply_value(result.supply_max, i)
level = _parse_supply_value(result.supply_levels, i)
items.append(_process_supply_item(desc, max_val, level))
descs.append(desc)
return items, descs
def _display_supply_levels(result: NetworkResult) -> None:
"""Display consumable supply levels section."""
_out()
@ -665,18 +722,9 @@ def _display_supply_levels(result: NetworkResult) -> None:
needs_replacement = False
warnings: list[str] = []
items, descs = _collect_supply_items(result)
for i, desc in enumerate(result.supply_descriptions):
try:
max_val = int(result.supply_max[i])
except (IndexError, ValueError):
max_val = 0
try:
level = int(result.supply_levels[i])
except (IndexError, ValueError):
level = 0
item = _process_supply_item(desc, max_val, level)
for desc, item in zip(descs, items, strict=True):
_out(
f" {BOLD}{desc:<25}{RESET}"
f" {item.color}{item.bar} {item.status_text}{RESET}"
@ -749,58 +797,58 @@ def _discover_network_printer() -> str:
return ""
def main(argv: list[str] | None = None) -> None:
"""Entry point: auto-detect USB or network Brother printer."""
args = argv if argv is not None else sys.argv[1:]
printer_ip = args[0] if args else ""
def _run_network_mode(printer_ip: str) -> None:
"""Handle explicit network/SNMP mode."""
if not shutil.which("snmpwalk"):
_out(f"{RED}snmpwalk not found. Install: sudo pacman -S net-snmp{RESET}")
sys.exit(1)
_out(f"{CYAN}Querying printer at {printer_ip} via SNMP...{RESET}")
display_network_results(query_network_snmp(printer_ip))
# ── Network mode (explicit IP given) ─────────────────────────
if printer_ip:
if not shutil.which("snmpwalk"):
_out(
f"{RED}snmpwalk not found." f" Install: sudo pacman -S net-snmp{RESET}"
)
sys.exit(1)
_out(f"{CYAN}Querying printer at {printer_ip} via SNMP...{RESET}")
net_result = query_network_snmp(printer_ip)
display_network_results(net_result)
return
# ── Auto-detect: USB first, then network ─────────────────────
usb_line = find_brother_usb()
def _run_usb_mode(usb_line: str) -> None:
"""Handle USB printer mode."""
_out(f"{CYAN}Found Brother printer on USB: {usb_line}{RESET}")
if os.geteuid() != 0:
_out(f"{RED}Root access required for USB printer." f" Re-run with sudo.{RESET}")
sys.exit(1)
display_usb_results(query_usb_pjl())
if usb_line:
_out(f"{CYAN}Found Brother printer on USB: {usb_line}{RESET}")
if os.geteuid() != 0:
_out(
f"{RED}Root access required for USB printer."
f" Re-run with sudo.{RESET}"
)
sys.exit(1)
usb_result = query_usb_pjl()
display_usb_results(usb_result)
return
# ── Try network discovery via CUPS ───────────────────────────
network_ip = _discover_network_printer()
if network_ip and shutil.which("snmpwalk"):
_out(f"{CYAN}Found network printer at {network_ip}{RESET}")
net_result = query_network_snmp(network_ip)
display_network_results(net_result)
return
def _no_printer_found() -> None:
"""Print error message when no printer is detected."""
_out(f"{RED}No Brother printer found.{RESET}")
_out()
_out("Ensure the printer is:")
_out(" Powered on")
_out(" Connected via USB or on the same network")
_out(" \u2022 Powered on")
_out(" \u2022 Connected via USB or on the same network")
_out()
_out("Usage: python3 -m brother_printer [printer_ip]")
sys.exit(1)
def main(argv: list[str] | None = None) -> None:
"""Entry point: auto-detect USB or network Brother printer."""
args = argv if argv is not None else sys.argv[1:]
printer_ip = args[0] if args else ""
if printer_ip:
_run_network_mode(printer_ip)
return
usb_line = find_brother_usb()
if usb_line:
_run_usb_mode(usb_line)
return
network_ip = _discover_network_printer()
if network_ip and shutil.which("snmpwalk"):
_out(f"{CYAN}Found network printer at {network_ip}{RESET}")
display_network_results(query_network_snmp(network_ip))
return
_no_printer_found()
if __name__ == "__main__":
main()