cortex rework continued.
This commit is contained in:
@@ -4,4 +4,6 @@ COPY requirements.txt .
|
||||
RUN pip install -r requirements.txt
|
||||
COPY . .
|
||||
EXPOSE 7081
|
||||
# NOTE: Running with single worker to maintain SESSIONS global state in Intake.
|
||||
# If scaling to multiple workers, migrate SESSIONS to Redis or shared storage.
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7081"]
|
||||
|
||||
@@ -84,6 +84,7 @@ def _init_session(session_id: str) -> Dict[str, Any]:
|
||||
"mood": "neutral", # Future: mood tracking
|
||||
"active_project": None, # Future: project context
|
||||
"message_count": 0,
|
||||
"message_history": [],
|
||||
}
|
||||
|
||||
|
||||
@@ -275,6 +276,13 @@ async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
|
||||
state["last_user_message"] = user_prompt
|
||||
state["last_timestamp"] = now
|
||||
state["message_count"] += 1
|
||||
# Save user turn to history
|
||||
state["message_history"].append({
|
||||
"user": user_prompt,
|
||||
"assistant": "" # assistant reply filled later by update_last_assistant_message()
|
||||
})
|
||||
|
||||
|
||||
|
||||
# F. Assemble unified context
|
||||
context_state = {
|
||||
@@ -311,20 +319,27 @@ async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
|
||||
# -----------------------------
|
||||
def update_last_assistant_message(session_id: str, message: str) -> None:
    """
    Update session state with assistant's response and complete
    the last turn inside message_history.

    Called from router.py once the assistant reply is known.

    Args:
        session_id: Session identifier
        message: Assistant's final response text

    Returns:
        None. Unknown sessions are logged as a warning and left untouched.
    """
    # Single lookup instead of an `in` test followed by repeated indexing.
    session = SESSION_STATE.get(session_id)
    if not session:
        logger.warning(f"Attempted to update non-existent session: {session_id}")
        return

    # Update last assistant message + timestamp
    session["last_assistant_message"] = message
    session["last_timestamp"] = datetime.now()

    # Fill in assistant reply for the most recent turn.
    # The history entry already looks like {"user": "...", "assistant": ""};
    # sessions without a "message_history" key are tolerated.
    history = session.get("message_history", [])
    if history:
        history[-1]["assistant"] = message

    if VERBOSE_DEBUG:
        logger.debug(f"Updated assistant message for session {session_id}")
|
||||
|
||||
|
||||
|
||||
def get_session_state(session_id: str) -> Optional[Dict[str, Any]]:
|
||||
|
||||
18
cortex/intake/__init__.py
Normal file
18
cortex/intake/__init__.py
Normal file
@@ -0,0 +1,18 @@
|
||||
"""
|
||||
Intake module - short-term memory summarization.
|
||||
|
||||
Runs inside the Cortex container as a pure Python module.
|
||||
No standalone API server - called internally by Cortex.
|
||||
"""
|
||||
|
||||
from .intake import (
|
||||
SESSIONS,
|
||||
add_exchange_internal,
|
||||
summarize_context,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"SESSIONS",
|
||||
"add_exchange_internal",
|
||||
"summarize_context",
|
||||
]
|
||||
@@ -1,18 +1,29 @@
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Any, TYPE_CHECKING
|
||||
from collections import deque
|
||||
from llm.llm_router import call_llm
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Global Short-Term Memory (new Intake)
|
||||
# -------------------------------------------------------------------
|
||||
SESSIONS: dict[str, dict] = {} # session_id → { buffer: deque, created_at: timestamp }
|
||||
|
||||
# Diagnostic: Verify module loads only once
|
||||
print(f"[Intake Module Init] SESSIONS object id: {id(SESSIONS)}, module: {__name__}")
|
||||
|
||||
# L10 / L20 history lives here too
|
||||
L10_HISTORY: Dict[str, list[str]] = {}
|
||||
L20_HISTORY: Dict[str, list[str]] = {}
|
||||
|
||||
from llm.llm_router import call_llm # Use Cortex's shared LLM router
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# Only for type hints — do NOT redefine SESSIONS here
|
||||
from collections import deque as _deque
|
||||
SESSIONS: dict
|
||||
L10_HISTORY: dict
|
||||
L20_HISTORY: dict
|
||||
def bg_summarize(session_id: str) -> None: ...
|
||||
|
||||
from llm.llm_router import call_llm # use Cortex's shared router
|
||||
|
||||
# ─────────────────────────────
|
||||
# Config
|
||||
# ─────────────────────────────
|
||||
@@ -220,20 +231,24 @@ def push_to_neomem(summary: str, session_id: str, level: str) -> None:
|
||||
# ─────────────────────────────
|
||||
# Main entrypoint for Cortex
|
||||
# ─────────────────────────────
|
||||
|
||||
async def summarize_context(
|
||||
session_id: str,
|
||||
exchanges: List[Dict[str, Any]],
|
||||
) -> Dict[str, Any]:
|
||||
async def summarize_context(session_id: str, exchanges: list[dict]):
|
||||
"""
|
||||
Main API used by Cortex:
|
||||
Internal summarizer that uses Cortex's LLM router.
|
||||
Produces L1 / L5 / L10 / L20 / L30 summaries.
|
||||
|
||||
summaries = await summarize_context(session_id, exchanges)
|
||||
|
||||
`exchanges` should be the recent conversation buffer for that session.
|
||||
Args:
|
||||
session_id: The conversation/session ID
|
||||
exchanges: A list of {"user_msg": ..., "assistant_msg": ..., "timestamp": ...}
|
||||
"""
|
||||
buf = list(exchanges)
|
||||
if not buf:
|
||||
|
||||
# Build raw conversation text
|
||||
convo_lines = []
|
||||
for ex in exchanges:
|
||||
convo_lines.append(f"User: {ex.get('user_msg','')}")
|
||||
convo_lines.append(f"Assistant: {ex.get('assistant_msg','')}")
|
||||
convo_text = "\n".join(convo_lines)
|
||||
|
||||
if not convo_text.strip():
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"exchange_count": 0,
|
||||
@@ -242,31 +257,72 @@ async def summarize_context(
|
||||
"L10": "",
|
||||
"L20": "",
|
||||
"L30": "",
|
||||
"last_updated": None,
|
||||
"last_updated": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
# Base levels
|
||||
L1 = await summarize_L1(buf)
|
||||
L5 = await summarize_L5(buf)
|
||||
L10 = await summarize_L10(session_id, buf)
|
||||
L20 = await summarize_L20(session_id)
|
||||
L30 = await summarize_L30(session_id)
|
||||
# Prompt the LLM (internal — no HTTP)
|
||||
prompt = f"""
|
||||
Summarize the conversation below into multiple compression levels.
|
||||
|
||||
# Push the "interesting" tiers into NeoMem
|
||||
push_to_neomem(L10, session_id, "L10")
|
||||
push_to_neomem(L20, session_id, "L20")
|
||||
push_to_neomem(L30, session_id, "L30")
|
||||
Conversation:
|
||||
----------------
|
||||
{convo_text}
|
||||
----------------
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"exchange_count": len(buf),
|
||||
"L1": L1,
|
||||
"L5": L5,
|
||||
"L10": L10,
|
||||
"L20": L20,
|
||||
"L30": L30,
|
||||
"last_updated": datetime.now().isoformat(),
|
||||
}
|
||||
Output strictly in JSON with keys:
|
||||
L1 → ultra short summary (1–2 sentences max)
|
||||
L5 → short summary
|
||||
L10 → medium summary
|
||||
L20 → detailed overview
|
||||
L30 → full detailed summary
|
||||
|
||||
JSON only. No text outside JSON.
|
||||
"""
|
||||
|
||||
try:
|
||||
llm_response = await call_llm(
|
||||
prompt,
|
||||
temperature=0.2
|
||||
)
|
||||
|
||||
|
||||
# LLM should return JSON, parse it
|
||||
summary = json.loads(llm_response)
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"exchange_count": len(exchanges),
|
||||
"L1": summary.get("L1", ""),
|
||||
"L5": summary.get("L5", ""),
|
||||
"L10": summary.get("L10", ""),
|
||||
"L20": summary.get("L20", ""),
|
||||
"L30": summary.get("L30", ""),
|
||||
"last_updated": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"exchange_count": len(exchanges),
|
||||
"L1": f"[Error summarizing: {str(e)}]",
|
||||
"L5": "",
|
||||
"L10": "",
|
||||
"L20": "",
|
||||
"L30": "",
|
||||
"last_updated": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
# ─────────────────────────────────
|
||||
# Background summarization stub
|
||||
# ─────────────────────────────────
|
||||
def bg_summarize(session_id: str) -> None:
    """
    Placeholder for background summarization.

    Actual summarization happens during /reason via summarize_context().
    This function exists to prevent NameError when called from
    add_exchange_internal().

    Args:
        session_id: Session whose buffer just received an exchange.

    Returns:
        None. The only effect is a diagnostic line on stdout.
    """
    print(f"[Intake] Exchange added for {session_id}. Will summarize on next /reason call.")
|
||||
|
||||
# ─────────────────────────────
|
||||
# Internal entrypoint for Cortex
|
||||
@@ -283,15 +339,23 @@ def add_exchange_internal(exchange: dict):
|
||||
|
||||
exchange["timestamp"] = datetime.now().isoformat()
|
||||
|
||||
# DEBUG: Verify we're using the module-level SESSIONS
|
||||
print(f"[add_exchange_internal] SESSIONS object id: {id(SESSIONS)}, current sessions: {list(SESSIONS.keys())}")
|
||||
|
||||
# Ensure session exists
|
||||
if session_id not in SESSIONS:
|
||||
SESSIONS[session_id] = {
|
||||
"buffer": deque(maxlen=200),
|
||||
"created_at": datetime.now()
|
||||
}
|
||||
print(f"[add_exchange_internal] Created new session: {session_id}")
|
||||
else:
|
||||
print(f"[add_exchange_internal] Using existing session: {session_id}")
|
||||
|
||||
# Append exchange into the rolling buffer
|
||||
SESSIONS[session_id]["buffer"].append(exchange)
|
||||
buffer_len = len(SESSIONS[session_id]["buffer"])
|
||||
print(f"[add_exchange_internal] Added exchange to {session_id}, buffer now has {buffer_len} items")
|
||||
|
||||
# Trigger summarization immediately
|
||||
try:
|
||||
|
||||
106
cortex/router.py
106
cortex/router.py
@@ -197,26 +197,110 @@ class IngestPayload(BaseModel):
|
||||
user_msg: str
|
||||
assistant_msg: str
|
||||
|
||||
|
||||
@cortex_router.post("/ingest")
async def ingest(payload: IngestPayload):
    """
    Receives (session_id, user_msg, assistant_msg) from Relay
    and pushes directly into Intake's in-memory buffer.

    Uses lenient error handling - always returns success to avoid
    breaking the chat pipeline.

    Args:
        payload: Request body carrying session_id, user_msg and assistant_msg.

    Returns:
        {"status": "ok", "session_id": <session id>} regardless of whether
        either internal step failed; failures are only logged.
    """
    try:
        # 1. Update Cortex session state
        update_last_assistant_message(payload.session_id, payload.assistant_msg)
    except Exception as e:
        # Deliberate best-effort: log and continue (lenient mode)
        logger.warning(f"[INGEST] Failed to update session state: {e}")

    try:
        # 2. Feed Intake internally (no HTTP)
        add_exchange_internal({
            "session_id": payload.session_id,
            "user_msg": payload.user_msg,
            "assistant_msg": payload.assistant_msg,
        })

        logger.debug(f"[INGEST] Added exchange to Intake for {payload.session_id}")
    except Exception as e:
        # Deliberate best-effort: log and continue (lenient mode)
        logger.warning(f"[INGEST] Failed to add to Intake: {e}")

    # Always return success (user requirement: never fail chat pipeline)
    return {
        "status": "ok",
        "session_id": payload.session_id,
    }
|
||||
|
||||
# -----------------------------
|
||||
# Debug endpoint: summarized context
|
||||
# -----------------------------
|
||||
@cortex_router.get("/debug/summary")
async def debug_summary(session_id: str):
    """
    Run Intake's summarize_context() against one session's live buffer.

    Diagnostic only: reports what the L1/L5/L10/L20/L30 summaries look like
    when produced inside the running Uvicorn worker from the real SESSIONS
    buffer.

    Args:
        session_id: Key of the session to inspect in SESSIONS.

    Returns:
        An error dict when the session is missing or empty-valued; otherwise
        the session id, buffer size, the last five normalized exchanges and
        the summary returned by summarize_context().
    """
    from intake.intake import SESSIONS, summarize_context

    entry = SESSIONS.get(session_id)
    if not entry:
        return {"error": "session not found", "session_id": session_id}

    # Reduce each buffered exchange to the two fields the summarizer reads,
    # defaulting missing ones to an empty string.
    rolling = entry["buffer"]
    wanted = ("user_msg", "assistant_msg")
    exchanges = [{field: item.get(field, "") for field in wanted} for item in rolling]

    # summarize_context is a coroutine, so it has to be awaited.
    digest = await summarize_context(session_id, exchanges)

    response = {
        "session_id": session_id,
        "buffer_size": len(rolling),
        "exchanges_preview": exchanges[-5:],  # last 5 items
        "summary": digest,
    }
    return response
|
||||
|
||||
# -----------------------------
|
||||
# Debug endpoint for SESSIONS
|
||||
# -----------------------------
|
||||
@cortex_router.get("/debug/sessions")
async def debug_sessions():
    """
    Dump the in-memory SESSIONS dict as seen by the running Uvicorn worker.

    Diagnostic only. For every session it reports the creation time, buffer
    size and capacity, and a truncated preview of the five newest exchanges.

    Returns:
        A dict with the id() of the SESSIONS object, the number of sessions,
        and the per-session details keyed by session id.
    """
    from intake.intake import SESSIONS

    def _preview(exchange):
        # Message texts are clipped to 100 characters to keep the payload small.
        return {
            "user_msg": exchange.get("user_msg", "")[:100],
            "assistant_msg": exchange.get("assistant_msg", "")[:100],
            "timestamp": exchange.get("timestamp", ""),
        }

    def _describe(info):
        rolling = info["buffer"]
        return {
            "created_at": info["created_at"].isoformat(),
            "buffer_size": len(rolling),
            "buffer_maxlen": rolling.maxlen,
            # Last 5 exchanges
            "recent_exchanges": [_preview(item) for item in list(rolling)[-5:]],
        }

    details = {sid: _describe(info) for sid, info in SESSIONS.items()}

    return {
        "sessions_object_id": id(SESSIONS),
        "total_sessions": len(SESSIONS),
        "sessions": details,
    }
|
||||
|
||||
|
||||
Reference in New Issue
Block a user