5dbcfc7ccf
Her remaining two wishes from the 6-19 sketch: Proactive reach-out (#6, literal): lyra/notify.py pushes to ntfy so she can reach Brian when he's not in the app. thoughts.maybe_ping gates on salience, a cooldown, and local quiet hours (all config-tunable; eager defaults), uses ntfy JSON publish (UTF-8 titles/messages), links to /thoughts, and marks the thread surfaced so chat won't also re-raise it. Disabled unless NTFY_URL is set. External input feed (#1): lyra/feeds.py pulls configurable RSS/Atom feeds (stdlib ElementTree, no new dep; tolerant of RSS 2.0 + Atom), dedupes seen items in a feed_items table, and hands think() one fresh item at a time. New 'react' mode: a would-be new thread instead reacts to a world item (FEED_REACT_PROB). Dream cycle refreshes feeds on its cadence; failures degrade to no item. Config: NTFY_URL/NTFY_TOPIC/LYRA_WEB_URL, PING_SALIENCE/COOLDOWN/QUIET_HOURS, LYRA_TIMEZONE, LYRA_FEEDS, FEED_REACT_PROB (+ .env.example). thought_meta table for ping cooldown. 10 new tests (feeds parse, react mode, ping gating); suite 65. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
134 lines
4.3 KiB
Python
134 lines
4.3 KiB
Python
"""External input stream: RSS/Atom feeds Lyra reacts to (her thought-loop #1).
|
|
|
|
Her own sketch wanted the loop fed by "external data feeds relevant to your
interests (poker articles, tech news)" — so her thoughts aren't only about her own
interior. This pulls configured feeds, remembers what it's seen, and hands the
thought loop one fresh item at a time to react to (see `thoughts.think` react mode).
|
|
|
|
Feeds are configurable (`LYRA_FEEDS`, comma-separated URLs). Parsing is stdlib
ElementTree — tolerant of both RSS 2.0 and Atom, namespaces stripped — so there's
no new dependency. Network failures degrade to "no item this pass", never raise.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from xml.etree import ElementTree as ET
|
|
|
|
import httpx
|
|
|
|
from lyra import clock, config, logbus, memory
|
|
|
|
# DDL for the feed-item store. `_c()` runs it with executescript(); every
# statement is IF NOT EXISTS, so re-running it against a populated DB is a no-op.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS feed_items (
id TEXT PRIMARY KEY, -- guid/link, stable per item
feed TEXT,
title TEXT,
link TEXT,
summary TEXT,
seen_at TEXT NOT NULL,
used INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_feed_items_used ON feed_items(used);
"""

# Connection object the schema was most recently applied to (set by `_c()`);
# None until the first call.
_ensured_for = None
# Request headers sent with every feed download in `fetch()`.
_UA = {"User-Agent": "Lyra/0.3 (+thought-loop feed reader)"}
# Upper bound, in characters, on the summary text `parse()` keeps per item.
_MAX_SUMMARY = 600
|
|
|
|
|
|
def _c():
    """Return the shared memory connection, applying the feed schema if needed.

    The schema script is executed only when the connection object returned by
    ``memory._connection()`` differs from the one it was last applied to, so
    repeated calls on the same connection skip the DDL.
    """
    global _ensured_for
    conn = memory._connection()
    if _ensured_for is conn:
        # Same connection object as last time: schema already applied.
        return conn
    conn.executescript(_SCHEMA)
    _ensured_for = conn
    return conn
|
|
|
|
|
|
def _local(tag: str) -> str:
|
|
return tag.rsplit("}", 1)[-1].lower()
|
|
|
|
|
|
def _text(el) -> str:
|
|
return (el.text or "").strip() if el is not None else ""
|
|
|
|
|
|
def parse(xml: bytes, feed_url: str = "") -> list[dict]:
    """Tolerant RSS-2.0 / Atom parse -> [{id,title,link,summary}]. Empty on garbage.

    Every element in the document whose local tag name is ``item`` (RSS) or
    ``entry`` (Atom) is treated as one feed item, regardless of namespace.

    Args:
        xml: Raw feed document.
        feed_url: Currently unused by the parser; kept so callers can pass the
            source URL.

    Returns:
        One dict per usable item with keys ``id``, ``title``, ``link`` and
        ``summary``. ``id`` is the guid/id if present, else the link, else the
        title. ``summary`` is cut to ``_MAX_SUMMARY`` characters. Items with no
        identifier, or with neither a title nor a summary, are skipped. A
        document that fails to parse yields ``[]``.
    """
    # NOTE(review): feeds are untrusted input and the stdlib XML parsers are
    # documented as not hardened against malicious documents — confirm the
    # deployed Python/expat version is acceptable for this.
    try:
        root = ET.fromstring(xml)
    except ET.ParseError:
        return []
    items: list[dict] = []
    for node in root.iter():
        if _local(node.tag) not in ("item", "entry"):
            continue
        title = link = summary = guid = ""
        link_is_alternate = False
        for child in node:
            name = _local(child.tag)
            if name == "title":
                title = _text(child)
            elif name == "link":
                # RSS carries the URL as element text; Atom as an href attribute.
                href = _text(child) or child.attrib.get("href", "")
                # In Atom a missing rel means "alternate" (the item's own page).
                rel = child.attrib.get("rel", "alternate").strip().lower()
                is_alternate = rel == "alternate"
                # Prefer the first rel=alternate link; otherwise keep the first
                # link seen. A later enclosure/related/self link must not
                # replace the link already chosen.
                if href and (not link or (is_alternate and not link_is_alternate)):
                    link = href
                    link_is_alternate = is_alternate
            elif name in ("description", "summary", "content"):
                # First non-empty body element wins.
                summary = summary or _text(child)
            elif name in ("guid", "id"):
                guid = _text(child)
        ident = guid or link or title
        if not ident or not (title or summary):
            continue
        items.append({
            "id": ident, "title": title, "link": link,
            "summary": summary[:_MAX_SUMMARY],
        })
    return items
|
|
|
|
|
|
def fetch(url: str) -> list[dict]:
    """Download one feed and return its parsed items.

    Best-effort by design: an HTTP status of 400 or above, or any exception
    raised while downloading or parsing, is logged and turned into an empty
    list rather than propagated.
    """
    try:
        response = httpx.get(url, headers=_UA, timeout=10.0, follow_redirects=True)
        status = response.status_code
        if status < 400:
            return parse(response.content, url)
        logbus.log("error", "feed fetch failed", url=url, status=status)
    except Exception as exc:
        # Deliberate catch-all: a bad feed must never break the caller's loop.
        detail = str(exc)[:160]
        logbus.log("error", "feed fetch error", url=url, error=detail)
    return []
|
|
|
|
|
|
def refresh() -> int:
    """Pull all configured feeds; store items not seen before. Returns new count.

    Items whose ``id`` is already in ``feed_items`` are skipped by
    ``INSERT OR IGNORE``. All rows added in one call share the same
    ``seen_at`` timestamp. Each insert runs in its own ``with conn:`` block
    (presumably one transaction per item — confirm against the connection type).
    """
    cfg = config.load()
    conn = _c()
    stamp = clock.now().isoformat()
    insert_sql = (
        "INSERT OR IGNORE INTO feed_items (id, feed, title, link, summary, seen_at) "
        "VALUES (?, ?, ?, ?, ?, ?)"
    )
    added = 0
    for url in cfg.feeds:
        for item in fetch(url):
            with conn:
                result = conn.execute(
                    insert_sql,
                    (item["id"], url, item["title"], item["link"], item["summary"], stamp),
                )
            # rowcount is 1 for a fresh insert and 0 when the row was ignored.
            added += result.rowcount
    if added:
        logbus.log("info", "feeds refreshed", new_items=added)
    return added
|
|
|
|
|
|
def next_item(refresh_first: bool = True) -> dict | None:
    """One fresh (unused) feed item, newest-seen first. Caller marks it used.

    Args:
        refresh_first: When true, call ``refresh()`` before querying.

    Returns:
        A dict with keys ``id``, ``feed``, ``title``, ``link`` and ``summary``
        for the most recently seen row with ``used = 0`` (ties broken by
        highest rowid), or ``None`` when there is no such row.
    """
    if refresh_first:
        refresh()
    query = (
        "SELECT id, feed, title, link, summary FROM feed_items "
        "WHERE used = 0 ORDER BY seen_at DESC, rowid DESC LIMIT 1"
    )
    row = _c().execute(query).fetchone()
    if not row:
        return None
    # dict(row) assumes the connection yields mapping-style rows — TODO confirm
    # the row factory configured in `memory`.
    return dict(row)
|
|
|
|
|
|
def mark_used(item_id: str) -> None:
    """Set ``used = 1`` on the feed item with this id so ``next_item`` skips it.

    An id that matches no row is a silent no-op.
    """
    update_sql = "UPDATE feed_items SET used = 1 WHERE id = ?"
    conn = _c()
    with conn:
        conn.execute(update_sql, (item_id,))
|