Cortex debugging logs cleaned up

This commit is contained in:
serversdwn
2025-12-20 02:49:20 -05:00
parent 970907cf1b
commit 6bb800f5f8
10 changed files with 1452 additions and 242 deletions

View File

@@ -26,7 +26,12 @@ from neomem_client import NeoMemClient
# NeoMem (long-term memory) service configuration
NEOMEM_API = os.getenv("NEOMEM_API", "http://neomem-api:8000")  # base URL of the NeoMem REST API
NEOMEM_ENABLED = os.getenv("NEOMEM_ENABLED", "false").lower() == "true"  # gate for RAG retrieval calls
RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.4"))  # minimum score for a memory to be used
# Logging verbosity: VERBOSE_DEBUG is the legacy boolean flag;
# LOG_DETAIL_LEVEL is the newer graded control ("minimal"/"summary"/"detailed"/"verbose")
VERBOSE_DEBUG = os.getenv("VERBOSE_DEBUG", "false").lower() == "true"
LOG_DETAIL_LEVEL = os.getenv("LOG_DETAIL_LEVEL", "summary").lower()
# Loop detection settings
MAX_MESSAGE_HISTORY = int(os.getenv("MAX_MESSAGE_HISTORY", "100")) # Prevent unbounded growth
SESSION_TTL_HOURS = int(os.getenv("SESSION_TTL_HOURS", "24")) # Auto-expire old sessions
ENABLE_DUPLICATE_DETECTION = os.getenv("ENABLE_DUPLICATE_DETECTION", "true").lower() == "true"
# Tools available for future autonomy features
TOOLS_AVAILABLE = ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"]
@@ -39,34 +44,18 @@ SESSION_STATE: Dict[str, Dict[str, Any]] = {}
# Logger
logger = logging.getLogger(__name__)
# Set logging level based on VERBOSE_DEBUG
if VERBOSE_DEBUG:
logger.setLevel(logging.DEBUG)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s [CONTEXT] %(levelname)s: %(message)s',
datefmt='%H:%M:%S'
))
logger.addHandler(console_handler)
# File handler - append to log file
try:
os.makedirs('/app/logs', exist_ok=True)
file_handler = logging.FileHandler('/app/logs/cortex_verbose_debug.log', mode='a')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s [CONTEXT] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
logger.addHandler(file_handler)
logger.debug("VERBOSE_DEBUG mode enabled for context.py - logging to file")
except Exception as e:
logger.debug(f"VERBOSE_DEBUG mode enabled for context.py - file logging failed: {e}")
# Always set up basic logging
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s [CONTEXT] %(levelname)s: %(message)s',
datefmt='%H:%M:%S'
))
logger.addHandler(console_handler)
# -----------------------------
# Session initialization
# Session initialization & cleanup
# -----------------------------
def _init_session(session_id: str) -> Dict[str, Any]:
"""
@@ -86,9 +75,76 @@ def _init_session(session_id: str) -> Dict[str, Any]:
"active_project": None, # Future: project context
"message_count": 0,
"message_history": [],
"last_message_hash": None, # For duplicate detection
}
def _cleanup_expired_sessions():
    """
    Remove sessions that have been inactive for longer than SESSION_TTL_HOURS.

    Inactivity is measured from the session's "last_timestamp", falling back to
    "created_at" when no message has been processed yet.

    Returns:
        int: number of sessions removed from SESSION_STATE.
    """
    now = datetime.now()
    expired_sessions = []
    for session_id, state in SESSION_STATE.items():
        # `or` (rather than dict.get default) also covers an explicit None value.
        last_active = state.get("last_timestamp") or state.get("created_at")
        if last_active is None:
            # Defensive: a session with no timestamp at all cannot be aged;
            # skip it instead of letting the periodic cleanup crash.
            continue
        hours_inactive = (now - last_active).total_seconds() / 3600
        if hours_inactive > SESSION_TTL_HOURS:
            expired_sessions.append(session_id)
    # Delete in a second pass so SESSION_STATE is not mutated while iterating.
    for session_id in expired_sessions:
        del SESSION_STATE[session_id]
        logger.info(f"🗑️ Expired session: {session_id} (inactive for {SESSION_TTL_HOURS}+ hours)")
    return len(expired_sessions)
def _is_duplicate_message(session_id: str, user_prompt: str) -> bool:
    """
    Return True when the incoming prompt exactly repeats the previous one.

    An MD5 digest of the normalized (stripped, lower-cased) prompt is compared
    against the digest stored on the session; on a miss the stored digest is
    refreshed so the next call compares against this message. Detection can be
    disabled globally via ENABLE_DUPLICATE_DETECTION.
    """
    if not ENABLE_DUPLICATE_DETECTION:
        return False
    import hashlib

    session = SESSION_STATE.get(session_id)
    if not session:
        return False

    # Normalize before hashing so trivial whitespace/case changes still match.
    digest = hashlib.md5(user_prompt.strip().lower().encode()).hexdigest()

    if session.get("last_message_hash") != digest:
        # Not a repeat — remember this digest for the next comparison.
        session["last_message_hash"] = digest
        return False

    logger.warning(
        f"⚠️ DUPLICATE MESSAGE DETECTED | Session: {session_id} | "
        f"Message: {user_prompt[:80]}..."
    )
    return True
def _trim_message_history(state: Dict[str, Any]):
    """
    Cap the session's message history at MAX_MESSAGE_HISTORY entries.

    Drops the oldest entries in place, keeping only the most recent
    MAX_MESSAGE_HISTORY messages, and logs how many were discarded.
    """
    history = state.get("message_history", [])
    overflow = len(history) - MAX_MESSAGE_HISTORY
    if overflow <= 0:
        return
    # Keep only the newest MAX_MESSAGE_HISTORY entries.
    state["message_history"] = history[-MAX_MESSAGE_HISTORY:]
    # NOTE(review): assumes the state dict carries a "session_id" key — confirm
    # against _init_session, which is only partially visible here.
    logger.info(f"✂️ Trimmed {overflow} old messages from session {state['session_id']}")
# -----------------------------
# Intake context retrieval
# -----------------------------
@@ -223,26 +279,42 @@ async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
}
"""
# A. Initialize session state if needed
# A. Cleanup expired sessions periodically (every 100th call)
import random
if random.randint(1, 100) == 1:
_cleanup_expired_sessions()
# B. Initialize session state if needed
if session_id not in SESSION_STATE:
SESSION_STATE[session_id] = _init_session(session_id)
logger.info(f"Initialized new session: {session_id}")
if VERBOSE_DEBUG:
logger.debug(f"[COLLECT_CONTEXT] New session state: {SESSION_STATE[session_id]}")
state = SESSION_STATE[session_id]
if VERBOSE_DEBUG:
logger.debug(f"[COLLECT_CONTEXT] Session {session_id} - User prompt: {user_prompt[:100]}...")
# C. Check for duplicate messages (loop detection)
if _is_duplicate_message(session_id, user_prompt):
# Return cached context with warning flag
logger.warning(f"🔁 LOOP DETECTED - Returning cached context to prevent processing duplicate")
context_state = {
"session_id": session_id,
"timestamp": datetime.now().isoformat(),
"minutes_since_last_msg": 0,
"message_count": state["message_count"],
"intake": {},
"rag": [],
"mode": state["mode"],
"mood": state["mood"],
"active_project": state["active_project"],
"tools_available": TOOLS_AVAILABLE,
"duplicate_detected": True,
}
return context_state
# B. Calculate time delta
now = datetime.now()
time_delta_seconds = (now - state["last_timestamp"]).total_seconds()
minutes_since_last_msg = round(time_delta_seconds / 60.0, 2)
if VERBOSE_DEBUG:
logger.debug(f"[COLLECT_CONTEXT] Time since last message: {minutes_since_last_msg:.2f} minutes")
# C. Gather Intake context (multilevel summaries)
# Build compact message buffer for Intake:
messages_for_intake = []
@@ -257,12 +329,6 @@ async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
intake_data = await _get_intake_context(session_id, messages_for_intake)
if VERBOSE_DEBUG:
import json
logger.debug(f"[COLLECT_CONTEXT] Intake data retrieved:")
logger.debug(json.dumps(intake_data, indent=2, default=str))
# D. Search NeoMem for relevant memories
if NEOMEM_ENABLED:
rag_results = await _search_neomem(
@@ -274,23 +340,20 @@ async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
rag_results = []
logger.info("Skipping NeoMem RAG retrieval; NEOMEM_ENABLED is false")
if VERBOSE_DEBUG:
logger.debug(f"[COLLECT_CONTEXT] NeoMem search returned {len(rag_results)} results")
for idx, result in enumerate(rag_results, 1):
score = result.get("score", 0)
data_preview = str(result.get("payload", {}).get("data", ""))[:100]
logger.debug(f" [{idx}] Score: {score:.3f} - {data_preview}...")
# E. Update session state
state["last_user_message"] = user_prompt
state["last_timestamp"] = now
state["message_count"] += 1
# Save user turn to history
state["message_history"].append({
"user": user_prompt,
"assistant": "" # assistant reply filled later by update_last_assistant_message()
"user": user_prompt,
"assistant": "" # assistant reply filled later by update_last_assistant_message()
})
# Trim history to prevent unbounded growth
_trim_message_history(state)
# F. Assemble unified context
@@ -307,18 +370,54 @@ async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
"tools_available": TOOLS_AVAILABLE,
}
# Log context summary in structured format
logger.info(
f"Context collected for session {session_id}: "
f"{len(rag_results)} RAG results, "
f"{minutes_since_last_msg:.1f} minutes since last message"
f"📊 Context | Session: {session_id} | "
f"Messages: {state['message_count']} | "
f"Last: {minutes_since_last_msg:.1f}min | "
f"RAG: {len(rag_results)} results"
)
if VERBOSE_DEBUG:
logger.debug(f"[COLLECT_CONTEXT] Final context state assembled:")
logger.debug(f" - Message count: {state['message_count']}")
logger.debug(f" - Mode: {state['mode']}, Mood: {state['mood']}")
logger.debug(f" - Active project: {state['active_project']}")
logger.debug(f" - Tools available: {TOOLS_AVAILABLE}")
# Show detailed context in detailed/verbose mode
if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
import json
logger.info(f"\n{''*100}")
logger.info(f"[CONTEXT] Session {session_id} | User: {user_prompt[:80]}...")
logger.info(f"{''*100}")
logger.info(f" Mode: {state['mode']} | Mood: {state['mood']} | Project: {state['active_project']}")
logger.info(f" Tools: {', '.join(TOOLS_AVAILABLE)}")
# Show intake summaries (condensed)
if intake_data:
logger.info(f"\n ╭─ INTAKE SUMMARIES ────────────────────────────────────────────────")
for level in ["L1", "L5", "L10", "L20", "L30"]:
if level in intake_data:
summary = intake_data[level]
if isinstance(summary, dict):
summary_text = summary.get("summary", str(summary)[:100])
else:
summary_text = str(summary)[:100]
logger.info(f"{level:4s}: {summary_text}...")
logger.info(f" ╰───────────────────────────────────────────────────────────────────")
# Show RAG results (condensed)
if rag_results:
logger.info(f"\n ╭─ RAG RESULTS ({len(rag_results)}) ──────────────────────────────────────────────")
for idx, result in enumerate(rag_results[:5], 1): # Show top 5
score = result.get("score", 0)
data_preview = str(result.get("payload", {}).get("data", ""))[:60]
logger.info(f" │ [{idx}] {score:.3f} | {data_preview}...")
if len(rag_results) > 5:
logger.info(f" │ ... and {len(rag_results) - 5} more results")
logger.info(f" ╰───────────────────────────────────────────────────────────────────")
# Show full raw data only in verbose mode
if LOG_DETAIL_LEVEL == "verbose":
logger.info(f"\n ╭─ RAW INTAKE DATA ─────────────────────────────────────────────────")
logger.info(f"{json.dumps(intake_data, indent=4, default=str)}")
logger.info(f" ╰───────────────────────────────────────────────────────────────────")
logger.info(f"{''*100}\n")
return context_state
@@ -346,9 +445,6 @@ def update_last_assistant_message(session_id: str, message: str) -> None:
# history entry already contains {"user": "...", "assistant": "...?"}
history[-1]["assistant"] = message
if VERBOSE_DEBUG:
logger.debug(f"Updated assistant message for session {session_id}")
def get_session_state(session_id: str) -> Optional[Dict[str, Any]]:

View File

@@ -4,8 +4,8 @@
"focus": "user_request",
"confidence": 0.7,
"curiosity": 1.0,
"last_updated": "2025-12-19T20:25:25.437557",
"interaction_count": 16,
"last_updated": "2025-12-20T07:47:53.826587",
"interaction_count": 20,
"learning_queue": [],
"active_goals": [],
"preferences": {

View File

@@ -20,30 +20,17 @@ from autonomy.self.state import load_self_state
# -------------------------------------------------------------------
# Setup
# -------------------------------------------------------------------
VERBOSE_DEBUG = os.getenv("VERBOSE_DEBUG", "false").lower() == "true"
LOG_DETAIL_LEVEL = os.getenv("LOG_DETAIL_LEVEL", "summary").lower()
logger = logging.getLogger(__name__)
if VERBOSE_DEBUG:
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s [ROUTER] %(levelname)s: %(message)s',
datefmt='%H:%M:%S'
))
logger.addHandler(console_handler)
try:
os.makedirs('/app/logs', exist_ok=True)
file_handler = logging.FileHandler('/app/logs/cortex_verbose_debug.log', mode='a')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s [ROUTER] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
logger.addHandler(file_handler)
logger.debug("VERBOSE_DEBUG enabled for router.py")
except Exception as e:
logger.debug(f"File logging failed: {e}")
# Always set up basic logging
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s [ROUTER] %(levelname)s: %(message)s',
datefmt='%H:%M:%S'
))
logger.addHandler(console_handler)
cortex_router = APIRouter()
@@ -64,40 +51,36 @@ class ReasonRequest(BaseModel):
# -------------------------------------------------------------------
@cortex_router.post("/reason")
async def run_reason(req: ReasonRequest):
from datetime import datetime
pipeline_start = datetime.now()
stage_timings = {}
if VERBOSE_DEBUG:
logger.debug(f"\n{'='*80}")
logger.debug(f"[PIPELINE START] Session: {req.session_id}")
logger.debug(f"[PIPELINE START] User prompt: {req.user_prompt[:200]}...")
logger.debug(f"{'='*80}\n")
# Show pipeline start in detailed/verbose mode
if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
logger.info(f"\n{'='*100}")
logger.info(f"🚀 PIPELINE START | Session: {req.session_id} | {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
logger.info(f"{'='*100}")
logger.info(f"📝 User: {req.user_prompt[:150]}...")
logger.info(f"{'-'*100}\n")
# ----------------------------------------------------------------
# STAGE 0 — Context
# ----------------------------------------------------------------
if VERBOSE_DEBUG:
logger.debug("[STAGE 0] Collecting unified context...")
stage_start = datetime.now()
context_state = await collect_context(req.session_id, req.user_prompt)
if VERBOSE_DEBUG:
logger.debug(f"[STAGE 0] Context collected - {len(context_state.get('rag', []))} RAG results")
stage_timings["context"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 0.5 — Identity
# ----------------------------------------------------------------
if VERBOSE_DEBUG:
logger.debug("[STAGE 0.5] Loading identity block...")
stage_start = datetime.now()
identity_block = load_identity(req.session_id)
if VERBOSE_DEBUG:
logger.debug(f"[STAGE 0.5] Identity loaded: {identity_block.get('name', 'Unknown')}")
stage_timings["identity"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 0.6 — Inner Monologue (observer-only)
# ----------------------------------------------------------------
if VERBOSE_DEBUG:
logger.debug("[STAGE 0.6] Running inner monologue...")
stage_start = datetime.now()
inner_result = None
try:
@@ -111,21 +94,22 @@ async def run_reason(req: ReasonRequest):
}
inner_result = await inner_monologue.process(mono_context)
logger.info(f"[INNER_MONOLOGUE] {inner_result}")
logger.info(f"🧠 Monologue | {inner_result.get('intent', 'unknown')} | Tone: {inner_result.get('tone', 'neutral')}")
# Store in context for downstream use
context_state["monologue"] = inner_result
except Exception as e:
logger.warning(f"[INNER_MONOLOGUE] failed: {e}")
logger.warning(f"⚠️ Monologue failed: {e}")
stage_timings["monologue"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 0.7 — Executive Planning (conditional)
# ----------------------------------------------------------------
stage_start = datetime.now()
executive_plan = None
if inner_result and inner_result.get("consult_executive"):
if VERBOSE_DEBUG:
logger.debug("[STAGE 0.7] Executive consultation requested...")
try:
from autonomy.executive.planner import plan_execution
@@ -135,21 +119,22 @@ async def run_reason(req: ReasonRequest):
context_state=context_state,
identity_block=identity_block
)
logger.info(f"[EXECUTIVE] Generated plan: {executive_plan.get('summary', 'N/A')}")
logger.info(f"🎯 Executive plan: {executive_plan.get('summary', 'N/A')[:80]}...")
except Exception as e:
logger.warning(f"[EXECUTIVE] Planning failed: {e}")
logger.warning(f"⚠️ Executive planning failed: {e}")
executive_plan = None
stage_timings["executive"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 0.8 — Autonomous Tool Invocation
# ----------------------------------------------------------------
stage_start = datetime.now()
tool_results = None
autonomous_enabled = os.getenv("ENABLE_AUTONOMOUS_TOOLS", "true").lower() == "true"
tool_confidence_threshold = float(os.getenv("AUTONOMOUS_TOOL_CONFIDENCE_THRESHOLD", "0.6"))
if autonomous_enabled and inner_result:
if VERBOSE_DEBUG:
logger.debug("[STAGE 0.8] Analyzing autonomous tool needs...")
try:
from autonomy.tools.decision_engine import ToolDecisionEngine
@@ -176,22 +161,25 @@ async def run_reason(req: ReasonRequest):
tool_context = orchestrator.format_results_for_context(tool_results)
context_state["autonomous_tool_results"] = tool_context
if VERBOSE_DEBUG:
summary = tool_results.get("execution_summary", {})
logger.debug(f"[STAGE 0.8] Tools executed: {summary.get('successful', [])} succeeded")
summary = tool_results.get("execution_summary", {})
logger.info(f"🛠️ Tools executed: {summary.get('successful', [])} succeeded")
else:
if VERBOSE_DEBUG:
logger.debug(f"[STAGE 0.8] No tools invoked (confidence: {tool_decision.get('confidence', 0):.2f})")
logger.info(f"🛠️ No tools invoked (confidence: {tool_decision.get('confidence', 0):.2f})")
except Exception as e:
logger.warning(f"[STAGE 0.8] Autonomous tool invocation failed: {e}")
if VERBOSE_DEBUG:
logger.warning(f"⚠️ Autonomous tool invocation failed: {e}")
if LOG_DETAIL_LEVEL == "verbose":
import traceback
traceback.print_exc()
stage_timings["tools"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 1 — Intake summary
# STAGE 1-5 — Core Reasoning Pipeline
# ----------------------------------------------------------------
stage_start = datetime.now()
# Extract intake summary
intake_summary = "(no context available)"
if context_state.get("intake"):
l20 = context_state["intake"].get("L20")
@@ -200,65 +188,46 @@ async def run_reason(req: ReasonRequest):
elif isinstance(l20, str):
intake_summary = l20
if VERBOSE_DEBUG:
logger.debug(f"[STAGE 1] Intake summary extracted (L20): {intake_summary[:150]}...")
# ----------------------------------------------------------------
# STAGE 2 — Reflection
# ----------------------------------------------------------------
if VERBOSE_DEBUG:
logger.debug("[STAGE 2] Running reflection...")
# Reflection
try:
reflection = await reflect_notes(intake_summary, identity_block=identity_block)
reflection_notes = reflection.get("notes", [])
except Exception as e:
reflection_notes = []
if VERBOSE_DEBUG:
logger.debug(f"[STAGE 2] Reflection failed: {e}")
logger.warning(f"⚠️ Reflection failed: {e}")
# ----------------------------------------------------------------
# STAGE 3 — Reasoning (draft)
# ----------------------------------------------------------------
if VERBOSE_DEBUG:
logger.debug("[STAGE 3] Running reasoning (draft)...")
stage_timings["reflection"] = (datetime.now() - stage_start).total_seconds() * 1000
# Reasoning (draft)
stage_start = datetime.now()
draft = await reason_check(
req.user_prompt,
identity_block=identity_block,
rag_block=context_state.get("rag", []),
reflection_notes=reflection_notes,
context=context_state,
monologue=inner_result, # NEW: Pass monologue guidance
executive_plan=executive_plan # NEW: Pass executive plan
monologue=inner_result,
executive_plan=executive_plan
)
stage_timings["reasoning"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 4 — Refinement
# ----------------------------------------------------------------
if VERBOSE_DEBUG:
logger.debug("[STAGE 4] Running refinement...")
# Refinement
stage_start = datetime.now()
result = await refine_answer(
draft_output=draft,
reflection_notes=reflection_notes,
identity_block=identity_block,
rag_block=context_state.get("rag", []),
)
final_neutral = result["final_output"]
stage_timings["refinement"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 5 — Persona
# ----------------------------------------------------------------
if VERBOSE_DEBUG:
logger.debug("[STAGE 5] Applying persona layer...")
# Extract tone and depth from monologue for persona guidance
# Persona
stage_start = datetime.now()
tone = inner_result.get("tone", "neutral") if inner_result else "neutral"
depth = inner_result.get("depth", "medium") if inner_result else "medium"
persona_answer = await speak(final_neutral, tone=tone, depth=depth)
stage_timings["persona"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 6 — Session update
@@ -268,6 +237,7 @@ async def run_reason(req: ReasonRequest):
# ----------------------------------------------------------------
# STAGE 6.5 — Self-state update & Pattern Learning
# ----------------------------------------------------------------
stage_start = datetime.now()
try:
from autonomy.self.analyzer import analyze_and_update_state
await analyze_and_update_state(
@@ -277,9 +247,8 @@ async def run_reason(req: ReasonRequest):
context=context_state
)
except Exception as e:
logger.warning(f"[SELF_STATE] Update failed: {e}")
logger.warning(f"⚠️ Self-state update failed: {e}")
# Pattern learning
try:
from autonomy.learning.pattern_learner import get_pattern_learner
learner = get_pattern_learner()
@@ -290,11 +259,14 @@ async def run_reason(req: ReasonRequest):
context=context_state
)
except Exception as e:
logger.warning(f"[PATTERN_LEARNER] Learning failed: {e}")
logger.warning(f"⚠️ Pattern learning failed: {e}")
stage_timings["learning"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# STAGE 7 — Proactive Monitoring & Suggestions
# ----------------------------------------------------------------
stage_start = datetime.now()
proactive_enabled = os.getenv("ENABLE_PROACTIVE_MONITORING", "true").lower() == "true"
proactive_min_priority = float(os.getenv("PROACTIVE_SUGGESTION_MIN_PRIORITY", "0.6"))
@@ -303,7 +275,7 @@ async def run_reason(req: ReasonRequest):
from autonomy.proactive.monitor import get_proactive_monitor
monitor = get_proactive_monitor(min_priority=proactive_min_priority)
self_state = load_self_state() # Already imported at top of file
self_state = load_self_state()
suggestion = await monitor.analyze_session(
session_id=req.session_id,
@@ -311,22 +283,35 @@ async def run_reason(req: ReasonRequest):
self_state=self_state
)
# Append suggestion to response if exists
if suggestion:
suggestion_text = monitor.format_suggestion(suggestion)
persona_answer += suggestion_text
if VERBOSE_DEBUG:
logger.debug(f"[STAGE 7] Proactive suggestion added: {suggestion['type']} (priority: {suggestion['priority']:.2f})")
logger.info(f"💡 Proactive suggestion: {suggestion['type']} (priority: {suggestion['priority']:.2f})")
except Exception as e:
logger.warning(f"[STAGE 7] Proactive monitoring failed: {e}")
logger.warning(f"⚠️ Proactive monitoring failed: {e}")
if VERBOSE_DEBUG:
logger.debug(f"\n{'='*80}")
logger.debug(f"[PIPELINE COMPLETE] Session: {req.session_id}")
logger.debug(f"[PIPELINE COMPLETE] Final answer length: {len(persona_answer)} chars")
logger.debug(f"{'='*80}\n")
stage_timings["proactive"] = (datetime.now() - stage_start).total_seconds() * 1000
# ----------------------------------------------------------------
# PIPELINE COMPLETE — Summary
# ----------------------------------------------------------------
total_duration = (datetime.now() - pipeline_start).total_seconds() * 1000
# Always show pipeline completion
logger.info(f"\n{'='*100}")
logger.info(f"✨ PIPELINE COMPLETE | Session: {req.session_id} | Total: {total_duration:.0f}ms")
logger.info(f"{'='*100}")
# Show timing breakdown in detailed/verbose mode
if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
logger.info("⏱️ Stage Timings:")
for stage, duration in stage_timings.items():
pct = (duration / total_duration) * 100 if total_duration > 0 else 0
logger.info(f" {stage:15s}: {duration:6.0f}ms ({pct:5.1f}%)")
logger.info(f"📤 Output: {len(persona_answer)} chars")
logger.info(f"{'='*100}\n")
# ----------------------------------------------------------------
# RETURN

View File

@@ -0,0 +1,223 @@
"""
Structured logging utilities for Cortex pipeline debugging.
Provides hierarchical, scannable logs with clear section markers and raw data visibility.
"""
import json
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime
from enum import Enum
class LogLevel(Enum):
    """Graded log detail levels for the pipeline logger.

    Higher values are supersets of lower ones; code compares with
    ``self.level.value >= LogLevel.X.value`` to decide what to emit.
    """
    MINIMAL = 1 # Only errors and final results
    SUMMARY = 2 # Stage summaries + errors
    DETAILED = 3 # Include raw LLM outputs, RAG results
    VERBOSE = 4 # Everything including intermediate states
class PipelineLogger:
    """
    Hierarchical logger for cortex pipeline debugging.

    Wraps a standard ``logging.Logger`` and gates output on a ``LogLevel``:
    SUMMARY emits stage markers and totals, DETAILED adds prompts/RAG scores,
    VERBOSE adds raw data dumps (sent at DEBUG).

    Features:
    - Clear visual section markers
    - Collapsible detail sections
    - Raw data dumps with truncation options
    - Stage timing
    - Error highlighting
    """

    def __init__(self, logger: logging.Logger, level: LogLevel = LogLevel.SUMMARY):
        self.logger = logger
        self.level = level
        # stage name -> duration in milliseconds, reset on each pipeline_start()
        self.stage_timings: Dict[str, float] = {}
        self.current_stage: Optional[str] = None
        self.stage_start_time: Optional[datetime] = None
        self.pipeline_start_time: Optional[datetime] = None

    def pipeline_start(self, session_id: str, user_prompt: str):
        """Mark the start of a pipeline run"""
        self.pipeline_start_time = datetime.now()
        self.stage_timings = {}
        if self.level.value >= LogLevel.SUMMARY.value:
            self.logger.info(f"\n{'='*100}")
            self.logger.info(f"🚀 PIPELINE START | Session: {session_id} | {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
            self.logger.info(f"{'='*100}")
            # Prompt preview only at DETAILED+ (DETAILED implies SUMMARY, so
            # nesting here does not change behavior).
            if self.level.value >= LogLevel.DETAILED.value:
                self.logger.info(f"📝 User prompt: {user_prompt[:200]}{'...' if len(user_prompt) > 200 else ''}")
            self.logger.info(f"{'-'*100}\n")

    def stage_start(self, stage_name: str, description: str = ""):
        """Mark the start of a pipeline stage"""
        self.current_stage = stage_name
        self.stage_start_time = datetime.now()
        if self.level.value >= LogLevel.SUMMARY.value:
            timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
            desc_suffix = f" - {description}" if description else ""
            self.logger.info(f"▶️ [{stage_name}]{desc_suffix} | {timestamp}")

    def stage_end(self, result_summary: str = ""):
        """Mark the end of a pipeline stage"""
        # Record the elapsed time for the stage opened by stage_start();
        # a stage_end() without a matching stage_start() is a silent no-op.
        if self.current_stage and self.stage_start_time:
            duration_ms = (datetime.now() - self.stage_start_time).total_seconds() * 1000
            self.stage_timings[self.current_stage] = duration_ms
            if self.level.value >= LogLevel.SUMMARY.value:
                summary_suffix = f"{result_summary}" if result_summary else ""
                self.logger.info(f"✅ [{self.current_stage}] Complete in {duration_ms:.0f}ms{summary_suffix}\n")
        self.current_stage = None
        self.stage_start_time = None

    def log_llm_call(self, backend: str, prompt: str, response: Any, raw_response: Optional[str] = None):
        """
        Log LLM call details with proper formatting.

        Args:
            backend: Backend name (PRIMARY, SECONDARY, etc.)
            prompt: Input prompt to LLM (a string, or a chat-style message list
                whose last entry has a 'content' key)
            response: Parsed response object
            raw_response: Raw JSON response string (dumped only at VERBOSE)
        """
        if self.level.value >= LogLevel.DETAILED.value:
            self.logger.info(f" 🧠 LLM Call | Backend: {backend}")
            # Show prompt (truncated)
            if isinstance(prompt, list):
                prompt_preview = prompt[-1].get('content', '')[:150] if prompt else ''
            else:
                prompt_preview = str(prompt)[:150]
            self.logger.info(f" Prompt: {prompt_preview}...")
            # Show parsed response; tries common reply shapes before falling
            # back to str() of the whole object.
            if isinstance(response, dict):
                response_text = (
                    response.get('reply') or
                    response.get('message', {}).get('content') or
                    str(response)
                )[:200]
            else:
                response_text = str(response)[:200]
            self.logger.info(f" Response: {response_text}...")
            # Show raw response in collapsible block
            if raw_response and self.level.value >= LogLevel.VERBOSE.value:
                self.logger.debug(f" ╭─ RAW RESPONSE ────────────────────────────────────")
                for line in raw_response.split('\n')[:50]: # Limit to 50 lines
                    self.logger.debug(f"{line}")
                if raw_response.count('\n') > 50:
                    self.logger.debug(f" │ ... ({raw_response.count(chr(10)) - 50} more lines)")
                self.logger.debug(f" ╰───────────────────────────────────────────────────\n")

    def log_rag_results(self, results: List[Dict[str, Any]]):
        """Log RAG/NeoMem results in scannable format"""
        if self.level.value >= LogLevel.SUMMARY.value:
            self.logger.info(f" 📚 RAG Results: {len(results)} memories retrieved")
        if self.level.value >= LogLevel.DETAILED.value and results:
            self.logger.info(f" ╭─ MEMORY SCORES ───────────────────────────────────")
            for idx, result in enumerate(results[:10], 1): # Show top 10
                score = result.get("score", 0)
                data_preview = str(result.get("payload", {}).get("data", ""))[:80]
                self.logger.info(f" │ [{idx}] {score:.3f} | {data_preview}...")
            if len(results) > 10:
                self.logger.info(f" │ ... and {len(results) - 10} more results")
            self.logger.info(f" ╰───────────────────────────────────────────────────")

    def log_context_state(self, context_state: Dict[str, Any]):
        """Log context state summary"""
        if self.level.value >= LogLevel.SUMMARY.value:
            msg_count = context_state.get("message_count", 0)
            minutes_since = context_state.get("minutes_since_last_msg", 0)
            rag_count = len(context_state.get("rag", []))
            self.logger.info(f" 📊 Context | Messages: {msg_count} | Last: {minutes_since:.1f}min ago | RAG: {rag_count} results")
        if self.level.value >= LogLevel.DETAILED.value:
            intake = context_state.get("intake", {})
            if intake:
                self.logger.info(f" ╭─ INTAKE SUMMARIES ────────────────────────────────")
                # Fixed ladder of summary levels produced by the Intake service.
                for level in ["L1", "L5", "L10", "L20", "L30"]:
                    if level in intake:
                        summary = intake[level]
                        if isinstance(summary, dict):
                            summary = summary.get("summary", str(summary)[:100])
                        else:
                            summary = str(summary)[:100]
                        self.logger.info(f"{level}: {summary}...")
                self.logger.info(f" ╰───────────────────────────────────────────────────")

    def log_error(self, stage: str, error: Exception, critical: bool = False):
        """Log an error with context"""
        level_marker = "🔴 CRITICAL" if critical else "⚠️ WARNING"
        self.logger.error(f"{level_marker} | Stage: {stage} | Error: {type(error).__name__}: {str(error)}")
        if self.level.value >= LogLevel.VERBOSE.value:
            import traceback
            # Full traceback only at VERBOSE; emitted at DEBUG level.
            self.logger.debug(f" Traceback:\n{traceback.format_exc()}")

    def log_raw_data(self, label: str, data: Any, max_lines: int = 30):
        """Log raw data in a collapsible format"""
        if self.level.value >= LogLevel.VERBOSE.value:
            self.logger.debug(f" ╭─ {label.upper()} ──────────────────────────────────")
            if isinstance(data, (dict, list)):
                # Structured data is pretty-printed as JSON; default=str keeps
                # non-serializable values (datetimes, etc.) from raising.
                json_str = json.dumps(data, indent=2, default=str)
                lines = json_str.split('\n')
                for line in lines[:max_lines]:
                    self.logger.debug(f"{line}")
                if len(lines) > max_lines:
                    self.logger.debug(f" │ ... ({len(lines) - max_lines} more lines)")
            else:
                lines = str(data).split('\n')
                for line in lines[:max_lines]:
                    self.logger.debug(f"{line}")
                if len(lines) > max_lines:
                    self.logger.debug(f" │ ... ({len(lines) - max_lines} more lines)")
            self.logger.debug(f" ╰───────────────────────────────────────────────────")

    def pipeline_end(self, session_id: str, final_output_length: int):
        """Mark the end of pipeline run with summary"""
        if self.pipeline_start_time:
            total_duration_ms = (datetime.now() - self.pipeline_start_time).total_seconds() * 1000
            if self.level.value >= LogLevel.SUMMARY.value:
                self.logger.info(f"\n{'='*100}")
                self.logger.info(f"✨ PIPELINE COMPLETE | Session: {session_id} | Total: {total_duration_ms:.0f}ms")
                self.logger.info(f"{'='*100}")
                # Show timing breakdown
                if self.stage_timings and self.level.value >= LogLevel.DETAILED.value:
                    self.logger.info("⏱️ Stage Timings:")
                    for stage, duration in self.stage_timings.items():
                        pct = (duration / total_duration_ms) * 100 if total_duration_ms > 0 else 0
                        self.logger.info(f" {stage:20s}: {duration:6.0f}ms ({pct:5.1f}%)")
                self.logger.info(f"📤 Final output: {final_output_length} characters")
                self.logger.info(f"{'='*100}\n")
def get_log_level_from_env() -> LogLevel:
    """Resolve the pipeline log level from environment variables.

    An explicit LOG_DETAIL_LEVEL ("minimal"/"summary"/"detailed"/"verbose")
    always wins; the legacy VERBOSE_DEBUG=true flag maps to VERBOSE only when
    no recognized detail level is set. Anything else defaults to SUMMARY.
    """
    import os

    named_levels = {
        "minimal": LogLevel.MINIMAL,
        "summary": LogLevel.SUMMARY,
        "detailed": LogLevel.DETAILED,
        "verbose": LogLevel.VERBOSE,
    }
    detail_level = os.getenv("LOG_DETAIL_LEVEL", "").lower()
    if detail_level in named_levels:
        return named_levels[detail_level]
    # Legacy boolean flag, honored only when no explicit level was given.
    if os.getenv("VERBOSE_DEBUG", "false").lower() == "true":
        return LogLevel.VERBOSE
    return LogLevel.SUMMARY  # Default