feat: era-rollup + narrative engine (consolidation steps 3-4)
Complete the consolidation pipeline: summaries -> profile + eras -> narrative.
- memory: eras table (per-month digests) + Era, summaries_by_month, store_era,
list_eras, recall_eras; narrative table + set/get_narrative
- lyra/era.py (lyra-era): groups session gists by the month the session occurred
(real timestamps) and map-reduces each month into a "what was happening" digest
- lyra/narrative.py (lyra-narrative): distills profile + recent eras into the
current arc/trends/callbacks ("remember when…", "you're trending toward…")
- chat.build_messages injects the narrative alongside the profile
Verified on the real corpus: 17 monthly eras (Dec 2024-Jun 2026) + a narrative
that surfaces specific callbacks (the $573 Hollywood session, 4 years sober).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -47,6 +47,13 @@ def build_messages(session_id: str, user_msg: str) -> list[Message]:
|
||||
{"role": "system", "content": "What you know about Brian:\n" + profile}
|
||||
)
|
||||
|
||||
# Time-aware memory: the current narrative (recent arc, trends, callbacks).
|
||||
narrative = memory.get_narrative()
|
||||
if narrative:
|
||||
messages.append(
|
||||
{"role": "system", "content": "What's going on with Brian lately:\n" + narrative}
|
||||
)
|
||||
|
||||
recent = memory.recent(session_id, n=RECENT_N)
|
||||
recent_ids = {ex.id for ex in recent}
|
||||
|
||||
|
||||
+83
@@ -0,0 +1,83 @@
|
||||
"""Era rollups: per-month "what was happening" digests (consolidation step 3).
|
||||
|
||||
Groups session gists by the calendar month the session occurred (from real
|
||||
exchange timestamps) and map-reduces each month into one digest. These are the
|
||||
temporal memory tier — they answer "what was going on last December" and feed
|
||||
the narrative engine. Runs on the consolidation backend (MI50 in steady state).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
from lyra import config, llm, logbus, memory
|
||||
from lyra.llm import Backend, Message
|
||||
|
||||
# Character budget handed to _batch_texts when grouping a month's gists (map
# step) and when regrouping partial digests (merge step) in _digest_month.
BATCH_CHARS = 18000
|
||||
|
||||
_PROMPT = """You are writing a monthly memory digest about Brian from the session \
|
||||
summaries below (all from the same month). Capture: what he was focused on (poker \
|
||||
and otherwise), notable events/results/decisions, recurring themes, and his mood \
|
||||
and arc across the month. Third person, referring to him as "Brian". 5-10 \
|
||||
sentences. This is a memory record, not a reply. No preamble."""
|
||||
|
||||
_MERGE_PROMPT = """Merge these partial monthly digests (same month) into one \
|
||||
coherent digest about Brian for that month. Keep it tight, 5-10 sentences, no \
|
||||
repetition. Third person."""
|
||||
|
||||
|
||||
def _batch_texts(texts: list[str], budget: int) -> list[str]:
|
||||
blocks, buf, size = [], [], 0
|
||||
for t in texts:
|
||||
if size + len(t) > budget and buf:
|
||||
blocks.append("\n\n".join(buf))
|
||||
buf, size = [], 0
|
||||
buf.append(t)
|
||||
size += len(t)
|
||||
if buf:
|
||||
blocks.append("\n\n".join(buf))
|
||||
return blocks
|
||||
|
||||
|
||||
def _call(prompt: str, body: str, backend: Backend) -> str:
    """Run one completion with `prompt` as the system message and `body` as the user turn.

    Returns whatever `llm.complete` returns for that two-message conversation
    on the given `backend`.
    """
    system_msg: Message = {"role": "system", "content": prompt}
    user_msg: Message = {"role": "user", "content": body}
    return llm.complete([system_msg, user_msg], backend=backend)
|
||||
|
||||
|
||||
def _digest_month(gists: list[str], backend: Backend) -> str:
    """Map-reduce a month's session gists into one digest.

    Map: the gists are packed into batches of roughly BATCH_CHARS characters
    and each batch is digested with _PROMPT. Reduce: while more than one
    partial digest remains, the partials are re-batched and merged with
    _MERGE_PROMPT.

    Args:
        gists: Session summaries for a single month; must be non-empty.
        backend: LLM backend used for every completion.

    Returns:
        The single digest left after merging.

    Raises:
        ValueError: If `gists` is empty (there is nothing to digest).
    """
    if not gists:
        raise ValueError("cannot digest a month with no session gists")

    partials = [_call(_PROMPT, block, backend) for block in _batch_texts(gists, BATCH_CHARS)]
    while len(partials) > 1:
        groups = _batch_texts(partials, BATCH_CHARS)
        if len(groups) >= len(partials):
            # No two neighbouring partials fit in one batch, so re-batching
            # would hand back the same number of pieces and this loop would
            # never finish. Force progress by merging pairwise; this halves
            # the count each round while keeping each request as small as
            # possible.
            groups = [
                "\n\n".join(partials[i:i + 2]) for i in range(0, len(partials), 2)
            ]
        partials = [_call(_MERGE_PROMPT, group, backend) for group in groups]
    return partials[0]
|
||||
|
||||
|
||||
def rebuild_eras(backend: Backend | None = None) -> dict:
    """Build (or rebuild) one stored digest per month that has session gists.

    Months are processed in sorted key order. For each month the gists are
    digested, the result is stored via `memory.store_era`, and an "era built"
    log line is emitted. When `backend` is not given, the configured
    `summary_backend` is used.

    Returns:
        A report dict of the form {"months": <number of months processed>}.
    """
    if not backend:
        backend = config.load().summary_backend

    gists_by_month = memory.summaries_by_month()
    built = 0
    for month, gists in sorted(gists_by_month.items()):
        digest = _digest_month(gists, backend)
        session_count = len(gists)
        memory.store_era(month, digest, session_count)
        built += 1
        logbus.log("info", "era built", month=month, sessions=session_count)

    report = {"months": built}
    logbus.log("info", "eras complete", **report)
    return report
|
||||
|
||||
|
||||
def main() -> int:
    """Command-line entry point: rebuild all eras, then print them.

    Returns 1 (after printing a hint) when no month was processed, otherwise
    prints every stored era and returns 0.
    """
    if rebuild_eras()["months"]:
        for era in memory.list_eras():
            print(f"\n## {era.month} ({era.session_count} sessions)\n{era.content}")
        return 0

    print("No summaries yet — run lyra-summarize first.")
    return 1


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())
|
||||
+119
@@ -52,6 +52,24 @@ CREATE TABLE IF NOT EXISTS profile (
|
||||
sessions_covered INTEGER NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
-- Temporal memory: one "what was happening" digest per calendar month, rolled
|
||||
-- up from that month's session gists. month is "YYYY-MM".
|
||||
CREATE TABLE IF NOT EXISTS eras (
|
||||
month TEXT PRIMARY KEY,
|
||||
content TEXT NOT NULL,
|
||||
embedding BLOB NOT NULL,
|
||||
session_count INTEGER NOT NULL,
|
||||
created_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
-- The current narrative: time-aware arc/trends/callbacks (vs the timeless
|
||||
-- profile). Distilled from profile + recent eras. Single row (id='current').
|
||||
CREATE TABLE IF NOT EXISTS narrative (
|
||||
id TEXT PRIMARY KEY,
|
||||
content TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
"""
|
||||
|
||||
_conn: sqlite3.Connection | None = None
|
||||
@@ -95,6 +113,15 @@ class Summary:
|
||||
score: float | None = None
|
||||
|
||||
|
||||
@dataclass
class Era:
    """One per-month digest row from the `eras` table.

    `score` is only filled in by similarity search (`recall_eras`); plain
    listings leave it as None.
    """

    # Calendar month the digest covers, formatted "YYYY-MM".
    month: str
    # The digest text itself.
    content: str
    # Number of session gists that were rolled up into this digest.
    session_count: int
    # Timestamp string recorded when the digest was stored.
    created_at: str
    # Similarity to the query when returned from a recall; otherwise None.
    score: float | None = None
