From 194e3e64b9244da79171e011cd4b43bff6d0c327 Mon Sep 17 00:00:00 2001
From: serversdown
Date: Tue, 16 Jun 2026 02:40:32 +0000
Subject: [PATCH] feat: import raw ChatGPT export (new sharded format)

OpenAI's export changed: conversations.json is now sharded into
conversations-000.json..NNN.json, each a JSON array of conversations with
the mapping tree and per-message create_time.

ingest now reads that format directly (supersedes the old
convert/trim/split scripts): walks each conversation's mapping ordered by
create_time, keeps text and multimodal_text (drops
thoughts/reasoning_recap), captures real per-message timestamps, and
imports idempotently by conversation_id.

`lyra-import DIR` auto-detects raw-export vs legacy {title,messages} dirs;
optional limit arg.

Verified on 15 conversations: real dates, correct ordering, recall returns
dated poker history.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 lyra/ingest.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 93 insertions(+), 2 deletions(-)

diff --git a/lyra/ingest.py b/lyra/ingest.py
index 7f6ea5b..ef027df 100644
--- a/lyra/ingest.py
+++ b/lyra/ingest.py
@@ -21,6 +21,10 @@ from lyra import llm, logbus, memory
 EMBED_BATCH = 64
 EMBED_CHAR_CAP = 6000  # cap embed input size; full content is still stored
 
+# Message content types worth keeping from a raw ChatGPT export. We drop
+# 'thoughts' (internal chain-of-thought) and 'reasoning_recap' (meta).
+KEEP_CONTENT_TYPES = {"text", "multimodal_text"} + def _session_id(path: Path) -> str: """Stable id derived from the filename, so re-imports don't duplicate.""" @@ -80,11 +84,98 @@ def import_dir(dirpath: str | Path, created_at: str | None = None) -> dict: return {"files": len(files), "sessions_imported": sessions, "exchanges": exchanges} +# --- Raw ChatGPT export (sharded conversations-*.json with timestamps) --- + + +def _ts_to_iso(ts: float | None, fallback: str) -> str: + if not ts: + return fallback + return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() + + +def _message_text(msg: dict) -> str | None: + """Extract plain text from a ChatGPT message node, or None to skip it.""" + content = msg.get("content") or {} + if content.get("content_type") not in KEEP_CONTENT_TYPES: + return None + parts = [p for p in (content.get("parts") or []) if isinstance(p, str) and p.strip()] + text = "\n".join(parts).strip() + return text or None + + +def _convo_rows(convo: dict) -> list[tuple[float, str, str]]: + """(create_time, role, text) for each keepable message, chronologically.""" + rows: list[tuple[float, str, str]] = [] + conv_ct = convo.get("create_time") or 0 + for node in convo.get("mapping", {}).values(): + msg = node.get("message") + if not msg: + continue + role = (msg.get("author") or {}).get("role") + if role not in ("user", "assistant"): + continue + text = _message_text(msg) + if text is None: + continue + rows.append((msg.get("create_time") or conv_ct, role, text)) + rows.sort(key=lambda r: r[0] or 0) + return rows + + +def import_conversation(convo: dict) -> int: + """Import one raw-export conversation. 
Idempotent by conversation_id.""" + session_id = convo.get("conversation_id") or convo.get("id") + if not session_id or memory.history(session_id): + return 0 + rows = _convo_rows(convo) + if not rows: + return 0 + + memory.ensure_session(session_id, name=convo.get("title") or "untitled") + fallback = datetime.now(timezone.utc).isoformat() + exchanges: list[tuple[str, str, list[float], str]] = [] + for i in range(0, len(rows), EMBED_BATCH): + batch = rows[i : i + EMBED_BATCH] + embeddings = llm.embed([text[:EMBED_CHAR_CAP] for _, _, text in batch]) + for (ts, role, text), emb in zip(batch, embeddings): + exchanges.append((role, text, emb, _ts_to_iso(ts, fallback))) + return memory.add_exchanges_bulk(session_id, exchanges) + + +def import_export(export_dir: str | Path, limit: int | None = None) -> dict: + """Import a raw ChatGPT export directory (sharded conversations-*.json).""" + shards = sorted(Path(export_dir).glob("conversations-*.json")) + convos, exchanges, seen = 0, 0, 0 + for shard in shards: + for convo in json.loads(shard.read_text(encoding="utf-8")): + if limit is not None and seen >= limit: + break + seen += 1 + added = import_conversation(convo) + if added: + convos += 1 + exchanges += added + if limit is not None and seen >= limit: + break + logbus.log( + "info", "export import complete", + shards=len(shards), conversations=convos, exchanges=exchanges, + ) + return {"shards": len(shards), "conversations_imported": convos, "exchanges": exchanges} + + def main() -> int: if len(sys.argv) < 2: - print("usage: lyra-import ", file=sys.stderr) + print("usage: lyra-import [limit]", file=sys.stderr) return 2 - report = import_dir(sys.argv[1]) + path = Path(sys.argv[1]) + limit = int(sys.argv[2]) if len(sys.argv) > 2 else None + # A raw ChatGPT export has sharded conversations-*.json; otherwise treat the + # directory as legacy {title, messages} files. 
+ if list(path.glob("conversations-*.json")): + report = import_export(path, limit=limit) + else: + report = import_dir(path) print(report) return 0