# File: project-lyra/cortex/context.py
# Snapshot: 2025-11-28 19:29:41 -05:00 (413 lines, 12 KiB, Python)
# context.py
"""
Context layer for Cortex reasoning pipeline.
Provides unified context collection from:
- Intake (short-term memory, multilevel summaries L1-L30)
- NeoMem (long-term memory, semantic search)
- Session state (timestamps, messages, mode, mood, active_project)
Maintains per-session state for continuity across conversations.
"""
import asyncio
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx

from neomem_client import NeoMemClient
# -----------------------------
# Configuration
# -----------------------------
# Service endpoints for Intake (short-term memory) and NeoMem (long-term
# memory); overridable via environment for container / local deployments.
INTAKE_API_URL = os.getenv("INTAKE_API_URL", "http://intake:7080")
NEOMEM_API = os.getenv("NEOMEM_API", "http://neomem-api:8000")
# Minimum NeoMem similarity score a memory must reach to be surfaced
# (see _search_neomem).
RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.4"))
# Tools available for future autonomy features
TOOLS_AVAILABLE = ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"]
# -----------------------------
# Module-level session state
# -----------------------------
# In-memory map of session_id -> session state dict (shape: _init_session).
# NOTE(review): process-local and unsynchronized — state is lost on restart
# and not shared across workers; confirm single-process deployment.
SESSION_STATE: Dict[str, Dict[str, Any]] = {}
# Logger
logger = logging.getLogger(__name__)
# -----------------------------
# Session initialization
# -----------------------------
def _init_session(session_id: str) -> Dict[str, Any]:
"""
Initialize a new session state entry.
Returns:
Dictionary with default session state fields
"""
return {
"session_id": session_id,
"created_at": datetime.now(),
"last_timestamp": datetime.now(),
"last_user_message": None,
"last_assistant_message": None,
"mode": "default", # Future: "autonomous", "focused", "creative", etc.
"mood": "neutral", # Future: mood tracking
"active_project": None, # Future: project context
"message_count": 0,
}
# -----------------------------
# Intake context retrieval
# -----------------------------
async def _get_intake_context(session_id: str) -> Dict[str, Any]:
    """
    Retrieve multilevel summaries from Intake /context endpoint.

    Returns L1-L30 summary hierarchy:
    - L1: Last 5 exchanges
    - L5: Last 10 exchanges (reality check)
    - L10: Intermediate checkpoint
    - L20: Session overview
    - L30: Continuity report

    Args:
        session_id: Session identifier

    Returns:
        Dict with multilevel summaries, or on failure an empty structure
        with all L-keys present plus an "error" field, so callers can rely
        on the keys existing either way.
    """
    url = f"{INTAKE_API_URL}/context"
    params = {"session_id": session_id}

    def _empty_context(error: Exception) -> Dict[str, Any]:
        # Single source of truth for the failure payload — previously this
        # dict was duplicated in both except branches and could drift.
        return {
            "session_id": session_id,
            "L1": [],
            "L5": [],
            "L10": None,
            "L20": None,
            "L30": None,
            "error": str(error),
        }

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

            # Expected format from Intake:
            # {"session_id": "...", "L1": [...], "L5": [...],
            #  "L10": {...}, "L20": {...}, "L30": {...}}
            logger.info("Retrieved Intake context for session %s", session_id)
            return data
    except httpx.HTTPError as e:
        # Network / HTTP-status failures: degrade gracefully.
        logger.warning("Failed to retrieve Intake context: %s", e)
        return _empty_context(e)
    except Exception as e:
        # Anything else (e.g. malformed JSON body): log louder but still
        # return the same empty structure so the pipeline keeps running.
        logger.error("Unexpected error retrieving Intake context: %s", e)
        return _empty_context(e)
