# router.py
import os
import logging

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from reasoning.reasoning import reason_check
from reasoning.reflection import reflect_notes
from reasoning.refine import refine_answer
from persona.speak import speak
from persona.identity import load_identity
from context import collect_context, update_last_assistant_message
from intake.intake import add_exchange_internal

# -----------------------------
# Debug configuration
# -----------------------------
VERBOSE_DEBUG = os.getenv("VERBOSE_DEBUG", "false").lower() == "true"

logger = logging.getLogger(__name__)

if VERBOSE_DEBUG:
    logger.setLevel(logging.DEBUG)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s [ROUTER] %(levelname)s: %(message)s',
        datefmt='%H:%M:%S'
    ))
    logger.addHandler(console_handler)

    # File handler
    try:
        os.makedirs('/app/logs', exist_ok=True)
        file_handler = logging.FileHandler('/app/logs/cortex_verbose_debug.log', mode='a')
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s [ROUTER] %(levelname)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        ))
        logger.addHandler(file_handler)
        logger.debug("VERBOSE_DEBUG mode enabled for router.py - logging to file")
    except Exception as e:
        logger.debug(f"VERBOSE_DEBUG mode enabled for router.py - file logging failed: {e}")

# -----------------------------
# Router (NOT FastAPI app)
# -----------------------------
cortex_router = APIRouter()

# -----------------------------
# Pydantic models
# -----------------------------
class ReasonRequest(BaseModel):
    session_id: str
    user_prompt: str
    temperature: float | None = None

# -----------------------------
# /reason endpoint
# -----------------------------
@cortex_router.post("/reason")
async def run_reason(req: ReasonRequest):
    if VERBOSE_DEBUG:
        logger.debug(f"\n{'='*80}")
        logger.debug(f"[PIPELINE START] Session: {req.session_id}")
        logger.debug(f"[PIPELINE START] User prompt: {req.user_prompt[:200]}...")
        logger.debug(f"{'='*80}\n")

    # 0. Collect unified context from all sources
    if VERBOSE_DEBUG:
        logger.debug("[STAGE 0] Collecting unified context...")

    context_state = await collect_context(req.session_id, req.user_prompt)

    if VERBOSE_DEBUG:
        logger.debug(f"[STAGE 0] Context collected - {len(context_state.get('rag', []))} RAG results")

    # 0.5. Load identity block
    if VERBOSE_DEBUG:
        logger.debug("[STAGE 0.5] Loading identity block...")

    identity_block = load_identity(req.session_id)

    if VERBOSE_DEBUG:
        logger.debug(f"[STAGE 0.5] Identity loaded: {identity_block.get('name', 'Unknown')}")

    # 1. Extract Intake summary for reflection
    # Use L20 (Session Overview) as primary summary for reflection
    intake_summary = "(no context available)"
    if context_state.get("intake"):
        l20_summary = context_state["intake"].get("L20")
        if l20_summary and isinstance(l20_summary, dict):
            intake_summary = l20_summary.get("summary", "(no context available)")
        elif isinstance(l20_summary, str):
            intake_summary = l20_summary

    if VERBOSE_DEBUG:
        logger.debug(f"[STAGE 1] Intake summary extracted (L20): {intake_summary[:150]}...")
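    # Shapes the branches above can handle (inferred from this handler, not a
    # documented contract of the Intake module):
    #   context_state["intake"]["L20"] == {"summary": "..."}   # dict form
    #   context_state["intake"]["L20"] == "..."                # plain-string form
    # Any other shape falls through to the "(no context available)" default.
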
    # 2. Reflection
    if VERBOSE_DEBUG:
        logger.debug("[STAGE 2] Running reflection...")

    try:
        reflection = await reflect_notes(intake_summary, identity_block=identity_block)
        reflection_notes = reflection.get("notes", [])
        if VERBOSE_DEBUG:
            logger.debug(f"[STAGE 2] Reflection complete - {len(reflection_notes)} notes generated")
            for idx, note in enumerate(reflection_notes, 1):
                logger.debug(f"  Note {idx}: {note}")
    except Exception as e:
        reflection_notes = []
        if VERBOSE_DEBUG:
            logger.debug(f"[STAGE 2] Reflection failed: {e}")

    # 3. First-pass reasoning draft
    if VERBOSE_DEBUG:
        logger.debug("[STAGE 3] Running reasoning (draft)...")

    draft = await reason_check(
        req.user_prompt,
        identity_block=identity_block,
        rag_block=context_state.get("rag", []),
        reflection_notes=reflection_notes,
        context=context_state
    )

    if VERBOSE_DEBUG:
        logger.debug(f"[STAGE 3] Draft answer ({len(draft)} chars):")
        logger.debug(f"--- DRAFT START ---\n{draft}\n--- DRAFT END ---")

    # 4. Refinement
    if VERBOSE_DEBUG:
        logger.debug("[STAGE 4] Running refinement...")

    result = await refine_answer(
        draft_output=draft,
        reflection_notes=reflection_notes,
        identity_block=identity_block,
        rag_block=context_state.get("rag", []),
    )
    final_neutral = result["final_output"]

    if VERBOSE_DEBUG:
        logger.debug(f"[STAGE 4] Refined answer ({len(final_neutral)} chars):")
        logger.debug(f"--- REFINED START ---\n{final_neutral}\n--- REFINED END ---")

    # 5. Persona layer
    if VERBOSE_DEBUG:
        logger.debug("[STAGE 5] Applying persona layer...")

    persona_answer = await speak(final_neutral)

    if VERBOSE_DEBUG:
        logger.debug(f"[STAGE 5] Persona answer ({len(persona_answer)} chars):")
        logger.debug(f"--- PERSONA START ---\n{persona_answer}\n--- PERSONA END ---")

    # 6. Update session state with assistant's response
    if VERBOSE_DEBUG:
        logger.debug("[STAGE 6] Updating session state...")

    update_last_assistant_message(req.session_id, persona_answer)

    if VERBOSE_DEBUG:
        logger.debug(f"\n{'='*80}")
        logger.debug(f"[PIPELINE COMPLETE] Session: {req.session_id}")
        logger.debug(f"[PIPELINE COMPLETE] Final answer length: {len(persona_answer)} chars")
        logger.debug(f"{'='*80}\n")

    # 7. Return full bundle
    return {
        "draft": draft,
        "neutral": final_neutral,
        "persona": persona_answer,
        "reflection": reflection_notes,
        "session_id": req.session_id,
        "context_summary": {
            "rag_results": len(context_state.get("rag", [])),
            "minutes_since_last": context_state.get("minutes_since_last_msg"),
            "message_count": context_state.get("message_count"),
            "mode": context_state.get("mode"),
        }
    }

# -----------------------------
# Intake ingest (internal feed)
# -----------------------------
class IngestPayload(BaseModel):
    session_id: str
    user_msg: str
    assistant_msg: str

@cortex_router.post("/ingest")
async def ingest(payload: IngestPayload):
    """
    Receives (session_id, user_msg, assistant_msg) from Relay and pushes
    directly into Intake's in-memory buffer.

    Uses lenient error handling - always returns success to avoid breaking
    the chat pipeline.
    """
    try:
        # 1. Update Cortex session state
        update_last_assistant_message(payload.session_id, payload.assistant_msg)
    except Exception as e:
        logger.warning(f"[INGEST] Failed to update session state: {e}")
        # Continue anyway (lenient mode)
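    # Design note: each step gets its own try/except rather than one shared
    # block, so a failure updating session state above still lets the Intake
    # feed below run (and vice versa) - that is what "lenient mode" means here.
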
    try:
        # 2. Feed Intake internally (no HTTP)
        add_exchange_internal({
            "session_id": payload.session_id,
            "user_msg": payload.user_msg,
            "assistant_msg": payload.assistant_msg,
        })
        logger.debug(f"[INGEST] Added exchange to Intake for {payload.session_id}")
    except Exception as e:
        logger.warning(f"[INGEST] Failed to add to Intake: {e}")
        # Continue anyway (lenient mode)

    # Always return success (user requirement: never fail chat pipeline)
    return {
        "status": "ok",
        "session_id": payload.session_id
    }

# -----------------------------
# Debug endpoint: summarized context
# -----------------------------
@cortex_router.get("/debug/summary")
async def debug_summary(session_id: str):
    """
    Diagnostic endpoint that runs Intake's summarize_context() for a session.

    Shows exactly what L1/L5/L10/L20/L30 summaries would look like inside the
    actual Uvicorn worker, using the real SESSIONS buffer.
    """
    from intake.intake import SESSIONS, summarize_context

    # Validate session
    session = SESSIONS.get(session_id)
    if not session:
        return {"error": "session not found", "session_id": session_id}

    # Convert deque into the structure summarize_context expects
    buffer = session["buffer"]
    exchanges = [
        {
            "user_msg": ex.get("user_msg", ""),
            "assistant_msg": ex.get("assistant_msg", ""),
        }
        for ex in buffer
    ]

    # summarize_context is async, so it must be awaited here
    summary = await summarize_context(session_id, exchanges)

    return {
        "session_id": session_id,
        "buffer_size": len(buffer),
        "exchanges_preview": exchanges[-5:],  # last 5 items
        "summary": summary
    }

# -----------------------------
# Debug endpoint for SESSIONS
# -----------------------------
@cortex_router.get("/debug/sessions")
async def debug_sessions():
    """
    Diagnostic endpoint to inspect SESSIONS from within the running Uvicorn
    worker. This shows the actual state of the in-memory SESSIONS dict.
    """
    from intake.intake import SESSIONS

    sessions_data = {}
    for session_id, session_info in SESSIONS.items():
        buffer = session_info["buffer"]
        sessions_data[session_id] = {
            "created_at": session_info["created_at"].isoformat(),
            "buffer_size": len(buffer),
            "buffer_maxlen": buffer.maxlen,
            "recent_exchanges": [
                {
                    "user_msg": ex.get("user_msg", "")[:100],
                    "assistant_msg": ex.get("assistant_msg", "")[:100],
                    "timestamp": ex.get("timestamp", "")
                }
                for ex in list(buffer)[-5:]  # Last 5 exchanges
            ]
        }

    return {
        "sessions_object_id": id(SESSIONS),
        "total_sessions": len(SESSIONS),
        "sessions": sessions_data
    }
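
# -----------------------------
# Usage sketch (not part of this module)
# -----------------------------
# This file deliberately exposes a router, not an app. A host application is
# assumed to mount it along these lines; the import path and the absence of a
# prefix are illustrative guesses, not confirmed by this module:
#
#   from fastapi import FastAPI
#   from router import cortex_router
#
#   app = FastAPI()
#   app.include_router(cortex_router)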