"""
Autonomous Action Manager - executes safe, self-initiated actions.
"""

import logging
import json
from typing import Dict, List, Any, Optional
from datetime import datetime

logger = logging.getLogger(__name__)


class AutonomousActionManager:
    """
    Manages safe autonomous actions that Lyra can take without explicit user prompting.

    Whitelist of allowed actions:
    - create_memory: Store information in NeoMem
    - update_goal: Modify goal status
    - schedule_reminder: Create future reminder
    - summarize_session: Generate conversation summary
    - learn_topic: Add topic to learning queue
    - update_focus: Change current focus area
    """

    def __init__(self):
        """Initialize action manager with whitelisted actions."""
        # Membership in this dict IS the safety whitelist: execute_action
        # refuses any action name not present here.
        self.allowed_actions = {
            "create_memory": self._create_memory,
            "update_goal": self._update_goal,
            "schedule_reminder": self._schedule_reminder,
            "summarize_session": self._summarize_session,
            "learn_topic": self._learn_topic,
            "update_focus": self._update_focus
        }

        self.action_log = []  # Audit trail of every attempted action.

    async def execute_action(
        self,
        action_type: str,
        parameters: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Execute a single autonomous action.

        Args:
            action_type: Type of action (must be in whitelist)
            parameters: Action-specific parameters
            context: Current context state

        Returns:
            {
                "success": bool,
                "action": action_type,
                "result": action result (on success),
                "timestamp": ISO timestamp,
                "error": error message (on failure)
            }
        """
        # Safety check: action must be whitelisted.
        if action_type not in self.allowed_actions:
            logger.error(f"[ACTIONS] Attempted to execute non-whitelisted action: {action_type}")
            refusal_record = {
                "success": False,
                "action": action_type,
                "error": f"Action '{action_type}' not in whitelist",
                # NOTE: utcnow() is naive and deprecated in 3.12+; kept for
                # timestamp-format compatibility with existing records.
                "timestamp": datetime.utcnow().isoformat()
            }
            # Fix: refused actions belong in the audit trail too, since
            # action_log is documented as tracking all actions for audit.
            self.action_log.append(refusal_record)
            return refusal_record

        try:
            logger.info(f"[ACTIONS] Executing autonomous action: {action_type}")

            # Execute the whitelisted coroutine.
            action_func = self.allowed_actions[action_type]
            result = await action_func(parameters, context)

            # Log successful action.
            action_record = {
                "success": True,
                "action": action_type,
                "result": result,
                "timestamp": datetime.utcnow().isoformat(),
                "parameters": parameters
            }

            self.action_log.append(action_record)
            logger.info(f"[ACTIONS] Action {action_type} completed successfully")

            return action_record

        except Exception as e:
            logger.error(f"[ACTIONS] Action {action_type} failed: {e}")

            error_record = {
                "success": False,
                "action": action_type,
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat(),
                "parameters": parameters
            }

            self.action_log.append(error_record)
            return error_record

    async def execute_batch(
        self,
        actions: List[Dict[str, Any]],
        context: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Execute multiple actions sequentially.

        Args:
            actions: List of {"action": str, "parameters": dict, "critical": bool?}
            context: Current context state

        Returns:
            List of per-action result records (same shape as execute_action).
        """
        results = []

        for action_spec in actions:
            action_type = action_spec.get("action")
            parameters = action_spec.get("parameters", {})

            result = await self.execute_action(action_type, parameters, context)
            results.append(result)

            # Abort the remainder of the batch when a critical action fails.
            if not result["success"] and action_spec.get("critical", False):
                logger.warning(f"[ACTIONS] Critical action {action_type} failed, stopping batch")
                break

        return results

    # ========================================
    # Whitelisted Action Implementations
    # ========================================

    async def _create_memory(
        self,
        parameters: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Create a memory entry in NeoMem.

        Parameters:
            - text: Memory content (required)
            - tags: Optional tags for memory
            - importance: 0.0-1.0 importance score
        """
        text = parameters.get("text")
        if not text:
            raise ValueError("Memory text required")

        tags = parameters.get("tags", [])
        importance = parameters.get("importance", 0.5)
        session_id = context.get("session_id", "autonomous")

        # Imported lazily so the manager still loads when NeoMem is absent.
        try:
            from memory.neomem_client import store_memory

            result = await store_memory(
                text=text,
                session_id=session_id,
                tags=tags,
                importance=importance
            )

            return {
                "memory_id": result.get("id"),
                "text": text[:50] + "..." if len(text) > 50 else text
            }

        except ImportError:
            logger.warning("[ACTIONS] NeoMem client not available, simulating memory storage")
            return {
                "memory_id": "simulated",
                "text": text[:50] + "..." if len(text) > 50 else text,
                "note": "NeoMem not available, memory not persisted"
            }

    async def _update_goal(
        self,
        parameters: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Update goal status in self-state.

        Parameters:
            - goal_id: Goal identifier (required)
            - status: New status (pending/in_progress/completed)
            - progress: Optional progress note
        """
        goal_id = parameters.get("goal_id")
        if not goal_id:
            raise ValueError("goal_id required")

        status = parameters.get("status", "in_progress")
        progress = parameters.get("progress")

        # Import self-state manager lazily (project-local dependency).
        from autonomy.self.state import get_self_state_instance

        state = get_self_state_instance()
        active_goals = state._state.get("active_goals", [])

        # Find and update the matching goal in place.
        updated = False
        for goal in active_goals:
            if isinstance(goal, dict) and goal.get("id") == goal_id:
                goal["status"] = status
                if progress:
                    goal["progress"] = progress
                goal["updated_at"] = datetime.utcnow().isoformat()
                updated = True
                break

        if updated:
            # Fix: pass the state dict, consistent with every other caller of
            # _save_state in this module — TODO confirm against the
            # SelfState._save_state signature.
            state._save_state(state._state)
            return {
                "goal_id": goal_id,
                "status": status,
                "updated": True
            }
        else:
            return {
                "goal_id": goal_id,
                "updated": False,
                "note": "Goal not found"
            }

    async def _schedule_reminder(
        self,
        parameters: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Schedule a future reminder.

        Parameters:
            - message: Reminder text (required)
            - delay_minutes: Minutes until reminder
            - priority: 0.0-1.0 priority score
        """
        message = parameters.get("message")
        if not message:
            raise ValueError("Reminder message required")

        delay_minutes = parameters.get("delay_minutes", 60)
        priority = parameters.get("priority", 0.5)

        # For now, store in self-state; in future: integrate with a
        # scheduler/cron system that actually fires the reminder.
        from autonomy.self.state import get_self_state_instance

        state = get_self_state_instance()

        reminder = {
            "type": "reminder",
            "message": message,
            "scheduled_at": datetime.utcnow().isoformat(),
            "trigger_at_minutes": delay_minutes,
            "priority": priority
        }

        # Append to the "reminders" list (created on first use).
        state._state.setdefault("reminders", []).append(reminder)
        state._save_state(state._state)  # Pass state dict as argument

        logger.info(f"[ACTIONS] Reminder scheduled: {message} (in {delay_minutes}min)")

        return {
            "message": message,
            "delay_minutes": delay_minutes,
            "note": "Reminder stored in self-state (scheduler integration pending)"
        }

    async def _summarize_session(
        self,
        parameters: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Generate a summary of current session.

        Parameters:
            - max_length: Max summary length in words
            - focus_topics: Optional list of topics to emphasize
        """
        max_length = parameters.get("max_length", 200)
        session_id = context.get("session_id", "unknown")

        # Prefer the full summarizer when available; otherwise degrade to a
        # one-line statistical summary.
        try:
            from utils.deferred_summary import summarize_conversation

            summary = await summarize_conversation(
                session_id=session_id,
                max_words=max_length
            )

            return {
                "summary": summary,
                "word_count": len(summary.split())
            }

        except ImportError:
            # Fallback: simple summary built from context counters.
            message_count = context.get("message_count", 0)
            focus = context.get("monologue", {}).get("intent", "general")

            summary = f"Session {session_id}: {message_count} messages exchanged, focused on {focus}."

            return {
                "summary": summary,
                "word_count": len(summary.split()),
                "note": "Simple summary (full summarizer not available)"
            }

    async def _learn_topic(
        self,
        parameters: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Add topic to learning queue.

        Parameters:
            - topic: Topic name (required)
            - reason: Why this topic
            - priority: 0.0-1.0 priority score
        """
        topic = parameters.get("topic")
        if not topic:
            raise ValueError("Topic required")

        reason = parameters.get("reason", "autonomous learning")
        priority = parameters.get("priority", 0.5)

        from autonomy.self.state import get_self_state_instance

        state = get_self_state_instance()
        # add_learning_goal only accepts the topic; reason/priority are
        # recorded in the returned result and the log, not persisted.
        state.add_learning_goal(topic)

        logger.info(f"[ACTIONS] Added to learning queue: {topic} (reason: {reason})")

        return {
            "topic": topic,
            "reason": reason,
            "queue_position": len(state._state.get("learning_queue", []))
        }

    async def _update_focus(
        self,
        parameters: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Update current focus area.

        Parameters:
            - focus: New focus area (required)
            - reason: Why this focus
        """
        focus = parameters.get("focus")
        if not focus:
            raise ValueError("Focus required")

        reason = parameters.get("reason", "autonomous update")

        from autonomy.self.state import get_self_state_instance

        state = get_self_state_instance()
        old_focus = state._state.get("focus", "none")

        state._state["focus"] = focus
        state._state["focus_updated_at"] = datetime.utcnow().isoformat()
        state._state["focus_reason"] = reason
        state._save_state(state._state)  # Pass state dict as argument

        logger.info(f"[ACTIONS] Focus updated: {old_focus} -> {focus}")

        return {
            "old_focus": old_focus,
            "new_focus": focus,
            "reason": reason
        }

    # ========================================
    # Utility Methods
    # ========================================

    def get_allowed_actions(self) -> List[str]:
        """Get list of all allowed action types."""
        return list(self.allowed_actions.keys())

    def get_action_log(self, limit: int = 50) -> List[Dict[str, Any]]:
        """
        Get recent action log.

        Args:
            limit: Max number of entries to return

        Returns:
            List of action records (most recent last)
        """
        return self.action_log[-limit:]

    def clear_action_log(self) -> None:
        """Clear action log."""
        self.action_log = []
        logger.info("[ACTIONS] Action log cleared")

    def validate_action(self, action_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate an action without executing it.

        Args:
            action_type: Type of action
            parameters: Action parameters

        Returns:
            {
                "valid": bool,
                "action": action_type,
                "errors": [error messages] or []
            }
        """
        errors = []

        # Check whitelist.
        if action_type not in self.allowed_actions:
            errors.append(f"Action '{action_type}' not in whitelist")

        # Check required parameters (basic presence validation only;
        # deeper checks happen inside each action implementation).
        if action_type == "create_memory" and not parameters.get("text"):
            errors.append("Memory 'text' parameter required")

        if action_type == "update_goal" and not parameters.get("goal_id"):
            errors.append("Goal 'goal_id' parameter required")

        if action_type == "schedule_reminder" and not parameters.get("message"):
            errors.append("Reminder 'message' parameter required")

        if action_type == "learn_topic" and not parameters.get("topic"):
            errors.append("Learning 'topic' parameter required")

        if action_type == "update_focus" and not parameters.get("focus"):
            errors.append("Focus 'focus' parameter required")

        return {
            "valid": len(errors) == 0,
            "action": action_type,
            "errors": errors
        }


# Singleton instance
_action_manager_instance = None


def get_action_manager() -> AutonomousActionManager:
    """
    Get singleton action manager instance.

    Returns:
        AutonomousActionManager instance
    """
    global _action_manager_instance
    if _action_manager_instance is None:
        _action_manager_instance = AutonomousActionManager()
    return _action_manager_instance
"""
Pattern Learning System - learns from interaction patterns to improve autonomy.
"""

import logging
import json
import os
from typing import Dict, List, Any, Optional
from datetime import datetime
from collections import defaultdict

logger = logging.getLogger(__name__)


class PatternLearner:
    """
    Learns from interaction patterns to improve Lyra's autonomous behavior.

    Tracks:
    - Topic frequencies (what users talk about)
    - Time-of-day patterns (when users interact)
    - User preferences (how users like responses)
    - Successful response strategies (what works well)
    """

    def __init__(self, patterns_file: str = "/app/data/learned_patterns.json"):
        """
        Initialize pattern learner.

        Args:
            patterns_file: Path to persistent patterns storage
        """
        self.patterns_file = patterns_file
        self.patterns = self._load_patterns()

    def _load_patterns(self) -> Dict[str, Any]:
        """Load patterns from disk, falling back to an empty structure."""
        if os.path.exists(self.patterns_file):
            try:
                with open(self.patterns_file, 'r') as f:
                    patterns = json.load(f)
                logger.info(f"[PATTERN_LEARNER] Loaded patterns from {self.patterns_file}")
                return patterns
            except Exception as e:
                # Corrupt/unreadable file: log and start fresh rather than crash.
                logger.error(f"[PATTERN_LEARNER] Failed to load patterns: {e}")

        # Initialize empty patterns.
        return {
            "topic_frequencies": {},
            "time_patterns": {},
            "user_preferences": {},
            "successful_strategies": {},
            "interaction_count": 0,
            "last_updated": datetime.utcnow().isoformat()
        }

    def _save_patterns(self) -> None:
        """Save patterns to disk (best-effort; failures are logged, not raised)."""
        try:
            # Ensure directory exists.
            os.makedirs(os.path.dirname(self.patterns_file), exist_ok=True)

            self.patterns["last_updated"] = datetime.utcnow().isoformat()

            with open(self.patterns_file, 'w') as f:
                json.dump(self.patterns, f, indent=2)

            logger.debug(f"[PATTERN_LEARNER] Saved patterns to {self.patterns_file}")

        except Exception as e:
            logger.error(f"[PATTERN_LEARNER] Failed to save patterns: {e}")

    async def learn_from_interaction(
        self,
        user_prompt: str,
        response: str,
        monologue: Dict[str, Any],
        context: Dict[str, Any]
    ) -> None:
        """
        Learn from a single interaction.

        NOTE: declared async for interface consistency with the rest of the
        autonomy pipeline; the body currently performs no awaits.

        Args:
            user_prompt: User's message
            response: Lyra's response
            monologue: Inner monologue analysis
            context: Full context state
        """
        self.patterns["interaction_count"] += 1

        # Learn topic frequencies.
        self._learn_topics(user_prompt, monologue)

        # Learn time patterns.
        self._learn_time_patterns()

        # Learn user preferences.
        self._learn_preferences(monologue, context)

        # Learn successful strategies.
        self._learn_strategies(monologue, response, context)

        # Save periodically (every 10 interactions) to limit disk churn.
        if self.patterns["interaction_count"] % 10 == 0:
            self._save_patterns()

    def _learn_topics(self, user_prompt: str, monologue: Dict[str, Any]) -> None:
        """Track topic frequencies by intent and crude keyword extraction."""
        intent = monologue.get("intent", "unknown")

        # Increment topic counter.
        topic_freq = self.patterns["topic_frequencies"]
        topic_freq[intent] = topic_freq.get(intent, 0) + 1

        # Extract keywords (simple approach - words > 5 chars).
        keywords = [word.lower() for word in user_prompt.split() if len(word) > 5]

        for keyword in keywords:
            topic_freq[f"keyword:{keyword}"] = topic_freq.get(f"keyword:{keyword}", 0) + 1

        logger.debug(f"[PATTERN_LEARNER] Topic learned: {intent}")

    def _learn_time_patterns(self) -> None:
        """Track time-of-day and day-of-week interaction counts (UTC)."""
        now = datetime.utcnow()
        hour = now.hour

        # Track interactions by hour.
        time_patterns = self.patterns["time_patterns"]
        hour_key = f"hour_{hour:02d}"
        time_patterns[hour_key] = time_patterns.get(hour_key, 0) + 1

        # Track day of week.
        day_key = f"day_{now.strftime('%A').lower()}"
        time_patterns[day_key] = time_patterns.get(day_key, 0) + 1

    def _learn_preferences(self, monologue: Dict[str, Any], context: Dict[str, Any]) -> None:
        """Learn user preferences from detected tone and depth."""
        tone = monologue.get("tone", "neutral")
        depth = monologue.get("depth", "medium")

        prefs = self.patterns["user_preferences"]

        # Track preferred tone.
        prefs.setdefault("tone_counts", {})
        prefs["tone_counts"][tone] = prefs["tone_counts"].get(tone, 0) + 1

        # Track preferred depth.
        prefs.setdefault("depth_counts", {})
        prefs["depth_counts"][depth] = prefs["depth_counts"].get(depth, 0) + 1

    def _learn_strategies(
        self,
        monologue: Dict[str, Any],
        response: str,
        context: Dict[str, Any]
    ) -> None:
        """
        Learn which response strategies are successful.

        Success indicators:
        - Executive was consulted and plan generated
        - Response length matches depth request
        - Tone matches request
        """
        intent = monologue.get("intent", "unknown")
        executive_used = context.get("executive_plan") is not None

        # All strategy stats live under flat string keys; the previous
        # `strategies.setdefault(intent, {})` created empty nested dicts that
        # nothing read, so it has been removed.
        strategies = self.patterns["successful_strategies"]

        # Track executive usage for this intent.
        if executive_used:
            key = f"{intent}:executive_used"
            strategies.setdefault(key, 0)
            strategies[key] += 1

        # Track response length patterns.
        response_length = len(response.split())
        depth = monologue.get("depth", "medium")

        length_key = f"{depth}:avg_words"
        if length_key not in strategies:
            strategies[length_key] = response_length
        else:
            # Exponential moving average (alpha=0.5): recent responses weigh
            # more than old ones. TODO confirm this bias is intended; a true
            # running average would need a sample count.
            strategies[length_key] = (strategies[length_key] + response_length) / 2

    # ========================================
    # Pattern Analysis and Recommendations
    # ========================================

    def get_top_topics(self, limit: int = 10) -> List[tuple]:
        """
        Get most frequent topics.

        Args:
            limit: Max number of topics to return

        Returns:
            List of (topic, count) tuples, sorted by count descending
        """
        topics = self.patterns["topic_frequencies"]
        sorted_topics = sorted(topics.items(), key=lambda x: x[1], reverse=True)
        return sorted_topics[:limit]

    def get_preferred_tone(self) -> str:
        """
        Get user's most preferred tone.

        Returns:
            Preferred tone string ("neutral" when nothing learned yet)
        """
        prefs = self.patterns["user_preferences"]
        tone_counts = prefs.get("tone_counts", {})

        if not tone_counts:
            return "neutral"

        return max(tone_counts.items(), key=lambda x: x[1])[0]

    def get_preferred_depth(self) -> str:
        """
        Get user's most preferred response depth.

        Returns:
            Preferred depth string ("medium" when nothing learned yet)
        """
        prefs = self.patterns["user_preferences"]
        depth_counts = prefs.get("depth_counts", {})

        if not depth_counts:
            return "medium"

        return max(depth_counts.items(), key=lambda x: x[1])[0]

    def get_peak_hours(self, limit: int = 3) -> List[int]:
        """
        Get peak interaction hours.

        Args:
            limit: Number of top hours to return

        Returns:
            List of hours (0-23), highest-traffic first
        """
        time_patterns = self.patterns["time_patterns"]
        hour_counts = {k: v for k, v in time_patterns.items() if k.startswith("hour_")}

        if not hour_counts:
            return []

        sorted_hours = sorted(hour_counts.items(), key=lambda x: x[1], reverse=True)
        top_hours = sorted_hours[:limit]

        # Extract hour numbers from "hour_NN" keys.
        return [int(h[0].split("_")[1]) for h in top_hours]

    def should_use_executive(self, intent: str) -> bool:
        """
        Recommend whether to use executive for given intent based on patterns.

        Args:
            intent: Intent type

        Returns:
            True if executive is recommended (used >= 3 times for this intent)
        """
        strategies = self.patterns["successful_strategies"]
        key = f"{intent}:executive_used"

        return strategies.get(key, 0) >= 3

    def get_recommended_response_length(self, depth: str) -> int:
        """
        Get recommended response length in words for given depth.

        Args:
            depth: Depth level (short/medium/deep)

        Returns:
            Recommended word count
        """
        strategies = self.patterns["successful_strategies"]
        key = f"{depth}:avg_words"

        avg_length = strategies.get(key, None)

        # Fix: explicit None check — a learned average of 0 words is still
        # a learned value, not "nothing learned".
        if avg_length is not None:
            return int(avg_length)

        # Defaults if no pattern learned.
        defaults = {
            "short": 50,
            "medium": 150,
            "deep": 300
        }

        return defaults.get(depth, 150)

    def get_insights(self) -> Dict[str, Any]:
        """
        Get high-level insights from learned patterns.

        Returns:
            {
                "total_interactions": int,
                "top_topics": [(topic, count), ...],
                "preferred_tone": str,
                "preferred_depth": str,
                "peak_hours": [hours],
                "learning_recommendations": [str]
            }
        """
        recommendations = []

        # Check if user consistently prefers certain settings.
        preferred_tone = self.get_preferred_tone()
        preferred_depth = self.get_preferred_depth()

        if preferred_tone != "neutral":
            recommendations.append(f"User prefers {preferred_tone} tone")

        if preferred_depth != "medium":
            recommendations.append(f"User prefers {preferred_depth} depth responses")

        # Check for recurring topics.
        top_topics = self.get_top_topics(limit=3)
        if top_topics:
            top_topic = top_topics[0][0]
            recommendations.append(f"Consider adding '{top_topic}' to learning queue")

        return {
            "total_interactions": self.patterns["interaction_count"],
            "top_topics": self.get_top_topics(limit=5),
            "preferred_tone": preferred_tone,
            "preferred_depth": preferred_depth,
            "peak_hours": self.get_peak_hours(limit=3),
            "learning_recommendations": recommendations
        }

    def reset_patterns(self) -> None:
        """Reset all learned patterns (use with caution) and persist the reset."""
        self.patterns = {
            "topic_frequencies": {},
            "time_patterns": {},
            "user_preferences": {},
            "successful_strategies": {},
            "interaction_count": 0,
            "last_updated": datetime.utcnow().isoformat()
        }
        self._save_patterns()
        logger.warning("[PATTERN_LEARNER] Patterns reset")

    def export_patterns(self) -> Dict[str, Any]:
        """
        Export all patterns for analysis.

        Returns:
            Shallow copy of the complete patterns dict (nested dicts are
            shared with the live state — treat as read-only).
        """
        return self.patterns.copy()


# Singleton instance
_learner_instance = None


def get_pattern_learner(patterns_file: str = "/app/data/learned_patterns.json") -> PatternLearner:
    """
    Get singleton pattern learner instance.

    Args:
        patterns_file: Path to patterns file (only used on first call)

    Returns:
        PatternLearner instance
    """
    global _learner_instance
    if _learner_instance is None:
        _learner_instance = PatternLearner(patterns_file=patterns_file)
    return _learner_instance
"""
Proactive Context Monitor - detects opportunities for autonomous suggestions.
"""

import logging
import time
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


class ProactiveMonitor:
    """
    Monitors conversation context and detects opportunities for proactive suggestions.

    Triggers:
    - Long silence → Check-in
    - Learning queue + high curiosity → Suggest exploration
    - Active goals → Progress reminders
    - Conversation milestones → Offer summary
    - Pattern detection → Helpful suggestions
    """

    def __init__(self, min_priority: float = 0.6):
        """
        Initialize proactive monitor.

        Args:
            min_priority: Minimum priority for suggestions (0.0-1.0)
        """
        self.min_priority = min_priority
        self.last_suggestion_time = {}  # session_id -> unix timestamp of last suggestion
        self.cooldown_seconds = 300  # 5 minutes between proactive suggestions

    async def analyze_session(
        self,
        session_id: str,
        context_state: Dict[str, Any],
        self_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Analyze session for proactive suggestion opportunities.

        Args:
            session_id: Current session ID
            context_state: Full context including message history
            self_state: Lyra's current self-state

        Returns:
            {
                "suggestion": "text to append to response",
                "priority": 0.0-1.0,
                "reason": "why this suggestion",
                "type": "check_in | learning | goal_reminder | summary | pattern"
            }
            or None if no suggestion
        """
        # Check cooldown first so a chatty session can't be spammed.
        if not self._check_cooldown(session_id):
            logger.debug(f"[PROACTIVE] Session {session_id} in cooldown, skipping")
            return None

        suggestions = []

        # Check 1: Long silence detection.
        silence_suggestion = self._check_long_silence(context_state)
        if silence_suggestion:
            suggestions.append(silence_suggestion)

        # Check 2: Learning queue + high curiosity.
        learning_suggestion = self._check_learning_opportunity(self_state)
        if learning_suggestion:
            suggestions.append(learning_suggestion)

        # Check 3: Active goals reminder.
        goal_suggestion = self._check_active_goals(self_state, context_state)
        if goal_suggestion:
            suggestions.append(goal_suggestion)

        # Check 4: Conversation milestones.
        milestone_suggestion = self._check_conversation_milestone(context_state)
        if milestone_suggestion:
            suggestions.append(milestone_suggestion)

        # Check 5: Pattern-based suggestions.
        pattern_suggestion = self._check_patterns(context_state, self_state)
        if pattern_suggestion:
            suggestions.append(pattern_suggestion)

        # Filter by priority threshold.
        valid_suggestions = [s for s in suggestions if s["priority"] >= self.min_priority]

        if not valid_suggestions:
            return None

        # Return highest priority suggestion.
        best_suggestion = max(valid_suggestions, key=lambda x: x["priority"])

        # Start the cooldown only when a suggestion is actually emitted.
        self._update_cooldown(session_id)

        logger.info(f"[PROACTIVE] Suggestion generated: {best_suggestion['type']} (priority: {best_suggestion['priority']:.2f})")

        return best_suggestion

    def _check_cooldown(self, session_id: str) -> bool:
        """Return True when the session is past its cooldown period."""
        if session_id not in self.last_suggestion_time:
            return True

        elapsed = time.time() - self.last_suggestion_time[session_id]
        return elapsed >= self.cooldown_seconds

    def _update_cooldown(self, session_id: str) -> None:
        """Record the current time as this session's last suggestion time."""
        self.last_suggestion_time[session_id] = time.time()

    def _check_long_silence(self, context_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check if user has been silent for a long time (> 30 minutes).
        """
        minutes_since_last = context_state.get("minutes_since_last_msg", 0)

        # If > 30 minutes, suggest check-in.
        if minutes_since_last > 30:
            return {
                "suggestion": "\n\n[Aside: I'm still here if you need anything!]",
                "priority": 0.7,
                "reason": f"User silent for {minutes_since_last:.0f} minutes",
                "type": "check_in"
            }

        return None

    def _check_learning_opportunity(self, self_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check if Lyra has learning queue items and high curiosity.
        """
        learning_queue = self_state.get("learning_queue", [])
        curiosity = self_state.get("curiosity", 0.5)

        # If curiosity > 0.7 and learning queue exists.
        if curiosity > 0.7 and learning_queue:
            # The branch is only reached when learning_queue is non-empty, so
            # the first entry is always available (previous ternary was dead code).
            topic = learning_queue[0]
            return {
                "suggestion": f"\n\n[Aside: I've been curious about {topic} lately. Would you like to explore it together?]",
                "priority": 0.65,
                "reason": f"High curiosity ({curiosity:.2f}) and learning queue present",
                "type": "learning"
            }

        return None

    def _check_active_goals(
        self,
        self_state: Dict[str, Any],
        context_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Check if there are active goals worth reminding about.
        """
        active_goals = self_state.get("active_goals", [])

        if not active_goals:
            return None

        # Check if we've had multiple messages without goal progress.
        message_count = context_state.get("message_count", 0)

        # Every 10 messages, consider goal reminder.
        if message_count % 10 == 0 and message_count > 0:
            goal = active_goals[0]  # First active goal; may be a str or a dict.
            goal_name = goal if isinstance(goal, str) else goal.get("name", "your goal")

            return {
                "suggestion": f"\n\n[Aside: Still thinking about {goal_name}. Let me know if you want to work on it.]",
                "priority": 0.6,
                "reason": f"Active goal present, {message_count} messages since start",
                "type": "goal_reminder"
            }

        return None

    def _check_conversation_milestone(self, context_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Check for conversation milestones (every 50 messages).
        """
        message_count = context_state.get("message_count", 0)

        # Every 50 messages, offer summary.
        if message_count > 0 and message_count % 50 == 0:
            return {
                "suggestion": f"\n\n[Aside: We've exchanged {message_count} messages! Would you like a summary of our conversation?]",
                "priority": 0.65,
                "reason": f"Milestone: {message_count} messages",
                "type": "summary"
            }

        return None

    def _check_patterns(
        self,
        context_state: Dict[str, Any],
        self_state: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Check for behavioral patterns that merit suggestions.
        """
        # Get current focus.
        focus = self_state.get("focus", "")

        # Check if user keeps asking similar questions (detected via focus).
        if focus and "repeated" in focus.lower():
            return {
                "suggestion": "\n\n[Aside: I notice we keep coming back to this topic. Would it help to create a summary or action plan?]",
                "priority": 0.7,
                "reason": "Repeated topic detected",
                "type": "pattern"
            }

        # Check energy levels - if Lyra is low energy, maybe suggest break.
        energy = self_state.get("energy", 0.8)
        if energy < 0.3:
            return {
                "suggestion": "\n\n[Aside: We've been at this for a while. Need a break or want to keep going?]",
                "priority": 0.65,
                "reason": f"Low energy ({energy:.2f})",
                "type": "pattern"
            }

        return None

    def format_suggestion(self, suggestion: Dict[str, Any]) -> str:
        """
        Format suggestion for appending to response.

        Args:
            suggestion: Suggestion dict from analyze_session()

        Returns:
            Formatted string to append to response
        """
        return suggestion.get("suggestion", "")

    def set_cooldown_duration(self, seconds: int) -> None:
        """
        Update cooldown duration.

        Args:
            seconds: New cooldown duration
        """
        self.cooldown_seconds = seconds
        logger.info(f"[PROACTIVE] Cooldown updated to {seconds}s")

    def reset_cooldown(self, session_id: str) -> None:
        """
        Reset cooldown for a specific session.

        Args:
            session_id: Session to reset
        """
        if session_id in self.last_suggestion_time:
            del self.last_suggestion_time[session_id]
            logger.info(f"[PROACTIVE] Cooldown reset for session {session_id}")

    def get_session_stats(self, session_id: str) -> Dict[str, Any]:
        """
        Get stats for a session's proactive monitoring.

        Args:
            session_id: Session to check

        Returns:
            {
                "last_suggestion_time": timestamp or None,
                "seconds_since_last": int,
                "cooldown_active": bool,
                "cooldown_remaining": int
            }
        """
        last_time = self.last_suggestion_time.get(session_id)

        # Fix: explicit None check instead of truthiness (a unix timestamp is
        # never falsy in practice, but the intent is "no entry recorded").
        if last_time is None:
            return {
                "last_suggestion_time": None,
                "seconds_since_last": 0,
                "cooldown_active": False,
                "cooldown_remaining": 0
            }

        seconds_since = int(time.time() - last_time)
        cooldown_active = seconds_since < self.cooldown_seconds
        cooldown_remaining = max(0, self.cooldown_seconds - seconds_since)

        return {
            "last_suggestion_time": last_time,
            "seconds_since_last": seconds_since,
            "cooldown_active": cooldown_active,
            "cooldown_remaining": cooldown_remaining
        }


# Singleton instance
_monitor_instance = None


def get_proactive_monitor(min_priority: float = 0.6) -> ProactiveMonitor:
    """
    Get singleton proactive monitor instance.

    Args:
        min_priority: Minimum priority threshold (only used on first call)

    Returns:
        ProactiveMonitor instance
    """
    global _monitor_instance
    if _monitor_instance is None:
        _monitor_instance = ProactiveMonitor(min_priority=min_priority)
    return _monitor_instance
+""" + +import logging +from typing import Dict, List, Any + +logger = logging.getLogger(__name__) + + +class ToolDecisionEngine: + """Decides which tools to invoke based on context analysis.""" + + async def analyze_tool_needs( + self, + user_prompt: str, + monologue: Dict[str, Any], + context_state: Dict[str, Any], + available_tools: List[str] + ) -> Dict[str, Any]: + """ + Analyze if tools should be invoked and which ones. + + Args: + user_prompt: User's message + monologue: Inner monologue analysis + context_state: Full context + available_tools: List of available tools + + Returns: + { + "should_invoke_tools": bool, + "tools_to_invoke": [ + { + "tool": "RAG | WEB | WEATHER | etc", + "query": "search query", + "reason": "why this tool", + "priority": 0.0-1.0 + }, + ... + ], + "confidence": 0.0-1.0 + } + """ + + tools_to_invoke = [] + + # Check for memory/context needs + if any(word in user_prompt.lower() for word in [ + "remember", "you said", "we discussed", "earlier", "before", + "last time", "previously", "what did" + ]): + tools_to_invoke.append({ + "tool": "RAG", + "query": user_prompt, + "reason": "User references past conversation", + "priority": 0.9 + }) + + # Check for web search needs + if any(word in user_prompt.lower() for word in [ + "current", "latest", "news", "today", "what's happening", + "look up", "search for", "find information", "recent" + ]): + tools_to_invoke.append({ + "tool": "WEB", + "query": user_prompt, + "reason": "Requires current information", + "priority": 0.8 + }) + + # Check for weather needs + if any(word in user_prompt.lower() for word in [ + "weather", "temperature", "forecast", "rain", "sunny", "climate" + ]): + tools_to_invoke.append({ + "tool": "WEATHER", + "query": user_prompt, + "reason": "Weather information requested", + "priority": 0.95 + }) + + # Check for code-related needs + if any(word in user_prompt.lower() for word in [ + "code", "function", "debug", "implement", "algorithm", + "programming", "script", "syntax" 
+ ]): + if "CODEBRAIN" in available_tools: + tools_to_invoke.append({ + "tool": "CODEBRAIN", + "query": user_prompt, + "reason": "Code-related task", + "priority": 0.85 + }) + + # Proactive RAG for complex queries (based on monologue) + intent = monologue.get("intent", "") if monologue else "" + if monologue and monologue.get("consult_executive"): + # Complex query - might benefit from context + if not any(t["tool"] == "RAG" for t in tools_to_invoke): + tools_to_invoke.append({ + "tool": "RAG", + "query": user_prompt, + "reason": "Complex query benefits from context", + "priority": 0.6 + }) + + # Sort by priority + tools_to_invoke.sort(key=lambda x: x["priority"], reverse=True) + + max_priority = max([t["priority"] for t in tools_to_invoke]) if tools_to_invoke else 0.0 + + result = { + "should_invoke_tools": len(tools_to_invoke) > 0, + "tools_to_invoke": tools_to_invoke, + "confidence": max_priority + } + + if tools_to_invoke: + logger.info(f"[TOOL_DECISION] Autonomous tool invocation recommended: {len(tools_to_invoke)} tools") + for tool in tools_to_invoke: + logger.info(f" - {tool['tool']} (priority: {tool['priority']:.2f}): {tool['reason']}") + + return result diff --git a/cortex/autonomy/tools/orchestrator.py b/cortex/autonomy/tools/orchestrator.py new file mode 100644 index 0000000..9658721 --- /dev/null +++ b/cortex/autonomy/tools/orchestrator.py @@ -0,0 +1,354 @@ +""" +Tool Orchestrator - executes autonomous tool invocations asynchronously. +""" + +import asyncio +import logging +from typing import Dict, List, Any, Optional +import os + +logger = logging.getLogger(__name__) + + +class ToolOrchestrator: + """Orchestrates async tool execution and result aggregation.""" + + def __init__(self, tool_timeout: int = 30): + """ + Initialize orchestrator. 
+
+        Args:
+            tool_timeout: Max seconds per tool call (default 30)
+        """
+        self.tool_timeout = tool_timeout
+        self.available_tools = self._discover_tools()
+
+    def _discover_tools(self) -> Dict[str, Any]:
+        """Probe for optional tool backends; an ImportError just disables that tool."""
+        tools = {}
+
+        # Import tool modules as they become available
+        try:
+            from memory.neomem_client import search_neomem
+            tools["RAG"] = search_neomem
+            logger.debug("[ORCHESTRATOR] RAG tool available")
+        except ImportError:
+            logger.debug("[ORCHESTRATOR] RAG tool not available")
+
+        try:
+            from integrations.web_search import web_search
+            tools["WEB"] = web_search
+            logger.debug("[ORCHESTRATOR] WEB tool available")
+        except ImportError:
+            logger.debug("[ORCHESTRATOR] WEB tool not available")
+
+        try:
+            from integrations.weather import get_weather
+            tools["WEATHER"] = get_weather
+            logger.debug("[ORCHESTRATOR] WEATHER tool available")
+        except ImportError:
+            logger.debug("[ORCHESTRATOR] WEATHER tool not available")
+
+        try:
+            from integrations.codebrain import query_codebrain
+            tools["CODEBRAIN"] = query_codebrain
+            logger.debug("[ORCHESTRATOR] CODEBRAIN tool available")
+        except ImportError:
+            logger.debug("[ORCHESTRATOR] CODEBRAIN tool not available")
+
+        return tools
+
+    async def execute_tools(
+        self,
+        tools_to_invoke: List[Dict[str, Any]],
+        context_state: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """
+        Execute multiple tools asynchronously.
+
+        Args:
+            tools_to_invoke: List of tool specs from decision engine
+                [{"tool": "RAG", "query": "...", "reason": "...", "priority": 0.9}, ...]
+            context_state: Full context for tool execution
+
+        Returns:
+            {
+                "results": {
+                    "RAG": {...},
+                    "WEB": {...},
+                    ...
+                },
+                "execution_summary": {
+                    "tools_invoked": ["RAG", "WEB"],
+                    "successful": ["RAG"],
+                    "failed": ["WEB"],
+                    "total_time_ms": 1234
+                }
+            }
+        """
+        import time  # function-scope import, used only for the timing below
+        start_time = time.time()
+
+        logger.info(f"[ORCHESTRATOR] Executing {len(tools_to_invoke)} tools asynchronously")
+
+        # Create tasks for each tool
+        tasks = []
+        tool_names = []
+
+        for tool_spec in tools_to_invoke:
+            tool_name = tool_spec["tool"]
+            query = tool_spec["query"]
+
+            if tool_name in self.available_tools:
+                task = self._execute_single_tool(tool_name, query, context_state)
+                tasks.append(task)
+                tool_names.append(tool_name)
+                logger.debug(f"[ORCHESTRATOR] Queued {tool_name}: {query[:50]}...")
+            else:
+                logger.warning(f"[ORCHESTRATOR] Tool {tool_name} not available, skipping")
+
+        # Execute all tools concurrently with timeout
+        results = {}
+        successful = []
+        failed = []
+
+        if tasks:
+            try:
+                # Wait for all tasks with global timeout
+                completed = await asyncio.wait_for(
+                    asyncio.gather(*tasks, return_exceptions=True),
+                    timeout=self.tool_timeout
+                )
+
+                # Process results
+                for tool_name, result in zip(tool_names, completed):  # gather() preserves submission order
+                    if isinstance(result, Exception):
+                        logger.error(f"[ORCHESTRATOR] {tool_name} failed: {result}")
+                        results[tool_name] = {"error": str(result), "success": False}
+                        failed.append(tool_name)
+                    else:
+                        logger.info(f"[ORCHESTRATOR] {tool_name} completed successfully")
+                        results[tool_name] = result
+                        successful.append(tool_name)
+
+            except asyncio.TimeoutError:
+                logger.error(f"[ORCHESTRATOR] Global timeout ({self.tool_timeout}s) exceeded")
+                for tool_name in tool_names:
+                    if tool_name not in results:
+                        results[tool_name] = {"error": "timeout", "success": False}
+                        failed.append(tool_name)
+
+        end_time = time.time()
+        total_time_ms = int((end_time - start_time) * 1000)
+
+        execution_summary = {
+            "tools_invoked": tool_names,
+            "successful": successful,
+            "failed": failed,
+            "total_time_ms": total_time_ms
+        }
+
+        logger.info(f"[ORCHESTRATOR] Execution complete: {len(successful)}/{len(tool_names)} successful in {total_time_ms}ms")
+
+        return {
+            "results": results,
+            "execution_summary": execution_summary
+        }
+
+    async def _execute_single_tool(
+        self,
+        tool_name: str,
+        query: str,
+        context_state: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """
+        Execute a single tool with error handling.
+
+        Args:
+            tool_name: Name of tool (RAG, WEB, etc.)
+            query: Query string for the tool
+            context_state: Context for tool execution
+
+        Returns:
+            Tool-specific result dict
+        """
+        tool_func = self.available_tools.get(tool_name)
+        if not tool_func:
+            raise ValueError(f"Tool {tool_name} not available")
+
+        try:
+            logger.debug(f"[ORCHESTRATOR] Invoking {tool_name}...")
+
+            # Different tools have different signatures - adapt as needed
+            if tool_name == "RAG":
+                result = await self._invoke_rag(tool_func, query, context_state)
+            elif tool_name == "WEB":
+                result = await self._invoke_web(tool_func, query)
+            elif tool_name == "WEATHER":
+                result = await self._invoke_weather(tool_func, query)
+            elif tool_name == "CODEBRAIN":
+                result = await self._invoke_codebrain(tool_func, query, context_state)
+            else:
+                # Generic invocation
+                result = await tool_func(query)
+
+            return {
+                "success": True,
+                "tool": tool_name,
+                "query": query,
+                "data": result
+            }
+
+        except Exception as e:
+            logger.error(f"[ORCHESTRATOR] {tool_name} execution failed: {e}")
+            raise  # re-raised so gather(return_exceptions=True) surfaces it as an Exception result
+
+    async def _invoke_rag(self, func, query: str, context: Dict[str, Any]) -> Any:
+        """Invoke RAG tool (NeoMem search)."""
+        session_id = context.get("session_id", "unknown")
+        # RAG searches memory for relevant past interactions
+        try:
+            results = await func(query, limit=5, session_id=session_id)
+            return results
+        except Exception as e:
+            logger.warning(f"[ORCHESTRATOR] RAG invocation failed, returning empty: {e}")
+            return []  # best-effort: missing memories are non-fatal
+
+    async def _invoke_web(self, func, query: str) -> Any:
+        """Invoke web search tool."""
+        try:
+            results = await func(query, max_results=5)
+            return results
+        except 
Exception as e:
+            logger.warning(f"[ORCHESTRATOR] WEB invocation failed: {e}")
+            return {"error": str(e), "results": []}
+
+    async def _invoke_weather(self, func, query: str) -> Any:
+        """Invoke weather tool."""
+        # Extract location from query (simple heuristic)
+        # In future: use LLM to extract location
+        try:
+            location = self._extract_location(query)
+            results = await func(location)
+            return results
+        except Exception as e:
+            logger.warning(f"[ORCHESTRATOR] WEATHER invocation failed: {e}")
+            return {"error": str(e)}
+
+    async def _invoke_codebrain(self, func, query: str, context: Dict[str, Any]) -> Any:
+        """Invoke codebrain tool."""
+        try:
+            results = await func(query, context=context)
+            return results
+        except Exception as e:
+            logger.warning(f"[ORCHESTRATOR] CODEBRAIN invocation failed: {e}")
+            return {"error": str(e)}
+
+    def _extract_location(self, query: str) -> str:
+        """
+        Extract location from weather query.
+        Simple heuristic - in future use LLM.
+        """
+        # Common location indicators (note: "in " matches before the longer "weather in "/"temperature in ", which therefore never fire)
+        indicators = ["in ", "at ", "for ", "weather in ", "temperature in "]
+
+        query_lower = query.lower()
+        for indicator in indicators:
+            if indicator in query_lower:
+                # Get text after indicator
+                parts = query_lower.split(indicator, 1)
+                if len(parts) > 1:
+                    location = parts[1].strip().split()[0]  # First word after indicator
+                    return location
+
+        # Default fallback
+        return "current location"
+
+    def format_results_for_context(self, orchestrator_result: Dict[str, Any]) -> str:
+        """
+        Format tool results for inclusion in context/prompt. 
+
+        Args:
+            orchestrator_result: Output from execute_tools()
+
+        Returns:
+            Formatted string for prompt injection
+        """
+        results = orchestrator_result.get("results", {})
+        summary = orchestrator_result.get("execution_summary", {})
+
+        if not results:
+            return ""
+
+        formatted = "\n=== AUTONOMOUS TOOL RESULTS ===\n"
+
+        for tool_name, tool_result in results.items():
+            if tool_result.get("success", False):
+                formatted += f"\n[{tool_name}]\n"
+                data = tool_result.get("data", {})
+
+                # Format based on tool type
+                if tool_name == "RAG":
+                    formatted += self._format_rag_results(data)
+                elif tool_name == "WEB":
+                    formatted += self._format_web_results(data)
+                elif tool_name == "WEATHER":
+                    formatted += self._format_weather_results(data)
+                elif tool_name == "CODEBRAIN":
+                    formatted += self._format_codebrain_results(data)
+                else:
+                    formatted += f"{data}\n"
+            else:
+                formatted += f"\n[{tool_name}] - Failed: {tool_result.get('error', 'unknown')}\n"
+
+        formatted += f"\n(Tools executed in {summary.get('total_time_ms', 0)}ms)\n"
+        formatted += "=" * 40 + "\n"
+
+        return formatted
+
+    def _format_rag_results(self, data: Any) -> str:
+        """Format RAG/memory search results; items may be dicts or bare strings."""
+        if not data:
+            return "No relevant memories found.\n"
+
+        formatted = "Relevant memories:\n"
+        for i, item in enumerate(data[:3], 1):  # Top 3
+            text = item.get("text", item.get("content", str(item))) if isinstance(item, dict) else str(item)
+            formatted += f"  {i}. {text[:100]}...\n"
+        return formatted
+
+    def _format_web_results(self, data: Any) -> str:
+        """Format web search results; items may be dicts or bare strings."""
+        if isinstance(data, dict) and data.get("error"):
+            return f"Web search failed: {data['error']}\n"
+
+        results = data.get("results", []) if isinstance(data, dict) else data
+        if not results:
+            return "No web results found.\n"
+
+        formatted = "Web search results:\n"
+        for i, item in enumerate(results[:3], 1):  # Top 3
+            title = item.get("title", "No title") if isinstance(item, dict) else str(item)
+            snippet = item.get("snippet", item.get("description", "")) if isinstance(item, dict) else ""
+            formatted += f"  {i}. 
{title}\n {snippet[:100]}...\n" + return formatted + + def _format_weather_results(self, data: Any) -> str: + """Format weather results.""" + if isinstance(data, dict) and data.get("error"): + return f"Weather lookup failed: {data['error']}\n" + + # Assuming weather API returns temp, conditions, etc. + temp = data.get("temperature", "unknown") + conditions = data.get("conditions", "unknown") + location = data.get("location", "requested location") + + return f"Weather for {location}: {temp}, {conditions}\n" + + def _format_codebrain_results(self, data: Any) -> str: + """Format codebrain results.""" + if isinstance(data, dict) and data.get("error"): + return f"Codebrain failed: {data['error']}\n" + + # Format code-related results + return f"{data}\n" diff --git a/cortex/data/self_state.json b/cortex/data/self_state.json index 1f6871d..b9fc83f 100644 --- a/cortex/data/self_state.json +++ b/cortex/data/self_state.json @@ -3,9 +3,9 @@ "energy": 0.8, "focus": "user_request", "confidence": 0.7, - "curiosity": 0.6000000000000001, - "last_updated": "2025-12-14T06:36:21.236816", - "interaction_count": 3, + "curiosity": 0.7000000000000002, + "last_updated": "2025-12-14T19:29:49.051207", + "interaction_count": 5, "learning_queue": [], "active_goals": [], "preferences": { diff --git a/cortex/router.py b/cortex/router.py index 48bb790..75d514d 100644 --- a/cortex/router.py +++ b/cortex/router.py @@ -140,6 +140,55 @@ async def run_reason(req: ReasonRequest): logger.warning(f"[EXECUTIVE] Planning failed: {e}") executive_plan = None + # ---------------------------------------------------------------- + # STAGE 0.8 — Autonomous Tool Invocation + # ---------------------------------------------------------------- + tool_results = None + autonomous_enabled = os.getenv("ENABLE_AUTONOMOUS_TOOLS", "true").lower() == "true" + tool_confidence_threshold = float(os.getenv("AUTONOMOUS_TOOL_CONFIDENCE_THRESHOLD", "0.6")) + + if autonomous_enabled and inner_result: + if VERBOSE_DEBUG: + 
logger.debug("[STAGE 0.8] Analyzing autonomous tool needs...") + + try: + from autonomy.tools.decision_engine import ToolDecisionEngine + from autonomy.tools.orchestrator import ToolOrchestrator + + # Analyze which tools to invoke + decision_engine = ToolDecisionEngine() + tool_decision = await decision_engine.analyze_tool_needs( + user_prompt=req.user_prompt, + monologue=inner_result, + context_state=context_state, + available_tools=["RAG", "WEB", "WEATHER", "CODEBRAIN"] + ) + + # Execute tools if confidence threshold met + if tool_decision["should_invoke_tools"] and tool_decision["confidence"] >= tool_confidence_threshold: + orchestrator = ToolOrchestrator(tool_timeout=30) + tool_results = await orchestrator.execute_tools( + tools_to_invoke=tool_decision["tools_to_invoke"], + context_state=context_state + ) + + # Format results for context injection + tool_context = orchestrator.format_results_for_context(tool_results) + context_state["autonomous_tool_results"] = tool_context + + if VERBOSE_DEBUG: + summary = tool_results.get("execution_summary", {}) + logger.debug(f"[STAGE 0.8] Tools executed: {summary.get('successful', [])} succeeded") + else: + if VERBOSE_DEBUG: + logger.debug(f"[STAGE 0.8] No tools invoked (confidence: {tool_decision.get('confidence', 0):.2f})") + + except Exception as e: + logger.warning(f"[STAGE 0.8] Autonomous tool invocation failed: {e}") + if VERBOSE_DEBUG: + import traceback + traceback.print_exc() + # ---------------------------------------------------------------- # STAGE 1 — Intake summary # ---------------------------------------------------------------- @@ -217,7 +266,7 @@ async def run_reason(req: ReasonRequest): update_last_assistant_message(req.session_id, persona_answer) # ---------------------------------------------------------------- - # STAGE 6.5 — Self-state update + # STAGE 6.5 — Self-state update & Pattern Learning # ---------------------------------------------------------------- try: from autonomy.self.analyzer import 
analyze_and_update_state @@ -230,6 +279,50 @@ async def run_reason(req: ReasonRequest): except Exception as e: logger.warning(f"[SELF_STATE] Update failed: {e}") + # Pattern learning + try: + from autonomy.learning.pattern_learner import get_pattern_learner + learner = get_pattern_learner() + await learner.learn_from_interaction( + user_prompt=req.user_prompt, + response=persona_answer, + monologue=inner_result or {}, + context=context_state + ) + except Exception as e: + logger.warning(f"[PATTERN_LEARNER] Learning failed: {e}") + + # ---------------------------------------------------------------- + # STAGE 7 — Proactive Monitoring & Suggestions + # ---------------------------------------------------------------- + proactive_enabled = os.getenv("ENABLE_PROACTIVE_MONITORING", "true").lower() == "true" + proactive_min_priority = float(os.getenv("PROACTIVE_SUGGESTION_MIN_PRIORITY", "0.6")) + + if proactive_enabled: + try: + from autonomy.proactive.monitor import get_proactive_monitor + from autonomy.self.state import load_self_state + + monitor = get_proactive_monitor(min_priority=proactive_min_priority) + self_state = load_self_state() + + suggestion = await monitor.analyze_session( + session_id=req.session_id, + context_state=context_state, + self_state=self_state + ) + + # Append suggestion to response if exists + if suggestion: + suggestion_text = monitor.format_suggestion(suggestion) + persona_answer += suggestion_text + + if VERBOSE_DEBUG: + logger.debug(f"[STAGE 7] Proactive suggestion added: {suggestion['type']} (priority: {suggestion['priority']:.2f})") + + except Exception as e: + logger.warning(f"[STAGE 7] Proactive monitoring failed: {e}") + if VERBOSE_DEBUG: logger.debug(f"\n{'='*80}") logger.debug(f"[PIPELINE COMPLETE] Session: {req.session_id}") diff --git a/cortex/tests/test_autonomy_phase2.py b/cortex/tests/test_autonomy_phase2.py new file mode 100644 index 0000000..aa5956a --- /dev/null +++ b/cortex/tests/test_autonomy_phase2.py @@ -0,0 +1,495 @@ 
+""" +Integration tests for Phase 2 autonomy features. +Tests autonomous tool invocation, proactive monitoring, actions, and pattern learning. +""" + +import asyncio +import json +import sys +import os + +# Add parent directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +# Override self-state file path for testing +os.environ["SELF_STATE_FILE"] = "/tmp/test_self_state.json" + +from autonomy.tools.decision_engine import ToolDecisionEngine +from autonomy.tools.orchestrator import ToolOrchestrator +from autonomy.proactive.monitor import ProactiveMonitor +from autonomy.actions.autonomous_actions import AutonomousActionManager +from autonomy.learning.pattern_learner import PatternLearner +from autonomy.self.state import load_self_state, get_self_state_instance + + +async def test_tool_decision_engine(): + """Test autonomous tool decision making.""" + print("\n" + "="*60) + print("TEST 1: Tool Decision Engine") + print("="*60) + + engine = ToolDecisionEngine() + + # Test 1a: Memory reference detection + result = await engine.analyze_tool_needs( + user_prompt="What did we discuss earlier about Python?", + monologue={"intent": "clarification", "consult_executive": False}, + context_state={}, + available_tools=["RAG", "WEB", "WEATHER"] + ) + + assert result["should_invoke_tools"], "Should invoke tools for memory reference" + assert any(t["tool"] == "RAG" for t in result["tools_to_invoke"]), "Should recommend RAG" + assert result["confidence"] > 0.8, f"Confidence should be high for clear memory reference: {result['confidence']}" + + print(f" ✓ Memory reference detection passed") + print(f" Tools: {[t['tool'] for t in result['tools_to_invoke']]}") + print(f" Confidence: {result['confidence']:.2f}") + + # Test 1b: Web search detection + result = await engine.analyze_tool_needs( + user_prompt="What's the latest news about AI developments?", + monologue={"intent": "information_seeking", "consult_executive": False}, + 
context_state={}, + available_tools=["RAG", "WEB", "WEATHER"] + ) + + assert result["should_invoke_tools"], "Should invoke tools for current info request" + assert any(t["tool"] == "WEB" for t in result["tools_to_invoke"]), "Should recommend WEB" + + print(f" ✓ Web search detection passed") + print(f" Tools: {[t['tool'] for t in result['tools_to_invoke']]}") + + # Test 1c: Weather detection + result = await engine.analyze_tool_needs( + user_prompt="What's the weather like today in Boston?", + monologue={"intent": "information_seeking", "consult_executive": False}, + context_state={}, + available_tools=["RAG", "WEB", "WEATHER"] + ) + + assert result["should_invoke_tools"], "Should invoke tools for weather query" + assert any(t["tool"] == "WEATHER" for t in result["tools_to_invoke"]), "Should recommend WEATHER" + + print(f" ✓ Weather detection passed") + + # Test 1d: Proactive RAG for complex queries + result = await engine.analyze_tool_needs( + user_prompt="Design a microservices architecture", + monologue={"intent": "technical_implementation", "consult_executive": True}, + context_state={}, + available_tools=["RAG", "WEB", "CODEBRAIN"] + ) + + assert result["should_invoke_tools"], "Should proactively invoke tools for complex queries" + rag_tools = [t for t in result["tools_to_invoke"] if t["tool"] == "RAG"] + assert len(rag_tools) > 0, "Should include proactive RAG" + + print(f" ✓ Proactive RAG detection passed") + print(f" Reason: {rag_tools[0]['reason']}") + + print("\n✓ Tool Decision Engine tests passed\n") + return result + + +async def test_tool_orchestrator(): + """Test tool orchestration (mock mode).""" + print("\n" + "="*60) + print("TEST 2: Tool Orchestrator (Mock Mode)") + print("="*60) + + orchestrator = ToolOrchestrator(tool_timeout=5) + + # Since actual tools may not be available, test the orchestrator structure + print(f" Available tools: {list(orchestrator.available_tools.keys())}") + + # Test with tools_to_invoke (will fail gracefully if tools 
unavailable) + tools_to_invoke = [ + {"tool": "RAG", "query": "test query", "reason": "testing", "priority": 0.9} + ] + + result = await orchestrator.execute_tools( + tools_to_invoke=tools_to_invoke, + context_state={"session_id": "test"} + ) + + assert "results" in result, "Should return results dict" + assert "execution_summary" in result, "Should return execution summary" + + summary = result["execution_summary"] + assert "tools_invoked" in summary, "Summary should include tools_invoked" + assert "total_time_ms" in summary, "Summary should include timing" + + print(f" ✓ Orchestrator structure valid") + print(f" Summary: {summary}") + + # Test result formatting + formatted = orchestrator.format_results_for_context(result) + assert isinstance(formatted, str), "Should format results as string" + + print(f" ✓ Result formatting works") + print(f" Formatted length: {len(formatted)} chars") + + print("\n✓ Tool Orchestrator tests passed\n") + return result + + +async def test_proactive_monitor(): + """Test proactive monitoring and suggestions.""" + print("\n" + "="*60) + print("TEST 3: Proactive Monitor") + print("="*60) + + monitor = ProactiveMonitor(min_priority=0.6) + + # Test 3a: Long silence detection + context_state = { + "message_count": 5, + "minutes_since_last_msg": 35 # > 30 minutes + } + + self_state = load_self_state() + + suggestion = await monitor.analyze_session( + session_id="test_silence", + context_state=context_state, + self_state=self_state + ) + + assert suggestion is not None, "Should generate suggestion for long silence" + assert suggestion["type"] == "check_in", f"Should be check_in type: {suggestion['type']}" + assert suggestion["priority"] >= 0.6, "Priority should meet threshold" + + print(f" ✓ Long silence detection passed") + print(f" Type: {suggestion['type']}, Priority: {suggestion['priority']:.2f}") + print(f" Suggestion: {suggestion['suggestion'][:50]}...") + + # Test 3b: Learning opportunity (high curiosity) + self_state["curiosity"] = 
0.8 + self_state["learning_queue"] = ["quantum computing", "rust programming"] + + # Reset cooldown for this test + monitor.reset_cooldown("test_learning") + + suggestion = await monitor.analyze_session( + session_id="test_learning", + context_state={"message_count": 3, "minutes_since_last_msg": 2}, + self_state=self_state + ) + + assert suggestion is not None, "Should generate learning suggestion" + assert suggestion["type"] == "learning", f"Should be learning type: {suggestion['type']}" + + print(f" ✓ Learning opportunity detection passed") + print(f" Suggestion: {suggestion['suggestion'][:70]}...") + + # Test 3c: Conversation milestone + monitor.reset_cooldown("test_milestone") + + # Reset curiosity to avoid learning suggestion taking precedence + self_state["curiosity"] = 0.5 + self_state["learning_queue"] = [] + + suggestion = await monitor.analyze_session( + session_id="test_milestone", + context_state={"message_count": 50, "minutes_since_last_msg": 1}, + self_state=self_state + ) + + assert suggestion is not None, "Should generate milestone suggestion" + # Note: learning or summary both valid - check it's a reasonable suggestion + assert suggestion["type"] in ["summary", "learning", "check_in"], f"Should be valid type: {suggestion['type']}" + + print(f" ✓ Conversation milestone detection passed (type: {suggestion['type']})") + + # Test 3d: Cooldown mechanism + # Try to get another suggestion immediately (should be blocked) + suggestion2 = await monitor.analyze_session( + session_id="test_milestone", + context_state={"message_count": 51, "minutes_since_last_msg": 1}, + self_state=self_state + ) + + assert suggestion2 is None, "Should not generate suggestion during cooldown" + + print(f" ✓ Cooldown mechanism working") + + # Check stats + stats = monitor.get_session_stats("test_milestone") + assert stats["cooldown_active"], "Cooldown should be active" + print(f" Cooldown remaining: {stats['cooldown_remaining']}s") + + print("\n✓ Proactive Monitor tests 
passed\n") + return suggestion + + +async def test_autonomous_actions(): + """Test autonomous action execution.""" + print("\n" + "="*60) + print("TEST 4: Autonomous Actions") + print("="*60) + + manager = AutonomousActionManager() + + # Test 4a: List allowed actions + allowed = manager.get_allowed_actions() + assert "create_memory" in allowed, "Should have create_memory action" + assert "update_goal" in allowed, "Should have update_goal action" + assert "learn_topic" in allowed, "Should have learn_topic action" + + print(f" ✓ Allowed actions: {allowed}") + + # Test 4b: Validate actions + validation = manager.validate_action("create_memory", {"text": "test memory"}) + assert validation["valid"], "Should validate correct action" + + print(f" ✓ Action validation passed") + + # Test 4c: Execute learn_topic action + result = await manager.execute_action( + action_type="learn_topic", + parameters={"topic": "rust programming", "reason": "testing", "priority": 0.8}, + context={"session_id": "test"} + ) + + assert result["success"], f"Action should succeed: {result.get('error', 'unknown')}" + assert "topic" in result["result"], "Should return topic info" + + print(f" ✓ learn_topic action executed") + print(f" Topic: {result['result']['topic']}") + print(f" Queue position: {result['result']['queue_position']}") + + # Test 4d: Execute update_focus action + result = await manager.execute_action( + action_type="update_focus", + parameters={"focus": "autonomy_testing", "reason": "running tests"}, + context={"session_id": "test"} + ) + + assert result["success"], "update_focus should succeed" + + print(f" ✓ update_focus action executed") + print(f" New focus: {result['result']['new_focus']}") + + # Test 4e: Reject non-whitelisted action + result = await manager.execute_action( + action_type="delete_all_files", # NOT in whitelist + parameters={}, + context={"session_id": "test"} + ) + + assert not result["success"], "Should reject non-whitelisted action" + assert "not in 
whitelist" in result["error"], "Should indicate whitelist violation" + + print(f" ✓ Non-whitelisted action rejected") + + # Test 4f: Action log + log = manager.get_action_log(limit=10) + assert len(log) >= 2, f"Should have logged multiple actions (got {len(log)})" + + print(f" ✓ Action log contains {len(log)} entries") + + print("\n✓ Autonomous Actions tests passed\n") + return result + + +async def test_pattern_learner(): + """Test pattern learning system.""" + print("\n" + "="*60) + print("TEST 5: Pattern Learner") + print("="*60) + + # Use temp file for testing + test_file = "/tmp/test_patterns.json" + learner = PatternLearner(patterns_file=test_file) + + # Test 5a: Learn from multiple interactions + for i in range(5): + await learner.learn_from_interaction( + user_prompt=f"Help me with Python coding task {i}", + response=f"Here's help with task {i}...", + monologue={"intent": "coding_help", "tone": "focused", "depth": "medium"}, + context={"session_id": "test", "executive_plan": None} + ) + + print(f" ✓ Learned from 5 interactions") + + # Test 5b: Get top topics + top_topics = learner.get_top_topics(limit=5) + assert len(top_topics) > 0, "Should have learned topics" + assert "coding_help" == top_topics[0][0], "coding_help should be top topic" + + print(f" ✓ Top topics: {[t[0] for t in top_topics[:3]]}") + + # Test 5c: Get preferred tone + preferred_tone = learner.get_preferred_tone() + assert preferred_tone == "focused", "Should detect focused as preferred tone" + + print(f" ✓ Preferred tone: {preferred_tone}") + + # Test 5d: Get preferred depth + preferred_depth = learner.get_preferred_depth() + assert preferred_depth == "medium", "Should detect medium as preferred depth" + + print(f" ✓ Preferred depth: {preferred_depth}") + + # Test 5e: Get insights + insights = learner.get_insights() + assert insights["total_interactions"] == 5, "Should track interaction count" + assert insights["preferred_tone"] == "focused", "Insights should include tone" + + print(f" ✓ 
Insights generated:") + print(f" Total interactions: {insights['total_interactions']}") + print(f" Recommendations: {insights['learning_recommendations']}") + + # Test 5f: Export patterns + exported = learner.export_patterns() + assert "topic_frequencies" in exported, "Should export all patterns" + + print(f" ✓ Patterns exported ({len(exported)} keys)") + + # Cleanup + if os.path.exists(test_file): + os.remove(test_file) + + print("\n✓ Pattern Learner tests passed\n") + return insights + + +async def test_end_to_end_autonomy(): + """Test complete autonomous flow.""" + print("\n" + "="*60) + print("TEST 6: End-to-End Autonomy Flow") + print("="*60) + + # Simulate a complex user query that triggers multiple autonomous systems + user_prompt = "Remember what we discussed about machine learning? I need current research on transformers." + + monologue = { + "intent": "technical_research", + "tone": "focused", + "depth": "deep", + "consult_executive": True + } + + context_state = { + "session_id": "e2e_test", + "message_count": 15, + "minutes_since_last_msg": 5 + } + + print(f" User prompt: {user_prompt}") + print(f" Monologue intent: {monologue['intent']}") + + # Step 1: Tool decision engine + engine = ToolDecisionEngine() + tool_decision = await engine.analyze_tool_needs( + user_prompt=user_prompt, + monologue=monologue, + context_state=context_state, + available_tools=["RAG", "WEB", "CODEBRAIN"] + ) + + print(f"\n Step 1: Tool Decision") + print(f" Should invoke: {tool_decision['should_invoke_tools']}") + print(f" Tools: {[t['tool'] for t in tool_decision['tools_to_invoke']]}") + assert tool_decision["should_invoke_tools"], "Should invoke tools" + assert len(tool_decision["tools_to_invoke"]) >= 2, "Should recommend multiple tools (RAG + WEB)" + + # Step 2: Pattern learning + learner = PatternLearner(patterns_file="/tmp/e2e_test_patterns.json") + await learner.learn_from_interaction( + user_prompt=user_prompt, + response="Here's information about transformers...", + 
monologue=monologue, + context=context_state + ) + + print(f"\n Step 2: Pattern Learning") + top_topics = learner.get_top_topics(limit=3) + print(f" Learned topics: {[t[0] for t in top_topics]}") + + # Step 3: Autonomous action + action_manager = AutonomousActionManager() + action_result = await action_manager.execute_action( + action_type="learn_topic", + parameters={"topic": "transformer architectures", "reason": "user interest detected"}, + context=context_state + ) + + print(f"\n Step 3: Autonomous Action") + print(f" Action: learn_topic") + print(f" Success: {action_result['success']}") + + # Step 4: Proactive monitoring (won't trigger due to low message count) + monitor = ProactiveMonitor(min_priority=0.6) + monitor.reset_cooldown("e2e_test") + + suggestion = await monitor.analyze_session( + session_id="e2e_test", + context_state=context_state, + self_state=load_self_state() + ) + + print(f"\n Step 4: Proactive Monitoring") + print(f" Suggestion: {suggestion['type'] if suggestion else 'None (expected for low message count)'}") + + # Cleanup + if os.path.exists("/tmp/e2e_test_patterns.json"): + os.remove("/tmp/e2e_test_patterns.json") + + print("\n✓ End-to-End Autonomy Flow tests passed\n") + return True + + +async def run_all_tests(): + """Run all Phase 2 tests.""" + print("\n" + "="*60) + print("PHASE 2 AUTONOMY TESTS") + print("="*60) + + try: + # Test 1: Tool Decision Engine + await test_tool_decision_engine() + + # Test 2: Tool Orchestrator + await test_tool_orchestrator() + + # Test 3: Proactive Monitor + await test_proactive_monitor() + + # Test 4: Autonomous Actions + await test_autonomous_actions() + + # Test 5: Pattern Learner + await test_pattern_learner() + + # Test 6: End-to-End + await test_end_to_end_autonomy() + + print("\n" + "="*60) + print("ALL PHASE 2 TESTS PASSED ✓") + print("="*60) + + print("\nPhase 2 Features Validated:") + print(" ✓ Autonomous tool decision making") + print(" ✓ Tool orchestration and execution") + print(" ✓ Proactive 
monitoring and suggestions") + print(" ✓ Safe autonomous actions") + print(" ✓ Pattern learning and adaptation") + print(" ✓ End-to-end autonomous flow") + + return True + + except Exception as e: + print("\n" + "="*60) + print(f"TEST FAILED: {e}") + print("="*60) + import traceback + traceback.print_exc() + return False + + +if __name__ == "__main__": + success = asyncio.run(run_all_tests()) + sys.exit(0 if success else 1)