|
||||
|
||||
|
||||
def _to_blob(vec: list[float]) -> bytes:
|
||||
return np.asarray(vec, dtype=np.float32).tobytes()
|
||||
|
||||
@@ -337,6 +364,98 @@ def get_profile(profile_id: str = "self") -> str | None:
|
||||
return r["content"] if r else None
|
||||
|
||||
|
||||
# --- Era tier (per-month temporal rollups) ---
|
||||
|
||||
|
||||
def summaries_by_month() -> dict[str, list[str]]:
    """Map "YYYY-MM" -> list of session gists for sessions that occurred that month.

    A session's month is the first seven characters of its earliest exchange
    timestamp (real ChatGPT dates for imported sessions), not when it was
    summarized.

    Within each month the gists are listed oldest session first, with
    session_id as a tie-breaker. The explicit ORDER BY makes the result
    deterministic between runs, so downstream batching sees the same input
    each time, and lets the digest follow the month in chronological order.

    Returns:
        A dict keyed by month string; each value is that month's gists in
        chronological order. Empty when there are no summarized sessions.
    """
    conn = _connection()
    rows = conn.execute(
        """
        SELECT substr(MIN(e.created_at), 1, 7) AS month, s.content AS content
        FROM summaries s JOIN exchanges e ON e.session_id = s.session_id
        GROUP BY s.session_id
        ORDER BY MIN(e.created_at), s.session_id
        """
    ).fetchall()
    out: dict[str, list[str]] = {}
    for r in rows:
        out.setdefault(r["month"], []).append(r["content"])
    return out
