feat: summarize-all batch (consolidation step 1)
Harden summarize_session to chunk + merge long sessions (imported convos can exceed the local model's context), and add summarize_all: idempotent, resumable batch that summarizes every session needing it (skips up-to-date ones), with progress logged to the live log. `lyra-summarize [limit]` CLI. This is the first consolidation stage feeding the profile (semantic memory) and era-rollup tiers. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+75
-19
@@ -1,17 +1,23 @@
|
|||||||
"""Session summarization: compact a session's raw exchanges into a stored gist.
|
"""Session summarization: compact a session's raw exchanges into a stored gist.
|
||||||
|
|
||||||
This is the compaction half of the tiered memory. Raw exchanges stay for detail
|
This is the first consolidation stage. Raw exchanges stay for detail recall; the
|
||||||
recall; the summary is what surfaces when an *older* session is recalled later —
|
summary is what surfaces when an *older* session is recalled, and it's the input
|
||||||
"a month ago is a general idea," per the design.
|
to the profile (semantic memory) and era-rollup tiers.
|
||||||
|
|
||||||
|
Long sessions are summarized in chunks, then the partial gists are merged, so a
|
||||||
|
big imported conversation doesn't blow the local model's context window.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from lyra import config, llm, logbus, memory
|
import sys
|
||||||
from lyra.llm import Backend
|
|
||||||
|
|
||||||
# Re-summarize a session once it has accumulated this many new raw exchanges
|
from lyra import config, llm, logbus, memory
|
||||||
# beyond what its current summary covers.
|
from lyra.llm import Backend, Message
|
||||||
|
|
||||||
|
# Re-summarize a session once it has accumulated this many new raw exchanges.
|
||||||
SUMMARIZE_AFTER = 20
|
SUMMARIZE_AFTER = 20
|
||||||
|
# Transcript budget per LLM call; longer sessions are chunked + merged.
|
||||||
|
MAX_TRANSCRIPT_CHARS = 24000
|
||||||
|
|
||||||
_PROMPT = """You are compacting a conversation into a long-term memory record \
|
_PROMPT = """You are compacting a conversation into a long-term memory record \
|
||||||
(not replying to anyone). Write a concise gist of the session below: what was \
|
(not replying to anyone). Write a concise gist of the session below: what was \
|
||||||
@@ -24,25 +30,43 @@ def _transcript(exchanges: list[memory.Exchange]) -> str:
|
|||||||
return "\n".join(f"{ex.role}: {ex.content}" for ex in exchanges)
|
return "\n".join(f"{ex.role}: {ex.content}" for ex in exchanges)
|
||||||
|
|
||||||
|
|
||||||
def summarize_session(session_id: str, backend: Backend | None = None) -> str | None:
|
def _chunk(text: str, budget: int) -> list[str]:
|
||||||
"""(Re)generate and store the gist for a session. Returns the summary text.
|
"""Split on line boundaries into pieces under `budget` chars."""
|
||||||
|
chunks, buf, size = [], [], 0
|
||||||
|
for line in text.splitlines(keepends=True):
|
||||||
|
if size + len(line) > budget and buf:
|
||||||
|
chunks.append("".join(buf))
|
||||||
|
buf, size = [], 0
|
||||||
|
buf.append(line)
|
||||||
|
size += len(line)
|
||||||
|
if buf:
|
||||||
|
chunks.append("".join(buf))
|
||||||
|
return chunks
|
||||||
|
|
||||||
Returns None if the session has no exchanges. The summarizer defaults to the
|
|
||||||
local backend so routine compaction stays free.
|
def _summarize_text(text: str, backend: Backend) -> str:
|
||||||
"""
|
messages: list[Message] = [
|
||||||
|
{"role": "system", "content": _PROMPT},
|
||||||
|
{"role": "user", "content": text},
|
||||||
|
]
|
||||||
|
return llm.complete(messages, backend=backend)
|
||||||
|
|
||||||
|
|
||||||
|
def summarize_session(session_id: str, backend: Backend | None = None) -> str | None:
|
||||||
|
"""(Re)generate and store the gist for a session. Returns the summary text."""
|
||||||
exchanges = memory.history(session_id)
|
exchanges = memory.history(session_id)
|
||||||
if not exchanges:
|
if not exchanges:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
backend = backend or config.load().summary_backend
|
backend = backend or config.load().summary_backend
|
||||||
messages = [
|
transcript = _transcript(exchanges)
|
||||||
{"role": "system", "content": _PROMPT},
|
if len(transcript) <= MAX_TRANSCRIPT_CHARS:
|
||||||
{"role": "user", "content": _transcript(exchanges)},
|
gist = _summarize_text(transcript, backend)
|
||||||
]
|
else:
|
||||||
gist = llm.complete(messages, backend=backend)
|
partials = [_summarize_text(c, backend) for c in _chunk(transcript, MAX_TRANSCRIPT_CHARS)]
|
||||||
|
gist = _summarize_text("Partial summaries to merge:\n\n" + "\n\n".join(partials), backend)
|
||||||
|
|
||||||
last_id = exchanges[-1].id
|
memory.store_summary(session_id, gist, exchanges[-1].id)
|
||||||
memory.store_summary(session_id, gist, last_id)
|
|
||||||
logbus.log(
|
logbus.log(
|
||||||
"info", "summarized session", session=session_id,
|
"info", "summarized session", session=session_id,
|
||||||
exchanges=len(exchanges), backend=backend,
|
exchanges=len(exchanges), backend=backend,
|
||||||
@@ -54,3 +78,35 @@ def maybe_summarize(session_id: str, backend: Backend | None = None) -> None:
|
|||||||
"""Summarize the session if enough new turns have accumulated since last time."""
|
"""Summarize the session if enough new turns have accumulated since last time."""
|
||||||
if memory.unsummarized_count(session_id) >= SUMMARIZE_AFTER:
|
if memory.unsummarized_count(session_id) >= SUMMARIZE_AFTER:
|
||||||
summarize_session(session_id, backend=backend)
|
summarize_session(session_id, backend=backend)
|
||||||
|
|
||||||
|
|
||||||
|
def summarize_all(backend: Backend | None = None, limit: int | None = None) -> dict:
|
||||||
|
"""Summarize every session that needs it. Idempotent and resumable: sessions
|
||||||
|
with an up-to-date summary are skipped, so re-running continues where it left off.
|
||||||
|
"""
|
||||||
|
sessions = memory.list_sessions()
|
||||||
|
done, skipped = 0, 0
|
||||||
|
for s in sessions:
|
||||||
|
sid = s["id"]
|
||||||
|
if memory.get_summary(sid) and memory.unsummarized_count(sid) == 0:
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
summarize_session(sid, backend=backend)
|
||||||
|
done += 1
|
||||||
|
if done % 25 == 0:
|
||||||
|
logbus.log("info", "summarize-all progress", summarized=done, skipped=skipped)
|
||||||
|
if limit is not None and done >= limit:
|
||||||
|
break
|
||||||
|
report = {"summarized": done, "skipped": skipped, "total": len(sessions)}
|
||||||
|
logbus.log("info", "summarize-all complete", **report)
|
||||||
|
return report
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
limit = int(sys.argv[1]) if len(sys.argv) > 1 else None
|
||||||
|
print(summarize_all(limit=limit))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ dependencies = [
|
|||||||
lyra = "lyra.__main__:main"
|
lyra = "lyra.__main__:main"
|
||||||
lyra-web = "lyra.web.server:serve"
|
lyra-web = "lyra.web.server:serve"
|
||||||
lyra-import = "lyra.ingest:main"
|
lyra-import = "lyra.ingest:main"
|
||||||
|
lyra-summarize = "lyra.summary:main"
|
||||||
|
|
||||||
[dependency-groups]
|
[dependency-groups]
|
||||||
dev = [
|
dev = [
|
||||||
|
|||||||
Reference in New Issue
Block a user