Files
project-lyra/lyra/web/server.py
T
serversdown 4f770f2e43 feat: behind-the-scenes 👍/👎 rating system (fine-tune data collection)
Brian can rate Lyra's outputs as he uses her; each rating is stored as a
(context, content, rating) triple — the shape a future fine-tune / preference
dataset wants, collected passively during real use.

- memory: ratings table + add_rating (upsert: one row per item, re-rating
  replaces), list_ratings, rating_counts
- server: POST /rate, GET /ratings/counts, GET /ratings/export (JSONL download)
- chat UI: subtle 👍/👎 on each assistant reply, captures the prompting message
  as context
- journal/reflection UI: 👍/👎 on each thought
- tests: counts + upsert-replace behavior

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 19:32:27 +00:00

250 lines
9.1 KiB
Python

"""Web server for the vendored chat UI.
Serves the static single-page UI and implements the small endpoint contract it
expects (originally provided by the old Node relay), backed by the new Python
chat loop and SQLite memory. SQLite is the single source of truth for messages:
`/v1/chat/completions` persists via `chat.respond`, so the UI's `POST /sessions`
saves are accepted but treated as no-ops (the row is ensured, messages are not
re-stored).
"""
from __future__ import annotations
import asyncio
import json
import time
from pathlib import Path
from fastapi import FastAPI, Request, Response
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from lyra import chat, logbus, memory, poker, self_state, summary
from lyra.llm import Backend
def _sse(event: dict) -> str:
    """Frame *event* as one Server-Sent Events message.

    The event is JSON-encoded onto a single ``data:`` line and followed by the
    blank line that terminates an SSE message.
    """
    payload = json.dumps(event)
    return "data: " + payload + "\n\n"
# Directory of bundled UI assets: the "static" folder next to this module.
_STATIC = Path(__file__).with_name("static")

# UI backend labels associated with the cloud backend. Cloud is the default.
# NOTE(review): nothing in the visible part of this module reads this set;
# `_backend_for` reaches "cloud" by falling through. Confirm it is still needed.
_CLOUD = {"cloud", "custom", "OPENAI"}
def _backend_for(label: str | None) -> Backend:
    """Translate a UI backend label into one of our backend names.

    Matching is case-insensitive. ``"mi50"`` maps to itself; ``"local"``,
    ``"primary"``, ``"secondary"`` and ``"fallback"`` all map to ``"local"``.
    Anything else, including ``None`` and the empty string, maps to
    ``"cloud"``.
    """
    aliases: dict[str, Backend] = {
        "mi50": "mi50",
        "local": "local",
        "primary": "local",
        "secondary": "local",
        "fallback": "local",
    }
    normalized = label.lower() if label else ""
    return aliases.get(normalized, "cloud")
def _last_user_message(messages: list[dict]) -> str:
    """Return the content of the most recent user message.

    Scans *messages* from newest to oldest and picks the first entry whose
    ``role`` is ``"user"``. If no entry has that role, the final message is
    used instead.

    Returns ``""`` when *messages* is empty or ``None``, and when the chosen
    message has no ``content`` key or an explicit ``null`` content, so the
    result is never ``None``.
    """
    if not messages:
        return ""
    chosen = next(
        (m for m in reversed(messages) if m.get("role") == "user"),
        messages[-1],
    )
    # `or ""` rather than a .get() default: the default only applies when the
    # key is absent, not when the client sent {"content": null}.
    return chosen.get("content") or ""
def _rating_sign(raw: object) -> int:
    """Collapse a client-supplied rating to ``1`` (👍), ``-1`` (👎) or ``0``.

    ``0`` means "not a usable rating": the value was missing, zero, or could
    not be converted with ``int()`` (``None``, a non-numeric string, NaN,
    infinity, a list, ...).
    """
    try:
        value = int(raw)
    except (TypeError, ValueError, OverflowError):
        return 0
    return (value > 0) - (value < 0)


