"""LLM router: local (Ollama) chat, cloud (OpenAI) chat + embeddings.""" from __future__ import annotations import json from typing import Iterator, Literal, TypedDict import httpx from openai import OpenAI from lyra.config import load class Message(TypedDict): role: Literal["system", "user", "assistant"] content: str Backend = Literal["local", "cloud", "mi50"] def complete(messages: list[Message], backend: Backend = "local", model: str | None = None) -> str: """Generate a completion. `model` overrides the backend's default model (used so live chat can run a stronger cloud model than bulk consolidation).""" cfg = load() if backend == "cloud": if not cfg.openai_api_key: raise RuntimeError("OPENAI_API_KEY is not set") client = OpenAI(api_key=cfg.openai_api_key) resp = client.chat.completions.create(model=model or cfg.cloud_model, messages=messages) return resp.choices[0].message.content or "" if backend == "mi50": # MI50 box runs an OpenAI-compatible llama.cpp server; key is unused. client = OpenAI(api_key="not-needed", base_url=cfg.mi50_base_url) resp = client.chat.completions.create(model=model or cfg.mi50_model, messages=messages) return resp.choices[0].message.content or "" resp = httpx.post( f"{cfg.local_base_url}/api/chat", json={"model": model or cfg.local_model, "messages": messages, "stream": False}, timeout=120, ) resp.raise_for_status() return resp.json()["message"]["content"] def chat_call( messages: list, backend: Backend = "cloud", model: str | None = None, tools: list | None = None, ) -> tuple[dict, list | None]: """One chat turn that may request tool calls (OpenAI-style backends only). Returns (assistant_message, tool_calls): `assistant_message` is the raw message dict to append back to `messages` before any tool results; `tool_calls` is a list of {id, name, arguments} or None. `local` (Ollama) has no tool support here, so it just returns plain content. """ cfg = load() if backend in ("cloud", "mi50"): if backend == "cloud": if not cfg.openai_api_key: raise RuntimeError("OPENAI_API_KEY is not set") client = OpenAI(api_key=cfg.openai_api_key) mdl = model or cfg.cloud_model else: client = OpenAI(api_key="not-needed", base_url=cfg.mi50_base_url) mdl = model or cfg.mi50_model kwargs: dict = {"model": mdl, "messages": messages} if tools: kwargs["tools"] = tools msg = client.chat.completions.create(**kwargs).choices[0].message tcs = None if getattr(msg, "tool_calls", None): tcs = [ {"id": tc.id, "name": tc.function.name, "arguments": tc.function.arguments} for tc in msg.tool_calls ] return msg.model_dump(), tcs # local (Ollama): no tool-calling here — return plain content. return {"role": "assistant", "content": complete(messages, backend=backend, model=model)}, None def chat_call_stream( messages: list, backend: Backend = "cloud", model: str | None = None, tools: list | None = None, ) -> Iterator[tuple[str, object]]: """Streaming variant of `chat_call`. Yields ("delta", text) for each content chunk as it arrives, then exactly two terminal events: ("message", assistant_dict) — the full assistant turn, to append back ("tool_calls", calls | None) — list of {id,name,arguments} or None `local` (Ollama) streams NDJSON and never returns tool calls. """ cfg = load() if backend in ("cloud", "mi50"): if backend == "cloud": if not cfg.openai_api_key: raise RuntimeError("OPENAI_API_KEY is not set") client = OpenAI(api_key=cfg.openai_api_key) mdl = model or cfg.cloud_model else: client = OpenAI(api_key="not-needed", base_url=cfg.mi50_base_url) mdl = model or cfg.mi50_model kwargs: dict = {"model": mdl, "messages": messages, "stream": True} if tools: kwargs["tools"] = tools parts: list[str] = [] frags: dict[int, dict] = {} # tool-call fragments accumulated by index for chunk in client.chat.completions.create(**kwargs): if not chunk.choices: continue delta = chunk.choices[0].delta if getattr(delta, "content", None): parts.append(delta.content) yield ("delta", delta.content) for tc in getattr(delta, "tool_calls", None) or []: slot = frags.setdefault(tc.index, {"id": "", "name": "", "arguments": ""}) if tc.id: slot["id"] = tc.id if tc.function and tc.function.name: slot["name"] = tc.function.name if tc.function and tc.function.arguments: slot["arguments"] += tc.function.arguments content = "".join(parts) if frags: calls = [frags[i] for i in sorted(frags)] assistant = { "role": "assistant", "content": content or None, "tool_calls": [ {"id": c["id"], "type": "function", "function": {"name": c["name"], "arguments": c["arguments"]}} for c in calls ], } yield ("message", assistant) yield ("tool_calls", [{"id": c["id"], "name": c["name"], "arguments": c["arguments"]} for c in calls]) else: yield ("message", {"role": "assistant", "content": content}) yield ("tool_calls", None) return # local (Ollama): stream NDJSON, no tools. parts = [] with httpx.stream( "POST", f"{cfg.local_base_url}/api/chat", json={"model": model or cfg.local_model, "messages": messages, "stream": True}, timeout=120, ) as resp: resp.raise_for_status() for line in resp.iter_lines(): if not line: continue data = json.loads(line) piece = (data.get("message") or {}).get("content", "") if piece: parts.append(piece) yield ("delta", piece) if data.get("done"): break yield ("message", {"role": "assistant", "content": "".join(parts)}) yield ("tool_calls", None) def embed(texts: list[str]) -> list[list[float]]: """Embed texts using the configured backend (EMBED_BACKEND: "cloud" or "local"). Note: OpenAI and Ollama embeddings live in different vector spaces (and dimensions). A given database is tied to whichever backend created it — don't switch EMBED_BACKEND against an existing DB or cosine recall will break. """ cfg = load() if cfg.embed_backend == "local": resp = httpx.post( f"{cfg.local_base_url}/api/embed", json={"model": cfg.local_embed_model, "input": texts}, timeout=120, ) resp.raise_for_status() return resp.json()["embeddings"] if not cfg.openai_api_key: raise RuntimeError("OPENAI_API_KEY is not set") client = OpenAI(api_key=cfg.openai_api_key) resp = client.embeddings.create(model=cfg.embed_model, input=texts) return [d.embedding for d in resp.data]