Files
project-lyra/lyra/memory.py
T
serversdown dfb6425395 feat: session modes (Talk/Cash) + live session HUD
Lyra now switches register based on what she's doing at the table instead of
being a wishy-washy companion mid-session.

Modes (lyra/modes.py):
- Talk (default companion) + Cash (live cash copilot); a mode = prompt card +
  tool allow-list. Tool gating via tools.specs(allow=).
- Two-register Cash voice: act-first one-line logging when fed facts; full warm
  companion voice for strategy / tilt / mental game.
- mode persisted per chat session (new sessions.mode column); auto-switch into
  Cash when start_session fires; UI forces cloud backend in Cash (tools only
  fire there).

Stack tracking + HUD:
- log_stack tool + poker_stack_log table; live net while sitting (stack - buy-in).
- poker.hud() bundle; /session HUD page (stack sparkline, hands, villains, notes,
  stats) polling /session/data every 5s; Talk/Cash switcher + Session nav.

Endpoints: /session, /session/data, GET/POST /sessions/{id}/mode, /modes.
tests/test_modes.py (gating, mode roundtrip, stack/HUD); 36 tests green. v0.3.0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 05:28:15 +00:00

709 lines
25 KiB
Python

"""Persistent memory: SQLite storage + brute-force cosine recall over embeddings.
Each exchange is stored with its OpenAI embedding as a float32 BLOB. Recall
loads all embeddings (optionally scoped to a session) into a matrix and
returns the top-k by cosine similarity. Brute force is fine up to tens of
thousands of rows; swap in a vector index when that stops being true.
"""
from __future__ import annotations
import json
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import numpy as np
from lyra import llm
from lyra.config import load
# Full DDL for the memory database. Every statement uses IF NOT EXISTS, and the
# whole script is passed to executescript() each time _connection() opens a
# database, so it must stay safe to re-run against an existing file.
SCHEMA = """
CREATE TABLE IF NOT EXISTS exchanges (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
embedding BLOB NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_session_created ON exchanges(session_id, created_at);
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
name TEXT,
mode TEXT, -- conversation mode (see lyra/modes.py); NULL = default
created_at TEXT NOT NULL
);
-- One compacted "gist" per session. last_exchange_id marks how far the summary
-- covers, so we know when enough new turns have accumulated to re-summarize.
CREATE TABLE IF NOT EXISTS summaries (
session_id TEXT PRIMARY KEY,
content TEXT NOT NULL,
embedding BLOB NOT NULL,
last_exchange_id INTEGER NOT NULL,
created_at TEXT NOT NULL
);
-- Derived semantic memory: standing facts about the user, distilled from the
-- session gists by the consolidation pass. Single row (id='self').
CREATE TABLE IF NOT EXISTS profile (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
sessions_covered INTEGER NOT NULL,
updated_at TEXT NOT NULL
);
-- Temporal memory: one "what was happening" digest per calendar month, rolled
-- up from that month's session gists. month is "YYYY-MM".
CREATE TABLE IF NOT EXISTS eras (
month TEXT PRIMARY KEY,
content TEXT NOT NULL,
embedding BLOB NOT NULL,
session_count INTEGER NOT NULL,
created_at TEXT NOT NULL
);
-- The current narrative: time-aware arc/trends/callbacks (vs the timeless
-- profile). Distilled from profile + recent eras. Single row (id='current').
CREATE TABLE IF NOT EXISTS narrative (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Autonomy Core: Lyra's evolving self-state (mood, energy, her own first-person
-- self-narrative, reflections). Stored as a JSON blob. Single row (id='lyra').
CREATE TABLE IF NOT EXISTS self_state (
id TEXT PRIMARY KEY,
data TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Lyra's journal: append-only, permanent record of her thoughts. The self_state
-- reflections/metacognition lists are a short rolling window for context; this
-- keeps everything so nothing is lost when those roll over. kind is
-- 'reflection' | 'metacognition' | 'journal' (a deliberate note to herself).
CREATE TABLE IF NOT EXISTS journal (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
kind TEXT NOT NULL,
content TEXT NOT NULL,
source TEXT
);
CREATE INDEX IF NOT EXISTS idx_journal_created ON journal(created_at);
-- Brian's behind-the-scenes feedback on Lyra's outputs (chat replies, reflections,
-- journal/metacognition). Stored as (context, content, rating) — the shape a future
-- fine-tune / preference dataset wants. One row per rated item (re-rating updates it).
CREATE TABLE IF NOT EXISTS ratings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
kind TEXT NOT NULL, -- chat | reflection | metacognition | journal
rating INTEGER NOT NULL, -- +1 (good / want more) or -1 (off / want less)
content TEXT NOT NULL, -- the rated output
context TEXT, -- what prompted it (e.g. the user message for a chat reply)
ref TEXT, -- optional source id (journal id, session id, ...)
note TEXT
);
CREATE INDEX IF NOT EXISTS idx_ratings_created ON ratings(created_at);
"""
# Process-wide connection singleton and the database path it was opened with.
# Both are assigned by _connection(); None means "not opened yet".
_conn: sqlite3.Connection | None = None
_conn_path: Path | None = None
def _connection() -> sqlite3.Connection:
    """Lazily open the SQLite connection. Reopens if LYRA_DB_PATH changed (for tests).

    A new connection is opened when none is cached or when the configured
    ``db_path`` differs from the one the cached connection was opened with.
    Opening creates the parent directory, applies the PRAGMAs, runs SCHEMA,
    and applies column migrations. The module globals are only updated once
    the new connection is fully initialised.

    Raises:
        sqlite3.OperationalError: if a migration fails for any reason other
            than the column already being present.
    """
    global _conn, _conn_path
    cfg = load()
    if _conn is None or _conn_path != cfg.db_path:
        if _conn is not None:
            _conn.close()
            # Forget the closed handle immediately so a failure further down
            # can never leave a dead connection cached as the singleton.
            _conn = None
            _conn_path = None
        cfg.db_path.parent.mkdir(parents=True, exist_ok=True)
        # check_same_thread=False: the web server runs blocking work in a thread
        # pool, so the singleton connection is touched from threads other than
        # the one that created it. Safe here under single-user, low-concurrency use.
        conn = sqlite3.connect(cfg.db_path, check_same_thread=False)
        try:
            conn.row_factory = sqlite3.Row
            # WAL + a busy timeout so a separate dream-cycle process can read/write
            # alongside the web server without tripping "database is locked".
            conn.execute("PRAGMA busy_timeout=5000")
            conn.execute("PRAGMA journal_mode=WAL")
            conn.executescript(SCHEMA)
            # Migrations for DBs created before a column existed (no-op if present).
            for ddl in ("ALTER TABLE sessions ADD COLUMN mode TEXT",):
                try:
                    conn.execute(ddl)
                except sqlite3.OperationalError as exc:
                    # Only "column already there" is expected; anything else
                    # (locked database, missing table, ...) is a real failure
                    # and must not be silently ignored.
                    if "duplicate column name" not in str(exc).lower():
                        raise
        except Exception:
            # Do not leak a half-initialised connection.
            conn.close()
            raise
        _conn = conn
        _conn_path = cfg.db_path
    return _conn
