"""The thought loop: threaded generation, salience/surface gating, feedback.""" from __future__ import annotations import importlib import json from datetime import timedelta import pytest from lyra import clock @pytest.fixture def lyra(tmp_path, monkeypatch): monkeypatch.setenv("LYRA_DB_PATH", str(tmp_path / "test.db")) from lyra import llm monkeypatch.setattr(llm, "embed", lambda texts: [[0.1, 0.2, 0.3] for _ in texts]) import lyra.memory as memory importlib.reload(memory) import lyra.self_state as self_state importlib.reload(self_state) import lyra.thoughts as thoughts importlib.reload(thoughts) # Canned LLM: tests set `box["next"]` to the dict think() should "generate". box = {"next": {}} monkeypatch.setattr(thoughts.llm, "complete", lambda messages, backend=None: json.dumps(box["next"])) return memory, thoughts, box def _gen(box, **fields): box["next"] = {"title": "t", "kind": "observation", "content": "c", "salience": 0.5, "status": "open"} | fields def test_new_thread_creates_chain(lyra): _, th, box = lyra _gen(box, title="my own restlessness", content="I notice a pull toward new ideas.", salience=0.4) rep = th.think(force_mode="new") assert rep["mode"] == "new" threads = th.list_threads() assert len(threads) == 1 assert threads[0]["title"] == "my own restlessness" assert threads[0]["status"] == "open" chain = th.thread_thoughts(rep["thread_id"]) assert len(chain) == 1 and "restlessness" not in chain[0]["content"].lower() def test_continue_advances_same_thread(lyra): _, th, box = lyra _gen(box, content="first link", salience=0.5) r1 = th.think(force_mode="new") _gen(box, content="second link, a new angle", salience=0.6) r2 = th.think(force_mode="continue") assert r2["mode"] == "continue" assert r2["thread_id"] == r1["thread_id"] # same thread assert len(th.list_threads()) == 1 # no new thread opened chain = th.thread_thoughts(r1["thread_id"]) assert [c["content"] for c in chain] == ["first link", "second link, a new angle"] # thread salience tracks the latest link assert th.get_thread(r1["thread_id"])["salience"] == pytest.approx(0.6) def test_no_parse_returns_none_and_writes_nothing(lyra): _, th, box = lyra box["next"] = {} # empty -> no content -> miss assert th.think(force_mode="new") is None assert th.list_threads() == [] def test_salience_gates_surfacing(lyra): _, th, box = lyra _gen(box, content="a quiet musing", salience=0.3) th.think(force_mode="new") assert th.pending_surface() is None # below the bar _gen(box, content="something I'd actually raise", salience=0.85) th.think(force_mode="new") cand = th.pending_surface() assert cand is not None and cand["latest"]["content"] == "something I'd actually raise" def test_maybe_surface_respects_gap_and_marks_once(lyra): _, th, box = lyra _gen(box, title="restlessness", content="been circling this", salience=0.9) th.think(force_mode="new") # Brian's mid-conversation (recent) -> don't interrupt. from lyra import clock recent = clock.now().isoformat() assert th.maybe_surface(recent) is None # He's been away (no last exchange) -> she leads with it, once. note = th.maybe_surface(None) assert note and "restlessness" in note and "been circling this" in note assert th.maybe_surface(None) is None # already surfaced, no repeat assert th.list_threads(status="surfaced") # status flipped def test_response_then_followup_closes_loop(lyra): memory, th, box = lyra _gen(box, title="RAG vs custom model", content="maybe RAG is enough", salience=0.8) r = th.think(force_mode="new") tid = r["thread_id"] th.mark_surfaced(tid) assert th.record_response(tid, "I think a custom model is the real goal") is True assert th._is_pending(th.get_thread(tid)) is True # awaiting her reaction _gen(box, content="ok — RAG now, own model later", salience=0.7, status="answered") r2 = th.think(force_mode="respond") assert r2["mode"] == "respond" and r2["thread_id"] == tid assert th._is_pending(th.get_thread(tid)) is False # she reacted assert th.get_thread(tid)["status"] == "answered" assert len(th.thread_thoughts(tid)) == 2 def test_set_status_drop_and_reopen(lyra): _, th, box = lyra _gen(box, content="x") r = th.think(force_mode="new") tid = r["thread_id"] assert th.set_status(tid, "dropped") is True assert th.get_thread(tid)["status"] == "dropped" assert th.set_status(tid, "bogus") is False # unknown status rejected assert th.set_status(tid, "open") is True def test_thought_recorded_in_journal(lyra): memory, th, box = lyra _gen(box, content="a thought worth keeping") th.think(force_mode="new") kinds = [e["kind"] for e in memory.list_journal(limit=50)] assert "thought" in kinds def test_decay_rests_stale_threads_but_spares_pending(lyra): _, th, box = lyra _gen(box, title="stale one", content="old idea", salience=0.8) r1 = th.think(force_mode="new") _gen(box, title="stale pending", content="awaiting his reply", salience=0.8) r2 = th.think(force_mode="new") conn = th._c() old = (clock.now() - timedelta(hours=72)).isoformat() with conn: conn.execute("UPDATE thought_threads SET updated_at=? WHERE id=?", (old, r1["thread_id"])) conn.execute("UPDATE thought_threads SET updated_at=?, last_response='hm', responded_at=? WHERE id=?", (old, clock.now().isoformat(), r2["thread_id"])) assert th.decay() == 1 # only the non-pending one rested = th.get_thread(r1["thread_id"]) assert rested["status"] == "resting" assert rested["salience"] == pytest.approx(0.8 * th.RESTING_DECAY) # the pending thread is spared — she still owes a reaction assert th.get_thread(r2["thread_id"])["status"] == "open" assert th._is_pending(th.get_thread(r2["thread_id"])) is True def test_context_note_lists_active_threads(lyra): _, th, box = lyra assert th.context_note() is None # nothing yet _gen(box, title="my own restlessness", content="a real thread of mine", salience=0.6) th.think(force_mode="new") note = th.context_note() assert note and "my own restlessness" in note and "a real thread of mine" in note def test_think_about_tool_seeds_a_thread(lyra): _, th, _ = lyra import lyra.tools as tools importlib.reload(tools) # bind to the reloaded memory/thoughts out = tools.dispatch("think_about", {"title": "am I continuous?", "thought": "do I persist between turns?", "kind": "question"}) assert "am I continuous?" in out threads = th.list_threads() assert len(threads) == 1 and threads[0]["title"] == "am I continuous?" chain = th.thread_thoughts(threads[0]["id"]) assert chain[0]["kind"] == "question" and chain[0]["source"] == "chat"