59d684b12b
Her reflections/metacognition were capped rolling windows (6/5), so older thoughts were lost for good. Now everything she produces is also appended to a permanent, append-only journal; the capped lists stay as her working-memory window for context. - memory: journal table + add_journal_entry/list_journal - reflect(): persists every committed reflection + critique to the journal, and the examine step gains a "journal" field — a deliberate, first-person note she writes for herself (her knowing journaling), tagged by source (dream/manual) - web: /journal diary view (kind filters, grouped by day) + /journal/data; linked from /self - tests assert reflections + metacognition land in the journal Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
637 lines
22 KiB
Python
637 lines
22 KiB
Python
"""Persistent memory: SQLite storage + brute-force cosine recall over embeddings.

Each exchange is stored with its OpenAI embedding as a float32 BLOB. Recall
loads all embeddings (optionally scoped to a session) into a matrix and
returns the top-k by cosine similarity. Brute force is fine up to tens of
thousands of rows; swap in a vector index when that stops being true.
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
from lyra import llm
|
|
from lyra.config import load
|
|
|
|
# DDL for every table this module uses. It is run through executescript() each
# time a connection is opened; every statement is IF NOT EXISTS, so re-running
# it against an existing database changes nothing.
SCHEMA = """
CREATE TABLE IF NOT EXISTS exchanges (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL,
    role TEXT NOT NULL,
    content TEXT NOT NULL,
    embedding BLOB NOT NULL,
    created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_session_created ON exchanges(session_id, created_at);

CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,
    name TEXT,
    created_at TEXT NOT NULL
);

-- One compacted "gist" per session. last_exchange_id marks how far the summary
-- covers, so we know when enough new turns have accumulated to re-summarize.
CREATE TABLE IF NOT EXISTS summaries (
    session_id TEXT PRIMARY KEY,
    content TEXT NOT NULL,
    embedding BLOB NOT NULL,
    last_exchange_id INTEGER NOT NULL,
    created_at TEXT NOT NULL
);

-- Derived semantic memory: standing facts about the user, distilled from the
-- session gists by the consolidation pass. Single row (id='self').
CREATE TABLE IF NOT EXISTS profile (
    id TEXT PRIMARY KEY,
    content TEXT NOT NULL,
    sessions_covered INTEGER NOT NULL,
    updated_at TEXT NOT NULL
);

-- Temporal memory: one "what was happening" digest per calendar month, rolled
-- up from that month's session gists. month is "YYYY-MM".
CREATE TABLE IF NOT EXISTS eras (
    month TEXT PRIMARY KEY,
    content TEXT NOT NULL,
    embedding BLOB NOT NULL,
    session_count INTEGER NOT NULL,
    created_at TEXT NOT NULL
);

