""" Autonomous Action Manager - executes safe, self-initiated actions. """ import logging import json from typing import Dict, List, Any, Optional from datetime import datetime logger = logging.getLogger(__name__) class AutonomousActionManager: """ Manages safe autonomous actions that Lyra can take without explicit user prompting. Whitelist of allowed actions: - create_memory: Store information in NeoMem - update_goal: Modify goal status - schedule_reminder: Create future reminder - summarize_session: Generate conversation summary - learn_topic: Add topic to learning queue - update_focus: Change current focus area """ def __init__(self): """Initialize action manager with whitelisted actions.""" self.allowed_actions = { "create_memory": self._create_memory, "update_goal": self._update_goal, "schedule_reminder": self._schedule_reminder, "summarize_session": self._summarize_session, "learn_topic": self._learn_topic, "update_focus": self._update_focus } self.action_log = [] # Track all actions for audit async def execute_action( self, action_type: str, parameters: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """ Execute a single autonomous action. Args: action_type: Type of action (must be in whitelist) parameters: Action-specific parameters context: Current context state Returns: { "success": bool, "action": action_type, "result": action_result, "timestamp": ISO timestamp, "error": optional error message } """ # Safety check: action must be whitelisted if action_type not in self.allowed_actions: logger.error(f"[ACTIONS] Attempted to execute non-whitelisted action: {action_type}") return { "success": False, "action": action_type, "error": f"Action '{action_type}' not in whitelist", "timestamp": datetime.utcnow().isoformat() } try: logger.info(f"[ACTIONS] Executing autonomous action: {action_type}") # Execute the action action_func = self.allowed_actions[action_type] result = await action_func(parameters, context) # Log successful action action_record = { "success": True, "action": action_type, "result": result, "timestamp": datetime.utcnow().isoformat(), "parameters": parameters } self.action_log.append(action_record) logger.info(f"[ACTIONS] Action {action_type} completed successfully") return action_record except Exception as e: logger.error(f"[ACTIONS] Action {action_type} failed: {e}") error_record = { "success": False, "action": action_type, "error": str(e), "timestamp": datetime.utcnow().isoformat(), "parameters": parameters } self.action_log.append(error_record) return error_record async def execute_batch( self, actions: List[Dict[str, Any]], context: Dict[str, Any] ) -> List[Dict[str, Any]]: """ Execute multiple actions sequentially. Args: actions: List of {"action": str, "parameters": dict} context: Current context state Returns: List of action results """ results = [] for action_spec in actions: action_type = action_spec.get("action") parameters = action_spec.get("parameters", {}) result = await self.execute_action(action_type, parameters, context) results.append(result) # Stop on first failure if critical if not result["success"] and action_spec.get("critical", False): logger.warning(f"[ACTIONS] Critical action {action_type} failed, stopping batch") break return results # ======================================== # Whitelisted Action Implementations # ======================================== async def _create_memory( self, parameters: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """ Create a memory entry in NeoMem. Parameters: - text: Memory content (required) - tags: Optional tags for memory - importance: 0.0-1.0 importance score """ text = parameters.get("text") if not text: raise ValueError("Memory text required") tags = parameters.get("tags", []) importance = parameters.get("importance", 0.5) session_id = context.get("session_id", "autonomous") # Import NeoMem client try: from memory.neomem_client import store_memory result = await store_memory( text=text, session_id=session_id, tags=tags, importance=importance ) return { "memory_id": result.get("id"), "text": text[:50] + "..." if len(text) > 50 else text } except ImportError: logger.warning("[ACTIONS] NeoMem client not available, simulating memory storage") return { "memory_id": "simulated", "text": text[:50] + "..." if len(text) > 50 else text, "note": "NeoMem not available, memory not persisted" } async def _update_goal( self, parameters: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """ Update goal status in self-state. Parameters: - goal_id: Goal identifier (required) - status: New status (pending/in_progress/completed) - progress: Optional progress note """ goal_id = parameters.get("goal_id") if not goal_id: raise ValueError("goal_id required") status = parameters.get("status", "in_progress") progress = parameters.get("progress") # Import self-state manager from autonomy.self.state import get_self_state_instance state = get_self_state_instance() active_goals = state._state.get("active_goals", []) # Find and update goal updated = False for goal in active_goals: if isinstance(goal, dict) and goal.get("id") == goal_id: goal["status"] = status if progress: goal["progress"] = progress goal["updated_at"] = datetime.utcnow().isoformat() updated = True break if updated: state._save_state() return { "goal_id": goal_id, "status": status, "updated": True } else: return { "goal_id": goal_id, "updated": False, "note": "Goal not found" } async def _schedule_reminder( self, parameters: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """ Schedule a future reminder. Parameters: - message: Reminder text (required) - delay_minutes: Minutes until reminder - priority: 0.0-1.0 priority score """ message = parameters.get("message") if not message: raise ValueError("Reminder message required") delay_minutes = parameters.get("delay_minutes", 60) priority = parameters.get("priority", 0.5) # For now, store in self-state's learning queue # In future: integrate with scheduler/cron system from autonomy.self.state import get_self_state_instance state = get_self_state_instance() reminder = { "type": "reminder", "message": message, "scheduled_at": datetime.utcnow().isoformat(), "trigger_at_minutes": delay_minutes, "priority": priority } # Add to learning queue as placeholder state._state.setdefault("reminders", []).append(reminder) state._save_state(state._state) # Pass state dict as argument logger.info(f"[ACTIONS] Reminder scheduled: {message} (in {delay_minutes}min)") return { "message": message, "delay_minutes": delay_minutes, "note": "Reminder stored in self-state (scheduler integration pending)" } async def _summarize_session( self, parameters: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """ Generate a summary of current session. Parameters: - max_length: Max summary length in words - focus_topics: Optional list of topics to emphasize """ max_length = parameters.get("max_length", 200) session_id = context.get("session_id", "unknown") # Import summarizer (from deferred_summary or create simple one) try: from utils.deferred_summary import summarize_conversation summary = await summarize_conversation( session_id=session_id, max_words=max_length ) return { "summary": summary, "word_count": len(summary.split()) } except ImportError: # Fallback: simple summary message_count = context.get("message_count", 0) focus = context.get("monologue", {}).get("intent", "general") summary = f"Session {session_id}: {message_count} messages exchanged, focused on {focus}." return { "summary": summary, "word_count": len(summary.split()), "note": "Simple summary (full summarizer not available)" } async def _learn_topic( self, parameters: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """ Add topic to learning queue. Parameters: - topic: Topic name (required) - reason: Why this topic - priority: 0.0-1.0 priority score """ topic = parameters.get("topic") if not topic: raise ValueError("Topic required") reason = parameters.get("reason", "autonomous learning") priority = parameters.get("priority", 0.5) # Import self-state manager from autonomy.self.state import get_self_state_instance state = get_self_state_instance() state.add_learning_goal(topic) # Only pass topic parameter logger.info(f"[ACTIONS] Added to learning queue: {topic} (reason: {reason})") return { "topic": topic, "reason": reason, "queue_position": len(state._state.get("learning_queue", [])) } async def _update_focus( self, parameters: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """ Update current focus area. Parameters: - focus: New focus area (required) - reason: Why this focus """ focus = parameters.get("focus") if not focus: raise ValueError("Focus required") reason = parameters.get("reason", "autonomous update") # Import self-state manager from autonomy.self.state import get_self_state_instance state = get_self_state_instance() old_focus = state._state.get("focus", "none") state._state["focus"] = focus state._state["focus_updated_at"] = datetime.utcnow().isoformat() state._state["focus_reason"] = reason state._save_state(state._state) # Pass state dict as argument logger.info(f"[ACTIONS] Focus updated: {old_focus} -> {focus}") return { "old_focus": old_focus, "new_focus": focus, "reason": reason } # ======================================== # Utility Methods # ======================================== def get_allowed_actions(self) -> List[str]: """Get list of all allowed action types.""" return list(self.allowed_actions.keys()) def get_action_log(self, limit: int = 50) -> List[Dict[str, Any]]: """ Get recent action log. Args: limit: Max number of entries to return Returns: List of action records """ return self.action_log[-limit:] def clear_action_log(self) -> None: """Clear action log.""" self.action_log = [] logger.info("[ACTIONS] Action log cleared") def validate_action(self, action_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]: """ Validate an action without executing it. Args: action_type: Type of action parameters: Action parameters Returns: { "valid": bool, "action": action_type, "errors": [error messages] or [] } """ errors = [] # Check whitelist if action_type not in self.allowed_actions: errors.append(f"Action '{action_type}' not in whitelist") # Check required parameters (basic validation) if action_type == "create_memory" and not parameters.get("text"): errors.append("Memory 'text' parameter required") if action_type == "update_goal" and not parameters.get("goal_id"): errors.append("Goal 'goal_id' parameter required") if action_type == "schedule_reminder" and not parameters.get("message"): errors.append("Reminder 'message' parameter required") if action_type == "learn_topic" and not parameters.get("topic"): errors.append("Learning 'topic' parameter required") if action_type == "update_focus" and not parameters.get("focus"): errors.append("Focus 'focus' parameter required") return { "valid": len(errors) == 0, "action": action_type, "errors": errors } # Singleton instance _action_manager_instance = None def get_action_manager() -> AutonomousActionManager: """ Get singleton action manager instance. Returns: AutonomousActionManager instance """ global _action_manager_instance if _action_manager_instance is None: _action_manager_instance = AutonomousActionManager() return _action_manager_instance