301 lines
9.0 KiB
Python
301 lines
9.0 KiB
Python
import os
|
||
from datetime import datetime
|
||
from typing import List, Dict, Any, TYPE_CHECKING
|
||
|
||
if TYPE_CHECKING:
    # Type-checker-only declarations. SESSIONS and bg_summarize are used by
    # add_exchange_internal, but nothing in this module defines them at
    # runtime — presumably the host service injects them into this module's
    # globals before calling it (NOTE(review): verify against the caller).
    # NOTE(review): `_deque` is not referenced anywhere in this module.
    from collections import deque as _deque

    SESSIONS: dict

    # L10_HISTORY / L20_HISTORY are deliberately not declared here: they are
    # real module globals with their own annotations, and declaring them a
    # second time with a different type is reported as a redefinition.

    def bg_summarize(session_id: str) -> None: ...
|
||
|
||
from llm.llm_router import call_llm # use Cortex's shared router
|
||
|
||
# ─────────────────────────────
|
||
# Config
|
||
# ─────────────────────────────
|
||
|
||
# Backend name handed to call_llm for every summary (always upper-cased).
INTAKE_LLM = os.environ.get("INTAKE_LLM", "PRIMARY").upper()

# Generation settings shared by all summary prompts.
SUMMARY_MAX_TOKENS = int(os.environ.get("SUMMARY_MAX_TOKENS", "200"))
SUMMARY_TEMPERATURE = float(os.environ.get("SUMMARY_TEMPERATURE", "0.3"))

# NeoMem base URL and optional bearer token; both are None when unset, and
# push_to_neomem does nothing without the URL.
NEOMEM_API = os.environ.get("NEOMEM_API")
NEOMEM_KEY = os.environ.get("NEOMEM_KEY")
|
||
|
||
# ─────────────────────────────
|
||
# Internal history for L10/L20/L30
|
||
# ─────────────────────────────
|
||
|
||
# Per-session summary history, keyed by session_id. summarize_L10 appends
# each "Reality Check" paragraph to L10_HISTORY; summarize_L20 merges those
# and appends each "Session Overview" to L20_HISTORY, which summarize_L30 reads.
L10_HISTORY: Dict[str, list[str]] = {}
L20_HISTORY: Dict[str, list[str]] = {}
|
||
|
||
|
||
# ─────────────────────────────
|
||
# LLM helper (via Cortex router)
|
||
# ─────────────────────────────
|
||
|
||
async def _llm(prompt: str) -> str:
    """Run a summary prompt through Cortex's shared ``call_llm`` router.

    The backend, temperature and token limit come from this module's config
    constants. Returns the stripped completion ("" when the router returns a
    falsy value). This coroutine never raises: any exception is turned into
    an ``"[Error summarizing: ...]"`` string that is returned as the result.
    """
    router_options = dict(
        backend=INTAKE_LLM,
        temperature=SUMMARY_TEMPERATURE,
        max_tokens=SUMMARY_MAX_TOKENS,
    )
    try:
        completion = await call_llm(prompt, **router_options)
        # Stripping stays inside the try so an odd return type is reported
        # through the same error string as a router failure.
        return (completion or "").strip()
    except Exception as exc:
        return f"[Error summarizing: {exc}]"
|
||
|
||
|
||
# ─────────────────────────────
|
||
# Formatting helpers
|
||
# ─────────────────────────────
|
||
|
||
def _format_exchanges(exchanges: List[Dict[str, Any]]) -> str:
|
||
"""
|
||
Expect each exchange to look like:
|
||
{ "user_msg": "...", "assistant_msg": "..." }
|
||
"""
|
||
chunks = []
|
||
for e in exchanges:
|
||
user = e.get("user_msg", "")
|
||
assistant = e.get("assistant_msg", "")
|
||
chunks.append(f"User: {user}\nAssistant: {assistant}\n")
|
||
return "\n".join(chunks)
|
||
|
||
|
||
# ─────────────────────────────
|
||
# Base factual summary
|
||
# ─────────────────────────────
|
||
|
||
async def summarize_simple(exchanges: List[Dict[str, Any]]) -> str:
    """Produce a plain, factual summary of recent exchanges.

    Returns "" without contacting the model when *exchanges* is empty;
    otherwise returns whatever ``_llm`` yields for the summary prompt.
    """
    if exchanges:
        transcript = _format_exchanges(exchanges)
        return await _llm(f"""
Summarize the following conversation between Brian (user) and Lyra (assistant).
Focus only on factual content. Avoid names, examples, story tone, or invented details.

{transcript}

Summary:
""")
    return ""
|
||
|
||
|
||
# ─────────────────────────────
|
||
# Multilevel Summaries (L1, L5, L10, L20, L30)
|
||
# ─────────────────────────────
|
||
|
||
async def summarize_L1(buf: List[Dict[str, Any]]) -> str:
    """L1 tier: factual summary of (up to) the five most recent exchanges."""
    window = buf[-5:]
    return await summarize_simple(window)
