"""The user's personal food bank: a local corpus of previously logged foods. Every food the user logs is remembered here with its full macros, keyed by a normalized name. The gate's autocomplete searches *only* this corpus -- never Open Food Facts. OFF (in :mod:`diet_guard._estimator`) is used only to *fill in* the macros of a brand-new food the first time it is entered; from then on the food is served from the bank, so search quality improves with use and works fully offline. Search is intentionally typo-tolerant. Rather than a prefix/exact match, it combines substring containment with :func:`difflib.SequenceMatcher` similarity (stdlib -- no extra dependency), so "chiken breast" still finds "chicken breast". Results are ranked by match quality, then by how often the food has been logged, so your staples float to the top. """ from __future__ import annotations import json import logging import time from typing import TYPE_CHECKING from diet_guard._coerce import as_float from diet_guard._constants import FOOD_BANK_FILE from diet_guard._estimator import Nutrition from diet_guard._fuzzy import match_score from diet_guard._meal import MealItem, meal_total if TYPE_CHECKING: from collections.abc import Sequence _logger = logging.getLogger(__name__) # Below this similarity ratio a non-substring candidate is not a plausible typo # of the query and is dropped. SequenceMatcher's own "close match" default is # 0.6; we reuse it so behavior matches difflib intuitions. _FUZZY_THRESHOLD = 0.6 # Default number of autocomplete suggestions to surface. DEFAULT_SUGGESTIONS = 8 # On-disk shape: {normalized_name: {"desc", "kcal", "protein_g", "carbs_g", # "fat_g", "grams", "count"}}. ``count`` ranks frequently eaten staples first. 
BankRecord = dict[str, object]


def _normalize(description: str) -> str:
    """Return the lookup key for a description (trimmed, case-folded)."""
    return description.strip().casefold()


def _read_bank() -> dict[str, BankRecord]:
    """Read the food bank from disk (empty dict on any error).

    A corrupt or unreadable file is moved aside (see
    :func:`_quarantine_corrupt_bank`) rather than re-warned about on every
    call: the gate reads the bank on each keystroke, so a single bad file
    would otherwise flood the journal and then be silently overwritten by the
    next write.

    The file is always decoded as UTF-8 so the result does not depend on the
    platform's locale encoding. Bytes that are not valid UTF-8 are treated
    like any other corruption instead of escaping as ``UnicodeDecodeError``.

    Returns:
        The stored records keyed by normalized name. Top-level entries whose
        key is not a string or whose value is not a dict are dropped.
    """
    if not FOOD_BANK_FILE.exists():
        return {}
    try:
        with FOOD_BANK_FILE.open(encoding="utf-8") as handle:
            data = json.load(handle)
    except FileNotFoundError:
        # The file vanished between the existence check and the open: there
        # is nothing to quarantine, it is simply an empty bank.
        return {}
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        _quarantine_corrupt_bank()
        return {}
    if not isinstance(data, dict):
        return {}
    return {
        key: value
        for key, value in data.items()
        if isinstance(key, str) and isinstance(value, dict)
    }


def _quarantine_corrupt_bank() -> None:
    """Move an unreadable bank aside to a timestamped backup, warning once.

    Renaming the bad file means the next read finds nothing and returns an
    empty bank quietly (no per-keystroke warning flood), the next write starts
    a fresh bank, and the original is preserved for manual recovery instead of
    being silently overwritten and lost.

    If the rename itself fails, a warning is logged and the file is left in
    place.
    """
    backup = FOOD_BANK_FILE.with_name(
        f"{FOOD_BANK_FILE.name}.corrupt-{int(time.time())}",
    )
    try:
        FOOD_BANK_FILE.rename(backup)
    except OSError:
        _logger.warning(
            "Food bank %s is unreadable and cannot be moved", FOOD_BANK_FILE
        )
        return
    _logger.warning(
        "Food bank %s was unreadable; moved aside to %s and starting fresh",
        FOOD_BANK_FILE,
        backup,
    )


def _write_bank(bank: dict[str, BankRecord]) -> None:
    """Persist the food bank to disk, creating the data directory if needed.

    The JSON is written to a sibling temporary file and then swapped into
    place with ``Path.replace``. A write that is interrupted part-way
    therefore never leaves a truncated bank behind -- which the next
    :func:`_read_bank` would otherwise quarantine as corrupt.

    Args:
        bank: The full in-memory bank to store.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If a record holds a value JSON cannot encode.
    """
    FOOD_BANK_FILE.parent.mkdir(parents=True, exist_ok=True)
    staging = FOOD_BANK_FILE.with_name(f"{FOOD_BANK_FILE.name}.tmp")
    try:
        with staging.open("w", encoding="utf-8") as handle:
            json.dump(bank, handle, indent=2, sort_keys=True)
        staging.replace(FOOD_BANK_FILE)
    except Exception:
        # Best-effort cleanup of the half-written staging file; the original
        # failure is what the caller needs to see, so it is re-raised below.
        try:
            staging.unlink(missing_ok=True)
        except OSError:
            pass
        raise


