"""Lyra Intake: rolling short-term conversation memory with cascading summaries.

Exchanges are appended to a per-session deque (``SESSIONS``) and condensed
into multi-level summaries (L1/L2/L5/L10/L20/L30) through Cortex's shared
LLM router.  Summaries can also be pushed (fire-and-forget) into NeoMem
for long-term storage.
"""

import os
import json  # NOTE(review): not used in this chunk; kept in case other parts of the file rely on it
from collections import deque
from datetime import datetime
from typing import Any, Dict, List, TYPE_CHECKING

from llm.llm_router import call_llm  # Cortex's shared LLM router (was imported twice; deduplicated)

# -------------------------------------------------------------------
# Global Short-Term Memory (new Intake)
# -------------------------------------------------------------------
# session_id -> {"buffer": deque of exchange dicts, "created_at": datetime}
SESSIONS: dict[str, dict] = {}

# Diagnostic: verify the module is initialised only once — a second print
# with a different object id would indicate a duplicate import path.
print(f"[Intake Module Init] SESSIONS object id: {id(SESSIONS)}, module: {__name__}")

# Per-session summary history (each was previously declared TWICE at module
# level; deduplicated so every reference shares one dict object):
#   L10_HISTORY: session_id -> list of L10 "Reality Check" paragraphs
#   L20_HISTORY: session_id -> list of merged "Session Overview" blocks
L10_HISTORY: dict[str, list[str]] = {}
L20_HISTORY: dict[str, list[str]] = {}

if TYPE_CHECKING:
    # Type-checker-only imports; nothing here executes at runtime.
    # (The previous stub re-declaration of bg_summarize was removed — it
    # shadowed the real definition below for static analyzers.)
    from collections import deque as _deque

# ─────────────────────────────
# Config
# ─────────────────────────────
INTAKE_LLM = os.getenv("INTAKE_LLM", "PRIMARY").upper()               # router backend name
SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200"))      # cap per summary call
SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3"))  # low temp → factual tone
NEOMEM_API = os.getenv("NEOMEM_API")  # base URL; unset/None disables NeoMem pushes
NEOMEM_KEY = os.getenv("NEOMEM_KEY")  # optional bearer token for NeoMem


# ─────────────────────────────
# LLM helper (via Cortex router)
# ─────────────────────────────
async def _llm(prompt: str) -> str:
    """Run a summary prompt through Cortex's llm_router.

    Returns the stripped model text, or an ``[Error summarizing: ...]``
    marker string on any failure — callers treat the result as plain text
    and must never crash because a summary could not be produced.
    """
    try:
        text = await call_llm(
            prompt,
            backend=INTAKE_LLM,
            temperature=SUMMARY_TEMPERATURE,
            max_tokens=SUMMARY_MAX_TOKENS,
        )
        return (text or "").strip()
    except Exception as e:
        return f"[Error summarizing: {e}]"


# ─────────────────────────────
# Formatting helpers
# ─────────────────────────────
def _format_exchanges(exchanges: List[Dict[str, Any]]) -> str:
    """Render exchanges as a ``User:/Assistant:`` transcript.

    Each exchange is expected to look like:
        {"user_msg": "...", "assistant_msg": "..."}
    Missing keys render as empty strings rather than raising.
    """
    chunks = []
    for e in exchanges:
        user = e.get("user_msg", "")
        assistant = e.get("assistant_msg", "")
        chunks.append(f"User: {user}\nAssistant: {assistant}\n")
    return "\n".join(chunks)


# ─────────────────────────────
# Base factual summary
# ─────────────────────────────
async def summarize_simple(exchanges: List[Dict[str, Any]]) -> str:
    """Produce a simple factual summary of *exchanges* ("" if empty)."""
    if not exchanges:
        return ""
    text = _format_exchanges(exchanges)
    prompt = f"""
Summarize the following conversation between Brian (user) and Lyra (assistant).
Focus only on factual content. Avoid names, examples, story tone, or invented details.

{text}

Summary:
"""
    return await _llm(prompt)


# ─────────────────────────────
# Multilevel Summaries (L1, L5, L10, L20, L30)
# ─────────────────────────────
async def summarize_L1(buf: List[Dict[str, Any]]) -> str:
    """Summarize the last ~5 exchanges (most recent activity)."""
    return await summarize_simple(buf[-5:])


