# context.py """ Context layer for Cortex reasoning pipeline. Provides unified context collection from: - Intake (short-term memory, multilevel summaries L1-L30) - NeoMem (long-term memory, semantic search) - Session state (timestamps, messages, mode, mood, active_project) Maintains per-session state for continuity across conversations. """ import os import logging from datetime import datetime from typing import Dict, Any, Optional, List import httpx from intake.intake import summarize_context from neomem_client import NeoMemClient # ----------------------------- # Configuration # ----------------------------- NEOMEM_API = os.getenv("NEOMEM_API", "http://neomem-api:8000") NEOMEM_ENABLED = os.getenv("NEOMEM_ENABLED", "false").lower() == "true" RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.4")) VERBOSE_DEBUG = os.getenv("VERBOSE_DEBUG", "false").lower() == "true" # Tools available for future autonomy features TOOLS_AVAILABLE = ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"] # ----------------------------- # Module-level session state # ----------------------------- SESSION_STATE: Dict[str, Dict[str, Any]] = {} # Logger logger = logging.getLogger(__name__) # Set logging level based on VERBOSE_DEBUG if VERBOSE_DEBUG: logger.setLevel(logging.DEBUG) # Console handler console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( '%(asctime)s [CONTEXT] %(levelname)s: %(message)s', datefmt='%H:%M:%S' )) logger.addHandler(console_handler) # File handler - append to log file try: os.makedirs('/app/logs', exist_ok=True) file_handler = logging.FileHandler('/app/logs/cortex_verbose_debug.log', mode='a') file_handler.setFormatter(logging.Formatter( '%(asctime)s [CONTEXT] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' )) logger.addHandler(file_handler) logger.debug("VERBOSE_DEBUG mode enabled for context.py - logging to file") except Exception as e: logger.debug(f"VERBOSE_DEBUG mode enabled for context.py - file logging failed: {e}") # ----------------------------- # Session initialization # ----------------------------- def _init_session(session_id: str) -> Dict[str, Any]: """ Initialize a new session state entry. Returns: Dictionary with default session state fields """ return { "session_id": session_id, "created_at": datetime.now(), "last_timestamp": datetime.now(), "last_user_message": None, "last_assistant_message": None, "mode": "default", # Future: "autonomous", "focused", "creative", etc. "mood": "neutral", # Future: mood tracking "active_project": None, # Future: project context "message_count": 0, "message_history": [], } # ----------------------------- # Intake context retrieval # ----------------------------- async def _get_intake_context(session_id: str, messages: List[Dict[str, str]]): """ Internal Intake — Direct call to summarize_context() No HTTP, no containers, no failures. """ try: return await summarize_context(session_id, messages) except Exception as e: logger.error(f"Internal Intake summarization failed: {e}") return { "session_id": session_id, "L1": "", "L5": "", "L10": "", "L20": "", "L30": "", "error": str(e) } # ----------------------------- # NeoMem semantic search # ----------------------------- async def _search_neomem( query: str, user_id: str = "brian", limit: int = 5 ) -> List[Dict[str, Any]]: """ Search NeoMem for relevant long-term memories. Returns full response structure from NeoMem: [ { "id": "mem_abc123", "score": 0.92, "payload": { "data": "Memory text content...", "metadata": { "category": "...", "created_at": "...", ... } } }, ... ] Args: query: Search query text user_id: User identifier for memory filtering limit: Maximum number of results Returns: List of memory objects with full structure, or empty list on failure """ if not NEOMEM_ENABLED: logger.info("NeoMem search skipped (NEOMEM_ENABLED is false)") return [] try: # NeoMemClient reads NEOMEM_API from environment, no base_url parameter client = NeoMemClient() results = await client.search( query=query, user_id=user_id, limit=limit, threshold=RELEVANCE_THRESHOLD ) # Results are already filtered by threshold in NeoMemClient.search() logger.info(f"NeoMem search returned {len(results)} relevant results") return results except Exception as e: logger.warning(f"NeoMem search failed: {e}") return [] # ----------------------------- # Main context collection # ----------------------------- async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]: """ Collect unified context from all sources. Orchestrates: 1. Initialize or update session state 2. Calculate time since last message 3. Retrieve Intake multilevel summaries (L1-L30) 4. Search NeoMem for relevant long-term memories 5. Update session state with current user message 6. Return unified context_state dictionary Args: session_id: Session identifier user_prompt: Current user message Returns: Unified context state dictionary with structure: { "session_id": "...", "timestamp": "2025-11-28T12:34:56", "minutes_since_last_msg": 5.2, "message_count": 42, "intake": { "L1": [...], "L5": [...], "L10": {...}, "L20": {...}, "L30": {...} }, "rag": [ { "id": "mem_123", "score": 0.92, "payload": { "data": "...", "metadata": {...} } }, ... ], "mode": "default", "mood": "neutral", "active_project": null, "tools_available": ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"] } """ # A. Initialize session state if needed if session_id not in SESSION_STATE: SESSION_STATE[session_id] = _init_session(session_id) logger.info(f"Initialized new session: {session_id}") if VERBOSE_DEBUG: logger.debug(f"[COLLECT_CONTEXT] New session state: {SESSION_STATE[session_id]}") state = SESSION_STATE[session_id] if VERBOSE_DEBUG: logger.debug(f"[COLLECT_CONTEXT] Session {session_id} - User prompt: {user_prompt[:100]}...") # B. Calculate time delta now = datetime.now() time_delta_seconds = (now - state["last_timestamp"]).total_seconds() minutes_since_last_msg = round(time_delta_seconds / 60.0, 2) if VERBOSE_DEBUG: logger.debug(f"[COLLECT_CONTEXT] Time since last message: {minutes_since_last_msg:.2f} minutes") # C. Gather Intake context (multilevel summaries) # Build compact message buffer for Intake: messages_for_intake = [] # You track messages inside SESSION_STATE — assemble it here: if "message_history" in state: for turn in state["message_history"]: messages_for_intake.append({ "user_msg": turn.get("user", ""), "assistant_msg": turn.get("assistant", "") }) intake_data = await _get_intake_context(session_id, messages_for_intake) if VERBOSE_DEBUG: import json logger.debug(f"[COLLECT_CONTEXT] Intake data retrieved:") logger.debug(json.dumps(intake_data, indent=2, default=str)) # D. Search NeoMem for relevant memories if NEOMEM_ENABLED: rag_results = await _search_neomem( query=user_prompt, user_id="brian", # TODO: Make configurable per session limit=5 ) else: rag_results = [] logger.info("Skipping NeoMem RAG retrieval; NEOMEM_ENABLED is false") if VERBOSE_DEBUG: logger.debug(f"[COLLECT_CONTEXT] NeoMem search returned {len(rag_results)} results") for idx, result in enumerate(rag_results, 1): score = result.get("score", 0) data_preview = str(result.get("payload", {}).get("data", ""))[:100] logger.debug(f" [{idx}] Score: {score:.3f} - {data_preview}...") # E. Update session state state["last_user_message"] = user_prompt state["last_timestamp"] = now state["message_count"] += 1 # Save user turn to history state["message_history"].append({ "user": user_prompt, "assistant": "" # assistant reply filled later by update_last_assistant_message() }) # F. Assemble unified context context_state = { "session_id": session_id, "timestamp": now.isoformat(), "minutes_since_last_msg": minutes_since_last_msg, "message_count": state["message_count"], "intake": intake_data, "rag": rag_results, "mode": state["mode"], "mood": state["mood"], "active_project": state["active_project"], "tools_available": TOOLS_AVAILABLE, } logger.info( f"Context collected for session {session_id}: " f"{len(rag_results)} RAG results, " f"{minutes_since_last_msg:.1f} minutes since last message" ) if VERBOSE_DEBUG: logger.debug(f"[COLLECT_CONTEXT] Final context state assembled:") logger.debug(f" - Message count: {state['message_count']}") logger.debug(f" - Mode: {state['mode']}, Mood: {state['mood']}") logger.debug(f" - Active project: {state['active_project']}") logger.debug(f" - Tools available: {TOOLS_AVAILABLE}") return context_state # ----------------------------- # Session state management # ----------------------------- def update_last_assistant_message(session_id: str, message: str) -> None: """ Update session state with assistant's response and complete the last turn inside message_history. """ session = SESSION_STATE.get(session_id) if not session: logger.warning(f"Attempted to update non-existent session: {session_id}") return # Update last assistant message + timestamp session["last_assistant_message"] = message session["last_timestamp"] = datetime.now() # Fill in assistant reply for the most recent turn history = session.get("message_history", []) if history: # history entry already contains {"user": "...", "assistant": "...?"} history[-1]["assistant"] = message if VERBOSE_DEBUG: logger.debug(f"Updated assistant message for session {session_id}") def get_session_state(session_id: str) -> Optional[Dict[str, Any]]: """ Retrieve current session state. Args: session_id: Session identifier Returns: Session state dict or None if session doesn't exist """ return SESSION_STATE.get(session_id) def close_session(session_id: str) -> bool: """ Close and cleanup a session. Args: session_id: Session identifier Returns: True if session was closed, False if it didn't exist """ if session_id in SESSION_STATE: del SESSION_STATE[session_id] logger.info(f"Closed session: {session_id}") return True return False # ----------------------------- # Extension hooks for future autonomy # ----------------------------- def update_mode(session_id: str, new_mode: str) -> None: """ Update session mode. Future modes: "autonomous", "focused", "creative", "collaborative", etc. Args: session_id: Session identifier new_mode: New mode string """ if session_id in SESSION_STATE: old_mode = SESSION_STATE[session_id]["mode"] SESSION_STATE[session_id]["mode"] = new_mode logger.info(f"Session {session_id} mode changed: {old_mode} -> {new_mode}") def update_mood(session_id: str, new_mood: str) -> None: """ Update session mood. Future implementation: Sentiment analysis, emotional state tracking. Args: session_id: Session identifier new_mood: New mood string """ if session_id in SESSION_STATE: old_mood = SESSION_STATE[session_id]["mood"] SESSION_STATE[session_id]["mood"] = new_mood logger.info(f"Session {session_id} mood changed: {old_mood} -> {new_mood}") def update_active_project(session_id: str, project: Optional[str]) -> None: """ Update active project context. Future implementation: Project-specific memory, tools, preferences. Args: session_id: Session identifier project: Project identifier or None """ if session_id in SESSION_STATE: SESSION_STATE[session_id]["active_project"] = project logger.info(f"Session {session_id} active project set to: {project}") async def autonomous_heartbeat(session_id: str) -> Optional[str]: """ Autonomous thinking heartbeat. Future implementation: - Check if Lyra should initiate internal dialogue - Generate self-prompted thoughts based on session state - Update mood/mode based on context changes - Trigger proactive suggestions or reminders Args: session_id: Session identifier Returns: Optional autonomous thought/action string """ # Stub for future implementation # Example logic: # - If minutes_since_last_msg > 60: Check for pending reminders # - If mood == "curious" and active_project: Generate research questions # - If mode == "autonomous": Self-prompt based on project goals logger.debug(f"Autonomous heartbeat for session {session_id} (not yet implemented)") return None