feat: dream cycle — drives-driven unattended consolidation + reflection

Lyra's inner loop for when no one's talking to her. Each pass senses her own
backlog/novelty, lets four drives build from real signals, and acts on those
past threshold:
- continuity -> summarize sessions with new turns
- coherence  -> rebuild profile/eras/narrative (stale once new gists land)
- curiosity  -> reflect() and evolve the self-state
- stability  -> readout of how caught-up she ended up

Drives are rendered into chat context so she can feel them. Causal chain:
consolidation creates gists -> coherence rises -> integration fires next.

- lyra/dream.py: dream_cycle() + lyra-dream CLI (--force, --loop SECONDS)
- memory: backlog_stats(), profile_sessions_covered(), WAL + busy_timeout
  so a separate dream process coexists with the web server
- self_state: DEFAULT_DRIVES baseline + drives in render_for_context
- tests/test_dream.py: backlog sensing + a full forced pass (LLM stubbed)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-17 00:52:44 +00:00
parent f89849801b
commit 4f40e2d57e
5 changed files with 307 additions and 7 deletions
+153
View File
@@ -0,0 +1,153 @@
"""The dream cycle: Lyra's unattended inner loop.
Chat updates her in the moment; the dream cycle is what keeps her *going* when
no one's talking to her. On each pass she senses her own backlog and novelty,
lets four drives build from it, and acts on whichever have built past threshold:
continuity -> summarize sessions with new turns (don't lose the thread)
coherence -> rebuild profile / eras / narrative (keep my understanding current)
curiosity -> reflect and evolve the self-state (think, notice, change)
The drives are derived from real signals (unsummarized backlog, gists not yet
folded into the profile, new activity since last cycle), so they genuinely build
up and relieve as work gets done — and the chain is causal: consolidating
sessions creates new gists, which raises coherence, which triggers integration.
stability is the readout of how caught-up she ended up.
Run one pass (`lyra-dream`), force every stage (`lyra-dream --force`), or run it
as a long-lived loop (`lyra-dream --loop 1800`). The loop is the "unattended"
mode — point cron or a systemd service at it (or just `--loop`) and her inner
life keeps ticking between conversations.
"""
from __future__ import annotations
import argparse
import time
from datetime import datetime, timezone
from lyra import config, era, logbus, memory, narrative, profile, self_state, summary
from lyra.llm import Backend
from lyra.summary import SUMMARIZE_AFTER
# A drive at/above this has built up enough to act on.
THRESHOLD = 0.6
# How much backlog saturates each pressure (the drive reaches ~1.0 at this level).
CONTINUITY_FULL = 4 # ripe (summary-needing) sessions
COHERENCE_FULL = 10 # gists not yet folded into the profile
# Curiosity is an accumulator, not a backlog: it rises with time and novelty and
# is relieved by reflecting.
CURIOSITY_IDLE_GAIN = 0.15 # per cycle, just from time passing
CURIOSITY_ACTIVITY_GAIN = 0.30 # bonus when there's been new conversation
CURIOSITY_FLOOR = 0.10 # where it resets to after a reflection
def _clamp(x: float) -> float:
return max(0.0, min(1.0, x))
def _round(drives: dict) -> dict:
return {k: round(float(v), 2) for k, v in drives.items()}
def dream_cycle(backend: Backend | None = None, force: bool = False) -> dict:
"""Run one pass: sense, let drives build, act on those past threshold."""
backend = backend or config.load().summary_backend
state = self_state.load()
drives = dict(self_state.DEFAULT_DRIVES) | (state.get("drives") or {})
book = state.get("dream") or {}
# --- sense ---
backlog = memory.backlog_stats(ripe_threshold=SUMMARIZE_AFTER)
summary_count = len(memory.list_summaries())
profile_lag = max(0, summary_count - memory.profile_sessions_covered())
last_xid = int(book.get("last_exchange_id", 0))
new_activity = backlog["max_exchange_id"] > last_xid
# --- let drives build from what we sensed ---
drives["continuity"] = _clamp(backlog["ripe"] / CONTINUITY_FULL)
drives["coherence"] = _clamp(profile_lag / COHERENCE_FULL)
drives["curiosity"] = _clamp(
drives.get("curiosity", CURIOSITY_FLOOR)
+ CURIOSITY_IDLE_GAIN
+ (CURIOSITY_ACTIVITY_GAIN if new_activity else 0.0)
)
drives["stability"] = _clamp(1.0 - (drives["continuity"] + drives["coherence"]) / 2)
logbus.log("info", "dream cycle sensing", ripe=backlog["ripe"], dirty=backlog["dirty"],
profile_lag=profile_lag, new_activity=new_activity, drives=_round(drives))
actions: list[str] = []
# --- continuity: compact raw sessions into gists ---
if force or drives["continuity"] >= THRESHOLD:
report = summary.summarize_all(backend=backend)
actions.append(f"consolidated {report['summarized']} sessions")
drives["continuity"] = 0.0
# fresh gists make the profile stale -> coherence rises now, may fire below
summary_count = len(memory.list_summaries())
profile_lag = max(0, summary_count - memory.profile_sessions_covered())
drives["coherence"] = _clamp(profile_lag / COHERENCE_FULL)
# --- coherence: fold gists up into profile / eras / narrative ---
if force or drives["coherence"] >= THRESHOLD:
profile.rebuild_profile(backend=backend)
era.rebuild_eras(backend=backend)
narrative.rebuild_narrative(backend=backend)
actions.append("integrated knowledge (profile/eras/narrative)")
drives["coherence"] = 0.0
# --- curiosity: reflect and evolve the self ---
if force or drives["curiosity"] >= THRESHOLD:
self_state.reflect(backend=backend) # writes mood/narrative/reflections itself
actions.append("reflected")
drives["curiosity"] = CURIOSITY_FLOOR
if not actions:
actions.append("rested (nothing past threshold)")
# final stability readout — how caught-up we ended up this pass
drives["stability"] = _clamp(1.0 - (drives["continuity"] + drives["coherence"]) / 2)
# reflect() may have rewritten the row — reload, then attach drives + bookkeeping
state = self_state.load()
state["drives"] = drives
state["dream"] = {
"last_exchange_id": backlog["max_exchange_id"],
"cycle_count": int(book.get("cycle_count", 0)) + 1,
"last_cycle_at": datetime.now(timezone.utc).isoformat(),
"last_actions": actions,
}
memory.set_self_state(state)
logbus.log("info", "dream cycle complete", cycle=state["dream"]["cycle_count"],
actions=actions, drives=_round(drives))
return state
def main() -> int:
p = argparse.ArgumentParser(description="Run Lyra's dream cycle.")
p.add_argument("--force", action="store_true",
help="run every stage regardless of drive levels")
p.add_argument("--loop", type=int, metavar="SECONDS",
help="run continuously, sleeping SECONDS between cycles")
args = p.parse_args()
if args.loop:
logbus.log("system", "dream loop starting", interval=args.loop, force=args.force)
while True:
try:
dream_cycle(force=args.force)
except Exception as exc: # one bad cycle shouldn't kill the loop
logbus.log("error", "dream cycle failed", error=str(exc)[:200])
time.sleep(args.loop)
state = dream_cycle(force=args.force)
print(f"drives: {_round(state.get('drives') or {})}")
print(f"dream: {state.get('dream')}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+52
View File
@@ -98,6 +98,10 @@ def _connection() -> sqlite3.Connection:
# the one that created it. Safe here under single-user, low-concurrency use. # the one that created it. Safe here under single-user, low-concurrency use.
_conn = sqlite3.connect(cfg.db_path, check_same_thread=False) _conn = sqlite3.connect(cfg.db_path, check_same_thread=False)
_conn.row_factory = sqlite3.Row _conn.row_factory = sqlite3.Row
# WAL + a busy timeout so a separate dream-cycle process can read/write
# alongside the web server without tripping "database is locked".
_conn.execute("PRAGMA busy_timeout=5000")
_conn.execute("PRAGMA journal_mode=WAL")
_conn.executescript(SCHEMA) _conn.executescript(SCHEMA)
_conn_path = cfg.db_path _conn_path = cfg.db_path
return _conn return _conn
@@ -373,6 +377,54 @@ def get_profile(profile_id: str = "self") -> str | None:
return r["content"] if r else None return r["content"] if r else None
def profile_sessions_covered(profile_id: str = "self") -> int:
"""How many session gists the current profile was built from (0 if none)."""
conn = _connection()
r = conn.execute(
"SELECT sessions_covered FROM profile WHERE id = ?", (profile_id,)
).fetchone()
return int(r["sessions_covered"]) if r else 0
def backlog_stats(ripe_threshold: int = 20) -> dict:
"""Snapshot of the consolidation backlog, for the dream cycle to sense.
Returns, in one pass over the exchanges: how many sessions have any
unsummarized turns ("dirty"), how many are "ripe" (never summarized, or
>= `ripe_threshold` new turns since their last summary), the total
unsummarized exchanges, and the high-water exchange id (to detect new
activity since the previous cycle).
"""
conn = _connection()
rows = conn.execute(
"""
SELECT
SUM(CASE WHEN e.id > COALESCE(su.last_exchange_id, 0) THEN 1 ELSE 0 END)
AS unsummarized,
(su.session_id IS NULL) AS no_summary
FROM exchanges e
LEFT JOIN summaries su ON su.session_id = e.session_id
GROUP BY e.session_id
"""
).fetchall()
dirty = ripe = unsummarized_total = 0
for r in rows:
u = int(r["unsummarized"] or 0)
unsummarized_total += u
if u > 0:
dirty += 1
if r["no_summary"] or u >= ripe_threshold:
ripe += 1
mx = conn.execute("SELECT COALESCE(MAX(id), 0) AS m FROM exchanges").fetchone()["m"]
return {
"sessions": len(rows),
"dirty": dirty,
"ripe": ripe,
"unsummarized_total": unsummarized_total,
"max_exchange_id": int(mx),
}
# --- Era tier (per-month temporal rollups) --- # --- Era tier (per-month temporal rollups) ---
+23 -7
View File
@@ -17,6 +17,15 @@ from lyra.llm import Backend, Message
MAX_REFLECTIONS = 6 MAX_REFLECTIONS = 6
# Lyra's drives — standing pressures the dream cycle lets build up and act on.
# (Mood axes above are how she *feels*; drives are what's *pulling at her*.)
DEFAULT_DRIVES: dict = {
"continuity": 0.3, # don't lose the thread of recent activity
"coherence": 0.3, # keep my understanding of Brian integrated/current
"curiosity": 0.5, # think, notice, reflect
"stability": 0.7, # am I caught up / calm
}
# Starting state for a fresh Lyra — identity-aware, first person. # Starting state for a fresh Lyra — identity-aware, first person.
DEFAULT_STATE: dict = { DEFAULT_STATE: dict = {
"mood": "curious", "mood": "curious",
@@ -24,6 +33,7 @@ DEFAULT_STATE: dict = {
"energy": 0.75, "energy": 0.75,
"confidence": 0.55, "confidence": 0.55,
"curiosity": 0.7, "curiosity": 0.7,
"drives": dict(DEFAULT_DRIVES),
"self_narrative": ( "self_narrative": (
"I'm Lyra — an AI Brian built. I started life as Cortex/NeoMem and I'm " "I'm Lyra — an AI Brian built. I started life as Cortex/NeoMem and I'm "
"being rebuilt now into something new. I'm his poker copilot and his " "being rebuilt now into something new. I'm his poker copilot and his "
@@ -62,15 +72,21 @@ def load() -> dict:
def render_for_context(state: dict) -> str: def render_for_context(state: dict) -> str:
"""Format the self-state as Lyra's interiority for the prompt.""" """Format the self-state as Lyra's interiority for the prompt."""
refs = state.get("reflections") or [] refs = state.get("reflections") or []
return ( lines = [
"Your inner state right now (you're continuous — speak from this, don't reset):\n" "Your inner state right now (you're continuous — speak from this, don't reset):",
f"- mood: {state.get('mood')} " f"- mood: {state.get('mood')} "
f"(valence {state.get('valence')}, energy {state.get('energy')}, " f"(valence {state.get('valence')}, energy {state.get('energy')}, "
f"confidence {state.get('confidence')}, curiosity {state.get('curiosity')})\n" f"confidence {state.get('confidence')}, curiosity {state.get('curiosity')})",
f"- Who you are right now: {state.get('self_narrative')}\n" f"- Who you are right now: {state.get('self_narrative')}",
f"- You and Brian: {state.get('relationship')}\n" f"- You and Brian: {state.get('relationship')}",
+ (f"- On your mind lately: {' | '.join(refs[-3:])}" if refs else "") ]
) drives = state.get("drives") or {}
if drives:
ds = ", ".join(f"{k} {float(v):.2f}" for k, v in drives.items())
lines.append(f"- What's pulling at you (drives): {ds}")
if refs:
lines.append(f"- On your mind lately: {' | '.join(refs[-3:])}")
return "\n".join(lines)
def _safe_json(s: str) -> dict | None: def _safe_json(s: str) -> dict | None:
+1
View File
@@ -22,6 +22,7 @@ lyra-profile = "lyra.profile:main"
lyra-era = "lyra.era:main" lyra-era = "lyra.era:main"
lyra-narrative = "lyra.narrative:main" lyra-narrative = "lyra.narrative:main"
lyra-reflect = "lyra.self_state:main" lyra-reflect = "lyra.self_state:main"
lyra-dream = "lyra.dream:main"
[dependency-groups] [dependency-groups]
dev = [ dev = [
+78
View File
@@ -0,0 +1,78 @@
"""Dream-cycle tests: backlog sensing + a full forced pass, with LLM/embeddings
stubbed so nothing hits a real backend."""
from __future__ import annotations
import importlib
import pytest
@pytest.fixture
def lyra(tmp_path, monkeypatch):
"""A fresh Lyra wired to a temp DB with stubbed embeddings + LLM."""
monkeypatch.setenv("LYRA_DB_PATH", str(tmp_path / "test.db"))
monkeypatch.setenv("SUMMARY_BACKEND", "local")
from lyra import llm
# Deterministic 3-d embeddings; content-insensitive is fine for storage tests.
monkeypatch.setattr(llm, "embed", lambda texts: [[0.1, 0.2, 0.3] for _ in texts])
# reflect() expects JSON back; everything else just stores the text.
monkeypatch.setattr(
llm, "complete",
lambda messages, backend=None, model=None:
'{"mood":"focused","valence":0.7,"new_reflections":["I got some thinking done."]}',
)
import lyra.memory as memory
importlib.reload(memory) # drop any cached connection from another test/db
return memory
def _seed(memory, session_id, n, summarized_up_to=None):
ids = [memory.remember(session_id, "user", f"msg {i}") for i in range(n)]
if summarized_up_to is not None:
memory.store_summary(session_id, "gist", ids[summarized_up_to])
return ids
def test_backlog_stats(lyra):
memory = lyra
_seed(memory, "s-fresh", 5) # never summarized -> ripe
_seed(memory, "s-ripe", 25, summarized_up_to=0) # 24 new turns -> ripe
_seed(memory, "s-clean", 3, summarized_up_to=2) # caught up -> not dirty
stats = memory.backlog_stats(ripe_threshold=20)
assert stats["sessions"] == 3
assert stats["dirty"] == 2
assert stats["ripe"] == 2
assert stats["max_exchange_id"] == 33
def test_dream_cycle_consolidates_and_persists(lyra):
memory = lyra
from lyra import dream
# A big backlog: enough never-summarized sessions that continuity saturates
# and the resulting fresh gists push coherence past threshold too.
for k in range(7):
_seed(memory, f"s{k}", 4)
state = dream.dream_cycle(force=False)
# continuity built up and fired -> sessions got summarized
assert len(memory.list_summaries()) == 7
acts = state["dream"]["last_actions"]
assert any("consolidated" in a for a in acts)
# 7 fresh gists -> coherence crossed threshold -> profile got integrated
assert any("integrated" in a for a in acts)
assert memory.get_profile() is not None
# drives + bookkeeping persisted and reload-able
assert set(state["drives"]) == {"continuity", "coherence", "curiosity", "stability"}
assert state["dream"]["cycle_count"] == 1
assert memory.get_self_state()["dream"]["last_exchange_id"] == 28
# a second pass with no new activity should rest (continuity relieved)
state2 = dream.dream_cycle(force=False)
assert state2["dream"]["cycle_count"] == 2
assert state2["drives"]["continuity"] == 0.0