diff --git a/lyra/chat.py b/lyra/chat.py index d9ba75b..de80aa0 100644 --- a/lyra/chat.py +++ b/lyra/chat.py @@ -162,3 +162,60 @@ def respond(session_id: str, user_msg: str, backend: Backend = "cloud", # Compact this session once enough new turns have piled up. summary.maybe_summarize(session_id) return reply + + +def respond_stream(session_id: str, user_msg: str, backend: Backend = "cloud", + model_override: str | None = None): + """Streaming generator version of `respond`. + + Yields ("delta", text) as content streams in, and ("tool", name) when a tool + runs. Persists the full exchange and yields a final ("done", reply) — matching + `respond`'s side effects (memory + compaction) exactly. + """ + cfg = config.load() + model = {"local": cfg.local_model, "cloud": cfg.chat_model, "mi50": cfg.mi50_model}.get( + backend, backend + ) + if model_override and backend == "cloud": + model = model_override + logbus.log( + "info", "chat request (stream)", session=session_id, backend=backend, + model=model, embed=cfg.embed_backend, + ) + + messages = build_messages(session_id, user_msg) + tool_specs = toolkit.specs() if backend in TOOL_BACKENDS else None + ctx = {"session_id": session_id, "backend": backend} + parts: list[str] = [] + for _ in range(MAX_TOOL_ROUNDS): + assistant_msg = None + tool_calls = None + for ev, payload in llm.chat_call_stream( + messages, backend=backend, model=model, tools=tool_specs + ): + if ev == "delta": + parts.append(payload) + yield ("delta", payload) + elif ev == "message": + assistant_msg = payload + elif ev == "tool_calls": + tool_calls = payload + if not tool_calls: + break + messages.append(assistant_msg) # her tool-call request + for tc in tool_calls: + result = toolkit.dispatch(tc["name"], tc["arguments"], ctx) + logbus.log("info", "tool call", session=session_id, tool=tc["name"], result=result[:80]) + messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result}) + yield ("tool", tc["name"]) + + reply = "".join(parts) + if not reply: + reply = "(I got tangled using my tools there — say that again?)" + yield ("delta", reply) + logbus.log("info", "reply", session=session_id, chars=len(reply)) + + memory.remember(session_id, "user", user_msg) + memory.remember(session_id, "assistant", reply) + summary.maybe_summarize(session_id) + yield ("done", reply) diff --git a/lyra/llm.py b/lyra/llm.py index f9ff419..fa51d7b 100644 --- a/lyra/llm.py +++ b/lyra/llm.py @@ -1,7 +1,8 @@ """LLM router: local (Ollama) chat, cloud (OpenAI) chat + embeddings.""" from __future__ import annotations -from typing import Literal, TypedDict +import json +from typing import Iterator, Literal, TypedDict import httpx from openai import OpenAI @@ -80,6 +81,88 @@ def chat_call( return {"role": "assistant", "content": complete(messages, backend=backend, model=model)}, None +def chat_call_stream( + messages: list, backend: Backend = "cloud", model: str | None = None, + tools: list | None = None, +) -> Iterator[tuple[str, object]]: + """Streaming variant of `chat_call`. Yields ("delta", text) for each content + chunk as it arrives, then exactly two terminal events: + ("message", assistant_dict) — the full assistant turn, to append back + ("tool_calls", calls | None) — list of {id,name,arguments} or None + + `local` (Ollama) streams NDJSON and never returns tool calls. + """ + cfg = load() + if backend in ("cloud", "mi50"): + if backend == "cloud": + if not cfg.openai_api_key: + raise RuntimeError("OPENAI_API_KEY is not set") + client = OpenAI(api_key=cfg.openai_api_key) + mdl = model or cfg.cloud_model + else: + client = OpenAI(api_key="not-needed", base_url=cfg.mi50_base_url) + mdl = model or cfg.mi50_model + kwargs: dict = {"model": mdl, "messages": messages, "stream": True} + if tools: + kwargs["tools"] = tools + parts: list[str] = [] + frags: dict[int, dict] = {} # tool-call fragments accumulated by index + for chunk in client.chat.completions.create(**kwargs): + if not chunk.choices: + continue + delta = chunk.choices[0].delta + if getattr(delta, "content", None): + parts.append(delta.content) + yield ("delta", delta.content) + for tc in getattr(delta, "tool_calls", None) or []: + slot = frags.setdefault(tc.index, {"id": "", "name": "", "arguments": ""}) + if tc.id: + slot["id"] = tc.id + if tc.function and tc.function.name: + slot["name"] = tc.function.name + if tc.function and tc.function.arguments: + slot["arguments"] += tc.function.arguments + content = "".join(parts) + if frags: + calls = [frags[i] for i in sorted(frags)] + assistant = { + "role": "assistant", + "content": content or None, + "tool_calls": [ + {"id": c["id"], "type": "function", + "function": {"name": c["name"], "arguments": c["arguments"]}} + for c in calls + ], + } + yield ("message", assistant) + yield ("tool_calls", [{"id": c["id"], "name": c["name"], "arguments": c["arguments"]} for c in calls]) + else: + yield ("message", {"role": "assistant", "content": content}) + yield ("tool_calls", None) + return + + # local (Ollama): stream NDJSON, no tools. + parts = [] + with httpx.stream( + "POST", f"{cfg.local_base_url}/api/chat", + json={"model": model or cfg.local_model, "messages": messages, "stream": True}, + timeout=120, + ) as resp: + resp.raise_for_status() + for line in resp.iter_lines(): + if not line: + continue + data = json.loads(line) + piece = (data.get("message") or {}).get("content", "") + if piece: + parts.append(piece) + yield ("delta", piece) + if data.get("done"): + break + yield ("message", {"role": "assistant", "content": "".join(parts)}) + yield ("tool_calls", None) + + def embed(texts: list[str]) -> list[list[float]]: """Embed texts using the configured backend (EMBED_BACKEND: "cloud" or "local"). diff --git a/lyra/web/server.py b/lyra/web/server.py index 655c2ac..08fa998 100644 --- a/lyra/web/server.py +++ b/lyra/web/server.py @@ -111,6 +111,45 @@ def create_app() -> FastAPI: ], } + @app.post("/v1/chat/stream") + async def chat_stream(request: Request) -> StreamingResponse: + """Server-Sent Events: stream Lyra's reply token-by-token. + + `chat.respond_stream` is a blocking generator (httpx/openai), so it runs in + a worker thread and bridges chunks to this async generator via a queue. + """ + body = await request.json() + session_id = body.get("sessionId") or "default" + backend = _backend_for(body.get("backend")) + user_msg = _last_user_message(body.get("messages", [])) + model_override = body.get("model") or None + memory.ensure_session(session_id) + + async def gen(): + loop = asyncio.get_running_loop() + q: asyncio.Queue = asyncio.Queue() + done = object() + + def produce(): + try: + for event in chat.respond_stream(session_id, user_msg, backend, model_override): + loop.call_soon_threadsafe(q.put_nowait, event) + except Exception as exc: # surface to the client stream, don't hang + logbus.log("error", "chat stream failed", session=session_id, error=str(exc)) + loop.call_soon_threadsafe(q.put_nowait, ("error", str(exc))) + finally: + loop.call_soon_threadsafe(q.put_nowait, done) + + loop.run_in_executor(None, produce) + while True: + item = await q.get() + if item is done: + break + ev, payload = item + yield f"data: {json.dumps({'type': ev, 'payload': payload})}\n\n" + + return StreamingResponse(gen(), media_type="text/event-stream") + @app.get("/logs") async def logs_page() -> FileResponse: """Full-page, mobile-friendly live log viewer (separate from the chat UI).""" diff --git a/lyra/web/static/index.html b/lyra/web/static/index.html index df3670e..918f5a6 100644 --- a/lyra/web/static/index.html +++ b/lyra/web/static/index.html @@ -180,6 +180,7 @@