# context.py
"""
Context layer for Cortex reasoning pipeline.

Provides unified context collection from:
- Intake (short-term memory, multilevel summaries L1-L30)
- NeoMem (long-term memory, semantic search)
- Session state (timestamps, messages, mode, mood, active_project)

Maintains per-session state for continuity across conversations.
"""

import hashlib
import json
import logging
import os
import random
from datetime import datetime
from typing import Dict, Any, Optional, List

import httpx  # noqa: F401 -- not used in this chunk; kept in case other parts of the file rely on it

from intake.intake import summarize_context
from neomem_client import NeoMemClient

# -----------------------------
# Configuration
# -----------------------------
NEOMEM_API = os.getenv("NEOMEM_API", "http://neomem-api:8000")
NEOMEM_ENABLED = os.getenv("NEOMEM_ENABLED", "false").lower() == "true"
RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.4"))
LOG_DETAIL_LEVEL = os.getenv("LOG_DETAIL_LEVEL", "summary").lower()

# Loop detection settings
MAX_MESSAGE_HISTORY = int(os.getenv("MAX_MESSAGE_HISTORY", "100"))  # Prevent unbounded growth
SESSION_TTL_HOURS = int(os.getenv("SESSION_TTL_HOURS", "24"))  # Auto-expire old sessions
ENABLE_DUPLICATE_DETECTION = os.getenv("ENABLE_DUPLICATE_DETECTION", "true").lower() == "true"

# Tools available for future autonomy features
TOOLS_AVAILABLE = ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"]

# -----------------------------
# Module-level session state
# -----------------------------
# Keyed by session_id; values are the dicts produced by _init_session().
SESSION_STATE: Dict[str, Dict[str, Any]] = {}

# Logger.
# Only attach a handler if none exists yet: attaching unconditionally meant a
# re-import/reload added a second StreamHandler and every log line was emitted
# twice.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.handlers:
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s [CONTEXT] %(levelname)s: %(message)s',
        datefmt='%H:%M:%S'
    ))
    logger.addHandler(console_handler)


# -----------------------------
# Session initialization & cleanup
# -----------------------------
def _init_session(session_id: str) -> Dict[str, Any]:
    """
    Initialize a new session state entry.

    Returns:
        Dictionary with default session state fields
    """
    return {
        "session_id": session_id,
        "created_at": datetime.now(),
        "last_timestamp": datetime.now(),
        "last_user_message": None,
        "last_assistant_message": None,
        "mode": "default",        # Future: "autonomous", "focused", "creative", etc.
        "mood": "neutral",        # Future: mood tracking
        "active_project": None,   # Future: project context
        "message_count": 0,
        "message_history": [],
        "last_message_hash": None,  # For duplicate detection
    }


def _cleanup_expired_sessions() -> int:
    """
    Remove sessions that haven't been active for SESSION_TTL_HOURS.

    Returns:
        Number of sessions that were removed.
    """
    now = datetime.now()
    expired_sessions = []

    for session_id, state in SESSION_STATE.items():
        # Fall back to created_at for sessions that never recorded activity.
        last_active = state.get("last_timestamp", state.get("created_at"))
        time_since_active = (now - last_active).total_seconds() / 3600  # hours
        if time_since_active > SESSION_TTL_HOURS:
            expired_sessions.append(session_id)

    # Delete after iteration so we never mutate the dict while iterating it.
    for session_id in expired_sessions:
        del SESSION_STATE[session_id]
        logger.info(f"🗑️ Expired session: {session_id} (inactive for {SESSION_TTL_HOURS}+ hours)")

    return len(expired_sessions)


def _is_duplicate_message(session_id: str, user_prompt: str) -> bool:
    """
    Check if this message is a duplicate of the last processed message.

    Uses simple hash comparison to detect exact duplicates or processing loops.
    Side effect: stores the hash of the current message on the session state so
    the *next* call can compare against it.
    """
    if not ENABLE_DUPLICATE_DETECTION:
        return False

    state = SESSION_STATE.get(session_id)
    if not state:
        return False

    # Create hash of normalized message.
    # MD5 is fine here -- it is used only as a fast fingerprint, not for security.
    message_hash = hashlib.md5(user_prompt.strip().lower().encode()).hexdigest()

    # Check if it matches the last message
    if state.get("last_message_hash") == message_hash:
        logger.warning(
            f"⚠️ DUPLICATE MESSAGE DETECTED | Session: {session_id} | "
            f"Message: {user_prompt[:80]}..."
        )
        return True

    # Update hash for next check
    state["last_message_hash"] = message_hash
    return False