|
||
|
||
|
||
async def summarize_L5(buf: List[Dict[str, Any]]) -> str:
    """L5 tier: factual summary of (up to) the ten most recent exchanges."""
    window = buf[-10:]
    return await summarize_simple(window)
|
||
|
||
|
||
async def summarize_L10(session_id: str, buf: List[Dict[str, Any]]) -> str:
    """L10 tier: a one-paragraph 'Reality Check' of the last 10 exchanges.

    Args:
        session_id: Key under which the result is recorded in L10_HISTORY.
        buf: The session's exchanges; only the last 10 are summarized.

    Returns:
        The LLM's Reality Check paragraph, or "" (without a model call) when
        *buf* is empty. If the model call fails, the result is ``_llm``'s
        "[Error summarizing: ...]" text.

    Side effects:
        Non-empty, non-error results are appended to
        ``L10_HISTORY[session_id]``, which summarize_L20 merges later.
    """
    # Same empty-input short-circuit as summarize_simple: nothing to check.
    if not buf:
        return ""

    text = _format_exchanges(buf[-10:])

    prompt = f"""
You are Lyra Intake performing a short 'Reality Check'.
Summarize the last block of conversation (up to 10 exchanges)
in one clear paragraph focusing on tone, intent, and direction.

{text}

Reality Check:
"""
    summary = await _llm(prompt)

    # _llm() reports failures as text instead of raising. Keep those (and empty
    # completions) out of the history so the L20 overview never merges them
    # in as if they were conversation content.
    if summary and not summary.startswith("[Error summarizing:"):
        L10_HISTORY.setdefault(session_id, []).append(summary)

    return summary
|
||
|
||
|
||
async def summarize_L20(session_id: str) -> str:
    """
    Merge all L10 Reality Checks into a 'Session Overview'.

    Args:
        session_id: Session whose ``L10_HISTORY`` entries are merged.

    Returns:
        The LLM's overview, or "" (without a model call) when the session has
        no L10 text to merge. If the model call fails, the result is
        ``_llm``'s "[Error summarizing: ...]" text.

    Side effects:
        Non-empty, non-error overviews are appended to
        ``L20_HISTORY[session_id]``, which summarize_L30 condenses later.
    """
    # join() of an empty sequence is already "", so no separate emptiness check.
    joined = "\n\n".join(L10_HISTORY.get(session_id) or ())
    if not joined:
        return ""

    prompt = f"""
You are Lyra Intake creating a 'Session Overview'.
Merge the following Reality Check paragraphs into one short summary
capturing progress, themes, and the direction of the conversation.

{joined}

Overview:
"""
    summary = await _llm(prompt)

    # _llm() reports failures as text instead of raising; keep those (and
    # empty completions) out of the history the L30 report is built from.
    if summary and not summary.startswith("[Error summarizing:"):
        L20_HISTORY.setdefault(session_id, []).append(summary)

    return summary
|
||
|
||
|
||
async def summarize_L30(session_id: str) -> str:
    """Condense the session's L20 overviews into a 'Continuity Report'.

    Reads ``L20_HISTORY[session_id]``. Returns "" without calling the model
    when there is no overview text; otherwise returns the ``_llm`` result.
    Nothing is recorded for this tier.
    """
    overviews = L20_HISTORY.get(session_id) or ()
    merged = "\n\n".join(overviews)
    if not merged:
        return ""

    return await _llm(f"""
You are Lyra Intake generating a 'Continuity Report'.
Condense these session overviews into one high-level reflection,
noting major themes, persistent goals, and shifts.

{merged}

Continuity Report:
""")
|
||
|
||
|
||
# ─────────────────────────────
|
||
# NeoMem push
|
||
# ─────────────────────────────
|
||
|
||
def _neomem_request(
    summary: str, session_id: str, level: str
) -> tuple[str, Dict[str, Any], Dict[str, str]]:
    """Build the URL, JSON payload and headers for one NeoMem ``/memories`` POST.

    Expects NEOMEM_API to be set (push_to_neomem checks first). A trailing
    slash on NEOMEM_API is tolerated so the URL never contains ``//memories``.
    """
    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"

    payload: Dict[str, Any] = {
        "messages": [{"role": "assistant", "content": summary}],
        "user_id": "brian",
        "metadata": {
            "source": "intake",
            "session_id": session_id,
            "level": level,
        },
    }
    return f"{NEOMEM_API.rstrip('/')}/memories", payload, headers