# -----------------------------
# NeoMem semantic search
# -----------------------------
async def _search_neomem(
    query: str,
    user_id: str = "brian",
    limit: int = 5
) -> List[Dict[str, Any]]:
    """
    Query NeoMem for long-term memories relevant to *query*.

    Each returned item keeps NeoMem's full response structure:
    [
        {
            "id": "mem_abc123",
            "score": 0.92,
            "payload": {
                "data": "Memory text content...",
                "metadata": {"category": "...", "created_at": "...", ...}
            }
        },
        ...
    ]

    Only hits scoring at or above RELEVANCE_THRESHOLD are kept.

    Args:
        query: Search query text
        user_id: User identifier for memory filtering
        limit: Maximum number of results

    Returns:
        List of memory objects with full structure; empty list on any failure.
    """
    try:
        neomem = NeoMemClient(base_url=NEOMEM_API)
        hits = await neomem.search(
            query=query,
            user_id=user_id,
            limit=limit,
        )
        relevant: List[Dict[str, Any]] = []
        for hit in hits:
            # Drop low-confidence matches below the configured threshold.
            if hit.get("score", 0.0) >= RELEVANCE_THRESHOLD:
                relevant.append(hit)
        logger.info(f"NeoMem search returned {len(relevant)}/{len(hits)} relevant results")
        return relevant
    except Exception as e:
        # Best-effort: memory search failures never break the pipeline.
        logger.warning(f"NeoMem search failed: {e}")
        return []
# -----------------------------
# Main context collection
# -----------------------------
async def collect_context(
    session_id: str,
    user_prompt: str,
    user_id: str = "brian",
) -> Dict[str, Any]:
    """
    Collect unified context from all sources.

    Orchestrates:
    1. Initialize or update session state
    2. Calculate time since last message
    3. Retrieve Intake multilevel summaries (L1-L30)
    4. Search NeoMem for relevant long-term memories
       (steps 3 and 4 run concurrently)
    5. Update session state with current user message
    6. Return unified context_state dictionary

    Args:
        session_id: Session identifier
        user_prompt: Current user message
        user_id: User identifier for NeoMem memory filtering
            (defaults to "brian" for backward compatibility)

    Returns:
        Unified context state dictionary with structure:
        {
            "session_id": "...",
            "timestamp": "2025-11-28T12:34:56",
            "minutes_since_last_msg": 5.2,
            "message_count": 42,
            "intake": {"L1": [...], "L5": [...], "L10": {...},
                       "L20": {...}, "L30": {...}},
            "rag": [
                {"id": "mem_123", "score": 0.92,
                 "payload": {"data": "...", "metadata": {...}}},
                ...
            ],
            "mode": "default",
            "mood": "neutral",
            "active_project": null,
            "tools_available": ["RAG", "WEB", "WEATHER", "CODEBRAIN", "POKERBRAIN"]
        }
    """
    # A. Initialize session state if needed
    if session_id not in SESSION_STATE:
        SESSION_STATE[session_id] = _init_session(session_id)
        logger.info("Initialized new session: %s", session_id)
    state = SESSION_STATE[session_id]

    # B. Minutes elapsed since the previous message, rounded to 2 decimals
    now = datetime.now()
    elapsed_seconds = (now - state["last_timestamp"]).total_seconds()
    minutes_since_last_msg = round(elapsed_seconds / 60.0, 2)

    # C+D. Fetch Intake summaries and NeoMem memories concurrently: the two
    # lookups are independent network calls, and each helper handles its own
    # failures internally (empty structure / empty list), so gather() will
    # not raise from either of them.
    intake_data, rag_results = await asyncio.gather(
        _get_intake_context(session_id),
        _search_neomem(query=user_prompt, user_id=user_id, limit=5),
    )

    # E. Update session state with the current user turn
    state["last_user_message"] = user_prompt
    state["last_timestamp"] = now
    state["message_count"] += 1

    # F. Assemble unified context
    context_state = {
        "session_id": session_id,
        "timestamp": now.isoformat(),
        "minutes_since_last_msg": minutes_since_last_msg,
        "message_count": state["message_count"],
        "intake": intake_data,
        "rag": rag_results,
        "mode": state["mode"],
        "mood": state["mood"],
        "active_project": state["active_project"],
        "tools_available": TOOLS_AVAILABLE,
    }
    logger.info(
        "Context collected for session %s: %d RAG results, "
        "%.1f minutes since last message",
        session_id,
        len(rag_results),
        minutes_since_last_msg,
    )
    return context_state
