Add cross-device log sync (Python half of Milestone 3)

Pulls every other device's pushed log from GitHub-backed dumb storage,
merges it with the local log, and pushes this device's own merged copy
back -- the PC half of the diet-guard-app sync plan.

- _sync_merge.py: pure union-by-id merge, tombstone always wins, legacy
  (time, desc) dedup for pre-id entries. Commutative and idempotent.
- _sync_github.py: minimal GitHub Contents API client (list/get/put),
  distinguishing a 404 on an unused path from the repo itself being
  unreachable.
- _sync.py: orchestration -- pull, merge, re-sign every persisted entry
  regardless of origin, write, rebuild the food bank, push. Re-signing
  unconditionally is load-bearing: an unsigned phone-origin entry would
  otherwise be silently dropped on the very next read once a machine
  holds the shared HMAC key.
- _foodbank.rebuild_food_bank(): the "replay a full log into a fresh
  bank" entrypoint the Python side was missing (the Dart port already
  had its equivalent). Backs sync's bank-rebuild step.
- New diet-guard-sync.service/.timer (15-minute cadence, headless, a
  separate unit from the gate so a held lock can't stall sync) and a
  new install.sh step to install them.
- Created the private kuhyx/diet-guard-sync GitHub repo for storage.

Incidental to this feature: adding the `sync` subcommand pushed _cli.py
past the repo's 500-line cap, so `gate`'s CLI glue moved out alongside
sync's into _cli_gate.py/_cli_sync.py -- same split pattern already used
for the gate window logic itself, not a sync-specific design choice.

338 tests, 100% branch coverage. Verified importing and running cleanly
under /usr/bin/python (the production interpreter), not just the dev
venv -- the gap that caused the earlier 3-day outage.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01FU3f5KQ1GHXsbbSecfVEyF
This commit is contained in:
Krzysztof kuhy Rudnicki 2026-06-22 19:36:27 +02:00
parent feef5984f8
commit e5b80fd610
22 changed files with 1620 additions and 131 deletions

View File

@ -29,6 +29,34 @@ catch-up run at session start can beat the display manager writing
`~/.Xauthority`) and why a real fix lives in Python (`wait_for_display()`) `~/.Xauthority`) and why a real fix lives in Python (`wait_for_display()`)
rather than in the unit file. rather than in the unit file.
## Cross-device sync
`diet-guard-sync.timer` fires `python -m diet_guard sync` every ~15 minutes
(headless, no `DISPLAY` needed — separate from the gate timer on purpose, see
the unit file's comment for why). It pulls every other device's pushed log
from the private `kuhyx/diet-guard-sync` GitHub repo (used as dumb file
storage via the REST Contents API, not a git clone), merges with the local
log (`_sync_merge.merge_logs`: union by `id`, tombstone wins, legacy
`(time, desc)` dedup for pre-`id` entries), **re-signs every persisted entry**
regardless of origin, rebuilds the food bank, then pushes this device's own
merged log back up.
Re-signing on every merge (not just phone-origin entries) is the
non-negotiable step: `_entry_is_valid()` drops any unsigned entry once a
machine has the shared HMAC key, and the phone never holds that key, so
skipping the re-sign would silently lose every phone-logged meal on the very
next read.
Requires a one-time manual setup `install.sh` does **not** automate: create a
fine-grained GitHub PAT scoped to `diet-guard-sync`'s contents (read/write),
then save it to `~/.config/diet_guard/sync_token`, mode 600. Until that file
exists, every sync tick is a harmless no-op that logs `sync not configured`.
The food bank stays *derived*, never synced: only `food_log.json` round-trips
through GitHub, and each device rebuilds its own `food_bank.json` locally by
replaying the merged log (`_foodbank.rebuild_food_bank`) — this is what avoids
needing CRDT counter-merge logic for a food's `count`.
## Production dependency installation — read this before adding any dependency ## Production dependency installation — read this before adding any dependency
`diet-guard-gate.service` runs `/usr/bin/python` directly — **not** a venv. `diet-guard-gate.service` runs `/usr/bin/python` directly — **not** a venv.
@ -61,7 +89,7 @@ silently does **not** reach the running service.
## Operational gotchas ## Operational gotchas
- **The budget file is sealed immutable.** `~/.local/share/diet_guard/.budget` - **The budget file is sealed immutable.** `~/.local/share/diet_guard/.budget`
gets `chattr +i` after `init` (see `install.sh` step 5). This is the actual gets `chattr +i` after `init` (see `install.sh` step 6). This is the actual
tamper-resistance mechanism — the budget can't be casually edited to "make tamper-resistance mechanism — the budget can't be casually edited to "make
room" once locked. To intentionally change it: `sudo chattr -i` the file, room" once locked. To intentionally change it: `sudo chattr -i` the file,
re-run `python -m diet_guard init`, then re-lock. re-run `python -m diet_guard init`, then re-lock.
@ -71,13 +99,16 @@ silently does **not** reach the running service.
- **State lives entirely under `~/.local/share/diet_guard/`** — no - **State lives entirely under `~/.local/share/diet_guard/`** — no
cross-repo file coupling (unlike wake_alarm, which reads cross-repo file coupling (unlike wake_alarm, which reads
`~/screen-locker/screen_locker/workout_log.json`). Safe to reason about in `~/screen-locker/screen_locker/workout_log.json`). Safe to reason about in
isolation. isolation, with one exception: `diet-guard-sync.timer` reads/writes the
private `kuhyx/diet-guard-sync` GitHub repo (see "Cross-device sync" above)
and `~/.config/diet_guard/sync_token`.
## Commands ## Commands
- Run tests: `python -m pytest diet_guard/tests/ --cov=diet_guard --cov-branch --cov-fail-under=100` - Run tests: `python -m pytest diet_guard/tests/ --cov=diet_guard --cov-branch --cov-fail-under=100`
- Lint: `pre-commit run --all-files` - Lint: `pre-commit run --all-files`
- Test the lock manually (safe, closeable): `python -m diet_guard gate --demo` - Test the lock manually (safe, closeable): `python -m diet_guard gate --demo`
- Run one sync tick manually: `python -m diet_guard sync`
- Install for production: `bash install.sh` - Install for production: `bash install.sh`
## Do NOT ## Do NOT

13
diet-guard-sync.service Normal file
View File

@ -0,0 +1,13 @@
[Unit]
Description=Diet Guard cross-device log sync (GitHub-backed)
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
# Headless: no DISPLAY/XAUTHORITY needed, unlike diet-guard-gate.service.
# Deliberately a separate unit from the gate, not piggybacked on its tick:
# the gate is a *blocking* oneshot that holds the lock open until a meal is
# logged, so running sync on the same schedule would mean sync silently
# stalls for as long as a lock is showing.
ExecStart=/usr/bin/python -m diet_guard sync

14
diet-guard-sync.timer Normal file
View File

@ -0,0 +1,14 @@
[Unit]
Description=Periodically sync the Diet Guard log with other devices
[Timer]
# Every 15 minutes -- matches the phone app's WorkManager periodic sync (15
# min is also Android's WorkManager periodic floor), so both sides converge
# on roughly the same cadence. Persistent catches a run missed while the
# machine was suspended; OnCalendar re-arms every period regardless (see
# diet-guard-gate.timer's comment for why OnCalendar over OnBootSec here too).
OnCalendar=*-*-* *:00/15:00
Persistent=true
[Install]
WantedBy=timers.target

View File

@ -31,14 +31,10 @@ from diet_guard._budget import (
seal_budget, seal_budget,
unlock_command, unlock_command,
) )
from diet_guard._cli_gate import cmd_gate
from diet_guard._cli_sync import cmd_sync, register_sync_subparser
from diet_guard._foodbank import remember_food from diet_guard._foodbank import remember_food
from diet_guard._gate import due_slots, gate_is_due from diet_guard._gate import due_slots
from diet_guard._gatelock import (
MealGate,
acquire_gate_lock,
release_gate_lock,
)
from diet_guard._gatelock_support import wait_for_display
from diet_guard._portions import ( from diet_guard._portions import (
DEFAULT_ITEM_GRAMS, DEFAULT_ITEM_GRAMS,
estimate_unit_grams, estimate_unit_grams,
@ -180,6 +176,7 @@ def _parse_args(argv: list[str]) -> argparse.Namespace:
sub.add_parser("status", help="Show today's calories and budget band.") sub.add_parser("status", help="Show today's calories and budget band.")
sub.add_parser("undo", help="Remove today's most recent entry.") sub.add_parser("undo", help="Remove today's most recent entry.")
register_sync_subparser(sub)
gate = sub.add_parser( gate = sub.add_parser(
"gate", "gate",
@ -425,46 +422,6 @@ def _cmd_undo() -> int:
return 0 return 0
def _cmd_gate(*, check: bool, demo: bool) -> int:
"""Run the log-to-unlock gate.
Three modes: ``--check`` is a headless decision (no window) whose exit code
a timer reads; ``--demo`` always shows a safe demo window; bare ``gate``
shows the real lock only when one is due. A flock guard stops a second
window from stacking on top of the first, and a window-opening mode first
waits for the X display so a session-start launch never crashes unshown.
Args:
check: Headless mode -- print and return an exit code, open no window.
demo: Use safe demo mode (local grab + close button) for the window.
Returns:
For ``--check``: 0 if not due, 1 if a lock is due. Otherwise 0.
"""
if check:
due = gate_is_due()
_emit("due (a lock is warranted)" if due else "ok (no lock needed)")
return 1 if due else 0
if not demo and not gate_is_due():
_emit("ok - no lock needed right now.")
return 0
handle = acquire_gate_lock()
if handle is None:
_emit("the gate is already running.")
return 0
try:
# At session start the timer can fire before the X display/auth cookie
# is ready; wait it out so the window opens instead of crashing on a
# "couldn't connect to display" TclError (see _gatelock.wait_for_display).
if not wait_for_display():
_emit("display not ready yet; will retry on the next timer tick.")
return 0
MealGate(demo_mode=demo).run()
finally:
release_gate_lock(handle)
return 0
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
"""Dispatch a diet_guard subcommand. """Dispatch a diet_guard subcommand.
@ -492,6 +449,8 @@ def main(argv: list[str] | None = None) -> int:
return _cmd_ate(args.description, portion, macros) return _cmd_ate(args.description, portion, macros)
if args.command == "status": if args.command == "status":
return _cmd_status() return _cmd_status()
if args.command == "sync":
return cmd_sync(_emit)
if args.command == "gate": if args.command == "gate":
return _cmd_gate(check=args.check, demo=args.demo) return cmd_gate(_emit, check=args.check, demo=args.demo)
return _cmd_undo() return _cmd_undo()

60
diet_guard/_cli_gate.py Normal file
View File

@ -0,0 +1,60 @@
"""CLI handler for the ``gate`` subcommand.
Split out from :mod:`diet_guard._cli` to keep that module under the repo's
500-line cap (see ``CLAUDE.md``'s "feat: split oversized modules" history).
The gate's actual window logic already lives in ``_gatelock*.py``; this is
just the thin CLI glue, same as ``_cli_sync.py`` is for ``sync``.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from diet_guard._gate import gate_is_due
from diet_guard._gatelock import MealGate, acquire_gate_lock, release_gate_lock
from diet_guard._gatelock_support import wait_for_display
if TYPE_CHECKING:
from collections.abc import Callable
def cmd_gate(emit: Callable[[str], None], *, check: bool, demo: bool) -> int:
"""Run the log-to-unlock gate.
Three modes: ``--check`` is a headless decision (no window) whose exit code
a timer reads; ``--demo`` always shows a safe demo window; bare ``gate``
shows the real lock only when one is due. A flock guard stops a second
window from stacking on top of the first, and a window-opening mode first
waits for the X display so a session-start launch never crashes unshown.
Args:
emit: A one-line output sink (``_cli._emit``, passed in rather than
imported -- see ``_cli_sync.cmd_sync`` for why).
check: Headless mode -- print and return an exit code, open no window.
demo: Use safe demo mode (local grab + close button) for the window.
Returns:
For ``--check``: 0 if not due, 1 if a lock is due. Otherwise 0.
"""
if check:
due = gate_is_due()
emit("due (a lock is warranted)" if due else "ok (no lock needed)")
return 1 if due else 0
if not demo and not gate_is_due():
emit("ok - no lock needed right now.")
return 0
handle = acquire_gate_lock()
if handle is None:
emit("the gate is already running.")
return 0
try:
# At session start the timer can fire before the X display/auth cookie
# is ready; wait it out so the window opens instead of crashing on a
# "couldn't connect to display" TclError (see _gatelock.wait_for_display).
if not wait_for_display():
emit("display not ready yet; will retry on the next timer tick.")
return 0
MealGate(demo_mode=demo).run()
finally:
release_gate_lock(handle)
return 0

55
diet_guard/_cli_sync.py Normal file
View File

@ -0,0 +1,55 @@
"""CLI handler for the ``sync`` subcommand.
Split out from :mod:`diet_guard._cli` to keep that module under the repo's
500-line cap (see ``CLAUDE.md``'s "feat: split oversized modules" history) --
the same reason the gate window logic lives across ``_gatelock*.py`` instead
of one file.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from diet_guard._sync import SyncError, run_sync
from diet_guard._sync_github import GitHubSyncError
if TYPE_CHECKING:
import argparse
from collections.abc import Callable
def register_sync_subparser(sub: argparse._SubParsersAction) -> None:
"""Register the ``sync`` subcommand on ``sub``."""
sub.add_parser(
"sync",
help="Pull/merge/push the log with other devices via GitHub.",
)
def cmd_sync(emit: Callable[[str], None]) -> int:
"""Run one sync tick and report what happened via ``emit``.
Errors are caught here rather than left to propagate: a sync failure
(missing PAT, network error, repo misconfigured) is routine enough on a
timer-driven command that the CLI should report it and exit non-zero,
not crash with a traceback.
Args:
emit: A one-line output sink (``_cli._emit``, kept private to that
module -- passed in rather than imported, so this module has no
reach-in dependency on ``_cli``'s internals).
Returns:
0 on a successful sync, 1 if it could not run or failed partway.
"""
try:
merged = run_sync()
except SyncError as exc:
emit(f"sync not configured: {exc}")
return 1
except GitHubSyncError as exc:
emit(f"sync failed: {exc}")
return 1
total_entries = sum(len(entries) for entries in merged.values())
emit(f"synced: {total_entries} entries across {len(merged)} day(s).")
return 0

View File

@ -63,3 +63,17 @@ GATE_SLOT_INTERVAL_HOURS: int = 4 # slots at 08:00, 12:00, 16:00, 20:00
GATE_EATING_END_HOUR: int = 22 # exclusive (22:00) GATE_EATING_END_HOUR: int = 22 # exclusive (22:00)
# flock single-instance guard: stops a timer from stacking lock windows. # flock single-instance guard: stops a timer from stacking lock windows.
GATE_LOCK_FILE: Path = DATA_DIR / ".gate.lock" GATE_LOCK_FILE: Path = DATA_DIR / ".gate.lock"
# --- Sync (cross-device log merge) ------------------------------------------
# GitHub is used purely as dumb file storage via the REST Contents API (not a
# git clone) -- mirrors ~/todo's sync transport. Each device pushes its own
# full current log as one file under devices/<id>/food_log.json; merging
# happens client-side (see _sync_merge.py), never via git.
SYNC_REPO_OWNER: str = "kuhyx"
SYNC_REPO_NAME: str = "diet-guard-sync"
SYNC_DEVICE_ID: str = "pc"
# A fine-grained GitHub PAT, scoped to just SYNC_REPO_NAME's contents. The
# user creates this once via github.com (see CLAUDE.md) and saves it here,
# mode 600. Never committed -- this path is outside the repo entirely.
SYNC_TOKEN_FILE: Path = Path.home() / ".config" / "diet_guard" / "sync_token"
SYNC_TIMEOUT_SECONDS: float = 10.0

View File

@ -164,19 +164,22 @@ def remember_meal(name: str, items: Sequence[MealItem]) -> Nutrition:
return total return total
def _upsert( def _apply_upsert(
bank: dict[str, BankRecord],
description: str, description: str,
nutrition: Nutrition, nutrition: Nutrition,
*, *,
components: list[str] | None, components: list[str] | None,
) -> None: ) -> None:
"""Insert or refresh one bank record, bumping its use count. """Insert or refresh one record in ``bank`` in place, bumping its count.
Shared by :func:`remember_food` (a single food) and :func:`remember_meal` Pure (no I/O), so it is shared by the disk-backed :func:`_upsert` and by
(a composite, which additionally records its ``components``). A blank :func:`rebuild_food_bank`, which replays a whole log into a fresh
in-memory bank without a read/write round trip per entry. A blank
description is ignored, so an unnamed entry is never stored. description is ignored, so an unnamed entry is never stored.
Args: Args:
bank: The in-memory bank to update.
description: The food or meal name (its normalized form is the key). description: The food or meal name (its normalized form is the key).
nutrition: The macros to store. nutrition: The macros to store.
components: Component names for a composite meal, or None for a food. components: Component names for a composite meal, or None for a food.
@ -184,7 +187,6 @@ def _upsert(
key = _normalize(description) key = _normalize(description)
if not key: if not key:
return return
bank = _read_bank()
previous = bank.get(key, {}) previous = bank.get(key, {})
count = as_float(previous.get("count")) + 1 count = as_float(previous.get("count")) + 1
record: BankRecord = { record: BankRecord = {
@ -199,9 +201,97 @@ def _upsert(
if components is not None: if components is not None:
record["components"] = list(components) record["components"] = list(components)
bank[key] = record bank[key] = record
def _upsert(
description: str,
nutrition: Nutrition,
*,
components: list[str] | None,
) -> None:
"""Insert or refresh one bank record on disk, bumping its use count.
Shared by :func:`remember_food` (a single food) and :func:`remember_meal`
(a composite, which additionally records its ``components``).
Args:
description: The food or meal name (its normalized form is the key).
nutrition: The macros to store.
components: Component names for a composite meal, or None for a food.
"""
bank = _read_bank()
_apply_upsert(bank, description, nutrition, components=components)
_write_bank(bank) _write_bank(bank)
def _entry_nutrition(entry: dict[str, object], *, source: str) -> Nutrition:
"""Build a :class:`Nutrition` from a raw log entry's macro fields."""
return Nutrition(
kcal=as_float(entry.get("kcal")),
protein_g=as_float(entry.get("protein_g")),
carbs_g=as_float(entry.get("carbs_g")),
fat_g=as_float(entry.get("fat_g")),
grams=as_float(entry.get("grams")),
source=source,
)
def rebuild_food_bank(log: dict[str, list[dict[str, object]]]) -> dict[str, BankRecord]:
"""Rebuild the bank from scratch by replaying ``log``'s entries, then persist it.
Replays in a fixed, device-independent order (by ``time`` then ``id``),
so two devices that converge on the same merged log also converge on the
same bank -- this is what lets the food bank stay *derived*, never
synced, with no counter-merge (CRDT) logic needed for ``count``. Mirrors
the Dart port's ``FoodBankService.rebuild`` exactly, including the
composite-meal branch (banks each component, then the composite itself).
Deleted (tombstoned) entries are skipped entirely, same as
:func:`diet_guard._state.load_log`.
Args:
log: A full log keyed by date, e.g. from
:func:`diet_guard._state.read_raw_log` after a sync merge.
Returns:
The freshly rebuilt bank (also written to disk).
"""
entries = sorted(
(
entry
for day_entries in log.values()
for entry in day_entries
if not entry.get("deleted")
),
key=lambda entry: (str(entry.get("time", "")), str(entry.get("id", ""))),
)
bank: dict[str, BankRecord] = {}
for entry in entries:
components = entry.get("components")
component_names: list[str] | None = None
if isinstance(components, list):
component_names = []
for component in components:
if not isinstance(component, dict):
continue
name = str(component.get("name", ""))
component_names.append(name)
_apply_upsert(
bank,
name,
_entry_nutrition(component, source="food bank"),
components=None,
)
_apply_upsert(
bank,
str(entry.get("desc", "")),
_entry_nutrition(entry, source=str(entry.get("source", "manual"))),
components=component_names,
)
_write_bank(bank)
return bank
def lookup_food(description: str) -> Nutrition | None: def lookup_food(description: str) -> Nutrition | None:
"""Return the exact-match macros for ``description``, or None. """Return the exact-match macros for ``description``, or None.

View File

@ -266,6 +266,51 @@ def consumption_band() -> str:
return "on track" return "on track"
def read_raw_log() -> DayLog:
"""Return the log exactly as stored, including tombstoned/invalid entries.
Public counterpart of :func:`_read_raw_log`, for the sync orchestration
(:mod:`diet_guard._sync`), which must see tombstones to merge them (the
filtered :func:`load_log` drops them) and must not discard an entry that
fails verification just because a phone-origin copy was never signed.
"""
return _read_raw_log()
def write_raw_log(log: DayLog) -> None:
"""Persist ``log`` verbatim, overwriting the file on disk.
Public counterpart of :func:`_write_log`, for :mod:`diet_guard._sync` to
write back a merged log after re-signing it.
"""
_write_log(log)
def resign_entry(entry: dict[str, object]) -> dict[str, object]:
"""Return a copy of ``entry`` with a freshly computed ``hmac``.
Strips any existing signature first, mirroring :func:`undo_last_today`:
a signature computed on another device (or none, if the phone -- which
never holds the shared key -- produced this entry) cannot be trusted
as-is, and recomputing is the only way :func:`_entry_is_valid` will
accept it back on the next read. A no-op (signature-wise) when no HMAC
key is available locally, matching :func:`log_meal`'s degrade-gracefully
behavior.
Args:
entry: A log entry, signed or not.
Returns:
A new dict equal to ``entry`` except for its ``hmac`` field.
"""
resigned = dict(entry)
resigned.pop("hmac", None)
signature = compute_entry_hmac(resigned)
if signature is not None:
resigned["hmac"] = signature
return resigned
def undo_last_today() -> dict[str, object] | None: def undo_last_today() -> dict[str, object] | None:
"""Tombstone today's most recently logged, not-yet-undone entry. """Tombstone today's most recently logged, not-yet-undone entry.

122
diet_guard/_sync.py Normal file
View File

@ -0,0 +1,122 @@
"""Cross-device log sync orchestration for diet_guard.
Pulls every other device's pushed log from GitHub-backed dumb storage
(:mod:`diet_guard._sync_github`), merges with the local log
(:mod:`diet_guard._sync_merge`), re-signs every persisted entry, rebuilds the
food bank, and pushes this device's own merged log back up.
"""
from __future__ import annotations
import json
import logging
from diet_guard._constants import (
SYNC_DEVICE_ID,
SYNC_REPO_NAME,
SYNC_REPO_OWNER,
SYNC_TOKEN_FILE,
)
from diet_guard._foodbank import rebuild_food_bank
from diet_guard._state import DayLog, read_raw_log, resign_entry, write_raw_log
from diet_guard._sync_github import GitHubSyncClient
from diet_guard._sync_merge import merge_logs
_logger = logging.getLogger(__name__)
_DEVICES_DIR = "devices"
class SyncError(Exception):
"""Raised when a sync run cannot even start (no usable PAT)."""
def _device_log_path(device_id: str) -> str:
"""Return the repo-relative path a device's full log is pushed to."""
return f"{_DEVICES_DIR}/{device_id}/food_log.json"
def _read_token() -> str:
"""Return the saved sync PAT, stripped of trailing whitespace.
Raises:
SyncError: If the token file is missing or empty -- the user has not
completed the one-time github.com setup step yet.
"""
if not SYNC_TOKEN_FILE.exists():
message = (
f"no sync token at {SYNC_TOKEN_FILE} -- create a fine-grained "
"GitHub PAT scoped to the diet-guard-sync repo's contents and "
f"save it there (mode 600), then re-run sync"
)
raise SyncError(message)
token = SYNC_TOKEN_FILE.read_text().strip()
if not token:
msg = f"{SYNC_TOKEN_FILE} is empty"
raise SyncError(msg)
return token
def _pull_remote_logs(client: GitHubSyncClient) -> list[DayLog]:
"""Return every other device's last-pushed log, skipping this one.
A device whose pushed file is corrupt or truncated (e.g. an interrupted
push) is logged and skipped, same as one that has never pushed at all --
GitHub is an external system boundary, and one bad device's file must
not stall merging in every other device's.
"""
remote_logs: list[DayLog] = []
for device_id in client.list_directory(_DEVICES_DIR):
if device_id == SYNC_DEVICE_ID:
continue
text = client.get_file_text(_device_log_path(device_id))
if text is None:
continue
try:
remote_log = json.loads(text)
except json.JSONDecodeError:
_logger.warning("Unparsable log pushed by device %r, skipping", device_id)
continue
if isinstance(remote_log, dict):
remote_logs.append(remote_log)
return remote_logs
def run_sync() -> DayLog:
"""Run one full sync tick: pull, merge, re-sign, persist, push.
Every persisted entry is re-signed regardless of origin (not just
phone-origin ones): a signature computed on another device cannot be
trusted as this device's shared key sees it, and an inbound entry with no
signature at all would otherwise be silently dropped on the very next
read by :func:`diet_guard._state.load_log`.
Returns:
The merged log as it now sits on disk locally, post re-sign.
Raises:
SyncError: If the local PAT is missing or empty.
diet_guard._sync_github.GitHubSyncError: Propagated from the GitHub
client for any transport failure -- the caller (CLI/timer)
decides how to report it.
"""
token = _read_token()
client = GitHubSyncClient(SYNC_REPO_OWNER, SYNC_REPO_NAME, token)
merged = read_raw_log()
for remote_log in _pull_remote_logs(client):
merged = merge_logs(merged, remote_log)
resigned: DayLog = {
day: [resign_entry(entry) for entry in entries]
for day, entries in merged.items()
}
write_raw_log(resigned)
rebuild_food_bank(resigned)
client.put_file_text(
_device_log_path(SYNC_DEVICE_ID),
json.dumps(resigned, indent=2),
message="diet_guard sync",
)
return resigned

190
diet_guard/_sync_github.py Normal file
View File

@ -0,0 +1,190 @@
"""Minimal GitHub Contents API client for diet_guard's dumb-storage sync.
GitHub is used purely as file storage via the REST Contents API, not a git
clone -- ported in spirit from ``~/todo``'s sync transport. There is no
working tree and no git-level merge; the only merge is the domain-level one
in :mod:`diet_guard._sync_merge`.
"""
from __future__ import annotations
import base64
import logging
import requests
from diet_guard._constants import SYNC_TIMEOUT_SECONDS
_logger = logging.getLogger(__name__)
_API_BASE = "https://api.github.com"
_HTTP_NOT_FOUND = 404
class GitHubSyncError(Exception):
"""Raised for a GitHub API failure the caller must not silently ignore."""
class RepoNotFoundError(GitHubSyncError):
"""Raised when the configured repo itself is unreachable.
Distinguished from a path-404 (nothing pushed to that path yet, which is
benign -- it just means no other device has synced before) so the caller
can tell "the repo name is wrong or the PAT isn't scoped to it" apart
from "no other device has synced yet".
"""
class GitHubSyncClient:
"""Thin wrapper around the subset of the Contents API sync needs."""
def __init__(self, owner: str, repo: str, token: str) -> None:
"""Create a client scoped to one repo, authenticated with ``token``.
Args:
owner: The repo owner/org (e.g. ``"kuhyx"``).
repo: The repo name (e.g. ``"diet-guard-sync"``).
token: A GitHub PAT with contents read/write on that repo.
"""
self._owner = owner
self._repo = repo
self._headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/vnd.github+json",
}
def _contents_url(self, path: str) -> str:
return f"{_API_BASE}/repos/{self._owner}/{self._repo}/contents/{path}"
def _get(self, path: str) -> requests.Response:
try:
return requests.get(
self._contents_url(path),
headers=self._headers,
timeout=SYNC_TIMEOUT_SECONDS,
)
except requests.RequestException as exc:
msg = f"network error reading {path}"
raise GitHubSyncError(msg) from exc
def _repo_exists(self) -> bool:
try:
response = requests.get(
f"{_API_BASE}/repos/{self._owner}/{self._repo}",
headers=self._headers,
timeout=SYNC_TIMEOUT_SECONDS,
)
except requests.RequestException:
return False
return response.ok
def _raise_for_missing_path(self, path: str) -> None:
"""Raise :class:`RepoNotFoundError` only if the repo is unreachable.
A 404 on a path within a reachable repo just means nothing has been
pushed there yet, which is not an error worth raising on.
"""
if not self._repo_exists():
msg = (
f"{self._owner}/{self._repo} not found, private without "
f"access, or the token lacks contents permission "
f"(while reading {path})"
)
raise RepoNotFoundError(msg)
def get_file_text(self, path: str) -> str | None:
"""Return the decoded text content at ``path``, or None if unused.
Args:
path: A repo-relative file path, e.g. ``"devices/pc/food_log.json"``.
Returns:
The file's text content, or None if nothing has been pushed
there yet (but the repo itself is reachable).
Raises:
RepoNotFoundError: If the repo itself is unreachable.
GitHubSyncError: For any other non-2xx response or network error.
"""
response = self._get(path)
if response.status_code == _HTTP_NOT_FOUND:
self._raise_for_missing_path(path)
return None
if not response.ok:
msg = f"GET {path} failed: {response.status_code}"
raise GitHubSyncError(msg)
data = response.json()
content = data.get("content", "") if isinstance(data, dict) else ""
return base64.b64decode(content).decode("utf-8")
def _existing_sha(self, path: str) -> str | None:
response = self._get(path)
if response.status_code == _HTTP_NOT_FOUND:
self._raise_for_missing_path(path)
return None
if not response.ok:
msg = f"GET {path} (for sha) failed: {response.status_code}"
raise GitHubSyncError(
msg,
)
data = response.json()
sha = data.get("sha") if isinstance(data, dict) else None
return sha if isinstance(sha, str) else None
def put_file_text(self, path: str, text: str, *, message: str) -> None:
"""Create or update the file at ``path`` with ``text``.
Args:
path: A repo-relative file path.
text: The full new content (this device's complete merged log).
message: The commit message for this push.
Raises:
GitHubSyncError: On any non-2xx response or network error.
"""
sha = self._existing_sha(path)
payload: dict[str, object] = {
"message": message,
"content": base64.b64encode(text.encode("utf-8")).decode("ascii"),
}
if sha is not None:
payload["sha"] = sha
try:
response = requests.put(
self._contents_url(path),
headers=self._headers,
json=payload,
timeout=SYNC_TIMEOUT_SECONDS,
)
except requests.RequestException as exc:
msg = f"network error pushing {path}"
raise GitHubSyncError(msg) from exc
if not response.ok:
msg = f"PUT {path} failed: {response.status_code}"
raise GitHubSyncError(msg)
def list_directory(self, path: str) -> list[str]:
"""Return the entry names directly under ``path`` (empty if unused).
Args:
path: A repo-relative directory path, e.g. ``"devices"``.
Raises:
RepoNotFoundError: If the repo itself is unreachable.
GitHubSyncError: For any other non-2xx response or network error.
"""
response = self._get(path)
if response.status_code == _HTTP_NOT_FOUND:
self._raise_for_missing_path(path)
return []
if not response.ok:
msg = f"GET {path} (list) failed: {response.status_code}"
raise GitHubSyncError(msg)
data = response.json()
if not isinstance(data, list):
return []
return [
item["name"]
for item in data
if isinstance(item, dict) and isinstance(item.get("name"), str)
]

81
diet_guard/_sync_merge.py Normal file
View File

@ -0,0 +1,81 @@
"""Pure log-merge logic for diet_guard's cross-device sync.
No I/O here -- this module is unit-testable purely on in-memory ``DayLog``
values, like :mod:`diet_guard._slots`. Mirrored test-for-test by the Dart
port (``app/lib/services/sync_service.dart``), so the merge algorithm
canonically agrees on both sides of the sync.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from diet_guard._state import DayLog
# A dedup key: ("id", <uuid string>) for any entry with one, else
# ("legacy", (time, desc)) for a pre-id entry written before this field
# existed -- two devices that both already had that same legacy entry would
# otherwise end up with two copies of it after a merge.
_Key = tuple[str, object]
def _entry_key(entry: dict[str, object]) -> _Key:
"""Return the dedup key for ``entry``."""
entry_id = entry.get("id")
if isinstance(entry_id, str) and entry_id:
return ("id", entry_id)
return ("legacy", (entry.get("time"), entry.get("desc")))
def _tombstone_wins(
candidate: dict[str, object],
existing: dict[str, object],
) -> bool:
"""Return True if ``candidate`` should replace ``existing`` for one key.
A tombstone always wins over a non-tombstoned copy of the same entry --
deletion is sticky, so a stale pre-undo copy pulled from another device
can never resurrect something the user explicitly removed. Otherwise,
keep whichever copy was seen first: two copies of the same id are
expected to be byte-identical in their macros/desc (the body is never
mutated after creation, only ``deleted``/``hmac``), so which one survives
does not change the merged result's content.
"""
return bool(candidate.get("deleted")) and not existing.get("deleted")
def merge_logs(local: DayLog, remote: DayLog) -> DayLog:
"""Return the union of ``local`` and ``remote``, tombstones winning by id.
Commutative and idempotent: ``merge_logs(a, b) == merge_logs(b, a)`` and
``merge_logs(x, x) == x`` (for an ``x`` with no duplicate keys), so
pull-order between devices never matters and a repeated sync tick is a
no-op. Each entry is re-bucketed under its own ``time``'s date rather
than the date key it arrived under, so a merge can't silently leave an
entry filed under the wrong day.
Args:
local: This device's current full log (including tombstones).
remote: Another device's last-pushed full log.
Returns:
The merged log, keyed by each entry's own date, each day's entries
sorted oldest-first (matching the existing on-disk convention).
"""
by_key: dict[_Key, dict[str, object]] = {}
for day_log in (local, remote):
for entries in day_log.values():
for entry in entries:
key = _entry_key(entry)
existing = by_key.get(key)
if existing is None or _tombstone_wins(entry, existing):
by_key[key] = entry
merged: DayLog = {}
for entry in by_key.values():
date_key = str(entry.get("time", ""))[:10]
merged.setdefault(date_key, []).append(entry)
for entries in merged.values():
entries.sort(key=lambda entry: str(entry.get("time", "")))
return merged

View File

@ -2,8 +2,9 @@
Three safety nets run for every test: Three safety nets run for every test:
* ``_isolate_state`` redirects the food log, sealed budget, and gate lock into * ``_isolate_state`` redirects the food log, sealed budget, gate lock, and
``tmp_path`` so a test can never read or clobber the real ``~/.local/share``. sync token into ``tmp_path`` so a test can never read or clobber the real
``~/.local/share`` or ``~/.config/diet_guard``.
* ``_block_real_tk`` swaps ``tk`` and the ``GateRoot`` window class inside * ``_block_real_tk`` swaps ``tk`` and the ``GateRoot`` window class inside
``_gatelock`` for mocks, so no test can open a real fullscreen window or grab ``_gatelock`` for mocks, so no test can open a real fullscreen window or grab
the keyboard even if it forgets to. the keyboard even if it forgets to.
@ -61,6 +62,10 @@ def _isolate_state(tmp_path: Path) -> Iterator[None]:
"diet_guard._gatelock.GATE_LOCK_FILE", "diet_guard._gatelock.GATE_LOCK_FILE",
tmp_path / ".gate.lock", tmp_path / ".gate.lock",
), ),
patch(
"diet_guard._sync.SYNC_TOKEN_FILE",
tmp_path / "sync_token",
),
): ):
yield yield

View File

@ -9,7 +9,7 @@ from __future__ import annotations
import io import io
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch from unittest.mock import patch
from diet_guard import _cli from diet_guard import _cli
from diet_guard._budget import ( from diet_guard._budget import (
@ -209,73 +209,26 @@ class TestUndo:
class TestGate: class TestGate:
"""The gate subcommand's three modes.""" """Dispatch wiring for the gate subcommand.
def test_check_due(self, capsys: pytest.CaptureFixture[str]) -> None: cmd_gate()'s own branches are tested directly in test_cli_gate.py,
"""--check exits 1 and announces a due lock.""" where it lives after the 500-line split.
with patch.object(_cli, "gate_is_due", return_value=True): """
assert main(["gate", "--check"]) == 1
assert "due" in capsys.readouterr().out
def test_check_not_due(self) -> None: def test_dispatches_to_cmd_gate(self) -> None:
"""--check exits 0 when no lock is needed.""" with patch.object(_cli, "cmd_gate", return_value=0) as mock_cmd_gate:
with patch.object(_cli, "gate_is_due", return_value=False):
assert main(["gate", "--check"]) == 0
def test_demo_opens_window(self) -> None:
"""--demo always builds and runs the gate window."""
gate = MagicMock()
with (
patch.object(_cli, "MealGate", return_value=gate) as factory,
patch.object(_cli, "acquire_gate_lock", return_value=MagicMock()),
patch.object(_cli, "release_gate_lock"),
patch.object(_cli, "wait_for_display", return_value=True),
):
assert main(["gate", "--demo"]) == 0 assert main(["gate", "--demo"]) == 0
factory.assert_called_once_with(demo_mode=True) mock_cmd_gate.assert_called_once_with(_cli._emit, check=False, demo=True)
gate.run.assert_called_once()
def test_bare_gate_not_due(self, capsys: pytest.CaptureFixture[str]) -> None:
"""A bare gate with nothing due just reports and exits."""
with patch.object(_cli, "gate_is_due", return_value=False):
assert main(["gate"]) == 0
assert "no lock needed" in capsys.readouterr().out
def test_bare_gate_due_opens_window(self) -> None: class TestSync:
"""A bare gate that is due opens the real window.""" """Dispatch wiring for the sync subcommand.
gate = MagicMock()
with (
patch.object(_cli, "gate_is_due", return_value=True),
patch.object(_cli, "MealGate", return_value=gate),
patch.object(_cli, "acquire_gate_lock", return_value=MagicMock()),
patch.object(_cli, "release_gate_lock"),
patch.object(_cli, "wait_for_display", return_value=True),
):
assert main(["gate"]) == 0
gate.run.assert_called_once()
def test_gate_already_running(self, capsys: pytest.CaptureFixture[str]) -> None: cmd_sync()'s own branches (success/SyncError/GitHubSyncError) are tested
"""A held single-instance lock means a second window is not opened.""" directly in test_cli_sync.py, where it lives after the 500-line split.
with ( """
patch.object(_cli, "gate_is_due", return_value=True),
patch.object(_cli, "acquire_gate_lock", return_value=None),
patch.object(_cli, "MealGate") as factory,
):
assert main(["gate"]) == 0
factory.assert_not_called()
assert "already running" in capsys.readouterr().out
def test_gate_due_but_display_not_ready_defers( def test_dispatches_to_cmd_sync(self) -> None:
self, capsys: pytest.CaptureFixture[str] with patch.object(_cli, "cmd_sync", return_value=0) as mock_cmd_sync:
) -> None: assert main(["sync"]) == 0
"""A due gate whose display never comes up defers without a window.""" mock_cmd_sync.assert_called_once_with(_cli._emit)
with (
patch.object(_cli, "gate_is_due", return_value=True),
patch.object(_cli, "acquire_gate_lock", return_value=MagicMock()),
patch.object(_cli, "release_gate_lock"),
patch.object(_cli, "wait_for_display", return_value=False),
patch.object(_cli, "MealGate") as factory,
):
assert main(["gate"]) == 0
factory.assert_not_called()
assert "display not ready" in capsys.readouterr().out

View File

@ -0,0 +1,85 @@
"""Tests for the gate subcommand's handler, split out of test_cli.py
alongside its source module (see _cli_gate.py's module docstring).
"""
from __future__ import annotations
from unittest.mock import MagicMock, patch
from diet_guard import _cli_gate
from diet_guard._cli_gate import cmd_gate
class TestCmdGate:
"""The gate subcommand's three modes."""
def test_check_due(self) -> None:
"""--check exits 1 and announces a due lock."""
lines: list[str] = []
with patch.object(_cli_gate, "gate_is_due", return_value=True):
assert cmd_gate(lines.append, check=True, demo=False) == 1
assert "due" in lines[0]
def test_check_not_due(self) -> None:
"""--check exits 0 when no lock is needed."""
with patch.object(_cli_gate, "gate_is_due", return_value=False):
assert cmd_gate([].append, check=True, demo=False) == 0
def test_demo_opens_window(self) -> None:
"""--demo always builds and runs the gate window."""
gate = MagicMock()
with (
patch.object(_cli_gate, "MealGate", return_value=gate) as factory,
patch.object(_cli_gate, "acquire_gate_lock", return_value=MagicMock()),
patch.object(_cli_gate, "release_gate_lock"),
patch.object(_cli_gate, "wait_for_display", return_value=True),
):
assert cmd_gate([].append, check=False, demo=True) == 0
factory.assert_called_once_with(demo_mode=True)
gate.run.assert_called_once()
def test_bare_gate_not_due(self) -> None:
"""A bare gate with nothing due just reports and exits."""
lines: list[str] = []
with patch.object(_cli_gate, "gate_is_due", return_value=False):
assert cmd_gate(lines.append, check=False, demo=False) == 0
assert "no lock needed" in lines[0]
def test_bare_gate_due_opens_window(self) -> None:
"""A bare gate that is due opens the real window."""
gate = MagicMock()
with (
patch.object(_cli_gate, "gate_is_due", return_value=True),
patch.object(_cli_gate, "MealGate", return_value=gate),
patch.object(_cli_gate, "acquire_gate_lock", return_value=MagicMock()),
patch.object(_cli_gate, "release_gate_lock"),
patch.object(_cli_gate, "wait_for_display", return_value=True),
):
assert cmd_gate([].append, check=False, demo=False) == 0
gate.run.assert_called_once()
def test_gate_already_running(self) -> None:
"""A held single-instance lock means a second window is not opened."""
lines: list[str] = []
with (
patch.object(_cli_gate, "gate_is_due", return_value=True),
patch.object(_cli_gate, "acquire_gate_lock", return_value=None),
patch.object(_cli_gate, "MealGate") as factory,
):
assert cmd_gate(lines.append, check=False, demo=False) == 0
factory.assert_not_called()
assert "already running" in lines[0]
def test_gate_due_but_display_not_ready_defers(self) -> None:
"""A due gate whose display never comes up defers without a window."""
lines: list[str] = []
with (
patch.object(_cli_gate, "gate_is_due", return_value=True),
patch.object(_cli_gate, "acquire_gate_lock", return_value=MagicMock()),
patch.object(_cli_gate, "release_gate_lock"),
patch.object(_cli_gate, "wait_for_display", return_value=False),
patch.object(_cli_gate, "MealGate") as factory,
):
assert cmd_gate(lines.append, check=False, demo=False) == 0
factory.assert_not_called()
assert "display not ready" in lines[0]

View File

@ -0,0 +1,43 @@
"""Tests for the sync subcommand's handler, split out of test_cli.py
alongside its source module (see _cli_sync.py's module docstring).
"""
from __future__ import annotations
from unittest.mock import patch
from diet_guard import _cli_sync
from diet_guard._sync import SyncError
from diet_guard._sync_github import GitHubSyncError
class TestCmdSync:
def test_reports_synced_entry_count(self) -> None:
merged = {
"2026-06-22": [{"id": "a"}, {"id": "b"}],
"2026-06-21": [{"id": "c"}],
}
lines: list[str] = []
with patch.object(_cli_sync, "run_sync", return_value=merged):
assert _cli_sync.cmd_sync(lines.append) == 0
assert lines == ["synced: 3 entries across 2 day(s)."]
def test_reports_sync_error_as_not_configured(self) -> None:
lines: list[str] = []
with patch.object(
_cli_sync,
"run_sync",
side_effect=SyncError("no token"),
):
assert _cli_sync.cmd_sync(lines.append) == 1
assert lines == ["sync not configured: no token"]
def test_reports_github_sync_error_as_failed(self) -> None:
lines: list[str] = []
with patch.object(
_cli_sync,
"run_sync",
side_effect=GitHubSyncError("network down"),
):
assert _cli_sync.cmd_sync(lines.append) == 1
assert lines == ["sync failed: network down"]

View File

@ -188,3 +188,163 @@ class TestCorruptQuarantine:
_foodbank.FOOD_BANK_FILE.write_text("{ broken", encoding="utf-8") _foodbank.FOOD_BANK_FILE.write_text("{ broken", encoding="utf-8")
with patch.object(Path, "rename", side_effect=OSError("locked")): with patch.object(Path, "rename", side_effect=OSError("locked")):
assert _foodbank._read_bank() == {} assert _foodbank._read_bank() == {}
class TestRebuildFoodBank:
"""Replaying a full log into a fresh bank, mirroring the Dart port."""
def test_rebuilds_a_simple_food_entry(self) -> None:
log = {
"2026-06-22": [
{
"id": "a",
"time": "2026-06-22T08:00:00+02:00",
"desc": "toast",
"kcal": 150.0,
"protein_g": 5.0,
"carbs_g": 20.0,
"fat_g": 3.0,
"grams": 50.0,
"source": "manual",
},
],
}
bank = _foodbank.rebuild_food_bank(log)
assert lookup_food("toast") is not None
assert bank["toast"]["count"] == 1
def test_skips_tombstoned_entries(self) -> None:
log = {
"2026-06-22": [
{
"id": "a",
"time": "2026-06-22T08:00:00+02:00",
"desc": "toast",
"kcal": 150.0,
"protein_g": 5.0,
"carbs_g": 20.0,
"fat_g": 3.0,
"grams": 50.0,
"source": "manual",
"deleted": True,
},
],
}
bank = _foodbank.rebuild_food_bank(log)
assert bank == {}
def test_banks_each_component_and_the_composite(self) -> None:
log = {
"2026-06-22": [
{
"id": "a",
"time": "2026-06-22T20:00:00+02:00",
"desc": "dinner",
"kcal": 465.0,
"protein_g": 37.0,
"carbs_g": 66.0,
"fat_g": 5.5,
"grams": 300.0,
"source": "meal",
"components": [
{
"name": "rice",
"kcal": 300.0,
"protein_g": 6.0,
"carbs_g": 66.0,
"fat_g": 1.5,
"grams": 150.0,
},
{
"name": "chicken",
"kcal": 165.0,
"protein_g": 31.0,
"carbs_g": 0.0,
"fat_g": 4.0,
"grams": 150.0,
},
],
},
],
}
_foodbank.rebuild_food_bank(log)
assert lookup_food("rice") is not None
assert lookup_food("chicken") is not None
composite = lookup_food("dinner")
assert composite is not None
assert composite.kcal == 465.0
def test_replays_in_time_then_id_order_so_count_and_latest_macros_agree(
self,
) -> None:
log = {
"2026-06-22": [
{
"id": "b",
"time": "2026-06-22T12:00:00+02:00",
"desc": "toast",
"kcal": 999.0,
"protein_g": 0.0,
"carbs_g": 0.0,
"fat_g": 0.0,
"grams": 0.0,
"source": "manual",
},
{
"id": "a",
"time": "2026-06-22T08:00:00+02:00",
"desc": "toast",
"kcal": 150.0,
"protein_g": 5.0,
"carbs_g": 20.0,
"fat_g": 3.0,
"grams": 50.0,
"source": "manual",
},
],
}
bank = _foodbank.rebuild_food_bank(log)
# Replayed oldest-first (08:00 then 12:00) regardless of list order,
# so the 12:00 entry's macros are the ones that survive.
assert bank["toast"]["kcal"] == 999.0
assert bank["toast"]["count"] == 2
def test_persists_to_disk(self) -> None:
log = {
"2026-06-22": [
{
"id": "a",
"time": "2026-06-22T08:00:00+02:00",
"desc": "toast",
"kcal": 150.0,
"protein_g": 5.0,
"carbs_g": 20.0,
"fat_g": 3.0,
"grams": 50.0,
"source": "manual",
},
],
}
_foodbank.rebuild_food_bank(log)
# A fresh read (not the in-memory return value) must also see it.
assert lookup_food("toast") is not None
def test_ignores_a_non_dict_component(self) -> None:
log = {
"2026-06-22": [
{
"id": "a",
"time": "2026-06-22T08:00:00+02:00",
"desc": "dinner",
"kcal": 100.0,
"protein_g": 1.0,
"carbs_g": 1.0,
"fat_g": 1.0,
"grams": 100.0,
"source": "meal",
"components": ["not-a-dict"],
},
],
}
_foodbank.rebuild_food_bank(log)
assert lookup_food("dinner") is not None

View File

@ -22,11 +22,14 @@ from diet_guard._state import (
log_meal, log_meal,
logged_slots_today, logged_slots_today,
now_local, now_local,
read_raw_log,
remaining_budget, remaining_budget,
resign_entry,
today_entries, today_entries,
today_total_kcal, today_total_kcal,
today_total_macros, today_total_macros,
undo_last_today, undo_last_today,
write_raw_log,
) )
@ -335,3 +338,42 @@ class TestLoadLogSkipsTombstones:
log_meal("a", _nut(100), slot=8) log_meal("a", _nut(100), slot=8)
undo_last_today() undo_last_today()
assert load_log() == {} assert load_log() == {}
class TestRawLogAccess:
"""Public raw read/write, used by the sync orchestration."""
def test_read_raw_log_includes_tombstones(self) -> None:
"""Unlike load_log, read_raw_log keeps a tombstoned entry."""
log_meal("a", _nut(100), slot=8)
undo_last_today()
raw = read_raw_log()
day = next(iter(raw))
assert raw[day][0]["deleted"] is True
def test_write_raw_log_roundtrips(self) -> None:
"""write_raw_log persists exactly what read_raw_log later returns."""
log = {"2026-06-22": [{"id": "x", "time": "2026-06-22T08:00:00+02:00"}]}
write_raw_log(log)
assert read_raw_log() == log
class TestResignEntry:
"""resign_entry recomputes the hmac so a merged entry validates again."""
def test_strips_and_recomputes_signature(self) -> None:
"""A re-signed entry's hmac changes but verifies against the key."""
entry = log_meal("a", _nut(100), slot=8)
tampered = dict(entry, kcal=999.0)
resigned = resign_entry(tampered)
assert resigned["hmac"] != entry["hmac"]
write_raw_log({"2026-06-22": [resigned]})
with patch.object(_state, "_today", return_value="2026-06-22"):
assert today_entries() == [resigned]
def test_no_op_signature_wise_when_no_key_available(self) -> None:
"""Without an HMAC key, resign_entry produces no hmac field."""
entry = log_meal("a", _nut(100), slot=8)
with patch.object(_state, "compute_entry_hmac", return_value=None):
resigned = resign_entry(entry)
assert "hmac" not in resigned

View File

@ -0,0 +1,199 @@
"""Tests for the cross-device sync orchestration.
The GitHub layer is mocked (no network access); conftest.py's
``_isolate_state``/``_hmac_key`` fixtures provide the rest of the isolation
(sync token path, food log path, a deterministic HMAC key).
"""
from __future__ import annotations
import json
from unittest.mock import MagicMock, patch
import pytest
from diet_guard import _sync
from diet_guard._estimator import Nutrition
from diet_guard._foodbank import lookup_food
from diet_guard._state import load_log, log_meal
def _nutrition(kcal: float = 200.0) -> Nutrition:
return Nutrition(
kcal=kcal,
protein_g=10.0,
carbs_g=20.0,
fat_g=5.0,
grams=100.0,
source="manual",
)
def _write_token(token: str = "fake-token") -> None:
_sync.SYNC_TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)
_sync.SYNC_TOKEN_FILE.write_text(token)
def _mock_client(
*,
devices: tuple[str, ...] = (),
files: dict[str, str] | None = None,
) -> MagicMock:
"""Build a mock ``GitHubSyncClient`` covering the methods sync calls."""
client = MagicMock()
client.list_directory.return_value = list(devices)
resolved_files = files or {}
client.get_file_text.side_effect = lambda path: resolved_files.get(path)
return client
class TestReadToken:
def test_missing_token_file_raises_sync_error(self) -> None:
with pytest.raises(_sync.SyncError):
_sync._read_token()
def test_empty_token_file_raises_sync_error(self) -> None:
_write_token(" ")
with pytest.raises(_sync.SyncError):
_sync._read_token()
def test_present_token_is_read_and_stripped(self) -> None:
_write_token(" abc123 \n")
assert _sync._read_token() == "abc123"
class TestRunSync:
def test_raises_before_touching_github_when_no_token(self) -> None:
with (
patch.object(_sync, "GitHubSyncClient") as client_cls,
pytest.raises(_sync.SyncError),
):
_sync.run_sync()
client_cls.assert_not_called()
def test_pushes_local_log_when_no_other_devices_have_synced(self) -> None:
_write_token()
log_meal("oatmeal", _nutrition(), slot=8)
client = _mock_client(devices=())
with patch.object(_sync, "GitHubSyncClient", return_value=client):
merged = _sync.run_sync()
assert sum(len(entries) for entries in merged.values()) == 1
client.put_file_text.assert_called_once()
pushed_path = client.put_file_text.call_args.args[0]
assert pushed_path == "devices/pc/food_log.json"
def test_skips_its_own_device_id_when_listing(self) -> None:
_write_token()
client = _mock_client(
devices=("pc", "phone"),
files={"devices/phone/food_log.json": "{}"},
)
with patch.object(_sync, "GitHubSyncClient", return_value=client):
_sync.run_sync()
client.get_file_text.assert_called_once_with(
"devices/phone/food_log.json",
)
def test_skips_a_device_with_no_pushed_file_yet(self) -> None:
_write_token()
client = _mock_client(devices=("phone",), files={})
with patch.object(_sync, "GitHubSyncClient", return_value=client):
merged = _sync.run_sync()
assert merged == {}
def test_ignores_a_device_whose_pushed_file_is_not_a_json_object(self) -> None:
_write_token()
client = _mock_client(
devices=("phone",),
files={"devices/phone/food_log.json": "[]"},
)
with patch.object(_sync, "GitHubSyncClient", return_value=client):
merged = _sync.run_sync()
assert merged == {}
def test_skips_a_device_whose_pushed_file_is_corrupt_json(self) -> None:
"""An interrupted/truncated push must not crash every other device's
merge -- it is treated the same as a device that hasn't pushed yet.
"""
_write_token()
client = _mock_client(
devices=("phone",),
files={"devices/phone/food_log.json": "{not valid json"},
)
with patch.object(_sync, "GitHubSyncClient", return_value=client):
merged = _sync.run_sync()
assert merged == {}
def test_merges_in_a_remote_devices_entries(self) -> None:
_write_token()
remote_log_json = json.dumps(
{
"2026-06-22": [
{
"id": "phone-1",
"time": "2026-06-22T09:00:00+02:00",
"desc": "phone meal",
"kcal": 400.0,
"protein_g": 20.0,
"carbs_g": 40.0,
"fat_g": 10.0,
"grams": 300.0,
"source": "manual",
},
],
},
)
client = _mock_client(
devices=("phone",),
files={"devices/phone/food_log.json": remote_log_json},
)
with patch.object(_sync, "GitHubSyncClient", return_value=client):
merged = _sync.run_sync()
descs = {entry["desc"] for entries in merged.values() for entry in entries}
assert "phone meal" in descs
def test_resigns_every_entry_so_an_unsigned_remote_entry_survives_reload(
self,
) -> None:
"""The data-loss trap: an unsigned phone-origin entry must not be
silently dropped by load_log() after sync persists it locally --
_entry_is_valid() rejects any unsigned entry once a key exists.
"""
_write_token()
remote_log_json = json.dumps(
{
"2026-06-22": [
{
"id": "phone-1",
"time": "2026-06-22T09:00:00+02:00",
"desc": "phone meal",
"kcal": 400.0,
"protein_g": 20.0,
"carbs_g": 40.0,
"fat_g": 10.0,
"grams": 300.0,
"source": "manual",
# No "hmac" -- the phone never holds the shared key.
},
],
},
)
client = _mock_client(
devices=("phone",),
files={"devices/phone/food_log.json": remote_log_json},
)
with patch.object(_sync, "GitHubSyncClient", return_value=client):
_sync.run_sync()
reloaded = load_log()
descs = {entry["desc"] for entries in reloaded.values() for entry in entries}
assert "phone meal" in descs
def test_rebuilds_the_food_bank_after_merge(self) -> None:
_write_token()
log_meal("oatmeal", _nutrition(), slot=8)
client = _mock_client(devices=())
with patch.object(_sync, "GitHubSyncClient", return_value=client):
_sync.run_sync()
assert lookup_food("oatmeal") is not None

View File

@ -0,0 +1,197 @@
"""Tests for the GitHub Contents API sync client.
The HTTP layer is fully mocked (``requests.get``/``requests.put``), so every
branch -- success, path-404-but-repo-ok, repo-404, non-2xx, and network
exceptions -- is exercised without any network access, mirroring
``test_estimator.py``'s mocking style.
"""
from __future__ import annotations
import base64
from unittest.mock import MagicMock, patch
import pytest
import requests
from diet_guard import _sync_github
from diet_guard._sync_github import (
GitHubSyncClient,
GitHubSyncError,
RepoNotFoundError,
)
def _response(
status_code: int = 200,
json_data: object = None,
) -> MagicMock:
"""Build a fake ``requests.Response`` with a fixed status and JSON body."""
response = MagicMock()
response.status_code = status_code
response.ok = 200 <= status_code < 300
response.json = MagicMock(return_value=json_data if json_data is not None else {})
return response
def _client() -> GitHubSyncClient:
return GitHubSyncClient("kuhyx", "diet-guard-sync", "fake-token")
def _patch_get(*responses: MagicMock) -> object:
"""Patch ``requests.get`` to return each of ``responses`` in order."""
return patch.object(_sync_github.requests, "get", side_effect=list(responses))
def _patch_get_raises() -> object:
return patch.object(
_sync_github.requests,
"get",
side_effect=requests.ConnectionError("offline"),
)
class TestGetFileText:
def test_returns_decoded_content_on_success(self) -> None:
encoded = base64.b64encode(b"hello world").decode("ascii")
with _patch_get(_response(200, {"content": encoded})):
assert _client().get_file_text("devices/pc/food_log.json") == (
"hello world"
)
def test_returns_none_for_an_unused_path_on_a_real_repo(self) -> None:
with _patch_get(_response(404), _response(200)):
assert _client().get_file_text("devices/phone/food_log.json") is None
def test_raises_repo_not_found_when_the_repo_itself_is_missing(self) -> None:
with (
_patch_get(_response(404), _response(404)),
pytest.raises(
RepoNotFoundError,
),
):
_client().get_file_text("devices/pc/food_log.json")
def test_raises_sync_error_on_a_non_2xx_non_404(self) -> None:
with _patch_get(_response(500)), pytest.raises(GitHubSyncError):
_client().get_file_text("devices/pc/food_log.json")
def test_raises_sync_error_on_a_network_exception(self) -> None:
with _patch_get_raises(), pytest.raises(GitHubSyncError):
_client().get_file_text("devices/pc/food_log.json")
def test_treats_a_network_error_during_the_repo_check_as_repo_missing(
self,
) -> None:
with (
patch.object(
_sync_github.requests,
"get",
side_effect=[_response(404), requests.ConnectionError("offline")],
),
pytest.raises(RepoNotFoundError),
):
_client().get_file_text("devices/pc/food_log.json")
class TestListDirectory:
def test_returns_entry_names(self) -> None:
payload = [{"name": "pc"}, {"name": "phone"}, {"not_a_name": "x"}]
with _patch_get(_response(200, payload)):
assert _client().list_directory("devices") == ["pc", "phone"]
def test_returns_empty_list_when_response_is_not_a_list(self) -> None:
with _patch_get(_response(200, {"unexpected": "shape"})):
assert _client().list_directory("devices") == []
def test_returns_empty_list_for_an_unused_path_on_a_real_repo(self) -> None:
with _patch_get(_response(404), _response(200)):
assert _client().list_directory("devices") == []
def test_raises_repo_not_found_when_the_repo_itself_is_missing(self) -> None:
with (
_patch_get(_response(404), _response(404)),
pytest.raises(
RepoNotFoundError,
),
):
_client().list_directory("devices")
def test_raises_sync_error_on_a_non_2xx_non_404(self) -> None:
with _patch_get(_response(500)), pytest.raises(GitHubSyncError):
_client().list_directory("devices")
class TestPutFileText:
def test_creates_a_new_file_with_no_sha_when_none_existed(self) -> None:
with (
_patch_get(_response(404), _response(200)),
patch.object(
_sync_github.requests,
"put",
return_value=_response(201),
) as put_mock,
):
_client().put_file_text("devices/pc/food_log.json", "{}", message="m")
assert "sha" not in put_mock.call_args.kwargs["json"]
def test_updates_an_existing_file_by_including_its_sha(self) -> None:
with (
_patch_get(_response(200, {"sha": "abc123"})),
patch.object(
_sync_github.requests,
"put",
return_value=_response(200),
) as put_mock,
):
_client().put_file_text("devices/pc/food_log.json", "{}", message="m")
assert put_mock.call_args.kwargs["json"]["sha"] == "abc123"
def test_treats_a_non_string_sha_field_as_absent(self) -> None:
with (
_patch_get(_response(200, {"sha": 12345})),
patch.object(
_sync_github.requests,
"put",
return_value=_response(200),
) as put_mock,
):
_client().put_file_text("devices/pc/food_log.json", "{}", message="m")
assert "sha" not in put_mock.call_args.kwargs["json"]
def test_raises_repo_not_found_when_checking_sha_on_a_missing_repo(self) -> None:
with (
_patch_get(_response(404), _response(404)),
pytest.raises(
RepoNotFoundError,
),
):
_client().put_file_text("devices/pc/food_log.json", "{}", message="m")
def test_raises_sync_error_when_the_sha_check_itself_fails(self) -> None:
with _patch_get(_response(500)), pytest.raises(GitHubSyncError):
_client().put_file_text("devices/pc/food_log.json", "{}", message="m")
def test_raises_sync_error_on_a_put_network_exception(self) -> None:
with (
_patch_get(_response(404), _response(200)),
patch.object(
_sync_github.requests,
"put",
side_effect=requests.ConnectionError("offline"),
),
pytest.raises(GitHubSyncError),
):
_client().put_file_text("devices/pc/food_log.json", "{}", message="m")
def test_raises_sync_error_on_a_put_non_2xx_response(self) -> None:
with (
_patch_get(_response(404), _response(200)),
patch.object(
_sync_github.requests,
"put",
return_value=_response(422),
),
pytest.raises(GitHubSyncError),
):
_client().put_file_text("devices/pc/food_log.json", "{}", message="m")

View File

@ -0,0 +1,109 @@
"""Tests for the pure cross-device log-merge logic."""
from __future__ import annotations
from diet_guard._sync_merge import merge_logs
def _entry(**overrides: object) -> dict[str, object]:
"""Build a minimal valid entry, overriding only what a test cares about."""
entry: dict[str, object] = {
"id": "id-1",
"time": "2026-06-22T08:00:00+02:00",
"desc": "oatmeal",
"kcal": 300.0,
"protein_g": 10.0,
"carbs_g": 50.0,
"fat_g": 5.0,
"grams": 200.0,
"source": "manual",
}
entry.update(overrides)
return entry
class TestUnionById:
def test_disjoint_logs_union_into_one(self) -> None:
a = {"2026-06-22": [_entry(id="a", time="2026-06-22T08:00:00+02:00")]}
b = {"2026-06-22": [_entry(id="b", time="2026-06-22T12:00:00+02:00")]}
merged = merge_logs(a, b)
assert {e["id"] for e in merged["2026-06-22"]} == {"a", "b"}
def test_same_id_in_both_logs_is_not_duplicated(self) -> None:
shared = _entry(id="shared")
merged = merge_logs({"2026-06-22": [shared]}, {"2026-06-22": [shared]})
assert len(merged["2026-06-22"]) == 1
def test_legacy_entries_without_id_dedup_by_time_and_desc(self) -> None:
legacy_a = _entry(id=None, time="2026-06-20T08:00:00+02:00", desc="toast")
legacy_a.pop("id")
legacy_b = dict(legacy_a)
merged = merge_logs({"2026-06-20": [legacy_a]}, {"2026-06-20": [legacy_b]})
assert len(merged["2026-06-20"]) == 1
def test_legacy_and_id_entries_with_different_keys_both_survive(self) -> None:
legacy = _entry(time="2026-06-20T08:00:00+02:00", desc="toast")
legacy.pop("id")
with_id = _entry(id="x", time="2026-06-20T09:00:00+02:00", desc="eggs")
merged = merge_logs({"2026-06-20": [legacy]}, {"2026-06-20": [with_id]})
assert len(merged["2026-06-20"]) == 2
class TestTombstoneWins:
def test_tombstone_beats_a_non_deleted_copy_either_order(self) -> None:
normal = _entry(id="x", deleted=False)
tombstoned = _entry(id="x", deleted=True)
forward = merge_logs(
{"2026-06-22": [normal]},
{"2026-06-22": [tombstoned]},
)
backward = merge_logs(
{"2026-06-22": [tombstoned]},
{"2026-06-22": [normal]},
)
assert forward["2026-06-22"][0]["deleted"] is True
assert backward["2026-06-22"][0]["deleted"] is True
def test_two_tombstoned_copies_stay_tombstoned(self) -> None:
tombstoned = _entry(id="x", deleted=True)
merged = merge_logs(
{"2026-06-22": [tombstoned]},
{"2026-06-22": [dict(tombstoned)]},
)
assert merged["2026-06-22"][0]["deleted"] is True
class TestRebucketingAndOrdering:
def test_entry_is_filed_under_its_own_times_date_not_the_arrival_bucket(
self,
) -> None:
misfiled = _entry(id="x", time="2026-06-21T23:00:00+02:00")
merged = merge_logs({"2026-06-22": [misfiled]}, {})
assert merged == {"2026-06-21": [misfiled]}
def test_a_days_entries_are_sorted_oldest_first(self) -> None:
late = _entry(id="late", time="2026-06-22T20:00:00+02:00")
early = _entry(id="early", time="2026-06-22T08:00:00+02:00")
merged = merge_logs({"2026-06-22": [late]}, {"2026-06-22": [early]})
assert [e["id"] for e in merged["2026-06-22"]] == ["early", "late"]
class TestAlgebraicProperties:
def test_merge_is_commutative(self) -> None:
a = {"2026-06-22": [_entry(id="a")]}
b = {"2026-06-22": [_entry(id="b", time="2026-06-22T09:00:00+02:00")]}
assert merge_logs(a, b) == merge_logs(b, a)
def test_merge_is_idempotent(self) -> None:
canonical = {"2026-06-22": [_entry(id="a")]}
assert merge_logs(canonical, canonical) == canonical
def test_merging_with_an_empty_log_is_a_no_op(self) -> None:
log = {"2026-06-22": [_entry(id="a")]}
assert merge_logs(log, {}) == log
assert merge_logs({}, log) == log
def test_merging_two_empty_logs_is_empty(self) -> None:
assert merge_logs({}, {}) == {}

View File

@ -11,8 +11,11 @@
# a venv, so the package must live where that interpreter can find it — # a venv, so the package must live where that interpreter can find it —
# see CLAUDE.md's "Production dependency installation" section) # see CLAUDE.md's "Production dependency installation" section)
# 3. Installs + enables the systemd user timer that fires the gate every ~30m # 3. Installs + enables the systemd user timer that fires the gate every ~30m
# 4. Seals your daily budget from biometrics (only if not already sealed) # 4. Installs + enables the systemd user timer that syncs the log every ~15m
# 5. Locks the budget file immutable with `chattr +i` (the real tamper gate) # (the sync itself stays unconfigured -- and a no-op -- until you create
# a sync token; see the reminder this step prints)
# 5. Seals your daily budget from biometrics (only if not already sealed)
# 6. Locks the budget file immutable with `chattr +i` (the real tamper gate)
# ============================================================================ # ============================================================================
set -euo pipefail set -euo pipefail
@ -23,14 +26,17 @@ readonly SCRIPT_DIR
readonly REPO_DIR="$SCRIPT_DIR" readonly REPO_DIR="$SCRIPT_DIR"
readonly SERVICE_SRC="$SCRIPT_DIR/diet-guard-gate.service" readonly SERVICE_SRC="$SCRIPT_DIR/diet-guard-gate.service"
readonly TIMER_SRC="$SCRIPT_DIR/diet-guard-gate.timer" readonly TIMER_SRC="$SCRIPT_DIR/diet-guard-gate.timer"
readonly SYNC_SERVICE_SRC="$SCRIPT_DIR/diet-guard-sync.service"
readonly SYNC_TIMER_SRC="$SCRIPT_DIR/diet-guard-sync.timer"
readonly SYSTEMD_USER_DIR="$HOME/.config/systemd/user" readonly SYSTEMD_USER_DIR="$HOME/.config/systemd/user"
readonly DATA_DIR="$HOME/.local/share/diet_guard" readonly DATA_DIR="$HOME/.local/share/diet_guard"
readonly BUDGET_FILE="$DATA_DIR/.budget" readonly BUDGET_FILE="$DATA_DIR/.budget"
readonly SYNC_TOKEN_FILE="$HOME/.config/diet_guard/sync_token"
echo "=== Diet Guard Installer ===" echo "=== Diet Guard Installer ==="
# 1. System dependencies ------------------------------------------------------ # 1. System dependencies ------------------------------------------------------
echo "[1/5] Checking system dependencies..." echo "[1/6] Checking system dependencies..."
if ! command -v setxkbmap &>/dev/null; then if ! command -v setxkbmap &>/dev/null; then
echo " Installing xorg-setxkbmap (gate disables VT switching while locked)..." echo " Installing xorg-setxkbmap (gate disables VT switching while locked)..."
sudo pacman -S --noconfirm xorg-setxkbmap sudo pacman -S --noconfirm xorg-setxkbmap
@ -39,14 +45,14 @@ else
fi fi
# 2. Install this package + its dependencies into system Python ------------- # 2. Install this package + its dependencies into system Python -------------
echo "[2/5] Installing diet_guard + dependencies for /usr/bin/python..." echo "[2/6] Installing diet_guard + dependencies for /usr/bin/python..."
/usr/bin/python3 -m pip install --user --break-system-packages -e "$REPO_DIR" /usr/bin/python3 -m pip install --user --break-system-packages -e "$REPO_DIR"
echo " Installed. Verifying import..." echo " Installed. Verifying import..."
/usr/bin/python3 -c "import diet_guard; import gatelock" \ /usr/bin/python3 -c "import diet_guard; import gatelock" \
&& echo " diet_guard and gatelock import cleanly from the system interpreter." && echo " diet_guard and gatelock import cleanly from the system interpreter."
# 3. systemd user timer + service -------------------------------------------- # 3. systemd user timer + service (gate) -------------------------------------
echo "[3/5] Installing systemd user timer + service..." echo "[3/6] Installing the gate's systemd user timer + service..."
mkdir -p "$SYSTEMD_USER_DIR" mkdir -p "$SYSTEMD_USER_DIR"
cp "$SERVICE_SRC" "$SYSTEMD_USER_DIR/diet-guard-gate.service" cp "$SERVICE_SRC" "$SYSTEMD_USER_DIR/diet-guard-gate.service"
cp "$TIMER_SRC" "$SYSTEMD_USER_DIR/diet-guard-gate.timer" cp "$TIMER_SRC" "$SYSTEMD_USER_DIR/diet-guard-gate.timer"
@ -54,8 +60,24 @@ systemctl --user daemon-reload
systemctl --user enable --now diet-guard-gate.timer systemctl --user enable --now diet-guard-gate.timer
echo " Timer enabled and started (fires the gate every ~30 min)." echo " Timer enabled and started (fires the gate every ~30 min)."
# 4. Seal the daily budget (hidden) ------------------------------------------ # 4. systemd user timer + service (sync) -------------------------------------
echo "[4/5] Sealing your daily budget..." echo "[4/6] Installing the sync's systemd user timer + service..."
cp "$SYNC_SERVICE_SRC" "$SYSTEMD_USER_DIR/diet-guard-sync.service"
cp "$SYNC_TIMER_SRC" "$SYSTEMD_USER_DIR/diet-guard-sync.timer"
systemctl --user daemon-reload
systemctl --user enable --now diet-guard-sync.timer
echo " Timer enabled and started (syncs the log every ~15 min)."
if [[ -e "$SYNC_TOKEN_FILE" ]]; then
echo " Sync token already present at $SYNC_TOKEN_FILE."
else
echo " No sync token yet at $SYNC_TOKEN_FILE -- sync will no-op (and log a"
echo " failure) on every tick until you create a fine-grained GitHub PAT"
echo " scoped to the diet-guard-sync repo's contents and save it there,"
echo " mode 600: chmod 600 \"$SYNC_TOKEN_FILE\""
fi
# 5. Seal the daily budget (hidden) ------------------------------------------
echo "[5/6] Sealing your daily budget..."
if [[ -e "$BUDGET_FILE" ]]; then if [[ -e "$BUDGET_FILE" ]]; then
echo " Budget already sealed at $BUDGET_FILE - skipping init." echo " Budget already sealed at $BUDGET_FILE - skipping init."
else else
@ -63,8 +85,8 @@ else
python -m diet_guard init python -m diet_guard init
fi fi
# 5. Lock the budget immutable (the real tamper friction) -------------------- # 6. Lock the budget immutable (the real tamper friction) --------------------
echo "[5/5] Locking the budget file (chattr +i)..." echo "[6/6] Locking the budget file (chattr +i)..."
read -r attrs _ <<<"$(lsattr -d "$BUDGET_FILE" 2>/dev/null || true)" read -r attrs _ <<<"$(lsattr -d "$BUDGET_FILE" 2>/dev/null || true)"
if [[ "$attrs" == *i* ]]; then if [[ "$attrs" == *i* ]]; then
echo " Already immutable." echo " Already immutable."