|
||||
|
||||
|
||||
def store_era(month: str, content: str, session_count: int) -> None:
    """Embed `content` and upsert it as the digest for `month`.

    An existing row for the same month has its content, embedding, session
    count and created_at overwritten. The timestamp is the current UTC time in
    ISO format.
    """
    upsert_sql = (
        "INSERT INTO eras (month, content, embedding, session_count, created_at) "
        "VALUES (?, ?, ?, ?, ?) "
        "ON CONFLICT(month) DO UPDATE SET content=excluded.content, "
        "embedding=excluded.embedding, session_count=excluded.session_count, "
        "created_at=excluded.created_at"
    )
    # Exactly one text goes in, so exactly one vector is expected back.
    [vector] = llm.embed([content])
    stamped = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        db.execute(
            upsert_sql,
            (month, content, _to_blob(vector), session_count, stamped),
        )
|
||||
|
||||
|
||||
def list_eras() -> list[Era]:
    """Return every stored month digest, ordered by month ascending.

    The embedding column is not loaded, and `score` is left at its default.
    """
    cursor = _connection().execute(
        "SELECT month, content, session_count, created_at FROM eras ORDER BY month ASC"
    )
    eras: list[Era] = []
    for row in cursor.fetchall():
        eras.append(
            Era(
                month=row["month"],
                content=row["content"],
                session_count=row["session_count"],
                created_at=row["created_at"],
            )
        )
    return eras
|
||||
|
||||
|
||||
def set_narrative(content: str, narrative_id: str = "current") -> None:
    """Insert or overwrite the narrative row `narrative_id` with `content`.

    `updated_at` is set to the current UTC time in ISO format on every call.
    """
    upsert_sql = (
        "INSERT INTO narrative (id, content, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(id) DO UPDATE SET content=excluded.content, updated_at=excluded.updated_at"
    )
    updated_at = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        db.execute(upsert_sql, (narrative_id, content, updated_at))
|
||||
|
||||
|
||||
def get_narrative(narrative_id: str = "current") -> str | None:
    """Return the stored narrative text for `narrative_id`, or None if there is no such row."""
    row = _connection().execute(
        "SELECT content FROM narrative WHERE id = ?", (narrative_id,)
    ).fetchone()
    if not row:
        return None
    return row["content"]
|
||||
|
||||
|
||||
def recall_eras(query: str, k: int = 2) -> list[Era]:
    """Top-k month digests most similar to `query` (time-based context).

    Similarity is cosine similarity between the query embedding and each
    stored era embedding; results are returned best match first with `score`
    filled in.

    Args:
        query: Free text to match against the stored digests.
        k: Maximum number of eras to return. Values <= 0 yield an empty list.

    Returns:
        Up to `k` Era objects, highest score first; empty when no eras are
        stored or `k` is not positive.
    """
    if k <= 0:
        # A negative k would otherwise slice "all but the last |k|" results.
        return []

    conn = _connection()
    rows = conn.execute(
        "SELECT month, content, embedding, session_count, created_at FROM eras"
    ).fetchall()
    if not rows:
        # Nothing to rank, so do not spend an embedding call on the query.
        return []

    [q_vec] = llm.embed([query])
    q = np.asarray(q_vec, dtype=np.float32)
    matrix = np.stack([_from_blob(r["embedding"]) for r in rows])
    norms = np.linalg.norm(matrix, axis=1)
    # The small epsilon keeps the division finite for zero-length vectors.
    scores = (matrix @ q) / (norms * np.linalg.norm(q) + 1e-9)
    top_idx = np.argsort(scores)[::-1][:k]
    return [
        Era(
            month=rows[i]["month"],
            content=rows[i]["content"],
            session_count=rows[i]["session_count"],
            created_at=rows[i]["created_at"],
            score=float(scores[i]),
        )
        for i in top_idx
    ]