def _trim_message_history(state: Dict[str, Any]) -> None:
    """
    Trim message history to prevent unbounded growth.

    Keeps only the most recent MAX_MESSAGE_HISTORY messages.
    """
    history = state.get("message_history", [])
    if len(history) > MAX_MESSAGE_HISTORY:
        trimmed_count = len(history) - MAX_MESSAGE_HISTORY
        state["message_history"] = history[-MAX_MESSAGE_HISTORY:]
        logger.info(f"✂️ Trimmed {trimmed_count} old messages from session {state['session_id']}")


# -----------------------------
# Intake context retrieval
# -----------------------------
async def _get_intake_context(session_id: str, messages: List[Dict[str, str]]) -> Dict[str, Any]:
    """
    Internal Intake — Direct call to summarize_context()

    No HTTP, no containers, no failures.

    Returns:
        Multilevel summary dict from Intake, or an error-shaped fallback with
        empty L1-L30 fields when summarization raises.
    """
    try:
        return await summarize_context(session_id, messages)
    except Exception as e:
        logger.error(f"Internal Intake summarization failed: {e}")
        return {
            "session_id": session_id,
            "L1": "",
            "L5": "",
            "L10": "",
            "L20": "",
            "L30": "",
            "error": str(e)
        }


# -----------------------------
# NeoMem semantic search
# -----------------------------
async def _search_neomem(
    query: str,
    user_id: str = "brian",
    limit: int = 5
) -> List[Dict[str, Any]]:
    """
    Search NeoMem for relevant long-term memories.

    Returns full response structure from NeoMem:
    [
        {
            "id": "mem_abc123",
            "score": 0.92,
            "payload": {
                "data": "Memory text content...",
                "metadata": {"category": "...", "created_at": "...", ...}
            }
        },
        ...
    ]

    Args:
        query: Search query text
        user_id: User identifier for memory filtering
        limit: Maximum number of results

    Returns:
        List of memory objects with full structure, or empty list on failure
    """
    if not NEOMEM_ENABLED:
        logger.info("NeoMem search skipped (NEOMEM_ENABLED is false)")
        return []

    try:
        # NeoMemClient reads NEOMEM_API from environment, no base_url parameter
        client = NeoMemClient()
        results = await client.search(
            query=query,
            user_id=user_id,
            limit=limit,
            threshold=RELEVANCE_THRESHOLD
        )
        # Results are already filtered by threshold in NeoMemClient.search()
        logger.info(f"NeoMem search returned {len(results)} relevant results")
        return results
    except Exception as e:
        # Best-effort: RAG is optional context, so degrade to "no memories".
        logger.warning(f"NeoMem search failed: {e}")
        return []


