Files
2025-12-14 14:43:08 -05:00

322 lines
11 KiB
Python

"""
Proactive Context Monitor - detects opportunities for autonomous suggestions.
"""
import logging
import time
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
# Module-level logger named after this module; the log calls in this file
# prefix their messages with "[PROACTIVE]".
logger = logging.getLogger(__name__)
class ProactiveMonitor:
    """
    Monitors conversation context and detects opportunities for proactive suggestions.

    Triggers:
    - Long silence → Check-in
    - Learning queue + high curiosity → Suggest exploration
    - Active goals → Progress reminders
    - Conversation milestones → Offer summary
    - Pattern detection → Helpful suggestions

    Each trigger is implemented by a private ``_check_*`` method that returns
    either ``None`` or a suggestion dict with the keys ``suggestion``,
    ``priority``, ``reason`` and ``type``. ``analyze_session`` runs all of
    them, discards anything below ``min_priority`` and returns the
    highest-priority survivor, subject to a per-session cooldown.
    """

    def __init__(self, min_priority: float = 0.6):
        """
        Initialize proactive monitor.

        Args:
            min_priority: Minimum priority for suggestions (0.0-1.0)
        """
        self.min_priority = min_priority
        # session_id -> time.time() value recorded when a suggestion was last
        # returned for that session.
        # NOTE(review): entries are only ever removed by reset_cooldown(), so
        # this dict grows with the number of distinct sessions seen.
        self.last_suggestion_time: Dict[str, float] = {}
        self.cooldown_seconds = 300  # 5 minutes between proactive suggestions

    @staticmethod
    def _state_value(state: Dict[str, Any], key: str, default: Any) -> Any:
        """
        Read ``state[key]``, using *default* when the key is missing or None.

        A plain ``state.get(key, default)`` hands back ``None`` when the key
        is present with a null value, which would then crash the numeric
        comparisons performed by the trigger checks.
        """
        value = state.get(key)
        return default if value is None else value

    async def analyze_session(
        self,
        session_id: str,
        context_state: Dict[str, Any],
        self_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Analyze session for proactive suggestion opportunities.

        Args:
            session_id: Current session ID
            context_state: Full context including message history
            self_state: Lyra's current self-state

        Returns:
            {
                "suggestion": "text to append to response",
                "priority": 0.0-1.0,
                "reason": "why this suggestion",
                "type": "check_in | learning | goal_reminder | summary | pattern"
            }
            or None if the session is in cooldown or no trigger produced a
            suggestion with priority >= ``min_priority``.
        """
        if not self._check_cooldown(session_id):
            logger.debug("[PROACTIVE] Session %s in cooldown, skipping", session_id)
            return None

        # The order is significant: max() keeps the first of several
        # equally-ranked items, so earlier checks win priority ties.
        candidates = (
            self._check_long_silence(context_state),
            self._check_learning_opportunity(self_state),
            self._check_active_goals(self_state, context_state),
            self._check_conversation_milestone(context_state),
            self._check_patterns(context_state, self_state),
        )
        valid_suggestions = [
            candidate
            for candidate in candidates
            if candidate and candidate["priority"] >= self.min_priority
        ]
        if not valid_suggestions:
            return None

        best_suggestion = max(valid_suggestions, key=lambda s: s["priority"])

        # The cooldown only starts once a suggestion is actually handed out.
        self._update_cooldown(session_id)
        logger.info(
            "[PROACTIVE] Suggestion generated: %s (priority: %.2f)",
            best_suggestion["type"],
            best_suggestion["priority"],
        )
        return best_suggestion

    def _check_cooldown(self, session_id: str) -> bool:
        """Return True when the session has no active cooldown."""
        last_time = self.last_suggestion_time.get(session_id)
        if last_time is None:
            return True
        return time.time() - last_time >= self.cooldown_seconds

    def _update_cooldown(self, session_id: str) -> None:
        """Record the current time as the session's latest suggestion time."""
        self.last_suggestion_time[session_id] = time.time()

    def _check_long_silence(self, context_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check if user has been silent for a long time.

        Reads ``context_state["minutes_since_last_msg"]`` (missing or None is
        treated as 0) and suggests a check-in when it exceeds 30 minutes.
        """
        minutes_since_last = self._state_value(context_state, "minutes_since_last_msg", 0)
        if minutes_since_last > 30:
            return {
                "suggestion": "\n\n[Aside: I'm still here if you need anything!]",
                "priority": 0.7,
                "reason": f"User silent for {minutes_since_last:.0f} minutes",
                "type": "check_in"
            }
        return None

    def _check_learning_opportunity(self, self_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check if Lyra has learning queue items and high curiosity.

        Fires when ``self_state["curiosity"]`` (missing or None is treated as
        0.5) is above 0.7 and ``self_state["learning_queue"]`` is non-empty;
        the first queue entry is offered as the topic.
        """
        learning_queue = self._state_value(self_state, "learning_queue", [])
        curiosity = self._state_value(self_state, "curiosity", 0.5)
        if curiosity > 0.7 and learning_queue:
            topic = learning_queue[0]
            return {
                "suggestion": f"\n\n[Aside: I've been curious about {topic} lately. Would you like to explore it together?]",
                "priority": 0.65,
                "reason": f"High curiosity ({curiosity:.2f}) and learning queue present",
                "type": "learning"
            }
        return None

    def _check_active_goals(
        self,
        self_state: Dict[str, Any],
        context_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Check if there are active goals worth reminding about.

        Fires on every 10th message (``context_state["message_count"]``;
        missing or None is treated as 0) while ``self_state["active_goals"]``
        is non-empty. Only the first goal is mentioned. A goal may be a plain
        string or an object exposing ``.get("name", ...)``; anything else is
        referred to as "your goal".
        """
        active_goals = self_state.get("active_goals", [])
        if not active_goals:
            return None

        message_count = self._state_value(context_state, "message_count", 0)
        if message_count <= 0 or message_count % 10 != 0:
            return None

        goal = active_goals[0]
        if isinstance(goal, str):
            goal_name = goal
        else:
            # Duck-typed so that dicts and other mapping-like goals both
            # work, while unexpected goal types no longer raise.
            getter = getattr(goal, "get", None)
            goal_name = getter("name", "your goal") if callable(getter) else "your goal"

        return {
            "suggestion": f"\n\n[Aside: Still thinking about {goal_name}. Let me know if you want to work on it.]",
            "priority": 0.6,
            "reason": f"Active goal present, {message_count} messages since start",
            "type": "goal_reminder"
        }

    def _check_conversation_milestone(self, context_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check for conversation milestones (every 50 messages).

        Reads ``context_state["message_count"]`` (missing or None is treated
        as 0) and offers a summary whenever it is a positive multiple of 50.
        """
        message_count = self._state_value(context_state, "message_count", 0)
        if message_count > 0 and message_count % 50 == 0:
            return {
                "suggestion": f"\n\n[Aside: We've exchanged {message_count} messages! Would you like a summary of our conversation?]",
                "priority": 0.65,
                "reason": f"Milestone: {message_count} messages",
                "type": "summary"
            }
        return None

    def _check_patterns(
        self,
        context_state: Dict[str, Any],
        self_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Check for behavioral patterns that merit suggestions.

        Two patterns are recognised, in this order:
        1. ``self_state["focus"]`` is a string containing "repeated"
           (case-insensitive) → offer a summary / action plan.
        2. ``self_state["energy"]`` (missing or None is treated as 0.8) is
           below 0.3 → offer a break.

        ``context_state`` is accepted for signature parity with the other
        checks but is not read here.
        """
        focus = self_state.get("focus", "")
        # Non-string focus values are ignored rather than crashing on .lower().
        if isinstance(focus, str) and "repeated" in focus.lower():
            return {
                "suggestion": "\n\n[Aside: I notice we keep coming back to this topic. Would it help to create a summary or action plan?]",
                "priority": 0.7,
                "reason": "Repeated topic detected",
                "type": "pattern"
            }

        energy = self._state_value(self_state, "energy", 0.8)
        if energy < 0.3:
            return {
                "suggestion": "\n\n[Aside: We've been at this for a while. Need a break or want to keep going?]",
                "priority": 0.65,
                "reason": f"Low energy ({energy:.2f})",
                "type": "pattern"
            }
        return None

    def format_suggestion(self, suggestion: Dict[str, Any]) -> str:
        """
        Format suggestion for appending to response.

        Args:
            suggestion: Suggestion dict from analyze_session()

        Returns:
            The dict's "suggestion" text, or "" when that key is absent
        """
        return suggestion.get("suggestion", "")

    def set_cooldown_duration(self, seconds: int) -> None:
        """
        Update cooldown duration.

        Args:
            seconds: New cooldown duration
        """
        self.cooldown_seconds = seconds
        logger.info("[PROACTIVE] Cooldown updated to %ss", seconds)

    def reset_cooldown(self, session_id: str) -> None:
        """
        Reset cooldown for a specific session.

        Does nothing (and logs nothing) when the session has no recorded
        suggestion time.

        Args:
            session_id: Session to reset
        """
        if session_id in self.last_suggestion_time:
            del self.last_suggestion_time[session_id]
            logger.info("[PROACTIVE] Cooldown reset for session %s", session_id)

    def get_session_stats(self, session_id: str) -> Dict[str, Any]:
        """
        Get stats for a session's proactive monitoring.

        Args:
            session_id: Session to check

        Returns:
            {
                "last_suggestion_time": timestamp or None,
                "seconds_since_last": int,
                "cooldown_active": bool,
                "cooldown_remaining": int
            }
            All numeric fields are 0 and ``cooldown_active`` is False when no
            suggestion has been recorded for the session.
        """
        last_time = self.last_suggestion_time.get(session_id)
        # Compare against None explicitly: a stored timestamp of 0.0 is falsy
        # but still a real record.
        if last_time is None:
            return {
                "last_suggestion_time": None,
                "seconds_since_last": 0,
                "cooldown_active": False,
                "cooldown_remaining": 0
            }

        seconds_since = int(time.time() - last_time)
        return {
            "last_suggestion_time": last_time,
            "seconds_since_last": seconds_since,
            "cooldown_active": seconds_since < self.cooldown_seconds,
            "cooldown_remaining": max(0, self.cooldown_seconds - seconds_since)
        }
# Singleton instance: starts as None and is populated by the first call to
# get_proactive_monitor(), which returns this same object on later calls.
_monitor_instance: Optional["ProactiveMonitor"] = None
def get_proactive_monitor(min_priority: float = 0.6) -> ProactiveMonitor:
    """
    Return the shared ProactiveMonitor, creating it on first use.

    Args:
        min_priority: Priority threshold handed to the ProactiveMonitor
            constructor. It only takes effect on the call that creates the
            instance; later calls ignore it and return the existing monitor.

    Returns:
        The module-wide ProactiveMonitor instance
    """
    global _monitor_instance
    # Fast path: the monitor already exists, so hand it back unchanged.
    if _monitor_instance is not None:
        return _monitor_instance
    _monitor_instance = ProactiveMonitor(min_priority=min_priority)
    return _monitor_instance