From 6b1ec753964aa1a3cb65248655ceb66cac65af63 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 17:13:21 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20harden=20fan-out=20for=20live=20clients?= =?UTF-8?q?=20=E2=80=94=20instant=20first=20frame=20+=20offline=20status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For multiple clients connecting to a live feed (e.g. the client portal): - cache the last broadcast frame and replay it to a new subscriber on connect, so a client sees data immediately instead of waiting a full poll cycle. - broadcast a {"feed_status":"unreachable"} frame once on transition (after 3 consecutive poll failures) so clients can render an offline state instead of a frozen chart; data frames now carry "feed_status":"ok". The cached frame reflects current state, so a client connecting while offline gets "unreachable" right away too. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/monitor.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/app/monitor.py b/app/monitor.py index 10e190c..1177893 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -63,6 +63,9 @@ class DeviceMonitor: self._keepalive = False self._task: Optional[asyncio.Task] = None self._lock = asyncio.Lock() + self._last_payload: Optional[dict] = None # replayed to new subscribers + self._consec_fail = 0 + self._reachable = True # last broadcast reachability (for transition frames) @property def running(self) -> bool: @@ -82,6 +85,13 @@ class DeviceMonitor: q: asyncio.Queue = asyncio.Queue(maxsize=5) async with self._lock: self._subscribers.add(q) + # Replay the last frame so a client connecting mid-stream sees data + # (or the current 'unreachable' state) immediately, not after a poll. + if self._last_payload is not None: + try: + q.put_nowait(self._last_payload) + except asyncio.QueueFull: + pass self._ensure_task() return q @@ -101,12 +111,26 @@ class DeviceMonitor: while self._has_demand(): snap, mst = await self._poll_once() if snap is not None: + self._consec_fail = 0 + self._reachable = True payload = _snapshot_payload(snap, self.unit_id, mst) + payload["feed_status"] = "ok" self._broadcast(payload) try: await alert_evaluator.evaluate(self.unit_id, snap) except Exception as e: logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") + else: + # Tell clients the device went offline — once, on transition, after a + # few failures so a momentary blip doesn't flap the UI. + self._consec_fail += 1 + if self._reachable and self._consec_fail >= 3: + self._reachable = False + self._broadcast({ + "unit_id": self.unit_id, + "timestamp": datetime.utcnow().isoformat(), + "feed_status": "unreachable", + }) await asyncio.sleep(MONITOR_POLL_INTERVAL) finally: logger.info(f"[MONITOR] {self.unit_id}: feed stopped") @@ -138,6 +162,7 @@ class DeviceMonitor: db.close() def _broadcast(self, payload: dict) -> None: + self._last_payload = payload # cache for replay to new subscribers for q in list(self._subscribers): try: q.put_nowait(payload)