# -----------------------------
# Main context collection
# -----------------------------
async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
    """
    Collect unified context from all sources.

    Orchestrates:
    1. Initialize or update session state
    2. Calculate time since last message
    3. Retrieve Intake multilevel summaries (L1-L30)
    4. Search NeoMem for relevant long-term memories
    5. Update session state with current user message
    6. Return unified context_state dictionary

    Args:
        session_id: Session identifier
        user_prompt: Current user message

    Returns:
        Unified context state dictionary with structure:
        {
            "session_id": "...",
            "timestamp": "2025-11-28T12:34:56",
            "minutes_since_last_msg": 5.2,
            "message_count": 42,
            "intake": {"L1": [...], "L5": [...], "L10": {...}, "L20": {...}, "L30": {...}},
            "rag": [
                {"id": "mem_123", "score": 0.92,
                 "payload": {"data": "...", "metadata": {...}}},
                ...
            ],
            "mode": "default",
            "mood": "neutral",
            "active_project": null,
            "tools_available": ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"]
        }
    """
    # A. Cleanup expired sessions periodically (roughly every 100th call)
    if random.randint(1, 100) == 1:
        _cleanup_expired_sessions()

    # B. Initialize session state if needed
    if session_id not in SESSION_STATE:
        SESSION_STATE[session_id] = _init_session(session_id)
        logger.info(f"Initialized new session: {session_id}")
    state = SESSION_STATE[session_id]

    # C. Check for duplicate messages (loop detection)
    if _is_duplicate_message(session_id, user_prompt):
        # Return cached context with warning flag; skip Intake/NeoMem entirely
        # so a retry loop upstream cannot trigger repeated processing.
        logger.warning(f"🔁 LOOP DETECTED - Returning cached context to prevent processing duplicate")
        context_state = {
            "session_id": session_id,
            "timestamp": datetime.now().isoformat(),
            "minutes_since_last_msg": 0.0,  # float, consistent with the normal path
            "message_count": state["message_count"],
            "intake": {},
            "rag": [],
            "mode": state["mode"],
            "mood": state["mood"],
            "active_project": state["active_project"],
            "tools_available": TOOLS_AVAILABLE,
            "duplicate_detected": True,
        }
        return context_state

    # D. Calculate time delta since the previous message
    now = datetime.now()
    time_delta_seconds = (now - state["last_timestamp"]).total_seconds()
    minutes_since_last_msg = round(time_delta_seconds / 60.0, 2)

    # E. Gather Intake context (multilevel summaries)
    # Build compact message buffer for Intake from the tracked history.
    messages_for_intake = []
    if "message_history" in state:
        for turn in state["message_history"]:
            messages_for_intake.append({
                "user_msg": turn.get("user", ""),
                "assistant_msg": turn.get("assistant", "")
            })
    intake_data = await _get_intake_context(session_id, messages_for_intake)

    # F. Search NeoMem for relevant memories
    if NEOMEM_ENABLED:
        rag_results = await _search_neomem(
            query=user_prompt,
            user_id="brian",  # TODO: Make configurable per session
            limit=5
        )
    else:
        rag_results = []
        logger.info("Skipping NeoMem RAG retrieval; NEOMEM_ENABLED is false")

    # G. Update session state
    state["last_user_message"] = user_prompt
    state["last_timestamp"] = now
    state["message_count"] += 1

    # Save user turn to history
    state["message_history"].append({
        "user": user_prompt,
        "assistant": ""  # assistant reply filled later by update_last_assistant_message()
    })

    # Trim history to prevent unbounded growth
    _trim_message_history(state)

    # H. Assemble unified context
    context_state = {
        "session_id": session_id,
        "timestamp": now.isoformat(),
        "minutes_since_last_msg": minutes_since_last_msg,
        "message_count": state["message_count"],
        "intake": intake_data,
        "rag": rag_results,
        "mode": state["mode"],
        "mood": state["mood"],
        "active_project": state["active_project"],
        "tools_available": TOOLS_AVAILABLE,
    }

    # Log context summary in structured format
    logger.info(
        f"📊 Context | Session: {session_id} | "
        f"Messages: {state['message_count']} | "
        f"Last: {minutes_since_last_msg:.1f}min | "
        f"RAG: {len(rag_results)} results"
    )

    # Show detailed context in detailed/verbose mode
    if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
        logger.info(f"\n{'─'*100}")
        logger.info(f"[CONTEXT] Session {session_id} | User: {user_prompt[:80]}...")
        logger.info(f"{'─'*100}")
        logger.info(f"  Mode: {state['mode']} | Mood: {state['mood']} | Project: {state['active_project']}")
        logger.info(f"  Tools: {', '.join(TOOLS_AVAILABLE)}")

        # Show intake summaries (condensed)
        if intake_data:
            logger.info(f"\n  ╭─ INTAKE SUMMARIES ────────────────────────────────────────────────")
            for level in ["L1", "L5", "L10", "L20", "L30"]:
                if level in intake_data:
                    summary = intake_data[level]
                    if isinstance(summary, dict):
                        summary_text = summary.get("summary", str(summary)[:100])
                    else:
                        summary_text = str(summary)[:100]
                    logger.info(f"  │ {level:4s}: {summary_text}...")
            logger.info(f"  ╰───────────────────────────────────────────────────────────────────")

        # Show RAG results (condensed)
        if rag_results:
            logger.info(f"\n  ╭─ RAG RESULTS ({len(rag_results)}) ──────────────────────────────────────────────")
            for idx, result in enumerate(rag_results[:5], 1):  # Show top 5
                score = result.get("score", 0)
                data_preview = str(result.get("payload", {}).get("data", ""))[:60]
                logger.info(f"  │ [{idx}] {score:.3f} | {data_preview}...")
            if len(rag_results) > 5:
                logger.info(f"  │ ... and {len(rag_results) - 5} more results")
            logger.info(f"  ╰───────────────────────────────────────────────────────────────────")

        # Show full raw data only in verbose mode
        if LOG_DETAIL_LEVEL == "verbose":
            logger.info(f"\n  ╭─ RAW INTAKE DATA ─────────────────────────────────────────────────")
            logger.info(f"  │ {json.dumps(intake_data, indent=4, default=str)}")
            logger.info(f"  ╰───────────────────────────────────────────────────────────────────")

        logger.info(f"{'─'*100}\n")

    return context_state


