import asyncio
import os
from collections import deque
from datetime import datetime
from typing import List, Dict, Any, TYPE_CHECKING

if TYPE_CHECKING:
    from collections import deque as _deque

from llm.llm_router import call_llm  # use Cortex's shared router

# ─────────────────────────────
# Config
# ─────────────────────────────
INTAKE_LLM = os.getenv("INTAKE_LLM", "PRIMARY").upper()

SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200"))
SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3"))

NEOMEM_API = os.getenv("NEOMEM_API")
NEOMEM_KEY = os.getenv("NEOMEM_KEY")

# ─────────────────────────────
# Internal history for L10/L20/L30
# ─────────────────────────────
L10_HISTORY: Dict[str, list[str]] = {}  # session_id → list of L10 blocks
L20_HISTORY: Dict[str, list[str]] = {}  # session_id → list of merged overviews

# session_id → {"buffer": deque of exchanges, "created_at": datetime}.
# Previously this name only existed as a typing stub, so the first call to
# add_exchange_internal() raised NameError.
SESSIONS: Dict[str, Dict[str, Any]] = {}

# Strong references to in-flight summarization tasks; the event loop itself
# only keeps weak references, so an unreferenced task may be collected early.
_BG_TASKS: set = set()

# Prefix of the placeholder text _llm() returns when the router call fails.
_ERROR_PREFIX = "[Error summarizing:"


# ─────────────────────────────
# LLM helper (via Cortex router)
# ─────────────────────────────
async def _llm(prompt: str) -> str:
    """
    Run a summary prompt through call_llm using the configured backend,
    temperature and token limit.

    Returns the stripped response text ("" if the router returned nothing).
    Never raises: on any exception it returns "[Error summarizing: <exc>]".
    """
    try:
        text = await call_llm(
            prompt,
            backend=INTAKE_LLM,
            temperature=SUMMARY_TEMPERATURE,
            max_tokens=SUMMARY_MAX_TOKENS,
        )
    except Exception as e:
        return f"[Error summarizing: {e}]"
    return (text or "").strip()


def _is_usable(summary: str) -> bool:
    """
    True when `summary` is real content: non-empty and not the error
    placeholder produced by _llm().
    """
    return bool(summary) and not summary.startswith(_ERROR_PREFIX)


# ─────────────────────────────
# Formatting helpers
# ─────────────────────────────
def _format_exchanges(exchanges: List[Dict[str, Any]]) -> str:
    """
    Render exchanges as a plain-text transcript.

    Expect each exchange to look like:
    {
        "user_msg": "...",
        "assistant_msg": "..."
    }
    Missing keys are rendered as empty strings.
    """
    return "\n".join(
        f"User: {e.get('user_msg', '')}\nAssistant: {e.get('assistant_msg', '')}\n"
        for e in exchanges
    )


# ─────────────────────────────
# Base factual summary
# ─────────────────────────────
async def summarize_simple(exchanges: List[Dict[str, Any]]) -> str:
    """
    Simple factual summary of the given exchanges.

    Returns "" without calling the LLM when `exchanges` is empty.
    """
    if not exchanges:
        return ""

    text = _format_exchanges(exchanges)

    prompt = f"""
Summarize the following conversation between Brian (user) and Lyra (assistant).
Focus only on factual content. Avoid names, examples, story tone, or invented details.

{text}

Summary:
"""
    return await _llm(prompt)


# ─────────────────────────────
# Multilevel Summaries (L1, L5, L10, L20, L30)
# ─────────────────────────────
async def summarize_L1(buf: List[Dict[str, Any]]) -> str:
    """Factual summary of the last 5 exchanges in `buf`."""
    return await summarize_simple(buf[-5:])


async def summarize_L5(buf: List[Dict[str, Any]]) -> str:
    """Factual summary of the last 10 exchanges in `buf`."""
    return await summarize_simple(buf[-10:])


async def summarize_L10(session_id: str, buf: List[Dict[str, Any]]) -> str:
    """
    "Reality Check" paragraph for the last 10 exchanges in `buf`.

    A successful summary is appended to L10_HISTORY[session_id]. Empty or
    failed summaries are returned to the caller but not recorded, so they
    are never merged into later L20 overviews.
    Returns "" without calling the LLM when `buf` is empty.
    """
    if not buf:
        return ""

    text = _format_exchanges(buf[-10:])

    prompt = f"""
You are Lyra Intake performing a short 'Reality Check'.
Summarize the last block of conversation (up to 10 exchanges)
in one clear paragraph focusing on tone, intent, and direction.

{text}

Reality Check:
"""
    summary = await _llm(prompt)

    # Track history for this session (real summaries only)
    if _is_usable(summary):
        L10_HISTORY.setdefault(session_id, []).append(summary)

    return summary


async def summarize_L20(session_id: str) -> str:
    """
    Merge all recorded L10 Reality Checks for the session into a
    'Session Overview'.

    A successful overview is appended to L20_HISTORY[session_id].
    Returns "" when the session has no L10 history.
    """
    joined = "\n\n".join(L10_HISTORY.get(session_id, []))
    if not joined:
        return ""

    prompt = f"""
You are Lyra Intake creating a 'Session Overview'.
Merge the following Reality Check paragraphs into one short summary
capturing progress, themes, and the direction of the conversation.

{joined}

Overview:
"""
    summary = await _llm(prompt)

    if _is_usable(summary):
        L20_HISTORY.setdefault(session_id, []).append(summary)

    return summary