# -----------------------------
# Session state management
# -----------------------------
def update_last_assistant_message(session_id: str, message: str) -> None:
    """
    Record the assistant's final response in the session state.

    Called by router.py after persona layer completes.

    Args:
        session_id: Session identifier
        message: Assistant's final response text
    """
    state = SESSION_STATE.get(session_id)
    if state is None:
        # Unknown session id — surface it rather than failing silently.
        logger.warning(f"Attempted to update non-existent session: {session_id}")
        return
    state["last_assistant_message"] = message
    state["last_timestamp"] = datetime.now()
    logger.debug(f"Updated assistant message for session {session_id}")
def get_session_state(session_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up the live state for a session.

    Args:
        session_id: Session identifier

    Returns:
        Session state dict, or None when no such session exists.
    """
    try:
        return SESSION_STATE[session_id]
    except KeyError:
        return None
def close_session(session_id: str) -> bool:
    """
    Close and cleanup a session.

    Args:
        session_id: Session identifier

    Returns:
        True if session was closed, False if it didn't exist
    """
    # Sentinel distinguishes "absent" from any stored value; pop removes
    # atomically instead of a membership check followed by del.
    _missing = object()
    if SESSION_STATE.pop(session_id, _missing) is _missing:
        return False
    logger.info(f"Closed session: {session_id}")
    return True
# -----------------------------
# Extension hooks for future autonomy
# -----------------------------
def update_mode(session_id: str, new_mode: str) -> None:
    """
    Update session mode.

    Future modes: "autonomous", "focused", "creative", "collaborative", etc.

    Args:
        session_id: Session identifier
        new_mode: New mode string
    """
    state = SESSION_STATE.get(session_id)
    if state is None:
        # Consistent with update_last_assistant_message: a missing session
        # was previously ignored silently here, hiding caller bugs.
        logger.warning(f"Attempted to update non-existent session: {session_id}")
        return
    old_mode = state["mode"]
    state["mode"] = new_mode
    logger.info(f"Session {session_id} mode changed: {old_mode} -> {new_mode}")
def update_mood(session_id: str, new_mood: str) -> None:
    """
    Update session mood.

    Future implementation: Sentiment analysis, emotional state tracking.

    Args:
        session_id: Session identifier
        new_mood: New mood string
    """
    state = SESSION_STATE.get(session_id)
    if state is None:
        # Consistent with update_last_assistant_message: a missing session
        # was previously ignored silently here, hiding caller bugs.
        logger.warning(f"Attempted to update non-existent session: {session_id}")
        return
    old_mood = state["mood"]
    state["mood"] = new_mood
    logger.info(f"Session {session_id} mood changed: {old_mood} -> {new_mood}")
def update_active_project(session_id: str, project: Optional[str]) -> None:
    """
    Update active project context.

    Future implementation: Project-specific memory, tools, preferences.

    Args:
        session_id: Session identifier
        project: Project identifier or None
    """
    state = SESSION_STATE.get(session_id)
    if state is None:
        # Consistent with update_last_assistant_message: a missing session
        # was previously ignored silently here, hiding caller bugs.
        logger.warning(f"Attempted to update non-existent session: {session_id}")
        return
    state["active_project"] = project
    logger.info(f"Session {session_id} active project set to: {project}")
async def autonomous_heartbeat(session_id: str) -> Optional[str]:
    """
    Autonomous thinking heartbeat (stub).

    Future implementation:
    - Check if Lyra should initiate internal dialogue
    - Generate self-prompted thoughts based on session state
    - Update mood/mode based on context changes
    - Trigger proactive suggestions or reminders

    Args:
        session_id: Session identifier

    Returns:
        Optional autonomous thought/action string; currently always None.
    """
    # Planned trigger sketch (not yet implemented):
    #   * minutes_since_last_msg > 60          -> check pending reminders
    #   * mood == "curious" and active_project -> generate research questions
    #   * mode == "autonomous"                 -> self-prompt from project goals
    logger.debug(f"Autonomous heartbeat for session {session_id} (not yet implemented)")
    return None