feat: harden fan-out for live clients — instant first frame + offline status
For multiple clients connecting to a live feed (e.g. the client portal):
- cache the last broadcast frame and replay it to a new subscriber on
connect, so a client sees data immediately instead of waiting a full
poll cycle.
- broadcast a {"feed_status":"unreachable"} frame once on transition (after
3 consecutive poll failures) so clients can render an offline state
instead of a frozen chart; data frames now carry "feed_status":"ok".
The cached frame reflects current state, so a client connecting while
offline gets "unreachable" right away too.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -63,6 +63,9 @@ class DeviceMonitor:
|
|||||||
self._keepalive = False
|
self._keepalive = False
|
||||||
self._task: Optional[asyncio.Task] = None
|
self._task: Optional[asyncio.Task] = None
|
||||||
self._lock = asyncio.Lock()
|
self._lock = asyncio.Lock()
|
||||||
|
self._last_payload: Optional[dict] = None # replayed to new subscribers
|
||||||
|
self._consec_fail = 0
|
||||||
|
self._reachable = True # last broadcast reachability (for transition frames)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def running(self) -> bool:
|
def running(self) -> bool:
|
||||||
@@ -82,6 +85,13 @@ class DeviceMonitor:
|
|||||||
q: asyncio.Queue = asyncio.Queue(maxsize=5)
|
q: asyncio.Queue = asyncio.Queue(maxsize=5)
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
self._subscribers.add(q)
|
self._subscribers.add(q)
|
||||||
|
# Replay the last frame so a client connecting mid-stream sees data
|
||||||
|
# (or the current 'unreachable' state) immediately, not after a poll.
|
||||||
|
if self._last_payload is not None:
|
||||||
|
try:
|
||||||
|
q.put_nowait(self._last_payload)
|
||||||
|
except asyncio.QueueFull:
|
||||||
|
pass
|
||||||
self._ensure_task()
|
self._ensure_task()
|
||||||
return q
|
return q
|
||||||
|
|
||||||
@@ -101,12 +111,26 @@ class DeviceMonitor:
|
|||||||
while self._has_demand():
|
while self._has_demand():
|
||||||
snap, mst = await self._poll_once()
|
snap, mst = await self._poll_once()
|
||||||
if snap is not None:
|
if snap is not None:
|
||||||
|
self._consec_fail = 0
|
||||||
|
self._reachable = True
|
||||||
payload = _snapshot_payload(snap, self.unit_id, mst)
|
payload = _snapshot_payload(snap, self.unit_id, mst)
|
||||||
|
payload["feed_status"] = "ok"
|
||||||
self._broadcast(payload)
|
self._broadcast(payload)
|
||||||
try:
|
try:
|
||||||
await alert_evaluator.evaluate(self.unit_id, snap)
|
await alert_evaluator.evaluate(self.unit_id, snap)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")
|
logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")
|
||||||
|
else:
|
||||||
|
# Tell clients the device went offline — once, on transition, after a
|
||||||
|
# few failures so a momentary blip doesn't flap the UI.
|
||||||
|
self._consec_fail += 1
|
||||||
|
if self._reachable and self._consec_fail >= 3:
|
||||||
|
self._reachable = False
|
||||||
|
self._broadcast({
|
||||||
|
"unit_id": self.unit_id,
|
||||||
|
"timestamp": datetime.utcnow().isoformat(),
|
||||||
|
"feed_status": "unreachable",
|
||||||
|
})
|
||||||
await asyncio.sleep(MONITOR_POLL_INTERVAL)
|
await asyncio.sleep(MONITOR_POLL_INTERVAL)
|
||||||
finally:
|
finally:
|
||||||
logger.info(f"[MONITOR] {self.unit_id}: feed stopped")
|
logger.info(f"[MONITOR] {self.unit_id}: feed stopped")
|
||||||
@@ -138,6 +162,7 @@ class DeviceMonitor:
|
|||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
def _broadcast(self, payload: dict) -> None:
|
def _broadcast(self, payload: dict) -> None:
|
||||||
|
self._last_payload = payload # cache for replay to new subscribers
|
||||||
for q in list(self._subscribers):
|
for q in list(self._subscribers):
|
||||||
try:
|
try:
|
||||||
q.put_nowait(payload)
|
q.put_nowait(payload)
|
||||||
|
|||||||
Reference in New Issue
Block a user