def _post_to_neomem(
    url: str,
    payload: Dict[str, Any],
    headers: Dict[str, str],
    session_id: str,
    level: str,
) -> None:
    """Blocking POST to NeoMem; prints the outcome and never raises."""
    try:
        import requests  # imported lazily: only needed when NeoMem is configured

        requests.post(
            url,
            json=payload,
            headers=headers,
            timeout=20,
        ).raise_for_status()
        print(f"🧠 NeoMem updated ({level}) for {session_id}")
    except Exception as e:
        print(f"NeoMem push failed ({level}, {session_id}): {e}")


def push_to_neomem(summary: str, session_id: str, level: str) -> None:
    """
    Fire-and-forget push of a summary into NeoMem.

    Does nothing when NEOMEM_API is unset, when *summary* is empty, or when
    *summary* is the "[Error summarizing: ...]" fallback text produced by
    ``_llm``. Otherwise the HTTP POST runs on a daemon thread, so callers —
    including the async summarize_context — are not blocked for up to the
    20 s request timeout. Failures are printed, never raised.
    """
    if not NEOMEM_API or not summary:
        return
    # _llm() reports failures as text; storing that as a memory would pollute
    # NeoMem with error messages.
    if summary.startswith("[Error summarizing:"):
        return

    import threading

    url, payload, headers = _neomem_request(summary, session_id, level)
    threading.Thread(
        target=_post_to_neomem,
        args=(url, payload, headers, session_id, level),
        name=f"neomem-push-{level}",
        daemon=True,
    ).start()
|
||
|
||
|
||
# ─────────────────────────────
|
||
# Main entrypoint for Cortex
|
||
# ─────────────────────────────
|
||
|
||
async def summarize_context(
    session_id: str,
    exchanges: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """
    Build every summary tier for one session — the main API used by Cortex:

        summaries = await summarize_context(session_id, exchanges)

    `exchanges` should be the recent conversation buffer for that session;
    it is copied into a list first, so any iterable of exchange dicts works.

    Returns a dict with ``session_id``, ``exchange_count``, the tier summaries
    ``L1``/``L5``/``L10``/``L20``/``L30`` and ``last_updated`` (a local-time
    ISO string). For an empty buffer every tier is "", ``last_updated`` is
    None, and no tier functions are called.

    Side effects: summarize_L10 and summarize_L20 extend their per-session
    histories, and the L10/L20/L30 results are passed to push_to_neomem.
    """
    buf = list(exchanges)
    tier_names = ("L1", "L5", "L10", "L20", "L30")

    if not buf:
        return {
            "session_id": session_id,
            "exchange_count": 0,
            **{name: "" for name in tier_names},
            "last_updated": None,
        }

    # Run strictly in order: L20 merges the history L10 just extended, and
    # L30 condenses what L20 recorded.
    tiers: Dict[str, str] = {}
    tiers["L1"] = await summarize_L1(buf)
    tiers["L5"] = await summarize_L5(buf)
    tiers["L10"] = await summarize_L10(session_id, buf)
    tiers["L20"] = await summarize_L20(session_id)
    tiers["L30"] = await summarize_L30(session_id)

    # Only the higher, "interesting" tiers are sent to NeoMem.
    for level in ("L10", "L20", "L30"):
        push_to_neomem(tiers[level], session_id, level)

    return {
        "session_id": session_id,
        "exchange_count": len(buf),
        **tiers,
        "last_updated": datetime.now().isoformat(),
    }
|
||
|
||
# ─────────────────────────────
|
||
# Internal entrypoint for Cortex
|
||
# ─────────────────────────────
|
||
def add_exchange_internal(exchange: dict) -> Dict[str, Any]:
    """
    Direct internal call — bypasses FastAPI request handling.
    Cortex uses this to feed user/assistant turns directly
    into Intake's buffer and trigger full summarization.

    Args:
        exchange: Dict that must carry a truthy "session_id". It is stamped
            in place with a "timestamp" (local-time ISO string) and then
            appended to that session's rolling buffer (last 200 entries).

    Returns:
        ``{"ok": True, "session_id": session_id}``.

    Raises:
        ValueError: if "session_id" is missing or empty.

    Errors raised by bg_summarize are printed, not propagated.
    """
    # No runtime binding of ``deque`` exists at module level, so import it here;
    # without this, creating a new session raised NameError.
    from collections import deque

    session_id = exchange.get("session_id")
    if not session_id:
        raise ValueError("session_id missing")

    now = datetime.now()
    exchange["timestamp"] = now.isoformat()

    # NOTE(review): SESSIONS and bg_summarize are not defined at runtime in
    # this module; presumably the host injects them — verify.
    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            "buffer": deque(maxlen=200),
            "created_at": now,
        }

    # Append exchange into the rolling buffer (oldest entries drop off at 200).
    SESSIONS[session_id]["buffer"].append(exchange)

    # Trigger summarization immediately; a failure here must not lose the
    # exchange that was just buffered.
    try:
        bg_summarize(session_id)
    except Exception as e:
        print(f"[Internal Intake] Summarization error: {e}")

    return {"ok": True, "session_id": session_id}
|