# router.py
"""FastAPI router for the cortex service.

Endpoints:
    POST /reason                       - full multi-stage reasoning pipeline
    POST /simple                       - standard chatbot mode (no pipeline)
    GET  /stream/thinking/{session_id} - SSE stream of tool-calling progress
    POST /ingest                       - internal exchange recording
"""

import os
import json
import logging
import asyncio
from datetime import datetime

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from reasoning.reasoning import reason_check
from reasoning.reflection import reflect_notes
from reasoning.refine import refine_answer
from persona.speak import speak
from persona.identity import load_identity
from context import collect_context, update_last_assistant_message
from intake.intake import add_exchange_internal
from autonomy.monologue.monologue import InnerMonologue
from autonomy.self.state import load_self_state
from autonomy.tools.stream_events import get_stream_manager

# -------------------------------------------------------------------
# Setup
# -------------------------------------------------------------------
LOG_DETAIL_LEVEL = os.getenv("LOG_DETAIL_LEVEL", "summary").lower()

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Always set up basic logging, but guard against attaching a duplicate
# handler if this module is imported more than once.
if not logger.handlers:
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s [ROUTER] %(levelname)s: %(message)s',
        datefmt='%H:%M:%S'
    ))
    logger.addHandler(console_handler)

cortex_router = APIRouter()
inner_monologue = InnerMonologue()

# -------------------------------------------------------------------
# Models
# -------------------------------------------------------------------
class ReasonRequest(BaseModel):
    session_id: str
    user_prompt: str
    temperature: float | None = None
    backend: str | None = None

# -------------------------------------------------------------------
# /reason endpoint
# -------------------------------------------------------------------
@cortex_router.post("/reason")
async def run_reason(req: ReasonRequest):
    pipeline_start = datetime.now()
    stage_timings = {}

    # Show pipeline start in detailed/verbose mode
    if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
        logger.info(f"\n{'='*100}")
        logger.info(f"🚀 PIPELINE START | Session: {req.session_id} | {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
        logger.info(f"{'='*100}")
        logger.info(f"📝 User: {req.user_prompt[:150]}...")
        logger.info(f"{'-'*100}\n")

    # ----------------------------------------------------------------
    # STAGE 0 — Context
    # ----------------------------------------------------------------
    stage_start = datetime.now()
    context_state = await collect_context(req.session_id, req.user_prompt)
    stage_timings["context"] = (datetime.now() - stage_start).total_seconds() * 1000

    # ----------------------------------------------------------------
    # STAGE 0.5 — Identity
    # ----------------------------------------------------------------
    stage_start = datetime.now()
    identity_block = load_identity(req.session_id)
    stage_timings["identity"] = (datetime.now() - stage_start).total_seconds() * 1000

    # ----------------------------------------------------------------
    # STAGE 0.6 — Inner Monologue (observer-only)
    # ----------------------------------------------------------------
    stage_start = datetime.now()
    inner_result = None
    try:
        self_state = load_self_state()
        mono_context = {
            "user_message": req.user_prompt,
            "session_id": req.session_id,
            "self_state": self_state,
            "context_summary": context_state,
        }
        inner_result = await inner_monologue.process(mono_context)
        logger.info(f"🧠 Monologue | {inner_result.get('intent', 'unknown')} | Tone: {inner_result.get('tone', 'neutral')}")

        # Store in context for downstream use
        context_state["monologue"] = inner_result
    except Exception as e:
        logger.warning(f"⚠️ Monologue failed: {e}")
    stage_timings["monologue"] = (datetime.now() - stage_start).total_seconds() * 1000
    # ----------------------------------------------------------------
    # STAGE 0.7 — Executive Planning (conditional)
    # ----------------------------------------------------------------
    stage_start = datetime.now()
    executive_plan = None
    if inner_result and inner_result.get("consult_executive"):
        try:
            from autonomy.executive.planner import plan_execution
            executive_plan = await plan_execution(
                user_prompt=req.user_prompt,
                intent=inner_result.get("intent", "unknown"),
                context_state=context_state,
                identity_block=identity_block
            )
            logger.info(f"🎯 Executive plan: {executive_plan.get('summary', 'N/A')[:80]}...")
        except Exception as e:
            logger.warning(f"⚠️ Executive planning failed: {e}")
            executive_plan = None
    stage_timings["executive"] = (datetime.now() - stage_start).total_seconds() * 1000

    # ----------------------------------------------------------------
    # STAGE 0.8 — Autonomous Tool Invocation
    # ----------------------------------------------------------------
    stage_start = datetime.now()
    tool_results = None
    autonomous_enabled = os.getenv("ENABLE_AUTONOMOUS_TOOLS", "true").lower() == "true"
    tool_confidence_threshold = float(os.getenv("AUTONOMOUS_TOOL_CONFIDENCE_THRESHOLD", "0.6"))

    if autonomous_enabled and inner_result:
        try:
            from autonomy.tools.decision_engine import ToolDecisionEngine
            from autonomy.tools.orchestrator import ToolOrchestrator

            # Analyze which tools to invoke
            decision_engine = ToolDecisionEngine()
            tool_decision = await decision_engine.analyze_tool_needs(
                user_prompt=req.user_prompt,
                monologue=inner_result,
                context_state=context_state,
                available_tools=["RAG", "WEB", "WEATHER", "CODEBRAIN"]
            )

            # Execute tools if confidence threshold met
            if tool_decision["should_invoke_tools"] and tool_decision["confidence"] >= tool_confidence_threshold:
                orchestrator = ToolOrchestrator(tool_timeout=30)
                tool_results = await orchestrator.execute_tools(
                    tools_to_invoke=tool_decision["tools_to_invoke"],
                    context_state=context_state
                )

                # Format results for context injection
                tool_context = orchestrator.format_results_for_context(tool_results)
                context_state["autonomous_tool_results"] = tool_context

                summary = tool_results.get("execution_summary", {})
                logger.info(f"🛠️ Tools executed: {summary.get('successful', [])} succeeded")
            else:
                logger.info(f"🛠️ No tools invoked (confidence: {tool_decision.get('confidence', 0):.2f})")
        except Exception as e:
            logger.warning(f"⚠️ Autonomous tool invocation failed: {e}")
            if LOG_DETAIL_LEVEL == "verbose":
                import traceback
                traceback.print_exc()
    stage_timings["tools"] = (datetime.now() - stage_start).total_seconds() * 1000

    # ----------------------------------------------------------------
    # STAGE 1-5 — Core Reasoning Pipeline
    # ----------------------------------------------------------------
    stage_start = datetime.now()

    # Extract intake summary
    intake_summary = "(no context available)"
    if context_state.get("intake"):
        l20 = context_state["intake"].get("L20")
        if isinstance(l20, dict):
            intake_summary = l20.get("summary", intake_summary)
        elif isinstance(l20, str):
            intake_summary = l20

    # Reflection
    try:
        reflection = await reflect_notes(intake_summary, identity_block=identity_block)
        reflection_notes = reflection.get("notes", [])
    except Exception as e:
        reflection_notes = []
        logger.warning(f"⚠️ Reflection failed: {e}")
    stage_timings["reflection"] = (datetime.now() - stage_start).total_seconds() * 1000

    # Reasoning (draft)
    stage_start = datetime.now()
    draft = await reason_check(
        req.user_prompt,
        identity_block=identity_block,
        rag_block=context_state.get("rag", []),
        reflection_notes=reflection_notes,
        context=context_state,
        monologue=inner_result,
        executive_plan=executive_plan
    )
    stage_timings["reasoning"] = (datetime.now() - stage_start).total_seconds() * 1000
    # Refinement
    stage_start = datetime.now()
    result = await refine_answer(
        draft_output=draft,
        reflection_notes=reflection_notes,
        identity_block=identity_block,
        rag_block=context_state.get("rag", []),
    )
    final_neutral = result["final_output"]
    stage_timings["refinement"] = (datetime.now() - stage_start).total_seconds() * 1000

    # Persona
    stage_start = datetime.now()
    tone = inner_result.get("tone", "neutral") if inner_result else "neutral"
    depth = inner_result.get("depth", "medium") if inner_result else "medium"
    persona_answer = await speak(final_neutral, tone=tone, depth=depth)
    stage_timings["persona"] = (datetime.now() - stage_start).total_seconds() * 1000

    # ----------------------------------------------------------------
    # STAGE 6 — Session update
    # ----------------------------------------------------------------
    update_last_assistant_message(req.session_id, persona_answer)

    # ----------------------------------------------------------------
    # STAGE 6.5 — Self-state update & Pattern Learning
    # ----------------------------------------------------------------
    stage_start = datetime.now()
    try:
        from autonomy.self.analyzer import analyze_and_update_state
        await analyze_and_update_state(
            monologue=inner_result or {},
            user_prompt=req.user_prompt,
            response=persona_answer,
            context=context_state
        )
    except Exception as e:
        logger.warning(f"⚠️ Self-state update failed: {e}")

    try:
        from autonomy.learning.pattern_learner import get_pattern_learner
        learner = get_pattern_learner()
        await learner.learn_from_interaction(
            user_prompt=req.user_prompt,
            response=persona_answer,
            monologue=inner_result or {},
            context=context_state
        )
    except Exception as e:
        logger.warning(f"⚠️ Pattern learning failed: {e}")
    stage_timings["learning"] = (datetime.now() - stage_start).total_seconds() * 1000

    # ----------------------------------------------------------------
    # STAGE 7 — Proactive Monitoring & Suggestions
    # ----------------------------------------------------------------
    stage_start = datetime.now()
    proactive_enabled = os.getenv("ENABLE_PROACTIVE_MONITORING", "true").lower() == "true"
    proactive_min_priority = float(os.getenv("PROACTIVE_SUGGESTION_MIN_PRIORITY", "0.6"))

    if proactive_enabled:
        try:
            from autonomy.proactive.monitor import get_proactive_monitor
            monitor = get_proactive_monitor(min_priority=proactive_min_priority)
            self_state = load_self_state()
            suggestion = await monitor.analyze_session(
                session_id=req.session_id,
                context_state=context_state,
                self_state=self_state
            )
            if suggestion:
                suggestion_text = monitor.format_suggestion(suggestion)
                persona_answer += suggestion_text
                logger.info(f"💡 Proactive suggestion: {suggestion['type']} (priority: {suggestion['priority']:.2f})")
        except Exception as e:
            logger.warning(f"⚠️ Proactive monitoring failed: {e}")
    stage_timings["proactive"] = (datetime.now() - stage_start).total_seconds() * 1000

    # ----------------------------------------------------------------
    # PIPELINE COMPLETE — Summary
    # ----------------------------------------------------------------
    total_duration = (datetime.now() - pipeline_start).total_seconds() * 1000

    # Always show pipeline completion
    logger.info(f"\n{'='*100}")
    logger.info(f"✨ PIPELINE COMPLETE | Session: {req.session_id} | Total: {total_duration:.0f}ms")
    logger.info(f"{'='*100}")

    # Show timing breakdown in detailed/verbose mode
    if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
        logger.info("⏱️ Stage Timings:")
        for stage, duration in stage_timings.items():
            pct = (duration / total_duration) * 100 if total_duration > 0 else 0
            logger.info(f"   {stage:15s}: {duration:6.0f}ms ({pct:5.1f}%)")
        logger.info(f"📤 Output: {len(persona_answer)} chars")
        logger.info(f"{'='*100}\n")

    # ----------------------------------------------------------------
    # RETURN
    # ----------------------------------------------------------------
    return {
        "draft": draft,
        "neutral": final_neutral,
        "persona": persona_answer,
        "reflection": reflection_notes,
        "session_id": req.session_id,
        "context_summary": {
            "rag_results": len(context_state.get("rag", [])),
            "minutes_since_last": context_state.get("minutes_since_last_msg"),
            "message_count": context_state.get("message_count"),
            "mode": context_state.get("mode"),
        }
    }
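# A minimal example call against /reason; the host, port, and mount path are
# deployment assumptions, not something this module defines:
#
#   import httpx
#   resp = httpx.post(
#       "http://localhost:8000/reason",
#       json={"session_id": "demo-1", "user_prompt": "Summarize my last session"},
#       timeout=120.0,  # the full pipeline can take a while
#   )
#   print(resp.json()["persona"])
#
# Besides "persona", the response carries the raw draft, the refined neutral
# answer, the reflection notes, and the context summary built above.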
# -------------------------------------------------------------------
# /simple endpoint - Standard chatbot mode (no reasoning pipeline)
# -------------------------------------------------------------------
@cortex_router.post("/simple")
async def run_simple(req: ReasonRequest):
    """
    Standard chatbot mode - bypasses the cortex reasoning pipeline entirely.
    Just a simple conversation loop like a typical chatbot.
    """
    from llm.llm_router import call_llm
    from autonomy.tools.function_caller import FunctionCaller

    start_time = datetime.now()

    logger.info(f"\n{'='*100}")
    logger.info(f"💬 SIMPLE MODE | Session: {req.session_id} | {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    logger.info(f"{'='*100}")
    logger.info(f"📝 User: {req.user_prompt[:150]}...")
    logger.info(f"{'-'*100}\n")

    # Get conversation history from context and intake buffer
    # (context_state is not referenced below; the history is built from the
    # intake buffer)
    context_state = await collect_context(req.session_id, req.user_prompt)

    # Get recent messages from Intake buffer
    from intake.intake import get_recent_messages
    recent_msgs = get_recent_messages(req.session_id, limit=20)
    logger.info(f"📋 Retrieved {len(recent_msgs)} recent messages from Intake buffer")

    # Build simple conversation history with system message
    system_message = {
        "role": "system",
        "content": (
            "You are a helpful AI assistant. Provide direct, concise responses to the user's questions. "
            "Maintain context from previous messages in the conversation."
        )
    }
    messages = [system_message]

    # Add conversation history
    if recent_msgs:
        for msg in recent_msgs:
            messages.append({
                "role": msg.get("role", "user"),
                "content": msg.get("content", "")
            })
            logger.info(f"   - {msg.get('role')}: {msg.get('content', '')[:50]}...")

    # Add current user message
    messages.append({
        "role": "user",
        "content": req.user_prompt
    })

    logger.info(f"📨 Total messages being sent to LLM: {len(messages)} (including system message)")

    # Get backend from request, otherwise fall back to env variable
    backend = req.backend if req.backend else os.getenv("STANDARD_MODE_LLM", "SECONDARY")
    backend = backend.upper()  # Normalize to uppercase
    logger.info(f"🔧 Using backend: {backend}")

    temperature = req.temperature if req.temperature is not None else 0.7

    # Check if tools are enabled
    enable_tools = os.getenv("STANDARD_MODE_ENABLE_TOOLS", "false").lower() == "true"

    # Call LLM with or without tools
    try:
        if enable_tools:
            # Use FunctionCaller for tool-enabled conversation
            logger.info("🛠️ Tool calling enabled for Standard Mode")
            logger.info(f"🔍 Creating FunctionCaller with backend={backend}, temp={temperature}")
            function_caller = FunctionCaller(backend, temperature)
            logger.info("🔍 FunctionCaller created, calling call_with_tools...")
            result = await function_caller.call_with_tools(
                messages=messages,
                max_tokens=2048,
                session_id=req.session_id  # Pass session_id for streaming
            )
            logger.info(f"🔍 call_with_tools returned: iterations={result.get('iterations')}, tool_calls={len(result.get('tool_calls', []))}")

            # Log tool usage
            if result.get("tool_calls"):
                tool_names = [tc["name"] for tc in result["tool_calls"]]
                logger.info(f"🔧 Tools used: {', '.join(tool_names)} ({result['iterations']} iterations)")

            response = result["content"].strip()
        else:
            # Direct LLM call without tools (original behavior)
            raw_response = await call_llm(
                messages=messages,
                backend=backend,
                temperature=temperature,
                max_tokens=2048
            )
            response = raw_response.strip()
    except Exception as e:
        logger.error(f"❌ LLM call failed: {e}")
        response = f"Error: {str(e)}"

    # Update session with the exchange
    try:
        update_last_assistant_message(req.session_id, response)
        add_exchange_internal({
            "session_id": req.session_id,
            "role": "user",
            "content": req.user_prompt
        })
        add_exchange_internal({
            "session_id": req.session_id,
            "role": "assistant",
            "content": response
        })
    except Exception as e:
        logger.warning(f"⚠️ Session update failed: {e}")

    duration = (datetime.now() - start_time).total_seconds() * 1000

    logger.info(f"\n{'='*100}")
    logger.info(f"✨ SIMPLE MODE COMPLETE | Session: {req.session_id} | Total: {duration:.0f}ms")
    logger.info(f"📤 Output: {len(response)} chars")
    logger.info(f"{'='*100}\n")

    return {
        "draft": response,
        "neutral": response,
        "persona": response,
        "reflection": "",
        "session_id": req.session_id,
        "context_summary": {
            "message_count": len(messages),
            "mode": "standard"
        }
    }
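# Standard mode is configured by the env vars read above: STANDARD_MODE_LLM
# picks the fallback backend and STANDARD_MODE_ENABLE_TOOLS toggles tool
# calling. A request may override backend and temperature per call; a sketch,
# with host/port and the "PRIMARY" backend name as illustrative assumptions:
#
#   import httpx
#   resp = httpx.post(
#       "http://localhost:8000/simple",
#       json={
#           "session_id": "demo-1",
#           "user_prompt": "Hi!",
#           "backend": "primary",   # normalized to "PRIMARY" by the endpoint
#           "temperature": 0.3,
#       },
#   )
#   print(resp.json()["persona"])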
# -------------------------------------------------------------------
# /stream/thinking endpoint - SSE stream for "show your work"
# -------------------------------------------------------------------
@cortex_router.get("/stream/thinking/{session_id}")
async def stream_thinking(session_id: str):
    """
    Server-Sent Events stream for the tool calling "show your work" feature.

    Streams real-time updates about:
    - Thinking/planning steps
    - Tool calls being made
    - Tool execution results
    - Final completion
    """
    stream_manager = get_stream_manager()
    queue = stream_manager.subscribe(session_id)

    async def event_generator():
        try:
            # Send initial connection message
            connected_event = json.dumps({"type": "connected", "session_id": session_id})
            yield f"data: {connected_event}\n\n"

            while True:
                # Wait for events with timeout to send keepalive
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=30.0)

                    # Format as SSE
                    event_data = json.dumps(event)
                    yield f"data: {event_data}\n\n"

                    # If it's a "done" event, close the stream
                    if event.get("type") == "done":
                        break
                except asyncio.TimeoutError:
                    # Send keepalive comment
                    yield ": keepalive\n\n"
        except asyncio.CancelledError:
            logger.info(f"Stream cancelled for session {session_id}")
        finally:
            stream_manager.unsubscribe(session_id, queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
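# Any SSE-capable HTTP client can consume this stream. A minimal sketch using
# httpx (the httpx dependency and the host/port are assumptions, not part of
# this service): each SSE payload is one "data: <json>" line, and the stream
# ends after an event with type "done", matching the generator above.
#
#   import json
#   import httpx
#   with httpx.stream("GET", "http://localhost:8000/stream/thinking/demo-1") as resp:
#       for line in resp.iter_lines():
#           if line.startswith("data: "):
#               event = json.loads(line[len("data: "):])
#               print(event)
#               if event.get("type") == "done":
#                   break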
# -------------------------------------------------------------------
# /ingest endpoint (internal)
# -------------------------------------------------------------------
class IngestPayload(BaseModel):
    session_id: str
    user_msg: str
    assistant_msg: str

@cortex_router.post("/ingest")
async def ingest(payload: IngestPayload):
    try:
        update_last_assistant_message(payload.session_id, payload.assistant_msg)
    except Exception as e:
        logger.warning(f"[INGEST] Session update failed: {e}")

    try:
        add_exchange_internal({
            "session_id": payload.session_id,
            "user_msg": payload.user_msg,
            "assistant_msg": payload.assistant_msg,
        })
    except Exception as e:
        logger.warning(f"[INGEST] Intake update failed: {e}")

    return {"status": "ok", "session_id": payload.session_id}
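# Example internal call recording a completed exchange after the fact
# (host/port assumed):
#
#   import httpx
#   httpx.post(
#       "http://localhost:8000/ingest",
#       json={"session_id": "demo-1", "user_msg": "Hello", "assistant_msg": "Hi there!"},
#   )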