def _record_to_nutrition(record: BankRecord) -> Nutrition:
    """Build a :class:`Nutrition` from a stored bank record.

    Each macro field is passed through ``as_float`` so a hand-edited or
    partial record is coerced rather than trusted while the user is mid-log.

    Args:
        record: A stored food-bank record.

    Returns:
        The reconstructed Nutrition (source marked as the food bank).
    """
    # Same field mapping as a raw log entry; only the source label differs.
    return _entry_nutrition(record, source="food bank")


def remember_food(description: str, nutrition: Nutrition) -> None:
    """Record (or refresh) a food in the bank, bumping its use count.

    The latest macros win, so correcting a food's calories once fixes every
    future suggestion. A blank description is ignored.

    Args:
        description: The user's free-text food name.
        nutrition: The macros to store for it.
    """
    _upsert(description, nutrition, components=None)


def remember_meal(name: str, items: Sequence[MealItem]) -> Nutrition:
    """Bank each component and the composite meal, returning the summed macros.

    Each item is remembered on its own (so it autocompletes next time) and the
    meal is stored as one entry carrying its summed macros plus its component
    names, so the whole meal can be re-picked later as a single summed food.
    A blank meal name still banks the items but stores no empty-keyed
    composite.

    All updates are applied to one in-memory copy of the bank and written
    back once, instead of one read/write cycle per component.

    Args:
        name: The composite meal's name (e.g. ``"dinner"``).
        items: The meal's components, each with its own nutrition.

    Returns:
        The summed nutrition for the whole meal.
    """
    bank = _read_bank()
    # Components first, then the composite, so a composite that shares a key
    # with one of its components ends up as the stored record.
    for item in items:
        _apply_upsert(bank, item.name, item.nutrition, components=None)
    total = meal_total(items)
    _apply_upsert(
        bank,
        name,
        total,
        components=[item.name for item in items],
    )
    _write_bank(bank)
    return total


def _apply_upsert(
    bank: dict[str, BankRecord],
    description: str,
    nutrition: Nutrition,
    *,
    components: list[str] | None,
) -> None:
    """Insert or refresh one record in ``bank`` in place, bumping its count.

    Pure (no I/O), so it is shared by the disk-backed :func:`_upsert` and by
    :func:`rebuild_food_bank`, which replays a whole log into a fresh in-memory
    bank without a read/write round trip per entry. A blank description is
    ignored, so an unnamed entry is never stored.

    Args:
        bank: The in-memory bank to update.
        description: The food or meal name (its normalized form is the key).
        nutrition: The macros to store.
        components: Component names for a composite meal, or None for a food.
    """
    key = _normalize(description)
    if not key:
        return
    previous = bank.get(key, {})
    # The record is replaced wholesale (latest macros win); only the use
    # count is carried over from the previous record.
    count = as_float(previous.get("count")) + 1
    record: BankRecord = {
        "desc": description.strip(),
        "kcal": nutrition.kcal,
        "protein_g": nutrition.protein_g,
        "carbs_g": nutrition.carbs_g,
        "fat_g": nutrition.fat_g,
        "grams": nutrition.grams,
        "count": count,
    }
    if components is not None:
        # Copy so later mutation of the caller's list cannot alter the bank.
        record["components"] = list(components)
    bank[key] = record


def _upsert(
    description: str,
    nutrition: Nutrition,
    *,
    components: list[str] | None,
) -> None:
    """Insert or refresh one bank record on disk, bumping its use count.

    Reads the bank, applies :func:`_apply_upsert`, and writes the bank back.
    The write happens even when a blank description left the bank unchanged.

    Args:
        description: The food or meal name (its normalized form is the key).
        nutrition: The macros to store.
        components: Component names for a composite meal, or None for a food.
    """
    bank = _read_bank()
    _apply_upsert(bank, description, nutrition, components=components)
    _write_bank(bank)


def _entry_nutrition(entry: dict[str, object], *, source: str) -> Nutrition:
    """Build a :class:`Nutrition` from a raw log entry's macro fields.

    Args:
        entry: A mapping with (possibly missing) ``kcal``, ``protein_g``,
            ``carbs_g``, ``fat_g`` and ``grams`` values.
        source: The provenance label to stamp on the result.

    Returns:
        A Nutrition whose numeric fields were each coerced with ``as_float``.
    """
    return Nutrition(
        kcal=as_float(entry.get("kcal")),
        protein_g=as_float(entry.get("protein_g")),
        carbs_g=as_float(entry.get("carbs_g")),
        fat_g=as_float(entry.get("fat_g")),
        grams=as_float(entry.get("grams")),
        source=source,
    )