def create_app() -> FastAPI:
    """Build and return the Lyra web application.

    Registers the JSON endpoints, the HTML page routes and the live-log SSE
    stream, then mounts the static UI at ``/``. The static mount is added last
    so the routes registered before it take precedence.
    """
    app = FastAPI(title="Lyra Web")

    @app.get("/_health")
    async def health() -> dict:
        """Liveness check; always reports ``{"ok": True}``."""
        return {"ok": True}

    @app.get("/sessions")
    async def list_sessions() -> list[dict]:
        """Return whatever ``memory.list_sessions()`` reports."""
        return memory.list_sessions()

    @app.get("/sessions/{session_id}")
    async def get_session(session_id: str) -> list[dict]:
        """Return the session's history as ``{"role", "content"}`` dicts."""
        return [{"role": ex.role, "content": ex.content} for ex in memory.history(session_id)]

    @app.post("/sessions/{session_id}")
    async def save_session(session_id: str, request: Request) -> dict:
        """Accept the UI's history save as a no-op beyond ensuring the row."""
        # Messages are already persisted by chat.respond; just ensure the row exists.
        await request.body()  # drain the history payload we intentionally ignore
        memory.ensure_session(session_id)
        return {"ok": True}

    @app.patch("/sessions/{session_id}/metadata")
    async def rename_session(session_id: str, request: Request) -> dict:
        """Ensure the session exists, passing along the ``name`` from the body."""
        body = await request.json()
        memory.ensure_session(session_id, name=body.get("name"))
        return {"ok": True}

    @app.delete("/sessions/{session_id}")
    async def delete_session(session_id: str) -> dict:
        """Delete the session via ``memory.delete_session``."""
        memory.delete_session(session_id)
        return {"ok": True}

    @app.post("/sessions/{session_id}/summarize")
    async def summarize(session_id: str) -> dict:
        """Summarize the session in a worker thread.

        ``ok`` is false when ``summary.summarize_session`` returned ``None``.
        """
        gist = await asyncio.to_thread(summary.summarize_session, session_id)
        return {"ok": gist is not None, "summary": gist}

    @app.post("/v1/chat/completions")
    async def chat_completions(request: Request) -> dict:
        """Chat endpoint returning a ``chat.completion``-shaped response.

        Reads ``sessionId`` (default ``"default"``), ``backend``, ``model`` and
        ``messages`` from the JSON body and runs ``chat.respond`` in a worker
        thread. A failure in ``chat.respond`` is logged and returned to the UI
        as an ``[error] ...`` assistant message instead of an HTTP error.
        """
        body = await request.json()
        session_id = body.get("sessionId") or "default"
        backend = _backend_for(body.get("backend"))
        # `or []` also covers an explicit `"messages": null`, which a .get()
        # default would let through as None.
        user_msg = _last_user_message(body.get("messages") or [])
        model_override = body.get("model") or None
        memory.ensure_session(session_id)
        try:
            reply = await asyncio.to_thread(chat.respond, session_id, user_msg, backend, model_override)
        except Exception as exc:
            logbus.log("error", "chat failed", session=session_id, error=str(exc))
            reply = f"[error] {exc}"
        return {
            "object": "chat.completion",
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": reply},
                    "finish_reason": "stop",
                }
            ],
        }

    @app.get("/logs")
    async def logs_page() -> FileResponse:
        """Full-page, mobile-friendly live log viewer (separate from the chat UI)."""
        return FileResponse(str(_STATIC / "logs.html"))

    @app.get("/self")
    async def self_page() -> FileResponse:
        """'Read her mind' — a view of Lyra's current self-state."""
        return FileResponse(str(_STATIC / "self.html"))

    @app.get("/self/state")
    async def self_state_json() -> dict:
        """Lyra's current interiority + when it last changed."""
        return {"state": self_state.load(), "updated_at": memory.self_state_updated_at()}

    @app.post("/self/reflect")
    async def self_reflect() -> dict:
        """Run one two-step reflection now, in this process, so the draft ->
        revised -> critique lands in the live log (/logs)."""
        state = await asyncio.to_thread(self_state.reflect)
        return {"ok": True, "mood": state.get("mood")}

    @app.get("/journal")
    async def journal_page() -> FileResponse:
        """Lyra's journal — the permanent, append-only record of her thoughts."""
        return FileResponse(str(_STATIC / "journal.html"))

    @app.get("/journal/data")
    async def journal_data(limit: int = 300) -> dict:
        """Return journal entries; ``limit`` is passed to ``memory.list_journal``."""
        return {"entries": memory.list_journal(limit=limit)}

    @app.post("/rate")
    async def rate(request: Request) -> dict:
        """Record Brian's 👍/👎 on a Lyra output (chat reply, reflection, journal).

        The body is a JSON object with ``rating`` and ``content``, plus
        optional ``kind`` (default ``"chat"``), ``context``, ``ref`` and
        ``note``. The rating is reduced to its sign, so the stored value is
        always ``1`` or ``-1``. Returns ``{"ok": False}`` without storing
        anything when the body is not an object, the content is missing,
        blank or not a string, or the rating is missing, zero or not a number.
        """
        b = await request.json()
        if not isinstance(b, dict):
            return {"ok": False}
        rating = _rating_sign(b.get("rating"))
        raw_content = b.get("content")
        content = raw_content.strip() if isinstance(raw_content, str) else ""
        if not content or rating == 0:
            return {"ok": False}
        kind = b.get("kind") or "chat"
        memory.add_rating(
            kind=kind, rating=rating, content=content,
            context=(b.get("context") or None), ref=b.get("ref"), note=b.get("note"),
        )
        # Log exactly what was stored (defaulted kind, normalised rating).
        logbus.log("info", "rating", kind=kind, rating=rating)
        return {"ok": True, "counts": memory.rating_counts()}

    @app.get("/ratings/counts")
    async def ratings_counts() -> dict:
        """Return whatever ``memory.rating_counts()`` reports."""
        return memory.rating_counts()

    @app.get("/ratings/export")
    async def ratings_export() -> Response:
        """All ratings as JSONL — the seed for a future fine-tune / preference set."""
        lines = "\n".join(json.dumps(r) for r in memory.list_ratings())
        # A trailing newline is added only when there is at least one record,
        # so an empty export is an empty body.
        return Response(
            content=lines + ("\n" if lines else ""),
            media_type="application/x-ndjson",
            headers={"Content-Disposition": 'attachment; filename="lyra_ratings.jsonl"'},
        )

    @app.get("/hand/{hand_id}")
    async def hand_page(hand_id: int) -> FileResponse:
        """Replayable hand-history viewer."""
        return FileResponse(str(_STATIC / "hand.html"))

    @app.get("/hand/{hand_id}/data")
    async def hand_data(hand_id: int) -> dict:
        """Return the hand from ``poker.get_hand``, or ``{}`` if that is falsy."""
        return poker.get_hand(hand_id) or {}

    @app.get("/hands")
    async def hands_page() -> FileResponse:
        """Serve the hands listing page."""
        return FileResponse(str(_STATIC / "hands.html"))

    @app.get("/hands/data")
    async def hands_data(limit: int = 60) -> dict:
        """Return recent hands; ``limit`` is passed to ``poker.list_recent_hands``."""
        return {"hands": poker.list_recent_hands(limit=limit)}

    @app.get("/recap/{session_id}")
    async def recap_page(session_id: int) -> FileResponse:
        """Serve the session recap page.

        ``session_id`` is declared, though unused here, so the path segment is
        validated as an integer like the other ``/recap/...`` and ``/hand/...``
        routes.
        """
        return FileResponse(str(_STATIC / "recap.html"))

    @app.get("/recap/{session_id}/data")
    async def recap_data(session_id: int) -> dict:
        """Return the poker session record and its ``recap_md`` markdown."""
        s = poker.get_session(session_id) or {}
        return {"session": s, "markdown": s.get("recap_md")}

    @app.get("/recap/{session_id}/download")
    async def recap_download(session_id: int) -> Response:
        """Download the session recap as a markdown attachment.

        A placeholder document is sent when no recap is stored. The filename
        uses the first 10 characters of ``started_at``, or ``"session"`` when
        that field is empty.
        """
        s = poker.get_session(session_id) or {}
        md = s.get("recap_md") or "# No recap generated yet\n"
        date = (s.get("started_at") or "session")[:10]
        fname = f"pokerlog_{date}_s{session_id}.md"
        return Response(
            content=md,
            media_type="text/markdown",
            headers={"Content-Disposition": f'attachment; filename="{fname}"'},
        )

    @app.get("/stream/logs")
    async def stream_logs(request: Request) -> StreamingResponse:
        """Live activity feed: replay the recent buffer, then stream new events."""

        async def gen():
            backlog = logbus.since(0)
            # Remember the newest replayed seq so the poll loop only asks for
            # events after it.
            last = backlog[-1]["seq"] if backlog else 0
            for e in backlog:
                yield _sse(e)
            yield _sse(
                {"seq": last, "ts": time.time(), "level": "system",
                 "msg": "live log connected", "fields": {}}
            )
            # Poll every 0.5 s until the client disconnects.
            while True:
                if await request.is_disconnected():
                    break
                for e in logbus.since(last):
                    last = e["seq"]
                    yield _sse(e)
                await asyncio.sleep(0.5)

        return StreamingResponse(gen(), media_type="text/event-stream")

    # Static UI last, so the API routes above take precedence. html=True serves
    # index.html at "/" and assets (style.css, manifest.json) at their paths.
    app.mount("/", StaticFiles(directory=str(_STATIC), html=True), name="ui")
    return app
# Module-level application instance; this is the object `serve()` passes to uvicorn.
app = create_app()
def serve() -> None:
    """Run the web app under uvicorn (console-script entry: `lyra-web`).

    The bind address comes from the environment: ``LYRA_WEB_HOST`` (default
    ``0.0.0.0``) and ``LYRA_WEB_PORT`` (default ``7078``).
    """
    # Function-scope imports: only needed when actually serving.
    import os

    import uvicorn

    bind_host = os.environ.get("LYRA_WEB_HOST", "0.0.0.0")
    bind_port = int(os.environ.get("LYRA_WEB_PORT", "7078"))
    uvicorn.run(app, host=bind_host, port=bind_port)


if __name__ == "__main__":
    serve()