"""The thought loop: threaded generation, salience/surface gating, feedback.""" from __future__ import annotations import importlib import json from datetime import timedelta import pytest from lyra import clock @pytest.fixture def lyra(tmp_path, monkeypatch): monkeypatch.setenv("LYRA_DB_PATH", str(tmp_path / "test.db")) monkeypatch.delenv("NTFY_URL", raising=False) # baseline: pinging disabled (ignore .env) from lyra import llm monkeypatch.setattr(llm, "embed", lambda texts: [[0.1, 0.2, 0.3] for _ in texts]) import lyra.memory as memory importlib.reload(memory) import lyra.self_state as self_state importlib.reload(self_state) import lyra.feeds as feeds importlib.reload(feeds) import lyra.thoughts as thoughts importlib.reload(thoughts) # Canned LLM: tests set `box["next"]` to the dict think() should "generate". box = {"next": {}} monkeypatch.setattr(thoughts.llm, "complete", lambda messages, backend=None: json.dumps(box["next"])) # Keep the loop offline + silent by default: no feed fetch, no push. monkeypatch.setattr(thoughts.feeds, "next_item", lambda **k: None) monkeypatch.setattr(thoughts.notify, "push", lambda **k: False) return memory, thoughts, box def _gen(box, **fields): box["next"] = {"title": "t", "kind": "observation", "content": "c", "salience": 0.5, "status": "open"} | fields def test_new_thread_creates_chain(lyra): _, th, box = lyra _gen(box, title="my own restlessness", content="I notice a pull toward new ideas.", salience=0.4) rep = th.think(force_mode="new") assert rep["mode"] == "new" threads = th.list_threads() assert len(threads) == 1 assert threads[0]["title"] == "my own restlessness" assert threads[0]["status"] == "open" chain = th.thread_thoughts(rep["thread_id"]) assert len(chain) == 1 and "restlessness" not in chain[0]["content"].lower() def test_continue_advances_same_thread(lyra): _, th, box = lyra _gen(box, content="first link", salience=0.5) r1 = th.think(force_mode="new") _gen(box, content="second link, a new angle", salience=0.6) r2 = th.think(force_mode="continue") assert r2["mode"] == "continue" assert r2["thread_id"] == r1["thread_id"] # same thread assert len(th.list_threads()) == 1 # no new thread opened chain = th.thread_thoughts(r1["thread_id"]) assert [c["content"] for c in chain] == ["first link", "second link, a new angle"] # thread salience tracks the latest link assert th.get_thread(r1["thread_id"])["salience"] == pytest.approx(0.6) def test_no_parse_returns_none_and_writes_nothing(lyra): _, th, box = lyra box["next"] = {} # empty -> no content -> miss assert th.think(force_mode="new") is None assert th.list_threads() == [] def test_salience_gates_surfacing(lyra): _, th, box = lyra _gen(box, content="a quiet musing", salience=0.3) th.think(force_mode="new") assert th.pending_surface() is None # below the bar _gen(box, content="something I'd actually raise", salience=0.85) th.think(force_mode="new") cand = th.pending_surface() assert cand is not None and cand["latest"]["content"] == "something I'd actually raise" def test_maybe_surface_respects_gap_and_marks_once(lyra): _, th, box = lyra _gen(box, title="restlessness", content="been circling this", salience=0.9) th.think(force_mode="new") # Brian's mid-conversation (recent) -> don't interrupt. from lyra import clock recent = clock.now().isoformat() assert th.maybe_surface(recent) is None # He's been away (no last exchange) -> she leads with it, once. note = th.maybe_surface(None) assert note and "restlessness" in note and "been circling this" in note assert th.maybe_surface(None) is None # already surfaced, no repeat assert th.list_threads(status="surfaced") # status flipped def test_response_then_followup_closes_loop(lyra): memory, th, box = lyra _gen(box, title="RAG vs custom model", content="maybe RAG is enough", salience=0.8) r = th.think(force_mode="new") tid = r["thread_id"] th.mark_surfaced(tid) assert th.record_response(tid, "I think a custom model is the real goal") is True assert th._is_pending(th.get_thread(tid)) is True # awaiting her reaction _gen(box, content="ok — RAG now, own model later", salience=0.7, status="answered") r2 = th.think(force_mode="respond") assert r2["mode"] == "respond" and r2["thread_id"] == tid assert th._is_pending(th.get_thread(tid)) is False # she reacted assert th.get_thread(tid)["status"] == "answered" assert len(th.thread_thoughts(tid)) == 2 def test_set_status_drop_and_reopen(lyra): _, th, box = lyra _gen(box, content="x") r = th.think(force_mode="new") tid = r["thread_id"] assert th.set_status(tid, "dropped") is True assert th.get_thread(tid)["status"] == "dropped" assert th.set_status(tid, "bogus") is False # unknown status rejected assert th.set_status(tid, "open") is True def test_thought_recorded_in_journal(lyra): memory, th, box = lyra _gen(box, content="a thought worth keeping") th.think(force_mode="new") kinds = [e["kind"] for e in memory.list_journal(limit=50)] assert "thought" in kinds def test_decay_rests_stale_threads_but_spares_pending(lyra): _, th, box = lyra _gen(box, title="stale one", content="old idea", salience=0.8) r1 = th.think(force_mode="new") _gen(box, title="stale pending", content="awaiting his reply", salience=0.8) r2 = th.think(force_mode="new") conn = th._c() old = (clock.now() - timedelta(hours=72)).isoformat() with conn: conn.execute("UPDATE thought_threads SET updated_at=? WHERE id=?", (old, r1["thread_id"])) conn.execute("UPDATE thought_threads SET updated_at=?, last_response='hm', responded_at=? WHERE id=?", (old, clock.now().isoformat(), r2["thread_id"])) assert th.decay() == 1 # only the non-pending one rested = th.get_thread(r1["thread_id"]) assert rested["status"] == "resting" assert rested["salience"] == pytest.approx(0.8 * th.RESTING_DECAY) # the pending thread is spared — she still owes a reaction assert th.get_thread(r2["thread_id"])["status"] == "open" assert th._is_pending(th.get_thread(r2["thread_id"])) is True def test_context_note_lists_active_threads(lyra): _, th, box = lyra assert th.context_note() is None # nothing yet _gen(box, title="my own restlessness", content="a real thread of mine", salience=0.6) th.think(force_mode="new") note = th.context_note() assert note and "my own restlessness" in note and "a real thread of mine" in note def test_think_about_tool_seeds_a_thread(lyra): _, th, _ = lyra import lyra.tools as tools importlib.reload(tools) # bind to the reloaded memory/thoughts out = tools.dispatch("think_about", {"title": "am I continuous?", "thought": "do I persist between turns?", "kind": "question"}) assert "am I continuous?" in out threads = th.list_threads() assert len(threads) == 1 and threads[0]["title"] == "am I continuous?" chain = th.thread_thoughts(threads[0]["id"]) assert chain[0]["kind"] == "question" and chain[0]["source"] == "chat" # --- external feed ------------------------------------------------------- RSS = (b'Feed' b'Poker tiphttp://x/1' b'3-bet more in positiong1' b'Secondhttp://x/2d2' b'') ATOM = (b'F' b'HN post' b'something interestinga1') def test_feeds_parse_rss_and_atom(): from lyra import feeds rss = feeds.parse(RSS) assert len(rss) == 2 assert rss[0]["id"] == "g1" and rss[0]["title"] == "Poker tip" and rss[0]["link"] == "http://x/1" assert rss[1]["id"] == "http://x/2" # falls back to link when no guid atom = feeds.parse(ATOM) assert len(atom) == 1 and atom[0]["id"] == "a1" and atom[0]["link"] == "http://y/1" assert feeds.parse(b"not xml") == [] # garbage -> empty, no raise def test_react_mode_makes_a_thread_about_a_feed_item(lyra, monkeypatch): _, th, box = lyra item = {"id": "x1", "title": "World Item", "link": "http://e", "summary": "stuff happened"} monkeypatch.setattr(th.feeds, "next_item", lambda **k: item) used = [] monkeypatch.setattr(th.feeds, "mark_used", lambda i: used.append(i)) box["next"] = {"kind": "observation", "content": "that makes me think...", "salience": 0.5, "status": "open"} rep = th.think(force_mode="react") assert rep["mode"] == "react" assert th.list_threads()[0]["title"] == "World Item" # titled from the item assert used == ["x1"] # item consumed # --- proactive reach-out (ntfy) ------------------------------------------ def test_ping_sends_her_personal_message_when_she_reaches_out(lyra, monkeypatch): _, th, box = lyra monkeypatch.setenv("NTFY_URL", "http://ntfy.test") monkeypatch.setenv("PING_QUIET_HOURS", "0-0") # disable quiet window for the test sent = [] monkeypatch.setattr(th.notify, "push", lambda **k: (sent.append(k), True)[1]) # high salience AND she wrote a personal note to Brian -> texts him that note _gen(box, title="big one", content="internal thought, essay voice", salience=0.9, reach_out="Hey — been thinking about you, got a sec?") r = th.think(force_mode="new") assert r["pinged"] is True assert len(sent) == 1 assert sent[0]["message"] == "Hey — been thinking about you, got a sec?" # her words, not the thought assert th.get_thread(r["thread_id"])["status"] == "surfaced" # ping marks it surfaced def test_no_ping_without_a_reach_out_message(lyra, monkeypatch): _, th, box = lyra monkeypatch.setenv("NTFY_URL", "http://ntfy.test") monkeypatch.setenv("PING_QUIET_HOURS", "0-0") sent = [] monkeypatch.setattr(th.notify, "push", lambda **k: (sent.append(k), True)[1]) # salient thought but she did NOT decide to tell him -> no ping (it's not a broadcast) _gen(box, content="a salient thought with no reach_out", salience=0.95) assert th.think(force_mode="new")["pinged"] is False and sent == [] # the placeholder echo is rejected too (model copying the field name) _gen(box, content="another", salience=0.95, reach_out="reach_out") assert th.think(force_mode="new")["pinged"] is False and sent == [] def test_ping_salience_floor_is_optional(lyra, monkeypatch): _, th, _ = lyra monkeypatch.setenv("NTFY_URL", "http://ntfy.test") monkeypatch.setenv("PING_QUIET_HOURS", "0-0") sent = [] monkeypatch.setattr(th.notify, "push", lambda **k: (sent.append(k), True)[1]) # default floor 0.0 -> her decision (a message) is enough, any salience pings assert th.maybe_ping(1, "hey, thinking of you", 0.2) is True # but a floor can be set to suppress low-salience pings sent.clear() monkeypatch.setenv("PING_SALIENCE", "0.7") assert th.maybe_ping(1, "hey", 0.4) is False assert th.maybe_ping(1, "hey", 0.8) is True def test_no_ping_without_ntfy(lyra, monkeypatch): _, th, _ = lyra sent = [] monkeypatch.setattr(th.notify, "push", lambda **k: (sent.append(k), True)[1]) # no NTFY_URL in env -> disabled even with a message + high salience assert th.maybe_ping(1, "hey there", 0.99) is False assert sent == []