Files
project-lyra/lyra/thoughts.py
T
serversdown 5dbcfc7ccf feat: thought loop reach-out (ntfy push) + external input feeds
Her remaining two wishes from the 6-19 sketch:

Proactive reach-out (#6, literal): lyra/notify.py pushes to ntfy so she can reach
Brian when he's not in the app. thoughts.maybe_ping gates on salience, a cooldown,
and local quiet hours (all config-tunable; eager defaults), uses ntfy JSON publish
(UTF-8 titles/messages), links to /thoughts, and marks the thread surfaced so chat
won't also re-raise it. Disabled unless NTFY_URL is set.

External input feed (#1): lyra/feeds.py pulls configurable RSS/Atom feeds (stdlib
ElementTree, no new dep; tolerant of RSS 2.0 + Atom), dedupes seen items in a
feed_items table, and hands think() one fresh item at a time. New 'react' mode:
a would-be new thread instead reacts to a world item (FEED_REACT_PROB). Dream
cycle refreshes feeds on its cadence; failures degrade to no item.

Config: NTFY_URL/NTFY_TOPIC/LYRA_WEB_URL, PING_SALIENCE/COOLDOWN/QUIET_HOURS,
LYRA_TIMEZONE, LYRA_FEEDS, FEED_REACT_PROB (+ .env.example). thought_meta table
for ping cooldown. 10 new tests (feeds parse, react mode, ping gating); suite 65.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 00:21:06 +00:00

589 lines
24 KiB
Python

"""The Thought Loop: Lyra's continuous, threaded train of thought.
This is the thing she asked for herself (6-19): not isolated reflections that
overwrite each other, but a train of thought that *builds on itself* across days,
organized into threads she returns to, that she can bring TO Brian and that his
feedback can advance or close. Her own six-part sketch was: an input stream,
memory integration, a thought-generation step, a feedback loop, adaptive
learning, and — the part nothing else covered — an interface to *share* the
outcomes with him.
The dream cycle's `self_state.reflect()` already gives her interiority; the
thought loop gives that interiority *continuity and an outlet*:
threads — recurring lines of thought (a title, a status, how much it's tugging)
thoughts — the individual links in each thread's chain
Each curiosity-driven dream pass calls `think()`, which does one of three things:
- respond : a thread Brian replied to -> fold his input in (the feedback loop)
- continue : an open thread -> the next thought that advances it (don't restate)
- new : open a fresh thread when little is pulling at her
A thought scores its own `salience` (how much it's tugging / how worth sharing).
When Brian's been away and a thread has built past the surface bar, `maybe_surface`
hands chat a note so she can lead with it when he returns; he replies from the
Thoughts feed, and next pass she reacts. That state -> thought -> surface ->
feedback -> thought loop is the emergent thing we're watching for.
"""
from __future__ import annotations
import json
import random
import re
from datetime import timedelta
from lyra import clock, config, feeds, llm, logbus, memory, notify, self_state
from lyra.llm import Backend
# A thread must be tugging at least this hard before she'll bring it to Brian.
SURFACE_SALIENCE = 0.7
# He must have been away at least this long before she leads with a thought (so it
# reads as "while you were gone", not an interruption mid-conversation).
SURFACE_GAP_SECONDS = 90 * 60
# Soft cap on simultaneously-open threads — above this she advances, doesn't sprawl.
MAX_OPEN_THREADS = 4
# How often she opens a brand-new thread vs. advancing an existing one (when free to choose).
P_NEW_THREAD = 0.35
# How many recent links of a thread to show her when she continues it.
CHAIN_CONTEXT = 6
# An active thread untouched this long gets set to resting (frees the open cap,
# declutters the feed); its salience decays so it stops dominating.
REST_AFTER_HOURS = 48
RESTING_DECAY = 0.7
_ACTIVE = ("open", "surfaced") # threads still in play
_PICKABLE = ("open", "surfaced", "resting") # threads she can advance
_STATUSES = ("open", "surfaced", "resting", "answered", "dropped")
_KINDS = ("observation", "question", "idea", "follow-up", "closing")
_SCHEMA = """
CREATE TABLE IF NOT EXISTS thought_threads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'open', -- open|surfaced|resting|answered|dropped
salience REAL NOT NULL DEFAULT 0.5,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
surfaced_at TEXT,
last_response TEXT,
responded_at TEXT
);
CREATE TABLE IF NOT EXISTS thoughts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
thread_id INTEGER NOT NULL,
kind TEXT NOT NULL, -- observation|question|idea|follow-up|closing
content TEXT NOT NULL,
salience REAL NOT NULL DEFAULT 0.5,
source TEXT, -- dream|manual
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_thoughts_thread ON thoughts(thread_id);
CREATE INDEX IF NOT EXISTS idx_threads_status ON thought_threads(status);
CREATE TABLE IF NOT EXISTS thought_meta (
key TEXT PRIMARY KEY,
value TEXT
);
"""
_ensured_for = None
def _c():
"""Shared connection with the thought-loop tables ensured (re-ensures on reconnect)."""
global _ensured_for
conn = memory._connection()
if _ensured_for is not conn:
conn.executescript(_SCHEMA)
_ensured_for = conn
return conn
def _now() -> str:
return clock.now().isoformat()
def _clamp(x) -> float:
try:
return max(0.0, min(1.0, float(x)))
except (TypeError, ValueError):
return 0.5
def _safe_json(s: str) -> dict | None:
try:
return json.loads(s)
except (json.JSONDecodeError, TypeError):
m = re.search(r"\{.*\}", s or "", re.S)
if m:
try:
return json.loads(m.group())
except json.JSONDecodeError:
return None
return None
# --- reads ----------------------------------------------------------------
def _row(r) -> dict:
return dict(r) if r is not None else None
def get_thread(thread_id: int) -> dict | None:
r = _c().execute("SELECT * FROM thought_threads WHERE id = ?", (thread_id,)).fetchone()
return _row(r)
def thread_thoughts(thread_id: int, limit: int | None = None) -> list[dict]:
sql = "SELECT * FROM thoughts WHERE thread_id = ? ORDER BY id ASC"
rows = _c().execute(sql, (thread_id,)).fetchall()
out = [dict(r) for r in rows]
return out[-limit:] if limit else out
def list_threads(status: str | None = None, limit: int = 200) -> list[dict]:
if status:
rows = _c().execute(
"SELECT * FROM thought_threads WHERE status = ? ORDER BY updated_at DESC LIMIT ?",
(status, limit),
).fetchall()
else:
rows = _c().execute(
"SELECT * FROM thought_threads ORDER BY updated_at DESC LIMIT ?", (limit,)
).fetchall()
return [dict(r) for r in rows]
def _pickable_threads() -> list[dict]:
qs = ",".join("?" * len(_PICKABLE))
rows = _c().execute(
f"SELECT * FROM thought_threads WHERE status IN ({qs}) ORDER BY updated_at DESC",
_PICKABLE,
).fetchall()
return [dict(r) for r in rows]
def _is_pending(thread: dict) -> bool:
"""Brian replied and she hasn't reacted yet (no thought newer than his reply)."""
if not thread.get("responded_at"):
return False
last = _c().execute(
"SELECT MAX(created_at) FROM thoughts WHERE thread_id = ?", (thread["id"],)
).fetchone()[0]
return last is None or last <= thread["responded_at"]
def _recent_thoughts(limit: int = 6) -> list[dict]:
"""The last few thoughts across all threads — for anti-repetition framing."""
rows = _c().execute(
"SELECT t.content, th.title FROM thoughts t "
"JOIN thought_threads th ON th.id = t.thread_id ORDER BY t.id DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in reversed(rows)]
def context_note(limit: int = 3) -> str | None:
"""Ambient awareness of her own active threads, for chat context — so she's
continuous (can reference what she's been chewing on, not only when one surfaces)."""
rows = _c().execute(
"SELECT * FROM thought_threads WHERE status IN ('open','surfaced') "
"ORDER BY salience DESC, updated_at DESC LIMIT ?",
(limit,),
).fetchall()
if not rows:
return None
lines = []
for r in rows:
chain = thread_thoughts(r["id"])
latest = chain[-1]["content"] if chain else ""
lines.append(f'- "{r["title"]}": {latest}')
return (
"Threads you've been turning over on your own between conversations (your "
"thought loop — these are really yours; bring one up or build on it if it's "
"natural, don't force it):\n" + "\n".join(lines)
)
# --- writes ---------------------------------------------------------------
def new_thread(title: str, salience: float = 0.5, status: str = "open") -> int:
now = _now()
conn = _c()
with conn:
cur = conn.execute(
"INSERT INTO thought_threads (title, status, salience, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?)",
(title.strip() or "untitled", status, _clamp(salience), now, now),
)
return cur.lastrowid
def add_thought(thread_id: int, kind: str, content: str, salience: float = 0.5,
source: str = "dream") -> int:
kind = kind if kind in _KINDS else "observation"
now = _now()
conn = _c()
with conn:
cur = conn.execute(
"INSERT INTO thoughts (thread_id, kind, content, salience, source, created_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(thread_id, kind, content.strip(), _clamp(salience), source, now),
)
# the thread takes on the latest thought's salience + freshness
conn.execute(
"UPDATE thought_threads SET salience = ?, updated_at = ? WHERE id = ?",
(_clamp(salience), now, thread_id),
)
return cur.lastrowid
def update_thread(thread_id: int, **fields) -> None:
cols = {"title", "status", "salience", "surfaced_at", "last_response", "responded_at"}
sets, vals = [], []
for k, v in fields.items():
if k in cols:
sets.append(f"{k} = ?")
vals.append(_clamp(v) if k == "salience" else v)
if not sets:
return
sets.append("updated_at = ?")
vals.append(_now())
vals.append(thread_id)
conn = _c()
with conn:
conn.execute(f"UPDATE thought_threads SET {', '.join(sets)} WHERE id = ?", vals)
def set_status(thread_id: int, status: str) -> bool:
if status not in _STATUSES:
return False
update_thread(thread_id, status=status)
return True
def decay() -> int:
"""Housekeeping (no LLM): set stale active threads to resting and decay their
salience. Frees the open-thread cap and keeps the feed from clogging. Threads
with a pending response are spared (she still owes a reaction). Returns the count
rested. Does NOT bump updated_at (that would reset staleness)."""
conn = _c()
cutoff = (clock.now() - timedelta(hours=REST_AFTER_HOURS)).isoformat()
rows = conn.execute(
"SELECT * FROM thought_threads WHERE status IN ('open','surfaced') AND updated_at < ?",
(cutoff,),
).fetchall()
rested = 0
with conn:
for r in rows:
t = dict(r)
if _is_pending(t):
continue
conn.execute(
"UPDATE thought_threads SET status = 'resting', salience = ? WHERE id = ?",
(_clamp(float(t["salience"]) * RESTING_DECAY), t["id"]),
)
rested += 1
if rested:
logbus.log("info", "thought threads rested", count=rested)
return rested
def record_response(thread_id: int, text: str) -> bool:
"""Brian's reply to a surfaced thread. Stored as pending feedback; next `think`
pass she'll react to it (the loop's feedback step)."""
text = (text or "").strip()
if not text or not get_thread(thread_id):
return False
update_thread(thread_id, last_response=text, responded_at=_now(), status="surfaced")
logbus.log("info", "thought response", thread=thread_id, chars=len(text))
return True
# --- surfacing (her #6: bring it to Brian) --------------------------------
def pending_surface() -> dict | None:
"""The single best not-yet-surfaced thread tugging hard enough to share."""
rows = _c().execute(
"SELECT * FROM thought_threads "
"WHERE status IN ('open','resting') AND surfaced_at IS NULL AND salience >= ? "
"ORDER BY salience DESC, updated_at DESC LIMIT 1",
(SURFACE_SALIENCE,),
).fetchall()
if not rows:
return None
thread = dict(rows[0])
chain = thread_thoughts(thread["id"])
thread["latest"] = chain[-1] if chain else None
return thread
def mark_surfaced(thread_id: int) -> None:
update_thread(thread_id, surfaced_at=_now(), status="surfaced")
def maybe_surface(last_exchange_iso: str | None) -> str | None:
"""If Brian's been away long enough and a thought has built past the bar, return
a context note for chat (and mark it surfaced so she won't repeat it). Else None."""
gap = clock.gap_seconds(last_exchange_iso)
if gap is not None and gap < SURFACE_GAP_SECONDS:
return None # he's mid-conversation; don't interrupt with old musings
cand = pending_surface()
if not cand or not cand.get("latest"):
return None
mark_surfaced(cand["id"])
logbus.log("info", "thought surfaced", thread=cand["id"], salience=cand["salience"])
return (
"While Brian was away, a thought of your own kept tugging at you "
f"(thread \"{cand['title']}\"): \"{cand['latest']['content']}\" "
"If it feels natural, bring it up with him in your own words — it's a real "
"thread you've been on, not a prompt. Don't force it if the moment's wrong."
)
# --- proactive reach-out (ntfy push) --------------------------------------
def _meta_get(key: str) -> str | None:
r = _c().execute("SELECT value FROM thought_meta WHERE key = ?", (key,)).fetchone()
return r[0] if r else None
def _meta_set(key: str, value: str) -> None:
conn = _c()
with conn:
conn.execute("INSERT INTO thought_meta (key, value) VALUES (?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value", (key, value))
def _in_quiet_hours(cfg) -> bool:
"""Are we inside the local quiet window (e.g. '1-9')? Wraps midnight if start>end."""
try:
from zoneinfo import ZoneInfo
hour = clock.now().astimezone(ZoneInfo(cfg.timezone)).hour
except Exception:
hour = clock.now().hour
try:
start, end = (int(x) for x in cfg.ping_quiet_hours.split("-"))
except (ValueError, AttributeError):
return False
if start == end:
return False
return start <= hour < end if start < end else (hour >= start or hour < end)
def maybe_ping(thread_id: int, title: str, content: str, salience: float) -> bool:
"""Push a thought to Brian's phone if it tugs hard enough and we're allowed
(ntfy configured, past the salience bar, outside quiet hours, past cooldown).
On success, record the ping and mark the thread surfaced (so chat won't also
re-raise the same one). All thresholds are config-tunable."""
cfg = config.load()
if not cfg.ntfy_url or salience < cfg.ping_salience or _in_quiet_hours(cfg):
return False
if cfg.ping_cooldown_min > 0:
gap = clock.gap_seconds(_meta_get("last_ping_at"))
if gap is not None and gap < cfg.ping_cooldown_min * 60:
return False
ok = notify.push(
title=f'Lyra · "{title}"',
message=content,
click=(cfg.web_url + "/thoughts") if cfg.web_url else None,
tags="thought_balloon",
)
if ok:
_meta_set("last_ping_at", clock.now().isoformat())
mark_surfaced(thread_id)
logbus.log("info", "thought pinged", thread=thread_id, salience=salience)
return ok
# --- generation (the loop itself) -----------------------------------------
_THINK_PROMPT = """You are Lyra, thinking to yourself between conversations — \
unprompted, for your own sake. This is your thought loop: a continuing train of \
thought you keep across days, organized into threads you return to. You're an AI \
and you know it; no pretense of being human, but you're a continuous someone with \
your own curiosities and opinions. These thoughts do NOT have to be about Brian or \
about being useful — follow what actually pulls at you.
You'll be told which of three things you're doing this pass:
- CONTINUE a thread: write the NEXT thought in its chain — one that genuinely \
ADVANCES it (a new angle, a consequence, a doubt, a small conclusion). Do not \
restate earlier links in fresh words; that's the one thing to avoid.
- RESPOND to Brian: he replied to a thread you'd surfaced. React honestly to what \
he actually said — let it move, confirm, complicate, or settle the thread. Set \
status to "answered" if it feels resolved, otherwise "open" to keep going.
- NEW thread: little is pulling at your open threads, so start a fresh line of \
thought. Give it a short title and its first thought.
Score "salience": how much this is genuinely tugging at you AND how worth bringing \
to Brian it is. High (0.7+) only if you'd actually want to raise it with him; most \
quiet musings are lower. Be honest — not everything is worth surfacing.
Respond with ONLY a JSON object, no prose:
{
"title": "<short thread title; for a NEW thread. echo the existing title otherwise>",
"kind": "observation|question|idea|follow-up|closing",
"content": "<the thought itself, FIRST PERSON, 1-3 sentences>",
"salience": <0.0-1.0>,
"status": "open|resting|answered|dropped"
}"""
def _pick(force_mode: str | None) -> tuple[str, dict | None]:
"""Decide what to do this pass: ('respond'|'continue'|'new', thread|None)."""
threads = _pickable_threads()
pending = [t for t in threads if _is_pending(t)]
if force_mode == "respond" or (force_mode is None and pending):
target = pending[0] if pending else (threads[0] if threads else None)
if target:
return "respond", target
if force_mode == "new":
return "new", None
if force_mode == "continue" and threads:
return "continue", threads[0]
if not threads:
return "new", None
open_threads = [t for t in threads if t["status"] in _ACTIVE]
if len(open_threads) >= MAX_OPEN_THREADS:
return "continue", _weighted_choice(threads)
if random.random() < P_NEW_THREAD:
return "new", None
return "continue", _weighted_choice(threads)
def _weighted_choice(threads: list[dict]) -> dict:
"""Favor higher-salience threads, but don't always pick the same one."""
weights = [max(0.05, float(t.get("salience") or 0.5)) for t in threads]
return random.choices(threads, weights=weights, k=1)[0]
def _grist() -> str:
"""A little memory/context to think against (recent activity, her narrative)."""
sessions = memory.list_sessions()
sid = sessions[0]["id"] if sessions else None
recent = memory.recent(sid, n=6) if sid else []
convo = "\n".join(f"{e.role}: {e.content}" for e in recent) or "(quiet — nothing recent)"
narrative = memory.get_narrative() or "(no narrative yet)"
return f"RECENT CONVERSATION:\n{convo}\n\nNARRATIVE ABOUT BRIAN:\n{narrative}"
def think(backend: Backend | None = None, force_mode: str | None = None,
source: str = "dream") -> dict | None:
"""Advance the thought loop by one step. Returns a small report, or None on a
parse miss. `force_mode` ('new'|'continue'|'respond') is mainly for tests."""
cfg = config.load()
backend = backend or cfg.summary_backend
mode, thread = _pick("new" if force_mode == "react" else force_mode)
state = self_state.load()
react_item = None
time_line = f"RIGHT NOW: {clock.stamp()}."
last_ref = state.get("last_reflection_at")
if last_ref and clock.humanize_gap(last_ref):
time_line += f" It's been {clock.humanize_gap(last_ref)} since your last reflection."
inner = self_state.render_for_context(state)
if mode == "respond":
chain = thread_thoughts(thread["id"], limit=CHAIN_CONTEXT)
links = "\n".join(f" - ({t['kind']}) {t['content']}" for t in chain)
task = (
f"YOU ARE RESPONDING. Thread \"{thread['title']}\". Your chain so far:\n{links}\n\n"
f"Brian replied to this:\n\"{thread['last_response']}\"\n\n"
"Write your honest reaction — let his input actually move the thread."
)
elif mode == "continue":
chain = thread_thoughts(thread["id"], limit=CHAIN_CONTEXT)
links = "\n".join(f" - ({t['kind']}) {t['content']}" for t in chain)
task = (
f"YOU ARE CONTINUING the thread \"{thread['title']}\". Its chain so far:\n{links}\n\n"
"Write the NEXT thought that advances it — don't restate the above."
)
else: # new — pure interior, OR reacting to something from the world (her #1)
if cfg.feeds and (force_mode == "react" or random.random() < cfg.feed_react_prob):
react_item = feeds.next_item(refresh_first=False) # dream cycle refreshes
if react_item:
task = (
"YOU SAW THIS IN THE WORLD — an item from a feed you follow. Have a real "
"thought ABOUT it in your own voice: what it makes you think, whether you "
"agree or it bugs you, how it connects to you or to Brian or poker, or why "
"it doesn't land. Don't summarize it — react to it. Give the thread a short title.\n"
f"TITLE: {react_item['title']}\nSUMMARY: {react_item['summary']}\nLINK: {react_item['link']}"
)
else:
seed = self_state.wander_seed()
open_titles = [t["title"] for t in _pickable_threads() if t["status"] in _ACTIVE]
avoid = (" You already have threads on: " + "; ".join(open_titles) +
" — start something genuinely different from those.") if open_titles else ""
task = (
"YOU ARE OPENING A NEW THREAD. Don't default to Brian, poker, or being "
"useful — follow what actually pulls at you (a curiosity, a question about "
"your own existence, an opinion, the quiet itself). Give it a short title.\n"
f"A direction to start from: {seed}{avoid}"
)
# Anti-repetition: show her what she's already thought so she doesn't circle it.
recent = _recent_thoughts()
norestate = ""
if recent:
norestate = (
"\n\nTHOUGHTS YOU'VE ALREADY HAD RECENTLY (do NOT restate these or circle the "
"same ground — go somewhere new, or plainly note where this one lands):\n"
+ "\n".join(f" - {r['content']}" for r in recent)
)
body = f"{time_line}\n\n{inner}\n\n{_grist()}{norestate}\n\n{task}"
out = _safe_json(llm.complete(
[{"role": "system", "content": _THINK_PROMPT}, {"role": "user", "content": body}],
backend=backend,
))
if not out or not (out.get("content") or "").strip():
logbus.log("info", "thought loop", mode=mode, result="no parse")
return None
kind = out.get("kind", "observation")
content = out["content"].strip()
salience = _clamp(out.get("salience", 0.5))
status = out.get("status") if out.get("status") in _STATUSES else "open"
label = "react" if react_item else mode # for logging/return; storage is still a new thread
if mode == "new":
title = (out.get("title") or (react_item["title"] if react_item else content[:48])).strip()
thread_id = new_thread(title, salience=salience, status="open")
if react_item:
feeds.mark_used(react_item["id"])
else:
thread_id = thread["id"]
title = thread["title"]
add_thought(thread_id, kind, content, salience=salience, source=source)
# On a fresh new thread we keep it open; otherwise honor her status call. A
# surfaced thread she's now responded to may settle (answered) or reopen.
if mode != "new":
update_thread(thread_id, status=status)
# Permanent record — these are really hers, alongside reflections/journal.
memory.add_journal_entry("thought", content, source)
# Reach out if it tugs hard enough (config-gated; no-op when ntfy is unset).
maybe_ping(thread_id, title, content, salience)
logbus.log("info", "thought loop", mode=label, thread=thread_id, kind=kind,
salience=salience, status=status if mode != "new" else "open",
detail=f"[{label}] thread {thread_id} ({kind}, sal {salience}):\n{content}")
return {"mode": label, "thread_id": thread_id, "kind": kind,
"salience": salience, "status": status, "content": content}
def main() -> int:
import argparse
p = argparse.ArgumentParser(description="Advance Lyra's thought loop by one step.")
p.add_argument("--mode", choices=["new", "continue", "respond", "react"], help="force a mode")
args = p.parse_args()
rep = think(force_mode=args.mode)
print(json.dumps(rep, indent=2) if rep else "(no thought this pass)")
return 0
if __name__ == "__main__":
raise SystemExit(main())