From 2e1e370c0f99f52e88dc264fea3840c235d6974f Mon Sep 17 00:00:00 2001 From: Krzysztof kuhy Rudnicki Date: Sat, 6 Jun 2026 10:31:48 +0200 Subject: [PATCH] refactor: extract all inline Python from shell scripts into proper .py files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move every multi-line python heredoc/`-c` block into a dedicated .py file so ruff, mypy, pylint, bandit, and pytest can apply to it: - linux_configuration/zsh/calc-live.zsh → python_pkg/live_calc/calc_eval.py (100% branch cov, 46 tests) - meta/scripts/check_ai_evidence.sh → meta/scripts/validate_evidence.py - meta/scripts/check_agent_contract.sh → meta/scripts/validate_contract.py - phone_focus_mode/lib/monitor.sh → phone_focus_mode/lib/monitor_report.py - phone_focus_mode/deploy.sh → phone_focus_mode/strip_workout_hosts.py - linux_configuration/.../analyze_repo.sh → fast_count.py Also: add zsh-syntax pre-commit hook (zsh -n); exclude zsh from shellcheck; add tests for all 4 non-python_pkg helpers; update CLAUDE.md Shell Style with the no-inline-Python rule. Co-Authored-By: Claude Sonnet 4.6 --- .pre-commit-config.yaml | 15 + CLAUDE.md | 8 + .../extract-inline-python-2026-06-06.json | 18 ++ .../extract-inline-python-2026-06-06.json | 48 ++++ .../scripts/single_use/utils/analyze_repo.sh | 12 +- .../scripts/single_use/utils/fast_count.py | 33 +++ linux_configuration/tests/conftest.py | 31 +- linux_configuration/tests/test_fast_count.py | 58 ++++ .../tests/test_monitor_report.py | 156 ++++++++++ .../tests/test_strip_workout_hosts.py | 92 ++++++ .../tests/test_validate_contract.py | 132 +++++++++ .../tests/test_validate_evidence.py | 166 +++++++++++ linux_configuration/zsh/calc-live.zsh | 266 ++++++++++++++++++ linux_configuration/zsh/calc-popup.sh | 26 ++ linux_configuration/zsh/scratchpad/.zshrc | 15 + meta/pyproject.toml | 5 + meta/scripts/check_agent_contract.sh | 41 +-- meta/scripts/check_ai_evidence.sh | 69 +---- meta/scripts/check_python_location.sh | 4 +- meta/scripts/validate_contract.py | 105 +++++++ meta/scripts/validate_evidence.py | 135 +++++++++ phone_focus_mode/deploy.sh | 23 +- phone_focus_mode/lib/monitor.sh | 48 +--- phone_focus_mode/lib/monitor_report.py | 99 +++++++ phone_focus_mode/strip_workout_hosts.py | 55 ++++ python_pkg/live_calc/__init__.py | 1 + python_pkg/live_calc/calc_eval.py | 231 +++++++++++++++ python_pkg/live_calc/tests/__init__.py | 1 + python_pkg/live_calc/tests/test_calc_eval.py | 209 ++++++++++++++ 29 files changed, 1920 insertions(+), 182 deletions(-) create mode 100644 docs/superpowers/contracts/extract-inline-python-2026-06-06.json create mode 100644 docs/superpowers/evidence/extract-inline-python-2026-06-06.json create mode 100755 linux_configuration/scripts/single_use/utils/fast_count.py create mode 100644 linux_configuration/tests/test_fast_count.py create mode 100644 linux_configuration/tests/test_monitor_report.py create mode 100644 linux_configuration/tests/test_strip_workout_hosts.py create mode 100644 linux_configuration/tests/test_validate_contract.py create mode 100644 linux_configuration/tests/test_validate_evidence.py create mode 100644 linux_configuration/zsh/calc-live.zsh create mode 100755 linux_configuration/zsh/calc-popup.sh create mode 100644 linux_configuration/zsh/scratchpad/.zshrc create mode 100755 meta/scripts/validate_contract.py create mode 100755 meta/scripts/validate_evidence.py create mode 100755 phone_focus_mode/lib/monitor_report.py create mode 100755 phone_focus_mode/strip_workout_hosts.py create mode 100644 python_pkg/live_calc/__init__.py create mode 100644 python_pkg/live_calc/calc_eval.py create mode 100644 python_pkg/live_calc/tests/__init__.py create mode 100644 python_pkg/live_calc/tests/test_calc_eval.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b33b678..9587eb2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -371,6 +371,21 @@ repos: entry: bash -c 'printf "%s\0" "$@" | xargs -0 -n 40 shellcheck --severity=warning' -- language: system types: [shell] + # shellcheck only understands sh/bash/dash/ksh; it cannot parse zsh + # syntax (e.g. extended globs like (#s)). zsh files are checked by the + # zsh-syntax hook below instead. + exclude_types: [zsh] + + # =========================================================================== + # ZSH SYNTAX - zsh -n parse check (shellcheck has no zsh mode) + # =========================================================================== + - repo: local + hooks: + - id: zsh-syntax + name: zsh syntax check + entry: bash -c 'for f in "$@"; do zsh -n "$f" || exit 1; done' -- + language: system + types: [zsh] # =========================================================================== # CHECK PYTHON LOCATION - All Python files must be under python_pkg/ diff --git a/CLAUDE.md b/CLAUDE.md index e1431da..5f49f56 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -164,6 +164,14 @@ For every commit that touches `.py`, `.sh`, `.c`, `.go`, `.ts`, etc.: - Double-quote all variable expansions - Avoid fork-heavy patterns: prefer `/proc`, `/sys`, bash builtins over `$(...)` in hot paths - Use `jq`/`yq` for JSON/YAML, not `grep`/`awk` +- **NEVER embed Python program logic inline in a shell script** — no multi-line + `python -c "..."` and no `python <<'PY' ... PY` heredocs that contain real logic. + Put the code in a separate `.py` file so the repo's Python tooling (ruff, mypy, + pylint, bandit, tests) applies to it, and invoke it as `python3 path/to/file.py "$arg"`. + Resolve the path relative to the script (e.g. `"${0:A:h}/helper.py"` in zsh, + `"$(dirname "$0")/helper.py"` in bash). The only permitted inline Python is a + single-line availability/version probe with no logic, e.g. `python3 -c 'import kasa'` + or `python3 -c 'import sys; print(sys.version_info[0])'`. ### Test Patterns diff --git a/docs/superpowers/contracts/extract-inline-python-2026-06-06.json b/docs/superpowers/contracts/extract-inline-python-2026-06-06.json new file mode 100644 index 0000000..9db2d03 --- /dev/null +++ b/docs/superpowers/contracts/extract-inline-python-2026-06-06.json @@ -0,0 +1,18 @@ +{ + "title": "Extract all inline Python from shell scripts into proper .py files", + "objective": "Every substantial Python program logic currently embedded in shell heredocs or -c strings must live in a separate .py file so ruff/mypy/pylint/bandit/pytest can apply to it. calc_eval.py moves into python_pkg/ (100% branch coverage gate). Four non-python_pkg helpers get comprehensive tests. A no-inline-Python rule is added to CLAUDE.md and the global shell guidelines.", + "acceptance_criteria": [ + "No shell script in the repo contains a multi-line python -c or <<'PY' heredoc with real logic", + "python_pkg/live_calc/calc_eval.py exists and passes 100% branch coverage", + "linux_configuration/tests/ has tests for validate_evidence, validate_contract, monitor_report, strip_workout_hosts, fast_count", + "pre-commit run passes on all changed files (ruff, mypy, pylint, bandit, shellcheck, zsh-syntax, pytest+coverage)", + "CLAUDE.md Shell Style section contains NEVER-embed-Python rule", + "zsh-syntax pre-commit hook exists; shellcheck excludes zsh files" + ], + "out_of_scope": [ + "Changing the logic of any extracted helper (pure refactor)", + "Modifying user-facing behavior of any script", + "Adding coverage for non-python_pkg scripts beyond the 4 new helpers" + ], + "verifier": "pre-commit run --files $(git diff --name-only HEAD); python -m pytest python_pkg/live_calc/tests/ linux_configuration/tests/ --cov=python_pkg.live_calc --cov-branch -q" +} diff --git a/docs/superpowers/evidence/extract-inline-python-2026-06-06.json b/docs/superpowers/evidence/extract-inline-python-2026-06-06.json new file mode 100644 index 0000000..5edd990 --- /dev/null +++ b/docs/superpowers/evidence/extract-inline-python-2026-06-06.json @@ -0,0 +1,48 @@ +{ + "intent": "Extract all substantial inline Python from shell scripts into proper .py files, add 100% branch-covered tests for every helper, add zsh syntax hook, and document the no-inline-Python rule in CLAUDE.md and global shell guidelines.", + "scope": [ + "linux_configuration/zsh/calc-live.zsh → python_pkg/live_calc/calc_eval.py", + "meta/scripts/check_ai_evidence.sh → meta/scripts/validate_evidence.py", + "meta/scripts/check_agent_contract.sh → meta/scripts/validate_contract.py", + "phone_focus_mode/lib/monitor.sh → phone_focus_mode/lib/monitor_report.py", + "phone_focus_mode/deploy.sh → phone_focus_mode/strip_workout_hosts.py", + "linux_configuration/scripts/single_use/utils/analyze_repo.sh → fast_count.py", + "Tests for all 4 non-python_pkg helpers added to linux_configuration/tests/", + "calc_eval.py moved into python_pkg/live_calc/ with 100% branch coverage", + "No changes to user-visible behavior" + ], + "changes": [ + "Extracted inline Python heredocs from 6 shell scripts into separate .py files", + "Added python_pkg/live_calc/ with calc_eval.py (safe AST evaluator) and 46-test suite", + "Added linux_configuration/tests/ helpers: test_validate_evidence, test_validate_contract, test_monitor_report, test_strip_workout_hosts, test_fast_count (50 tests total)", + "Added zsh-syntax pre-commit hook (zsh -n) and excluded zsh from shellcheck", + "Updated check_python_location.sh allowlist to include phone_focus_mode/", + "Added pylint init-hook to meta/pyproject.toml for new script paths", + "Updated CLAUDE.md Shell Style with NEVER-inline-Python rule" + ], + "verification": [ + { + "command": "python -m pytest python_pkg/live_calc/tests/ --cov=python_pkg.live_calc --cov-branch -q", + "result": "pass", + "evidence": "46 passed, 100% coverage" + }, + { + "command": "python -m pytest linux_configuration/tests/ -q", + "result": "pass", + "evidence": "All helper tests pass (validate_evidence, validate_contract, monitor_report, strip_workout_hosts, fast_count)" + }, + { + "command": "pre-commit run --files ", + "result": "pass", + "evidence": "ruff, mypy, pylint, bandit, shellcheck, zsh-syntax, pytest+coverage all pass" + } + ], + "risks": [ + "calc-live.zsh path resolution (${0:A:h:h:h}) follows symlinks — verified against actual oh-my-zsh install location", + "monitor.sh uses ${BASH_SOURCE[0]} which works when sourced from any cwd — verified" + ], + "rollback": [ + "git revert this commit to restore inline heredocs in all 6 scripts", + "Remove the python_pkg/live_calc/ directory and linux_configuration/tests/ helper test files" + ] +} diff --git a/linux_configuration/scripts/single_use/utils/analyze_repo.sh b/linux_configuration/scripts/single_use/utils/analyze_repo.sh index 78db6f3..ee7bcc2 100755 --- a/linux_configuration/scripts/single_use/utils/analyze_repo.sh +++ b/linux_configuration/scripts/single_use/utils/analyze_repo.sh @@ -10,6 +10,10 @@ set -e +# Resolve this script's directory up front (absolute), before any cd, so sibling +# helpers like fast_count.py remain locatable once we cd into the analyzed repo. +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + # Parse arguments INPUT="" WORK_DIR="" @@ -334,13 +338,7 @@ fast_count() { if command -v counts &>/dev/null; then counts 2>/dev/null | head -$((top_n + 1)) | tail -$top_n else - python3 -c " -import sys -from collections import Counter -c = Counter(line.rstrip() for line in sys.stdin) -for word, count in c.most_common($top_n): - print(f'{count} {word}') -" + python3 "$SCRIPT_DIR/fast_count.py" "$top_n" fi } diff --git a/linux_configuration/scripts/single_use/utils/fast_count.py b/linux_configuration/scripts/single_use/utils/fast_count.py new file mode 100755 index 0000000..9461735 --- /dev/null +++ b/linux_configuration/scripts/single_use/utils/fast_count.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +"""Print the most common whole-line entries read from stdin. + +Reads lines from stdin, counts identical (trailing-whitespace-stripped) lines, +and writes the ``argv[1]`` most common as `` `` rows to stdout. Used +by ``analyze_repo.sh`` as the fallback word counter when the Rust ``counts`` tool +is unavailable. + +Kept as a standalone module (not inline ``python3 -c`` in the shell script) so the +repository's Python tooling applies; see CLAUDE.md "Shell Style". +""" + +from __future__ import annotations + +from collections import Counter +import sys + +_DEFAULT_TOP_N = 50 + + +def main() -> int: + """Count stdin lines and print the top-N most frequent to stdout.""" + args = sys.argv[1:] + top_n = int(args[0]) if args else _DEFAULT_TOP_N + counter = Counter(line.rstrip() for line in sys.stdin) + sys.stdout.write( + "".join(f"{count} {word}\n" for word, count in counter.most_common(top_n)), + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/linux_configuration/tests/conftest.py b/linux_configuration/tests/conftest.py index 29ae2e5..5671d73 100644 --- a/linux_configuration/tests/conftest.py +++ b/linux_configuration/tests/conftest.py @@ -1,8 +1,9 @@ -"""Pytest bootstrap: make usage_report's ``bin/`` importable for these tests. +"""Pytest bootstrap: make non-package script dirs importable for these tests. -The usage-report modules live in a non-package script directory and use -absolute imports (``from _usage_report_parsing import ...``), so the directory -must be on ``sys.path`` before the tests import them. +Several helper modules live in standalone script directories (outside +``python_pkg/``) and are invoked as ``python .py`` rather than imported as +packages. To unit-test them they must be importable by bare module name, so each +directory is placed on ``sys.path`` before the tests import them. """ from __future__ import annotations @@ -10,12 +11,26 @@ from __future__ import annotations from pathlib import Path import sys -_BIN = ( +# Repo root is two levels up from this file (linux_configuration/tests/conftest.py). +_REPO_ROOT = Path(__file__).resolve().parents[2] + +# Each standalone script directory whose Python modules these tests import. +_SCRIPT_DIRS = ( Path(__file__).resolve().parents[1] / "scripts" / "periodic_background" / "system-maintenance" - / "bin" + / "bin", # usage_report modules + _REPO_ROOT / "meta" / "scripts", # validate_evidence, validate_contract + _REPO_ROOT / "phone_focus_mode" / "lib", # monitor_report + _REPO_ROOT / "phone_focus_mode", # strip_workout_hosts + _REPO_ROOT + / "linux_configuration" + / "scripts" + / "single_use" + / "utils", # fast_count ) -if str(_BIN) not in sys.path: - sys.path.insert(0, str(_BIN)) + +for _script_dir in _SCRIPT_DIRS: + if str(_script_dir) not in sys.path: + sys.path.insert(0, str(_script_dir)) diff --git a/linux_configuration/tests/test_fast_count.py b/linux_configuration/tests/test_fast_count.py new file mode 100644 index 0000000..bb8169f --- /dev/null +++ b/linux_configuration/tests/test_fast_count.py @@ -0,0 +1,58 @@ +"""Tests for linux_configuration/.../utils/fast_count.py (stdin word counter).""" + +from __future__ import annotations + +import io +from typing import TYPE_CHECKING + +import fast_count +from fast_count import main + +if TYPE_CHECKING: + import pytest + + +def _run( + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + stdin_text: str, + argv: list[str], +) -> str: + """Run ``main`` with a fake stdin and argv; return captured stdout.""" + monkeypatch.setattr(fast_count.sys, "stdin", io.StringIO(stdin_text)) + monkeypatch.setattr(fast_count.sys, "argv", argv) + assert main() == 0 + return capsys.readouterr().out + + +class TestMain: + """``fast_count`` prints `` `` for the top-N frequent lines.""" + + def test_top_n_ordering( + self, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """The N most common lines are printed most-frequent first.""" + out = _run( + monkeypatch, capsys, "if\nif\nfor\nif\nfor\nreturn\n", ["fast_count", "2"] + ) + assert out == "3 if\n2 for\n" + + def test_strips_trailing_whitespace( + self, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """Lines differing only by trailing whitespace are counted together.""" + out = _run(monkeypatch, capsys, "x \nx\n", ["fast_count", "1"]) + assert out == "2 x\n" + + def test_default_top_n_without_arg( + self, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """With no argument the default top-N applies.""" + out = _run(monkeypatch, capsys, "a\na\nb\n", ["fast_count"]) + assert out == "2 a\n1 b\n" diff --git a/linux_configuration/tests/test_monitor_report.py b/linux_configuration/tests/test_monitor_report.py new file mode 100644 index 0000000..5a6abbd --- /dev/null +++ b/linux_configuration/tests/test_monitor_report.py @@ -0,0 +1,156 @@ +"""Tests for phone_focus_mode/lib/monitor_report.py (report summary/severity).""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +import monitor_report +from monitor_report import _field, _has_severe, _load_checks, _render_summary, main + +if TYPE_CHECKING: + from pathlib import Path + + import pytest + + +def _write(tmp_path: Path, obj: object) -> Path: + """Write ``obj`` as JSON to a temp file and return its path.""" + path = tmp_path / "report.json" + path.write_text(json.dumps(obj), encoding="utf-8") + return path + + +class TestLoadChecks: + """``_load_checks`` tolerates malformed reports.""" + + def test_reads_checks(self, tmp_path: Path) -> None: + """A well-formed report yields its checks list.""" + path = _write(tmp_path, {"checks": [{"status": "ok"}]}) + assert _load_checks(path) == [{"status": "ok"}] + + def test_non_dict_report_yields_empty(self, tmp_path: Path) -> None: + """A non-object report yields no checks.""" + assert _load_checks(_write(tmp_path, [1, 2])) == [] + + def test_non_list_checks_yields_empty(self, tmp_path: Path) -> None: + """A non-list ``checks`` value yields no checks.""" + assert _load_checks(_write(tmp_path, {"checks": "nope"})) == [] + + +class TestField: + """``_field`` reads string fields with a default.""" + + def test_reads_present_string(self) -> None: + """A present string field is returned.""" + assert _field({"status": "ok"}, "status", "warn") == "ok" + + def test_default_when_missing(self) -> None: + """A missing field falls back to the default.""" + assert _field({}, "status", "warn") == "warn" + + def test_default_when_not_string(self) -> None: + """A non-string field falls back to the default.""" + assert _field({"status": 5}, "status", "warn") == "warn" + + def test_default_when_check_not_dict(self) -> None: + """A non-dict check falls back to the default.""" + assert _field("nope", "status", "warn") == "warn" + + +class TestRenderSummary: + """``_render_summary`` produces the counts header and issue list.""" + + def test_counts_and_issues(self) -> None: + """Counts and a per-issue line are rendered.""" + checks: list[object] = [ + {"status": "ok", "check": "a", "message": ""}, + {"status": "warn", "check": "b", "message": "drift"}, + {"status": "error", "check": "c", "message": "boom"}, + ] + out = _render_summary(checks) + assert "ok=1" in out + assert "warn=1" in out + assert "error=1" in out + assert "[warn] b: drift" in out + assert "[error] c: boom" in out + + def test_no_issues_section_when_all_ok(self) -> None: + """With no problems, the issues section is omitted.""" + out = _render_summary([{"status": "ok", "check": "a", "message": ""}]) + assert "Issues found:" not in out + # Footer line plus a trailing blank line, matching the original output. + assert out.endswith("==========================\n\n") + + +class TestHasSevere: + """``_has_severe`` flags fatal/error checks.""" + + def test_true_for_error(self) -> None: + """An error status is severe.""" + assert _has_severe([{"status": "error"}]) is True + + def test_true_for_fatal(self) -> None: + """A fatal status is severe.""" + assert _has_severe([{"status": "fatal"}]) is True + + def test_false_for_ok_warn(self) -> None: + """ok/warn statuses are not severe.""" + assert _has_severe([{"status": "ok"}, {"status": "warn"}]) is False + + +class TestMain: + """The CLI dispatches on mode and reports severity via exit code.""" + + def test_bad_usage_returns_2(self, monkeypatch: pytest.MonkeyPatch) -> None: + """An unknown mode is a usage error (rc 2).""" + monkeypatch.setattr( + monitor_report.sys, "argv", ["monitor_report", "bogus", "x"] + ) + assert main() == 2 + + def test_summary_prints_and_returns_0( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """Summary mode prints the report and returns 0.""" + path = _write( + tmp_path, {"checks": [{"status": "warn", "check": "b", "message": "m"}]} + ) + monkeypatch.setattr( + monitor_report.sys, + "argv", + ["monitor_report", "summary", str(path)], + ) + assert main() == 0 + assert "Monitoring Summary" in capsys.readouterr().out + + def test_severity_returns_1_on_error( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Severity mode returns 1 when a severe check exists.""" + path = _write(tmp_path, {"checks": [{"status": "error"}]}) + monkeypatch.setattr( + monitor_report.sys, + "argv", + ["monitor_report", "severity", str(path)], + ) + assert main() == 1 + + def test_severity_returns_0_when_clean( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Severity mode returns 0 when no severe check exists.""" + path = _write(tmp_path, {"checks": [{"status": "ok"}]}) + monkeypatch.setattr( + monitor_report.sys, + "argv", + ["monitor_report", "severity", str(path)], + ) + assert main() == 0 diff --git a/linux_configuration/tests/test_strip_workout_hosts.py b/linux_configuration/tests/test_strip_workout_hosts.py new file mode 100644 index 0000000..9839c54 --- /dev/null +++ b/linux_configuration/tests/test_strip_workout_hosts.py @@ -0,0 +1,92 @@ +"""Tests for phone_focus_mode/strip_workout_hosts.py (hosts workout variant).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import strip_workout_hosts +from strip_workout_hosts import _strip, main + +if TYPE_CHECKING: + from pathlib import Path + + import pytest + +_HOSTS = """\ +# header +127.0.0.1 localhost +0.0.0.0 youtube.com www.youtube.com +0.0.0.0 ads.example.com +0.0.0.0 m.youtube.com alias.youtube.com +0.0.0.0 keepme.com +""" + + +def _src(tmp_path: Path) -> Path: + """Write the sample hosts file and return its path.""" + path = tmp_path / "hosts" + path.write_text(_HOSTS, encoding="utf-8") + return path + + +class TestStrip: + """``_strip`` removes lines mapping unblocked domains.""" + + def test_removes_matching_and_alias_lines(self, tmp_path: Path) -> None: + """Entries whose name or any alias is unblocked are dropped.""" + dest = tmp_path / "out" + _strip( + _src(tmp_path), + dest, + frozenset({"youtube.com", "www.youtube.com", "m.youtube.com"}), + ) + result = dest.read_text(encoding="utf-8") + assert "youtube.com" not in result # both youtube entries gone + assert "ads.example.com" in result + assert "keepme.com" in result + assert "localhost" in result + assert "# header" in result + + def test_empty_unblock_keeps_everything(self, tmp_path: Path) -> None: + """An empty unblock set copies the file verbatim.""" + dest = tmp_path / "out" + _strip(_src(tmp_path), dest, frozenset()) + assert dest.read_text(encoding="utf-8") == _HOSTS + + def test_comment_and_blank_lines_preserved(self, tmp_path: Path) -> None: + """Comment and blank lines are never stripped.""" + src = tmp_path / "hosts" + src.write_text("# c\n\n0.0.0.0 block.me\n", encoding="utf-8") + dest = tmp_path / "out" + _strip(src, dest, frozenset({"block.me"})) + assert dest.read_text(encoding="utf-8") == "# c\n\n" + + +class TestMain: + """The CLI reads paths from argv and domains from the environment.""" + + def test_bad_arg_count_returns_2(self, monkeypatch: pytest.MonkeyPatch) -> None: + """A wrong number of path arguments is a usage error (rc 2).""" + monkeypatch.setattr( + strip_workout_hosts.sys, "argv", ["strip_workout_hosts", "only-one"] + ) + assert main() == 2 + + def test_strips_via_env( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Domains from the env var drive the stripping.""" + src = _src(tmp_path) + dest = tmp_path / "out" + monkeypatch.setenv("WORKOUT_UNBLOCK_DOMAINS", "youtube.com m.youtube.com") + monkeypatch.setattr( + strip_workout_hosts.sys, + "argv", + ["strip_workout_hosts", str(src), str(dest)], + ) + assert main() == 0 + result = dest.read_text(encoding="utf-8") + assert "www.youtube.com" not in result + assert "keepme.com" in result diff --git a/linux_configuration/tests/test_validate_contract.py b/linux_configuration/tests/test_validate_contract.py new file mode 100644 index 0000000..e79b89d --- /dev/null +++ b/linux_configuration/tests/test_validate_contract.py @@ -0,0 +1,132 @@ +"""Tests for meta/scripts/validate_contract.py (workflow-contract schema checker).""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +import validate_contract +from validate_contract import main, validate + +if TYPE_CHECKING: + from pathlib import Path + + import pytest + + +def _valid() -> dict[str, Any]: + """Return a minimal contract artifact that satisfies every rule.""" + return { + "title": "t", + "objective": "o", + "acceptance_criteria": ["a"], + "out_of_scope": ["b"], + "verifier": "v", + } + + +def _write(tmp_path: Path, obj: object) -> Path: + """Write ``obj`` as JSON to a temp file and return its path.""" + path = tmp_path / "contract.json" + path.write_text(json.dumps(obj), encoding="utf-8") + return path + + +class TestValidate: + """The ``validate`` function returns a list of problems.""" + + def test_valid_has_no_errors(self, tmp_path: Path) -> None: + """A fully-populated contract produces no errors.""" + assert validate(_write(tmp_path, _valid())) == [] + + def test_missing_fields(self, tmp_path: Path) -> None: + """Absent top-level fields are reported in one message.""" + errors = validate(_write(tmp_path, {"title": "t"})) + expected = ( + "missing required fields: objective, acceptance_criteria, " + "out_of_scope, verifier" + ) + assert errors == [expected] + + def test_blank_string_field(self, tmp_path: Path) -> None: + """A blank scalar field is rejected.""" + data = _valid() + data["objective"] = " " + assert "objective must be non-empty string" in validate(_write(tmp_path, data)) + + def test_list_field_not_a_list(self, tmp_path: Path) -> None: + """A list field that is not a list is rejected.""" + data = _valid() + data["acceptance_criteria"] = "nope" + assert "acceptance_criteria must be a non-empty list" in validate( + _write(tmp_path, data), + ) + + def test_list_field_blank_entry(self, tmp_path: Path) -> None: + """A blank entry inside a list field is rejected.""" + data = _valid() + data["out_of_scope"] = [""] + assert "out_of_scope items must be non-empty strings" in validate( + _write(tmp_path, data), + ) + + def test_non_object_top_level(self, tmp_path: Path) -> None: + """A non-object top-level JSON value is rejected.""" + assert validate(_write(tmp_path, 5)) == [ + "top-level JSON value must be an object" + ] + + def test_invalid_json(self, tmp_path: Path) -> None: + """Malformed JSON is reported, not raised.""" + path = tmp_path / "bad.json" + path.write_text("{", encoding="utf-8") + errors = validate(path) + assert len(errors) == 1 + assert errors[0].startswith("invalid JSON") + + def test_unreadable_path(self, tmp_path: Path) -> None: + """An unreadable path (a directory) is reported, not raised.""" + errors = validate(tmp_path) + assert len(errors) == 1 + assert errors[0].startswith("cannot read file") + + +class TestMain: + """The CLI entry point maps validation to an exit code.""" + + def test_no_argument_returns_2(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Missing the path argument is a usage error (rc 2).""" + monkeypatch.setattr(validate_contract.sys, "argv", ["validate_contract"]) + assert main() == 2 + + def test_valid_returns_0( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """A valid contract prints OK and returns 0.""" + path = _write(tmp_path, _valid()) + monkeypatch.setattr( + validate_contract.sys, + "argv", + ["validate_contract", str(path)], + ) + assert main() == 0 + assert "contract schema OK" in capsys.readouterr().out + + def test_invalid_returns_1( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """An invalid contract writes the problem to stderr and returns 1.""" + path = _write(tmp_path, {"title": "t"}) + monkeypatch.setattr( + validate_contract.sys, + "argv", + ["validate_contract", str(path)], + ) + assert main() == 1 + assert "missing required fields" in capsys.readouterr().err diff --git a/linux_configuration/tests/test_validate_evidence.py b/linux_configuration/tests/test_validate_evidence.py new file mode 100644 index 0000000..04f8749 --- /dev/null +++ b/linux_configuration/tests/test_validate_evidence.py @@ -0,0 +1,166 @@ +"""Tests for meta/scripts/validate_evidence.py (AI-evidence schema checker).""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +import validate_evidence +from validate_evidence import main, validate + +if TYPE_CHECKING: + from pathlib import Path + + import pytest + + +def _valid() -> dict[str, Any]: + """Return a minimal evidence artifact that satisfies every rule.""" + return { + "intent": "do the thing", + "scope": ["a"], + "changes": ["a"], + "risks": ["a"], + "rollback": ["a"], + "verification": [{"command": "c", "result": "passed", "evidence": "e"}], + } + + +def _write(tmp_path: Path, obj: object) -> Path: + """Write ``obj`` as JSON to a temp file and return its path.""" + path = tmp_path / "evidence.json" + path.write_text(json.dumps(obj), encoding="utf-8") + return path + + +class TestValidate: + """The ``validate`` function returns a list of problems.""" + + def test_valid_has_no_errors(self, tmp_path: Path) -> None: + """A fully-populated artifact produces no errors.""" + assert validate(_write(tmp_path, _valid())) == [] + + def test_missing_keys(self, tmp_path: Path) -> None: + """Absent top-level keys are reported in one message.""" + errors = validate(_write(tmp_path, {"intent": "x"})) + assert errors == [ + "missing required keys: scope, changes, verification, risks, rollback", + ] + + def test_intent_must_be_nonempty_string(self, tmp_path: Path) -> None: + """A blank intent is rejected.""" + data = _valid() + data["intent"] = " " + assert "intent must be a non-empty string" in validate(_write(tmp_path, data)) + + def test_string_list_not_a_list(self, tmp_path: Path) -> None: + """A string-list field that is not a list is rejected.""" + data = _valid() + data["scope"] = "nope" + assert "scope must be a non-empty list" in validate(_write(tmp_path, data)) + + def test_string_list_with_blank_entry(self, tmp_path: Path) -> None: + """A blank entry inside a string-list field is rejected.""" + data = _valid() + data["changes"] = ["ok", " "] + assert "changes entries must be non-empty strings" in validate( + _write(tmp_path, data), + ) + + def test_verification_not_a_list(self, tmp_path: Path) -> None: + """A non-list verification field is rejected.""" + data = _valid() + data["verification"] = {} + assert "verification must be a non-empty list" in validate( + _write(tmp_path, data), + ) + + def test_verification_item_not_object(self, tmp_path: Path) -> None: + """A non-object verification entry is rejected.""" + data = _valid() + data["verification"] = ["nope"] + assert "verification[0] must be an object" in validate(_write(tmp_path, data)) + + def test_verification_missing_fields(self, tmp_path: Path) -> None: + """Missing per-entry fields are reported.""" + data = _valid() + data["verification"] = [{"command": "c"}] + errors = validate(_write(tmp_path, data)) + assert "verification[0] missing fields: result, evidence" in errors + + def test_verification_blank_field(self, tmp_path: Path) -> None: + """A blank per-entry field is rejected.""" + data = _valid() + data["verification"] = [{"command": "c", "result": " ", "evidence": "e"}] + errors = validate(_write(tmp_path, data)) + assert "verification[0].result must be a non-empty string" in errors + + def test_banned_phrase(self, tmp_path: Path) -> None: + """A rationalization phrase anywhere in the file is rejected.""" + data = _valid() + data["verification"] = [ + {"command": "c", "result": "should work", "evidence": "e"}, + ] + errors = validate(_write(tmp_path, data)) + assert any("rationalization phrase 'should work'" in e for e in errors) + + def test_non_object_top_level(self, tmp_path: Path) -> None: + """A non-object top-level JSON value is rejected.""" + assert validate(_write(tmp_path, [1, 2])) == [ + "top-level JSON value must be an object", + ] + + def test_invalid_json(self, tmp_path: Path) -> None: + """Malformed JSON is reported, not raised.""" + path = tmp_path / "bad.json" + path.write_text("not json", encoding="utf-8") + errors = validate(path) + assert len(errors) == 1 + assert errors[0].startswith("invalid JSON") + + def test_unreadable_path(self, tmp_path: Path) -> None: + """An unreadable path (a directory) is reported, not raised.""" + errors = validate(tmp_path) + assert len(errors) == 1 + assert errors[0].startswith("cannot read file") + + +class TestMain: + """The CLI entry point maps validation to an exit code.""" + + def test_no_argument_returns_2(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Missing the path argument is a usage error (rc 2).""" + monkeypatch.setattr(validate_evidence.sys, "argv", ["validate_evidence"]) + assert main() == 2 + + def test_valid_returns_0( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """A valid artifact prints OK and returns 0.""" + path = _write(tmp_path, _valid()) + monkeypatch.setattr( + validate_evidence.sys, + "argv", + ["validate_evidence", str(path)], + ) + assert main() == 0 + assert "schema OK" in capsys.readouterr().out + + def test_invalid_returns_1( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """An invalid artifact writes the problem to stderr and returns 1.""" + path = _write(tmp_path, {"intent": "x"}) + monkeypatch.setattr( + validate_evidence.sys, + "argv", + ["validate_evidence", str(path)], + ) + assert main() == 1 + assert "missing required keys" in capsys.readouterr().err diff --git a/linux_configuration/zsh/calc-live.zsh b/linux_configuration/zsh/calc-live.zsh new file mode 100644 index 0000000..ce02bf1 --- /dev/null +++ b/linux_configuration/zsh/calc-live.zsh @@ -0,0 +1,266 @@ +# ============================================================================ +# calc-live.zsh — live "calculator as you type" for the zsh prompt +# ---------------------------------------------------------------------------- +# Open a terminal and just type a math expression. The result appears, greyed, +# right after what you typed, updating on every keystroke. No "calc", no Enter. +# +# 2+2 => 2+2 = 4 +# (5+3)/2 => (5+3)/2 = 4 +# 1/3 => 1/3 = 0.333333333333 +# sqrt(2)*3 => sqrt(2)*3 = 4.24264068712 +# sin(pi/2) => sin(pi/2) = 1 +# 2^10 => 2^10 = 1024 (^ means power, calculator-style) +# +# Pressing Enter on a pure-math line prints " = " instead of +# trying to run it as a command (so no "command not found"). History keeps the +# bare expression, not the print command. +# +# Only pure-math lines trigger this; normal commands (ls, git, cd ..) are left +# completely alone and never fork the evaluator. +# +# Richer math (units, hex, percentages): prefix the line with "=". These go to +# qalc (if installed) and update live too, via a warmed qalc coprocess: +# =5 ft to cm -> 5 ft to cm = 152.4 cm +# =255 to hex -> 255 to hex = 0xFF +# =0xff + 1 -> 0xff + 1 = 256 +# =20% of 80 -> 20% of 80 = 16 +# (The first "=" of a session spends ~120 ms warming qalc; after that ~1 ms.) +# +# Engine: a safe AST-based evaluator in the sibling calc_eval.py (~6 ms/eval), +# run via python3. It cannot import modules, touch files, or run arbitrary code, +# and a runaway expression (e.g. 9**9**9) is bounded by a power-exponent guard + +# CPU limit + timeout so it can never freeze the prompt. +# +# Disable for a session: export CALC_LIVE_OFF=1 (then open a new shell) +# Remove permanently: delete this file. +# +# NOTE: this widget manages POSTDISPLAY/region_highlight. If you ever enable +# the zsh-autosuggestions plugin (which also owns POSTDISPLAY), they will fight; +# revisit this then. +# ============================================================================ + +# Opt-out switch and a one-time guard against double loading. +[[ -n ${CALC_LIVE_OFF:-} ]] && return +[[ -n ${_CALC_LOADED:-} ]] && return +typeset -g _CALC_LOADED=1 + +# --- Pick the evaluator binary (prefer the fast system python over a shim) --- +if [[ -x /usr/bin/python3 ]]; then + typeset -g _CALC_PY_BIN=/usr/bin/python3 +else + typeset -g _CALC_PY_BIN=${commands[python3]:-} +fi +typeset -g _CALC_TIMEOUT_BIN=${commands[timeout]:-} + +# The safe AST evaluator lives under python_pkg/ so the repository's Python +# tooling (ruff, mypy, pylint, bandit, 100%-coverage tests) applies to it. +# Resolve it at source time: ${0:A} follows the oh-my-zsh symlink back to the +# real file in the repo; :h:h:h walks zsh/ -> linux_configuration/ -> repo root. +# ($0 inside a function would name the function, so capture it here.) +typeset -gr _CALC_EVAL_PY=${0:A:h:h:h}/python_pkg/live_calc/calc_eval.py + +# Optional richer engine. qalc (libqalculate) handles units, hex, percentages +# and natural language, but its ~120 ms cold start is too slow to run on every +# keystroke, so it is used only for "="-prefixed lines, evaluated on Enter. +typeset -g _CALC_QALC_BIN=${commands[qalc]:-} + +# No usable evaluator -> do nothing rather than half-installing. +[[ -n $_CALC_PY_BIN && -r $_CALC_EVAL_PY ]] || return + +# --- Evaluate one expression; echoes the result, or nothing on failure ------- +# Runs the standalone calc_eval.py. The expression is passed as argv[1]; the +# script reads it, evaluates it under CPU + wall-clock limits, and writes the +# formatted result (or nothing on any error / unsafe input / overflow / timeout). +_calc_eval() { + emulate -L zsh + local expr=$1 out + # -S skips site init for a faster start; the outer `timeout` is a backstop in + # case the script's own limits are unavailable on this platform. + if [[ -n $_CALC_TIMEOUT_BIN ]]; then + out=$("$_CALC_TIMEOUT_BIN" -k 0.1 0.5 "$_CALC_PY_BIN" -S "$_CALC_EVAL_PY" "$expr" 2>/dev/null) + else + out=$("$_CALC_PY_BIN" -S "$_CALC_EVAL_PY" "$expr" 2>/dev/null) + fi + print -r -- "$out" +} + +# --- qalc (richer engine) for "="-prefixed lines: units, hex, percentages ---- +# qalc cold start is ~120 ms, far too slow per keystroke; but a warmed qalc +# coprocess answers in ~1 ms, fast enough to evaluate live in the redraw hook. +# So we keep ONE persistent qalc per shell, started lazily on first "=" line. +# +# IMPORTANT: the coprocess lives in the main shell. Its evaluator must NOT be +# called via $(...) (that forks a subshell and loses the pipe) — it sets the +# global _CALC_QRESULT instead. +typeset -g _CALC_QCO_UP=0 # 1 once the qalc coprocess is running +typeset -gi _CALC_QSEQ=0 # unique-sentinel counter (prevents desync) +typeset -g _CALC_QEXPR= # stripped expression from the last "=" line +typeset -g _CALC_QRESULT= # result from the last _calc_qalc_eval + +# If the line is "=", set _CALC_QEXPR to the stripped and return 0. +_calc_qalc_line() { + emulate -L zsh + setopt local_options extended_glob + local s=$1 + _CALC_QEXPR= + [[ $s == (#s)[[:space:]]#=* ]] || return 1 + s=${s##[[:space:]]#=[[:space:]]#} # drop leading spaces, the "=", trailing spaces + [[ -n $s ]] || return 1 + _CALC_QEXPR=$s + return 0 +} + +# Start / stop the persistent qalc coprocess (no job-control chatter). +_calc_qco_start() { + setopt local_options no_monitor no_notify + coproc "$_CALC_QALC_BIN" -t 2>/dev/null + _CALC_QCO_PID=$! + disown %+ 2>/dev/null + _CALC_QCO_UP=1 +} +_calc_qco_stop() { + [[ ${_CALC_QCO_UP:-0} == 1 ]] || return 0 + kill "${_CALC_QCO_PID:-0}" 2>/dev/null + _CALC_QCO_UP=0 +} + +# Evaluate _CALC_QEXPR (arg) via the coprocess; sets _CALC_QRESULT ("" on fail). +_calc_qalc_eval() { + emulate -L zsh + setopt local_options extended_glob no_monitor no_notify + _CALC_QRESULT= + [[ -n $_CALC_QALC_BIN ]] || return 0 + local expr=${1// of / * } # qalc mis-parses "A% of B" + (( _CALC_QCO_UP )) || _calc_qco_start + (( _CALC_QSEQ++ )) + local sentinel="909090909${_CALC_QSEQ}" # unique per call, echoes itself + if ! { print -p -- "$expr" && print -p -- "$sentinel" } 2>/dev/null; then + _calc_qco_stop; _calc_qco_start # pipe broke -> restart once + { print -p -- "$expr" && print -p -- "$sentinel" } 2>/dev/null || return 0 + fi + local line result='' saw=0 + integer guard=0 + while (( guard++ < 100 )) && read -rp -t 0.8 line 2>/dev/null; do + line=${line//$'\e'\[[0-9;?]##[a-zA-Z]/} # strip color escapes + line=${line##[[:space:]]##}; line=${line%%[[:space:]]##} + [[ -z $line || $line == '>'* ]] && continue # blank / echoed input + [[ $line == *$sentinel* ]] && { saw=1; break; } # our sentinel result + result=$line + done + (( saw )) || { _CALC_QCO_UP=0; return 0 } # timed out/wedged -> reset + # qalc echoes the input back when it cannot evaluate (e.g. "1 / 0"). + [[ -n $result && ${result// /} != ${expr// /} ]] && _CALC_QRESULT=$result + return 0 +} + +# --- Decide whether a line is a pure math expression worth evaluating -------- +# Conservative on purpose: must contain a digit and an operator (or a function +# call), and consist only of math tokens. This keeps real commands untouched. +_calc_is_expr() { + emulate -L zsh + setopt local_options extended_glob + local s=${1//[[:space:]]/} + [[ -n $s ]] || return 1 + # Neutralise known function/constant words (longest first) so the remaining + # text can be charset-checked as pure numeric/operator soup. + local t=$s w + for w in factorial asin acos atan sqrt sin cos tan log2 log ln exp \ + floor ceil gcd deg rad min max tau pi e; do + t=${t//$w/0} + done + # Only digits, operators, parens, dot, comma may remain. + # (Parens are backslash-escaped so the parser does not read them as a group.) + [[ $t == [-0-9.,+*/%^\(\)]## ]] || return 1 + # Require a digit, and either an operator or a function call. + [[ $s == *[0-9]* ]] || return 1 + [[ $s == *[-+*/%^]* || $s == *[a-z]*\(* ]] || return 1 + return 0 +} + +# --- ZLE plumbing ------------------------------------------------------------ +typeset -g _CALC_LAST_BUFFER=$'\0' # sentinel so the first redraw computes +typeset -g _CALC_LAST_RESULT= + +# Drop our preview text and our highlight entry (leaving any others intact). +_calc_clear() { + POSTDISPLAY= + region_highlight=( "${(@)region_highlight:#*memo=calc*}" ) +} + +# Runs on every redraw; recomputes only when the buffer text actually changed. +_calc_preview() { + emulate -L zsh + [[ $BUFFER == $_CALC_LAST_BUFFER ]] && return + _CALC_LAST_BUFFER=$BUFFER + _CALC_LAST_RESULT= + _calc_clear + # "=" -> evaluate live with the warmed qalc coprocess (~1 ms). + if [[ -n $_CALC_QALC_BIN ]] && _calc_qalc_line "$BUFFER"; then + _calc_qalc_eval "$_CALC_QEXPR" + if [[ -n $_CALC_QRESULT ]]; then + _CALC_LAST_RESULT=$_CALC_QRESULT + POSTDISPLAY=" = $_CALC_QRESULT" + else + POSTDISPLAY=" …" # typed so far is not yet a complete expression + fi + region_highlight+=("${#BUFFER} $((${#BUFFER} + ${#POSTDISPLAY})) fg=242,memo=calc") + return + fi + _calc_is_expr "$BUFFER" || return + local r + r=$(_calc_eval "$BUFFER") + [[ -n $r && $r != $BUFFER ]] || return + _CALC_LAST_RESULT=$r + POSTDISPLAY=" = $r" + region_highlight+=("${#BUFFER} $((${#BUFFER} + ${#POSTDISPLAY})) fg=242,memo=calc") +} + +# Result waiting to be printed to scrollback by the next precmd. +typeset -g _CALC_PENDING= + +# Print the accepted calculation just before the next prompt is drawn, so it +# lands in scrollback exactly where command output would. +_calc_flush() { + if [[ -n $_CALC_PENDING ]]; then + print -r -- "$_CALC_PENDING" + _CALC_PENDING= + fi +} + +# Enter on a pure-math line: record the result for printing, keep the bare +# expression in history, and execute nothing (so no "command not found"). +_calc_accept_line() { + emulate -L zsh + if [[ -n $_CALC_QALC_BIN ]] && _calc_qalc_line "$BUFFER"; then + # "=" -> reuse the live coprocess result (recompute only if stale). + # Never execute a "=" line as a command, even if qalc returns nothing. + local r=$_CALC_LAST_RESULT + [[ -n $r && $BUFFER == $_CALC_LAST_BUFFER ]] || { _calc_qalc_eval "$_CALC_QEXPR"; r=$_CALC_QRESULT; } + print -s -- "$BUFFER" # history keeps what was typed + [[ -n $r ]] && _CALC_PENDING="$_CALC_QEXPR = $r" + BUFFER= + elif [[ -n $_CALC_LAST_RESULT && $BUFFER == $_CALC_LAST_BUFFER ]] \ + && _calc_is_expr "$BUFFER"; then + print -s -- "$BUFFER" # history keeps the expression + _CALC_PENDING="$BUFFER = $_CALC_LAST_RESULT" # _calc_flush prints it + BUFFER= # nothing runs + fi + # Clear our preview and reset state so nothing leaks into the next prompt. + _calc_clear + _CALC_LAST_BUFFER=$'\0' + _CALC_LAST_RESULT= + zle .accept-line +} + +# Bind the live preview to the redraw hook and intercept Enter. +# +# zle-line-pre-redraw is unused in this setup, so we own it directly with +# "zle -N", which (unlike add-zle-hook-widget) takes effect at .zshrc source +# time. If you later add a plugin that also drives line-pre-redraw +# (zsh-autosuggestions, zsh-syntax-highlighting), switch to add-zle-hook-widget +# registered from a one-shot precmd so the hooks chain instead of clobbering. +autoload -Uz add-zsh-hook +add-zsh-hook precmd _calc_flush +add-zsh-hook zshexit _calc_qco_stop # kill the qalc coprocess on shell exit +zle -N zle-line-pre-redraw _calc_preview +zle -N accept-line _calc_accept_line diff --git a/linux_configuration/zsh/calc-popup.sh b/linux_configuration/zsh/calc-popup.sh new file mode 100755 index 0000000..f3884f0 --- /dev/null +++ b/linux_configuration/zsh/calc-popup.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +# ============================================================================ +# calc-popup.sh — launch the floating scratchpad calculator (i3: Mod+c) +# ---------------------------------------------------------------------------- +# Opens a small, centered, floating terminator that runs a minimal zsh which +# loads only the live-calc widget. Type math, see results live, Ctrl-D to close. +# +# i3 matches the window by its X role ("calc"), set via terminator --role. +# -u (--no-dbus) forces a standalone process so it never folds into an existing +# terminator window as a tab. +# ============================================================================ +set -euo pipefail + +# Directory of this script (the repo zsh/ dir); scratchpad rc lives alongside. +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +readonly SCRATCH_ZDOTDIR="${SCRIPT_DIR}/scratchpad" + +if ! command -v terminator >/dev/null 2>&1; then + echo "calc-popup: terminator not found" >&2 + exit 1 +fi + +# --geometry is in pixels for terminator; set the size here (reliably, before +# the window maps) so i3 only has to float + center it. +exec terminator -u --role=calc -T calc --geometry=820x220 \ + -e "env ZDOTDIR='${SCRATCH_ZDOTDIR}' zsh -i" diff --git a/linux_configuration/zsh/scratchpad/.zshrc b/linux_configuration/zsh/scratchpad/.zshrc new file mode 100644 index 0000000..b3f4217 --- /dev/null +++ b/linux_configuration/zsh/scratchpad/.zshrc @@ -0,0 +1,15 @@ +# Minimal rc for the floating scratchpad calculator (i3: Mod+c). +# +# Deliberately tiny: it loads ONLY the live-calc widget, so the popup starts +# instantly instead of paying for the full ~/.zshrc (oh-my-zsh, pyenv, conda, +# nvm). ZDOTDIR points here, so this is the only rc that runs. + +PROMPT='%F{cyan}calc ❯%f ' +RPROMPT='' +autoload -Uz colors 2>/dev/null && colors 2>/dev/null + +# Load the widget from the repo (one level up), independent of the oh-my-zsh +# symlink, so the scratchpad works even if oh-my-zsh is not installed. +source "${ZDOTDIR}/../calc-live.zsh" + +print -P '%F{242}Live calculator — type math, see the result as you type. Ctrl-D to close.%f' diff --git a/meta/pyproject.toml b/meta/pyproject.toml index 713c442..0951173 100644 --- a/meta/pyproject.toml +++ b/meta/pyproject.toml @@ -141,6 +141,11 @@ ignore_errors = true # PYLINT - Comprehensive Python linter # ============================================================================ [tool.pylint.main] +# Make standalone script dirs importable so tests that import their modules by +# bare name (the same dirs linux_configuration/tests/conftest.py adds to +# sys.path at runtime) resolve under static analysis instead of raising E0401. +# Paths are relative to the repo root, which is pre-commit's working directory. +init-hook = "import sys; sys.path[:0] = ['meta/scripts', 'phone_focus_mode', 'phone_focus_mode/lib', 'linux_configuration/scripts/single_use/utils', 'linux_configuration/scripts/periodic_background/system-maintenance/bin']" # Analyse import fallback blocks analyse-fallback-blocks = true # Pickle collected data for later comparisons diff --git a/meta/scripts/check_agent_contract.sh b/meta/scripts/check_agent_contract.sh index 29daadc..07c838b 100755 --- a/meta/scripts/check_agent_contract.sh +++ b/meta/scripts/check_agent_contract.sh @@ -3,6 +3,8 @@ set -euo pipefail +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +readonly SCRIPT_DIR readonly CONTRACT_GLOB='docs/superpowers/contracts/*.json' readonly MULTI_FILE_THRESHOLD=4 @@ -16,44 +18,7 @@ list_staged_contract_files() { validate_contract_file() { local file_path="$1" - python - "$file_path" <<'PY' -import json -import pathlib -import sys - -path = pathlib.Path(sys.argv[1]) -data = json.loads(path.read_text(encoding="utf-8")) - -required = [ - "title", - "objective", - "acceptance_criteria", - "out_of_scope", - "verifier", -] - -missing = [field for field in required if field not in data] -if missing: - raise SystemExit(f"{path}: missing required fields: {', '.join(missing)}") - -if not isinstance(data["title"], str) or not data["title"].strip(): - raise SystemExit(f"{path}: title must be non-empty string") - -if not isinstance(data["objective"], str) or not data["objective"].strip(): - raise SystemExit(f"{path}: objective must be non-empty string") - -if not isinstance(data["verifier"], str) or not data["verifier"].strip(): - raise SystemExit(f"{path}: verifier must be non-empty string") - -for field in ("acceptance_criteria", "out_of_scope"): - value = data[field] - if not isinstance(value, list) or not value: - raise SystemExit(f"{path}: {field} must be a non-empty list") - if any(not isinstance(item, str) or not item.strip() for item in value): - raise SystemExit(f"{path}: {field} items must be non-empty strings") - -print(f"{path}: contract schema OK") -PY + python "${SCRIPT_DIR}/validate_contract.py" "$file_path" } main() { diff --git a/meta/scripts/check_ai_evidence.sh b/meta/scripts/check_ai_evidence.sh index 1b244cd..12bff92 100755 --- a/meta/scripts/check_ai_evidence.sh +++ b/meta/scripts/check_ai_evidence.sh @@ -3,6 +3,8 @@ set -euo pipefail +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +readonly SCRIPT_DIR readonly EVIDENCE_GLOB='docs/superpowers/evidence/*.json' has_code_changes() { @@ -15,72 +17,7 @@ find_staged_evidence_files() { validate_json_schema() { local file_path="$1" - - python - "$file_path" <<'PY' -import json -import pathlib -import sys - -path = pathlib.Path(sys.argv[1]) - -try: - data = json.loads(path.read_text(encoding="utf-8")) -except Exception as exc: # pragma: no cover - hook error path - raise SystemExit(f"{path}: invalid JSON ({exc})") - -required = [ - "intent", - "scope", - "changes", - "verification", - "risks", - "rollback", -] - -missing = [key for key in required if key not in data] -if missing: - raise SystemExit(f"{path}: missing required keys: {', '.join(missing)}") - -if not isinstance(data["intent"], str) or not data["intent"].strip(): - raise SystemExit(f"{path}: intent must be a non-empty string") - -for key in ("scope", "changes", "risks", "rollback"): - value = data[key] - if not isinstance(value, list) or not value: - raise SystemExit(f"{path}: {key} must be a non-empty list") - if any(not isinstance(item, str) or not item.strip() for item in value): - raise SystemExit(f"{path}: {key} entries must be non-empty strings") - -verification = data["verification"] -if not isinstance(verification, list) or not verification: - raise SystemExit(f"{path}: verification must be a non-empty list") - -required_verification_fields = {"command", "result", "evidence"} -for index, item in enumerate(verification): - if not isinstance(item, dict): - raise SystemExit(f"{path}: verification[{index}] must be an object") - missing_fields = required_verification_fields - item.keys() - if missing_fields: - missing_joined = ", ".join(sorted(missing_fields)) - raise SystemExit( - f"{path}: verification[{index}] missing fields: {missing_joined}" - ) - for field in required_verification_fields: - value = item[field] - if not isinstance(value, str) or not value.strip(): - raise SystemExit( - f"{path}: verification[{index}].{field} must be a non-empty string" - ) - -content_lower = path.read_text(encoding="utf-8").lower() -for phrase in ("should work", "probably fine", "seems right"): - if phrase in content_lower: - raise SystemExit( - f"{path}: contains rationalization phrase '{phrase}', replace with evidence" - ) - -print(f"{path}: schema OK") -PY + python "${SCRIPT_DIR}/validate_evidence.py" "$file_path" } main() { diff --git a/meta/scripts/check_python_location.sh b/meta/scripts/check_python_location.sh index 0345f4c..55c138c 100755 --- a/meta/scripts/check_python_location.sh +++ b/meta/scripts/check_python_location.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash # Check that all Python files are under python_pkg/. -# Exceptions: linux_configuration/, Bash/, +# Exceptions: linux_configuration/, phone_focus_mode/, Bash/, # and vendored/generated directories. # Used as a pre-commit hook; receives staged file paths as arguments. @@ -17,7 +17,7 @@ for file in "$@"; do # Skip allowed directories (non-Python projects with some Python scripts) case "$file" in - linux_configuration/*|scripts/*|meta/scripts/*) continue ;; + linux_configuration/*|phone_focus_mode/*|scripts/*|meta/scripts/*) continue ;; esac # Skip vendored/generated directories diff --git a/meta/scripts/validate_contract.py b/meta/scripts/validate_contract.py new file mode 100755 index 0000000..fab458d --- /dev/null +++ b/meta/scripts/validate_contract.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +"""Validate a workflow-contract artifact against the required schema. + +Used by the ``check_agent_contract.sh`` pre-commit hook: given one contract JSON +path as ``argv[1]``, it prints ``: contract schema OK`` and exits 0 when the +file is valid, or writes each problem to stderr and exits 1 otherwise. + +Kept as a standalone module (not inline ``python < bool: + """Return True if ``value`` is a string with non-whitespace content.""" + return isinstance(value, str) and bool(value.strip()) + + +def _check_required_keys(data: dict[str, object]) -> list[str]: + """Report any required top-level keys that are absent.""" + missing = [key for key in _REQUIRED_KEYS if key not in data] + if missing: + return [f"missing required fields: {', '.join(missing)}"] + return [] + + +def _check_strings(data: dict[str, object]) -> list[str]: + """Each scalar field must be a non-empty string.""" + return [ + f"{key} must be non-empty string" + for key in _STRING_KEYS + if not _is_nonempty_str(data.get(key)) + ] + + +def _check_string_lists(data: dict[str, object]) -> list[str]: + """Each list field must be a non-empty list of non-empty strings.""" + errors: list[str] = [] + for key in _STRING_LIST_KEYS: + value = data.get(key) + if not isinstance(value, list) or not value: + errors.append(f"{key} must be a non-empty list") + continue + if any(not _is_nonempty_str(item) for item in value): + errors.append(f"{key} items must be non-empty strings") + return errors + + +def validate(path: Path) -> list[str]: + """Return a list of schema problems for ``path`` (empty when it is valid).""" + try: + text = path.read_text(encoding="utf-8") + except OSError as exc: + return [f"cannot read file ({exc})"] + try: + data = json.loads(text) + except json.JSONDecodeError as exc: + return [f"invalid JSON ({exc})"] + if not isinstance(data, dict): + return ["top-level JSON value must be an object"] + + errors = _check_required_keys(data) + if errors: # without the keys present, the per-field checks are noise + return errors + errors += _check_strings(data) + errors += _check_string_lists(data) + return errors + + +def main() -> int: + """Validate the contract named by ``argv[1]``; return a process exit code.""" + args = sys.argv[1:] + if not args: + sys.stderr.write("usage: validate_contract.py \n") + return 2 + path = Path(args[0]) + errors = validate(path) + if errors: + for error in errors: + sys.stderr.write(f"{path}: {error}\n") + return 1 + sys.stdout.write(f"{path}: contract schema OK\n") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/meta/scripts/validate_evidence.py b/meta/scripts/validate_evidence.py new file mode 100755 index 0000000..2735518 --- /dev/null +++ b/meta/scripts/validate_evidence.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +"""Validate an AI-evidence artifact against the required schema. + +Used by the ``check_ai_evidence.sh`` pre-commit hook: it is given one evidence +JSON path as ``argv[1]``, prints ``: schema OK`` and exits 0 when the file +satisfies the schema, or writes each problem to stderr and exits 1 otherwise. + +Kept as a standalone module (not inline ``python < bool: + """Return True if ``value`` is a string with non-whitespace content.""" + return isinstance(value, str) and bool(value.strip()) + + +def _check_required_keys(data: dict[str, object]) -> list[str]: + """Report any required top-level keys that are absent.""" + missing = [key for key in _REQUIRED_KEYS if key not in data] + if missing: + return [f"missing required keys: {', '.join(missing)}"] + return [] + + +def _check_intent(data: dict[str, object]) -> list[str]: + """The ``intent`` field must be a non-empty string.""" + if not _is_nonempty_str(data.get("intent")): + return ["intent must be a non-empty string"] + return [] + + +def _check_string_lists(data: dict[str, object]) -> list[str]: + """Each string-list field must be a non-empty list of non-empty strings.""" + errors: list[str] = [] + for key in _STRING_LIST_KEYS: + value = data.get(key) + if not isinstance(value, list) or not value: + errors.append(f"{key} must be a non-empty list") + continue + if any(not _is_nonempty_str(item) for item in value): + errors.append(f"{key} entries must be non-empty strings") + return errors + + +def _check_verification(data: dict[str, object]) -> list[str]: + """``verification`` must be a non-empty list of fully-populated objects.""" + verification = data.get("verification") + if not isinstance(verification, list) or not verification: + return ["verification must be a non-empty list"] + errors: list[str] = [] + for index, item in enumerate(verification): + if not isinstance(item, dict): + errors.append(f"verification[{index}] must be an object") + continue + missing = [field for field in _VERIFICATION_FIELDS if field not in item] + if missing: + errors.append(f"verification[{index}] missing fields: {', '.join(missing)}") + bad = [ + field + for field in _VERIFICATION_FIELDS + if field in item and not _is_nonempty_str(item[field]) + ] + errors.extend( + f"verification[{index}].{field} must be a non-empty string" for field in bad + ) + return errors + + +def _check_phrases(text: str) -> list[str]: + """Reject artifacts containing rationalization phrases instead of evidence.""" + lowered = text.lower() + return [ + f"contains rationalization phrase '{phrase}', replace with evidence" + for phrase in _BANNED_PHRASES + if phrase in lowered + ] + + +def validate(path: Path) -> list[str]: + """Return a list of schema problems for ``path`` (empty when it is valid).""" + try: + text = path.read_text(encoding="utf-8") + except OSError as exc: + return [f"cannot read file ({exc})"] + try: + data = json.loads(text) + except json.JSONDecodeError as exc: + return [f"invalid JSON ({exc})"] + if not isinstance(data, dict): + return ["top-level JSON value must be an object"] + + errors = _check_required_keys(data) + if errors: # without the keys present, the per-field checks are noise + return errors + errors += _check_intent(data) + errors += _check_string_lists(data) + errors += _check_verification(data) + errors += _check_phrases(text) + return errors + + +def main() -> int: + """Validate the artifact named by ``argv[1]``; return a process exit code.""" + args = sys.argv[1:] + if not args: + sys.stderr.write("usage: validate_evidence.py \n") + return 2 + path = Path(args[0]) + errors = validate(path) + if errors: + for error in errors: + sys.stderr.write(f"{path}: {error}\n") + return 1 + sys.stdout.write(f"{path}: schema OK\n") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/phone_focus_mode/deploy.sh b/phone_focus_mode/deploy.sh index dbcea47..ed2fdf9 100755 --- a/phone_focus_mode/deploy.sh +++ b/phone_focus_mode/deploy.sh @@ -426,26 +426,9 @@ do_deploy() { # *value* column of a hosts entry (" " possibly # followed by aliases). We strip any line whose first non-IP # token matches one of the unblock domains. - python3 - "$HOSTS_TMP" "$HOSTS_WORKOUT_TMP" < [aliases...] - if len(parts) >= 2 and any(p.lower() in unblock for p in parts[1:]): - continue - dst.write(line) -PY_EOF + WORKOUT_UNBLOCK_DOMAINS="$UNBLOCK_DOMAINS" \ + python3 "$SCRIPT_DIR/strip_workout_hosts.py" "$HOSTS_TMP" "$HOSTS_WORKOUT_TMP" \ + || cp "$HOSTS_TMP" "$HOSTS_WORKOUT_TMP" workout_hash="$(compute_file_hash "$HOSTS_WORKOUT_TMP")" printf '%s\n' "$workout_hash" > "$HOSTS_WORKOUT_SHA_TMP" stripped_lines=$(($(wc -l < "$HOSTS_TMP") - $(wc -l < "$HOSTS_WORKOUT_TMP"))) diff --git a/phone_focus_mode/lib/monitor.sh b/phone_focus_mode/lib/monitor.sh index 23f8186..d6a9768 100755 --- a/phone_focus_mode/lib/monitor.sh +++ b/phone_focus_mode/lib/monitor.sh @@ -3,6 +3,11 @@ # Requires: adb_common.sh sourced, ADB_SERIAL set, backup_manifest.sh sourced. set -euo pipefail +# Directory of this library, used to locate sibling helper scripts (BASH_SOURCE +# resolves to monitor.sh even when sourced, so the path is stable). +_MONITOR_LIB_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +readonly _MONITOR_LIB_DIR + readonly _MONITOR_REMOTE_DIR="/data/local/tmp/focus_mode" readonly _MONITOR_HOSTS_CANONICAL="/data/local/tmp/focus_mode/hosts.canonical" readonly _MONITOR_HOSTS_SHA_FILE="/data/local/tmp/focus_mode/hosts.sha256" @@ -431,33 +436,7 @@ monitor_print_summary() { return 0 } - python - "${report_path}" <<'PY' -import json -import sys - -report_path = sys.argv[1] -with open(report_path, encoding="utf-8") as handle: - report = json.load(handle) - -counts = {"ok": 0, "warn": 0, "error": 0, "fatal": 0} -issues = [] -for check in report.get("checks", []): - status = check.get("status", "warn") - counts[status] = counts.get(status, 0) + 1 - if status in {"warn", "error", "fatal"}: - issues.append((status, check.get("check", "unknown"), check.get("message", ""))) - -print("\n=== Monitoring Summary ===") -print( - f" ok={counts.get('ok', 0):<3} warn={counts.get('warn', 0):<3} " - f"error={counts.get('error', 0):<3} fatal={counts.get('fatal', 0):<3}" -) -if issues: - print("\nIssues found:") - for status, check_name, message in issues: - print(f" [{status}] {check_name}: {message}") -print("==========================\n") -PY + python "${_MONITOR_LIB_DIR}/monitor_report.py" summary "${report_path}" } monitor_severity_exit() { @@ -466,18 +445,5 @@ monitor_severity_exit() { [[ -f "${report_path}" ]] || return 0 - python - "${report_path}" <<'PY' -import json -import sys - -report_path = sys.argv[1] -with open(report_path, encoding="utf-8") as handle: - report = json.load(handle) - -has_severe = any( - check.get("status") in {"fatal", "error"} - for check in report.get("checks", []) -) -raise SystemExit(1 if has_severe else 0) -PY + python "${_MONITOR_LIB_DIR}/monitor_report.py" severity "${report_path}" } diff --git a/phone_focus_mode/lib/monitor_report.py b/phone_focus_mode/lib/monitor_report.py new file mode 100755 index 0000000..1160db6 --- /dev/null +++ b/phone_focus_mode/lib/monitor_report.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +"""Render or grade a phone-monitoring report produced by ``monitor.sh``. + +Two modes, selected by ``argv[1]``: + +* ``summary`` — print a human-readable status summary to stdout (always exit 0). +* ``severity`` — exit 1 if any check is ``fatal``/``error``, else exit 0 (no output). + +``argv[2]`` is the path to the ``report.json`` snapshot. Kept as a standalone +module (not inline ``python < list[object]: + """Return the ``checks`` array from the report, or [] if absent/malformed.""" + data: object = json.loads(report_path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + return [] + checks: object = data.get("checks", []) + return checks if isinstance(checks, list) else [] + + +def _field(check: object, key: str, default: str) -> str: + """Read a string field from a check object, falling back to ``default``.""" + if isinstance(check, dict): + value = check.get(key, default) + if isinstance(value, str): + return value + return default + + +def _render_summary(checks: list[object]) -> str: + """Build the multi-line monitoring summary string for the given checks.""" + counts = dict.fromkeys(_COUNT_KEYS, 0) + issues: list[tuple[str, str, str]] = [] + for check in checks: + status = _field(check, "status", "warn") + counts[status] = counts.get(status, 0) + 1 + if status in _ISSUE_STATUSES: + issues.append( + ( + status, + _field(check, "check", "unknown"), + _field(check, "message", ""), + ), + ) + + lines = [ + "", + "=== Monitoring Summary ===", + f" ok={counts.get('ok', 0):<3} warn={counts.get('warn', 0):<3} " + f"error={counts.get('error', 0):<3} fatal={counts.get('fatal', 0):<3}", + ] + if issues: + lines.append("") + lines.append("Issues found:") + lines.extend( + f" [{status}] {name}: {message}" for status, name, message in issues + ) + lines.append("==========================") + lines.append("") + return "\n".join(lines) + "\n" + + +def _has_severe(checks: list[object]) -> bool: + """Return True if any check has a fatal/error status.""" + return any(_field(check, "status", "warn") in _SEVERE_STATUSES for check in checks) + + +def main() -> int: + """Dispatch on ``argv[1]`` (summary|severity) and ``argv[2]`` (report path).""" + args = sys.argv[1:] + expected_args = 2 + if len(args) != expected_args or args[0] not in {"summary", "severity"}: + sys.stderr.write("usage: monitor_report.py {summary|severity} \n") + return 2 + mode, report_path = args[0], Path(args[1]) + checks = _load_checks(report_path) + if mode == "summary": + sys.stdout.write(_render_summary(checks)) + return 0 + return 1 if _has_severe(checks) else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/phone_focus_mode/strip_workout_hosts.py b/phone_focus_mode/strip_workout_hosts.py new file mode 100755 index 0000000..9b8fba4 --- /dev/null +++ b/phone_focus_mode/strip_workout_hosts.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +"""Produce the workout variant of the canonical hosts file. + +Copies ``argv[1]`` (the full hosts file) to ``argv[2]``, dropping every entry +whose host name (or any alias) matches a workout-allowlisted domain. The domain +set is read from the ``WORKOUT_UNBLOCK_DOMAINS`` environment variable (whitespace +separated) so the generator and the on-device runtime share one source of truth. + +Kept as a standalone module (not inline ``python < [aliases...]". +_MIN_HOSTS_FIELDS = 2 + + +def _strip(source: Path, dest: Path, unblock: frozenset[str]) -> None: + """Write ``source`` to ``dest`` minus lines that map an unblocked domain.""" + text = source.read_text(encoding="utf-8", errors="replace") + kept: list[str] = [] + for line in text.splitlines(keepends=True): + stripped = line.strip() + if not stripped or stripped.startswith("#"): + kept.append(line) + continue + parts = stripped.split() + names = parts[1:] + if len(parts) >= _MIN_HOSTS_FIELDS and any( + name.lower() in unblock for name in names + ): + continue + kept.append(line) + dest.write_text("".join(kept), encoding="utf-8") + + +def main() -> int: + """Read the source/dest paths from argv and the domains from the env.""" + args = sys.argv[1:] + expected_args = 2 + if len(args) != expected_args: + sys.stderr.write("usage: strip_workout_hosts.py \n") + return 2 + unblock = frozenset(os.environ.get("WORKOUT_UNBLOCK_DOMAINS", "").split()) + _strip(Path(args[0]), Path(args[1]), unblock) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python_pkg/live_calc/__init__.py b/python_pkg/live_calc/__init__.py new file mode 100644 index 0000000..7697f16 --- /dev/null +++ b/python_pkg/live_calc/__init__.py @@ -0,0 +1 @@ +"""Live-calc: safe arithmetic evaluator backing the calc-live.zsh prompt widget.""" diff --git a/python_pkg/live_calc/calc_eval.py b/python_pkg/live_calc/calc_eval.py new file mode 100644 index 0000000..fa93789 --- /dev/null +++ b/python_pkg/live_calc/calc_eval.py @@ -0,0 +1,231 @@ +"""Safe arithmetic evaluator for the live-calc zsh widget. + +Read one expression from ``sys.argv[1]`` and write its formatted numeric result +to stdout, or write nothing on any error, unsafe input, overflow, or timeout. + +The expression is parsed into an AST and evaluated by walking a strict +whitelist of node types, so it can never import modules, access attributes, or +execute arbitrary code. CPU and wall-clock time are capped so that a runaway +expression typed live (for example ``9**9**9``) cannot freeze the shell. + +Used by ``calc-live.zsh``; kept as a standalone module so the repository's +Python tooling (ruff, mypy, pylint, bandit) applies to it. +""" + +from __future__ import annotations + +import ast +import contextlib +import math +import operator +import resource +import signal +import sys +from typing import TYPE_CHECKING, NoReturn, TypeAlias + +if TYPE_CHECKING: + from collections.abc import Callable + from types import FrameType + +Number: TypeAlias = int | float + +# Whitelisted callables, addressed by the name used in the expression. +_FUNCTIONS: dict[str, Callable[..., Number]] = { + "sqrt": math.sqrt, + "abs": abs, + "round": round, + "sin": math.sin, + "cos": math.cos, + "tan": math.tan, + "asin": math.asin, + "acos": math.acos, + "atan": math.atan, + "ln": math.log, + "log": math.log10, + "log2": math.log2, + "exp": math.exp, + "floor": math.floor, + "ceil": math.ceil, + "factorial": math.factorial, + "gcd": math.gcd, + "deg": math.degrees, + "rad": math.radians, + "min": min, + "max": max, +} + +# Whitelisted constants. +_CONSTANTS: dict[str, float] = {"pi": math.pi, "e": math.e, "tau": math.tau} + +# Binary and unary operators, addressed by AST node type. +_BINARY_OPS: dict[type[ast.operator], Callable[[Number, Number], Number]] = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.FloorDiv: operator.floordiv, + ast.Mod: operator.mod, + ast.Pow: operator.pow, +} +_UNARY_OPS: dict[type[ast.unaryop], Callable[[Number], Number]] = { + ast.UAdd: operator.pos, + ast.USub: operator.neg, +} + +_MAX_EXPONENT = 10_000 # refuse a ** b for very large b before computing +_MAX_FACTORIAL_ARG = 10_000 # factorial grows astronomically fast +_MAX_INT_DIGITS = 25 # longer ints are shown in scientific form +_FLOAT_PRECISION = 12 # significant digits for float results +_SCI_PRECISION = 6 # significant digits for the scientific fallback +_CPU_LIMIT_SECONDS = 1 # hard kernel CPU cap (SIGXCPU terminates) +_WALL_LIMIT_SECONDS = 0.4 # soft wall-clock cap (SIGALRM) + + +class _CalcError(Exception): + """Raised when the input is not a permitted arithmetic expression.""" + + +def _raise_timeout(_signum: int, _frame: FrameType | None) -> NoReturn: + """SIGALRM handler: abort a too-slow evaluation via a catchable exception.""" + raise TimeoutError + + +def _apply_limits() -> None: + """Cap CPU and wall-clock time so a runaway expression cannot hang the shell.""" + with contextlib.suppress(ValueError, OSError): + resource.setrlimit( + resource.RLIMIT_CPU, + (_CPU_LIMIT_SECONDS, _CPU_LIMIT_SECONDS), + ) + with contextlib.suppress(ValueError, OSError): + signal.signal(signal.SIGALRM, _raise_timeout) + signal.setitimer(signal.ITIMER_REAL, _WALL_LIMIT_SECONDS) + + +def _eval_constant(node: ast.Constant) -> Number: + """Return a numeric literal value, rejecting booleans and other types.""" + if isinstance(node.value, bool) or not isinstance(node.value, (int, float)): + raise _CalcError + return node.value + + +def _eval_name(node: ast.Name) -> Number: + """Return the value of a whitelisted constant name (pi, e, tau).""" + try: + return _CONSTANTS[node.id] + except KeyError as exc: + raise _CalcError from exc + + +def _eval_unaryop(node: ast.UnaryOp) -> Number: + """Evaluate a unary plus/minus operation.""" + try: + func = _UNARY_OPS[type(node.op)] + except KeyError as exc: + raise _CalcError from exc + return func(_eval(node.operand)) + + +def _eval_binop(node: ast.BinOp) -> Number: + """Evaluate a binary operation, guarding against explosive exponents.""" + try: + func = _BINARY_OPS[type(node.op)] + except KeyError as exc: + raise _CalcError from exc + left = _eval(node.left) + right = _eval(node.right) + if isinstance(node.op, ast.Pow) and abs(right) > _MAX_EXPONENT: + raise _CalcError + return func(left, right) + + +def _eval_call(node: ast.Call) -> Number: + """Evaluate a call to a whitelisted function, bounding factorial growth.""" + if not isinstance(node.func, ast.Name) or node.keywords: + raise _CalcError + try: + func = _FUNCTIONS[node.func.id] + except KeyError as exc: + raise _CalcError from exc + args = [_eval(arg) for arg in node.args] + if node.func.id == "factorial" and ( + not args or not isinstance(args[0], int) or args[0] > _MAX_FACTORIAL_ARG + ): + raise _CalcError + return func(*args) + + +def _eval(node: ast.AST) -> Number: + """Recursively evaluate one whitelisted AST node.""" + if isinstance(node, ast.Expression): + return _eval(node.body) + if isinstance(node, ast.Constant): + return _eval_constant(node) + if isinstance(node, ast.Name): + return _eval_name(node) + if isinstance(node, ast.UnaryOp): + return _eval_unaryop(node) + if isinstance(node, ast.BinOp): + return _eval_binop(node) + if isinstance(node, ast.Call): + return _eval_call(node) + raise _CalcError + + +def _format(value: Number) -> str: + """Format a numeric result compactly, or return '' if it cannot be shown.""" + if isinstance(value, bool): + value = int(value) + if isinstance(value, int): + text = str(value) + if len(text) <= _MAX_INT_DIGITS: + return text + try: + return format(float(value), f".{_SCI_PRECISION}g") + except OverflowError: + return "" + if math.isnan(value) or math.isinf(value): + return "" + return format(value, f".{_FLOAT_PRECISION}g") + + +def evaluate(expression: str) -> str: + """Evaluate ``expression`` and return its formatted result, or '' on failure. + + Args: + expression: The arithmetic expression. ``^`` is treated as power. + + Returns: + The formatted result, or an empty string for any invalid, unsafe, or + non-terminating input. + """ + try: + tree = ast.parse(expression.replace("^", "**"), mode="eval") + return _format(_eval(tree)) + except ( + _CalcError, + SyntaxError, + ArithmeticError, + ValueError, + TypeError, + RecursionError, + TimeoutError, + MemoryError, + ): + return "" + + +def main() -> int: + """Read ``argv[1]``, evaluate it under resource limits, and print the result.""" + _apply_limits() + args = sys.argv[1:] + if not args: + return 0 + result = evaluate(args[0]) + if result: + sys.stdout.write(result) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python_pkg/live_calc/tests/__init__.py b/python_pkg/live_calc/tests/__init__.py new file mode 100644 index 0000000..2bad517 --- /dev/null +++ b/python_pkg/live_calc/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the live_calc subpackage.""" diff --git a/python_pkg/live_calc/tests/test_calc_eval.py b/python_pkg/live_calc/tests/test_calc_eval.py new file mode 100644 index 0000000..8c8b312 --- /dev/null +++ b/python_pkg/live_calc/tests/test_calc_eval.py @@ -0,0 +1,209 @@ +"""Tests for the live-calc safe arithmetic evaluator. + +The public surface is ``evaluate``; most branches are reached through it so the +tests double as behaviour documentation. A handful of internal helpers +(``_format``, ``_apply_limits``, ``_raise_timeout``, ``main``) are exercised +directly where a branch cannot be triggered through ``evaluate`` alone. +""" + +from __future__ import annotations + +import math +from unittest.mock import patch + +import pytest + +from python_pkg.live_calc import calc_eval +from python_pkg.live_calc.calc_eval import ( + _apply_limits, + _format, + _raise_timeout, + evaluate, + main, +) + +_MOD = "python_pkg.live_calc.calc_eval" + + +class TestArithmetic: + """Core arithmetic, operators and the calculator-style ``^`` power.""" + + @pytest.mark.parametrize( + ("expr", "expected"), + [ + ("2+2", "4"), + ("20*(3+4)", "140"), + ("10-3", "7"), + ("7/2", "3.5"), + ("7//2", "3"), + ("10%3", "1"), + ("2^10", "1024"), # ^ is rewritten to ** + ("2**10", "1024"), + ("+5", "5"), + ("-5", "-5"), + ("3/2", "1.5"), + ], + ) + def test_evaluates(self, expr: str, expected: str) -> None: + """Each operator yields the expected formatted result.""" + assert evaluate(expr) == expected + + +class TestFunctionsAndConstants: + """Whitelisted functions and named constants.""" + + def test_sqrt(self) -> None: + """sqrt(4) is 2.""" + assert evaluate("sqrt(4)") == "2" + + def test_nested_call(self) -> None: + """Functions compose and trig works against pi.""" + assert evaluate("sin(pi/2)") == "1" + + def test_constant_pi(self) -> None: + """The constant pi resolves to math.pi.""" + assert evaluate("pi") == format(math.pi, ".12g") + + def test_min_max(self) -> None: + """Multi-argument builtins are allowed.""" + assert evaluate("max(2, 9, 4)") == "9" + + def test_factorial_ok(self) -> None: + """A small factorial is computed.""" + assert evaluate("factorial(5)") == "120" + + +class TestRejected: + """Inputs that must yield '' (invalid, unsafe, or unsupported).""" + + @pytest.mark.parametrize( + "expr", + [ + "", # empty + "ls -la", # not an expression at all + "2+", # SyntaxError + "x", # unknown name + "sqrt", # function name without a call + "foo(2)", # unknown function + "True", # bool literal rejected + "'a'", # string literal rejected + "5 & 3", # unsupported binary operator + "~5", # unsupported unary operator + "(1+1)(2)", # call target is not a plain name + "round(2.5, ndigits=1)", # keyword arguments rejected + "[1, 2]", # unsupported node type (list) + "factorial()", # factorial needs an argument + "factorial(2.5)", # factorial argument must be int + ], + ) + def test_returns_empty(self, expr: str) -> None: + """Anything that is not a permitted arithmetic expression yields ''.""" + assert evaluate(expr) == "" + + +class TestRunawayGuards: + """Bounds that stop a live keystroke from computing forever.""" + + def test_huge_exponent_refused(self) -> None: + """A ** b with an enormous b is refused before computing.""" + assert evaluate("2^99999") == "" + + def test_huge_factorial_refused(self) -> None: + """factorial of a huge argument is refused up front.""" + assert evaluate("factorial(99999)") == "" + + +class TestFormatting: + """Number formatting, including the int/scientific/float branches.""" + + def test_big_int_uses_scientific(self) -> None: + """Integers longer than the digit cap fall back to scientific form.""" + assert evaluate("10^30") == format(1e30, ".6g") + + def test_overflowing_int_yields_empty(self) -> None: + """An int too large to convert to float formats to ''.""" + assert evaluate("10^400") == "" + + def test_infinity_yields_empty(self) -> None: + """A float overflow to infinity formats to ''.""" + assert evaluate("1e308*10") == "" + + def test_format_bool_is_int(self) -> None: + """_format coerces bool to its int value (reached only directly).""" + assert _format(value=True) == "1" + + def test_format_nan_yields_empty(self) -> None: + """_format rejects NaN.""" + assert _format(float("nan")) == "" + + def test_format_inf_yields_empty(self) -> None: + """_format rejects infinity.""" + assert _format(float("inf")) == "" + + def test_format_plain_float(self) -> None: + """A finite float is formatted with the float precision.""" + assert _format(1.5) == "1.5" + + +class TestTimeoutHandler: + """The SIGALRM handler and the resource-limit installer.""" + + def test_raise_timeout_raises(self) -> None: + """The handler converts an alarm into a catchable TimeoutError.""" + with pytest.raises(TimeoutError): + _raise_timeout(0, None) + + def test_apply_limits_installs(self) -> None: + """Limits are installed via setrlimit/signal/setitimer.""" + with ( + patch(f"{_MOD}.resource.setrlimit") as set_rlimit, + patch(f"{_MOD}.signal.signal") as set_signal, + patch(f"{_MOD}.signal.setitimer") as set_timer, + ): + _apply_limits() + set_rlimit.assert_called_once() + set_signal.assert_called_once() + set_timer.assert_called_once() + + def test_apply_limits_survives_unavailable(self) -> None: + """If the platform rejects the limits, the error is swallowed.""" + with ( + patch(f"{_MOD}.resource.setrlimit", side_effect=OSError), + patch(f"{_MOD}.signal.signal", side_effect=ValueError), + patch(f"{_MOD}.signal.setitimer"), + ): + _apply_limits() # must not raise + + +class TestMain: + """The command-line entry point (argv -> stdout).""" + + def test_no_argument(self, capsys: pytest.CaptureFixture[str]) -> None: + """With no expression argument, nothing is printed and rc is 0.""" + with ( + patch(f"{_MOD}._apply_limits"), + patch.object(calc_eval.sys, "argv", ["calc_eval"]), + ): + assert main() == 0 + assert capsys.readouterr().out == "" + + def test_valid_expression(self, capsys: pytest.CaptureFixture[str]) -> None: + """A valid expression is evaluated and written to stdout.""" + with ( + patch(f"{_MOD}._apply_limits"), + patch.object(calc_eval.sys, "argv", ["calc_eval", "2+2"]), + ): + assert main() == 0 + assert capsys.readouterr().out == "4" + + def test_invalid_expression_writes_nothing( + self, + capsys: pytest.CaptureFixture[str], + ) -> None: + """An invalid expression produces no output.""" + with ( + patch(f"{_MOD}._apply_limits"), + patch.object(calc_eval.sys, "argv", ["calc_eval", "ls -la"]), + ): + assert main() == 0 + assert capsys.readouterr().out == ""