- Added `trillium.py` for searching and creating notes with Trillium's ETAPI.
- Implemented `search_notes` and `create_note` functions with appropriate error handling and validation.

feat: Add web search functionality using DuckDuckGo

- Introduced `web_search.py` for performing web searches without API keys.
- Implemented `search_web` function with result handling and validation.

feat: Create provider-agnostic function caller for iterative tool calling

- Developed `function_caller.py` to manage LLM interactions with tools.
- Implemented iterative calling logic with error handling and tool execution.

feat: Establish a tool registry for managing available tools

- Created `registry.py` to define and manage tool availability and execution.
- Integrated feature flags for enabling/disabling tools based on environment variables.

feat: Implement event streaming for tool calling processes

- Added `stream_events.py` to manage Server-Sent Events (SSE) for tool calling.
- Enabled real-time updates during tool execution for enhanced user experience.

test: Add tests for tool calling system components

- Created `test_tools.py` to validate functionality of code execution, web search, and tool registry.
- Implemented asynchronous tests to ensure proper execution and result handling.

chore: Add Dockerfile for sandbox environment setup

- Created `Dockerfile` to set up a Python environment with necessary dependencies for code execution.

chore: Add debug regex script for testing XML parsing

- Introduced `debug_regex.py` to validate regex patterns against XML tool calls.

chore: Add HTML template for displaying thinking stream events

- Created `test_thinking_stream.html` for visualizing tool calling events in a user-friendly format.

test: Add tests for OllamaAdapter XML parsing

- Developed `test_ollama_parser.py` to validate XML parsing with various test cases, including malformed XML.
236 lines · 8.9 KiB · Python
"""
|
|
Provider-agnostic function caller with iterative tool calling loop.
|
|
|
|
This module implements the iterative loop that allows LLMs to call tools
|
|
multiple times until they have the information they need to answer the user.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
from llm.llm_router import call_llm, TOOL_ADAPTERS, BACKENDS
|
|
from .registry import get_registry
|
|
from .stream_events import get_stream_manager
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|

class FunctionCaller:
    """Provider-agnostic iterative tool calling loop.

    This class orchestrates the back-and-forth between the LLM and tools:
    1. Call LLM with tools available
    2. If LLM requests tool calls, execute them
    3. Add results to conversation
    4. Repeat until LLM is done or max iterations reached
    """

    def __init__(self, backend: str, temperature: float = 0.7):
        """Initialize function caller.

        Args:
            backend: LLM backend to use ("OPENAI", "OLLAMA", etc.)
            temperature: Temperature for LLM calls
        """
        self.backend = backend
        self.temperature = temperature
        self.registry = get_registry()
        self.max_iterations = int(os.getenv("MAX_TOOL_ITERATIONS", "5"))

        # Resolve adapter for this backend
        self.adapter = self._get_adapter()
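    # Example (illustrative): the iteration cap is read once at construction
    # time, so it can be raised per deployment via the environment. The
    # variable name and default of 5 mirror the os.getenv call above:
    #
    #   export MAX_TOOL_ITERATIONS=8      # shell, before starting the server
    #
    # or per-process (hypothetical test setup):
    #
    #   os.environ["MAX_TOOL_ITERATIONS"] = "8"
    #   assert FunctionCaller(backend="OPENAI").max_iterations == 8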

    def _get_adapter(self):
        """Get the appropriate adapter for this backend."""
        adapter = TOOL_ADAPTERS.get(self.backend)

        # For PRIMARY/SECONDARY/FALLBACK, determine adapter based on provider
        if adapter is None and self.backend in ["PRIMARY", "SECONDARY", "FALLBACK"]:
            cfg = BACKENDS.get(self.backend, {})
            provider = cfg.get("provider", "").lower()

            if provider == "openai":
                adapter = TOOL_ADAPTERS["OPENAI"]
            elif provider == "ollama":
                adapter = TOOL_ADAPTERS["OLLAMA"]
            elif provider == "mi50":
                adapter = TOOL_ADAPTERS["MI50"]

        return adapter
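    # Illustrative only: the adapters come from llm.llm_router, and the only
    # method this class relies on is format_tool_result(tool_id, tool_name,
    # result), called in call_with_tools() below. A minimal adapter satisfying
    # that contract in the OpenAI-style message format might look like this
    # sketch; the real adapters in this codebase may differ.
    #
    #   class _SketchOpenAIAdapter:
    #       def format_tool_result(self, tool_id, tool_name, result):
    #           return {
    #               "role": "tool",
    #               "tool_call_id": tool_id,
    #               "name": tool_name,
    #               "content": str(result),
    #           }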

    async def call_with_tools(
        self,
        messages: List[Dict],
        max_tokens: int = 2048,
        session_id: Optional[str] = None
    ) -> Dict:
        """Execute LLM with iterative tool calling.

        Args:
            messages: Conversation history
            max_tokens: Maximum tokens for LLM response
            session_id: Optional session ID for streaming events

        Returns:
            dict: {
                "content": str,               # Final response
                "iterations": int,            # Number of iterations
                "tool_calls": list,           # All tool calls made
                "messages": list,             # Full conversation history
                "truncated": bool (optional)  # True if max iterations reached
            }
        """
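        # Shape note (inferred from the .get() calls in this method, not from
        # the adapter source): with return_adapter_response=True, call_llm is
        # expected to return a dict roughly like
        #   {"content": str,
        #    "tool_calls": [{"id": str, "name": str, "arguments": dict}, ...]}
        # Without tools (the early-return branch below), it returns a plain string.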
logger.info(f"🔍 FunctionCaller.call_with_tools() invoked with {len(messages)} messages")
|
|
tools = self.registry.get_tool_definitions()
|
|
logger.info(f"🔍 Got {len(tools or [])} tool definitions from registry")
|
|
|
|
# Get stream manager for emitting events
|
|
stream_manager = get_stream_manager()
|
|
should_stream = session_id and stream_manager.has_subscribers(session_id)
|
|
|
|
# If no tools are enabled, just call LLM directly
|
|
if not tools:
|
|
logger.warning("FunctionCaller invoked but no tools are enabled")
|
|
response = await call_llm(
|
|
messages=messages,
|
|
backend=self.backend,
|
|
temperature=self.temperature,
|
|
max_tokens=max_tokens
|
|
)
|
|
return {
|
|
"content": response,
|
|
"iterations": 1,
|
|
"tool_calls": [],
|
|
"messages": messages + [{"role": "assistant", "content": response}]
|
|
}
|
|
|
|
conversation = messages.copy()
|
|
all_tool_calls = []
|
|
|
|
for iteration in range(self.max_iterations):
|
|
logger.info(f"Tool calling iteration {iteration + 1}/{self.max_iterations}")
|
|
|
|
# Emit thinking event
|
|
if should_stream:
|
|
await stream_manager.emit(session_id, "thinking", {
|
|
"message": f"🤔 Thinking... (iteration {iteration + 1}/{self.max_iterations})"
|
|
})
|
|
|
|
# Call LLM with tools
|
|
try:
|
|
response = await call_llm(
|
|
messages=conversation,
|
|
backend=self.backend,
|
|
temperature=self.temperature,
|
|
max_tokens=max_tokens,
|
|
tools=tools,
|
|
tool_choice="auto",
|
|
return_adapter_response=True
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"LLM call failed: {str(e)}")
|
|
if should_stream:
|
|
await stream_manager.emit(session_id, "error", {
|
|
"message": f"❌ Error: {str(e)}"
|
|
})
|
|
return {
|
|
"content": f"Error calling LLM: {str(e)}",
|
|
"iterations": iteration + 1,
|
|
"tool_calls": all_tool_calls,
|
|
"messages": conversation,
|
|
"error": True
|
|
}
|
|
|
|
# Add assistant message to conversation
|
|
if response.get("content"):
|
|
conversation.append({
|
|
"role": "assistant",
|
|
"content": response["content"]
|
|
})
|
|
|
|
# Check for tool calls
|
|
tool_calls = response.get("tool_calls")
|
|
logger.debug(f"Response from LLM: content_length={len(response.get('content', ''))}, tool_calls={tool_calls}")
|
|
if not tool_calls:
|
|
# No more tool calls - LLM is done
|
|
logger.info(f"Tool calling complete after {iteration + 1} iterations")
|
|
if should_stream:
|
|
await stream_manager.emit(session_id, "done", {
|
|
"message": "✅ Complete!",
|
|
"final_answer": response["content"]
|
|
})
|
|
return {
|
|
"content": response["content"],
|
|
"iterations": iteration + 1,
|
|
"tool_calls": all_tool_calls,
|
|
"messages": conversation
|
|
}
|
|
|
|
# Execute each tool call
|
|
logger.info(f"Executing {len(tool_calls)} tool call(s)")
|
|
for tool_call in tool_calls:
|
|
all_tool_calls.append(tool_call)
|
|
|
|
tool_name = tool_call.get("name")
|
|
tool_args = tool_call.get("arguments", {})
|
|
tool_id = tool_call.get("id", "unknown")
|
|
|
|
logger.info(f"Calling tool: {tool_name} with args: {tool_args}")
|
|
|
|
# Emit tool call event
|
|
if should_stream:
|
|
await stream_manager.emit(session_id, "tool_call", {
|
|
"tool": tool_name,
|
|
"args": tool_args,
|
|
"message": f"🔧 Using tool: {tool_name}"
|
|
})
|
|
|
|
try:
|
|
# Execute tool
|
|
result = await self.registry.execute_tool(tool_name, tool_args)
|
|
logger.info(f"Tool {tool_name} executed successfully")
|
|
|
|
# Emit tool result event
|
|
if should_stream:
|
|
# Format result preview
|
|
result_preview = str(result)
|
|
if len(result_preview) > 200:
|
|
result_preview = result_preview[:200] + "..."
|
|
|
|
await stream_manager.emit(session_id, "tool_result", {
|
|
"tool": tool_name,
|
|
"result": result,
|
|
"message": f"📊 Result: {result_preview}"
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Tool {tool_name} execution failed: {str(e)}")
|
|
result = {"error": f"Tool execution failed: {str(e)}"}
|
|
|
|
# Format result using adapter
|
|
if not self.adapter:
|
|
logger.warning(f"No adapter available for backend {self.backend}, using fallback format")
|
|
result_msg = {
|
|
"role": "user",
|
|
"content": f"Tool {tool_name} result: {result}"
|
|
}
|
|
else:
|
|
result_msg = self.adapter.format_tool_result(
|
|
tool_id,
|
|
tool_name,
|
|
result
|
|
)
|
|
|
|
conversation.append(result_msg)
|
|
|
|
# Max iterations reached without completion
|
|
logger.warning(f"Tool calling truncated after {self.max_iterations} iterations")
|
|
return {
|
|
"content": response.get("content", ""),
|
|
"iterations": self.max_iterations,
|
|
"tool_calls": all_tool_calls,
|
|
"messages": conversation,
|
|
"truncated": True
|
|
}
|
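
# --- Usage sketch ------------------------------------------------------------
# Minimal, illustrative driver for manual testing. The backend key "OLLAMA"
# and the prompt text are placeholders (any key known to TOOL_ADAPTERS or
# BACKENDS should work), and running this assumes the llm and tools packages
# are importable and configured in your environment.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        caller = FunctionCaller(backend="OLLAMA", temperature=0.2)
        result = await caller.call_with_tools(
            messages=[{"role": "user", "content": "What's the latest Python release?"}],
            max_tokens=1024,
            session_id=None,  # no SSE session, so nothing is streamed
        )
        print(f"{result['iterations']} iteration(s), "
              f"{len(result['tool_calls'])} tool call(s):\n{result['content']}")

    asyncio.run(_demo())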