# llm_router.py
import os
import json
import logging
from typing import Optional, List, Dict

import httpx

from autonomy.tools.adapters import OpenAIAdapter, OllamaAdapter, LlamaCppAdapter

logger = logging.getLogger(__name__)

# ------------------------------------------------------------
# Load backend registry from root .env
# ------------------------------------------------------------
BACKENDS = {
    "PRIMARY": {
        "provider": os.getenv("LLM_PRIMARY_PROVIDER", "").lower(),
        "url": os.getenv("LLM_PRIMARY_URL", ""),
        "model": os.getenv("LLM_PRIMARY_MODEL", ""),
    },
    "SECONDARY": {
        "provider": os.getenv("LLM_SECONDARY_PROVIDER", "").lower(),
        "url": os.getenv("LLM_SECONDARY_URL", ""),
        "model": os.getenv("LLM_SECONDARY_MODEL", ""),
    },
    "OPENAI": {
        "provider": os.getenv("LLM_OPENAI_PROVIDER", "").lower(),
        "url": os.getenv("LLM_OPENAI_URL", ""),
        "model": os.getenv("LLM_OPENAI_MODEL", ""),
        "api_key": os.getenv("OPENAI_API_KEY", ""),
    },
    "FALLBACK": {
        "provider": os.getenv("LLM_FALLBACK_PROVIDER", "").lower(),
        "url": os.getenv("LLM_FALLBACK_URL", ""),
        "model": os.getenv("LLM_FALLBACK_MODEL", ""),
    },
}

DEFAULT_BACKEND = "PRIMARY"

# Reusable async HTTP client
http_client = httpx.AsyncClient(timeout=120.0)

# Tool adapters for each backend
TOOL_ADAPTERS = {
    "OPENAI": OpenAIAdapter(),
    "OLLAMA": OllamaAdapter(),
    "MI50": LlamaCppAdapter(),  # MI50 uses llama.cpp
    "PRIMARY": None,     # Determined at runtime from provider
    "SECONDARY": None,   # Determined at runtime from provider
    "FALLBACK": None,    # Determined at runtime from provider
}


# ------------------------------------------------------------
# Public call
# ------------------------------------------------------------
async def call_llm(
    prompt: Optional[str] = None,
    messages: Optional[list] = None,
    backend: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: int = 512,
    tools: Optional[List[Dict]] = None,
    tool_choice: Optional[str] = None,
    return_adapter_response: bool = False,
):
    """
    Call an LLM backend with optional tool-calling support.

    Args:
        prompt: String prompt (for completion-style APIs like mi50)
        messages: List of message dicts (for chat-style APIs like Ollama/OpenAI)
        backend: Which backend to use (PRIMARY, SECONDARY, OPENAI, etc.)
        temperature: Sampling temperature
        max_tokens: Maximum tokens to generate
        tools: List of Lyra tool definitions (provider-agnostic)
        tool_choice: How to use tools ("auto", "required", "none")
        return_adapter_response: If True, return a dict with content and tool_calls

    Returns:
        str (default) or dict (if return_adapter_response=True):
        {"content": str, "tool_calls": [...] or None}
    """
