"""
Structured logging utilities for Cortex pipeline debugging.

Provides hierarchical, scannable logs with clear section markers and raw data visibility.
"""

import json
import logging
import os
import traceback
from typing import Any, Dict, List, Optional
from datetime import datetime
from enum import Enum


class LogLevel(Enum):
    """Log detail levels"""
    MINIMAL = 1   # Only errors and final results
    SUMMARY = 2   # Stage summaries + errors
    DETAILED = 3  # Include raw LLM outputs, RAG results
    VERBOSE = 4   # Everything including intermediate states


class PipelineLogger:
    """
    Hierarchical logger for cortex pipeline debugging.

    Features:
    - Clear visual section markers
    - Collapsible detail sections
    - Raw data dumps with truncation options
    - Stage timing
    - Error highlighting

    Every ``log_*`` method is best-effort: malformed or missing fields in the
    data being logged are rendered as placeholders instead of raising, so a
    logging call cannot abort the pipeline it is observing.
    """

    # Width of the '=' / '-' rules that frame a pipeline run.
    _RULE_WIDTH = 100
    # Closing line shared by every box-drawn detail section.
    _BOX_FOOTER = " ╰───────────────────────────────────────────────────"
    # Maximum number of raw LLM response lines shown in VERBOSE mode.
    _RAW_RESPONSE_MAX_LINES = 50
    # Maximum number of RAG results listed in DETAILED mode.
    _RAG_MAX_RESULTS = 10

    def __init__(self, logger: logging.Logger, level: LogLevel = LogLevel.SUMMARY):
        """
        Args:
            logger: Destination logger; only ``info``, ``debug`` and ``error`` are used.
            level: Detail threshold controlling which sections are emitted.
        """
        self.logger = logger
        self.level = level
        self.stage_timings: Dict[str, float] = {}  # stage name -> duration in ms
        self.current_stage: Optional[str] = None
        self.stage_start_time: Optional[datetime] = None
        self.pipeline_start_time: Optional[datetime] = None

    # ------------------------------------------------------------------ helpers

    def _enabled(self, threshold: LogLevel) -> bool:
        """Return True when the configured detail level is at least *threshold*."""
        return self.level.value >= threshold.value

    @staticmethod
    def _timestamp() -> str:
        """Return the current wall-clock time as HH:MM:SS.mmm."""
        # %f yields microseconds; dropping the last three digits leaves milliseconds.
        return datetime.now().strftime('%H:%M:%S.%f')[:-3]

    def _debug_box(self, header: str, lines: List[str], max_lines: int,
                   trailing_newline: bool = False) -> None:
        """Emit *lines* at DEBUG level inside a box, showing at most *max_lines* of them."""
        limit = max(max_lines, 0)
        self.logger.debug(header)
        for line in lines[:limit]:
            self.logger.debug(f" │ {line}")
        hidden = len(lines) - limit
        if hidden > 0:
            self.logger.debug(f" │ ... ({hidden} more lines)")
        self.logger.debug(self._BOX_FOOTER + ("\n" if trailing_newline else ""))

    # ------------------------------------------------------------ pipeline/stage

    def pipeline_start(self, session_id: str, user_prompt: str) -> None:
        """Mark the start of a pipeline run and reset the per-stage timings."""
        self.pipeline_start_time = datetime.now()
        self.stage_timings = {}

        if not self._enabled(LogLevel.SUMMARY):
            return

        rule = '=' * self._RULE_WIDTH
        self.logger.info(f"\n{rule}")
        self.logger.info(f"🚀 PIPELINE START | Session: {session_id} | {self._timestamp()}")
        self.logger.info(rule)
        if self._enabled(LogLevel.DETAILED):
            prompt_text = str(user_prompt or "")  # tolerate a missing prompt
            ellipsis = '...' if len(prompt_text) > 200 else ''
            self.logger.info(f"📝 User prompt: {prompt_text[:200]}{ellipsis}")
        self.logger.info(f"{'-' * self._RULE_WIDTH}\n")

    def stage_start(self, stage_name: str, description: str = "") -> None:
        """Mark the start of a pipeline stage"""
        self.current_stage = stage_name
        self.stage_start_time = datetime.now()

        if self._enabled(LogLevel.SUMMARY):
            desc_suffix = f" - {description}" if description else ""
            self.logger.info(f"▶️ [{stage_name}]{desc_suffix} | {self._timestamp()}")

    def stage_end(self, result_summary: str = "") -> None:
        """Mark the end of the current stage and record its duration in ``stage_timings``."""
        if self.current_stage and self.stage_start_time:
            duration_ms = (datetime.now() - self.stage_start_time).total_seconds() * 1000
            self.stage_timings[self.current_stage] = duration_ms

            if self._enabled(LogLevel.SUMMARY):
                summary_suffix = f" → {result_summary}" if result_summary else ""
                self.logger.info(
                    f"✅ [{self.current_stage}] Complete in {duration_ms:.0f}ms{summary_suffix}\n"
                )

        self.current_stage = None
        self.stage_start_time = None

    def pipeline_end(self, session_id: str, final_output_length: int) -> None:
        """
        Mark the end of pipeline run with summary.

        Nothing is logged when ``pipeline_start`` was never called, because no
        total duration can be computed.
        """
        if not self.pipeline_start_time:
            return
        total_duration_ms = (datetime.now() - self.pipeline_start_time).total_seconds() * 1000

        if not self._enabled(LogLevel.SUMMARY):
            return

        rule = '=' * self._RULE_WIDTH
        self.logger.info(f"\n{rule}")
        self.logger.info(
            f"✨ PIPELINE COMPLETE | Session: {session_id} | Total: {total_duration_ms:.0f}ms"
        )
        self.logger.info(rule)

        # Show timing breakdown
        if self.stage_timings and self._enabled(LogLevel.DETAILED):
            self.logger.info("⏱️ Stage Timings:")
            for stage, duration in self.stage_timings.items():
                pct = (duration / total_duration_ms) * 100 if total_duration_ms > 0 else 0
                self.logger.info(f" {stage:20s}: {duration:6.0f}ms ({pct:5.1f}%)")

        self.logger.info(f"📤 Final output: {final_output_length} characters")
        self.logger.info(f"{rule}\n")

    # ------------------------------------------------------------- detail logs

    def log_llm_call(self, backend: str, prompt: Any, response: Any,
                     raw_response: Optional[str] = None) -> None:
        """
        Log LLM call details with proper formatting.

        Args:
            backend: Backend name (PRIMARY, SECONDARY, etc.)
            prompt: Input prompt to LLM; either a string or a list of chat
                messages, in which case the last message's ``content`` is shown.
            response: Parsed response object
            raw_response: Raw JSON response string
        """
        if self._enabled(LogLevel.DETAILED):
            self.logger.info(f" 🧠 LLM Call | Backend: {backend}")

            # Show prompt (truncated)
            if isinstance(prompt, list):
                last = prompt[-1] if prompt else None
                if isinstance(last, dict):
                    # `content` may be present but None (e.g. tool-call messages).
                    prompt_text = str(last.get('content') or '')
                else:
                    prompt_text = '' if last is None else str(last)
            else:
                prompt_text = str(prompt)
            self.logger.info(f" Prompt: {prompt_text[:150]}...")

            # Show parsed response
            if isinstance(response, dict):
                message = response.get('message')
                nested = message.get('content') if isinstance(message, dict) else None
                response_text = str(response.get('reply') or nested or response)
            else:
                response_text = str(response)
            self.logger.info(f" Response: {response_text[:200]}...")

        # Show raw response in collapsible block
        if raw_response and self._enabled(LogLevel.VERBOSE):
            # Count actual lines (newline count + 1) so the overflow note is
            # emitted, and is accurate, as soon as a line is hidden.
            self._debug_box(
                " ╭─ RAW RESPONSE ────────────────────────────────────",
                raw_response.split('\n'),
                self._RAW_RESPONSE_MAX_LINES,
                trailing_newline=True,
            )

    def log_rag_results(self, results: List[Dict[str, Any]]) -> None:
        """Log RAG/NeoMem results in scannable format"""
        results = results or []  # tolerate None from a failed retrieval

        if self._enabled(LogLevel.SUMMARY):
            self.logger.info(f" 📚 RAG Results: {len(results)} memories retrieved")

        if not (self._enabled(LogLevel.DETAILED) and results):
            return

        self.logger.info(" ╭─ MEMORY SCORES ───────────────────────────────────")
        for idx, result in enumerate(results[:self._RAG_MAX_RESULTS], 1):  # Show top 10
            score = result.get("score", 0)
            # An explicit None (or other non-number) cannot take a float format spec.
            score_text = f"{score:.3f}" if isinstance(score, (int, float)) else "n/a"
            payload = result.get("payload") or {}
            data = payload.get("data", "") if isinstance(payload, dict) else payload
            self.logger.info(f" │ [{idx}] {score_text} | {str(data)[:80]}...")
        extra = len(results) - self._RAG_MAX_RESULTS
        if extra > 0:
            self.logger.info(f" │ ... and {extra} more results")
        self.logger.info(self._BOX_FOOTER)

    def log_context_state(self, context_state: Dict[str, Any]) -> None:
        """Log context state summary"""
        if self._enabled(LogLevel.SUMMARY):
            msg_count = context_state.get("message_count", 0)
            # Keys may be present with a None value; fall back to neutral defaults.
            minutes_raw = context_state.get("minutes_since_last_msg") or 0
            try:
                minutes_since = float(minutes_raw)
            except (TypeError, ValueError):
                minutes_since = 0.0
            rag_count = len(context_state.get("rag") or [])
            self.logger.info(
                f" 📊 Context | Messages: {msg_count} | Last: {minutes_since:.1f}min ago"
                f" | RAG: {rag_count} results"
            )

        if self._enabled(LogLevel.DETAILED):
            intake = context_state.get("intake") or {}
            if intake:
                self.logger.info(" ╭─ INTAKE SUMMARIES ────────────────────────────────")
                for tier in ["L1", "L5", "L10", "L20", "L30"]:
                    if tier in intake:
                        summary = intake[tier]
                        if isinstance(summary, dict):
                            summary = summary.get("summary", summary)
                        # Truncate both shapes; dict-style summaries used to skip this.
                        self.logger.info(f" │ {tier}: {str(summary)[:100]}...")
                self.logger.info(self._BOX_FOOTER)

    def log_error(self, stage: str, error: Exception, critical: bool = False) -> None:
        """
        Log an error with context.

        Args:
            stage: Name of the stage in which the error occurred.
            error: The exception to report.
            critical: Selects the CRITICAL marker instead of WARNING.
        """
        level_marker = "🔴 CRITICAL" if critical else "⚠️ WARNING"
        self.logger.error(
            f"{level_marker} | Stage: {stage} | Error: {type(error).__name__}: {str(error)}"
        )

        if self._enabled(LogLevel.VERBOSE):
            # Format from the exception object itself: format_exc() only works
            # while an exception is being handled and otherwise yields
            # "NoneType: None".
            formatted = "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            )
            self.logger.debug(f" Traceback:\n{formatted}")

    def log_raw_data(self, label: str, data: Any, max_lines: int = 30) -> None:
        """
        Log raw data in a collapsible format.

        Dicts and lists are pretty-printed as JSON; anything else (or anything
        JSON cannot encode) is rendered with ``str``.
        """
        if not self._enabled(LogLevel.VERBOSE):
            return

        if isinstance(data, (dict, list)):
            try:
                text = json.dumps(data, indent=2, default=str)
            except (TypeError, ValueError):
                # Unsupported key types or circular references.
                text = str(data)
        else:
            text = str(data)

        self._debug_box(
            f" ╭─ {str(label).upper()} ──────────────────────────────────",
            text.split('\n'),
            max_lines,
        )


# Accepted LOG_DETAIL_LEVEL values (compared case-insensitively).
_ENV_LEVELS = {
    "minimal": LogLevel.MINIMAL,
    "summary": LogLevel.SUMMARY,
    "detailed": LogLevel.DETAILED,
    "verbose": LogLevel.VERBOSE,
}


def get_log_level_from_env() -> LogLevel:
    """
    Parse log level from environment variable.

    ``LOG_DETAIL_LEVEL`` (minimal/summary/detailed/verbose) takes precedence.
    When it is unset or unrecognised, ``VERBOSE_DEBUG=true`` selects VERBOSE;
    otherwise the default is SUMMARY.
    """
    verbose_debug = os.getenv("VERBOSE_DEBUG", "false").strip().lower() == "true"
    detail_level = os.getenv("LOG_DETAIL_LEVEL", "").strip().lower()

    explicit = _ENV_LEVELS.get(detail_level)
    if explicit is not None:
        return explicit
    return LogLevel.VERBOSE if verbose_debug else LogLevel.SUMMARY  # Default