async def summarize_L5(buf: List[Dict[str, Any]]) -> str:
    """Summarize the last ~10 exchanges."""
    return await summarize_simple(buf[-10:])


async def summarize_L10(session_id: str, buf: List[Dict[str, Any]]) -> str:
    """'Reality Check': one paragraph on tone/intent/direction of the last 10.

    The result is also appended to ``L10_HISTORY[session_id]`` so later
    L20 passes can merge the accumulated checks.
    """
    text = _format_exchanges(buf[-10:])
    prompt = f"""
You are Lyra Intake performing a short 'Reality Check'.
Summarize the last block of conversation (up to 10 exchanges)
in one clear paragraph focusing on tone, intent, and direction.

{text}

Reality Check:
"""
    summary = await _llm(prompt)
    # Track history for this session (idiomatic setdefault-and-append).
    L10_HISTORY.setdefault(session_id, []).append(summary)
    return summary


async def summarize_L20(session_id: str) -> str:
    """Merge all L10 Reality Checks into a 'Session Overview'.

    Returns "" when no L10 history exists yet.  The overview is appended
    to ``L20_HISTORY[session_id]`` for later L30 merging.
    """
    history = L10_HISTORY.get(session_id, [])
    joined = "\n\n".join(history) if history else ""
    if not joined:
        return ""
    prompt = f"""
You are Lyra Intake creating a 'Session Overview'.
Merge the following Reality Check paragraphs into one short summary
capturing progress, themes, and the direction of the conversation.

{joined}

Overview:
"""
    summary = await _llm(prompt)
    L20_HISTORY.setdefault(session_id, []).append(summary)
    return summary


async def summarize_L30(session_id: str) -> str:
    """Merge all L20 session overviews into a 'Continuity Report' ("" if none)."""
    history = L20_HISTORY.get(session_id, [])
    joined = "\n\n".join(history) if history else ""
    if not joined:
        return ""
    prompt = f"""
You are Lyra Intake generating a 'Continuity Report'.
Condense these session overviews into one high-level reflection,
noting major themes, persistent goals, and shifts.

{joined}

Continuity Report:
"""
    return await _llm(prompt)


# ─────────────────────────────
# NeoMem push
# ─────────────────────────────
def push_to_neomem(summary: str, session_id: str, level: str) -> None:
    """Fire-and-forget push of a summary into NeoMem.

    Silently returns when NEOMEM_API is unset or the summary is empty;
    any network/HTTP failure is logged and swallowed on purpose — a
    NeoMem outage must never break the intake pipeline.
    """
    if not NEOMEM_API or not summary:
        return

    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"

    payload = {
        "messages": [{"role": "assistant", "content": summary}],
        "user_id": "brian",
        "metadata": {
            "source": "intake",
            "session_id": session_id,
            "level": level,
        },
    }

    try:
        # Imported lazily so the module loads even when requests is absent
        # (pushes are optional best-effort behavior).
        import requests

        requests.post(
            f"{NEOMEM_API}/memories",
            json=payload,
            headers=headers,
            timeout=20,
        ).raise_for_status()
        print(f"🧠 NeoMem updated ({level}) for {session_id}")
    except Exception as e:
        print(f"NeoMem push failed ({level}, {session_id}): {e}")