|
||||
|
||||
|
||||
def recall_summaries(query: str, k: int = 3, exclude_session: str | None = None) -> list[Summary]:
|
||||
"""Top-k session summaries most similar to `query` (the long-term gist tier)."""
|
||||
[q_vec] = llm.embed([query])
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
"""Narrative engine (consolidation step 4): the current arc, trends, callbacks.
|
||||
|
||||
Where the profile is timeless ("who Brian is"), the narrative is time-aware
|
||||
("what's going on lately, where things are trending"). It distills the profile
|
||||
plus the most recent monthly era digests into the current story — recent focus,
|
||||
notable trends or changes, mood/arc, and a few specific callbacks worth
|
||||
referencing. Injected into chat so Lyra follows along like a friend who's been
|
||||
paying attention. Runs on the consolidation backend (MI50 in steady state).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
from lyra import config, llm, logbus, memory
|
||||
from lyra.llm import Backend, Message
|
||||
|
||||
# How many of the latest month digests (the tail of memory.list_eras())
# rebuild_narrative includes in the prompt body.
RECENT_ERAS = 4
|
||||
|
||||
_PROMPT = """You are distilling the CURRENT narrative about Brian — what a close \
|
||||
friend who has been following along would keep in mind right now. From his profile \
|
||||
and recent monthly digests below, write: what he's been focused on lately, any \
|
||||
notable trends or changes (improving, slipping, new patterns), his current arc and \
|
||||
mood, and 2-4 specific things worth referencing back to him ("remember when…"). \
|
||||
Third person, referring to him as "Brian". 6-10 sentences. This is a memory note, \
|
||||
not a reply. No preamble."""
|
||||
|
||||
|
||||
def rebuild_narrative(backend: Backend | None = None) -> str | None:
    """Regenerate and store the current narrative from the profile and latest eras.

    The prompt body is the profile (when one exists) followed by the last
    RECENT_ERAS month digests (when any exist). The model's answer is saved
    with `memory.set_narrative` and returned. When `backend` is not given, the
    configured `summary_backend` is used.

    Returns:
        The new narrative text, or None when there is neither a profile nor
        any era to work from (nothing is stored in that case).
    """
    if not backend:
        backend = config.load().summary_backend

    profile = memory.get_profile()
    eras = memory.list_eras()
    if not (profile or eras):
        return None

    recent = eras[-RECENT_ERAS:]
    sections: list[str] = []
    if profile:
        sections.append("PROFILE (timeless):\n" + profile)
    if recent:
        month_blocks = [f"[{e.month}]\n{e.content}" for e in recent]
        sections.append("RECENT MONTHS (oldest first):\n" + "\n\n".join(month_blocks))

    system_msg: Message = {"role": "system", "content": _PROMPT}
    user_msg: Message = {"role": "user", "content": "\n\n".join(sections)}
    narrative = llm.complete([system_msg, user_msg], backend=backend)

    memory.set_narrative(narrative)
    logbus.log("info", "narrative rebuilt", chars=len(narrative), eras=len(recent))
    return narrative
|
||||
|
||||
|
||||
def main() -> int:
    """Command-line entry point: rebuild the narrative and print it.

    Returns 1 (after printing a hint) when there was nothing to build from,
    otherwise prints the narrative and returns 0.
    """
    narrative = rebuild_narrative()
    if narrative is not None:
        print(narrative)
        return 0

    print("Need a profile and/or eras first — run lyra-profile and lyra-era.")
    return 1


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())
|
||||
@@ -19,6 +19,8 @@ lyra-web = "lyra.web.server:serve"
|
||||
lyra-import = "lyra.ingest:main"
|
||||
lyra-summarize = "lyra.summary:main"
|
||||
lyra-profile = "lyra.profile:main"
|
||||
lyra-era = "lyra.era:main"
|
||||
lyra-narrative = "lyra.narrative:main"
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
|
||||
Reference in New Issue
Block a user