feat: Refactor LLM router and integrate health check endpoint

- Simplified LLM call logic in llm_router.py, removing tool adapter complexity and enhancing error handling.
- Added health check endpoint to main.py for system status verification.
- Cleaned up router.py by removing unused imports and commented-out code, streamlining the structure.
- Updated docker-compose.yml to unify services under a single Lyra container, enhancing deployment simplicity.
- Created Dockerfile for unified container setup, including both Relay and Cortex services.
- Added QUICKSTART.md for improved onboarding and usage instructions.
- Implemented start.sh script to manage service startup and health checks.
This commit is contained in:
2026-05-29 18:20:56 -04:00
parent 376b8114ad
commit 5f53fb32a4
14 changed files with 802 additions and 1665 deletions
+1 -392
View File
@@ -6,21 +6,8 @@ import asyncio
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from reasoning.reasoning import reason_check
from reasoning.reflection import reflect_notes
from reasoning.refine import refine_answer
from persona.speak import speak
from persona.identity import load_identity
from context import collect_context, update_last_assistant_message
from intake.intake import add_exchange_internal
from autonomy.monologue.monologue import InnerMonologue
from autonomy.self.state import load_self_state
from autonomy.tools.stream_events import get_stream_manager
# -------------------------------------------------------------------
# Setup
# -------------------------------------------------------------------
LOG_DETAIL_LEVEL = os.getenv("LOG_DETAIL_LEVEL", "summary").lower()
@@ -35,10 +22,7 @@ console_handler.setFormatter(logging.Formatter(
))
logger.addHandler(console_handler)
cortex_router = APIRouter()
inner_monologue = InnerMonologue()
# -------------------------------------------------------------------
# Models
@@ -49,292 +33,6 @@ class ReasonRequest(BaseModel):
temperature: float | None = None
backend: str | None = None
# -------------------------------------------------------------------
# /reason endpoint
# -------------------------------------------------------------------
@cortex_router.post("/reason")
async def run_reason(req: ReasonRequest):
from datetime import datetime
pipeline_start = datetime.now()
stage_timings = {}
# Show pipeline start in detailed/verbose mode
if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
logger.info(f"\n{'='*100}")
logger.info(f"🚀 PIPELINE START | Session: {req.session_id} | {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
logger.info(f"{'='*100}")
logger.info(f"📝 User: {req.user_prompt[:150]}...")
logger.info(f"{'-'*100}\n")
# ----------------------------------------------------------------
# STAGE 0 — Context
# ----------------------------------------------------------------
stage_start = datetime.now()
context_state = await collect_context(req.session_id, req.user_prompt)
stage_timings["context"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 0.5 — Identity
# ----------------------------------------------------------------
stage_start = datetime.now()
identity_block = load_identity(req.session_id)
stage_timings["identity"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 0.6 — Inner Monologue (observer-only)
# ----------------------------------------------------------------
stage_start = datetime.now()
inner_result = None
try:
self_state = load_self_state()
mono_context = {
"user_message": req.user_prompt,
"session_id": req.session_id,
"self_state": self_state,
"context_summary": context_state,
}
inner_result = await inner_monologue.process(mono_context)
logger.info(f"🧠 Monologue | {inner_result.get('intent', 'unknown')} | Tone: {inner_result.get('tone', 'neutral')}")
# Store in context for downstream use
context_state["monologue"] = inner_result
except Exception as e:
logger.warning(f"⚠️ Monologue failed: {e}")
stage_timings["monologue"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 0.7 — Executive Planning (conditional)
# ----------------------------------------------------------------
stage_start = datetime.now()
executive_plan = None
if inner_result and inner_result.get("consult_executive"):
try:
from autonomy.executive.planner import plan_execution
executive_plan = await plan_execution(
user_prompt=req.user_prompt,
intent=inner_result.get("intent", "unknown"),
context_state=context_state,
identity_block=identity_block
)
logger.info(f"🎯 Executive plan: {executive_plan.get('summary', 'N/A')[:80]}...")
except Exception as e:
logger.warning(f"⚠️ Executive planning failed: {e}")
executive_plan = None
stage_timings["executive"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 0.8 — Autonomous Tool Invocation
# ----------------------------------------------------------------
stage_start = datetime.now()
tool_results = None
autonomous_enabled = os.getenv("ENABLE_AUTONOMOUS_TOOLS", "true").lower() == "true"
tool_confidence_threshold = float(os.getenv("AUTONOMOUS_TOOL_CONFIDENCE_THRESHOLD", "0.6"))
if autonomous_enabled and inner_result:
try:
from autonomy.tools.decision_engine import ToolDecisionEngine
from autonomy.tools.orchestrator import ToolOrchestrator
# Analyze which tools to invoke
decision_engine = ToolDecisionEngine()
tool_decision = await decision_engine.analyze_tool_needs(
user_prompt=req.user_prompt,
monologue=inner_result,
context_state=context_state,
available_tools=["RAG", "WEB", "WEATHER", "CODEBRAIN"]
)
# Execute tools if confidence threshold met
if tool_decision["should_invoke_tools"] and tool_decision["confidence"] >= tool_confidence_threshold:
orchestrator = ToolOrchestrator(tool_timeout=30)
tool_results = await orchestrator.execute_tools(
tools_to_invoke=tool_decision["tools_to_invoke"],
context_state=context_state
)
# Format results for context injection
tool_context = orchestrator.format_results_for_context(tool_results)
context_state["autonomous_tool_results"] = tool_context
summary = tool_results.get("execution_summary", {})
logger.info(f"🛠️ Tools executed: {summary.get('successful', [])} succeeded")
else:
logger.info(f"🛠️ No tools invoked (confidence: {tool_decision.get('confidence', 0):.2f})")
except Exception as e:
logger.warning(f"⚠️ Autonomous tool invocation failed: {e}")
if LOG_DETAIL_LEVEL == "verbose":
import traceback
traceback.print_exc()
stage_timings["tools"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 1-5 — Core Reasoning Pipeline
# ----------------------------------------------------------------
stage_start = datetime.now()
# Extract intake summary
intake_summary = "(no context available)"
if context_state.get("intake"):
l20 = context_state["intake"].get("L20")
if isinstance(l20, dict):
intake_summary = l20.get("summary", intake_summary)
elif isinstance(l20, str):
intake_summary = l20
# Reflection
try:
reflection = await reflect_notes(intake_summary, identity_block=identity_block)
reflection_notes = reflection.get("notes", [])
except Exception as e:
reflection_notes = []
logger.warning(f"⚠️ Reflection failed: {e}")
stage_timings["reflection"] = (datetime.now() - stage_start).total_seconds() * 1000
# Reasoning (draft)
stage_start = datetime.now()
draft = await reason_check(
req.user_prompt,
identity_block=identity_block,
rag_block=context_state.get("rag", []),
reflection_notes=reflection_notes,
context=context_state,
monologue=inner_result,
executive_plan=executive_plan
)
stage_timings["reasoning"] = (datetime.now() - stage_start).total_seconds() * 1000
# Refinement
stage_start = datetime.now()
result = await refine_answer(
draft_output=draft,
reflection_notes=reflection_notes,
identity_block=identity_block,
rag_block=context_state.get("rag", []),
)
final_neutral = result["final_output"]
stage_timings["refinement"] = (datetime.now() - stage_start).total_seconds() * 1000
# Persona
stage_start = datetime.now()
tone = inner_result.get("tone", "neutral") if inner_result else "neutral"
depth = inner_result.get("depth", "medium") if inner_result else "medium"
persona_answer = await speak(final_neutral, tone=tone, depth=depth)
stage_timings["persona"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 6 — Session update
# ----------------------------------------------------------------
update_last_assistant_message(req.session_id, persona_answer)
# ----------------------------------------------------------------
# STAGE 6.5 — Self-state update & Pattern Learning
# ----------------------------------------------------------------
stage_start = datetime.now()
try:
from autonomy.self.analyzer import analyze_and_update_state
await analyze_and_update_state(
monologue=inner_result or {},
user_prompt=req.user_prompt,
response=persona_answer,
context=context_state
)
except Exception as e:
logger.warning(f"⚠️ Self-state update failed: {e}")
try:
from autonomy.learning.pattern_learner import get_pattern_learner
learner = get_pattern_learner()
await learner.learn_from_interaction(
user_prompt=req.user_prompt,
response=persona_answer,
monologue=inner_result or {},
context=context_state
)
except Exception as e:
logger.warning(f"⚠️ Pattern learning failed: {e}")
stage_timings["learning"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 7 — Proactive Monitoring & Suggestions
# ----------------------------------------------------------------
stage_start = datetime.now()
proactive_enabled = os.getenv("ENABLE_PROACTIVE_MONITORING", "true").lower() == "true"
proactive_min_priority = float(os.getenv("PROACTIVE_SUGGESTION_MIN_PRIORITY", "0.6"))
if proactive_enabled:
try:
from autonomy.proactive.monitor import get_proactive_monitor
monitor = get_proactive_monitor(min_priority=proactive_min_priority)
self_state = load_self_state()
suggestion = await monitor.analyze_session(
session_id=req.session_id,
context_state=context_state,
self_state=self_state
)
if suggestion:
suggestion_text = monitor.format_suggestion(suggestion)
persona_answer += suggestion_text
logger.info(f"💡 Proactive suggestion: {suggestion['type']} (priority: {suggestion['priority']:.2f})")
except Exception as e:
logger.warning(f"⚠️ Proactive monitoring failed: {e}")
stage_timings["proactive"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# PIPELINE COMPLETE — Summary
# ----------------------------------------------------------------
total_duration = (datetime.now() - pipeline_start).total_seconds() * 1000
# Always show pipeline completion
logger.info(f"\n{'='*100}")
logger.info(f"✨ PIPELINE COMPLETE | Session: {req.session_id} | Total: {total_duration:.0f}ms")
logger.info(f"{'='*100}")
# Show timing breakdown in detailed/verbose mode
if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
logger.info("⏱️ Stage Timings:")
for stage, duration in stage_timings.items():
pct = (duration / total_duration) * 100 if total_duration > 0 else 0
logger.info(f" {stage:15s}: {duration:6.0f}ms ({pct:5.1f}%)")
logger.info(f"📤 Output: {len(persona_answer)} chars")
logger.info(f"{'='*100}\n")
# ----------------------------------------------------------------
# RETURN
# ----------------------------------------------------------------
return {
"draft": draft,
"neutral": final_neutral,
"persona": persona_answer,
"reflection": reflection_notes,
"session_id": req.session_id,
"context_summary": {
"rag_results": len(context_state.get("rag", [])),
"minutes_since_last": context_state.get("minutes_since_last_msg"),
"message_count": context_state.get("message_count"),
"mode": context_state.get("mode"),
}
}
# -------------------------------------------------------------------
# /simple endpoint - Standard chatbot mode (no reasoning pipeline)
# -------------------------------------------------------------------
@@ -346,7 +44,6 @@ async def run_simple(req: ReasonRequest):
"""
from datetime import datetime
from llm.llm_router import call_llm
from autonomy.tools.function_caller import FunctionCaller
start_time = datetime.now()
@@ -356,9 +53,6 @@ async def run_simple(req: ReasonRequest):
logger.info(f"📝 User: {req.user_prompt[:150]}...")
logger.info(f"{'-'*100}\n")
# Get conversation history from context and intake buffer
context_state = await collect_context(req.session_id, req.user_prompt)
# Get recent messages from Intake buffer
from intake.intake import get_recent_messages
recent_msgs = get_recent_messages(req.session_id, limit=20)
@@ -400,31 +94,10 @@ async def run_simple(req: ReasonRequest):
temperature = req.temperature if req.temperature is not None else 0.7
# Check if tools are enabled
enable_tools = os.getenv("STANDARD_MODE_ENABLE_TOOLS", "false").lower() == "true"
# Call LLM with or without tools
try:
if enable_tools:
# Use FunctionCaller for tool-enabled conversation
logger.info(f"🛠️ Tool calling enabled for Standard Mode")
logger.info(f"🔍 Creating FunctionCaller with backend={backend}, temp={temperature}")
function_caller = FunctionCaller(backend, temperature)
logger.info(f"🔍 FunctionCaller created, calling call_with_tools...")
result = await function_caller.call_with_tools(
messages=messages,
max_tokens=2048,
session_id=req.session_id # Pass session_id for streaming
)
logger.info(f"🔍 call_with_tools returned: iterations={result.get('iterations')}, tool_calls={len(result.get('tool_calls', []))}")
# Log tool usage
if result.get("tool_calls"):
tool_names = [tc["name"] for tc in result["tool_calls"]]
logger.info(f"🔧 Tools used: {', '.join(tool_names)} ({result['iterations']} iterations)")
response = result["content"].strip()
else:
# Direct LLM call without tools (original behavior)
raw_response = await call_llm(
messages=messages,
@@ -440,7 +113,6 @@ async def run_simple(req: ReasonRequest):
# Update session with the exchange
try:
update_last_assistant_message(req.session_id, response)
add_exchange_internal({
"session_id": req.session_id,
"role": "user",
@@ -473,64 +145,6 @@ async def run_simple(req: ReasonRequest):
}
}
# -------------------------------------------------------------------
# /stream/thinking endpoint - SSE stream for "show your work"
# -------------------------------------------------------------------
@cortex_router.get("/stream/thinking/{session_id}")
async def stream_thinking(session_id: str):
"""
Server-Sent Events stream for tool calling "show your work" feature.
Streams real-time updates about:
- Thinking/planning steps
- Tool calls being made
- Tool execution results
- Final completion
"""
stream_manager = get_stream_manager()
queue = stream_manager.subscribe(session_id)
async def event_generator():
try:
# Send initial connection message
import json
connected_event = json.dumps({"type": "connected", "session_id": session_id})
yield f"data: {connected_event}\n\n"
while True:
# Wait for events with timeout to send keepalive
try:
event = await asyncio.wait_for(queue.get(), timeout=30.0)
# Format as SSE
event_data = json.dumps(event)
yield f"data: {event_data}\n\n"
# If it's a "done" event, close the stream
if event.get("type") == "done":
break
except asyncio.TimeoutError:
# Send keepalive comment
yield ": keepalive\n\n"
except asyncio.CancelledError:
logger.info(f"Stream cancelled for session {session_id}")
finally:
stream_manager.unsubscribe(session_id, queue)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
# -------------------------------------------------------------------
# /ingest endpoint (internal)
# -------------------------------------------------------------------
@@ -542,11 +156,6 @@ class IngestPayload(BaseModel):
@cortex_router.post("/ingest")
async def ingest(payload: IngestPayload):
try:
update_last_assistant_message(payload.session_id, payload.assistant_msg)
except Exception as e:
logger.warning(f"[INGEST] Session update failed: {e}")
try:
add_exchange_internal({
"session_id": payload.session_id,