diff --git a/lyra/chat.py b/lyra/chat.py index b54a897..7917b58 100644 --- a/lyra/chat.py +++ b/lyra/chat.py @@ -200,7 +200,7 @@ def respond(session_id: str, user_msg: str, backend: Backend = "cloud", memory.remember(session_id, "assistant", reply) # Compact this session once enough new turns have piled up. - summary.maybe_summarize(session_id) + summary.maybe_summarize_async(session_id) return reply @@ -259,5 +259,5 @@ def respond_stream(session_id: str, user_msg: str, backend: Backend = "cloud", memory.remember(session_id, "user", user_msg) memory.remember(session_id, "assistant", reply) - summary.maybe_summarize(session_id) + summary.maybe_summarize_async(session_id) yield ("done", reply) diff --git a/lyra/summary.py b/lyra/summary.py index 39506d8..3ffee1a 100644 --- a/lyra/summary.py +++ b/lyra/summary.py @@ -10,6 +10,7 @@ big imported conversation doesn't blow the local model's context window. from __future__ import annotations import sys +import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed @@ -20,8 +21,15 @@ _RETRIES = 4 # Re-summarize a session once it has accumulated this many new raw exchanges. SUMMARIZE_AFTER = 20 -# Transcript budget per LLM call; longer sessions are chunked + merged. +# Transcript budget per LLM call; longer sessions are chunked + merged. Cloud has +# a large context window; the local llama.cpp/Ollama servers have small ones, so a +# 24k-char chunk overflows them ("Context size has been exceeded") — keep local small. MAX_TRANSCRIPT_CHARS = 24000 +LOCAL_TRANSCRIPT_CHARS = 8000 + + +def _budget(backend: Backend) -> int: + return MAX_TRANSCRIPT_CHARS if backend == "cloud" else LOCAL_TRANSCRIPT_CHARS _PROMPT = """You are compacting a conversation into a long-term memory record \ (not replying to anyone). Write a concise gist of the session below: what was \ @@ -66,11 +74,14 @@ def _summarize_text(text: str, backend: Backend) -> str: def _summarize_transcript(transcript: str, backend: Backend) -> str: - """Transcript -> gist (LLM only, no DB). Chunks + merges if oversized.""" - if len(transcript) <= MAX_TRANSCRIPT_CHARS: + """Transcript -> gist (LLM only, no DB). Chunks + merges if oversized, and + recurses so even the merged partials never exceed the backend's window.""" + budget = _budget(backend) + if len(transcript) <= budget: return _summarize_text(transcript, backend) - partials = [_summarize_text(c, backend) for c in _chunk(transcript, MAX_TRANSCRIPT_CHARS)] - return _summarize_text("Partial summaries to merge:\n\n" + "\n\n".join(partials), backend) + partials = [_summarize_text(c, backend) for c in _chunk(transcript, budget)] + merged = "Partial summaries to merge:\n\n" + "\n\n".join(partials) + return _summarize_transcript(merged, backend) def summarize_session(session_id: str, backend: Backend | None = None) -> str | None: @@ -91,6 +102,32 @@ def maybe_summarize(session_id: str, backend: Backend | None = None) -> None: summarize_session(session_id, backend=backend) +_inflight: set[str] = set() +_inflight_lock = threading.Lock() + + +def maybe_summarize_async(session_id: str, backend: Backend | None = None) -> None: + """Run maybe_summarize off the chat turn's critical path. Consolidation is + background maintenance — it must never stall the reply or surface an error to + the user (a slow/oversized local model would otherwise block the turn). At most + one summary per session runs at a time.""" + with _inflight_lock: + if session_id in _inflight: + return + _inflight.add(session_id) + + def _run() -> None: + try: + maybe_summarize(session_id, backend=backend) + except Exception as exc: + logbus.log("error", "summary skipped", session=session_id, error=str(exc)[:120]) + finally: + with _inflight_lock: + _inflight.discard(session_id) + + threading.Thread(target=_run, daemon=True, name="summarize").start() + + def summarize_all( backend: Backend | None = None, limit: int | None = None, workers: int = 8 ) -> dict: