diff --git a/core/relay/server.js b/core/relay/server.js index 71e5dca..c9e2192 100644 --- a/core/relay/server.js +++ b/core/relay/server.js @@ -13,7 +13,6 @@ const PORT = Number(process.env.PORT || 7078); // core endpoints const CORTEX_REASON = process.env.CORTEX_REASON_URL || "http://cortex:7081/reason"; const CORTEX_INGEST = process.env.CORTEX_INGEST_URL || "http://cortex:7081/ingest"; -const INTAKE_URL = process.env.INTAKE_URL || "http://intake:7080/add_exchange"; // ----------------------------------------------------- // Helper request wrapper @@ -28,6 +27,7 @@ async function postJSON(url, data) { const raw = await resp.text(); let json; + // Try to parse JSON safely try { json = raw ? JSON.parse(raw) : null; } catch (e) { @@ -45,7 +45,7 @@ async function postJSON(url, data) { // Shared chat handler logic // ----------------------------------------------------- async function handleChatRequest(session_id, user_msg) { - // 1. → Cortex.reason + // 1. → Cortex.reason: the main pipeline let reason; try { reason = await postJSON(CORTEX_REASON, { @@ -60,20 +60,16 @@ async function handleChatRequest(session_id, user_msg) { const persona = reason.final_output || reason.persona || "(no persona text)"; // 2. → Cortex.ingest (async, non-blocking) + // Cortex might still want this for separate ingestion pipeline. postJSON(CORTEX_INGEST, { session_id, user_msg, assistant_msg: persona - }).catch(e => console.warn("Relay → Cortex.ingest failed:", e.message)); + }).catch(e => + console.warn("Relay → Cortex.ingest failed:", e.message) + ); - // 3. → Intake summary (async, non-blocking) - postJSON(INTAKE_URL, { - session_id, - user_msg, - assistant_msg: persona - }).catch(e => console.warn("Relay → Intake failed:", e.message)); - - // 4. Return result + // 3. Return corrected result return { session_id, reply: persona @@ -88,11 +84,10 @@ app.get("/_health", (_, res) => { }); // ----------------------------------------------------- -// OPENAI-COMPATIBLE ENDPOINT (for UI) +// OPENAI-COMPATIBLE ENDPOINT (for UI & clients) // ----------------------------------------------------- app.post("/v1/chat/completions", async (req, res) => { try { - // Extract from OpenAI format const session_id = req.body.session_id || req.body.user || "default"; const messages = req.body.messages || []; const lastMessage = messages[messages.length - 1]; @@ -104,10 +99,8 @@ app.post("/v1/chat/completions", async (req, res) => { console.log(`Relay (v1) → received: "${user_msg}"`); - // Call the same logic as /chat const result = await handleChatRequest(session_id, user_msg); - // Return in OpenAI format return res.json({ id: `chatcmpl-${Date.now()}`, object: "chat.completion", @@ -129,7 +122,7 @@ app.post("/v1/chat/completions", async (req, res) => { }); } catch (err) { - console.error("Relay v1 endpoint fatal:", err); + console.error("Relay v1 fatal:", err); res.status(500).json({ error: { message: err.message || String(err), @@ -141,7 +134,7 @@ app.post("/v1/chat/completions", async (req, res) => { }); // ----------------------------------------------------- -// MAIN ENDPOINT (new canonical) +// MAIN ENDPOINT (canonical Lyra UI entrance) // ----------------------------------------------------- app.post("/chat", async (req, res) => { try { diff --git a/cortex/context.py b/cortex/context.py index 75f9ede..aff3327 100644 --- a/cortex/context.py +++ b/cortex/context.py @@ -15,13 +15,14 @@ import logging from datetime import datetime from typing import Dict, Any, Optional, List import httpx +from intake.intake import summarize_context + from neomem_client import NeoMemClient # ----------------------------- # Configuration # ----------------------------- -INTAKE_API_URL = os.getenv("INTAKE_API_URL", "http://intake:7080") NEOMEM_API = os.getenv("NEOMEM_API", "http://neomem-api:8000") RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.4")) VERBOSE_DEBUG = os.getenv("VERBOSE_DEBUG", "false").lower() == "true" @@ -89,69 +90,27 @@ def _init_session(session_id: str) -> Dict[str, Any]: # ----------------------------- # Intake context retrieval # ----------------------------- -async def _get_intake_context(session_id: str) -> Dict[str, Any]: +async def _get_intake_context(session_id: str, messages: List[Dict[str, str]]): """ - Retrieve multilevel summaries from Intake /context endpoint. - - Returns L1-L30 summary hierarchy: - - L1: Last 5 exchanges - - L5: Last 10 exchanges (reality check) - - L10: Intermediate checkpoint - - L20: Session overview - - L30: Continuity report - - Args: - session_id: Session identifier - - Returns: - Dict with multilevel summaries or empty structure on failure + Internal Intake — Direct call to summarize_context() + No HTTP, no containers, no failures. """ - url = f"{INTAKE_API_URL}/context" - params = {"session_id": session_id} - try: - async with httpx.AsyncClient(timeout=5.0) as client: - response = await client.get(url, params=params) - response.raise_for_status() - data = response.json() - - # Expected format from Intake: - # { - # "session_id": "...", - # "L1": [...], - # "L5": [...], - # "L10": {...}, - # "L20": {...}, - # "L30": {...} - # } - - logger.info(f"Retrieved Intake context for session {session_id}") - return data - - except httpx.HTTPError as e: - logger.warning(f"Failed to retrieve Intake context: {e}") - return { - "session_id": session_id, - "L1": [], - "L5": [], - "L10": None, - "L20": None, - "L30": None, - "error": str(e) - } + return await summarize_context(session_id, messages) except Exception as e: - logger.error(f"Unexpected error retrieving Intake context: {e}") + logger.error(f"Internal Intake summarization failed: {e}") return { "session_id": session_id, - "L1": [], - "L5": [], - "L10": None, - "L20": None, - "L30": None, + "L1": "", + "L5": "", + "L10": "", + "L20": "", + "L30": "", "error": str(e) } + # ----------------------------- # NeoMem semantic search # ----------------------------- @@ -279,7 +238,19 @@ async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]: logger.debug(f"[COLLECT_CONTEXT] Time since last message: {minutes_since_last_msg:.2f} minutes") # C. Gather Intake context (multilevel summaries) - intake_data = await _get_intake_context(session_id) + # Build compact message buffer for Intake: + messages_for_intake = [] + + # You track messages inside SESSION_STATE — assemble it here: + if "message_history" in state: + for turn in state["message_history"]: + messages_for_intake.append({ + "user_msg": turn.get("user", ""), + "assistant_msg": turn.get("assistant", "") + }) + + intake_data = await _get_intake_context(session_id, messages_for_intake) + if VERBOSE_DEBUG: import json diff --git a/cortex/intake/intake.py b/cortex/intake/intake.py new file mode 100644 index 0000000..ca8a373 --- /dev/null +++ b/cortex/intake/intake.py @@ -0,0 +1,260 @@ +import os +from datetime import datetime +from typing import List, Dict, Any + +from llm.llm_router import call_llm # use Cortex's shared router + +# ───────────────────────────── +# Config +# ───────────────────────────── + +INTAKE_LLM = os.getenv("INTAKE_LLM", "PRIMARY").upper() + +SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200")) +SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3")) + +NEOMEM_API = os.getenv("NEOMEM_API") +NEOMEM_KEY = os.getenv("NEOMEM_KEY") + +# ───────────────────────────── +# Internal history for L10/L20/L30 +# ───────────────────────────── + +L10_HISTORY: Dict[str, list[str]] = {} # session_id → list of L10 blocks +L20_HISTORY: Dict[str, list[str]] = {} # session_id → list of merged overviews + + +# ───────────────────────────── +# LLM helper (via Cortex router) +# ───────────────────────────── + +async def _llm(prompt: str) -> str: + """ + Use Cortex's llm_router to run a summary prompt. + """ + try: + text = await call_llm( + prompt, + backend=INTAKE_LLM, + temperature=SUMMARY_TEMPERATURE, + max_tokens=SUMMARY_MAX_TOKENS, + ) + return (text or "").strip() + except Exception as e: + return f"[Error summarizing: {e}]" + + +# ───────────────────────────── +# Formatting helpers +# ───────────────────────────── + +def _format_exchanges(exchanges: List[Dict[str, Any]]) -> str: + """ + Expect each exchange to look like: + { "user_msg": "...", "assistant_msg": "..." } + """ + chunks = [] + for e in exchanges: + user = e.get("user_msg", "") + assistant = e.get("assistant_msg", "") + chunks.append(f"User: {user}\nAssistant: {assistant}\n") + return "\n".join(chunks) + + +# ───────────────────────────── +# Base factual summary +# ───────────────────────────── + +async def summarize_simple(exchanges: List[Dict[str, Any]]) -> str: + """ + Simple factual summary of recent exchanges. + """ + if not exchanges: + return "" + + text = _format_exchanges(exchanges) + + prompt = f""" +Summarize the following conversation between Brian (user) and Lyra (assistant). +Focus only on factual content. Avoid names, examples, story tone, or invented details. + +{text} + +Summary: +""" + return await _llm(prompt) + + +# ───────────────────────────── +# Multilevel Summaries (L1, L5, L10, L20, L30) +# ───────────────────────────── + +async def summarize_L1(buf: List[Dict[str, Any]]) -> str: + # Last ~5 exchanges + return await summarize_simple(buf[-5:]) + + +async def summarize_L5(buf: List[Dict[str, Any]]) -> str: + # Last ~10 exchanges + return await summarize_simple(buf[-10:]) + + +async def summarize_L10(session_id: str, buf: List[Dict[str, Any]]) -> str: + # “Reality Check” for last 10 exchanges + text = _format_exchanges(buf[-10:]) + + prompt = f""" +You are Lyra Intake performing a short 'Reality Check'. +Summarize the last block of conversation (up to 10 exchanges) +in one clear paragraph focusing on tone, intent, and direction. + +{text} + +Reality Check: +""" + summary = await _llm(prompt) + + # Track history for this session + L10_HISTORY.setdefault(session_id, []) + L10_HISTORY[session_id].append(summary) + + return summary + + +async def summarize_L20(session_id: str) -> str: + """ + Merge all L10 Reality Checks into a 'Session Overview'. + """ + history = L10_HISTORY.get(session_id, []) + joined = "\n\n".join(history) if history else "" + + if not joined: + return "" + + prompt = f""" +You are Lyra Intake creating a 'Session Overview'. +Merge the following Reality Check paragraphs into one short summary +capturing progress, themes, and the direction of the conversation. + +{joined} + +Overview: +""" + summary = await _llm(prompt) + + L20_HISTORY.setdefault(session_id, []) + L20_HISTORY[session_id].append(summary) + + return summary + + +async def summarize_L30(session_id: str) -> str: + """ + Merge all L20 session overviews into a 'Continuity Report'. + """ + history = L20_HISTORY.get(session_id, []) + joined = "\n\n".join(history) if history else "" + + if not joined: + return "" + + prompt = f""" +You are Lyra Intake generating a 'Continuity Report'. +Condense these session overviews into one high-level reflection, +noting major themes, persistent goals, and shifts. + +{joined} + +Continuity Report: +""" + return await _llm(prompt) + + +# ───────────────────────────── +# NeoMem push +# ───────────────────────────── + +def push_to_neomem(summary: str, session_id: str, level: str) -> None: + """ + Fire-and-forget push of a summary into NeoMem. + """ + if not NEOMEM_API or not summary: + return + + headers = {"Content-Type": "application/json"} + if NEOMEM_KEY: + headers["Authorization"] = f"Bearer {NEOMEM_KEY}" + + payload = { + "messages": [{"role": "assistant", "content": summary}], + "user_id": "brian", + "metadata": { + "source": "intake", + "session_id": session_id, + "level": level, + }, + } + + try: + import requests + requests.post( + f"{NEOMEM_API}/memories", + json=payload, + headers=headers, + timeout=20, + ).raise_for_status() + print(f"🧠 NeoMem updated ({level}) for {session_id}") + except Exception as e: + print(f"NeoMem push failed ({level}, {session_id}): {e}") + + +# ───────────────────────────── +# Main entrypoint for Cortex +# ───────────────────────────── + +async def summarize_context( + session_id: str, + exchanges: List[Dict[str, Any]], +) -> Dict[str, Any]: + """ + Main API used by Cortex: + + summaries = await summarize_context(session_id, exchanges) + + `exchanges` should be the recent conversation buffer for that session. + """ + buf = list(exchanges) + if not buf: + return { + "session_id": session_id, + "exchange_count": 0, + "L1": "", + "L5": "", + "L10": "", + "L20": "", + "L30": "", + "last_updated": None, + } + + # Base levels + L1 = await summarize_L1(buf) + L5 = await summarize_L5(buf) + L10 = await summarize_L10(session_id, buf) + L20 = await summarize_L20(session_id) + L30 = await summarize_L30(session_id) + + # Push the "interesting" tiers into NeoMem + push_to_neomem(L10, session_id, "L10") + push_to_neomem(L20, session_id, "L20") + push_to_neomem(L30, session_id, "L30") + + return { + "session_id": session_id, + "exchange_count": len(buf), + "L1": L1, + "L5": L5, + "L10": L10, + "L20": L20, + "L30": L30, + "last_updated": datetime.now().isoformat(), + } diff --git a/docker-compose.yml b/docker-compose.yml index 38303b3..a4493fe 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -118,23 +118,23 @@ services: # ============================================================ # Intake # ============================================================ - intake: - build: - context: ./intake - container_name: intake - restart: unless-stopped - env_file: - - ./intake/.env - - ./.env - ports: - - "7080:7080" - volumes: - - ./intake:/app - - ./intake-logs:/app/logs - depends_on: - - cortex - networks: - - lyra_net +# intake: +# build: +# context: ./intake +# container_name: intake +# restart: unless-stopped +# env_file: +# - ./intake/.env +# - ./.env +# ports: +3 - "7080:7080" +# volumes: +# - ./intake:/app +# - ./intake-logs:/app/logs +# depends_on: +# - cortex +# networks: +# - lyra_net # ============================================================ # RAG Service