feat: monitor heartbeat + background poller skips active-monitored units
- Heartbeat: if nothing has been broadcast in MONITOR_HEARTBEAT_S (default 25s) — e.g. device offline and silent — send a non-cached keepalive frame so a reverse proxy (NPM) doesn't drop the idle WS. New subscribers still get the last real frame, not a heartbeat. - Poller-skip: the 60s background poller now skips any unit with a running monitor (MonitorManager.is_active). The monitor already polls it ~1Hz and keeps the status cache fresh, so the background poll was redundant and just added load/lock-contention on the device's single connection (and churn, which matters for the cellular wedge). Trade-off: the FTP start-time sync (only in the poller) doesn't run while a unit is actively monitored — fine, since reports take the authoritative start time from the FTP .rnd data. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+30
-2
@@ -31,6 +31,10 @@ logger = logging.getLogger(__name__)
|
||||
# poll) already paces the effective rate to a few seconds; this is the extra idle.
|
||||
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0"))
|
||||
|
||||
# If nothing has been broadcast in this many seconds (e.g. device offline and
|
||||
# silent), send a keepalive frame so reverse proxies don't drop the idle WS.
|
||||
MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25"))
|
||||
|
||||
|
||||
def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict:
|
||||
"""Build the broadcast payload — same shape as the DRD stream, but DOD-sourced
|
||||
@@ -107,6 +111,8 @@ class DeviceMonitor:
|
||||
|
||||
async def _run(self) -> None:
|
||||
logger.info(f"[MONITOR] {self.unit_id}: feed started")
|
||||
loop = asyncio.get_running_loop()
|
||||
last_send = loop.time()
|
||||
try:
|
||||
while self._has_demand():
|
||||
snap, mst = await self._poll_once()
|
||||
@@ -116,6 +122,7 @@ class DeviceMonitor:
|
||||
payload = _snapshot_payload(snap, self.unit_id, mst)
|
||||
payload["feed_status"] = "ok"
|
||||
self._broadcast(payload)
|
||||
last_send = loop.time()
|
||||
try:
|
||||
await alert_evaluator.evaluate(self.unit_id, snap)
|
||||
except Exception as e:
|
||||
@@ -131,6 +138,20 @@ class DeviceMonitor:
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"feed_status": "unreachable",
|
||||
})
|
||||
last_send = loop.time()
|
||||
|
||||
# Heartbeat: during quiet/offline stretches, send a keepalive so an
|
||||
# idle WS isn't dropped by a reverse proxy. Not cached (new subscribers
|
||||
# should still get the last real frame, not a heartbeat).
|
||||
if loop.time() - last_send >= MONITOR_HEARTBEAT_S:
|
||||
self._broadcast({
|
||||
"unit_id": self.unit_id,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"feed_status": "ok" if self._reachable else "unreachable",
|
||||
"heartbeat": True,
|
||||
}, cache=False)
|
||||
last_send = loop.time()
|
||||
|
||||
await asyncio.sleep(MONITOR_POLL_INTERVAL)
|
||||
finally:
|
||||
logger.info(f"[MONITOR] {self.unit_id}: feed stopped")
|
||||
@@ -161,8 +182,9 @@ class DeviceMonitor:
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
def _broadcast(self, payload: dict) -> None:
|
||||
self._last_payload = payload # cache for replay to new subscribers
|
||||
def _broadcast(self, payload: dict, cache: bool = True) -> None:
|
||||
if cache:
|
||||
self._last_payload = payload # replayed to new subscribers
|
||||
for q in list(self._subscribers):
|
||||
try:
|
||||
q.put_nowait(payload)
|
||||
@@ -186,6 +208,12 @@ class MonitorManager:
|
||||
self._monitors[unit_id] = m
|
||||
return m
|
||||
|
||||
def is_active(self, unit_id: str) -> bool:
|
||||
"""True if this unit has a running monitor feed (so the background poller
|
||||
can skip it — the monitor already polls it more often)."""
|
||||
m = self._monitors.get(unit_id)
|
||||
return m is not None and m.running
|
||||
|
||||
def status(self) -> dict:
|
||||
return {
|
||||
uid: {
|
||||
|
||||
Reference in New Issue
Block a user