def rebuild_food_bank(log: dict[str, list[dict[str, object]]]) -> dict[str, BankRecord]:
    """Rebuild the bank from scratch by replaying ``log``'s entries, then persist it.

    Replays in a fixed, device-independent order (by ``time`` then ``id``), so
    two devices that converge on the same merged log also converge on the same
    bank -- this is what lets the food bank stay *derived*, never synced, with
    no counter-merge (CRDT) logic needed for ``count``.

    Mirrors the Dart port's ``FoodBankService.rebuild`` exactly, including the
    composite-meal branch (banks each component, then the composite itself).
    Deleted (tombstoned) entries are skipped entirely, same as
    :func:`diet_guard._state.load_log`.

    Args:
        log: A full log keyed by date, e.g. from
            :func:`diet_guard._state.read_raw_log` after a sync merge.

    Returns:
        The freshly rebuilt bank (also written to disk).
    """
    entries = sorted(
        (
            entry
            for day_entries in log.values()
            for entry in day_entries
            if not entry.get("deleted")
        ),
        # Both key parts are stringified so mixed or missing values still
        # compare without raising.
        key=lambda entry: (str(entry.get("time", "")), str(entry.get("id", ""))),
    )
    bank: dict[str, BankRecord] = {}
    for entry in entries:
        components = entry.get("components")
        # Stays None for a plain food so no "components" key is stored.
        component_names: list[str] | None = None
        if isinstance(components, list):
            component_names = []
            for component in components:
                if not isinstance(component, dict):
                    continue
                name = str(component.get("name", ""))
                component_names.append(name)
                _apply_upsert(
                    bank,
                    name,
                    _entry_nutrition(component, source="food bank"),
                    components=None,
                )
        _apply_upsert(
            bank,
            str(entry.get("desc", "")),
            _entry_nutrition(entry, source=str(entry.get("source", "manual"))),
            components=component_names,
        )
    _write_bank(bank)
    return bank


def lookup_food(description: str) -> Nutrition | None:
    """Return the exact-match macros for ``description``, or None.

    Args:
        description: The food name to look up verbatim (case-insensitive).

    Returns:
        The stored Nutrition, or None if the food is not banked.
    """
    record = _read_bank().get(_normalize(description))
    return _record_to_nutrition(record) if record is not None else None


def _display_name(record: BankRecord, key: str) -> str:
    """Return a record's display name, falling back to its key.

    The key is used when ``desc`` is missing, not a string, or blank.
    """
    desc = record.get("desc")
    return desc if isinstance(desc, str) and desc.strip() else key


def search_foods(
    query: str,
    limit: int = DEFAULT_SUGGESTIONS,
) -> list[tuple[str, Nutrition]]:
    """Return banked foods matching ``query``, best match first.

    An empty query returns the most-logged foods (the expandable full list).
    A non-empty query keeps substring and close-typo matches, ranked by match
    quality then by use count.

    Args:
        query: Free-text the user has typed so far.
        limit: Maximum number of suggestions to return.

    Returns:
        ``(display_name, Nutrition)`` pairs, ranked, at most ``limit`` long.
    """
    bank = _read_bank()
    normalized = _normalize(query)
    if not normalized:
        return _ranked_all(bank, limit)
    scored: list[tuple[float, float, str, Nutrition]] = []
    for key, record in bank.items():
        score = match_score(normalized, key)
        if score < _FUZZY_THRESHOLD:
            continue
        count = as_float(record.get("count"))
        scored.append(
            (score, count, _display_name(record, key), _record_to_nutrition(record)),
        )
    # Sort by score then frequency, both descending.
    scored.sort(key=lambda item: (item[0], item[1]), reverse=True)
    return [(name, nutrition) for _, _, name, nutrition in scored[:limit]]


def _ranked_all(
    bank: dict[str, BankRecord],
    limit: int,
) -> list[tuple[str, Nutrition]]:
    """Return all banked foods ranked by use count, most-logged first.

    Args:
        bank: The in-memory bank to rank.
        limit: Maximum number of foods to return.

    Returns:
        ``(display_name, Nutrition)`` pairs, at most ``limit`` long.
    """
    ranked = sorted(
        bank.items(),
        key=lambda item: as_float(item[1].get("count")),
        reverse=True,
    )
    return [
        (_display_name(record, key), _record_to_nutrition(record))
        for key, record in ranked[:limit]
    ]