feat(web): stream chat replies token-by-token (M3)
- llm.chat_call_stream: streaming generator for all 3 backends (Ollama NDJSON,
OpenAI/MI50 SSE), accumulating tool-call fragments by index.
- chat.respond_stream: mirrors respond()'s tool loop and persistence/compaction,
yielding ("delta", text) / ("tool", name) / ("done", reply).
- POST /v1/chat/stream: SSE endpoint; blocking generator bridged to async via a
worker thread + asyncio.Queue. Old completions endpoint kept as fallback.
- Client streams into a live bubble with a blinking caret; rAF-throttled render
(no full re-parse per token) and instant scroll during stream — fixes iOS
Safari ghosting from per-token smooth-scroll. Falls back to the blocking
endpoint only if nothing streamed (no double-persist).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -162,3 +162,60 @@ def respond(session_id: str, user_msg: str, backend: Backend = "cloud",
|
||||
# Compact this session once enough new turns have piled up.
|
||||
summary.maybe_summarize(session_id)
|
||||
return reply
|
||||
|
||||
|
||||
def respond_stream(session_id: str, user_msg: str, backend: Backend = "cloud",
|
||||
model_override: str | None = None):
|
||||
"""Streaming generator version of `respond`.
|
||||
|
||||
Yields ("delta", text) as content streams in, and ("tool", name) when a tool
|
||||
runs. Persists the full exchange and yields a final ("done", reply) — matching
|
||||
`respond`'s side effects (memory + compaction) exactly.
|
||||
"""
|
||||
cfg = config.load()
|
||||
model = {"local": cfg.local_model, "cloud": cfg.chat_model, "mi50": cfg.mi50_model}.get(
|
||||
backend, backend
|
||||
)
|
||||
if model_override and backend == "cloud":
|
||||
model = model_override
|
||||
logbus.log(
|
||||
"info", "chat request (stream)", session=session_id, backend=backend,
|
||||
model=model, embed=cfg.embed_backend,
|
||||
)
|
||||
|
||||
messages = build_messages(session_id, user_msg)
|
||||
tool_specs = toolkit.specs() if backend in TOOL_BACKENDS else None
|
||||
ctx = {"session_id": session_id, "backend": backend}
|
||||
parts: list[str] = []
|
||||
for _ in range(MAX_TOOL_ROUNDS):
|
||||
assistant_msg = None
|
||||
tool_calls = None
|
||||
for ev, payload in llm.chat_call_stream(
|
||||
messages, backend=backend, model=model, tools=tool_specs
|
||||
):
|
||||
if ev == "delta":
|
||||
parts.append(payload)
|
||||
yield ("delta", payload)
|
||||
elif ev == "message":
|
||||
assistant_msg = payload
|
||||
elif ev == "tool_calls":
|
||||
tool_calls = payload
|
||||
if not tool_calls:
|
||||
break
|
||||
messages.append(assistant_msg) # her tool-call request
|
||||
for tc in tool_calls:
|
||||
result = toolkit.dispatch(tc["name"], tc["arguments"], ctx)
|
||||
logbus.log("info", "tool call", session=session_id, tool=tc["name"], result=result[:80])
|
||||
messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result})
|
||||
yield ("tool", tc["name"])
|
||||
|
||||
reply = "".join(parts)
|
||||
if not reply:
|
||||
reply = "(I got tangled using my tools there — say that again?)"
|
||||
yield ("delta", reply)
|
||||
logbus.log("info", "reply", session=session_id, chars=len(reply))
|
||||
|
||||
memory.remember(session_id, "user", user_msg)
|
||||
memory.remember(session_id, "assistant", reply)
|
||||
summary.maybe_summarize(session_id)
|
||||
yield ("done", reply)
|
||||
|
||||
+84
-1
@@ -1,7 +1,8 @@
|
||||
"""LLM router: local (Ollama) chat, cloud (OpenAI) chat + embeddings."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Literal, TypedDict
|
||||
import json
|
||||
from typing import Iterator, Literal, TypedDict
|
||||
|
||||
import httpx
|
||||
from openai import OpenAI
|
||||
@@ -80,6 +81,88 @@ def chat_call(
|
||||
return {"role": "assistant", "content": complete(messages, backend=backend, model=model)}, None
|
||||
|
||||
|
||||
def chat_call_stream(
|
||||
messages: list, backend: Backend = "cloud", model: str | None = None,
|
||||
tools: list | None = None,
|
||||
) -> Iterator[tuple[str, object]]:
|
||||
"""Streaming variant of `chat_call`. Yields ("delta", text) for each content
|
||||
chunk as it arrives, then exactly two terminal events:
|
||||
("message", assistant_dict) — the full assistant turn, to append back
|
||||
("tool_calls", calls | None) — list of {id,name,arguments} or None
|
||||
|
||||
`local` (Ollama) streams NDJSON and never returns tool calls.
|
||||
"""
|
||||
cfg = load()
|
||||
if backend in ("cloud", "mi50"):
|
||||
if backend == "cloud":
|
||||
if not cfg.openai_api_key:
|
||||
raise RuntimeError("OPENAI_API_KEY is not set")
|
||||
client = OpenAI(api_key=cfg.openai_api_key)
|
||||
mdl = model or cfg.cloud_model
|
||||
else:
|
||||
client = OpenAI(api_key="not-needed", base_url=cfg.mi50_base_url)
|
||||
mdl = model or cfg.mi50_model
|
||||
kwargs: dict = {"model": mdl, "messages": messages, "stream": True}
|
||||
if tools:
|
||||
kwargs["tools"] = tools
|
||||
parts: list[str] = []
|
||||
frags: dict[int, dict] = {} # tool-call fragments accumulated by index
|
||||
for chunk in client.chat.completions.create(**kwargs):
|
||||
if not chunk.choices:
|
||||
continue
|
||||
delta = chunk.choices[0].delta
|
||||
if getattr(delta, "content", None):
|
||||
parts.append(delta.content)
|
||||
yield ("delta", delta.content)
|
||||
for tc in getattr(delta, "tool_calls", None) or []:
|
||||
slot = frags.setdefault(tc.index, {"id": "", "name": "", "arguments": ""})
|
||||
if tc.id:
|
||||
slot["id"] = tc.id
|
||||
if tc.function and tc.function.name:
|
||||
slot["name"] = tc.function.name
|
||||
if tc.function and tc.function.arguments:
|
||||
slot["arguments"] += tc.function.arguments
|
||||
content = "".join(parts)
|
||||
if frags:
|
||||
calls = [frags[i] for i in sorted(frags)]
|
||||
assistant = {
|
||||
"role": "assistant",
|
||||
"content": content or None,
|
||||
"tool_calls": [
|
||||
{"id": c["id"], "type": "function",
|
||||
"function": {"name": c["name"], "arguments": c["arguments"]}}
|
||||
for c in calls
|
||||
],
|
||||
}
|
||||
yield ("message", assistant)
|
||||
yield ("tool_calls", [{"id": c["id"], "name": c["name"], "arguments": c["arguments"]} for c in calls])
|
||||
else:
|
||||
yield ("message", {"role": "assistant", "content": content})
|
||||
yield ("tool_calls", None)
|
||||
return
|
||||
|
||||
# local (Ollama): stream NDJSON, no tools.
|
||||
parts = []
|
||||
with httpx.stream(
|
||||
"POST", f"{cfg.local_base_url}/api/chat",
|
||||
json={"model": model or cfg.local_model, "messages": messages, "stream": True},
|
||||
timeout=120,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
for line in resp.iter_lines():
|
||||
if not line:
|
||||
continue
|
||||
data = json.loads(line)
|
||||
piece = (data.get("message") or {}).get("content", "")
|
||||
if piece:
|
||||
parts.append(piece)
|
||||
yield ("delta", piece)
|
||||
if data.get("done"):
|
||||
break
|
||||
yield ("message", {"role": "assistant", "content": "".join(parts)})
|
||||
yield ("tool_calls", None)
|
||||
|
||||
|
||||
def embed(texts: list[str]) -> list[list[float]]:
|
||||
"""Embed texts using the configured backend (EMBED_BACKEND: "cloud" or "local").
|
||||
|
||||
|
||||
@@ -111,6 +111,45 @@ def create_app() -> FastAPI:
|
||||
],
|
||||
}
|
||||
|
||||
@app.post("/v1/chat/stream")
|
||||
async def chat_stream(request: Request) -> StreamingResponse:
|
||||
"""Server-Sent Events: stream Lyra's reply token-by-token.
|
||||
|
||||
`chat.respond_stream` is a blocking generator (httpx/openai), so it runs in
|
||||
a worker thread and bridges chunks to this async generator via a queue.
|
||||
"""
|
||||
body = await request.json()
|
||||
session_id = body.get("sessionId") or "default"
|
||||
backend = _backend_for(body.get("backend"))
|
||||
user_msg = _last_user_message(body.get("messages", []))
|
||||
model_override = body.get("model") or None
|
||||
memory.ensure_session(session_id)
|
||||
|
||||
async def gen():
|
||||
loop = asyncio.get_running_loop()
|
||||
q: asyncio.Queue = asyncio.Queue()
|
||||
done = object()
|
||||
|
||||
def produce():
|
||||
try:
|
||||
for event in chat.respond_stream(session_id, user_msg, backend, model_override):
|
||||
loop.call_soon_threadsafe(q.put_nowait, event)
|
||||
except Exception as exc: # surface to the client stream, don't hang
|
||||
logbus.log("error", "chat stream failed", session=session_id, error=str(exc))
|
||||
loop.call_soon_threadsafe(q.put_nowait, ("error", str(exc)))
|
||||
finally:
|
||||
loop.call_soon_threadsafe(q.put_nowait, done)
|
||||
|
||||
loop.run_in_executor(None, produce)
|
||||
while True:
|
||||
item = await q.get()
|
||||
if item is done:
|
||||
break
|
||||
ev, payload = item
|
||||
yield f"data: {json.dumps({'type': ev, 'payload': payload})}\n\n"
|
||||
|
||||
return StreamingResponse(gen(), media_type="text/event-stream")
|
||||
|
||||
@app.get("/logs")
|
||||
async def logs_page() -> FileResponse:
|
||||
"""Full-page, mobile-friendly live log viewer (separate from the chat UI)."""
|
||||
|
||||
@@ -180,6 +180,7 @@
|
||||
<script>
|
||||
const RELAY_BASE = ""; // same-origin: served by lyra.web.server
|
||||
const API_URL = `${RELAY_BASE}/v1/chat/completions`;
|
||||
const STREAM_URL = `${RELAY_BASE}/v1/chat/stream`;
|
||||
|
||||
function generateSessionId() {
|
||||
return "sess-" + Math.random().toString(36).substring(2, 10);
|
||||
@@ -308,21 +309,101 @@
|
||||
body.model = cloudModel;
|
||||
}
|
||||
|
||||
// Stream the reply token-by-token (SSE). Fall back to the blocking
|
||||
// endpoint only if nothing streamed (e.g. streaming unavailable).
|
||||
const div = createAssistantBubble();
|
||||
let full = "";
|
||||
try {
|
||||
const resp = await fetch(STREAM_URL, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(body)
|
||||
});
|
||||
if (!resp.ok || !resp.body) throw new Error("HTTP " + resp.status);
|
||||
|
||||
const reader = resp.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buf = "";
|
||||
for (;;) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
buf += decoder.decode(value, { stream: true });
|
||||
let i;
|
||||
while ((i = buf.indexOf("\n\n")) !== -1) {
|
||||
const frame = buf.slice(0, i).trim();
|
||||
buf = buf.slice(i + 2);
|
||||
if (!frame.startsWith("data:")) continue;
|
||||
let evt;
|
||||
try { evt = JSON.parse(frame.slice(5).trim()); } catch (e) { continue; }
|
||||
if (evt.type === "delta") {
|
||||
full += evt.payload;
|
||||
updateAssistantBubble(div, full);
|
||||
} else if (evt.type === "done") {
|
||||
if (evt.payload) full = evt.payload;
|
||||
} else if (evt.type === "error") {
|
||||
throw new Error(evt.payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
if (!full) {
|
||||
div.remove();
|
||||
try {
|
||||
const resp = await fetch(API_URL, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(body)
|
||||
});
|
||||
|
||||
const data = await resp.json();
|
||||
const reply = data.choices?.[0]?.message?.content || "(no reply)";
|
||||
addMessage("assistant", reply);
|
||||
history.push({ role: "assistant", content: reply });
|
||||
await saveSession();
|
||||
} catch (err) {
|
||||
addMessage("system", "Error: " + err.message);
|
||||
} catch (err2) {
|
||||
addMessage("system", "Error: " + err2.message);
|
||||
}
|
||||
return;
|
||||
}
|
||||
// Partial content arrived before the error — keep what we streamed.
|
||||
}
|
||||
|
||||
finalizeAssistantBubble(div, full || "(no reply)");
|
||||
history.push({ role: "assistant", content: full || "(no reply)" });
|
||||
await saveSession();
|
||||
}
|
||||
|
||||
function createAssistantBubble() {
|
||||
const messagesEl = document.getElementById("messages");
|
||||
const div = document.createElement("div");
|
||||
div.className = "msg assistant streaming";
|
||||
messagesEl.appendChild(div);
|
||||
messagesEl.scrollTop = messagesEl.scrollHeight; // instant — no smooth chasing
|
||||
return div;
|
||||
}
|
||||
|
||||
// Coalesce token updates to one render per animation frame (avoids re-parsing
|
||||
// the whole message on every token, and the iOS ghosting from rapid repaints).
|
||||
function updateAssistantBubble(div, text) {
|
||||
div._pending = text;
|
||||
if (div._raf) return;
|
||||
div._raf = requestAnimationFrame(() => {
|
||||
div._raf = 0;
|
||||
const messagesEl = document.getElementById("messages");
|
||||
const stick = messagesEl.scrollHeight - messagesEl.scrollTop - messagesEl.clientHeight < 90;
|
||||
div.innerHTML = renderMarkdown(div._pending);
|
||||
div.dataset.raw = div._pending;
|
||||
if (stick) messagesEl.scrollTop = messagesEl.scrollHeight; // follow only if near bottom
|
||||
});
|
||||
}
|
||||
|
||||
function finalizeAssistantBubble(div, text) {
|
||||
if (div._raf) { cancelAnimationFrame(div._raf); div._raf = 0; } // drop any queued render
|
||||
div.classList.remove("streaming");
|
||||
div.innerHTML = renderMarkdown(text);
|
||||
div.dataset.raw = text;
|
||||
addRateBar(div);
|
||||
const messagesEl = document.getElementById("messages");
|
||||
requestAnimationFrame(() => messagesEl.scrollTo({ top: messagesEl.scrollHeight, behavior: "smooth" }));
|
||||
}
|
||||
|
||||
function renderMarkdown(text) {
|
||||
|
||||
@@ -139,7 +139,9 @@ button:hover, select:hover {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 8px;
|
||||
scroll-behavior: smooth;
|
||||
/* No CSS smooth-scroll: during streaming, per-token smooth scrolls pile up and
|
||||
iOS Safari leaves ghost paint frames. Smooth is applied explicitly in JS where
|
||||
it's a one-shot (load/finalize). */
|
||||
}
|
||||
|
||||
/* Messages */
|
||||
@@ -1090,6 +1092,16 @@ select:hover {
|
||||
}
|
||||
.msg.assistant pre code { background: none; padding: 0; font-size: 0.85em; }
|
||||
|
||||
/* Streaming: a blinking caret while tokens arrive (and a min-size while empty). */
|
||||
.msg.assistant.streaming { min-width: 1.4em; min-height: 1.1em; }
|
||||
.msg.assistant.streaming::after {
|
||||
content: "▋";
|
||||
margin-left: 1px;
|
||||
color: var(--accent);
|
||||
animation: caretBlink 1s steps(1) infinite;
|
||||
}
|
||||
@keyframes caretBlink { 0%, 50% { opacity: 0.85; } 50.01%, 100% { opacity: 0; } }
|
||||
|
||||
/* Behind-the-scenes 👍/👎 feedback (fine-tune signal) — subtle until hovered. */
|
||||
.rate-bar { display: flex; gap: 6px; margin-top: 7px; opacity: 0.3; transition: opacity .15s; }
|
||||
.msg.assistant:hover .rate-bar { opacity: 0.85; }
|
||||
|
||||
Reference in New Issue
Block a user