diff --git a/lyra/ingest.py b/lyra/ingest.py new file mode 100644 index 0000000..7f6ea5b --- /dev/null +++ b/lyra/ingest.py @@ -0,0 +1,93 @@ +"""Import parsed ChatGPT chat logs into Lyra's memory. + +Consumes the parser's `{"title": ..., "messages": [{"role", "content"}]}` format +(one JSON file per conversation). Each conversation becomes a Lyra session; each +text message becomes an exchange. Embeddings are batched. Import is idempotent — +a conversation already present (by session id) is skipped. + +Timestamps: this format carries no dates, so imported exchanges are stamped with +`created_at` (default: now). A future timestamped export will let era memory group +by real calendar time; pass real per-message dates then. +""" +from __future__ import annotations + +import json +import sys +from datetime import datetime, timezone +from pathlib import Path + +from lyra import llm, logbus, memory + +EMBED_BATCH = 64 +EMBED_CHAR_CAP = 6000 # cap embed input size; full content is still stored + + +def _session_id(path: Path) -> str: + """Stable id derived from the filename, so re-imports don't duplicate.""" + return "import-" + path.stem + + +def _clean_messages(messages: list[dict]) -> list[tuple[str, str]]: + out: list[tuple[str, str]] = [] + for m in messages: + role = m.get("role") + if role not in ("user", "assistant"): + continue + content = (m.get("content") or "").strip() + if not content or content.startswith('{"content_type"'): # skip empty / image assets + continue + out.append((role, content)) + return out + + +def import_file(path: Path, created_at: str) -> int: + """Import one conversation file. Returns exchanges added (0 if skipped/empty).""" + data = json.loads(path.read_text(encoding="utf-8")) + session_id = _session_id(path) + if memory.history(session_id): # already imported + return 0 + + msgs = _clean_messages(data.get("messages", [])) + if not msgs: + return 0 + + memory.ensure_session(session_id, name=data.get("title") or path.stem) + + rows: list[tuple[str, str, list[float], str]] = [] + for i in range(0, len(msgs), EMBED_BATCH): + batch = msgs[i : i + EMBED_BATCH] + embeddings = llm.embed([content[:EMBED_CHAR_CAP] for _, content in batch]) + for (role, content), emb in zip(batch, embeddings): + rows.append((role, content, emb, created_at)) + + return memory.add_exchanges_bulk(session_id, rows) + + +def import_dir(dirpath: str | Path, created_at: str | None = None) -> dict: + """Import every *.json under dirpath (recursively). Returns a small report.""" + created_at = created_at or datetime.now(timezone.utc).isoformat() + files = sorted(Path(dirpath).rglob("*.json")) + sessions, exchanges = 0, 0 + for path in files: + added = import_file(path, created_at) + if added: + sessions += 1 + exchanges += added + logbus.log( + "info", "import complete", dir=str(dirpath), + files=len(files), sessions=sessions, exchanges=exchanges, + ) + return {"files": len(files), "sessions_imported": sessions, "exchanges": exchanges} + + +def main() -> int: + if len(sys.argv) < 2: + print("usage: lyra-import ", file=sys.stderr) + return 2 + report = import_dir(sys.argv[1]) + print(report) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/lyra/memory.py b/lyra/memory.py index 6119bdd..6001da4 100644 --- a/lyra/memory.py +++ b/lyra/memory.py @@ -108,6 +108,22 @@ def remember(session_id: str, role: str, content: str) -> int: return int(cur.lastrowid) +def add_exchanges_bulk(session_id: str, rows: list[tuple[str, str, list[float], str]]) -> int: + """Insert many pre-embedded exchanges at once. + + Each row is (role, content, embedding, created_at). Used by the importer to + avoid one INSERT (and one embed round-trip) per message. Returns row count. + """ + conn = _connection() + with conn: + conn.executemany( + "INSERT INTO exchanges (session_id, role, content, embedding, created_at) " + "VALUES (?, ?, ?, ?, ?)", + [(session_id, role, content, _to_blob(emb), ca) for role, content, emb, ca in rows], + ) + return len(rows) + + def recent(session_id: str, n: int = 10) -> list[Exchange]: """Last `n` exchanges from a session, oldest first.""" conn = _connection() diff --git a/pyproject.toml b/pyproject.toml index f035399..d333bfa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ [project.scripts] lyra = "lyra.__main__:main" lyra-web = "lyra.web.server:serve" +lyra-import = "lyra.ingest:main" [dependency-groups] dev = [