From ba622c67d848de1a7c525d09a8c5feff2e0aebf8 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 17:33:29 +0000 Subject: [PATCH] feat: monitor heartbeat + background poller skips active-monitored units MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Heartbeat: if nothing has been broadcast in MONITOR_HEARTBEAT_S (default 25s) — e.g. device offline and silent — send a non-cached keepalive frame so a reverse proxy (NPM) doesn't drop the idle WS. New subscribers still get the last real frame, not a heartbeat. - Poller-skip: the 60s background poller now skips any unit with a running monitor (MonitorManager.is_active). The monitor already polls it ~1Hz and keeps the status cache fresh, so the background poll was redundant and just added load/lock-contention on the device's single connection (and churn, which matters for the cellular wedge). Trade-off: the FTP start-time sync (only in the poller) doesn't run while a unit is actively monitored — fine, since reports take the authoritative start time from the FTP .rnd data. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/background_poller.py | 9 +++++++++ app/monitor.py | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/app/background_poller.py b/app/background_poller.py index 071fc31..0148671 100644 --- a/app/background_poller.py +++ b/app/background_poller.py @@ -178,10 +178,19 @@ class BackgroundPoller: now = datetime.utcnow() polled_count = 0 + from app.monitor import monitor_manager + for cfg in configs: if not self._running: break + # Skip units with an active live monitor: it polls them at ~1Hz and + # keeps the status cache fresh, so a redundant background poll would just + # add load/lock-contention on the device's single connection. + if monitor_manager.is_active(cfg.unit_id): + self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active") + continue + # Get current status status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first() diff --git a/app/monitor.py b/app/monitor.py index 1177893..47c21be 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -31,6 +31,10 @@ logger = logging.getLogger(__name__) # poll) already paces the effective rate to a few seconds; this is the extra idle. MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) +# If nothing has been broadcast in this many seconds (e.g. device offline and +# silent), send a keepalive frame so reverse proxies don't drop the idle WS. +MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25")) + def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict: """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced @@ -107,6 +111,8 @@ class DeviceMonitor: async def _run(self) -> None: logger.info(f"[MONITOR] {self.unit_id}: feed started") + loop = asyncio.get_running_loop() + last_send = loop.time() try: while self._has_demand(): snap, mst = await self._poll_once() @@ -116,6 +122,7 @@ class DeviceMonitor: payload = _snapshot_payload(snap, self.unit_id, mst) payload["feed_status"] = "ok" self._broadcast(payload) + last_send = loop.time() try: await alert_evaluator.evaluate(self.unit_id, snap) except Exception as e: @@ -131,6 +138,20 @@ class DeviceMonitor: "timestamp": datetime.utcnow().isoformat(), "feed_status": "unreachable", }) + last_send = loop.time() + + # Heartbeat: during quiet/offline stretches, send a keepalive so an + # idle WS isn't dropped by a reverse proxy. Not cached (new subscribers + # should still get the last real frame, not a heartbeat). + if loop.time() - last_send >= MONITOR_HEARTBEAT_S: + self._broadcast({ + "unit_id": self.unit_id, + "timestamp": datetime.utcnow().isoformat(), + "feed_status": "ok" if self._reachable else "unreachable", + "heartbeat": True, + }, cache=False) + last_send = loop.time() + await asyncio.sleep(MONITOR_POLL_INTERVAL) finally: logger.info(f"[MONITOR] {self.unit_id}: feed stopped") @@ -161,8 +182,9 @@ class DeviceMonitor: finally: db.close() - def _broadcast(self, payload: dict) -> None: - self._last_payload = payload # cache for replay to new subscribers + def _broadcast(self, payload: dict, cache: bool = True) -> None: + if cache: + self._last_payload = payload # replayed to new subscribers for q in list(self._subscribers): try: q.put_nowait(payload) @@ -186,6 +208,12 @@ class MonitorManager: self._monitors[unit_id] = m return m + def is_active(self, unit_id: str) -> bool: + """True if this unit has a running monitor feed (so the background poller + can skip it — the monitor already polls it more often).""" + m = self._monitors.get(unit_id) + return m is not None and m.running + def status(self) -> dict: return { uid: {