194e3e64b9
OpenAI's export changed: conversations.json is now sharded into
conversations-000.json..NNN.json, each a JSON array of conversations with the
mapping tree and per-message create_time.
ingest now reads that format directly (supersedes the old convert/trim/split
scripts): walks each conversation's mapping ordered by create_time, keeps text
and multimodal_text (drops thoughts/reasoning_recap), captures real per-message
timestamps, and imports idempotently by conversation_id. `lyra-import <dir>`
auto-detects raw-export vs legacy {title,messages} dirs; optional limit arg.
Verified on 15 conversations: real dates, correct ordering, recall returns
dated poker history.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
185 lines
6.7 KiB
Python
185 lines
6.7 KiB
Python
"""Import parsed ChatGPT chat logs into Lyra's memory.
|
|
|
|
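Two input formats are supported:

* Raw ChatGPT export (`import_export`): a directory of sharded
  `conversations-000.json` ... `conversations-NNN.json` files, each a JSON array
  of conversations with a `mapping` tree and per-message `create_time`. Each
  conversation becomes a Lyra session keyed by its `conversation_id` (or `id`);
  user/assistant `text` and `multimodal_text` messages become exchanges stamped
  with their own timestamps, while 'thoughts' and 'reasoning_recap' content is
  dropped.
* Legacy parsed logs (`import_dir`): one JSON file per conversation in the
  parser's `{"title": ..., "messages": [{"role", "content"}]}` format. This
  format carries no dates, so every exchange is stamped with `created_at`
  (default: now).

Embeddings are computed in batches. Both importers are idempotent: a
conversation whose session already has history is skipped.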
|
|
"""
|
|
from __future__ import annotations

import json
import sys
from datetime import datetime, timezone
from itertools import islice
from pathlib import Path

from lyra import llm, logbus, memory
|
|
|
|
# Messages are sent to llm.embed in batches of this many.
EMBED_BATCH: int = 64
# Only the first EMBED_CHAR_CAP characters of a message are sent for embedding;
# the stored exchange still keeps the full, untruncated text.
EMBED_CHAR_CAP: int = 6000

# Content types kept from a raw ChatGPT export. Anything else is dropped,
# notably 'thoughts' (internal chain-of-thought) and 'reasoning_recap' (meta).
KEEP_CONTENT_TYPES = {"multimodal_text", "text"}
|
|
|
|
|
|
def _session_id(path: Path) -> str:
|
|
"""Stable id derived from the filename, so re-imports don't duplicate."""
|
|
return "import-" + path.stem
|
|
|
|
|
|
def _clean_messages(messages: list[dict]) -> list[tuple[str, str]]:
|
|
out: list[tuple[str, str]] = []
|
|
for m in messages:
|
|
role = m.get("role")
|
|
if role not in ("user", "assistant"):
|
|
continue
|
|
content = (m.get("content") or "").strip()
|
|
if not content or content.startswith('{"content_type"'): # skip empty / image assets
|
|
continue
|
|
out.append((role, content))
|
|
return out
|
|
|
|
|
|
def import_file(path: Path, created_at: str) -> int:
    """Import one legacy ``{"title", "messages"}`` conversation file.

    Args:
        path: JSON file holding a single conversation object.
        created_at: timestamp string applied to every imported exchange (this
            format carries no per-message dates).

    Returns:
        Number of exchanges added. 0 when the file is not a JSON object, was
        already imported, or has no usable messages.
    """
    data = json.loads(path.read_text(encoding="utf-8"))
    # import_dir globs every *.json recursively, so files of other shapes (e.g.
    # a raw-export shard, which is a JSON array) can show up here. Skip them
    # instead of aborting the whole directory import on ``data.get``.
    if not isinstance(data, dict):
        return 0

    session_id = _session_id(path)
    if memory.history(session_id):  # already imported
        return 0

    # ``or []`` also covers an explicit ``"messages": null``.
    msgs = _clean_messages(data.get("messages") or [])
    if not msgs:
        return 0

    memory.ensure_session(session_id, name=data.get("title") or path.stem)

    rows: list[tuple[str, str, list[float], str]] = []
    for start in range(0, len(msgs), EMBED_BATCH):
        batch = msgs[start : start + EMBED_BATCH]
        # Only a capped prefix is embedded; the full content is stored below.
        embeddings = llm.embed([content[:EMBED_CHAR_CAP] for _, content in batch])
        for (role, content), emb in zip(batch, embeddings):
            rows.append((role, content, emb, created_at))

    return memory.add_exchanges_bulk(session_id, rows)
|
|
|
|
|
|
def import_dir(dirpath: str | Path, created_at: str | None = None) -> dict:
    """Import every ``*.json`` file found recursively under *dirpath*.

    Files are processed in sorted path order. ``created_at`` stamps every
    imported exchange; when not given it defaults to the current UTC time,
    taken once for the whole run.

    Returns:
        ``{"files", "sessions_imported", "exchanges"}`` counts.
    """
    stamp = created_at or datetime.now(timezone.utc).isoformat()
    json_files = sorted(Path(dirpath).rglob("*.json"))

    imported_sessions = 0
    imported_exchanges = 0
    for json_file in json_files:
        added = import_file(json_file, stamp)
        if not added:
            continue
        imported_sessions += 1
        imported_exchanges += added

    file_count = len(json_files)
    logbus.log(
        "info",
        "import complete",
        dir=str(dirpath),
        files=file_count,
        sessions=imported_sessions,
        exchanges=imported_exchanges,
    )
    return {
        "files": file_count,
        "sessions_imported": imported_sessions,
        "exchanges": imported_exchanges,
    }
|
|
|
|
|
|
# --- Raw ChatGPT export (sharded conversations-*.json with timestamps) ---
|
|
|
|
|
|
def _ts_to_iso(ts: float | None, fallback: str) -> str:
|
|
if not ts:
|
|
return fallback
|
|
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
|
|
|
|
|
|
def _message_text(msg: dict) -> str | None:
    """Return the plain text of a raw-export message node, or None to skip it.

    Only messages whose ``content["content_type"]`` is in KEEP_CONTENT_TYPES
    are considered. Non-string parts and whitespace-only strings are discarded.
    The remaining parts are joined with newlines and stripped. If nothing is
    left, the result is None.
    """
    content = msg.get("content") or {}
    if content.get("content_type") not in KEEP_CONTENT_TYPES:
        return None

    pieces = []
    for part in content.get("parts") or []:
        if isinstance(part, str) and part.strip():
            pieces.append(part)
    joined = "\n".join(pieces).strip()
    return joined if joined else None
|
|
|
|
|
|
def _convo_rows(convo: dict) -> list[tuple[float, str, str]]:
|
|
"""(create_time, role, text) for each keepable message, chronologically."""
|
|
rows: list[tuple[float, str, str]] = []
|
|
conv_ct = convo.get("create_time") or 0
|
|
for node in convo.get("mapping", {}).values():
|
|
msg = node.get("message")
|
|
if not msg:
|
|
continue
|
|
role = (msg.get("author") or {}).get("role")
|
|
if role not in ("user", "assistant"):
|
|
continue
|
|
text = _message_text(msg)
|
|
if text is None:
|
|
continue
|
|
rows.append((msg.get("create_time") or conv_ct, role, text))
|
|
rows.sort(key=lambda r: r[0] or 0)
|
|
return rows
|
|
|
|
|
|
def import_conversation(convo: dict) -> int:
    """Import a single raw-export conversation as one Lyra session.

    The session id is ``conversation_id``, falling back to ``id``. A
    conversation with no id is skipped, and so is one whose session already
    has history; this keeps re-imports idempotent. Each row from
    ``_convo_rows`` becomes one exchange. Its timestamp comes from
    ``_ts_to_iso``, with the time of this call as the fallback.

    Returns:
        Number of exchanges added (0 if skipped or nothing to import).
    """
    session_id = convo.get("conversation_id") or convo.get("id")
    if not session_id:
        return 0
    if memory.history(session_id):
        return 0

    rows = _convo_rows(convo)
    if not rows:
        return 0

    memory.ensure_session(session_id, name=convo.get("title") or "untitled")
    now_iso = datetime.now(timezone.utc).isoformat()

    exchanges: list[tuple[str, str, list[float], str]] = []
    for offset in range(0, len(rows), EMBED_BATCH):
        chunk = rows[offset : offset + EMBED_BATCH]
        # Embed a capped prefix only; the full text goes into the exchange.
        vectors = llm.embed([text[:EMBED_CHAR_CAP] for _, _, text in chunk])
        exchanges.extend(
            (role, text, vector, _ts_to_iso(ts, now_iso))
            for (ts, role, text), vector in zip(chunk, vectors)
        )
    return memory.add_exchanges_bulk(session_id, exchanges)
|
|
|
|
|
|
def import_export(export_dir: str | Path, limit: int | None = None) -> dict:
    """Import a raw ChatGPT export directory (sharded ``conversations-*.json``).

    Each shard is parsed as JSON and iterated as a sequence of conversations.
    Shards are read lazily in sorted filename order, and every conversation is
    passed to ``import_conversation``.

    Args:
        export_dir: directory containing the ``conversations-*.json`` shards.
        limit: maximum number of conversations to process. Conversations that
            end up skipped (e.g. already imported) still count toward it.
            ``None`` means no limit; a negative value processes nothing.

    Returns:
        ``{"shards", "conversations_imported", "exchanges"}`` counts.
    """
    shards = sorted(Path(export_dir).glob("conversations-*.json"))

    # Lazily chain every shard's conversations, so stopping at ``limit`` also
    # stops reading and parsing the remaining shards (limit=0 parses none).
    conversations = (
        convo
        for shard in shards
        for convo in json.loads(shard.read_text(encoding="utf-8"))
    )
    if limit is not None:
        # islice raises ValueError on a negative stop; clamp so a negative
        # limit simply processes nothing.
        conversations = islice(conversations, max(limit, 0))

    convos, exchanges = 0, 0
    for convo in conversations:
        added = import_conversation(convo)
        if added:
            convos += 1
            exchanges += added

    logbus.log(
        "info", "export import complete", dir=str(export_dir),
        shards=len(shards), conversations=convos, exchanges=exchanges,
    )
    return {"shards": len(shards), "conversations_imported": convos, "exchanges": exchanges}
|
|
|
|
|
|
def main() -> int:
|
|
if len(sys.argv) < 2:
|
|
print("usage: lyra-import <dir> [limit]", file=sys.stderr)
|
|
return 2
|
|
path = Path(sys.argv[1])
|
|
limit = int(sys.argv[2]) if len(sys.argv) > 2 else None
|
|
# A raw ChatGPT export has sharded conversations-*.json; otherwise treat the
|
|
# directory as legacy {title, messages} files.
|
|
if list(path.glob("conversations-*.json")):
|
|
report = import_export(path, limit=limit)
|
|
else:
|
|
report = import_dir(path)
|
|
print(report)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    sys.exit(main())
|