project-lyra/cortex/intake/intake.py
import os
from datetime import datetime
from typing import List, Dict, Any, TYPE_CHECKING
from collections import deque

if TYPE_CHECKING:
    # Type-only stubs for names the hosting Cortex module provides at runtime.
    from collections import deque as _deque
    SESSIONS: dict
    L10_HISTORY: dict
    L20_HISTORY: dict
    def bg_summarize(session_id: str) -> None: ...

from llm.llm_router import call_llm  # use Cortex's shared router
# ─────────────────────────────
# Config
# ─────────────────────────────
INTAKE_LLM = os.getenv("INTAKE_LLM", "PRIMARY").upper()
SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200"))
SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3"))
NEOMEM_API = os.getenv("NEOMEM_API")
NEOMEM_KEY = os.getenv("NEOMEM_KEY")
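# Illustrative environment configuration (example values, not taken from the
# repo; NEOMEM_API in particular is a hypothetical URL):
#
#   INTAKE_LLM=PRIMARY
#   SUMMARY_MAX_TOKENS=200
#   SUMMARY_TEMPERATURE=0.3
#   NEOMEM_API=http://neomem:7077
#   NEOMEM_KEY=<optional bearer token>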
# ─────────────────────────────
# Internal history for L10/L20/L30
# ─────────────────────────────
L10_HISTORY: Dict[str, list[str]] = {} # session_id → list of L10 blocks
L20_HISTORY: Dict[str, list[str]] = {} # session_id → list of merged overviews
# ─────────────────────────────
# LLM helper (via Cortex router)
# ─────────────────────────────
async def _llm(prompt: str) -> str:
    """
    Use Cortex's llm_router to run a summary prompt.
    """
    try:
        text = await call_llm(
            prompt,
            backend=INTAKE_LLM,
            temperature=SUMMARY_TEMPERATURE,
            max_tokens=SUMMARY_MAX_TOKENS,
        )
        return (text or "").strip()
    except Exception as e:
        return f"[Error summarizing: {e}]"
# ─────────────────────────────
# Formatting helpers
# ─────────────────────────────
def _format_exchanges(exchanges: List[Dict[str, Any]]) -> str:
    """
    Expect each exchange to look like:
        { "user_msg": "...", "assistant_msg": "..." }
    """
    chunks = []
    for e in exchanges:
        user = e.get("user_msg", "")
        assistant = e.get("assistant_msg", "")
        chunks.append(f"User: {user}\nAssistant: {assistant}\n")
    return "\n".join(chunks)
# ─────────────────────────────
# Base factual summary
# ─────────────────────────────
async def summarize_simple(exchanges: List[Dict[str, Any]]) -> str:
    """
    Simple factual summary of recent exchanges.
    """
    if not exchanges:
        return ""
    text = _format_exchanges(exchanges)
    prompt = f"""
Summarize the following conversation between Brian (user) and Lyra (assistant).
Focus only on factual content. Avoid names, examples, story tone, or invented details.
{text}
Summary:
"""
    return await _llm(prompt)
# ─────────────────────────────
# Multilevel Summaries (L1, L5, L10, L20, L30)
# ─────────────────────────────
async def summarize_L1(buf: List[Dict[str, Any]]) -> str:
    # Last ~5 exchanges
    return await summarize_simple(buf[-5:])


async def summarize_L5(buf: List[Dict[str, Any]]) -> str:
    # Last ~10 exchanges
    return await summarize_simple(buf[-10:])
async def summarize_L10(session_id: str, buf: List[Dict[str, Any]]) -> str:
    # “Reality Check” for last 10 exchanges
    text = _format_exchanges(buf[-10:])
    prompt = f"""
You are Lyra Intake performing a short 'Reality Check'.
Summarize the last block of conversation (up to 10 exchanges)
in one clear paragraph focusing on tone, intent, and direction.
{text}
Reality Check:
"""
    summary = await _llm(prompt)
    # Track history for this session
    L10_HISTORY.setdefault(session_id, [])
    L10_HISTORY[session_id].append(summary)
    return summary
async def summarize_L20(session_id: str) -> str:
    """
    Merge all L10 Reality Checks into a 'Session Overview'.
    """
    history = L10_HISTORY.get(session_id, [])
    joined = "\n\n".join(history) if history else ""
    if not joined:
        return ""
    prompt = f"""
You are Lyra Intake creating a 'Session Overview'.
Merge the following Reality Check paragraphs into one short summary
capturing progress, themes, and the direction of the conversation.
{joined}
Overview:
"""
    summary = await _llm(prompt)
    L20_HISTORY.setdefault(session_id, [])
    L20_HISTORY[session_id].append(summary)
    return summary
async def summarize_L30(session_id: str) -> str:
    """
    Merge all L20 session overviews into a 'Continuity Report'.
    """
    history = L20_HISTORY.get(session_id, [])
    joined = "\n\n".join(history) if history else ""
    if not joined:
        return ""
    prompt = f"""
You are Lyra Intake generating a 'Continuity Report'.
Condense these session overviews into one high-level reflection,
noting major themes, persistent goals, and shifts.
{joined}
Continuity Report:
"""
    return await _llm(prompt)
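# Illustrative sketch of how the tiers chain per session (sequence is made up;
# the real caller is summarize_context below): each summarize_L10 call appends
# a Reality Check to L10_HISTORY[session_id], summarize_L20 merges those into a
# Session Overview stored in L20_HISTORY, and summarize_L30 condenses the
# overviews into a Continuity Report.
#
#   await summarize_L10("s1", buf)   # adds one entry to L10_HISTORY["s1"]
#   await summarize_L20("s1")        # merges L10_HISTORY["s1"], appends to L20_HISTORY["s1"]
#   await summarize_L30("s1")        # merges L20_HISTORY["s1"]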
# ─────────────────────────────
# NeoMem push
# ─────────────────────────────
def push_to_neomem(summary: str, session_id: str, level: str) -> None:
    """
    Fire-and-forget push of a summary into NeoMem.
    """
    if not NEOMEM_API or not summary:
        return
    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"
    payload = {
        "messages": [{"role": "assistant", "content": summary}],
        "user_id": "brian",
        "metadata": {
            "source": "intake",
            "session_id": session_id,
            "level": level,
        },
    }
    try:
        import requests
        requests.post(
            f"{NEOMEM_API}/memories",
            json=payload,
            headers=headers,
            timeout=20,
        ).raise_for_status()
        print(f"🧠 NeoMem updated ({level}) for {session_id}")
    except Exception as e:
        print(f"NeoMem push failed ({level}, {session_id}): {e}")
# ─────────────────────────────
# Main entrypoint for Cortex
# ─────────────────────────────
async def summarize_context(
    session_id: str,
    exchanges: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """
    Main API used by Cortex:

        summaries = await summarize_context(session_id, exchanges)

    `exchanges` should be the recent conversation buffer for that session.
    """
    buf = list(exchanges)
    if not buf:
        return {
            "session_id": session_id,
            "exchange_count": 0,
            "L1": "",
            "L5": "",
            "L10": "",
            "L20": "",
            "L30": "",
            "last_updated": None,
        }
    # Base levels
    L1 = await summarize_L1(buf)
    L5 = await summarize_L5(buf)
    L10 = await summarize_L10(session_id, buf)
    L20 = await summarize_L20(session_id)
    L30 = await summarize_L30(session_id)
    # Push the "interesting" tiers into NeoMem
    push_to_neomem(L10, session_id, "L10")
    push_to_neomem(L20, session_id, "L20")
    push_to_neomem(L30, session_id, "L30")
    return {
        "session_id": session_id,
        "exchange_count": len(buf),
        "L1": L1,
        "L5": L5,
        "L10": L10,
        "L20": L20,
        "L30": L30,
        "last_updated": datetime.now().isoformat(),
    }
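# Illustrative usage sketch (session id and messages are placeholders): calling
# the main entrypoint from an async context.
#
#   import asyncio
#
#   async def demo():
#       exchanges = [
#           {"user_msg": "Where did we leave off?",
#            "assistant_msg": "We were wiring Intake into Cortex."},
#       ]
#       summaries = await summarize_context("demo-session", exchanges)
#       print(summaries["L10"])
#
#   asyncio.run(demo())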
# ─────────────────────────────
# Internal entrypoint for Cortex
# ─────────────────────────────
def add_exchange_internal(exchange: dict):
    """
    Direct internal call — bypasses FastAPI request handling.
    Cortex uses this to feed user/assistant turns directly
    into Intake's buffer and trigger full summarization.
    """
    session_id = exchange.get("session_id")
    if not session_id:
        raise ValueError("session_id missing")
    exchange["timestamp"] = datetime.now().isoformat()
    # Ensure session exists
    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            "buffer": deque(maxlen=200),
            "created_at": datetime.now(),
        }
    # Append exchange into the rolling buffer
    SESSIONS[session_id]["buffer"].append(exchange)
    # Trigger summarization immediately
    try:
        bg_summarize(session_id)
    except Exception as e:
        print(f"[Internal Intake] Summarization error: {e}")
    return {"ok": True, "session_id": session_id}