# -----------------------------
# Session state management
# -----------------------------
def update_last_assistant_message(session_id: str, message: str) -> None:
    """
    Update session state with assistant's response and complete the
    last turn inside message_history.
    """
    session = SESSION_STATE.get(session_id)
    if not session:
        logger.warning(f"Attempted to update non-existent session: {session_id}")
        return

    # Update last assistant message + timestamp
    session["last_assistant_message"] = message
    session["last_timestamp"] = datetime.now()

    # Fill in assistant reply for the most recent turn
    history = session.get("message_history", [])
    if history:
        # history entry already contains {"user": "...", "assistant": "...?"}
        history[-1]["assistant"] = message


def get_session_state(session_id: str) -> Optional[Dict[str, Any]]:
    """
    Retrieve current session state.

    Args:
        session_id: Session identifier

    Returns:
        Session state dict or None if session doesn't exist
    """
    return SESSION_STATE.get(session_id)


def close_session(session_id: str) -> bool:
    """
    Close and cleanup a session.

    Args:
        session_id: Session identifier

    Returns:
        True if session was closed, False if it didn't exist
    """
    if session_id in SESSION_STATE:
        del SESSION_STATE[session_id]
        logger.info(f"Closed session: {session_id}")
        return True
    return False


# -----------------------------
# Extension hooks for future autonomy
# -----------------------------
def update_mode(session_id: str, new_mode: str) -> None:
    """
    Update session mode.

    Future modes: "autonomous", "focused", "creative", "collaborative", etc.

    Args:
        session_id: Session identifier
        new_mode: New mode string
    """
    if session_id in SESSION_STATE:
        old_mode = SESSION_STATE[session_id]["mode"]
        SESSION_STATE[session_id]["mode"] = new_mode
        logger.info(f"Session {session_id} mode changed: {old_mode} -> {new_mode}")


def update_mood(session_id: str, new_mood: str) -> None:
    """
    Update session mood.

    Future implementation: Sentiment analysis, emotional state tracking.

    Args:
        session_id: Session identifier
        new_mood: New mood string
    """
    if session_id in SESSION_STATE:
        old_mood = SESSION_STATE[session_id]["mood"]
        SESSION_STATE[session_id]["mood"] = new_mood
        logger.info(f"Session {session_id} mood changed: {old_mood} -> {new_mood}")


def update_active_project(session_id: str, project: Optional[str]) -> None:
    """
    Update active project context.

    Future implementation: Project-specific memory, tools, preferences.

    Args:
        session_id: Session identifier
        project: Project identifier or None
    """
    if session_id in SESSION_STATE:
        SESSION_STATE[session_id]["active_project"] = project
        logger.info(f"Session {session_id} active project set to: {project}")


async def autonomous_heartbeat(session_id: str) -> Optional[str]:
    """
    Autonomous thinking heartbeat.

    Future implementation:
    - Check if Lyra should initiate internal dialogue
    - Generate self-prompted thoughts based on session state
    - Update mood/mode based on context changes
    - Trigger proactive suggestions or reminders

    Args:
        session_id: Session identifier

    Returns:
        Optional autonomous thought/action string
    """
    # Stub for future implementation
    # Example logic:
    # - If minutes_since_last_msg > 60: Check for pending reminders
    # - If mood == "curious" and active_project: Generate research questions
    # - If mode == "autonomous": Self-prompt based on project goals
    logger.debug(f"Autonomous heartbeat for session {session_id} (not yet implemented)")
    return None