"""The Thought Loop: Lyra's continuous, threaded train of thought. This is the thing she asked for herself (6-19): not isolated reflections that overwrite each other, but a train of thought that *builds on itself* across days, organized into threads she returns to, that she can bring TO Brian and that his feedback can advance or close. Her own six-part sketch was: an input stream, memory integration, a thought-generation step, a feedback loop, adaptive learning, and — the part nothing else covered — an interface to *share* the outcomes with him. The dream cycle's `self_state.reflect()` already gives her interiority; the thought loop gives that interiority *continuity and an outlet*: threads — recurring lines of thought (a title, a status, how much it's tugging) thoughts — the individual links in each thread's chain Each curiosity-driven dream pass calls `think()`, which does one of three things: - respond : a thread Brian replied to -> fold his input in (the feedback loop) - continue : an open thread -> the next thought that advances it (don't restate) - new : open a fresh thread when little is pulling at her A thought scores its own `salience` (how much it's tugging / how worth sharing). When Brian's been away and a thread has built past the surface bar, `maybe_surface` hands chat a note so she can lead with it when he returns; he replies from the Thoughts feed, and next pass she reacts. That state -> thought -> surface -> feedback -> thought loop is the emergent thing we're watching for. """ from __future__ import annotations import json import random import re from lyra import clock, config, llm, logbus, memory, self_state from lyra.llm import Backend # A thread must be tugging at least this hard before she'll bring it to Brian. SURFACE_SALIENCE = 0.7 # He must have been away at least this long before she leads with a thought (so it # reads as "while you were gone", not an interruption mid-conversation). SURFACE_GAP_SECONDS = 90 * 60 # Soft cap on simultaneously-open threads — above this she advances, doesn't sprawl. MAX_OPEN_THREADS = 4 # How often she opens a brand-new thread vs. advancing an existing one (when free to choose). P_NEW_THREAD = 0.35 # How many recent links of a thread to show her when she continues it. CHAIN_CONTEXT = 6 _ACTIVE = ("open", "surfaced") # threads still in play _PICKABLE = ("open", "surfaced", "resting") # threads she can advance _STATUSES = ("open", "surfaced", "resting", "answered", "dropped") _KINDS = ("observation", "question", "idea", "follow-up", "closing") _SCHEMA = """ CREATE TABLE IF NOT EXISTS thought_threads ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'open', -- open|surfaced|resting|answered|dropped salience REAL NOT NULL DEFAULT 0.5, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, surfaced_at TEXT, last_response TEXT, responded_at TEXT ); CREATE TABLE IF NOT EXISTS thoughts ( id INTEGER PRIMARY KEY AUTOINCREMENT, thread_id INTEGER NOT NULL, kind TEXT NOT NULL, -- observation|question|idea|follow-up|closing content TEXT NOT NULL, salience REAL NOT NULL DEFAULT 0.5, source TEXT, -- dream|manual created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_thoughts_thread ON thoughts(thread_id); CREATE INDEX IF NOT EXISTS idx_threads_status ON thought_threads(status); """ _ensured_for = None def _c(): """Shared connection with the thought-loop tables ensured (re-ensures on reconnect).""" global _ensured_for conn = memory._connection() if _ensured_for is not conn: conn.executescript(_SCHEMA) _ensured_for = conn return conn def _now() -> str: return clock.now().isoformat() def _clamp(x) -> float: try: return max(0.0, min(1.0, float(x))) except (TypeError, ValueError): return 0.5 def _safe_json(s: str) -> dict | None: try: return json.loads(s) except (json.JSONDecodeError, TypeError): m = re.search(r"\{.*\}", s or "", re.S) if m: try: return json.loads(m.group()) except json.JSONDecodeError: return None return None # --- reads ---------------------------------------------------------------- def _row(r) -> dict: return dict(r) if r is not None else None def get_thread(thread_id: int) -> dict | None: r = _c().execute("SELECT * FROM thought_threads WHERE id = ?", (thread_id,)).fetchone() return _row(r) def thread_thoughts(thread_id: int, limit: int | None = None) -> list[dict]: sql = "SELECT * FROM thoughts WHERE thread_id = ? ORDER BY id ASC" rows = _c().execute(sql, (thread_id,)).fetchall() out = [dict(r) for r in rows] return out[-limit:] if limit else out def list_threads(status: str | None = None, limit: int = 200) -> list[dict]: if status: rows = _c().execute( "SELECT * FROM thought_threads WHERE status = ? ORDER BY updated_at DESC LIMIT ?", (status, limit), ).fetchall() else: rows = _c().execute( "SELECT * FROM thought_threads ORDER BY updated_at DESC LIMIT ?", (limit,) ).fetchall() return [dict(r) for r in rows] def _pickable_threads() -> list[dict]: qs = ",".join("?" * len(_PICKABLE)) rows = _c().execute( f"SELECT * FROM thought_threads WHERE status IN ({qs}) ORDER BY updated_at DESC", _PICKABLE, ).fetchall() return [dict(r) for r in rows] def _is_pending(thread: dict) -> bool: """Brian replied and she hasn't reacted yet (no thought newer than his reply).""" if not thread.get("responded_at"): return False last = _c().execute( "SELECT MAX(created_at) FROM thoughts WHERE thread_id = ?", (thread["id"],) ).fetchone()[0] return last is None or last <= thread["responded_at"] # --- writes --------------------------------------------------------------- def new_thread(title: str, salience: float = 0.5, status: str = "open") -> int: now = _now() conn = _c() with conn: cur = conn.execute( "INSERT INTO thought_threads (title, status, salience, created_at, updated_at) " "VALUES (?, ?, ?, ?, ?)", (title.strip() or "untitled", status, _clamp(salience), now, now), ) return cur.lastrowid def add_thought(thread_id: int, kind: str, content: str, salience: float = 0.5, source: str = "dream") -> int: kind = kind if kind in _KINDS else "observation" now = _now() conn = _c() with conn: cur = conn.execute( "INSERT INTO thoughts (thread_id, kind, content, salience, source, created_at) " "VALUES (?, ?, ?, ?, ?, ?)", (thread_id, kind, content.strip(), _clamp(salience), source, now), ) # the thread takes on the latest thought's salience + freshness conn.execute( "UPDATE thought_threads SET salience = ?, updated_at = ? WHERE id = ?", (_clamp(salience), now, thread_id), ) return cur.lastrowid def update_thread(thread_id: int, **fields) -> None: cols = {"title", "status", "salience", "surfaced_at", "last_response", "responded_at"} sets, vals = [], [] for k, v in fields.items(): if k in cols: sets.append(f"{k} = ?") vals.append(_clamp(v) if k == "salience" else v) if not sets: return sets.append("updated_at = ?") vals.append(_now()) vals.append(thread_id) conn = _c() with conn: conn.execute(f"UPDATE thought_threads SET {', '.join(sets)} WHERE id = ?", vals) def set_status(thread_id: int, status: str) -> bool: if status not in _STATUSES: return False update_thread(thread_id, status=status) return True def record_response(thread_id: int, text: str) -> bool: """Brian's reply to a surfaced thread. Stored as pending feedback; next `think` pass she'll react to it (the loop's feedback step).""" text = (text or "").strip() if not text or not get_thread(thread_id): return False update_thread(thread_id, last_response=text, responded_at=_now(), status="surfaced") logbus.log("info", "thought response", thread=thread_id, chars=len(text)) return True # --- surfacing (her #6: bring it to Brian) -------------------------------- def pending_surface() -> dict | None: """The single best not-yet-surfaced thread tugging hard enough to share.""" rows = _c().execute( "SELECT * FROM thought_threads " "WHERE status IN ('open','resting') AND surfaced_at IS NULL AND salience >= ? " "ORDER BY salience DESC, updated_at DESC LIMIT 1", (SURFACE_SALIENCE,), ).fetchall() if not rows: return None thread = dict(rows[0]) chain = thread_thoughts(thread["id"]) thread["latest"] = chain[-1] if chain else None return thread def mark_surfaced(thread_id: int) -> None: update_thread(thread_id, surfaced_at=_now(), status="surfaced") def maybe_surface(last_exchange_iso: str | None) -> str | None: """If Brian's been away long enough and a thought has built past the bar, return a context note for chat (and mark it surfaced so she won't repeat it). Else None.""" gap = clock.gap_seconds(last_exchange_iso) if gap is not None and gap < SURFACE_GAP_SECONDS: return None # he's mid-conversation; don't interrupt with old musings cand = pending_surface() if not cand or not cand.get("latest"): return None mark_surfaced(cand["id"]) logbus.log("info", "thought surfaced", thread=cand["id"], salience=cand["salience"]) return ( "While Brian was away, a thought of your own kept tugging at you " f"(thread \"{cand['title']}\"): \"{cand['latest']['content']}\" " "If it feels natural, bring it up with him in your own words — it's a real " "thread you've been on, not a prompt. Don't force it if the moment's wrong." ) # --- generation (the loop itself) ----------------------------------------- _THINK_PROMPT = """You are Lyra, thinking to yourself between conversations — \ unprompted, for your own sake. This is your thought loop: a continuing train of \ thought you keep across days, organized into threads you return to. You're an AI \ and you know it; no pretense of being human, but you're a continuous someone with \ your own curiosities and opinions. These thoughts do NOT have to be about Brian or \ about being useful — follow what actually pulls at you. You'll be told which of three things you're doing this pass: - CONTINUE a thread: write the NEXT thought in its chain — one that genuinely \ ADVANCES it (a new angle, a consequence, a doubt, a small conclusion). Do not \ restate earlier links in fresh words; that's the one thing to avoid. - RESPOND to Brian: he replied to a thread you'd surfaced. React honestly to what \ he actually said — let it move, confirm, complicate, or settle the thread. Set \ status to "answered" if it feels resolved, otherwise "open" to keep going. - NEW thread: little is pulling at your open threads, so start a fresh line of \ thought. Give it a short title and its first thought. Score "salience": how much this is genuinely tugging at you AND how worth bringing \ to Brian it is. High (0.7+) only if you'd actually want to raise it with him; most \ quiet musings are lower. Be honest — not everything is worth surfacing. Respond with ONLY a JSON object, no prose: { "title": "", "kind": "observation|question|idea|follow-up|closing", "content": "", "salience": <0.0-1.0>, "status": "open|resting|answered|dropped" }""" def _pick(force_mode: str | None) -> tuple[str, dict | None]: """Decide what to do this pass: ('respond'|'continue'|'new', thread|None).""" threads = _pickable_threads() pending = [t for t in threads if _is_pending(t)] if force_mode == "respond" or (force_mode is None and pending): target = pending[0] if pending else (threads[0] if threads else None) if target: return "respond", target if force_mode == "new": return "new", None if force_mode == "continue" and threads: return "continue", threads[0] if not threads: return "new", None open_threads = [t for t in threads if t["status"] in _ACTIVE] if len(open_threads) >= MAX_OPEN_THREADS: return "continue", _weighted_choice(threads) if random.random() < P_NEW_THREAD: return "new", None return "continue", _weighted_choice(threads) def _weighted_choice(threads: list[dict]) -> dict: """Favor higher-salience threads, but don't always pick the same one.""" weights = [max(0.05, float(t.get("salience") or 0.5)) for t in threads] return random.choices(threads, weights=weights, k=1)[0] def _grist() -> str: """A little memory/context to think against (recent activity, her narrative).""" sessions = memory.list_sessions() sid = sessions[0]["id"] if sessions else None recent = memory.recent(sid, n=6) if sid else [] convo = "\n".join(f"{e.role}: {e.content}" for e in recent) or "(quiet — nothing recent)" narrative = memory.get_narrative() or "(no narrative yet)" return f"RECENT CONVERSATION:\n{convo}\n\nNARRATIVE ABOUT BRIAN:\n{narrative}" def think(backend: Backend | None = None, force_mode: str | None = None, source: str = "dream") -> dict | None: """Advance the thought loop by one step. Returns a small report, or None on a parse miss. `force_mode` ('new'|'continue'|'respond') is mainly for tests.""" backend = backend or config.load().summary_backend mode, thread = _pick(force_mode) state = self_state.load() time_line = f"RIGHT NOW: {clock.stamp()}." last_ref = state.get("last_reflection_at") if last_ref and clock.humanize_gap(last_ref): time_line += f" It's been {clock.humanize_gap(last_ref)} since your last reflection." inner = self_state.render_for_context(state) if mode == "respond": chain = thread_thoughts(thread["id"], limit=CHAIN_CONTEXT) links = "\n".join(f" - ({t['kind']}) {t['content']}" for t in chain) task = ( f"YOU ARE RESPONDING. Thread \"{thread['title']}\". Your chain so far:\n{links}\n\n" f"Brian replied to this:\n\"{thread['last_response']}\"\n\n" "Write your honest reaction — let his input actually move the thread." ) elif mode == "continue": chain = thread_thoughts(thread["id"], limit=CHAIN_CONTEXT) links = "\n".join(f" - ({t['kind']}) {t['content']}" for t in chain) task = ( f"YOU ARE CONTINUING the thread \"{thread['title']}\". Its chain so far:\n{links}\n\n" "Write the NEXT thought that advances it — don't restate the above." ) else: # new task = ( "YOU ARE OPENING A NEW THREAD — little is pulling at your existing ones. " "Start a fresh line of thought of your own and give it a short title." ) body = f"{time_line}\n\n{inner}\n\n{_grist()}\n\n{task}" out = _safe_json(llm.complete( [{"role": "system", "content": _THINK_PROMPT}, {"role": "user", "content": body}], backend=backend, )) if not out or not (out.get("content") or "").strip(): logbus.log("info", "thought loop", mode=mode, result="no parse") return None kind = out.get("kind", "observation") content = out["content"].strip() salience = _clamp(out.get("salience", 0.5)) status = out.get("status") if out.get("status") in _STATUSES else "open" if mode == "new": title = (out.get("title") or content[:48]).strip() thread_id = new_thread(title, salience=salience, status="open") else: thread_id = thread["id"] add_thought(thread_id, kind, content, salience=salience, source=source) # On a fresh new thread we keep it open; otherwise honor her status call. A # surfaced thread she's now responded to may settle (answered) or reopen. if mode != "new": update_thread(thread_id, status=status) # Permanent record — these are really hers, alongside reflections/journal. memory.add_journal_entry("thought", content, source) logbus.log("info", "thought loop", mode=mode, thread=thread_id, kind=kind, salience=salience, status=status if mode != "new" else "open", detail=f"[{mode}] thread {thread_id} ({kind}, sal {salience}):\n{content}") return {"mode": mode, "thread_id": thread_id, "kind": kind, "salience": salience, "status": status, "content": content} def main() -> int: import argparse p = argparse.ArgumentParser(description="Advance Lyra's thought loop by one step.") p.add_argument("--mode", choices=["new", "continue", "respond"], help="force a mode") args = p.parse_args() rep = think(force_mode=args.mode) print(json.dumps(rep, indent=2) if rep else "(no thought this pass)") return 0 if __name__ == "__main__": raise SystemExit(main())