"""
Proactive Context Monitor - detects opportunities for autonomous suggestions.
"""

import logging
import time
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


def _get_or_default(state: Dict[str, Any], key: str, default: Any) -> Any:
    """
    Read ``state[key]``, substituting ``default`` when the key is missing or None.

    ``dict.get(key, default)`` only falls back when the key is absent; a key
    that is present with a ``None`` value would otherwise reach numeric
    comparisons and raise ``TypeError``.

    Args:
        state: State dictionary to read from
        key: Key to look up
        default: Value to use when the key is absent or maps to None

    Returns:
        The stored value, or ``default``
    """
    value = state.get(key)
    return default if value is None else value


class ProactiveMonitor:
    """
    Monitors conversation context and detects opportunities for proactive suggestions.

    Triggers:
    - Long silence → Check-in
    - Learning queue + high curiosity → Suggest exploration
    - Active goals → Progress reminders
    - Conversation milestones → Offer summary
    - Pattern detection → Helpful suggestions
    """

    def __init__(self, min_priority: float = 0.6, cooldown_seconds: int = 300):
        """
        Initialize proactive monitor.

        Args:
            min_priority: Minimum priority for suggestions (0.0-1.0)
            cooldown_seconds: Minimum number of seconds between two proactive
                suggestions for the same session (default: 300, i.e. 5 minutes)
        """
        self.min_priority = min_priority
        # session_id -> time.time() timestamp of the last emitted suggestion
        self.last_suggestion_time: Dict[str, float] = {}
        self.cooldown_seconds = cooldown_seconds

    async def analyze_session(
        self,
        session_id: str,
        context_state: Dict[str, Any],
        self_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Analyze session for proactive suggestion opportunities.

        Runs every check, drops candidates below ``min_priority`` and returns
        the one with the highest priority. Returning a suggestion starts the
        cooldown for ``session_id``.

        Args:
            session_id: Current session ID
            context_state: Full context including message history
            self_state: Lyra's current self-state

        Returns:
            {
                "suggestion": "text to append to response",
                "priority": 0.0-1.0,
                "reason": "why this suggestion",
                "type": "check_in | learning | goal_reminder | summary | pattern"
            }
            or None if no suggestion
        """
        if not self._check_cooldown(session_id):
            logger.debug("[PROACTIVE] Session %s in cooldown, skipping", session_id)
            return None

        # Order matters: max() keeps the first candidate among equal
        # priorities, so earlier checks win ties.
        candidates = (
            self._check_long_silence(context_state),
            self._check_learning_opportunity(self_state),
            self._check_active_goals(self_state, context_state),
            self._check_conversation_milestone(context_state),
            self._check_patterns(context_state, self_state),
        )

        valid_suggestions = [
            candidate
            for candidate in candidates
            if candidate and candidate["priority"] >= self.min_priority
        ]
        if not valid_suggestions:
            return None

        best_suggestion = max(valid_suggestions, key=lambda item: item["priority"])

        # Only a suggestion that is actually returned consumes the cooldown.
        self._update_cooldown(session_id)

        logger.info(
            "[PROACTIVE] Suggestion generated: %s (priority: %.2f)",
            best_suggestion["type"],
            best_suggestion["priority"],
        )

        return best_suggestion

    def _check_cooldown(self, session_id: str) -> bool:
        """
        Check if session is past cooldown period.

        Returns:
            True when no suggestion was recorded for the session or at least
            ``cooldown_seconds`` have elapsed since the last one.
        """
        last_time = self.last_suggestion_time.get(session_id)
        if last_time is None:
            return True

        return time.time() - last_time >= self.cooldown_seconds

    def _update_cooldown(self, session_id: str) -> None:
        """Record the current time as the session's last suggestion time."""
        self.last_suggestion_time[session_id] = time.time()

    def _check_long_silence(self, context_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check if user has been silent for a long time.

        Reads ``minutes_since_last_msg`` from ``context_state``; a missing or
        None value counts as 0 minutes.

        Returns:
            A "check_in" suggestion when the silence exceeds 30 minutes,
            otherwise None.
        """
        minutes_since_last = _get_or_default(context_state, "minutes_since_last_msg", 0)

        if minutes_since_last > 30:
            return {
                "suggestion": "\n\n[Aside: I'm still here if you need anything!]",
                "priority": 0.7,
                "reason": f"User silent for {minutes_since_last:.0f} minutes",
                "type": "check_in"
            }

        return None

    def _check_learning_opportunity(self, self_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check if Lyra has learning queue items and high curiosity.

        Reads ``learning_queue`` and ``curiosity`` from ``self_state``; a
        missing or None curiosity counts as 0.5.

        Returns:
            A "learning" suggestion about the first queue entry when curiosity
            is above 0.7 and the queue is non-empty, otherwise None.
        """
        learning_queue = self_state.get("learning_queue") or []
        curiosity = _get_or_default(self_state, "curiosity", 0.5)

        if curiosity > 0.7 and learning_queue:
            topic = learning_queue[0]
            return {
                "suggestion": f"\n\n[Aside: I've been curious about {topic} lately. Would you like to explore it together?]",
                "priority": 0.65,
                "reason": f"High curiosity ({curiosity:.2f}) and learning queue present",
                "type": "learning"
            }

        return None

    def _check_active_goals(
        self,
        self_state: Dict[str, Any],
        context_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Check if there are active goals worth reminding about.

        Reads ``active_goals`` from ``self_state`` and ``message_count`` from
        ``context_state``; a missing or None count is treated as 0.

        Returns:
            A "goal_reminder" suggestion for the first active goal on every
            10th message, otherwise None.
        """
        active_goals = self_state.get("active_goals") or []
        if not active_goals:
            return None

        message_count = _get_or_default(context_state, "message_count", 0)

        # Every 10 messages, consider goal reminder
        if message_count > 0 and message_count % 10 == 0:
            goal = active_goals[0]  # First active goal
            if isinstance(goal, str):
                goal_name = goal
            elif isinstance(goal, dict):
                # A missing, None or empty name must not leak into the text.
                goal_name = goal.get("name") or "your goal"
            else:
                # Unknown goal shape: fall back to the generic wording.
                goal_name = "your goal"

            return {
                "suggestion": f"\n\n[Aside: Still thinking about {goal_name}. Let me know if you want to work on it.]",
                "priority": 0.6,
                "reason": f"Active goal present, {message_count} messages since start",
                "type": "goal_reminder"
            }

        return None

    def _check_conversation_milestone(self, context_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check for conversation milestones (e.g., every 50 messages).

        Reads ``message_count`` from ``context_state``; a missing or None
        count is treated as 0.

        Returns:
            A "summary" suggestion on every 50th message, otherwise None.
        """
        message_count = _get_or_default(context_state, "message_count", 0)

        if message_count > 0 and message_count % 50 == 0:
            return {
                "suggestion": f"\n\n[Aside: We've exchanged {message_count} messages! Would you like a summary of our conversation?]",
                "priority": 0.65,
                "reason": f"Milestone: {message_count} messages",
                "type": "summary"
            }

        return None

    def _check_patterns(
        self,
        context_state: Dict[str, Any],
        self_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Check for behavioral patterns that merit suggestions.

        Reads ``focus`` and ``energy`` from ``self_state``; ``context_state``
        is accepted but not read by the current checks.

        Returns:
            A "pattern" suggestion when the focus text contains "repeated"
            (case-insensitive) or when energy is below 0.3, otherwise None.
        """
        focus = self_state.get("focus", "")

        # Repeated questions are signalled through the focus text; anything
        # that is not a string cannot carry that marker.
        if isinstance(focus, str) and "repeated" in focus.lower():
            return {
                "suggestion": "\n\n[Aside: I notice we keep coming back to this topic. Would it help to create a summary or action plan?]",
                "priority": 0.7,
                "reason": "Repeated topic detected",
                "type": "pattern"
            }

        # Low energy: offer a break. A missing or None energy counts as 0.8.
        energy = _get_or_default(self_state, "energy", 0.8)
        if energy < 0.3:
            return {
                "suggestion": "\n\n[Aside: We've been at this for a while. Need a break or want to keep going?]",
                "priority": 0.65,
                "reason": f"Low energy ({energy:.2f})",
                "type": "pattern"
            }

        return None

    def format_suggestion(self, suggestion: Dict[str, Any]) -> str:
        """
        Format suggestion for appending to response.

        Args:
            suggestion: Suggestion dict from analyze_session()

        Returns:
            The dict's "suggestion" text, or an empty string when absent
        """
        return suggestion.get("suggestion", "")

    def set_cooldown_duration(self, seconds: int) -> None:
        """
        Update cooldown duration.

        Args:
            seconds: New cooldown duration
        """
        self.cooldown_seconds = seconds
        logger.info("[PROACTIVE] Cooldown updated to %ss", seconds)

    def reset_cooldown(self, session_id: str) -> None:
        """
        Reset cooldown for a specific session.

        Does nothing (and logs nothing) when the session has no recorded
        suggestion.

        Args:
            session_id: Session to reset
        """
        if session_id in self.last_suggestion_time:
            del self.last_suggestion_time[session_id]
            logger.info("[PROACTIVE] Cooldown reset for session %s", session_id)

    def get_session_stats(self, session_id: str) -> Dict[str, Any]:
        """
        Get stats for a session's proactive monitoring.

        Args:
            session_id: Session to check

        Returns:
            {
                "last_suggestion_time": timestamp or None,
                "seconds_since_last": int,
                "cooldown_active": bool,
                "cooldown_remaining": int
            }
        """
        last_time = self.last_suggestion_time.get(session_id)

        # Compare against None explicitly so a stored timestamp of 0 is still
        # reported as a real suggestion time.
        if last_time is None:
            return {
                "last_suggestion_time": None,
                "seconds_since_last": 0,
                "cooldown_active": False,
                "cooldown_remaining": 0
            }

        seconds_since = int(time.time() - last_time)

        return {
            "last_suggestion_time": last_time,
            "seconds_since_last": seconds_since,
            "cooldown_active": seconds_since < self.cooldown_seconds,
            "cooldown_remaining": max(0, self.cooldown_seconds - seconds_since)
        }


# Singleton instance
_monitor_instance: Optional[ProactiveMonitor] = None


def get_proactive_monitor(min_priority: float = 0.6) -> ProactiveMonitor:
    """
    Get singleton proactive monitor instance.

    Args:
        min_priority: Minimum priority threshold (only used on first call)

    Returns:
        ProactiveMonitor instance
    """
    global _monitor_instance
    if _monitor_instance is None:
        _monitor_instance = ProactiveMonitor(min_priority=min_priority)
    return _monitor_instance