554 lines
19 KiB
Python
554 lines
19 KiB
Python
# context.py
"""
Context layer for the Cortex reasoning pipeline.

Provides unified context collection from:
- Intake (short-term memory, multilevel summaries L1-L30)
- NeoMem (long-term memory, semantic search)
- Session state (timestamps, messages, mode, mood, active_project)

Maintains per-session state for continuity across conversations.
"""
|
|
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, List
|
|
import httpx
|
|
from intake.intake import summarize_context
|
|
|
|
|
|
from neomem_client import NeoMemClient
|
|
|
|
# -----------------------------
# Configuration
# -----------------------------
# Base URL of the NeoMem service (read by NeoMemClient from the environment).
NEOMEM_API = os.getenv("NEOMEM_API", "http://neomem-api:8000")
# Master switch for long-term memory lookups; disabled by default.
NEOMEM_ENABLED = os.getenv("NEOMEM_ENABLED", "false").lower() == "true"
# Minimum similarity score a NeoMem hit must reach to be returned.
RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.4"))
# Logging verbosity: "summary" (default), "detailed", or "verbose".
LOG_DETAIL_LEVEL = os.getenv("LOG_DETAIL_LEVEL", "summary").lower()

# Loop detection settings
MAX_MESSAGE_HISTORY = int(os.getenv("MAX_MESSAGE_HISTORY", "100"))  # Prevent unbounded growth
SESSION_TTL_HOURS = int(os.getenv("SESSION_TTL_HOURS", "24"))  # Auto-expire old sessions
ENABLE_DUPLICATE_DETECTION = os.getenv("ENABLE_DUPLICATE_DETECTION", "true").lower() == "true"

# Tools available for future autonomy features
TOOLS_AVAILABLE = ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"]
|
|
|
|
# -----------------------------
# Module-level session state
# -----------------------------
# Maps session_id -> per-session state dict (schema defined by _init_session).
SESSION_STATE: Dict[str, Dict[str, Any]] = {}

# Logger
logger = logging.getLogger(__name__)

# Always set up basic logging
logger.setLevel(logging.INFO)

# Guard against attaching a second StreamHandler when this module is
# re-imported or hot-reloaded — an unconditional addHandler() here would
# duplicate every log line once per reload.
if not logger.handlers:
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s [CONTEXT] %(levelname)s: %(message)s',
        datefmt='%H:%M:%S'
    ))
    logger.addHandler(console_handler)
