intake internalized by cortex, removed intake route in relay

Author: serversdwn
Date: 2025-11-29 19:08:15 -05:00
parent cc014d0a73
commit 320bf4439b
4 changed files with 313 additions and 89 deletions
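
In effect the commit replaces two HTTP hops (Relay → intake:7080/add_exchange, and Cortex → intake:7080/context) with a single in-process call inside Cortex. A minimal sketch of the new path, assuming Cortex's image now ships intake/intake.py on its import path (the caller name here is hypothetical):

from intake.intake import summarize_context

async def build_intake_context(session_id, exchanges):
    # exchanges: [{"user_msg": "...", "assistant_msg": "..."}, ...]
    # direct call; no http://intake:7080 round trip anymore
    return await summarize_context(session_id, exchanges)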

View File

@@ -13,7 +13,6 @@ const PORT = Number(process.env.PORT || 7078);
// core endpoints
const CORTEX_REASON = process.env.CORTEX_REASON_URL || "http://cortex:7081/reason";
const CORTEX_INGEST = process.env.CORTEX_INGEST_URL || "http://cortex:7081/ingest";
- const INTAKE_URL = process.env.INTAKE_URL || "http://intake:7080/add_exchange";
// -----------------------------------------------------
// Helper request wrapper
@@ -28,6 +27,7 @@ async function postJSON(url, data) {
const raw = await resp.text();
let json;
+ // Try to parse JSON safely
try {
json = raw ? JSON.parse(raw) : null;
} catch (e) {
@@ -45,7 +45,7 @@ async function postJSON(url, data) {
// Shared chat handler logic
// -----------------------------------------------------
async function handleChatRequest(session_id, user_msg) {
- // 1. → Cortex.reason
+ // 1. → Cortex.reason: the main pipeline
let reason;
try {
reason = await postJSON(CORTEX_REASON, {
@@ -60,20 +60,16 @@ async function handleChatRequest(session_id, user_msg) {
const persona = reason.final_output || reason.persona || "(no persona text)";
// 2. → Cortex.ingest (async, non-blocking)
// Cortex might still want this for separate ingestion pipeline.
postJSON(CORTEX_INGEST, {
session_id,
user_msg,
assistant_msg: persona
- }).catch(e => console.warn("Relay → Cortex.ingest failed:", e.message));
+ }).catch(e =>
+ console.warn("Relay → Cortex.ingest failed:", e.message)
+ );
- // 3. → Intake summary (async, non-blocking)
- postJSON(INTAKE_URL, {
- session_id,
- user_msg,
- assistant_msg: persona
- }).catch(e => console.warn("Relay → Intake failed:", e.message));
- // 4. Return result
+ // 3. Return result
return {
session_id,
reply: persona
@@ -88,11 +84,10 @@ app.get("/_health", (_, res) => {
});
// -----------------------------------------------------
- // OPENAI-COMPATIBLE ENDPOINT (for UI)
+ // OPENAI-COMPATIBLE ENDPOINT (for UI & clients)
// -----------------------------------------------------
app.post("/v1/chat/completions", async (req, res) => {
try {
- // Extract from OpenAI format
const session_id = req.body.session_id || req.body.user || "default";
const messages = req.body.messages || [];
const lastMessage = messages[messages.length - 1];
@@ -104,10 +99,8 @@ app.post("/v1/chat/completions", async (req, res) => {
console.log(`Relay (v1) → received: "${user_msg}"`);
// Call the same logic as /chat
const result = await handleChatRequest(session_id, user_msg);
- // Return in OpenAI format
return res.json({
id: `chatcmpl-${Date.now()}`,
object: "chat.completion",
@@ -129,7 +122,7 @@ app.post("/v1/chat/completions", async (req, res) => {
});
} catch (err) {
- console.error("Relay v1 endpoint fatal:", err);
+ console.error("Relay v1 fatal:", err);
res.status(500).json({
error: {
message: err.message || String(err),
@@ -141,7 +134,7 @@ app.post("/v1/chat/completions", async (req, res) => {
});
// -----------------------------------------------------
- // MAIN ENDPOINT (new canonical)
+ // MAIN ENDPOINT (canonical Lyra UI entrance)
// -----------------------------------------------------
app.post("/chat", async (req, res) => {
try {

View File

@@ -15,13 +15,14 @@ import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
import httpx
+ from intake.intake import summarize_context
from neomem_client import NeoMemClient
# -----------------------------
# Configuration
# -----------------------------
- INTAKE_API_URL = os.getenv("INTAKE_API_URL", "http://intake:7080")
NEOMEM_API = os.getenv("NEOMEM_API", "http://neomem-api:8000")
RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.4"))
VERBOSE_DEBUG = os.getenv("VERBOSE_DEBUG", "false").lower() == "true"
@@ -89,69 +90,27 @@ def _init_session(session_id: str) -> Dict[str, Any]:
# -----------------------------
# Intake context retrieval
# -----------------------------
- async def _get_intake_context(session_id: str) -> Dict[str, Any]:
+ async def _get_intake_context(session_id: str, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""
- Retrieve multilevel summaries from Intake /context endpoint.
- Returns L1-L30 summary hierarchy:
- - L1: Last 5 exchanges
- - L5: Last 10 exchanges (reality check)
- - L10: Intermediate checkpoint
- - L20: Session overview
- - L30: Continuity report
- Args:
- session_id: Session identifier
- Returns:
- Dict with multilevel summaries or empty structure on failure
+ Internal Intake: direct call to summarize_context().
+ No HTTP, no containers, no failures.
"""
- url = f"{INTAKE_API_URL}/context"
- params = {"session_id": session_id}
try:
- async with httpx.AsyncClient(timeout=5.0) as client:
- response = await client.get(url, params=params)
- response.raise_for_status()
- data = response.json()
- # Expected format from Intake:
- # {
- # "session_id": "...",
- # "L1": [...],
- # "L5": [...],
- # "L10": {...},
- # "L20": {...},
- # "L30": {...}
- # }
- logger.info(f"Retrieved Intake context for session {session_id}")
- return data
- except httpx.HTTPError as e:
- logger.warning(f"Failed to retrieve Intake context: {e}")
- return {
- "session_id": session_id,
- "L1": [],
- "L5": [],
- "L10": None,
- "L20": None,
- "L30": None,
- "error": str(e)
- }
+ return await summarize_context(session_id, messages)
except Exception as e:
- logger.error(f"Unexpected error retrieving Intake context: {e}")
+ logger.error(f"Internal Intake summarization failed: {e}")
return {
"session_id": session_id,
- "L1": [],
- "L5": [],
- "L10": None,
- "L20": None,
- "L30": None,
+ "L1": "",
+ "L5": "",
+ "L10": "",
+ "L20": "",
+ "L30": "",
"error": str(e)
}
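
# --- Illustrative sketch (not part of this commit) ---
# On failure every tier is now an empty string rather than []/None, so the
# caller can treat all five tiers uniformly. Hypothetical consumption:
async def _demo_consume_context() -> None:
    intake_data = await _get_intake_context(
        "session-1",
        [{"user_msg": "hi", "assistant_msg": "hello"}],
    )
    if intake_data.get("error"):
        logger.warning(f"Intake summaries unavailable: {intake_data['error']}")
    # every tier is a str ("" when empty), never None or a list
    block = "\n\n".join(
        intake_data[k] for k in ("L1", "L5", "L10", "L20", "L30") if intake_data[k]
    )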
# -----------------------------
# NeoMem semantic search
# -----------------------------
@@ -279,7 +238,19 @@ async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
logger.debug(f"[COLLECT_CONTEXT] Time since last message: {minutes_since_last_msg:.2f} minutes")
# C. Gather Intake context (multilevel summaries)
- intake_data = await _get_intake_context(session_id)
+ # Build compact message buffer for Intake:
+ messages_for_intake = []
+ # Messages are tracked in SESSION_STATE; assemble the buffer here:
+ if "message_history" in state:
+ for turn in state["message_history"]:
+ messages_for_intake.append({
+ "user_msg": turn.get("user", ""),
+ "assistant_msg": turn.get("assistant", "")
+ })
+ intake_data = await _get_intake_context(session_id, messages_for_intake)
if VERBOSE_DEBUG:
import json
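
# --- Illustrative sketch (not part of this commit) ---
# Assumed shape of state["message_history"] that the loop above reads;
# the turns below are made up:
state = {
    "message_history": [
        {"user": "How does Intake run now?",
         "assistant": "In-process, inside Cortex."},
    ]
}
messages_for_intake = [
    {"user_msg": t.get("user", ""), "assistant_msg": t.get("assistant", "")}
    for t in state.get("message_history", [])
]
# → [{"user_msg": "How does Intake run now?",
#     "assistant_msg": "In-process, inside Cortex."}]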

cortex/intake/intake.py (new file, 260 lines)
View File

@@ -0,0 +1,260 @@
import os
from datetime import datetime
from typing import List, Dict, Any
from llm.llm_router import call_llm # use Cortex's shared router
# ─────────────────────────────
# Config
# ─────────────────────────────
INTAKE_LLM = os.getenv("INTAKE_LLM", "PRIMARY").upper()
SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200"))
SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3"))
NEOMEM_API = os.getenv("NEOMEM_API")
NEOMEM_KEY = os.getenv("NEOMEM_KEY")
# ─────────────────────────────
# Internal history for L10/L20/L30
# ─────────────────────────────
L10_HISTORY: Dict[str, List[str]] = {} # session_id → list of L10 blocks
L20_HISTORY: Dict[str, List[str]] = {} # session_id → list of merged overviews
# ─────────────────────────────
# LLM helper (via Cortex router)
# ─────────────────────────────
async def _llm(prompt: str) -> str:
"""
Use Cortex's llm_router to run a summary prompt.
"""
try:
text = await call_llm(
prompt,
backend=INTAKE_LLM,
temperature=SUMMARY_TEMPERATURE,
max_tokens=SUMMARY_MAX_TOKENS,
)
return (text or "").strip()
except Exception as e:
return f"[Error summarizing: {e}]"
# ─────────────────────────────
# Formatting helpers
# ─────────────────────────────
def _format_exchanges(exchanges: List[Dict[str, Any]]) -> str:
"""
Expect each exchange to look like:
{ "user_msg": "...", "assistant_msg": "..." }
"""
chunks = []
for e in exchanges:
user = e.get("user_msg", "")
assistant = e.get("assistant_msg", "")
chunks.append(f"User: {user}\nAssistant: {assistant}\n")
return "\n".join(chunks)
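
# --- Illustrative sketch (not part of this commit) ---
# Two exchanges flatten to a plain transcript block:
print(_format_exchanges([
    {"user_msg": "ping", "assistant_msg": "pong"},
    {"user_msg": "status?", "assistant_msg": "all green"},
]))
# User: ping
# Assistant: pong
#
# User: status?
# Assistant: all green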
# ─────────────────────────────
# Base factual summary
# ─────────────────────────────
async def summarize_simple(exchanges: List[Dict[str, Any]]) -> str:
"""
Simple factual summary of recent exchanges.
"""
if not exchanges:
return ""
text = _format_exchanges(exchanges)
prompt = f"""
Summarize the following conversation between Brian (user) and Lyra (assistant).
Focus only on factual content. Avoid names, examples, story tone, or invented details.
{text}
Summary:
"""
return await _llm(prompt)
# ─────────────────────────────
# Multilevel Summaries (L1, L5, L10, L20, L30)
# ─────────────────────────────
async def summarize_L1(buf: List[Dict[str, Any]]) -> str:
# Last ~5 exchanges
return await summarize_simple(buf[-5:])
async def summarize_L5(buf: List[Dict[str, Any]]) -> str:
# Last ~10 exchanges
return await summarize_simple(buf[-10:])
async def summarize_L10(session_id: str, buf: List[Dict[str, Any]]) -> str:
# “Reality Check” for last 10 exchanges
text = _format_exchanges(buf[-10:])
prompt = f"""
You are Lyra Intake performing a short 'Reality Check'.
Summarize the last block of conversation (up to 10 exchanges)
in one clear paragraph focusing on tone, intent, and direction.
{text}
Reality Check:
"""
summary = await _llm(prompt)
# Track history for this session
L10_HISTORY.setdefault(session_id, [])
L10_HISTORY[session_id].append(summary)
return summary
async def summarize_L20(session_id: str) -> str:
"""
Merge all L10 Reality Checks into a 'Session Overview'.
"""
history = L10_HISTORY.get(session_id, [])
joined = "\n\n".join(history) if history else ""
if not joined:
return ""
prompt = f"""
You are Lyra Intake creating a 'Session Overview'.
Merge the following Reality Check paragraphs into one short summary
capturing progress, themes, and the direction of the conversation.
{joined}
Overview:
"""
summary = await _llm(prompt)
L20_HISTORY.setdefault(session_id, [])
L20_HISTORY[session_id].append(summary)
return summary
async def summarize_L30(session_id: str) -> str:
"""
Merge all L20 session overviews into a 'Continuity Report'.
"""
history = L20_HISTORY.get(session_id, [])
joined = "\n\n".join(history) if history else ""
if not joined:
return ""
prompt = f"""
You are Lyra Intake generating a 'Continuity Report'.
Condense these session overviews into one high-level reflection,
noting major themes, persistent goals, and shifts.
{joined}
Continuity Report:
"""
return await _llm(prompt)
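
# --- Illustrative sketch (not part of this commit) ---
# The tiers cascade through the module-level history dicts, so call order
# matters: L10 before L20 before L30. Hypothetical driver:
async def _run_tiers(session_id: str, buf: List[Dict[str, Any]]) -> None:
    await summarize_L10(session_id, buf)  # appends to L10_HISTORY[session_id]
    await summarize_L20(session_id)       # merges L10_HISTORY, appends to L20_HISTORY
    await summarize_L30(session_id)       # merges L20_HISTORY; result is not stored
# Both history dicts are process-local, so L20/L30 context resets whenever
# the Cortex container restarts.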
# ─────────────────────────────
# NeoMem push
# ─────────────────────────────
def push_to_neomem(summary: str, session_id: str, level: str) -> None:
"""
Fire-and-forget push of a summary into NeoMem.
"""
if not NEOMEM_API or not summary:
return
headers = {"Content-Type": "application/json"}
if NEOMEM_KEY:
headers["Authorization"] = f"Bearer {NEOMEM_KEY}"
payload = {
"messages": [{"role": "assistant", "content": summary}],
"user_id": "brian",
"metadata": {
"source": "intake",
"session_id": session_id,
"level": level,
},
}
try:
import requests
requests.post(
f"{NEOMEM_API}/memories",
json=payload,
headers=headers,
timeout=20,
).raise_for_status()
print(f"🧠 NeoMem updated ({level}) for {session_id}")
except Exception as e:
print(f"NeoMem push failed ({level}, {session_id}): {e}")
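
# --- Illustrative sketch (not part of this commit) ---
# Caveat: push_to_neomem uses the synchronous requests client but is called
# from the async summarize_context below, so each push can block the event
# loop for up to the 20 s timeout. A stdlib-only non-blocking wrapper
# (the wrapper name is made up):
import asyncio

async def push_to_neomem_async(summary: str, session_id: str, level: str) -> None:
    # run the blocking requests.post off the event loop
    await asyncio.to_thread(push_to_neomem, summary, session_id, level)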
# ─────────────────────────────
# Main entrypoint for Cortex
# ─────────────────────────────
async def summarize_context(
session_id: str,
exchanges: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""
Main API used by Cortex:
summaries = await summarize_context(session_id, exchanges)
`exchanges` should be the recent conversation buffer for that session.
"""
buf = list(exchanges)
if not buf:
return {
"session_id": session_id,
"exchange_count": 0,
"L1": "",
"L5": "",
"L10": "",
"L20": "",
"L30": "",
"last_updated": None,
}
# Base levels
L1 = await summarize_L1(buf)
L5 = await summarize_L5(buf)
L10 = await summarize_L10(session_id, buf)
L20 = await summarize_L20(session_id)
L30 = await summarize_L30(session_id)
# Push the "interesting" tiers into NeoMem
push_to_neomem(L10, session_id, "L10")
push_to_neomem(L20, session_id, "L20")
push_to_neomem(L30, session_id, "L30")
return {
"session_id": session_id,
"exchange_count": len(buf),
"L1": L1,
"L5": L5,
"L10": L10,
"L20": L20,
"L30": L30,
"last_updated": datetime.now().isoformat(),
}
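
# --- Illustrative sketch (not part of this commit) ---
# Standalone smoke test; inside Cortex this module is simply awaited from
# _get_intake_context, so asyncio.run is only for running it by hand:
import asyncio

async def _main() -> None:
    summaries = await summarize_context(
        "session-1",
        [{"user_msg": "hello", "assistant_msg": "hi there"}],
    )
    print(summaries["exchange_count"], summaries["L1"])

if __name__ == "__main__":
    asyncio.run(_main())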

View File

@@ -118,23 +118,23 @@ services:
# ============================================================
# Intake
# ============================================================
- intake:
- build:
- context: ./intake
- container_name: intake
- restart: unless-stopped
- env_file:
- - ./intake/.env
- - ./.env
- ports:
- - "7080:7080"
- volumes:
- - ./intake:/app
- - ./intake-logs:/app/logs
- depends_on:
- - cortex
- networks:
- - lyra_net
+ # intake:
+ # build:
+ # context: ./intake
+ # container_name: intake
+ # restart: unless-stopped
+ # env_file:
+ # - ./intake/.env
+ # - ./.env
+ # ports:
+ # - "7080:7080"
+ # volumes:
+ # - ./intake:/app
+ # - ./intake-logs:/app/logs
+ # depends_on:
+ # - cortex
+ # networks:
+ # - lyra_net
# ============================================================
# RAG Service