feat: thought loop reach-out (ntfy push) + external input feeds
Her remaining two wishes from the 6-19 sketch: Proactive reach-out (#6, literal): lyra/notify.py pushes to ntfy so she can reach Brian when he's not in the app. thoughts.maybe_ping gates on salience, a cooldown, and local quiet hours (all config-tunable; eager defaults), uses ntfy JSON publish (UTF-8 titles/messages), links to /thoughts, and marks the thread surfaced so chat won't also re-raise it. Disabled unless NTFY_URL is set. External input feed (#1): lyra/feeds.py pulls configurable RSS/Atom feeds (stdlib ElementTree, no new dep; tolerant of RSS 2.0 + Atom), dedupes seen items in a feed_items table, and hands think() one fresh item at a time. New 'react' mode: a would-be new thread instead reacts to a world item (FEED_REACT_PROB). Dream cycle refreshes feeds on its cadence; failures degrade to no item. Config: NTFY_URL/NTFY_TOPIC/LYRA_WEB_URL, PING_SALIENCE/COOLDOWN/QUIET_HOURS, LYRA_TIMEZONE, LYRA_FEEDS, FEED_REACT_PROB (+ .env.example). thought_meta table for ping cooldown. 10 new tests (feeds parse, react mode, ping gating); suite 65. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+133
@@ -0,0 +1,133 @@
|
||||
"""External input stream: RSS/Atom feeds Lyra reacts to (her thought-loop #1).
|
||||
|
||||
Her own sketch wanted the loop fed by "external data feeds relevant to your
|
||||
interests (poker articles, tech news)" — so her thoughts aren't only about her own
|
||||
interior. This pulls configured feeds, remembers what it's seen, and hands the
|
||||
thought loop one fresh item at a time to react to (see `thoughts.think` react mode).
|
||||
|
||||
Feeds are configurable (`LYRA_FEEDS`, comma-separated URLs). Parsing is stdlib
|
||||
ElementTree — tolerant of both RSS 2.0 and Atom, namespaces stripped — so there's
|
||||
no new dependency. Network failures degrade to "no item this pass", never raise.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
import httpx
|
||||
|
||||
from lyra import clock, config, logbus, memory
|
||||
|
||||
_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS feed_items (
|
||||
id TEXT PRIMARY KEY, -- guid/link, stable per item
|
||||
feed TEXT,
|
||||
title TEXT,
|
||||
link TEXT,
|
||||
summary TEXT,
|
||||
seen_at TEXT NOT NULL,
|
||||
used INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_feed_items_used ON feed_items(used);
|
||||
"""
|
||||
|
||||
_ensured_for = None
|
||||
_UA = {"User-Agent": "Lyra/0.3 (+thought-loop feed reader)"}
|
||||
_MAX_SUMMARY = 600
|
||||
|
||||
|
||||
def _c():
|
||||
global _ensured_for
|
||||
conn = memory._connection()
|
||||
if _ensured_for is not conn:
|
||||
conn.executescript(_SCHEMA)
|
||||
_ensured_for = conn
|
||||
return conn
|
||||
|
||||
|
||||
def _local(tag: str) -> str:
|
||||
return tag.rsplit("}", 1)[-1].lower()
|
||||
|
||||
|
||||
def _text(el) -> str:
|
||||
return (el.text or "").strip() if el is not None else ""
|
||||
|
||||
|
||||
def parse(xml: bytes, feed_url: str = "") -> list[dict]:
|
||||
"""Tolerant RSS-2.0 / Atom parse -> [{id,title,link,summary}]. Empty on garbage."""
|
||||
try:
|
||||
root = ET.fromstring(xml)
|
||||
except ET.ParseError:
|
||||
return []
|
||||
items: list[dict] = []
|
||||
for node in root.iter():
|
||||
if _local(node.tag) not in ("item", "entry"):
|
||||
continue
|
||||
title = link = summary = guid = ""
|
||||
for child in node:
|
||||
name = _local(child.tag)
|
||||
if name == "title":
|
||||
title = _text(child)
|
||||
elif name == "link":
|
||||
# RSS: text; Atom: href attribute (prefer rel=alternate / first)
|
||||
link = _text(child) or child.attrib.get("href", "") or link
|
||||
elif name in ("description", "summary", "content"):
|
||||
summary = summary or _text(child)
|
||||
elif name in ("guid", "id"):
|
||||
guid = _text(child)
|
||||
ident = guid or link or title
|
||||
if not ident or not (title or summary):
|
||||
continue
|
||||
items.append({
|
||||
"id": ident, "title": title, "link": link,
|
||||
"summary": summary[:_MAX_SUMMARY],
|
||||
})
|
||||
return items
|
||||
|
||||
|
||||
def fetch(url: str) -> list[dict]:
|
||||
try:
|
||||
r = httpx.get(url, headers=_UA, timeout=10.0, follow_redirects=True)
|
||||
if r.status_code >= 400:
|
||||
logbus.log("error", "feed fetch failed", url=url, status=r.status_code)
|
||||
return []
|
||||
return parse(r.content, url)
|
||||
except Exception as exc:
|
||||
logbus.log("error", "feed fetch error", url=url, error=str(exc)[:160])
|
||||
return []
|
||||
|
||||
|
||||
def refresh() -> int:
|
||||
"""Pull all configured feeds; store items not seen before. Returns new count."""
|
||||
cfg = config.load()
|
||||
conn = _c()
|
||||
now = clock.now().isoformat()
|
||||
new = 0
|
||||
for url in cfg.feeds:
|
||||
for it in fetch(url):
|
||||
with conn:
|
||||
cur = conn.execute(
|
||||
"INSERT OR IGNORE INTO feed_items (id, feed, title, link, summary, seen_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(it["id"], url, it["title"], it["link"], it["summary"], now),
|
||||
)
|
||||
new += cur.rowcount
|
||||
if new:
|
||||
logbus.log("info", "feeds refreshed", new_items=new)
|
||||
return new
|
||||
|
||||
|
||||
def next_item(refresh_first: bool = True) -> dict | None:
|
||||
"""One fresh (unused) feed item, newest-seen first. Caller marks it used."""
|
||||
if refresh_first:
|
||||
refresh()
|
||||
row = _c().execute(
|
||||
"SELECT id, feed, title, link, summary FROM feed_items "
|
||||
"WHERE used = 0 ORDER BY seen_at DESC, rowid DESC LIMIT 1"
|
||||
).fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
|
||||
def mark_used(item_id: str) -> None:
|
||||
conn = _c()
|
||||
with conn:
|
||||
conn.execute("UPDATE feed_items SET used = 1 WHERE id = ?", (item_id,))
|
||||
Reference in New Issue
Block a user