or None} """ backend = (backend or DEFAULT_BACKEND).upper() if backend not in BACKENDS: raise RuntimeError(f"Unknown backend '{backend}'") cfg = BACKENDS[backend] provider = cfg["provider"] url = cfg["url"] model = cfg["model"] if not url or not model: raise RuntimeError(f"Backend '{backend}' missing url/model in env") # If tools are requested, use adapter to prepare request if tools: # Get adapter for this backend adapter = TOOL_ADAPTERS.get(backend) # For PRIMARY/SECONDARY/FALLBACK, determine adapter based on provider if adapter is None and backend in ["PRIMARY", "SECONDARY", "FALLBACK"]: if provider == "openai": adapter = TOOL_ADAPTERS["OPENAI"] elif provider == "ollama": adapter = TOOL_ADAPTERS["OLLAMA"] elif provider == "mi50": adapter = TOOL_ADAPTERS["MI50"] if adapter: # Use messages array if provided, otherwise convert prompt to messages if not messages: messages = [{"role": "user", "content": prompt}] # Prepare request through adapter adapted_request = await adapter.prepare_request(messages, tools, tool_choice) messages = adapted_request["messages"] # Extract tools in provider format if present provider_tools = adapted_request.get("tools") provider_tool_choice = adapted_request.get("tool_choice") else: logger.warning(f"No adapter available for backend {backend}, ignoring tools") provider_tools = None provider_tool_choice = None else: provider_tools = None provider_tool_choice = None # ------------------------------- # Provider: MI50 (llama.cpp server) # ------------------------------- if provider == "mi50": # If tools requested, convert messages to prompt with tool instructions if messages and tools: # Combine messages into a prompt prompt_parts = [] for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") prompt_parts.append(f"{role.capitalize()}: {content}") prompt = "\n".join(prompt_parts) + "\nAssistant:" payload = { "prompt": prompt, "n_predict": max_tokens, "temperature": temperature, "stop": ["User:", "\nUser:", "Assistant:", "\n\n\n"] } try: r = await http_client.post(f"{url}/completion", json=payload) r.raise_for_status() data = r.json() response_content = data.get("content", "") # If caller wants adapter response with tool calls, parse and return if return_adapter_response and tools: adapter = TOOL_ADAPTERS.get(backend) or TOOL_ADAPTERS["MI50"] return await adapter.parse_response(response_content) else: return response_content except httpx.HTTPError as e: logger.error(f"HTTP error calling mi50: {type(e).__name__}: {str(e)}") raise RuntimeError(f"LLM API error (mi50): {type(e).__name__}: {str(e)}") except (KeyError, json.JSONDecodeError) as e: logger.error(f"Response parsing error from mi50: {e}") raise RuntimeError(f"Invalid response format (mi50): {e}") except Exception as e: logger.error(f"Unexpected error calling mi50: {type(e).__name__}: {str(e)}") raise RuntimeError(f"Unexpected error (mi50): {type(e).__name__}: {str(e)}") # ------------------------------- # Provider: OLLAMA (your 3090) # ------------------------------- logger.info(f"🔍 LLM Router: provider={provider}, checking if ollama...") if provider == "ollama": logger.info(f"🔍 LLM Router: Matched ollama provider, tools={bool(tools)}, return_adapter_response={return_adapter_response}") # Use messages array if provided, otherwise convert prompt to single user message if messages: chat_messages = messages else: chat_messages = [{"role": "user", "content": prompt}] payload = { "model": model, "messages": chat_messages, "stream": False, "options": { "temperature": temperature, 
"num_predict": max_tokens } } try: r = await http_client.post(f"{url}/api/chat", json=payload) r.raise_for_status() data = r.json() response_content = data["message"]["content"] # If caller wants adapter response with tool calls, parse and return if return_adapter_response and tools: logger.info(f"🔍 Ollama: return_adapter_response=True, calling adapter.parse_response") adapter = TOOL_ADAPTERS.get(backend) or TOOL_ADAPTERS["OLLAMA"] logger.info(f"🔍 Ollama: Using adapter {adapter.__class__.__name__}") result = await adapter.parse_response(response_content) logger.info(f"🔍 Ollama: Adapter returned {result}") return result else: return response_content except httpx.HTTPError as e: logger.error(f"HTTP error calling ollama: {type(e).__name__}: {str(e)}") raise RuntimeError(f"LLM API error (ollama): {type(e).__name__}: {str(e)}") except (KeyError, json.JSONDecodeError) as e: logger.error(f"Response parsing error from ollama: {e}") raise RuntimeError(f"Invalid response format (ollama): {e}") except Exception as e: logger.error(f"Unexpected error calling ollama: {type(e).__name__}: {str(e)}") raise RuntimeError(f"Unexpected error (ollama): {type(e).__name__}: {str(e)}") # ------------------------------- # Provider: OPENAI # ------------------------------- if provider == "openai": headers = { "Authorization": f"Bearer {cfg['api_key']}", "Content-Type": "application/json" } # Use messages array if provided, otherwise convert prompt to single user message if messages: chat_messages = messages else: chat_messages = [{"role": "user", "content": prompt}] payload = { "model": model, "messages": chat_messages, "temperature": temperature, "max_tokens": max_tokens, } # Add tools if available (OpenAI native function calling) if provider_tools: payload["tools"] = provider_tools if provider_tool_choice: payload["tool_choice"] = provider_tool_choice try: r = await http_client.post(f"{url}/chat/completions", json=payload, headers=headers) r.raise_for_status() data = r.json() # If caller wants adapter response with tool calls, parse and return if return_adapter_response and tools: # Create mock response object for adapter class MockChoice: def __init__(self, message_data): self.message = type('obj', (object,), {})() self.message.content = message_data.get("content") # Convert tool_calls dicts to objects raw_tool_calls = message_data.get("tool_calls") if raw_tool_calls: self.message.tool_calls = [] for tc in raw_tool_calls: tool_call_obj = type('obj', (object,), {})() tool_call_obj.id = tc.get("id") tool_call_obj.function = type('obj', (object,), {})() tool_call_obj.function.name = tc.get("function", {}).get("name") tool_call_obj.function.arguments = tc.get("function", {}).get("arguments") self.message.tool_calls.append(tool_call_obj) else: self.message.tool_calls = None class MockResponse: def __init__(self, data): self.choices = [MockChoice(data["choices"][0]["message"])] mock_resp = MockResponse(data) adapter = TOOL_ADAPTERS.get(backend) or TOOL_ADAPTERS["OPENAI"] return await adapter.parse_response(mock_resp) else: return data["choices"][0]["message"]["content"] except httpx.HTTPError as e: logger.error(f"HTTP error calling openai: {type(e).__name__}: {str(e)}") raise RuntimeError(f"LLM API error (openai): {type(e).__name__}: {str(e)}") except (KeyError, json.JSONDecodeError) as e: logger.error(f"Response parsing error from openai: {e}") raise RuntimeError(f"Invalid response format (openai): {e}") except Exception as e: logger.error(f"Unexpected error calling openai: {type(e).__name__}: {str(e)}") raise 
RuntimeError(f"Unexpected error (openai): {type(e).__name__}: {str(e)}") # ------------------------------- # Unknown provider # ------------------------------- raise RuntimeError(f"Provider '{provider}' not implemented.")