feat: ChatGPT chat-log importer
Import the parser's {title, messages} JSON into Lyra's memory so past
conversations seed recall (and, later, the era-rollup tier).
- lyra/ingest.py: one conversation -> one session, text messages -> exchanges;
skips non-text (image asset) messages and non user/assistant roles; embeddings
batched; idempotent by filename-derived session id; `lyra-import <dir>` CLI
- memory.add_exchanges_bulk: batched insert of pre-embedded rows
Format has no timestamps yet, so imports are stamped at import time; a future
dated export will let era memory group by real calendar time.
Verified on the 68-file lyra dev set: 7519 exchanges, idempotent re-run, recall
returns relevant history.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,93 @@
|
||||
"""Import parsed ChatGPT chat logs into Lyra's memory.
|
||||
|
||||
Consumes the parser's `{"title": ..., "messages": [{"role", "content"}]}` format
|
||||
(one JSON file per conversation). Each conversation becomes a Lyra session; each
|
||||
text message becomes an exchange. Embeddings are batched. Import is idempotent —
|
||||
a conversation already present (by session id) is skipped.
|
||||
|
||||
Timestamps: this format carries no dates, so imported exchanges are stamped with
|
||||
`created_at` (default: now). A future timestamped export will let era memory group
|
||||
by real calendar time; pass real per-message dates then.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from lyra import llm, logbus, memory
|
||||
|
||||
EMBED_BATCH = 64
|
||||
EMBED_CHAR_CAP = 6000 # cap embed input size; full content is still stored
|
||||
|
||||
|
||||
def _session_id(path: Path) -> str:
|
||||
"""Stable id derived from the filename, so re-imports don't duplicate."""
|
||||
return "import-" + path.stem
|
||||
|
||||
|
||||
def _clean_messages(messages: list[dict]) -> list[tuple[str, str]]:
|
||||
out: list[tuple[str, str]] = []
|
||||
for m in messages:
|
||||
role = m.get("role")
|
||||
if role not in ("user", "assistant"):
|
||||
continue
|
||||
content = (m.get("content") or "").strip()
|
||||
if not content or content.startswith('{"content_type"'): # skip empty / image assets
|
||||
continue
|
||||
out.append((role, content))
|
||||
return out
|
||||
|
||||
|
||||
def import_file(path: Path, created_at: str) -> int:
|
||||
"""Import one conversation file. Returns exchanges added (0 if skipped/empty)."""
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
session_id = _session_id(path)
|
||||
if memory.history(session_id): # already imported
|
||||
return 0
|
||||
|
||||
msgs = _clean_messages(data.get("messages", []))
|
||||
if not msgs:
|
||||
return 0
|
||||
|
||||
memory.ensure_session(session_id, name=data.get("title") or path.stem)
|
||||
|
||||
rows: list[tuple[str, str, list[float], str]] = []
|
||||
for i in range(0, len(msgs), EMBED_BATCH):
|
||||
batch = msgs[i : i + EMBED_BATCH]
|
||||
embeddings = llm.embed([content[:EMBED_CHAR_CAP] for _, content in batch])
|
||||
for (role, content), emb in zip(batch, embeddings):
|
||||
rows.append((role, content, emb, created_at))
|
||||
|
||||
return memory.add_exchanges_bulk(session_id, rows)
|
||||
|
||||
|
||||
def import_dir(dirpath: str | Path, created_at: str | None = None) -> dict:
|
||||
"""Import every *.json under dirpath (recursively). Returns a small report."""
|
||||
created_at = created_at or datetime.now(timezone.utc).isoformat()
|
||||
files = sorted(Path(dirpath).rglob("*.json"))
|
||||
sessions, exchanges = 0, 0
|
||||
for path in files:
|
||||
added = import_file(path, created_at)
|
||||
if added:
|
||||
sessions += 1
|
||||
exchanges += added
|
||||
logbus.log(
|
||||
"info", "import complete", dir=str(dirpath),
|
||||
files=len(files), sessions=sessions, exchanges=exchanges,
|
||||
)
|
||||
return {"files": len(files), "sessions_imported": sessions, "exchanges": exchanges}
|
||||
|
||||
|
||||
def main() -> int:
|
||||
if len(sys.argv) < 2:
|
||||
print("usage: lyra-import <dir-of-chat-json>", file=sys.stderr)
|
||||
return 2
|
||||
report = import_dir(sys.argv[1])
|
||||
print(report)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -108,6 +108,22 @@ def remember(session_id: str, role: str, content: str) -> int:
|
||||
return int(cur.lastrowid)
|
||||
|
||||
|
||||
def add_exchanges_bulk(session_id: str, rows: list[tuple[str, str, list[float], str]]) -> int:
|
||||
"""Insert many pre-embedded exchanges at once.
|
||||
|
||||
Each row is (role, content, embedding, created_at). Used by the importer to
|
||||
avoid one INSERT (and one embed round-trip) per message. Returns row count.
|
||||
"""
|
||||
conn = _connection()
|
||||
with conn:
|
||||
conn.executemany(
|
||||
"INSERT INTO exchanges (session_id, role, content, embedding, created_at) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
[(session_id, role, content, _to_blob(emb), ca) for role, content, emb, ca in rows],
|
||||
)
|
||||
return len(rows)
|
||||
|
||||
|
||||
def recent(session_id: str, n: int = 10) -> list[Exchange]:
|
||||
"""Last `n` exchanges from a session, oldest first."""
|
||||
conn = _connection()
|
||||
|
||||
Reference in New Issue
Block a user