|
|
|
|
|
|
# -----------------------------
|
|
# Session initialization & cleanup
|
|
# -----------------------------
|
|
def _init_session(session_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Initialize a new session state entry.
|
|
|
|
Returns:
|
|
Dictionary with default session state fields
|
|
"""
|
|
return {
|
|
"session_id": session_id,
|
|
"created_at": datetime.now(),
|
|
"last_timestamp": datetime.now(),
|
|
"last_user_message": None,
|
|
"last_assistant_message": None,
|
|
"mode": "default", # Future: "autonomous", "focused", "creative", etc.
|
|
"mood": "neutral", # Future: mood tracking
|
|
"active_project": None, # Future: project context
|
|
"message_count": 0,
|
|
"message_history": [],
|
|
"last_message_hash": None, # For duplicate detection
|
|
}
|
|
|
|
|
|
def _cleanup_expired_sessions():
    """
    Drop sessions idle for longer than SESSION_TTL_HOURS.

    Returns:
        Number of sessions removed.
    """
    from datetime import timedelta

    now = datetime.now()
    ttl = timedelta(hours=SESSION_TTL_HOURS)

    # Collect first, then delete — never mutate SESSION_STATE mid-iteration.
    stale = [
        sid for sid, st in SESSION_STATE.items()
        if now - st.get("last_timestamp", st.get("created_at")) > ttl
    ]

    for sid in stale:
        del SESSION_STATE[sid]
        logger.info(f"🗑️ Expired session: {sid} (inactive for {SESSION_TTL_HOURS}+ hours)")

    return len(stale)
|
|
|
|
|
|
def _is_duplicate_message(session_id: str, user_prompt: str) -> bool:
    """
    Return True when this prompt exactly repeats the previous one.

    Compares an MD5 digest of the normalized (stripped, lowercased) text
    against the hash stored on the session; on a miss, the stored hash is
    refreshed for the next call. Disabled via ENABLE_DUPLICATE_DETECTION.
    """
    if not ENABLE_DUPLICATE_DETECTION:
        return False

    import hashlib

    state = SESSION_STATE.get(session_id)
    if not state:
        return False

    # Digest of the normalized prompt (MD5 used as a cheap fingerprint, not for security).
    digest = hashlib.md5(user_prompt.strip().lower().encode()).hexdigest()

    if digest == state.get("last_message_hash"):
        logger.warning(
            f"⚠️ DUPLICATE MESSAGE DETECTED | Session: {session_id} | "
            f"Message: {user_prompt[:80]}..."
        )
        return True

    # Remember this prompt's fingerprint for the next comparison.
    state["last_message_hash"] = digest
    return False
|
|
|
|
|
|
def _trim_message_history(state: Dict[str, Any]):
    """
    Cap the session's message history at MAX_MESSAGE_HISTORY entries.

    Drops the oldest turns in place, keeping only the most recent ones,
    so long-lived sessions cannot grow without bound.
    """
    history = state.get("message_history", [])
    excess = len(history) - MAX_MESSAGE_HISTORY

    if excess > 0:
        state["message_history"] = history[-MAX_MESSAGE_HISTORY:]
        logger.info(f"✂️ Trimmed {excess} old messages from session {state['session_id']}")
|
|
|
|
|
|
# -----------------------------
|
|
# Intake context retrieval
|
|
# -----------------------------
|
|
async def _get_intake_context(session_id: str, messages: List[Dict[str, str]]):
    """
    Fetch multilevel summaries from the in-process Intake module.

    Calls summarize_context() directly — no HTTP hop, no container boundary.
    On failure, returns a stub with empty L1-L30 fields plus the error text
    so callers always receive the expected shape.
    """
    try:
        return await summarize_context(session_id, messages)
    except Exception as exc:
        logger.error(f"Internal Intake summarization failed: {exc}")
        fallback: Dict[str, Any] = {"session_id": session_id}
        for level in ("L1", "L5", "L10", "L20", "L30"):
            fallback[level] = ""
        fallback["error"] = str(exc)
        return fallback
|
|
|
|
|
|
|
|
# -----------------------------
|
|
# NeoMem semantic search
|
|
# -----------------------------
|
|
async def _search_neomem(
    query: str,
    user_id: str = "brian",
    limit: int = 5
) -> List[Dict[str, Any]]:
    """
    Query NeoMem for long-term memories relevant to *query*.

    Each result carries the full NeoMem structure, e.g.:
        {
            "id": "mem_abc123",
            "score": 0.92,
            "payload": {
                "data": "Memory text content...",
                "metadata": {"category": "...", "created_at": "...", ...}
            }
        }

    Args:
        query: Search query text.
        user_id: User identifier for memory filtering.
        limit: Maximum number of results.

    Returns:
        List of memory objects; empty list when NeoMem is disabled or on
        any failure (this call never raises to the caller).
    """
    if not NEOMEM_ENABLED:
        logger.info("NeoMem search skipped (NEOMEM_ENABLED is false)")
        return []

    try:
        # NeoMemClient picks up NEOMEM_API from the environment — no base_url arg.
        hits = await NeoMemClient().search(
            query=query,
            user_id=user_id,
            limit=limit,
            threshold=RELEVANCE_THRESHOLD,
        )
        # Threshold filtering is already applied inside NeoMemClient.search().
        logger.info(f"NeoMem search returned {len(hits)} relevant results")
        return hits
    except Exception as exc:
        logger.warning(f"NeoMem search failed: {exc}")
        return []
|
|
|
|
|
|
# -----------------------------
|
|
# Main context collection
|
|
# -----------------------------
|
|
async def collect_context(session_id: str, user_prompt: str) -> Dict[str, Any]:
    """
    Collect unified context from all sources for one user turn.

    Orchestrates:
    1. Periodic cleanup of expired sessions (~1% of calls)
    2. Initialize session state if needed
    3. Duplicate-message (loop) detection — may short-circuit
    4. Calculate time since last message
    5. Retrieve Intake multilevel summaries (L1-L30)
    6. Search NeoMem for relevant long-term memories
    7. Update session state with the current user message
    8. Return the unified context_state dictionary

    Args:
        session_id: Session identifier
        user_prompt: Current user message

    Returns:
        Unified context state dictionary with structure:
        {
            "session_id": "...",
            "timestamp": "2025-11-28T12:34:56",
            "minutes_since_last_msg": 5.2,
            "message_count": 42,
            "intake": {
                "L1": [...],
                "L5": [...],
                "L10": {...},
                "L20": {...},
                "L30": {...}
            },
            "rag": [
                {
                    "id": "mem_123",
                    "score": 0.92,
                    "payload": {
                        "data": "...",
                        "metadata": {...}
                    }
                },
                ...
            ],
            "mode": "default",
            "mood": "neutral",
            "active_project": null,
            "tools_available": ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"]
        }

        When a duplicate message is detected, a minimal context is returned
        instead, with "duplicate_detected": True and empty intake/rag sections.
    """

    # A. Cleanup expired sessions periodically (every 100th call on average,
    #    amortizing the full SESSION_STATE scan across requests)
    import random
    if random.randint(1, 100) == 1:
        _cleanup_expired_sessions()

    # B. Initialize session state if needed
    if session_id not in SESSION_STATE:
        SESSION_STATE[session_id] = _init_session(session_id)
        logger.info(f"Initialized new session: {session_id}")

    state = SESSION_STATE[session_id]

    # C. Check for duplicate messages (loop detection)
    if _is_duplicate_message(session_id, user_prompt):
        # Short-circuit with a minimal context and a warning flag so the
        # caller skips the expensive Intake/NeoMem work for the repeat.
        # NOTE: session state (message_count, history) is NOT updated here.
        logger.warning(f"🔁 LOOP DETECTED - Returning cached context to prevent processing duplicate")
        context_state = {
            "session_id": session_id,
            "timestamp": datetime.now().isoformat(),
            "minutes_since_last_msg": 0,
            "message_count": state["message_count"],
            "intake": {},
            "rag": [],
            "mode": state["mode"],
            "mood": state["mood"],
            "active_project": state["active_project"],
            "tools_available": TOOLS_AVAILABLE,
            "duplicate_detected": True,
        }
        return context_state

    # D. Calculate time delta since the previous turn in this session
    now = datetime.now()
    time_delta_seconds = (now - state["last_timestamp"]).total_seconds()
    minutes_since_last_msg = round(time_delta_seconds / 60.0, 2)

    # E. Gather Intake context (multilevel summaries)
    # Build a compact message buffer for Intake from the history tracked
    # inside SESSION_STATE (turns are {"user": ..., "assistant": ...}):
    messages_for_intake = []

    if "message_history" in state:
        for turn in state["message_history"]:
            messages_for_intake.append({
                "user_msg": turn.get("user", ""),
                "assistant_msg": turn.get("assistant", "")
            })

    intake_data = await _get_intake_context(session_id, messages_for_intake)

    # F. Search NeoMem for relevant long-term memories
    if NEOMEM_ENABLED:
        rag_results = await _search_neomem(
            query=user_prompt,
            user_id="brian",  # TODO: Make configurable per session
            limit=5
        )
    else:
        rag_results = []
        logger.info("Skipping NeoMem RAG retrieval; NEOMEM_ENABLED is false")

    # G. Update session state for this turn
    state["last_user_message"] = user_prompt
    state["last_timestamp"] = now
    state["message_count"] += 1

    # Save user turn to history
    state["message_history"].append({
        "user": user_prompt,
        "assistant": ""  # assistant reply filled later by update_last_assistant_message()
    })

    # Trim history to prevent unbounded growth
    _trim_message_history(state)

    # H. Assemble unified context
    context_state = {
        "session_id": session_id,
        "timestamp": now.isoformat(),
        "minutes_since_last_msg": minutes_since_last_msg,
        "message_count": state["message_count"],
        "intake": intake_data,
        "rag": rag_results,
        "mode": state["mode"],
        "mood": state["mood"],
        "active_project": state["active_project"],
        "tools_available": TOOLS_AVAILABLE,
    }

    # Log context summary in structured format
    logger.info(
        f"📊 Context | Session: {session_id} | "
        f"Messages: {state['message_count']} | "
        f"Last: {minutes_since_last_msg:.1f}min | "
        f"RAG: {len(rag_results)} results"
    )

    # Show detailed context in detailed/verbose mode
    if LOG_DETAIL_LEVEL in ["detailed", "verbose"]:
        import json
        logger.info(f"\n{'─'*100}")
        logger.info(f"[CONTEXT] Session {session_id} | User: {user_prompt[:80]}...")
        logger.info(f"{'─'*100}")
        logger.info(f" Mode: {state['mode']} | Mood: {state['mood']} | Project: {state['active_project']}")
        logger.info(f" Tools: {', '.join(TOOLS_AVAILABLE)}")

        # Show intake summaries (condensed)
        if intake_data:
            logger.info(f"\n ╭─ INTAKE SUMMARIES ────────────────────────────────────────────────")
            for level in ["L1", "L5", "L10", "L20", "L30"]:
                if level in intake_data:
                    summary = intake_data[level]
                    if isinstance(summary, dict):
                        summary_text = summary.get("summary", str(summary)[:100])
                    else:
                        summary_text = str(summary)[:100]
                    logger.info(f" │ {level:4s}: {summary_text}...")
            logger.info(f" ╰───────────────────────────────────────────────────────────────────")

        # Show RAG results (condensed)
        if rag_results:
            logger.info(f"\n ╭─ RAG RESULTS ({len(rag_results)}) ──────────────────────────────────────────────")
            for idx, result in enumerate(rag_results[:5], 1):  # Show top 5
                score = result.get("score", 0)
                data_preview = str(result.get("payload", {}).get("data", ""))[:60]
                logger.info(f" │ [{idx}] {score:.3f} | {data_preview}...")
            if len(rag_results) > 5:
                logger.info(f" │ ... and {len(rag_results) - 5} more results")
            logger.info(f" ╰───────────────────────────────────────────────────────────────────")

        # Show full raw data only in verbose mode
        if LOG_DETAIL_LEVEL == "verbose":
            logger.info(f"\n ╭─ RAW INTAKE DATA ─────────────────────────────────────────────────")
            logger.info(f" │ {json.dumps(intake_data, indent=4, default=str)}")
            logger.info(f" ╰───────────────────────────────────────────────────────────────────")

        logger.info(f"{'─'*100}\n")

    return context_state
|
|
|
|
|
|
# -----------------------------
|
|
# Session state management
|
|
# -----------------------------
|
|
def update_last_assistant_message(session_id: str, message: str) -> None:
    """
    Record the assistant's reply for *session_id* and complete the most
    recent turn in message_history.

    No-op (with a warning) when the session does not exist.
    """
    state = SESSION_STATE.get(session_id)
    if not state:
        logger.warning(f"Attempted to update non-existent session: {session_id}")
        return

    # Latest assistant message + activity timestamp
    state["last_assistant_message"] = message
    state["last_timestamp"] = datetime.now()

    # Close out the newest turn ({"user": ..., "assistant": ...})
    turns = state.get("message_history", [])
    if turns:
        turns[-1]["assistant"] = message
|
|
|
|
|
|
|
|
def get_session_state(session_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up the live state dict for *session_id*.

    Returns:
        The session's state dictionary, or None when no such session exists.
    """
    return SESSION_STATE.get(session_id)
|
|
|
|
|
|
def close_session(session_id: str) -> bool:
    """
    Remove a session's state from the in-memory store.

    Returns:
        True if the session existed and was removed, False otherwise.
    """
    # Atomically remove-and-fetch; session values are always dicts, so a
    # None sentinel unambiguously means "not present".
    removed = SESSION_STATE.pop(session_id, None)
    if removed is None:
        return False

    logger.info(f"Closed session: {session_id}")
    return True
|
|
|
|
|
|
# -----------------------------
|
|
# Extension hooks for future autonomy
|
|
# -----------------------------
|
|
def update_mode(session_id: str, new_mode: str) -> None:
    """
    Switch the session's operating mode.

    Future modes: "autonomous", "focused", "creative", "collaborative", etc.
    Silently ignores unknown session ids.

    Args:
        session_id: Session identifier
        new_mode: New mode string
    """
    state = SESSION_STATE.get(session_id)
    if state is None:
        return

    previous = state["mode"]
    state["mode"] = new_mode
    logger.info(f"Session {session_id} mode changed: {previous} -> {new_mode}")
|
|
|
|
|
|
def update_mood(session_id: str, new_mood: str) -> None:
    """
    Switch the session's mood marker.

    Future implementation: sentiment analysis, emotional state tracking.
    Silently ignores unknown session ids.

    Args:
        session_id: Session identifier
        new_mood: New mood string
    """
    state = SESSION_STATE.get(session_id)
    if state is None:
        return

    previous = state["mood"]
    state["mood"] = new_mood
    logger.info(f"Session {session_id} mood changed: {previous} -> {new_mood}")
|
|
|
|
|
|
def update_active_project(session_id: str, project: Optional[str]) -> None:
    """
    Set (or clear, with None) the session's active project context.

    Future implementation: project-specific memory, tools, preferences.
    Silently ignores unknown session ids.

    Args:
        session_id: Session identifier
        project: Project identifier or None
    """
    state = SESSION_STATE.get(session_id)
    if state is None:
        return

    state["active_project"] = project
    logger.info(f"Session {session_id} active project set to: {project}")
|
|
|
|
|
|
async def autonomous_heartbeat(session_id: str) -> Optional[str]:
    """
    Autonomous thinking heartbeat (stub — always returns None).

    Future implementation:
    - Check if Lyra should initiate internal dialogue
    - Generate self-prompted thoughts based on session state
    - Update mood/mode based on context changes
    - Trigger proactive suggestions or reminders

    Args:
        session_id: Session identifier

    Returns:
        Optional autonomous thought/action string (currently always None)
    """
    # Sketch of eventual logic:
    #   minutes_since_last_msg > 60          -> check pending reminders
    #   mood == "curious" and active_project -> generate research questions
    #   mode == "autonomous"                 -> self-prompt from project goals
    logger.debug(f"Autonomous heartbeat for session {session_id} (not yet implemented)")
    return None
|