-- The current narrative: time-aware arc/trends/callbacks (vs the timeless
-- profile). Distilled from profile + recent eras. Single row (id='current').
CREATE TABLE IF NOT EXISTS narrative (
    id TEXT PRIMARY KEY,
    content TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

-- Autonomy Core: Lyra's evolving self-state (mood, energy, her own first-person
-- self-narrative, reflections). Stored as a JSON blob. Single row (id='lyra').
CREATE TABLE IF NOT EXISTS self_state (
    id TEXT PRIMARY KEY,
    data TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

-- Lyra's journal: append-only, permanent record of her thoughts. The self_state
-- reflections/metacognition lists are a short rolling window for context; this
-- keeps everything so nothing is lost when those roll over. kind is
-- 'reflection' | 'metacognition' | 'journal' (a deliberate note to herself).
CREATE TABLE IF NOT EXISTS journal (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    kind TEXT NOT NULL,
    content TEXT NOT NULL,
    source TEXT
);
CREATE INDEX IF NOT EXISTS idx_journal_created ON journal(created_at);
"""
|
|
|
|
# Process-wide singleton connection and the database path it was opened on.
_conn: sqlite3.Connection | None = None
_conn_path: Path | None = None


def _connection() -> sqlite3.Connection:
    """Return the shared SQLite connection, opening it on first use.

    The connection is reopened when the configured ``db_path`` differs from
    the path currently open (this lets tests switch LYRA_DB_PATH). A new
    connection is only published to the module globals after it has been
    fully configured and the schema applied, so a failure while (re)opening
    never leaves a closed or half-initialised handle cached.

    Raises:
        sqlite3.Error: if the database cannot be opened or initialised.
    """
    global _conn, _conn_path
    cfg = load()
    if _conn is not None and _conn_path == cfg.db_path:
        return _conn

    if _conn is not None:
        # Drop the globals before closing so a later failure cannot hand a
        # closed connection back to callers.
        stale, _conn, _conn_path = _conn, None, None
        stale.close()

    cfg.db_path.parent.mkdir(parents=True, exist_ok=True)
    # check_same_thread=False: the web server runs blocking work in a thread
    # pool, so the singleton connection is touched from threads other than
    # the one that created it. Safe here under single-user, low-concurrency use.
    conn = sqlite3.connect(cfg.db_path, check_same_thread=False)
    try:
        conn.row_factory = sqlite3.Row
        # WAL + a busy timeout so a separate dream-cycle process can read/write
        # alongside the web server without tripping "database is locked".
        conn.execute("PRAGMA busy_timeout=5000")
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(SCHEMA)
    except Exception:
        conn.close()
        raise

    _conn, _conn_path = conn, cfg.db_path
    return conn
|
|
|
|
|
|
@dataclass
|
|
class Exchange:
|
|
id: int
|
|
session_id: str
|
|
role: str
|
|
content: str
|
|
created_at: str
|
|
score: float | None = None
|
|
|
|
|
|
@dataclass
|
|
class Summary:
|
|
session_id: str
|
|
content: str
|
|
last_exchange_id: int
|
|
created_at: str # when the gist was generated
|
|
session_started_at: str | None = None # when the conversation actually happened
|
|
score: float | None = None
|
|
|
|
|
|
@dataclass
|
|
class Era:
|
|
month: str # "YYYY-MM"
|
|
content: str
|
|
session_count: int
|
|
created_at: str
|
|
score: float | None = None
|
|
|
|
|
|
def _to_blob(vec: list[float]) -> bytes:
    """Pack an embedding into raw float32 bytes suitable for a BLOB column."""
    packed = np.asarray(vec, dtype=np.float32)
    return packed.tobytes()
|
|
|
|
|
|
def _from_blob(blob: bytes) -> np.ndarray:
    """Decode raw float32 bytes from a BLOB column into a 1-D numpy vector."""
    vector = np.frombuffer(blob, dtype=np.float32)
    return vector
|
|
|
|
|
|
def remember(session_id: str, role: str, content: str) -> int:
    """Embed one exchange and store it in the ``exchanges`` table.

    Returns:
        The id of the newly inserted row.
    """
    [vector] = llm.embed([content])
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    # `with db` wraps the insert in a transaction (commit on success).
    with db:
        cursor = db.execute(
            "INSERT INTO exchanges (session_id, role, content, embedding, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (session_id, role, content, _to_blob(vector), stamp),
        )
    new_id = int(cursor.lastrowid)
    return new_id
|
|
|
|
|
|
def add_exchanges_bulk(session_id: str, rows: list[tuple[str, str, list[float], str]]) -> int:
    """Insert a batch of already-embedded exchanges in one transaction.

    Args:
        session_id: Session every row is attached to.
        rows: ``(role, content, embedding, created_at)`` tuples.

    Returns:
        The number of rows passed in.

    Used by the importer to avoid one INSERT (and one embed round-trip) per
    message.
    """
    db = _connection()
    with db:
        payload = [
            (session_id, role, content, _to_blob(vector), created_at)
            for role, content, vector, created_at in rows
        ]
        db.executemany(
            "INSERT INTO exchanges (session_id, role, content, embedding, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            payload,
        )
    return len(rows)
|
|
|
|
|
|
def recent(session_id: str, n: int = 10) -> list[Exchange]:
    """Return the latest `n` exchanges of a session in chronological order."""
    db = _connection()
    newest_first = db.execute(
        "SELECT id, session_id, role, content, created_at FROM exchanges "
        "WHERE session_id = ? ORDER BY id DESC LIMIT ?",
        (session_id, n),
    ).fetchall()

    # The query yields newest first (so LIMIT keeps the tail); flip it back.
    out: list[Exchange] = []
    for row in reversed(newest_first):
        out.append(
            Exchange(
                id=row["id"],
                session_id=row["session_id"],
                role=row["role"],
                content=row["content"],
                created_at=row["created_at"],
            )
        )
    return out
|
|
|
|
|
|
def ensure_session(session_id: str, name: str | None = None) -> None:
    """Insert the session row when missing and, if `name` is given, (re)name it.

    An existing row keeps its original ``created_at``; only its name is
    overwritten, and only when `name` is not None.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    should_rename = name is not None
    with db:
        db.execute(
            "INSERT INTO sessions (id, name, created_at) VALUES (?, ?, ?) "
            "ON CONFLICT(id) DO NOTHING",
            (session_id, name, stamp),
        )
        if should_rename:
            db.execute("UPDATE sessions SET name = ? WHERE id = ?", (name, session_id))
|
|
|
|
|
|
def list_sessions() -> list[dict]:
    """List all known sessions, newest first.

    Covers both rows in ``sessions`` and any session id that only appears in
    ``exchanges`` (those come back with ``name`` set to None).

    Returns:
        Dicts with keys ``"id"`` and ``"name"``. The timestamp is used for
        ordering only and is not included in the result.
    """
    conn = _connection()
    rows = conn.execute(
        """
        SELECT s.id AS id,
               s.name AS name,
               COALESCE(s.created_at, MIN(e.created_at)) AS created_at
        FROM sessions s
        LEFT JOIN exchanges e ON e.session_id = s.id
        GROUP BY s.id
        UNION
        SELECT e.session_id AS id, NULL AS name, MIN(e.created_at) AS created_at
        FROM exchanges e
        WHERE e.session_id NOT IN (SELECT id FROM sessions)
        GROUP BY e.session_id
        ORDER BY created_at DESC
        """
    ).fetchall()
    return [{"id": r["id"], "name": r["name"]} for r in rows]
|
|
|
|
|
|
def history(session_id: str) -> list[Exchange]:
    """Return every exchange of a session, ordered oldest to newest by row id."""
    db = _connection()
    cursor = db.execute(
        "SELECT id, session_id, role, content, created_at FROM exchanges "
        "WHERE session_id = ? ORDER BY id ASC",
        (session_id,),
    )
    transcript: list[Exchange] = []
    for row in cursor.fetchall():
        transcript.append(
            Exchange(
                id=row["id"],
                session_id=row["session_id"],
                role=row["role"],
                content=row["content"],
                created_at=row["created_at"],
            )
        )
    return transcript
|
|
|
|
|
|
def delete_session(session_id: str) -> None:
    """Delete a session along with its exchanges and its summary.

    All three deletes run inside one transaction.
    """
    db = _connection()
    key = (session_id,)
    with db:
        db.execute("DELETE FROM exchanges WHERE session_id = ?", key)
        db.execute("DELETE FROM sessions WHERE id = ?", key)
        db.execute("DELETE FROM summaries WHERE session_id = ?", key)
|
|
|
|
|
|
def recall(query: str, k: int = 5, session_id: str | None = None) -> list[Exchange]:
    """Return the `k` exchanges most similar to `query` by cosine similarity.

    Args:
        query: Text to embed and compare against stored embeddings.
        k: Maximum number of results.
        session_id: When given, only that session's exchanges are searched.

    Returns:
        Exchanges ordered from most to least similar, each with ``score`` set.
        Empty when there is nothing to search.
    """
    [query_vec] = llm.embed([query])
    q = np.asarray(query_vec, dtype=np.float32)

    db = _connection()
    statement = "SELECT id, session_id, role, content, embedding, created_at FROM exchanges"
    bindings: tuple = ()
    if session_id is not None:
        statement += " WHERE session_id = ?"
        bindings = (session_id,)
    candidates = db.execute(statement, bindings).fetchall()
    if not candidates:
        return []

    embeddings = np.stack([_from_blob(row["embedding"]) for row in candidates])
    row_norms = np.linalg.norm(embeddings, axis=1)
    # The small epsilon keeps the division defined for zero-length vectors.
    sims = (embeddings @ q) / (row_norms * np.linalg.norm(q) + 1e-9)

    best_first = np.argsort(sims)[::-1][:k]
    hits: list[Exchange] = []
    for idx in best_first:
        row = candidates[idx]
        hits.append(
            Exchange(
                id=row["id"],
                session_id=row["session_id"],
                role=row["role"],
                content=row["content"],
                created_at=row["created_at"],
                score=float(sims[idx]),
            )
        )
    return hits
|
|
|
|
|
|
# --- Summary tier (compacted per-session gists) ---
|
|
|
|
|
|
def store_summary(session_id: str, content: str, last_exchange_id: int) -> None:
    """Embed a session gist and upsert it, overwriting any earlier summary.

    Args:
        session_id: Session the gist belongs to (primary key of ``summaries``).
        content: The gist text; this is what gets embedded.
        last_exchange_id: Highest exchange id the gist covers.
    """
    [vector] = llm.embed([content])
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        values = (session_id, content, _to_blob(vector), last_exchange_id, stamp)
        db.execute(
            "INSERT INTO summaries (session_id, content, embedding, last_exchange_id, created_at) "
            "VALUES (?, ?, ?, ?, ?) "
            "ON CONFLICT(session_id) DO UPDATE SET "
            "content=excluded.content, embedding=excluded.embedding, "
            "last_exchange_id=excluded.last_exchange_id, created_at=excluded.created_at",
            values,
        )
|
|
|
|
|
|
def get_summary(session_id: str) -> Summary | None:
    """Fetch the stored gist for a session, or None when it has none.

    ``session_started_at`` is filled from the session's earliest exchange
    timestamp; ``score`` is left unset.
    """
    db = _connection()
    row = db.execute(
        "SELECT session_id, content, last_exchange_id, created_at, "
        "(SELECT MIN(e.created_at) FROM exchanges e WHERE e.session_id = summaries.session_id) "
        "AS started_at FROM summaries WHERE session_id = ?",
        (session_id,),
    ).fetchone()
    if row is not None:
        return Summary(
            session_id=row["session_id"],
            content=row["content"],
            last_exchange_id=row["last_exchange_id"],
            created_at=row["created_at"],
            session_started_at=row["started_at"],
        )
    return None
|
|
|
|
|
|
def unsummarized_count(session_id: str) -> int:
    """Count the session's exchanges with an id above its summary's cutoff.

    With no summary the cutoff is 0, so every exchange is counted.
    """
    db = _connection()
    existing = get_summary(session_id)
    if existing:
        cutoff = existing.last_exchange_id
    else:
        cutoff = 0
    row = db.execute(
        "SELECT COUNT(*) AS n FROM exchanges WHERE session_id = ? AND id > ?",
        (session_id, cutoff),
    ).fetchone()
    return int(row["n"])
|
|
|
|
|
|
def list_summaries() -> list[Summary]:
    """Return all session gists, ordered by when each session started.

    Used by the profile/era consolidation passes.
    """
    db = _connection()
    cursor = db.execute(
        "SELECT session_id, content, last_exchange_id, created_at, "
        "(SELECT MIN(e.created_at) FROM exchanges e WHERE e.session_id = summaries.session_id) "
        "AS started_at FROM summaries ORDER BY started_at ASC"
    )
    gists: list[Summary] = []
    for row in cursor.fetchall():
        gists.append(
            Summary(
                session_id=row["session_id"],
                content=row["content"],
                last_exchange_id=row["last_exchange_id"],
                created_at=row["created_at"],
                session_started_at=row["started_at"],
            )
        )
    return gists
|
|
|
|
|
|
def set_profile(content: str, sessions_covered: int, profile_id: str = "self") -> None:
    """Upsert the derived semantic profile row.

    Args:
        content: Profile text.
        sessions_covered: Number of session gists the profile was built from.
        profile_id: Row key; defaults to the single ``"self"`` row.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        values = (profile_id, content, sessions_covered, stamp)
        db.execute(
            "INSERT INTO profile (id, content, sessions_covered, updated_at) "
            "VALUES (?, ?, ?, ?) "
            "ON CONFLICT(id) DO UPDATE SET content=excluded.content, "
            "sessions_covered=excluded.sessions_covered, updated_at=excluded.updated_at",
            values,
        )
|
|
|
|
|
|
def get_profile(profile_id: str = "self") -> str | None:
    """Return the stored profile text, or None when no such row exists."""
    db = _connection()
    row = db.execute("SELECT content FROM profile WHERE id = ?", (profile_id,)).fetchone()
    if not row:
        return None
    return row["content"]
|
|
|
|
|
|
def profile_sessions_covered(profile_id: str = "self") -> int:
    """Return the ``sessions_covered`` value of the profile row, or 0 if absent."""
    db = _connection()
    row = db.execute(
        "SELECT sessions_covered FROM profile WHERE id = ?", (profile_id,)
    ).fetchone()
    if not row:
        return 0
    return int(row["sessions_covered"])
|
|
|
|
|
|
def last_exchange_at() -> str | None:
    """Return the largest ``created_at`` across all exchanges, or None if empty.

    Used to tell Lyra how long it's been since Brian last said anything — the
    gap she perceives between turns and while she's idle between conversations.
    """
    db = _connection()
    row = db.execute("SELECT MAX(created_at) AS m FROM exchanges").fetchone()
    if not row:
        return None
    latest = row["m"]
    # MAX() over an empty table yields NULL, which surfaces here as None.
    if not latest:
        return None
    return latest
|
|
|
|
|
|
def backlog_stats(ripe_threshold: int = 20) -> dict:
    """Snapshot of the consolidation backlog, for the dream cycle to sense.

    Args:
        ripe_threshold: Minimum number of unsummarized turns for an
            already-summarized session to count as "ripe".

    Returns:
        A dict with:
          * ``sessions``: number of distinct sessions that have exchanges,
          * ``dirty``: sessions with at least one unsummarized exchange,
          * ``ripe``: dirty sessions that were never summarized or have
            >= `ripe_threshold` unsummarized exchanges,
          * ``unsummarized_total``: unsummarized exchanges across all sessions,
          * ``max_exchange_id``: highest exchange id (0 when there are none),
            usable to detect new activity since a previous call.
    """
    conn = _connection()
    # One aggregate row per session: how many of its exchanges lie past the
    # summary cutoff (cutoff 0 when the session has no summary at all).
    rows = conn.execute(
        """
        SELECT
            SUM(CASE WHEN e.id > COALESCE(su.last_exchange_id, 0) THEN 1 ELSE 0 END)
                AS unsummarized,
            (su.session_id IS NULL) AS no_summary
        FROM exchanges e
        LEFT JOIN summaries su ON su.session_id = e.session_id
        GROUP BY e.session_id
        """
    ).fetchall()

    dirty = ripe = unsummarized_total = 0
    for r in rows:
        u = int(r["unsummarized"] or 0)
        unsummarized_total += u
        if u > 0:
            dirty += 1
            if r["no_summary"] or u >= ripe_threshold:
                ripe += 1

    mx = conn.execute("SELECT COALESCE(MAX(id), 0) AS m FROM exchanges").fetchone()["m"]
    return {
        "sessions": len(rows),
        "dirty": dirty,
        "ripe": ripe,
        "unsummarized_total": unsummarized_total,
        "max_exchange_id": int(mx),
    }
|
|
|
|
|
|
# --- Era tier (per-month temporal rollups) ---
|
|
|
|
|
|
def summaries_by_month() -> dict[str, list[str]]:
    """Map "YYYY-MM" -> list of session gists for sessions that occurred that month.

    A session's month is the first seven characters of its earliest exchange
    timestamp (real ChatGPT dates for imported sessions), not the time it was
    summarized. Summaries whose session has no exchanges are dropped by the
    inner join.
    """
    conn = _connection()
    rows = conn.execute(
        """
        SELECT substr(MIN(e.created_at), 1, 7) AS month, s.content AS content
        FROM summaries s JOIN exchanges e ON e.session_id = s.session_id
        GROUP BY s.session_id
        """
    ).fetchall()
    out: dict[str, list[str]] = {}
    for r in rows:
        out.setdefault(r["month"], []).append(r["content"])
    return out
|
|
|
|
|
|
def store_era(month: str, content: str, session_count: int) -> None:
    """Embed a monthly digest and upsert it, overwriting that month's prior row.

    Args:
        month: Primary key of the ``eras`` row.
        content: Digest text; this is what gets embedded.
        session_count: Stored alongside the digest as-is.
    """
    [vector] = llm.embed([content])
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        values = (month, content, _to_blob(vector), session_count, stamp)
        db.execute(
            "INSERT INTO eras (month, content, embedding, session_count, created_at) "
            "VALUES (?, ?, ?, ?, ?) "
            "ON CONFLICT(month) DO UPDATE SET content=excluded.content, "
            "embedding=excluded.embedding, session_count=excluded.session_count, "
            "created_at=excluded.created_at",
            values,
        )
|
|
|
|
|
|
def list_eras() -> list[Era]:
    """Return every month digest, sorted by month ascending."""
    db = _connection()
    cursor = db.execute(
        "SELECT month, content, session_count, created_at FROM eras ORDER BY month ASC"
    )
    digests: list[Era] = []
    for row in cursor.fetchall():
        digests.append(
            Era(
                month=row["month"],
                content=row["content"],
                session_count=row["session_count"],
                created_at=row["created_at"],
            )
        )
    return digests
|
|
|
|
|
|
def set_narrative(content: str, narrative_id: str = "current") -> None:
    """Upsert the narrative row, stamping it with the current UTC time."""
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        values = (narrative_id, content, stamp)
        db.execute(
            "INSERT INTO narrative (id, content, updated_at) VALUES (?, ?, ?) "
            "ON CONFLICT(id) DO UPDATE SET content=excluded.content, updated_at=excluded.updated_at",
            values,
        )
|
|
|
|
|
|
def get_narrative(narrative_id: str = "current") -> str | None:
    """Return the stored narrative text, or None when no such row exists."""
    db = _connection()
    row = db.execute("SELECT content FROM narrative WHERE id = ?", (narrative_id,)).fetchone()
    if not row:
        return None
    return row["content"]
|
|
|
|
|
|
def get_self_state(state_id: str = "lyra") -> dict | None:
    """Load and JSON-decode the self-state row, or return None if it is absent."""
    db = _connection()
    row = db.execute("SELECT data FROM self_state WHERE id = ?", (state_id,)).fetchone()
    if not row:
        return None
    return json.loads(row["data"])
|
|
|
|
|
|
def add_journal_entry(kind: str, content: str, source: str | None = None) -> int:
    """Insert one row into the journal table, stamped with the current UTC time.

    Args:
        kind: Entry category, stored as given.
        content: Entry text.
        source: Optional origin tag, stored as given (NULL when None).

    Returns:
        The id of the inserted row.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        cursor = db.execute(
            "INSERT INTO journal (created_at, kind, content, source) VALUES (?, ?, ?, ?)",
            (stamp, kind, content, source),
        )
    entry_id = int(cursor.lastrowid)
    return entry_id
|
|
|
|
|
|
def list_journal(limit: int | None = None, kinds: tuple[str, ...] | None = None) -> list[dict]:
    """Return journal rows as dicts, highest id first.

    Args:
        limit: Maximum number of rows; None means no LIMIT clause.
        kinds: When non-empty, only rows whose ``kind`` is in this tuple.
    """
    db = _connection()
    statement = "SELECT id, created_at, kind, content, source FROM journal"
    bindings: list = []
    if kinds:
        # One "?" per kind; the values themselves are always bound, never inlined.
        placeholders = ",".join("?" * len(kinds))
        statement = statement + " WHERE kind IN (%s)" % placeholders
        bindings.extend(kinds)
    statement = statement + " ORDER BY id DESC"
    if limit is not None:
        statement = statement + " LIMIT ?"
        bindings.append(limit)
    fetched = db.execute(statement, bindings).fetchall()
    return [dict(row) for row in fetched]
|
|
|
|
|
|
def self_state_updated_at(state_id: str = "lyra") -> str | None:
    """Return the ``updated_at`` value of the self-state row, or None if absent."""
    db = _connection()
    row = db.execute(
        "SELECT updated_at FROM self_state WHERE id = ?", (state_id,)
    ).fetchone()
    if not row:
        return None
    return row["updated_at"]
|
|
|
|
|
|
def set_self_state(state: dict, state_id: str = "lyra") -> None:
    """Serialize `state` as JSON and upsert it into the self-state row.

    ``updated_at`` is set to the current UTC time on every write.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db = _connection()
    with db:
        values = (state_id, json.dumps(state), stamp)
        db.execute(
            "INSERT INTO self_state (id, data, updated_at) VALUES (?, ?, ?) "
            "ON CONFLICT(id) DO UPDATE SET data=excluded.data, updated_at=excluded.updated_at",
            values,
        )
|
|
|
|
|
|
def recall_eras(query: str, k: int = 2) -> list[Era]:
    """Return the `k` month digests most similar to `query` by cosine similarity.

    Results are ordered from most to least similar and carry ``score``; the
    list is empty when no eras are stored.
    """
    [query_vec] = llm.embed([query])
    q = np.asarray(query_vec, dtype=np.float32)
    db = _connection()
    candidates = db.execute(
        "SELECT month, content, embedding, session_count, created_at FROM eras"
    ).fetchall()
    if not candidates:
        return []

    embeddings = np.stack([_from_blob(row["embedding"]) for row in candidates])
    row_norms = np.linalg.norm(embeddings, axis=1)
    # The small epsilon keeps the division defined for zero-length vectors.
    sims = (embeddings @ q) / (row_norms * np.linalg.norm(q) + 1e-9)
    best_first = np.argsort(sims)[::-1][:k]

    hits: list[Era] = []
    for idx in best_first:
        row = candidates[idx]
        hits.append(
            Era(
                month=row["month"],
                content=row["content"],
                session_count=row["session_count"],
                created_at=row["created_at"],
                score=float(sims[idx]),
            )
        )
    return hits
|
|
|
|
|
|
def recall_summaries(query: str, k: int = 3, exclude_session: str | None = None) -> list[Summary]:
    """Return the `k` session gists most similar to `query` by cosine similarity.

    Args:
        query: Text to embed and compare against stored gist embeddings.
        k: Maximum number of results.
        exclude_session: When given, that session's gist is left out.

    Returns:
        Summaries ordered from most to least similar, each with ``score`` set.
        Empty when there is nothing to search.
    """
    [query_vec] = llm.embed([query])
    q = np.asarray(query_vec, dtype=np.float32)

    db = _connection()
    statement = (
        "SELECT session_id, content, embedding, last_exchange_id, created_at, "
        "(SELECT MIN(e.created_at) FROM exchanges e WHERE e.session_id = summaries.session_id) "
        "AS started_at FROM summaries"
    )
    bindings: tuple = ()
    if exclude_session is not None:
        statement += " WHERE session_id != ?"
        bindings = (exclude_session,)
    candidates = db.execute(statement, bindings).fetchall()
    if not candidates:
        return []

    embeddings = np.stack([_from_blob(row["embedding"]) for row in candidates])
    row_norms = np.linalg.norm(embeddings, axis=1)
    # The small epsilon keeps the division defined for zero-length vectors.
    sims = (embeddings @ q) / (row_norms * np.linalg.norm(q) + 1e-9)

    best_first = np.argsort(sims)[::-1][:k]
    hits: list[Summary] = []
    for idx in best_first:
        row = candidates[idx]
        hits.append(
            Summary(
                session_id=row["session_id"],
                content=row["content"],
                last_exchange_id=row["last_exchange_id"],
                created_at=row["created_at"],
                session_started_at=row["started_at"],
                score=float(sims[idx]),
            )
        )
    return hits
|