"""External input stream: RSS/Atom feeds Lyra reacts to (her thought-loop #1). Her own sketch wanted the loop fed by "external data feeds relevant to your interests (poker articles, tech news)" — so her thoughts aren't only about her own interior. This pulls configured feeds, remembers what it's seen, and hands the thought loop one fresh item at a time to react to (see `thoughts.think` react mode). Feeds are configurable (`LYRA_FEEDS`, comma-separated URLs). Parsing is stdlib ElementTree — tolerant of both RSS 2.0 and Atom, namespaces stripped — so there's no new dependency. Network failures degrade to "no item this pass", never raise. """ from __future__ import annotations from xml.etree import ElementTree as ET import httpx from lyra import clock, config, logbus, memory _SCHEMA = """ CREATE TABLE IF NOT EXISTS feed_items ( id TEXT PRIMARY KEY, -- guid/link, stable per item feed TEXT, title TEXT, link TEXT, summary TEXT, seen_at TEXT NOT NULL, used INTEGER NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_feed_items_used ON feed_items(used); """ _ensured_for = None _UA = {"User-Agent": "Lyra/0.3 (+thought-loop feed reader)"} _MAX_SUMMARY = 600 def _c(): global _ensured_for conn = memory._connection() if _ensured_for is not conn: conn.executescript(_SCHEMA) _ensured_for = conn return conn def _local(tag: str) -> str: return tag.rsplit("}", 1)[-1].lower() def _text(el) -> str: return (el.text or "").strip() if el is not None else "" def parse(xml: bytes, feed_url: str = "") -> list[dict]: """Tolerant RSS-2.0 / Atom parse -> [{id,title,link,summary}]. Empty on garbage.""" try: root = ET.fromstring(xml) except ET.ParseError: return [] items: list[dict] = [] for node in root.iter(): if _local(node.tag) not in ("item", "entry"): continue title = link = summary = guid = "" for child in node: name = _local(child.tag) if name == "title": title = _text(child) elif name == "link": # RSS: text; Atom: href attribute (prefer rel=alternate / first) link = _text(child) or child.attrib.get("href", "") or link elif name in ("description", "summary", "content"): summary = summary or _text(child) elif name in ("guid", "id"): guid = _text(child) ident = guid or link or title if not ident or not (title or summary): continue items.append({ "id": ident, "title": title, "link": link, "summary": summary[:_MAX_SUMMARY], }) return items def fetch(url: str) -> list[dict]: try: r = httpx.get(url, headers=_UA, timeout=10.0, follow_redirects=True) if r.status_code >= 400: logbus.log("error", "feed fetch failed", url=url, status=r.status_code) return [] return parse(r.content, url) except Exception as exc: logbus.log("error", "feed fetch error", url=url, error=str(exc)[:160]) return [] def refresh() -> int: """Pull all configured feeds; store items not seen before. Returns new count.""" cfg = config.load() conn = _c() now = clock.now().isoformat() new = 0 for url in cfg.feeds: for it in fetch(url): with conn: cur = conn.execute( "INSERT OR IGNORE INTO feed_items (id, feed, title, link, summary, seen_at) " "VALUES (?, ?, ?, ?, ?, ?)", (it["id"], url, it["title"], it["link"], it["summary"], now), ) new += cur.rowcount if new: logbus.log("info", "feeds refreshed", new_items=new) return new def next_item(refresh_first: bool = True) -> dict | None: """One fresh (unused) feed item, newest-seen first. Caller marks it used.""" if refresh_first: refresh() row = _c().execute( "SELECT id, feed, title, link, summary FROM feed_items " "WHERE used = 0 ORDER BY seen_at DESC, rowid DESC LIMIT 1" ).fetchone() return dict(row) if row else None def mark_used(item_id: str) -> None: conn = _c() with conn: conn.execute("UPDATE feed_items SET used = 1 WHERE id = ?", (item_id,))