feat(monitor): adaptive poll rate, unreachable backoff, device-offline alert
Three changes to cut wasted device/cellular load and surface outages: - Adaptive interval: full-rate (~1.25s) while a browser is subscribed for a smooth chart; relaxed cadence (MONITOR_IDLE_POLL_INTERVAL, default 10s) when the feed is keepalive-only (alerting). ~8x fewer polls with no viewer -> ~8x less cellular traffic on a metered SIM. Note: idle interval also sets the alert sampling resolution when nobody is watching. - Exponential backoff when the device is unreachable (1->2->...->60s cap), reset on the first good poll, so a dead/asleep device stops churning reconnects (log spam + wasted SYN traffic). Capped at 5s while a browser is watching so a recovery still surfaces quickly. - Device-offline alert: the reachable->unreachable transition raises a connectivity AlertEvent (sentinel rule_id=0, metric="connectivity") through the existing evaluator/dispatch seam; recovery clears it. Deduped in memory and via the DB (so a restart mid-outage doesn't duplicate the event). MonitorManager.status() now reports reachable + current mode (watched/idle/ backoff) for observability. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+64
-1
@@ -112,6 +112,7 @@ class AlertEvaluator:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._states: Dict[Tuple[str, int], RuleState] = {}
|
self._states: Dict[Tuple[str, int], RuleState] = {}
|
||||||
self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules)
|
self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules)
|
||||||
|
self._offline_events: Dict[str, int] = {} # unit_id -> open connectivity AlertEvent id
|
||||||
logger.info("[ALERT] rule-based evaluator ready")
|
logger.info("[ALERT] rule-based evaluator ready")
|
||||||
|
|
||||||
async def evaluate(self, unit_id: str, snap) -> None:
|
async def evaluate(self, unit_id: str, snap) -> None:
|
||||||
@@ -235,9 +236,71 @@ class AlertEvaluator:
|
|||||||
f"recovered to {value:.1f} dB (peak {peak:.1f} dB)",
|
f"recovered to {value:.1f} dB (peak {peak:.1f} dB)",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# -- connectivity (device offline/online) -------------------------------
|
||||||
|
#
|
||||||
|
# Raised by the live monitor when it loses / regains contact with a device.
|
||||||
|
# Persisted as an AlertEvent (sentinel rule_id=0, metric="connectivity") so it
|
||||||
|
# lands in the same events/inbox/ack pipeline as threshold alerts. The in-memory
|
||||||
|
# map dedupes; the DB query also dedupes across a process restart.
|
||||||
|
|
||||||
|
async def device_offline(self, unit_id: str) -> None:
|
||||||
|
if unit_id in self._offline_events:
|
||||||
|
return # already flagged offline
|
||||||
|
from app.database import SessionLocal
|
||||||
|
from app.models import AlertEvent
|
||||||
|
db = SessionLocal()
|
||||||
|
try:
|
||||||
|
existing = db.query(AlertEvent).filter_by(
|
||||||
|
unit_id=unit_id, metric="connectivity", status="active").first()
|
||||||
|
if existing: # already open in the DB (e.g. carried across a restart)
|
||||||
|
self._offline_events[unit_id] = existing.id
|
||||||
|
return
|
||||||
|
evt = AlertEvent(
|
||||||
|
rule_id=0, unit_id=unit_id, rule_name="Device unreachable",
|
||||||
|
metric="connectivity", threshold_db=0.0, status="active",
|
||||||
|
)
|
||||||
|
db.add(evt)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(evt)
|
||||||
|
self._offline_events[unit_id] = evt.id
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[ALERT] failed to record offline for {unit_id}: {e}")
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
await self._dispatch_raw("OFFLINE", unit_id, "Device unreachable",
|
||||||
|
"live monitor lost contact with the device")
|
||||||
|
|
||||||
|
async def device_online(self, unit_id: str) -> None:
|
||||||
|
self._offline_events.pop(unit_id, None)
|
||||||
|
from app.database import SessionLocal
|
||||||
|
from app.models import AlertEvent
|
||||||
|
db = SessionLocal()
|
||||||
|
cleared = 0
|
||||||
|
try:
|
||||||
|
opened = db.query(AlertEvent).filter_by(
|
||||||
|
unit_id=unit_id, metric="connectivity", status="active").all()
|
||||||
|
for evt in opened:
|
||||||
|
evt.clear_at = datetime.utcnow()
|
||||||
|
evt.status = "cleared"
|
||||||
|
cleared += 1
|
||||||
|
if cleared:
|
||||||
|
db.commit()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[ALERT] failed to record online for {unit_id}: {e}")
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
if cleared: # only announce recovery if it was actually flagged offline
|
||||||
|
await self._dispatch_raw("ONLINE", unit_id, "Device recovered",
|
||||||
|
"live monitor regained contact with the device")
|
||||||
|
|
||||||
|
# -- event persistence + dispatch ---------------------------------------
|
||||||
|
|
||||||
async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None:
|
async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None:
|
||||||
|
await self._dispatch_raw(kind, unit_id, rule.name, detail)
|
||||||
|
|
||||||
|
async def _dispatch_raw(self, kind: str, unit_id: str, name: str, detail: str) -> None:
|
||||||
"""POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here."""
|
"""POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here."""
|
||||||
logger.warning(f"[ALERT:{kind}] {unit_id} '{rule.name}': {detail}")
|
logger.warning(f"[ALERT:{kind}] {unit_id} '{name}': {detail}")
|
||||||
|
|
||||||
|
|
||||||
# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot)
|
# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot)
|
||||||
|
|||||||
+50
-4
@@ -27,10 +27,27 @@ from app.alerts import alert_evaluator
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Extra idle between DOD polls. The 1s device rate-limit already paces consecutive
|
# Extra idle between DOD polls WHEN A BROWSER IS WATCHING. The 1s device rate-limit
|
||||||
# DOD? commands, so this just needs to be small — the rate-limit is the real floor.
|
# already paces consecutive DOD? commands, so this just needs to be small — the
|
||||||
|
# rate-limit is the real floor (~1.25s/poll effective).
|
||||||
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25"))
|
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25"))
|
||||||
|
|
||||||
|
# Idle cadence when NO browser is subscribed and the feed is only kept alive for
|
||||||
|
# alerting. Same data, ~8x fewer polls -> ~8x less cellular traffic on a metered
|
||||||
|
# SIM (~1 GB/device/month at full rate -> ~125 MB). NOTE: this also sets the alert
|
||||||
|
# sampling resolution when nobody is watching, so keep it <= the smallest alert
|
||||||
|
# duration_s you rely on (default 10s comfortably catches a "sustained 30/60s" rule).
|
||||||
|
MONITOR_IDLE_POLL_INTERVAL = float(os.getenv("MONITOR_IDLE_POLL_INTERVAL", "10"))
|
||||||
|
|
||||||
|
# Exponential backoff once the device is unreachable, so a powered-off / asleep /
|
||||||
|
# out-of-signal device stops churning reconnects every cycle (log spam + a trickle
|
||||||
|
# of wasted cellular data on failed SYNs). delay = min(BASE * 2**(fails-1), MAX),
|
||||||
|
# reset to full-rate on the first good poll. While a browser is actively watching we
|
||||||
|
# cap the backoff lower (WATCHED_MAX) so a recovery surfaces quickly for the viewer.
|
||||||
|
MONITOR_BACKOFF_BASE_S = float(os.getenv("MONITOR_BACKOFF_BASE_S", "1"))
|
||||||
|
MONITOR_BACKOFF_MAX_S = float(os.getenv("MONITOR_BACKOFF_MAX_S", "60"))
|
||||||
|
MONITOR_BACKOFF_WATCHED_MAX_S = float(os.getenv("MONITOR_BACKOFF_WATCHED_MAX_S", "5"))
|
||||||
|
|
||||||
# How often to refresh the run state (Measure?). It changes rarely, so we cache it
|
# How often to refresh the run state (Measure?). It changes rarely, so we cache it
|
||||||
# and skip that second rate-limited command on most polls — roughly halving the
|
# and skip that second rate-limited command on most polls — roughly halving the
|
||||||
# per-update latency (~2.5s -> ~1.3s).
|
# per-update latency (~2.5s -> ~1.3s).
|
||||||
@@ -131,6 +148,12 @@ class DeviceMonitor:
|
|||||||
while self._has_demand():
|
while self._has_demand():
|
||||||
snap, mst = await self._poll_once()
|
snap, mst = await self._poll_once()
|
||||||
if snap is not None:
|
if snap is not None:
|
||||||
|
if not self._reachable:
|
||||||
|
# Recovered from an outage — clear the connectivity alert.
|
||||||
|
try:
|
||||||
|
await alert_evaluator.device_online(self.unit_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[MONITOR] {self.unit_id}: online alert failed: {e}")
|
||||||
self._consec_fail = 0
|
self._consec_fail = 0
|
||||||
self._reachable = True
|
self._reachable = True
|
||||||
payload = _snapshot_payload(snap, self.unit_id, mst)
|
payload = _snapshot_payload(snap, self.unit_id, mst)
|
||||||
@@ -143,7 +166,8 @@ class DeviceMonitor:
|
|||||||
logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")
|
logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")
|
||||||
else:
|
else:
|
||||||
# Tell clients the device went offline — once, on transition, after a
|
# Tell clients the device went offline — once, on transition, after a
|
||||||
# few failures so a momentary blip doesn't flap the UI.
|
# few failures so a momentary blip doesn't flap the UI. Same edge
|
||||||
|
# raises the device-offline alert.
|
||||||
self._consec_fail += 1
|
self._consec_fail += 1
|
||||||
if self._reachable and self._consec_fail >= 3:
|
if self._reachable and self._consec_fail >= 3:
|
||||||
self._reachable = False
|
self._reachable = False
|
||||||
@@ -153,6 +177,10 @@ class DeviceMonitor:
|
|||||||
"feed_status": "unreachable",
|
"feed_status": "unreachable",
|
||||||
})
|
})
|
||||||
last_send = loop.time()
|
last_send = loop.time()
|
||||||
|
try:
|
||||||
|
await alert_evaluator.device_offline(self.unit_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[MONITOR] {self.unit_id}: offline alert failed: {e}")
|
||||||
|
|
||||||
# Heartbeat: during quiet/offline stretches, send a keepalive so an
|
# Heartbeat: during quiet/offline stretches, send a keepalive so an
|
||||||
# idle WS isn't dropped by a reverse proxy. Not cached (new subscribers
|
# idle WS isn't dropped by a reverse proxy. Not cached (new subscribers
|
||||||
@@ -166,10 +194,23 @@ class DeviceMonitor:
|
|||||||
}, cache=False)
|
}, cache=False)
|
||||||
last_send = loop.time()
|
last_send = loop.time()
|
||||||
|
|
||||||
await asyncio.sleep(MONITOR_POLL_INTERVAL)
|
await asyncio.sleep(self._next_delay())
|
||||||
finally:
|
finally:
|
||||||
logger.info(f"[MONITOR] {self.unit_id}: feed stopped")
|
logger.info(f"[MONITOR] {self.unit_id}: feed stopped")
|
||||||
|
|
||||||
|
def _next_delay(self) -> float:
|
||||||
|
"""Inter-poll delay: exponential backoff while unreachable, full-rate while a
|
||||||
|
browser is watching, relaxed cadence when the feed is keepalive-only."""
|
||||||
|
if self._consec_fail > 0:
|
||||||
|
shift = min(self._consec_fail - 1, 6) # cap growth at 2**6 = 64x base
|
||||||
|
delay = min(MONITOR_BACKOFF_BASE_S * (2 ** shift), MONITOR_BACKOFF_MAX_S)
|
||||||
|
if self._subscribers:
|
||||||
|
delay = min(delay, MONITOR_BACKOFF_WATCHED_MAX_S)
|
||||||
|
return delay
|
||||||
|
if self._subscribers:
|
||||||
|
return MONITOR_POLL_INTERVAL # a browser is watching — smooth chart
|
||||||
|
return MONITOR_IDLE_POLL_INTERVAL # keepalive-only (alerting) — save data
|
||||||
|
|
||||||
async def _poll_once(self):
|
async def _poll_once(self):
|
||||||
"""One DOD poll: read, persist, return (snapshot, measurement_start_iso)."""
|
"""One DOD poll: read, persist, return (snapshot, measurement_start_iso)."""
|
||||||
db = SessionLocal()
|
db = SessionLocal()
|
||||||
@@ -267,6 +308,11 @@ class MonitorManager:
|
|||||||
"running": m.running,
|
"running": m.running,
|
||||||
"subscribers": m.subscriber_count(),
|
"subscribers": m.subscriber_count(),
|
||||||
"keepalive": m._keepalive,
|
"keepalive": m._keepalive,
|
||||||
|
"reachable": m._reachable,
|
||||||
|
# what cadence the loop is currently using, for observability
|
||||||
|
"mode": ("backoff" if m._consec_fail > 0
|
||||||
|
else "watched" if m._subscribers
|
||||||
|
else "idle"),
|
||||||
}
|
}
|
||||||
for uid, m in self._monitors.items()
|
for uid, m in self._monitors.items()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user