# ─────────────────────────────
# Internal entrypoint for Cortex
# ─────────────────────────────
def add_exchange_internal(exchange: dict) -> Dict[str, Any]:
    """
    Append one exchange to a session's rolling buffer and summarize it.

    Direct internal call that bypasses FastAPI request handling: the caller
    hands a user/assistant turn straight to Intake's in-memory buffer, and
    summarization is triggered synchronously afterwards.

    Args:
        exchange: Mapping describing one turn. Must contain a truthy
            ``"session_id"``. A ``"timestamp"`` key (ISO-8601 string from
            ``datetime.now()``) is written into this same dict, so the
            caller's object is mutated.

    Returns:
        ``{"ok": True, "session_id": <session_id>}``. Summarization failures
        do not change the return value; they are only printed.

    Raises:
        ValueError: If ``"session_id"`` is missing or falsy.
    """
    # Imported locally so the buffer type is guaranteed to be bound when this
    # runs; an import that only exists under ``typing.TYPE_CHECKING`` is
    # invisible at runtime and would make ``deque(...)`` a NameError.
    from collections import deque

    session_id = exchange.get("session_id")
    if not session_id:
        raise ValueError("session_id missing")

    exchange["timestamp"] = datetime.now().isoformat()

    # NOTE(review): SESSIONS and bg_summarize are resolved as module-level
    # names at call time — confirm both are actually bound at runtime in this
    # module and not merely declared for the type checker.

    # Ensure session exists
    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            # Bounded buffer: once 200 exchanges are held, the oldest is
            # discarded on each append.
            "buffer": deque(maxlen=200),
            "created_at": datetime.now(),
        }

    # Append exchange into the rolling buffer
    SESSIONS[session_id]["buffer"].append(exchange)

    # Trigger summarization immediately. Best-effort by design: the exchange
    # is already buffered, so a summarizer failure is reported, not raised.
    try:
        bg_summarize(session_id)
    except Exception as e:
        print(f"[Internal Intake] Summarization error: {e}")

    return {"ok": True, "session_id": session_id}
# -----------------------------
# Intake ingest (internal feed)
# -----------------------------
# Request body for POST /ingest: one completed user/assistant turn,
# identified by the session it belongs to.
class IngestPayload(BaseModel):
    session_id: str
    user_msg: str
    assistant_msg: str


# The docstring below is kept word-for-word: web frameworks such as FastAPI
# surface an endpoint's docstring as its published description.
@cortex_router.post("/ingest")
async def ingest(payload: IngestPayload):
    """
    Relay calls this after /reason.
    We update Cortex state AND feed Intake's internal buffer.
    """
    sid = payload.session_id

    # Step 1: record the assistant's reply in Cortex session state. This call
    # is outside the try block, so a failure here propagates to the caller.
    update_last_assistant_message(sid, payload.assistant_msg)

    # Step 2: hand the turn to Intake in-process (no HTTP round trip).
    turn = {
        "session_id": sid,
        "user_msg": payload.user_msg,
        "assistant_msg": payload.assistant_msg,
    }
    try:
        add_exchange_internal(turn)
        logger.debug(f"[INGEST] Added exchange to Intake for {payload.session_id}")
    except Exception as e:
        # Intake feeding is best-effort: log and still answer "ok".
        logger.warning(f"[INGEST] Failed to add exchange to Intake: {e}")

    return {"ok": True, "session_id": sid}