5dc3fa17d7
- llm.chat_call_stream: streaming generator for all 3 backends (Ollama NDJSON,
OpenAI/MI50 SSE), accumulating tool-call fragments by index.
- chat.respond_stream: mirrors respond()'s tool loop and persistence/compaction,
yielding ("delta", text) / ("tool", name) / ("done", reply).
- POST /v1/chat/stream: SSE endpoint; blocking generator bridged to async via a
worker thread + asyncio.Queue. Old completions endpoint kept as fallback.
- Client streams into a live bubble with a blinking caret; rAF-throttled render
(no full re-parse per token) and instant scroll during stream — fixes iOS
Safari ghosting from per-token smooth-scroll. Falls back to the blocking
endpoint only if nothing streamed (no double-persist).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
188 lines
7.3 KiB
Python
188 lines
7.3 KiB
Python
"""LLM router: local (Ollama) chat, cloud (OpenAI) chat + embeddings."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Iterator, Literal, TypedDict
|
|
|
|
import httpx
|
|
from openai import OpenAI
|
|
|
|
from lyra.config import load
|
|
|
|
|
|
class Message(TypedDict):
    """One chat message passed to a backend: who is speaking and what they said."""

    # Speaker of the message; restricted to the three roles listed here.
    role: Literal["system", "user", "assistant"]
    # The text of the message.
    content: str
|
|
|
|
|
|
# Names the chat backend a call should use. In this module "local" is served
# through Ollama, "cloud" through OpenAI, and "mi50" through an
# OpenAI-compatible server reached at the configured MI50 base URL.
Backend = Literal["local", "cloud", "mi50"]
|
|
|
|
|
|
def complete(messages: list[Message], backend: Backend = "local", model: str | None = None) -> str:
    """Return the assistant's reply text for `messages` from the chosen backend.

    `model`, when given, takes the place of the backend's configured default
    (so live chat can run a stronger cloud model than bulk consolidation).

    Raises RuntimeError when `backend` is "cloud" and no OpenAI API key is
    configured. Any backend other than "cloud" or "mi50" is served by the
    local Ollama endpoint.
    """
    cfg = load()

    if backend not in ("cloud", "mi50"):
        # Ollama: a single non-streaming POST; the reply text sits under
        # message.content in the JSON body.
        reply = httpx.post(
            f"{cfg.local_base_url}/api/chat",
            json={"model": model or cfg.local_model, "messages": messages, "stream": False},
            timeout=120,
        )
        reply.raise_for_status()
        return reply.json()["message"]["content"]

    if backend == "cloud":
        if not cfg.openai_api_key:
            raise RuntimeError("OPENAI_API_KEY is not set")
        client = OpenAI(api_key=cfg.openai_api_key)
        chosen = model or cfg.cloud_model
    else:
        # MI50 box runs an OpenAI-compatible llama.cpp server; key is unused.
        client = OpenAI(api_key="not-needed", base_url=cfg.mi50_base_url)
        chosen = model or cfg.mi50_model

    completion = client.chat.completions.create(model=chosen, messages=messages)
    # The SDK may report content as None; normalise that to an empty string.
    return completion.choices[0].message.content or ""
|
|
|
|
|
|
def chat_call(
    messages: list, backend: Backend = "cloud", model: str | None = None,
    tools: list | None = None,
) -> tuple[dict, list | None]:
    """Run a single chat turn, which on OpenAI-style backends may ask for tools.

    Returns a pair (assistant_message, tool_calls):

    * `assistant_message` is the raw message dict, meant to be appended to
      `messages` ahead of any tool results.
    * `tool_calls` is a list of {id, name, arguments} dicts, or None when the
      model requested none.

    The `local` (Ollama) backend has no tool support in this function; it
    yields plain content and `tool_calls` is always None. Raises RuntimeError
    when `backend` is "cloud" and no OpenAI API key is configured.
    """
    cfg = load()

    if backend not in ("cloud", "mi50"):
        # local (Ollama): no tool-calling here, so delegate to complete().
        text = complete(messages, backend=backend, model=model)
        return {"role": "assistant", "content": text}, None

    if backend == "cloud":
        if not cfg.openai_api_key:
            raise RuntimeError("OPENAI_API_KEY is not set")
        client = OpenAI(api_key=cfg.openai_api_key)
        chosen = model or cfg.cloud_model
    else:
        client = OpenAI(api_key="not-needed", base_url=cfg.mi50_base_url)
        chosen = model or cfg.mi50_model

    request: dict = {"model": chosen, "messages": messages}
    if tools:
        # Only send the key when there are tools to offer.
        request["tools"] = tools

    reply = client.chat.completions.create(**request).choices[0].message

    requested = getattr(reply, "tool_calls", None)
    calls = None
    if requested:
        # Flatten the SDK objects into plain dicts for the caller.
        calls = [
            {"id": call.id, "name": call.function.name, "arguments": call.function.arguments}
            for call in requested
        ]
    return reply.model_dump(), calls
|
|
|
|
|
|
def chat_call_stream(
    messages: list, backend: Backend = "cloud", model: str | None = None,
    tools: list | None = None,
) -> Iterator[tuple[str, object]]:
    """Streaming variant of `chat_call`.

    Yields ("delta", text) for each content chunk as it arrives, then exactly
    two terminal events:

      ("message", assistant_dict)   — the full assistant turn, to append back
      ("tool_calls", calls | None)  — list of {id, name, arguments} or None

    `local` (Ollama) streams NDJSON and never returns tool calls.

    Because this is a generator, nothing runs (and nothing is raised) until
    the first item is requested.

    Raises:
        RuntimeError: if `backend` is "cloud" and no OpenAI API key is
            configured, or if Ollama reports an error object mid-stream.
    """
    cfg = load()
    if backend in ("cloud", "mi50"):
        if backend == "cloud":
            if not cfg.openai_api_key:
                raise RuntimeError("OPENAI_API_KEY is not set")
            client = OpenAI(api_key=cfg.openai_api_key)
            mdl = model or cfg.cloud_model
        else:
            client = OpenAI(api_key="not-needed", base_url=cfg.mi50_base_url)
            mdl = model or cfg.mi50_model
        kwargs: dict = {"model": mdl, "messages": messages, "stream": True}
        if tools:
            kwargs["tools"] = tools
        parts: list[str] = []
        frags: dict[int, dict] = {}  # tool-call fragments accumulated by index
        # Use the SDK stream as a context manager so the underlying HTTP
        # response is closed even when the consumer stops iterating early or
        # an exception escapes the loop (mirrors the Ollama branch below).
        with client.chat.completions.create(**kwargs) as stream:
            for chunk in stream:
                if not chunk.choices:
                    # Some chunks carry no choices; nothing to emit.
                    continue
                delta = chunk.choices[0].delta
                if getattr(delta, "content", None):
                    parts.append(delta.content)
                    yield ("delta", delta.content)
                for tc in getattr(delta, "tool_calls", None) or []:
                    # A tool call arrives in pieces sharing one index: id and
                    # name are overwritten when present, arguments are
                    # concatenated.
                    slot = frags.setdefault(tc.index, {"id": "", "name": "", "arguments": ""})
                    if tc.id:
                        slot["id"] = tc.id
                    if tc.function and tc.function.name:
                        slot["name"] = tc.function.name
                    if tc.function and tc.function.arguments:
                        slot["arguments"] += tc.function.arguments
        content = "".join(parts)
        if frags:
            calls = [frags[i] for i in sorted(frags)]
            assistant = {
                "role": "assistant",
                # Empty text becomes None when the turn carries tool calls.
                "content": content or None,
                "tool_calls": [
                    {"id": c["id"], "type": "function",
                     "function": {"name": c["name"], "arguments": c["arguments"]}}
                    for c in calls
                ],
            }
            yield ("message", assistant)
            yield ("tool_calls", [{"id": c["id"], "name": c["name"], "arguments": c["arguments"]} for c in calls])
        else:
            yield ("message", {"role": "assistant", "content": content})
            yield ("tool_calls", None)
        return

    # local (Ollama): stream NDJSON, no tools.
    parts = []
    with httpx.stream(
        "POST", f"{cfg.local_base_url}/api/chat",
        json={"model": model or cfg.local_model, "messages": messages, "stream": True},
        timeout=120,
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line:
                continue
            data = json.loads(line)
            # Once streaming has begun the HTTP status is already sent, so a
            # failure shows up as an {"error": ...} line. Surface it instead
            # of emitting a truncated reply as if it had succeeded.
            err = data.get("error")
            if err:
                raise RuntimeError(f"Ollama stream error: {err}")
            piece = (data.get("message") or {}).get("content", "")
            if piece:
                parts.append(piece)
                yield ("delta", piece)
            if data.get("done"):
                break
    yield ("message", {"role": "assistant", "content": "".join(parts)})
    yield ("tool_calls", None)
|
|
|
|
|
|
def embed(texts: list[str]) -> list[list[float]]:
    """Return one embedding vector per entry of `texts`.

    The backend comes from configuration (EMBED_BACKEND: "cloud" or "local").

    Note: OpenAI and Ollama embeddings live in different vector spaces (and
    dimensions). A database is bound to whichever backend produced its
    vectors — don't switch EMBED_BACKEND against an existing DB or cosine
    recall will break.

    Raises RuntimeError when the cloud backend is selected and no OpenAI API
    key is configured.
    """
    cfg = load()

    if cfg.embed_backend != "local":
        # Anything other than "local" goes to OpenAI.
        if not cfg.openai_api_key:
            raise RuntimeError("OPENAI_API_KEY is not set")
        client = OpenAI(api_key=cfg.openai_api_key)
        result = client.embeddings.create(model=cfg.embed_model, input=texts)
        return [item.embedding for item in result.data]

    # Ollama: one POST carrying the whole batch.
    reply = httpx.post(
        f"{cfg.local_base_url}/api/embed",
        json={"model": cfg.local_embed_model, "input": texts},
        timeout=120,
    )
    reply.raise_for_status()
    return reply.json()["embeddings"]
|