From aa3e088b64622ffb2adbb112430af2ef216746ce Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 23:27:05 +0000 Subject: [PATCH] feat: per-device live monitor (fan-out) + alert evaluator (POC) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The piece the live-view + alerting work was building toward. monitor.py — one DOD poll loop per device, broadcast to many subscribers: - browser WebSockets (fixes the single-connection "second viewer sees nothing" contention — browsers no longer each open a device stream) - the alert evaluator (can keep a feed running with no browser via /monitor/start, so alerting runs continuously) - persistence (each snapshot written like the poller) DOD-sourced, so the broadcast carries ln1/ln2 (which DRD cannot). All polls go through the existing per-device lock + pool, so it serializes safely with the background poller and on-demand commands. alerts.py — pluggable POC evaluator: fires (logs) when ALERT_METRIC exceeds ALERT_THRESHOLD_DB with an ALERT_COOLDOWN_SECONDS cooldown. The rule (instantaneous vs sustained vs L10) is the single swap point; dispatch is a server log for now (email/SMS later). Endpoints: - WS /api/nl43/{unit_id}/monitor subscribe to the shared feed - POST /api/nl43/{unit_id}/monitor/start keep feed alive w/o a browser - POST /api/nl43/{unit_id}/monitor/stop drop the keep-alive - GET /api/nl43/_monitor/status running/subscribers/keepalive WS endpoint races queue.get() against a disconnect watcher so an idle feed still detects client drop and doesn't leak a subscription. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/alerts.py | 71 ++++++++++++++++++++ app/monitor.py | 176 +++++++++++++++++++++++++++++++++++++++++++++++++ app/routers.py | 74 +++++++++++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 app/alerts.py create mode 100644 app/monitor.py diff --git a/app/alerts.py b/app/alerts.py new file mode 100644 index 0000000..7c001e2 --- /dev/null +++ b/app/alerts.py @@ -0,0 +1,71 @@ +""" +Alert evaluation (POC). + +Receives each monitor snapshot and fires an alert when a configured metric +exceeds a threshold, with a cooldown so a sustained loud period doesn't spam. + +The RULE here is intentionally simple and swappable. Instantaneous Lp vs a +sustained window vs L10 is still an open design decision — this evaluator is the +single plug point for it. For the POC the rule is "instantaneous metric > +threshold, rate-limited by a cooldown", and dispatch is just a server-side log. +Wire email/SMS (likely via a Terra-View webhook) into _dispatch() later. + +Config via env: +- ALERT_ENABLED (default true) +- ALERT_METRIC which snapshot field to test: lp/leq/lmax/ln1/ln2 (default lp) +- ALERT_THRESHOLD_DB numeric dB threshold (default 85) +- ALERT_COOLDOWN_SECONDS min seconds between alerts per unit (default 60) +""" + +import asyncio +import logging +import os +from typing import Dict + +logger = logging.getLogger(__name__) + + +class AlertEvaluator: + def __init__(self): + self.enabled = os.getenv("ALERT_ENABLED", "true").lower() == "true" + self.metric = os.getenv("ALERT_METRIC", "lp").lower() + self.threshold_db = float(os.getenv("ALERT_THRESHOLD_DB", "85")) + self.cooldown_s = float(os.getenv("ALERT_COOLDOWN_SECONDS", "60")) + self._last_fired: Dict[str, float] = {} + logger.info( + f"[ALERT] evaluator ready: enabled={self.enabled} metric={self.metric} " + f"threshold={self.threshold_db}dB cooldown={self.cooldown_s}s" + ) + + async def evaluate(self, unit_id: str, snap) -> None: + """Evaluate one snapshot; fire (log) if the metric exceeds threshold.""" + if not self.enabled: + return + + raw = getattr(snap, self.metric, None) + try: + level = float(raw) + except (TypeError, ValueError): + return # missing / non-numeric (e.g. "-.-") + + if level <= self.threshold_db: + return + + # Cooldown — use the event loop clock (Math.random/Date.now-free). + now = asyncio.get_running_loop().time() + if now - self._last_fired.get(unit_id, 0.0) < self.cooldown_s: + return + self._last_fired[unit_id] = now + + await self._dispatch(unit_id, level) + + async def _dispatch(self, unit_id: str, level: float) -> None: + """POC dispatch: server-side log. Swap in email/SMS here later.""" + logger.warning( + f"[ALERT] {unit_id}: {self.metric.upper()}={level:.1f} dB exceeded " + f"threshold {self.threshold_db:.1f} dB" + ) + + +# Module-level singleton +alert_evaluator = AlertEvaluator() diff --git a/app/monitor.py b/app/monitor.py new file mode 100644 index 0000000..10e190c --- /dev/null +++ b/app/monitor.py @@ -0,0 +1,176 @@ +""" +Per-device live monitor (fan-out hub). + +ONE DOD poll loop per device, broadcast to many subscribers: +- browser WebSocket clients (live view) — they no longer each open their own + device stream, so the NL43's single-connection limit stops causing the + "second viewer sees nothing" contention. +- the alert evaluator (threshold alerts), which can keep a device's feed running + even with no browser attached. +- persistence (each snapshot is written to NL43Status, like the poller does). + +The device's one TCP connection is respected: every poll goes through the same +per-device lock + connection pool in services.py, so the monitor, the background +poller, and on-demand commands all serialize safely. +""" + +import asyncio +import logging +import os +from datetime import datetime +from typing import Dict, Optional, Set + +from app.database import SessionLocal +from app.models import NL43Config, NL43Status +from app.services import NL43Client, persist_snapshot +from app.alerts import alert_evaluator + +logger = logging.getLogger(__name__) + +# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per +# poll) already paces the effective rate to a few seconds; this is the extra idle. +MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) + + +def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict: + """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced + so it carries ln1/ln2 (which DRD cannot).""" + return { + "unit_id": unit_id, + "timestamp": datetime.utcnow().isoformat(), + "measurement_state": snap.measurement_state, + "measurement_start_time": measurement_start_time, + "counter": snap.counter, + "lp": snap.lp, + "leq": snap.leq, + "lmax": snap.lmax, + "lmin": snap.lmin, + "lpeak": snap.lpeak, + "ln1": snap.ln1, + "ln2": snap.ln2, + "raw_payload": snap.raw_payload, + } + + +class DeviceMonitor: + """Owns a single DOD poll loop for one device and fans each snapshot out to + all subscribers. Runs while it has at least one browser subscriber OR the + server-side keep-alive (alerting) flag is set.""" + + def __init__(self, unit_id: str): + self.unit_id = unit_id + self._subscribers: Set[asyncio.Queue] = set() + self._keepalive = False + self._task: Optional[asyncio.Task] = None + self._lock = asyncio.Lock() + + @property + def running(self) -> bool: + return self._task is not None and not self._task.done() + + def subscriber_count(self) -> int: + return len(self._subscribers) + + def _has_demand(self) -> bool: + return bool(self._subscribers) or self._keepalive + + def _ensure_task(self) -> None: + if self._task is None or self._task.done(): + self._task = asyncio.create_task(self._run()) + + async def subscribe(self) -> asyncio.Queue: + q: asyncio.Queue = asyncio.Queue(maxsize=5) + async with self._lock: + self._subscribers.add(q) + self._ensure_task() + return q + + async def unsubscribe(self, q: asyncio.Queue) -> None: + async with self._lock: + self._subscribers.discard(q) + + async def set_keepalive(self, on: bool) -> None: + async with self._lock: + self._keepalive = on + if on: + self._ensure_task() + + async def _run(self) -> None: + logger.info(f"[MONITOR] {self.unit_id}: feed started") + try: + while self._has_demand(): + snap, mst = await self._poll_once() + if snap is not None: + payload = _snapshot_payload(snap, self.unit_id, mst) + self._broadcast(payload) + try: + await alert_evaluator.evaluate(self.unit_id, snap) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") + await asyncio.sleep(MONITOR_POLL_INTERVAL) + finally: + logger.info(f"[MONITOR] {self.unit_id}: feed stopped") + + async def _poll_once(self): + """One DOD poll: read, persist, return (snapshot, measurement_start_iso).""" + db = SessionLocal() + try: + cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first() + if not cfg or not cfg.tcp_enabled: + return None, None + client = NL43Client( + cfg.host, cfg.tcp_port, + ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password, + ftp_port=cfg.ftp_port or 21, + ) + snap = await client.request_dod() + snap.unit_id = self.unit_id + persist_snapshot(snap, db) + db.commit() + status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first() + mst = (status.measurement_start_time.isoformat() + if status and status.measurement_start_time else None) + return snap, mst + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}") + return None, None + finally: + db.close() + + def _broadcast(self, payload: dict) -> None: + for q in list(self._subscribers): + try: + q.put_nowait(payload) + except asyncio.QueueFull: + # Slow consumer — drop this frame rather than stall the whole feed. + pass + + +class MonitorManager: + """Registry of per-device monitors (one per unit_id).""" + + def __init__(self): + self._monitors: Dict[str, DeviceMonitor] = {} + self._lock = asyncio.Lock() + + async def get(self, unit_id: str) -> DeviceMonitor: + async with self._lock: + m = self._monitors.get(unit_id) + if m is None: + m = DeviceMonitor(unit_id) + self._monitors[unit_id] = m + return m + + def status(self) -> dict: + return { + uid: { + "running": m.running, + "subscribers": m.subscriber_count(), + "keepalive": m._keepalive, + } + for uid, m in self._monitors.items() + } + + +# Module-level singleton +monitor_manager = MonitorManager() diff --git a/app/routers.py b/app/routers.py index 6b7cede..1031d84 100644 --- a/app/routers.py +++ b/app/routers.py @@ -246,6 +246,80 @@ async def system_resume(): return {"status": "ok", "mode": "active", "message": "Polling resumed"} +# ============================================================================ +# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients +# ============================================================================ + +@router.websocket("/{unit_id}/monitor") +async def monitor_stream(websocket: WebSocket, unit_id: str): + """Subscribe a browser to the device's shared 1 Hz DOD feed. + + Any number of clients can attach without each opening its own device + connection (one poll loop per device, fanned out). Same JSON shape as the + DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10). + """ + await websocket.accept() + from app.monitor import monitor_manager + + monitor = await monitor_manager.get(unit_id) + queue = await monitor.subscribe() + logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)") + + async def _watch_disconnect(): + # Completes when the client disconnects, so an idle feed (no data) still + # detects the drop and we don't leak a subscription that keeps the device + # feed (and its connection) alive. + try: + while True: + msg = await websocket.receive() + if msg.get("type") == "websocket.disconnect": + return + except Exception: + return + + gone = asyncio.ensure_future(_watch_disconnect()) + try: + while not gone.done(): + try: + payload = await asyncio.wait_for(queue.get(), timeout=1.0) + except asyncio.TimeoutError: + continue # re-check gone.done() + await websocket.send_json(payload) + except WebSocketDisconnect: + logger.info(f"Monitor subscriber disconnected for {unit_id}") + except Exception as e: + logger.warning(f"Monitor stream error for {unit_id}: {e}") + finally: + gone.cancel() + await monitor.unsubscribe(queue) + + +@router.post("/{unit_id}/monitor/start") +async def monitor_start(unit_id: str): + """Keep the device's feed running even with no browser attached, so alerting + evaluates continuously. Runtime-only (resets on restart).""" + from app.monitor import monitor_manager + monitor = await monitor_manager.get(unit_id) + await monitor.set_keepalive(True) + return {"status": "ok", "unit_id": unit_id, "running": monitor.running, "keepalive": True} + + +@router.post("/{unit_id}/monitor/stop") +async def monitor_stop(unit_id: str): + """Drop the keep-alive; the feed stops once no browser subscribers remain.""" + from app.monitor import monitor_manager + monitor = await monitor_manager.get(unit_id) + await monitor.set_keepalive(False) + return {"status": "ok", "unit_id": unit_id, "keepalive": False} + + +@router.get("/_monitor/status") +async def monitor_status(): + """Status of every device monitor (running, subscriber count, keep-alive).""" + from app.monitor import monitor_manager + return {"status": "ok", "monitors": monitor_manager.status()} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================