async def summarize_L30(session_id: str) -> str:
    """
    Merge all recorded L20 session overviews into a 'Continuity Report'.

    Returns "" when the session has no L20 history. The result is not
    stored in any history.
    """
    joined = "\n\n".join(L20_HISTORY.get(session_id, []))
    if not joined:
        return ""

    prompt = f"""
You are Lyra Intake generating a 'Continuity Report'.
Condense these session overviews into one high-level reflection,
noting major themes, persistent goals, and shifts.

{joined}

Continuity Report:
"""
    return await _llm(prompt)


# ─────────────────────────────
# NeoMem push
# ─────────────────────────────
def push_to_neomem(summary: str, session_id: str, level: str) -> None:
    """
    Best-effort push of a summary into NeoMem.

    Does nothing when NEOMEM_API is unset or `summary` is empty. This is a
    blocking HTTP POST (timeout 20 s); failures are printed, never raised.
    Async callers should run it off the event loop.
    """
    if not NEOMEM_API or not summary:
        return

    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"

    payload = {
        "messages": [{"role": "assistant", "content": summary}],
        "user_id": "brian",
        "metadata": {
            "source": "intake",
            "session_id": session_id,
            "level": level,
        },
    }

    try:
        # Imported lazily so the module loads even if requests is absent.
        import requests

        requests.post(
            f"{NEOMEM_API}/memories",
            json=payload,
            headers=headers,
            timeout=20,
        ).raise_for_status()
        print(f"🧠 NeoMem updated ({level}) for {session_id}")
    except Exception as e:
        print(f"NeoMem push failed ({level}, {session_id}): {e}")


# ─────────────────────────────
# Main entrypoint for Cortex
# ─────────────────────────────
async def summarize_context(
    session_id: str,
    exchanges: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """
    Main API used by Cortex:

        summaries = await summarize_context(session_id, exchanges)

    `exchanges` should be the recent conversation buffer for that session.

    Returns a dict with keys session_id, exchange_count, L1, L5, L10, L20,
    L30 and last_updated (ISO timestamp, or None for an empty buffer).
    """
    buf = list(exchanges)
    if not buf:
        return {
            "session_id": session_id,
            "exchange_count": 0,
            "L1": "",
            "L5": "",
            "L10": "",
            "L20": "",
            "L30": "",
            "last_updated": None,
        }

    # Order matters: L20 reads the L10 history and L30 reads the L20 history.
    L1 = await summarize_L1(buf)
    L5 = await summarize_L5(buf)
    L10 = await summarize_L10(session_id, buf)
    L20 = await summarize_L20(session_id)
    L30 = await summarize_L30(session_id)

    # Push the "interesting" tiers into NeoMem. push_to_neomem() does
    # blocking HTTP, so run it in a worker thread to keep the event loop
    # responsive; error placeholders are not worth remembering.
    for level, summary in (("L10", L10), ("L20", L20), ("L30", L30)):
        if _is_usable(summary):
            await asyncio.to_thread(push_to_neomem, summary, session_id, level)

    return {
        "session_id": session_id,
        "exchange_count": len(buf),
        "L1": L1,
        "L5": L5,
        "L10": L10,
        "L20": L20,
        "L30": L30,
        "last_updated": datetime.now().isoformat(),
    }


# ─────────────────────────────
# Internal entrypoint for Cortex
# ─────────────────────────────
def _report_bg_result(task: "asyncio.Task[Any]") -> None:
    """Drop a finished background task and print its exception, if any."""
    _BG_TASKS.discard(task)
    if task.cancelled():
        return
    err = task.exception()
    if err is not None:
        print(f"[Internal Intake] Summarization error: {err}")


def bg_summarize(session_id: str) -> None:
    """
    Run summarize_context() over a snapshot of the session's buffer.

    With an event loop running in the current thread the work is scheduled
    as a task and this returns immediately. Without one, the coroutine is
    run to completion with asyncio.run(), which blocks the caller.
    Does nothing for an unknown session or an empty buffer.
    """
    session = SESSIONS.get(session_id)
    if not session or not session["buffer"]:
        return

    # Snapshot so later appends do not change what is being summarized.
    snapshot = list(session["buffer"])

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(summarize_context(session_id, snapshot))
        return

    task = loop.create_task(summarize_context(session_id, snapshot))
    _BG_TASKS.add(task)
    task.add_done_callback(_report_bg_result)


def add_exchange_internal(exchange: dict):
    """
    Direct internal call — bypasses FastAPI request handling.
    Cortex uses this to feed user/assistant turns directly
    into Intake's buffer and trigger full summarization.

    `exchange` must contain a non-empty "session_id"; a "timestamp" key is
    written into the passed dict. Each session keeps at most the 200 most
    recent exchanges.

    Returns {"ok": True, "session_id": session_id}.
    Raises ValueError when "session_id" is missing or empty.
    """
    session_id = exchange.get("session_id")
    if not session_id:
        raise ValueError("session_id missing")

    exchange["timestamp"] = datetime.now().isoformat()

    # Ensure session exists
    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            "buffer": deque(maxlen=200),
            "created_at": datetime.now(),
        }

    # Append exchange into the rolling buffer
    SESSIONS[session_id]["buffer"].append(exchange)

    # Trigger summarization immediately; a summarization failure must not
    # cause the exchange itself to be rejected.
    try:
        bg_summarize(session_id)
    except Exception as e:
        print(f"[Internal Intake] Summarization error: {e}")

    return {"ok": True, "session_id": session_id}