project-lyra/cortex/intake/intake.py
import os
import json
from datetime import datetime
from typing import List, Dict, Any, TYPE_CHECKING
from collections import deque
from llm.llm_router import call_llm  # Use Cortex's shared LLM router

# -------------------------------------------------------------------
# Global Short-Term Memory (new Intake)
# -------------------------------------------------------------------
SESSIONS: dict[str, dict] = {}  # session_id → { buffer: deque, created_at: timestamp }

# Diagnostic: Verify module loads only once
print(f"[Intake Module Init] SESSIONS object id: {id(SESSIONS)}, module: {__name__}")

if TYPE_CHECKING:
    # Only for type hints — do NOT redefine SESSIONS here
    from collections import deque as _deque
    def bg_summarize(session_id: str) -> None: ...
# ─────────────────────────────
# Config
# ─────────────────────────────
INTAKE_LLM = os.getenv("INTAKE_LLM", "PRIMARY").upper()
SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200"))
SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3"))
NEOMEM_API = os.getenv("NEOMEM_API")
NEOMEM_KEY = os.getenv("NEOMEM_KEY")
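# Illustrative environment (NEOMEM values are hypothetical, not taken from this repo):
#   INTAKE_LLM=PRIMARY  SUMMARY_MAX_TOKENS=200  SUMMARY_TEMPERATURE=0.3
#   NEOMEM_API=http://neomem:7077  NEOMEM_KEY=<bearer token>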
# ─────────────────────────────
# Internal history for L10/L20/L30
# ─────────────────────────────
L10_HISTORY: Dict[str, list[str]] = {} # session_id → list of L10 blocks
L20_HISTORY: Dict[str, list[str]] = {} # session_id → list of merged overviews
# ─────────────────────────────
# LLM helper (via Cortex router)
# ─────────────────────────────
async def _llm(prompt: str) -> str:
    """
    Use Cortex's llm_router to run a summary prompt.
    """
    try:
        text = await call_llm(
            prompt,
            backend=INTAKE_LLM,
            temperature=SUMMARY_TEMPERATURE,
            max_tokens=SUMMARY_MAX_TOKENS,
        )
        return (text or "").strip()
    except Exception as e:
        return f"[Error summarizing: {e}]"
# ─────────────────────────────
# Formatting helpers
# ─────────────────────────────
def _format_exchanges(exchanges: List[Dict[str, Any]]) -> str:
    """
    Expect each exchange to look like:
    { "user_msg": "...", "assistant_msg": "..." }
    """
    chunks = []
    for e in exchanges:
        user = e.get("user_msg", "")
        assistant = e.get("assistant_msg", "")
        chunks.append(f"User: {user}\nAssistant: {assistant}\n")
    return "\n".join(chunks)
# ─────────────────────────────
# Base factual summary
# ─────────────────────────────
async def summarize_simple(exchanges: List[Dict[str, Any]]) -> str:
    """
    Simple factual summary of recent exchanges.
    """
    if not exchanges:
        return ""
    text = _format_exchanges(exchanges)
    prompt = f"""
Summarize the following conversation between Brian (user) and Lyra (assistant).
Focus only on factual content. Avoid names, examples, story tone, or invented details.
{text}
Summary:
"""
    return await _llm(prompt)
# ─────────────────────────────
# Multilevel Summaries (L1, L5, L10, L20, L30)
# ─────────────────────────────
async def summarize_L1(buf: List[Dict[str, Any]]) -> str:
    # Last ~5 exchanges
    return await summarize_simple(buf[-5:])

async def summarize_L5(buf: List[Dict[str, Any]]) -> str:
    # Last ~10 exchanges
    return await summarize_simple(buf[-10:])

async def summarize_L10(session_id: str, buf: List[Dict[str, Any]]) -> str:
    # “Reality Check” for last 10 exchanges
    text = _format_exchanges(buf[-10:])
    prompt = f"""
You are Lyra Intake performing a short 'Reality Check'.
Summarize the last block of conversation (up to 10 exchanges)
in one clear paragraph focusing on tone, intent, and direction.
{text}
Reality Check:
"""
    summary = await _llm(prompt)
    # Track history for this session
    L10_HISTORY.setdefault(session_id, [])
    L10_HISTORY[session_id].append(summary)
    return summary
async def summarize_L20(session_id: str) -> str:
    """
    Merge all L10 Reality Checks into a 'Session Overview'.
    """
    history = L10_HISTORY.get(session_id, [])
    joined = "\n\n".join(history) if history else ""
    if not joined:
        return ""
    prompt = f"""
You are Lyra Intake creating a 'Session Overview'.
Merge the following Reality Check paragraphs into one short summary
capturing progress, themes, and the direction of the conversation.
{joined}
Overview:
"""
    summary = await _llm(prompt)
    L20_HISTORY.setdefault(session_id, [])
    L20_HISTORY[session_id].append(summary)
    return summary
async def summarize_L30(session_id: str) -> str:
    """
    Merge all L20 session overviews into a 'Continuity Report'.
    """
    history = L20_HISTORY.get(session_id, [])
    joined = "\n\n".join(history) if history else ""
    if not joined:
        return ""
    prompt = f"""
You are Lyra Intake generating a 'Continuity Report'.
Condense these session overviews into one high-level reflection,
noting major themes, persistent goals, and shifts.
{joined}
Continuity Report:
"""
    return await _llm(prompt)
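# Illustrative only: one way the L10 → L20 → L30 chain could be driven for a
# session from an async context (the actual call sites live elsewhere in Cortex):
#
#   reality_check = await summarize_L10(session_id, list(buf))
#   overview      = await summarize_L20(session_id)
#   report        = await summarize_L30(session_id)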
# ─────────────────────────────
# NeoMem push
# ─────────────────────────────
def push_to_neomem(summary: str, session_id: str, level: str) -> None:
    """
    Fire-and-forget push of a summary into NeoMem.
    """
    if not NEOMEM_API or not summary:
        return
    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"
    payload = {
        "messages": [{"role": "assistant", "content": summary}],
        "user_id": "brian",
        "metadata": {
            "source": "intake",
            "session_id": session_id,
            "level": level,
        },
    }
    try:
        import requests  # Imported lazily so NeoMem support stays optional
        requests.post(
            f"{NEOMEM_API}/memories",
            json=payload,
            headers=headers,
            timeout=20,
        ).raise_for_status()
        print(f"🧠 NeoMem updated ({level}) for {session_id}")
    except Exception as e:
        print(f"NeoMem push failed ({level}, {session_id}): {e}")
# ─────────────────────────────
# Main entrypoint for Cortex
# ─────────────────────────────
async def summarize_context(session_id: str, exchanges: list[dict]):
    """
    Internal summarizer that uses Cortex's LLM router.
    Produces L1 / L5 / L10 / L20 / L30 summaries.

    Args:
        session_id: The conversation/session ID
        exchanges: A list of {"user_msg": ..., "assistant_msg": ..., "timestamp": ...}
    """
    # Build raw conversation text
    convo_lines = []
    for ex in exchanges:
        convo_lines.append(f"User: {ex.get('user_msg', '')}")
        convo_lines.append(f"Assistant: {ex.get('assistant_msg', '')}")
    convo_text = "\n".join(convo_lines)
    if not convo_text.strip():
        return {
            "session_id": session_id,
            "exchange_count": 0,
            "L1": "",
            "L5": "",
            "L10": "",
            "L20": "",
            "L30": "",
            "last_updated": datetime.now().isoformat(),
        }
    # Prompt the LLM (internal — no HTTP)
    prompt = f"""
Summarize the conversation below into multiple compression levels.
Conversation:
----------------
{convo_text}
----------------
Output strictly in JSON with keys:
L1 → ultra short summary (1–2 sentences max)
L5 → short summary
L10 → medium summary
L20 → detailed overview
L30 → full detailed summary
JSON only. No text outside JSON.
"""
    try:
        llm_response = await call_llm(
            prompt,
            backend=INTAKE_LLM,
            temperature=0.2,
        )
        print(f"[Intake] LLM response length: {len(llm_response) if llm_response else 0}")
        print(f"[Intake] LLM response preview: {llm_response[:200] if llm_response else '(empty)'}")
        # LLM should return JSON, parse it
        if not llm_response or not llm_response.strip():
            raise ValueError("Empty response from LLM")
        summary = json.loads(llm_response)
        return {
            "session_id": session_id,
            "exchange_count": len(exchanges),
            "L1": summary.get("L1", ""),
            "L5": summary.get("L5", ""),
            "L10": summary.get("L10", ""),
            "L20": summary.get("L20", ""),
            "L30": summary.get("L30", ""),
            "last_updated": datetime.now().isoformat(),
        }
    except Exception as e:
        return {
            "session_id": session_id,
            "exchange_count": len(exchanges),
            "L1": f"[Error summarizing: {str(e)}]",
            "L5": "",
            "L10": "",
            "L20": "",
            "L30": "",
            "last_updated": datetime.now().isoformat(),
        }
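# Example return shape (values are illustrative, structure matches the code above):
#   {
#       "session_id": "abc123",
#       "exchange_count": 4,
#       "L1": "…", "L5": "…", "L10": "…", "L20": "…", "L30": "…",
#       "last_updated": "2025-12-12T02:58:23",
#   }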
# ─────────────────────────────────
# Background summarization stub
# ─────────────────────────────────
def bg_summarize(session_id: str):
    """
    Placeholder for background summarization.
    Actual summarization happens during /reason via summarize_context().
    This function exists to prevent NameError when called from add_exchange_internal().
    """
    print(f"[Intake] Exchange added for {session_id}. Will summarize on next /reason call.")
# ─────────────────────────────
# Internal entrypoint for Cortex
# ─────────────────────────────
def add_exchange_internal(exchange: dict):
    """
    Direct internal call — bypasses FastAPI request handling.
    Cortex uses this to feed user/assistant turns directly
    into Intake's buffer and trigger full summarization.
    """
    session_id = exchange.get("session_id")
    if not session_id:
        raise ValueError("session_id missing")
    exchange["timestamp"] = datetime.now().isoformat()

    # DEBUG: Verify we're using the module-level SESSIONS
    print(f"[add_exchange_internal] SESSIONS object id: {id(SESSIONS)}, current sessions: {list(SESSIONS.keys())}")

    # Ensure session exists
    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            "buffer": deque(maxlen=200),
            "created_at": datetime.now(),
        }
        print(f"[add_exchange_internal] Created new session: {session_id}")
    else:
        print(f"[add_exchange_internal] Using existing session: {session_id}")

    # Append exchange into the rolling buffer
    SESSIONS[session_id]["buffer"].append(exchange)
    buffer_len = len(SESSIONS[session_id]["buffer"])
    print(f"[add_exchange_internal] Added exchange to {session_id}, buffer now has {buffer_len} items")

    # Trigger summarization immediately
    try:
        bg_summarize(session_id)
    except Exception as e:
        print(f"[Internal Intake] Summarization error: {e}")
    return {"ok": True, "session_id": session_id}