From 1f5f1fb1f62f3e1a99f05351cf8bd4849c577588 Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 10 Jun 2026 06:47:20 +0000 Subject: [PATCH] feat(monitor): adaptive poll rate, unreachable backoff, device-offline alert Three changes to cut wasted device/cellular load and surface outages: - Adaptive interval: full-rate (~1.25s) while a browser is subscribed for a smooth chart; relaxed cadence (MONITOR_IDLE_POLL_INTERVAL, default 10s) when the feed is keepalive-only (alerting). ~8x fewer polls with no viewer -> ~8x less cellular traffic on a metered SIM. Note: idle interval also sets the alert sampling resolution when nobody is watching. - Exponential backoff when the device is unreachable (1->2->...->60s cap), reset on the first good poll, so a dead/asleep device stops churning reconnects (log spam + wasted SYN traffic). Capped at 5s while a browser is watching so a recovery still surfaces quickly. - Device-offline alert: the reachable->unreachable transition raises a connectivity AlertEvent (sentinel rule_id=0, metric="connectivity") through the existing evaluator/dispatch seam; recovery clears it. Deduped in memory and via the DB (so a restart mid-outage doesn't duplicate the event). MonitorManager.status() now reports reachable + current mode (watched/idle/ backoff) for observability. Co-Authored-By: Claude Opus 4.8 --- app/alerts.py | 65 +++++++++++++++++++++++++++++++++++++++++++++++++- app/monitor.py | 54 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 114 insertions(+), 5 deletions(-) diff --git a/app/alerts.py b/app/alerts.py index 1fb84e7..f5eb142 100644 --- a/app/alerts.py +++ b/app/alerts.py @@ -112,6 +112,7 @@ class AlertEvaluator: def __init__(self): self._states: Dict[Tuple[str, int], RuleState] = {} self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules) + self._offline_events: Dict[str, int] = {} # unit_id -> open connectivity AlertEvent id logger.info("[ALERT] rule-based evaluator ready") async def evaluate(self, unit_id: str, snap) -> None: @@ -235,9 +236,71 @@ class AlertEvaluator: f"recovered to {value:.1f} dB (peak {peak:.1f} dB)", ) + # -- connectivity (device offline/online) ------------------------------- + # + # Raised by the live monitor when it loses / regains contact with a device. + # Persisted as an AlertEvent (sentinel rule_id=0, metric="connectivity") so it + # lands in the same events/inbox/ack pipeline as threshold alerts. The in-memory + # map dedupes; the DB query also dedupes across a process restart. + + async def device_offline(self, unit_id: str) -> None: + if unit_id in self._offline_events: + return # already flagged offline + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + existing = db.query(AlertEvent).filter_by( + unit_id=unit_id, metric="connectivity", status="active").first() + if existing: # already open in the DB (e.g. carried across a restart) + self._offline_events[unit_id] = existing.id + return + evt = AlertEvent( + rule_id=0, unit_id=unit_id, rule_name="Device unreachable", + metric="connectivity", threshold_db=0.0, status="active", + ) + db.add(evt) + db.commit() + db.refresh(evt) + self._offline_events[unit_id] = evt.id + except Exception as e: + logger.warning(f"[ALERT] failed to record offline for {unit_id}: {e}") + finally: + db.close() + await self._dispatch_raw("OFFLINE", unit_id, "Device unreachable", + "live monitor lost contact with the device") + + async def device_online(self, unit_id: str) -> None: + self._offline_events.pop(unit_id, None) + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + cleared = 0 + try: + opened = db.query(AlertEvent).filter_by( + unit_id=unit_id, metric="connectivity", status="active").all() + for evt in opened: + evt.clear_at = datetime.utcnow() + evt.status = "cleared" + cleared += 1 + if cleared: + db.commit() + except Exception as e: + logger.warning(f"[ALERT] failed to record online for {unit_id}: {e}") + finally: + db.close() + if cleared: # only announce recovery if it was actually flagged offline + await self._dispatch_raw("ONLINE", unit_id, "Device recovered", + "live monitor regained contact with the device") + + # -- event persistence + dispatch --------------------------------------- + async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None: + await self._dispatch_raw(kind, unit_id, rule.name, detail) + + async def _dispatch_raw(self, kind: str, unit_id: str, name: str, detail: str) -> None: """POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here.""" - logger.warning(f"[ALERT:{kind}] {unit_id} '{rule.name}': {detail}") + logger.warning(f"[ALERT:{kind}] {unit_id} '{name}': {detail}") # Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot) diff --git a/app/monitor.py b/app/monitor.py index a03dca5..25539bc 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -27,10 +27,27 @@ from app.alerts import alert_evaluator logger = logging.getLogger(__name__) -# Extra idle between DOD polls. The 1s device rate-limit already paces consecutive -# DOD? commands, so this just needs to be small — the rate-limit is the real floor. +# Extra idle between DOD polls WHEN A BROWSER IS WATCHING. The 1s device rate-limit +# already paces consecutive DOD? commands, so this just needs to be small — the +# rate-limit is the real floor (~1.25s/poll effective). MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25")) +# Idle cadence when NO browser is subscribed and the feed is only kept alive for +# alerting. Same data, ~8x fewer polls -> ~8x less cellular traffic on a metered +# SIM (~1 GB/device/month at full rate -> ~125 MB). NOTE: this also sets the alert +# sampling resolution when nobody is watching, so keep it <= the smallest alert +# duration_s you rely on (default 10s comfortably catches a "sustained 30/60s" rule). +MONITOR_IDLE_POLL_INTERVAL = float(os.getenv("MONITOR_IDLE_POLL_INTERVAL", "10")) + +# Exponential backoff once the device is unreachable, so a powered-off / asleep / +# out-of-signal device stops churning reconnects every cycle (log spam + a trickle +# of wasted cellular data on failed SYNs). delay = min(BASE * 2**(fails-1), MAX), +# reset to full-rate on the first good poll. While a browser is actively watching we +# cap the backoff lower (WATCHED_MAX) so a recovery surfaces quickly for the viewer. +MONITOR_BACKOFF_BASE_S = float(os.getenv("MONITOR_BACKOFF_BASE_S", "1")) +MONITOR_BACKOFF_MAX_S = float(os.getenv("MONITOR_BACKOFF_MAX_S", "60")) +MONITOR_BACKOFF_WATCHED_MAX_S = float(os.getenv("MONITOR_BACKOFF_WATCHED_MAX_S", "5")) + # How often to refresh the run state (Measure?). It changes rarely, so we cache it # and skip that second rate-limited command on most polls — roughly halving the # per-update latency (~2.5s -> ~1.3s). @@ -131,6 +148,12 @@ class DeviceMonitor: while self._has_demand(): snap, mst = await self._poll_once() if snap is not None: + if not self._reachable: + # Recovered from an outage — clear the connectivity alert. + try: + await alert_evaluator.device_online(self.unit_id) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: online alert failed: {e}") self._consec_fail = 0 self._reachable = True payload = _snapshot_payload(snap, self.unit_id, mst) @@ -143,7 +166,8 @@ class DeviceMonitor: logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") else: # Tell clients the device went offline — once, on transition, after a - # few failures so a momentary blip doesn't flap the UI. + # few failures so a momentary blip doesn't flap the UI. Same edge + # raises the device-offline alert. self._consec_fail += 1 if self._reachable and self._consec_fail >= 3: self._reachable = False @@ -153,6 +177,10 @@ class DeviceMonitor: "feed_status": "unreachable", }) last_send = loop.time() + try: + await alert_evaluator.device_offline(self.unit_id) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: offline alert failed: {e}") # Heartbeat: during quiet/offline stretches, send a keepalive so an # idle WS isn't dropped by a reverse proxy. Not cached (new subscribers @@ -166,10 +194,23 @@ class DeviceMonitor: }, cache=False) last_send = loop.time() - await asyncio.sleep(MONITOR_POLL_INTERVAL) + await asyncio.sleep(self._next_delay()) finally: logger.info(f"[MONITOR] {self.unit_id}: feed stopped") + def _next_delay(self) -> float: + """Inter-poll delay: exponential backoff while unreachable, full-rate while a + browser is watching, relaxed cadence when the feed is keepalive-only.""" + if self._consec_fail > 0: + shift = min(self._consec_fail - 1, 6) # cap growth at 2**6 = 64x base + delay = min(MONITOR_BACKOFF_BASE_S * (2 ** shift), MONITOR_BACKOFF_MAX_S) + if self._subscribers: + delay = min(delay, MONITOR_BACKOFF_WATCHED_MAX_S) + return delay + if self._subscribers: + return MONITOR_POLL_INTERVAL # a browser is watching — smooth chart + return MONITOR_IDLE_POLL_INTERVAL # keepalive-only (alerting) — save data + async def _poll_once(self): """One DOD poll: read, persist, return (snapshot, measurement_start_iso).""" db = SessionLocal() @@ -267,6 +308,11 @@ class MonitorManager: "running": m.running, "subscribers": m.subscriber_count(), "keepalive": m._keepalive, + "reachable": m._reachable, + # what cadence the loop is currently using, for observability + "mode": ("backoff" if m._consec_fail > 0 + else "watched" if m._subscribers + else "idle"), } for uid, m in self._monitors.items() }