"""Era rollups: per-month "what was happening" digests (consolidation step 3). Groups session gists by the calendar month the session occurred (from real exchange timestamps) and map-reduces each month into one digest. These are the temporal memory tier — they answer "what was going on last December" and feed the narrative engine. Runs on the consolidation backend (MI50 in steady state). """ from __future__ import annotations from lyra import config, llm, logbus, memory from lyra.llm import Backend, Message BATCH_CHARS = 18000 _PROMPT = """You are writing a monthly memory digest about Brian from the session \ summaries below (all from the same month). Capture: what he was focused on (poker \ and otherwise), notable events/results/decisions, recurring themes, and his mood \ and arc across the month. Third person, referring to him as "Brian". 5-10 \ sentences. This is a memory record, not a reply. No preamble.""" _MERGE_PROMPT = """Merge these partial monthly digests (same month) into one \ coherent digest about Brian for that month. Keep it tight, 5-10 sentences, no \ repetition. Third person.""" def _batch_texts(texts: list[str], budget: int) -> list[str]: blocks, buf, size = [], [], 0 for t in texts: if size + len(t) > budget and buf: blocks.append("\n\n".join(buf)) buf, size = [], 0 buf.append(t) size += len(t) if buf: blocks.append("\n\n".join(buf)) return blocks def _call(prompt: str, body: str, backend: Backend) -> str: messages: list[Message] = [ {"role": "system", "content": prompt}, {"role": "user", "content": body}, ] return llm.complete(messages, backend=backend) def _digest_month(gists: list[str], backend: Backend) -> str: """Map-reduce a month's session gists into one digest.""" blocks = _batch_texts(gists, BATCH_CHARS) partials = [_call(_PROMPT, b, backend) for b in blocks] while len(partials) > 1: partials = [_call(_MERGE_PROMPT, g, backend) for g in _batch_texts(partials, BATCH_CHARS)] return partials[0] def rebuild_eras(backend: Backend | None = None, force: bool = False) -> dict: """Build a digest per month, but only for months whose session count changed since the last build — old months don't change, so re-digesting them every consolidation pass was pure wasted LLM work (and MI50 heat). `force=True` rebuilds everything.""" backend = backend or config.load().summary_backend by_month = memory.summaries_by_month() have = {e.month: e.session_count for e in memory.list_eras()} built = skipped = 0 for month in sorted(by_month): n = len(by_month[month]) if not force and have.get(month) == n: skipped += 1 continue # unchanged month — keep its existing digest digest = _digest_month(by_month[month], backend) memory.store_era(month, digest, n) built += 1 logbus.log("info", "era built", month=month, sessions=n) report = {"built": built, "skipped": skipped, "months": built + skipped} logbus.log("info", "eras complete", **report) return report def main() -> int: report = rebuild_eras() if not report["months"]: print("No summaries yet — run lyra-summarize first.") return 1 for era in memory.list_eras(): print(f"\n## {era.month} ({era.session_count} sessions)\n{era.content}") return 0 if __name__ == "__main__": raise SystemExit(main())