# ─────────────────────────────
# Main entrypoint for Cortex
# ─────────────────────────────
async def summarize_context(session_id: str, exchanges: list[dict]):
    """Internal summarizer that uses Cortex's LLM router.

    Produces cascading summaries based on exchange count:
      - L1: always (most recent activity)
      - L2: after 2+ exchanges
      - L5: after 5+ exchanges (NOTE: summarizes the last 10, matching
        summarize_L5's window)
      - L10: after 10+ exchanges (Reality Check)
      - L20/L30: after 20+/30+ exchanges, only on multiples of 10

    Args:
        session_id: The conversation/session ID.
        exchanges: A list of {"user_msg": ..., "assistant_msg": ..., "timestamp": ...}.

    Returns:
        A dict with session_id, exchange_count, L1..L30 strings, and
        last_updated (ISO timestamp).  Levels not generated are "".
    """
    exchange_count = len(exchanges)

    if exchange_count == 0:
        return {
            "session_id": session_id,
            "exchange_count": 0,
            "L1": "", "L2": "", "L5": "", "L10": "", "L20": "", "L30": "",
            "last_updated": datetime.now().isoformat(),
        }

    result = {
        "session_id": session_id,
        "exchange_count": exchange_count,
        "L1": "", "L2": "", "L5": "", "L10": "", "L20": "", "L30": "",
        "last_updated": datetime.now().isoformat(),
    }

    try:
        # L1: always generate (most recent exchanges)
        result["L1"] = await summarize_simple(exchanges[-5:])
        print(f"[Intake] Generated L1 for {session_id} ({exchange_count} exchanges)")

        # L2: after 2+ exchanges
        if exchange_count >= 2:
            result["L2"] = await summarize_simple(exchanges[-2:])
            print(f"[Intake] Generated L2 for {session_id}")

        # L5: after 5+ exchanges
        if exchange_count >= 5:
            result["L5"] = await summarize_simple(exchanges[-10:])
            print(f"[Intake] Generated L5 for {session_id}")

        # L10: after 10+ exchanges (Reality Check)
        if exchange_count >= 10:
            result["L10"] = await summarize_L10(session_id, exchanges)
            print(f"[Intake] Generated L10 for {session_id}")

        # L20: after 20+ exchanges (Session Overview — merges L10s)
        if exchange_count >= 20 and exchange_count % 10 == 0:
            result["L20"] = await summarize_L20(session_id)
            print(f"[Intake] Generated L20 for {session_id}")

        # L30: after 30+ exchanges (Continuity Report — merges L20s)
        if exchange_count >= 30 and exchange_count % 10 == 0:
            result["L30"] = await summarize_L30(session_id)
            print(f"[Intake] Generated L30 for {session_id}")

        return result

    except Exception as e:
        print(f"[Intake] Error during summarization: {e}")
        # Deliberately overwrite L1 with an error marker so callers can
        # detect that this pass failed, even if earlier levels succeeded.
        result["L1"] = f"[Error summarizing: {str(e)}]"
        return result


# ─────────────────────────────────
# Background summarization stub
# ─────────────────────────────────
def bg_summarize(session_id: str):
    """Placeholder for background summarization.

    Actual summarization happens during /reason via summarize_context().
    This function exists to prevent NameError when called from
    add_exchange_internal().
    """
    print(f"[Intake] Exchange added for {session_id}. Will summarize on next /reason call.")


# ─────────────────────────────
# Internal entrypoint for Cortex
# ─────────────────────────────
def add_exchange_internal(exchange: dict):
    """Direct internal call — bypasses FastAPI request handling.

    Cortex uses this to feed user/assistant turns directly into Intake's
    buffer and trigger summarization.

    Args:
        exchange: Must contain "session_id"; a "timestamp" is stamped here.

    Returns:
        {"ok": True, "session_id": ...} on success.

    Raises:
        ValueError: If "session_id" is missing from the exchange.
    """
    session_id = exchange.get("session_id")
    if not session_id:
        raise ValueError("session_id missing")

    exchange["timestamp"] = datetime.now().isoformat()

    # DEBUG: verify we're using the module-level SESSIONS
    print(f"[add_exchange_internal] SESSIONS object id: {id(SESSIONS)}, current sessions: {list(SESSIONS.keys())}")

    # Ensure session exists (bounded rolling buffer: oldest entries drop
    # automatically once 200 exchanges accumulate).
    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            "buffer": deque(maxlen=200),
            "created_at": datetime.now(),
        }
        print(f"[add_exchange_internal] Created new session: {session_id}")
    else:
        print(f"[add_exchange_internal] Using existing session: {session_id}")

    # Append exchange into the rolling buffer
    SESSIONS[session_id]["buffer"].append(exchange)
    buffer_len = len(SESSIONS[session_id]["buffer"])
    print(f"[add_exchange_internal] Added exchange to {session_id}, buffer now has {buffer_len} items")

    # Trigger summarization immediately (currently a logging stub; real
    # summarization runs on the next /reason call).
    try:
        bg_summarize(session_id)
    except Exception as e:
        print(f"[Internal Intake] Summarization error: {e}")

    return {"ok": True, "session_id": session_id}