@dataclass
class Exchange:
    """One row of the `exchanges` table, without its embedding blob."""

    id: int  # autoincrement primary key of the row
    session_id: str  # session the turn belongs to
    role: str  # speaker role as stored with the turn
    content: str  # text of the turn
    created_at: str  # timestamp string as stored (remember() writes UTC ISO-8601)
    score: float | None = None  # cosine similarity, filled only by recall()
@dataclass
class Summary:
    """A session's compacted gist (one row of `summaries`), without its embedding."""

    session_id: str  # session the gist describes
    content: str  # the gist text
    last_exchange_id: int  # highest exchange id the gist covers
    created_at: str  # when the gist was generated
    session_started_at: str | None = None  # when the conversation actually happened
    score: float | None = None  # cosine similarity, filled only by recall_summaries()
@dataclass
class Era:
    """A per-month digest (one row of `eras`), without its embedding."""

    month: str  # "YYYY-MM"
    content: str  # the digest text
    session_count: int  # session count stored alongside the digest
    created_at: str  # when the digest was written
    score: float | None = None  # cosine similarity, filled only by recall_eras()
def _to_blob(vec: list[float]) -> bytes:
return np.asarray(vec, dtype=np.float32).tobytes()
def _from_blob(blob: bytes) -> np.ndarray:
return np.frombuffer(blob, dtype=np.float32)
def remember(session_id: str, role: str, content: str) -> int:
    """Store one exchange together with its embedding.

    The content is embedded through ``llm.embed`` and the row is stamped with
    the current UTC time in ISO format. Returns the id of the inserted row.
    """
    [vector] = llm.embed([content])
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    insert_sql = (
        "INSERT INTO exchanges (session_id, role, content, embedding, created_at) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    with db:
        values = (session_id, role, content, _to_blob(vector), stamp)
        cursor = db.execute(insert_sql, values)
    return int(cursor.lastrowid)
def add_exchanges_bulk(session_id: str, rows: list[tuple[str, str, list[float], str]]) -> int:
    """Write a batch of already-embedded exchanges in a single transaction.

    Every element of `rows` is (role, content, embedding, created_at). The
    importer uses this so it does not pay one INSERT (and one embed
    round-trip) per message. Returns the number of rows given.
    """
    db = _connection()
    insert_sql = (
        "INSERT INTO exchanges (session_id, role, content, embedding, created_at) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    with db:
        payload = []
        for role, text, vector, created_at in rows:
            payload.append((session_id, role, text, _to_blob(vector), created_at))
        db.executemany(insert_sql, payload)
    return len(rows)
def recent(session_id: str, n: int = 10) -> list[Exchange]:
    """Return the newest `n` exchanges of a session in chronological order."""
    select_sql = (
        "SELECT id, session_id, role, content, created_at FROM exchanges "
        "WHERE session_id = ? ORDER BY id DESC LIMIT ?"
    )
    newest_first = _connection().execute(select_sql, (session_id, n)).fetchall()
    # The query yields newest-first; flip it so callers read oldest-first.
    result = []
    for row in newest_first[::-1]:
        result.append(
            Exchange(
                row["id"],
                row["session_id"],
                row["role"],
                row["content"],
                row["created_at"],
            )
        )
    return result
def ensure_session(session_id: str, name: str | None = None) -> None:
    """Make sure a `sessions` row exists; when a name is supplied, apply it.

    An existing row is left untouched by the insert, so its name only changes
    when `name` is not None.
    """
    created = datetime.now(timezone.utc).isoformat()
    db = _connection()
    statements = [
        (
            "INSERT INTO sessions (id, name, created_at) VALUES (?, ?, ?) "
            "ON CONFLICT(id) DO NOTHING",
            (session_id, name, created),
        )
    ]
    if name is not None:
        statements.append(
            ("UPDATE sessions SET name = ? WHERE id = ?", (name, session_id))
        )
    with db:
        for sql, args in statements:
            db.execute(sql, args)
def get_session_mode(session_id: str) -> str | None:
    """Return the stored mode key for a session.

    None is returned when the session is unknown or its mode is NULL/empty;
    the caller is expected to fall back to its own default.
    """
    row = _connection().execute(
        "SELECT mode FROM sessions WHERE id = ?", (session_id,)
    ).fetchone()
    if not row:
        return None
    return row["mode"] or None
def set_session_mode(session_id: str, mode: str) -> None:
    """Save the conversation mode for a session, creating its row first if missing."""
    ensure_session(session_id)
    db = _connection()
    update_sql = "UPDATE sessions SET mode = ? WHERE id = ?"
    with db:
        db.execute(update_sql, (mode, session_id))
def list_sessions() -> list[dict]:
    """List every session as {"id", "name"} dicts, newest first.

    Covers rows of the `sessions` table plus any session id that only appears
    in `exchanges` (those come back with name None).
    """
    query = """
SELECT s.id AS id,
s.name AS name,
COALESCE(s.created_at, MIN(e.created_at)) AS created_at
FROM sessions s
LEFT JOIN exchanges e ON e.session_id = s.id
GROUP BY s.id
UNION
SELECT e.session_id AS id, NULL AS name, MIN(e.created_at) AS created_at
FROM exchanges e
WHERE e.session_id NOT IN (SELECT id FROM sessions)
GROUP BY e.session_id
ORDER BY created_at DESC
"""
    found = _connection().execute(query).fetchall()
    sessions = []
    for row in found:
        sessions.append({"id": row["id"], "name": row["name"]})
    return sessions
def history(session_id: str) -> list[Exchange]:
    """Return every exchange of a session, ordered by ascending id (oldest first)."""
    select_sql = (
        "SELECT id, session_id, role, content, created_at FROM exchanges "
        "WHERE session_id = ? ORDER BY id ASC"
    )
    fetched = _connection().execute(select_sql, (session_id,)).fetchall()
    columns = ("id", "session_id", "role", "content", "created_at")
    transcript = []
    for row in fetched:
        transcript.append(Exchange(**{column: row[column] for column in columns}))
    return transcript
def delete_session(session_id: str) -> None:
    """Delete a session's exchanges, its session row and its summary in one transaction."""
    db = _connection()
    key = (session_id,)
    purge_statements = (
        "DELETE FROM exchanges WHERE session_id = ?",
        "DELETE FROM sessions WHERE id = ?",
        "DELETE FROM summaries WHERE session_id = ?",
    )
    with db:
        for statement in purge_statements:
            db.execute(statement, key)
def recall(query: str, k: int = 5, session_id: str | None = None) -> list[Exchange]:
    """Rank stored exchanges by cosine similarity to `query` and return the best `k`.

    When `session_id` is given only that session's exchanges are considered.
    Each returned Exchange carries its similarity in `score`; results are
    ordered from most to least similar. Returns [] when nothing is stored.
    """
    [query_vec] = llm.embed([query])
    target = np.asarray(query_vec, dtype=np.float32)
    db = _connection()
    select_sql = "SELECT id, session_id, role, content, embedding, created_at FROM exchanges"
    if session_id is None:
        args: tuple = ()
    else:
        select_sql += " WHERE session_id = ?"
        args = (session_id,)
    candidates = db.execute(select_sql, args).fetchall()
    if not candidates:
        return []
    embeddings = np.stack([_from_blob(row["embedding"]) for row in candidates])
    row_norms = np.linalg.norm(embeddings, axis=1)
    # The small epsilon keeps the division defined for zero-length vectors.
    similarity = (embeddings @ target) / (row_norms * np.linalg.norm(target) + 1e-9)
    best_first = np.argsort(similarity)[::-1][:k]
    hits = []
    for idx in best_first:
        row = candidates[idx]
        hits.append(
            Exchange(
                id=row["id"],
                session_id=row["session_id"],
                role=row["role"],
                content=row["content"],
                created_at=row["created_at"],
                score=float(similarity[idx]),
            )
        )
    return hits
# --- Summary tier (compacted per-session gists) ---
def store_summary(session_id: str, content: str, last_exchange_id: int) -> None:
    """Save a session's gist and its embedding, overwriting any earlier gist.

    `last_exchange_id` records how far into the session the gist reaches; the
    row's created_at is reset to the current UTC time on every write.
    """
    [vector] = llm.embed([content])
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    upsert_sql = (
        "INSERT INTO summaries (session_id, content, embedding, last_exchange_id, created_at) "
        "VALUES (?, ?, ?, ?, ?) "
        "ON CONFLICT(session_id) DO UPDATE SET "
        "content=excluded.content, embedding=excluded.embedding, "
        "last_exchange_id=excluded.last_exchange_id, created_at=excluded.created_at"
    )
    with db:
        values = (session_id, content, _to_blob(vector), last_exchange_id, stamp)
        db.execute(upsert_sql, values)
def get_summary(session_id: str) -> Summary | None:
    """Fetch the stored gist of one session, or None when it has none.

    `session_started_at` is taken from the earliest exchange timestamp of the
    session, as computed by the sub-select.
    """
    lookup_sql = (
        "SELECT session_id, content, last_exchange_id, created_at, "
        "(SELECT MIN(e.created_at) FROM exchanges e WHERE e.session_id = summaries.session_id) "
        "AS started_at FROM summaries WHERE session_id = ?"
    )
    row = _connection().execute(lookup_sql, (session_id,)).fetchone()
    if row is not None:
        return Summary(
            session_id=row["session_id"],
            content=row["content"],
            last_exchange_id=row["last_exchange_id"],
            created_at=row["created_at"],
            session_started_at=row["started_at"],
        )
    return None
def unsummarized_count(session_id: str) -> int:
    """Count the session's exchanges whose id lies beyond its current summary.

    With no summary yet, every exchange of the session counts.
    """
    db = _connection()
    existing = get_summary(session_id)
    if existing:
        high_water = existing.last_exchange_id
    else:
        high_water = 0
    count_sql = "SELECT COUNT(*) AS n FROM exchanges WHERE session_id = ? AND id > ?"
    row = db.execute(count_sql, (session_id, high_water)).fetchone()
    return int(row["n"])
def list_summaries() -> list[Summary]:
    """Return all session gists ordered by when each session started.

    Used by the profile/era consolidation passes.
    """
    select_sql = (
        "SELECT session_id, content, last_exchange_id, created_at, "
        "(SELECT MIN(e.created_at) FROM exchanges e WHERE e.session_id = summaries.session_id) "
        "AS started_at FROM summaries ORDER BY started_at ASC"
    )
    gists = []
    for row in _connection().execute(select_sql).fetchall():
        gists.append(
            Summary(
                session_id=row["session_id"],
                content=row["content"],
                last_exchange_id=row["last_exchange_id"],
                created_at=row["created_at"],
                session_started_at=row["started_at"],
            )
        )
    return gists
def set_profile(content: str, sessions_covered: int, profile_id: str = "self") -> None:
    """Insert the derived profile row, or overwrite it if `profile_id` already exists.

    updated_at is set to the current UTC time on every write.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    upsert_sql = (
        "INSERT INTO profile (id, content, sessions_covered, updated_at) "
        "VALUES (?, ?, ?, ?) "
        "ON CONFLICT(id) DO UPDATE SET content=excluded.content, "
        "sessions_covered=excluded.sessions_covered, updated_at=excluded.updated_at"
    )
    values = (profile_id, content, sessions_covered, stamp)
    with db:
        db.execute(upsert_sql, values)
def get_profile(profile_id: str = "self") -> str | None:
    """Return the stored profile text, or None when no such profile row exists."""
    row = _connection().execute(
        "SELECT content FROM profile WHERE id = ?", (profile_id,)
    ).fetchone()
    if not row:
        return None
    return row["content"]
def profile_sessions_covered(profile_id: str = "self") -> int:
    """Return the `sessions_covered` value stored with the profile (0 when absent)."""
    lookup_sql = "SELECT sessions_covered FROM profile WHERE id = ?"
    row = _connection().execute(lookup_sql, (profile_id,)).fetchone()
    if not row:
        return 0
    return int(row["sessions_covered"])
def last_exchange_at() -> str | None:
    """Return the largest `created_at` across all exchanges, or None if there are none.

    This lets Lyra know how long it has been since Brian last said anything,
    both between turns and while she idles between conversations.
    """
    row = _connection().execute("SELECT MAX(created_at) AS m FROM exchanges").fetchone()
    if not row:
        return None
    latest = row["m"]
    return latest if latest else None
def backlog_stats(ripe_threshold: int = 20) -> dict:
    """Summarize how much unconsolidated material exists, for the dream cycle.

    Returned keys:
      sessions            number of sessions that have exchanges
      dirty               sessions with at least one unsummarized exchange
      ripe                dirty sessions that were never summarized or have
                          >= `ripe_threshold` unsummarized exchanges
      unsummarized_total  unsummarized exchanges across all sessions
      max_exchange_id     highest exchange id (0 when the table is empty),
                          usable to spot new activity since the last cycle
    """
    db = _connection()
    per_session_sql = """
SELECT
SUM(CASE WHEN e.id > COALESCE(su.last_exchange_id, 0) THEN 1 ELSE 0 END)
AS unsummarized,
(su.session_id IS NULL) AS no_summary
FROM exchanges e
LEFT JOIN summaries su ON su.session_id = e.session_id
GROUP BY e.session_id
"""
    per_session = db.execute(per_session_sql).fetchall()
    dirty = 0
    ripe = 0
    unsummarized_total = 0
    for row in per_session:
        pending = int(row["unsummarized"] or 0)
        unsummarized_total += pending
        if not pending > 0:
            continue
        dirty += 1
        if row["no_summary"] or pending >= ripe_threshold:
            ripe += 1
    high_water = db.execute(
        "SELECT COALESCE(MAX(id), 0) AS m FROM exchanges"
    ).fetchone()["m"]
    return {
        "sessions": len(per_session),
        "dirty": dirty,
        "ripe": ripe,
        "unsummarized_total": unsummarized_total,
        "max_exchange_id": int(high_water),
    }
# --- Era tier (per-month temporal rollups) ---
def summaries_by_month() -> dict[str, list[str]]:
    """Group session gists by the "YYYY-MM" month in which each session began.

    The month is cut from the session's earliest exchange timestamp (real
    ChatGPT dates for imported sessions), not from the time of summarizing.
    """
    grouping_sql = """
SELECT substr(MIN(e.created_at), 1, 7) AS month, s.content AS content
FROM summaries s JOIN exchanges e ON e.session_id = s.session_id
GROUP BY s.session_id
"""
    by_month: dict[str, list[str]] = {}
    for row in _connection().execute(grouping_sql).fetchall():
        month = row["month"]
        if month not in by_month:
            by_month[month] = []
        by_month[month].append(row["content"])
    return by_month
def store_era(month: str, content: str, session_count: int) -> None:
    """Save a month's digest and its embedding, overwriting any existing row for that month."""
    [vector] = llm.embed([content])
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    upsert_sql = (
        "INSERT INTO eras (month, content, embedding, session_count, created_at) "
        "VALUES (?, ?, ?, ?, ?) "
        "ON CONFLICT(month) DO UPDATE SET content=excluded.content, "
        "embedding=excluded.embedding, session_count=excluded.session_count, "
        "created_at=excluded.created_at"
    )
    with db:
        values = (month, content, _to_blob(vector), session_count, stamp)
        db.execute(upsert_sql, values)
def list_eras() -> list[Era]:
    """Return every month digest, sorted by month ascending."""
    select_sql = "SELECT month, content, session_count, created_at FROM eras ORDER BY month ASC"
    digests = []
    for row in _connection().execute(select_sql).fetchall():
        digests.append(
            Era(
                month=row["month"],
                content=row["content"],
                session_count=row["session_count"],
                created_at=row["created_at"],
            )
        )
    return digests
def set_narrative(content: str, narrative_id: str = "current") -> None:
    """Insert the narrative row, or overwrite it if `narrative_id` already exists."""
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    upsert_sql = (
        "INSERT INTO narrative (id, content, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(id) DO UPDATE SET content=excluded.content, updated_at=excluded.updated_at"
    )
    with db:
        db.execute(upsert_sql, (narrative_id, content, stamp))
def get_narrative(narrative_id: str = "current") -> str | None:
    """Return the stored narrative text, or None when no such row exists."""
    row = _connection().execute(
        "SELECT content FROM narrative WHERE id = ?", (narrative_id,)
    ).fetchone()
    if not row:
        return None
    return row["content"]
def get_self_state(state_id: str = "lyra") -> dict | None:
    """Load and JSON-decode the stored self-state, or return None when absent."""
    row = _connection().execute(
        "SELECT data FROM self_state WHERE id = ?", (state_id,)
    ).fetchone()
    if not row:
        return None
    return json.loads(row["data"])
def add_journal_entry(kind: str, content: str, source: str | None = None) -> int:
    """Insert a new journal row stamped with the current UTC time.

    Returns the id of the inserted row.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    insert_sql = "INSERT INTO journal (created_at, kind, content, source) VALUES (?, ?, ?, ?)"
    with db:
        cursor = db.execute(insert_sql, (stamp, kind, content, source))
    return int(cursor.lastrowid)
def add_rating(kind: str, rating: int, content: str, context: str | None = None,
               ref: str | None = None, note: str | None = None) -> int:
    """Store Brian's feedback on one Lyra output, replacing any earlier rating.

    Any existing row with the same (kind, content) is deleted before the new
    one is inserted, all in one transaction. The stored rating is normalised
    to +1 (rating >= 0) or -1, and `ref` is stored as text. Returns the new
    row id.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        db.execute("DELETE FROM ratings WHERE kind = ? AND content = ?", (kind, content))
        if rating >= 0:
            normalised = 1
        else:
            normalised = -1
        if ref is not None:
            ref_text = str(ref)
        else:
            ref_text = None
        cursor = db.execute(
            "INSERT INTO ratings (created_at, kind, rating, content, context, ref, note) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (stamp, kind, normalised, content, context, ref_text, note),
        )
    return int(cursor.lastrowid)
def list_ratings(limit: int | None = None) -> list[dict]:
    """Return stored ratings as plain dicts, newest (highest id) first.

    Args:
        limit: maximum number of rows to return; None returns all of them.
            The value is coerced with ``int()``, so a non-numeric value
            raises before any SQL runs.
    """
    conn = _connection()
    sql = "SELECT id, created_at, kind, rating, content, context, ref, note FROM ratings ORDER BY id DESC"
    params: list = []
    if limit is not None:
        # Bind the limit instead of formatting it into the statement, the
        # same way list_journal does.
        sql += " LIMIT ?"
        params.append(int(limit))
    return [dict(r) for r in conn.execute(sql, params).fetchall()]
def rating_counts() -> dict:
    """Return {"total", "up", "down"} counts over the ratings table.

    "up" counts rows with rating > 0 and "down" rows with rating < 0; both
    are 0 when the table is empty.
    """
    tally_sql = (
        "SELECT COUNT(*) AS total, "
        "COALESCE(SUM(CASE WHEN rating > 0 THEN 1 ELSE 0 END), 0) AS up, "
        "COALESCE(SUM(CASE WHEN rating < 0 THEN 1 ELSE 0 END), 0) AS down FROM ratings"
    )
    row = _connection().execute(tally_sql).fetchone()
    counts = {}
    for key in ("total", "up", "down"):
        counts[key] = row[key]
    return counts
def list_journal(limit: int | None = None, kinds: tuple[str, ...] | None = None) -> list[dict]:
    """Return journal rows as dicts, highest id first.

    `kinds`, when non-empty, restricts the result to those kind values;
    `limit`, when not None, caps the number of rows.
    """
    db = _connection()
    pieces = ["SELECT id, created_at, kind, content, source FROM journal"]
    args: list = []
    if kinds:
        # One "?" placeholder per requested kind.
        placeholders = ",".join(["?"] * len(kinds))
        pieces.append(" WHERE kind IN (%s)" % placeholders)
        args.extend(kinds)
    pieces.append(" ORDER BY id DESC")
    if limit is not None:
        pieces.append(" LIMIT ?")
        args.append(limit)
    entries = db.execute("".join(pieces), args).fetchall()
    return [dict(entry) for entry in entries]
def self_state_updated_at(state_id: str = "lyra") -> str | None:
    """Return the `updated_at` stamp of the stored self-state, or None if it was never written."""
    lookup_sql = "SELECT updated_at FROM self_state WHERE id = ?"
    row = _connection().execute(lookup_sql, (state_id,)).fetchone()
    if not row:
        return None
    return row["updated_at"]
def set_self_state(state: dict, state_id: str = "lyra") -> None:
    """JSON-encode `state` and insert or overwrite the self-state row for `state_id`.

    updated_at is set to the current UTC time on every write.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    upsert_sql = (
        "INSERT INTO self_state (id, data, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(id) DO UPDATE SET data=excluded.data, updated_at=excluded.updated_at"
    )
    with db:
        encoded = json.dumps(state)
        db.execute(upsert_sql, (state_id, encoded, stamp))
def recall_eras(query: str, k: int = 2) -> list[Era]:
    """Rank month digests by cosine similarity to `query` and return the best `k`.

    Each returned Era carries its similarity in `score`; results run from
    most to least similar. Returns [] when no eras are stored.
    """
    [query_vec] = llm.embed([query])
    target = np.asarray(query_vec, dtype=np.float32)
    db = _connection()
    candidates = db.execute(
        "SELECT month, content, embedding, session_count, created_at FROM eras"
    ).fetchall()
    if not candidates:
        return []
    embeddings = np.stack([_from_blob(row["embedding"]) for row in candidates])
    row_norms = np.linalg.norm(embeddings, axis=1)
    # The small epsilon keeps the division defined for zero-length vectors.
    similarity = (embeddings @ target) / (row_norms * np.linalg.norm(target) + 1e-9)
    best_first = np.argsort(similarity)[::-1][:k]
    hits = []
    for idx in best_first:
        row = candidates[idx]
        hits.append(
            Era(
                month=row["month"],
                content=row["content"],
                session_count=row["session_count"],
                created_at=row["created_at"],
                score=float(similarity[idx]),
            )
        )
    return hits
def recall_summaries(query: str, k: int = 3, exclude_session: str | None = None) -> list[Summary]:
    """Rank session gists by cosine similarity to `query` and return the best `k`.

    The gist of `exclude_session`, when given, is left out of the candidates.
    Each returned Summary carries its similarity in `score`; results run from
    most to least similar. Returns [] when there are no candidates.
    """
    [query_vec] = llm.embed([query])
    target = np.asarray(query_vec, dtype=np.float32)
    db = _connection()
    select_sql = (
        "SELECT session_id, content, embedding, last_exchange_id, created_at, "
        "(SELECT MIN(e.created_at) FROM exchanges e WHERE e.session_id = summaries.session_id) "
        "AS started_at FROM summaries"
    )
    if exclude_session is None:
        args: tuple = ()
    else:
        select_sql += " WHERE session_id != ?"
        args = (exclude_session,)
    candidates = db.execute(select_sql, args).fetchall()
    if not candidates:
        return []
    embeddings = np.stack([_from_blob(row["embedding"]) for row in candidates])
    row_norms = np.linalg.norm(embeddings, axis=1)
    # The small epsilon keeps the division defined for zero-length vectors.
    similarity = (embeddings @ target) / (row_norms * np.linalg.norm(target) + 1e-9)
    best_first = np.argsort(similarity)[::-1][:k]
    hits = []
    for idx in best_first:
        row = candidates[idx]
        hits.append(
            Summary(
                session_id=row["session_id"],
                content=row["content"],
                last_exchange_id=row["last_exchange_id"],
                created_at=row["created_at"],
                session_started_at=row["started_at"],
                score=float(similarity[idx]),
            )
        )
    return hits