feat: per-device live monitor (fan-out) + alert evaluator (POC)
The piece the live-view + alerting work was building toward.
monitor.py — one DOD poll loop per device, broadcast to many subscribers:
- browser WebSockets (fixes the single-connection "second viewer sees
nothing" contention — browsers no longer each open a device stream)
- the alert evaluator (can keep a feed running with no browser via
/monitor/start, so alerting runs continuously)
- persistence (each snapshot written like the poller)
DOD-sourced, so the broadcast carries ln1/ln2 (which DRD cannot). All polls
go through the existing per-device lock + pool, so it serializes safely with
the background poller and on-demand commands.
alerts.py — pluggable POC evaluator: fires (logs) when ALERT_METRIC exceeds
ALERT_THRESHOLD_DB with an ALERT_COOLDOWN_SECONDS cooldown. The rule
(instantaneous vs sustained vs L10) is the single swap point; dispatch is a
server log for now (email/SMS later).
Endpoints:
- WS   /api/nl43/{unit_id}/monitor        — subscribe to the shared feed
- POST /api/nl43/{unit_id}/monitor/start  — keep the feed alive without a browser
- POST /api/nl43/{unit_id}/monitor/stop   — drop the keep-alive
- GET  /api/nl43/_monitor/status          — running / subscribers / keepalive per device
WS endpoint races queue.get() against a disconnect watcher so an idle feed
still detects client drop and doesn't leak a subscription.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,71 @@
|
|||||||
|
"""
|
||||||
|
Alert evaluation (POC).
|
||||||
|
|
||||||
|
Receives each monitor snapshot and fires an alert when a configured metric
|
||||||
|
exceeds a threshold, with a cooldown so a sustained loud period doesn't spam.
|
||||||
|
|
||||||
|
The RULE here is intentionally simple and swappable. Instantaneous Lp vs a
|
||||||
|
sustained window vs L10 is still an open design decision — this evaluator is the
|
||||||
|
single plug point for it. For the POC the rule is "instantaneous metric >
|
||||||
|
threshold, rate-limited by a cooldown", and dispatch is just a server-side log.
|
||||||
|
Wire email/SMS (likely via a Terra-View webhook) into _dispatch() later.
|
||||||
|
|
||||||
|
Config via env:
|
||||||
|
- ALERT_ENABLED (default true)
|
||||||
|
- ALERT_METRIC which snapshot field to test: lp/leq/lmax/ln1/ln2 (default lp)
|
||||||
|
- ALERT_THRESHOLD_DB numeric dB threshold (default 85)
|
||||||
|
- ALERT_COOLDOWN_SECONDS min seconds between alerts per unit (default 60)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class AlertEvaluator:
    """Threshold alert rule applied to each monitor snapshot (POC).

    Configuration is read from the environment once, at construction:
    ALERT_ENABLED, ALERT_METRIC, ALERT_THRESHOLD_DB and
    ALERT_COOLDOWN_SECONDS. An alert fires when the configured snapshot
    field is strictly above the threshold and the same unit has not fired
    within the cooldown window.
    """

    # ALERT_ENABLED values (case-insensitive) that switch alerting on.
    _TRUTHY = frozenset({"1", "true", "yes", "on"})

    def __init__(self):
        self.enabled = os.getenv("ALERT_ENABLED", "true").strip().lower() in self._TRUTHY
        self.metric = os.getenv("ALERT_METRIC", "lp").lower()
        self.threshold_db = float(os.getenv("ALERT_THRESHOLD_DB", "85"))
        self.cooldown_s = float(os.getenv("ALERT_COOLDOWN_SECONDS", "60"))
        # unit_id -> event-loop time of the last alert fired for that unit.
        # A unit with no entry has never fired.
        self._last_fired: Dict[str, float] = {}
        logger.info(
            f"[ALERT] evaluator ready: enabled={self.enabled} metric={self.metric} "
            f"threshold={self.threshold_db}dB cooldown={self.cooldown_s}s"
        )

    async def evaluate(self, unit_id: str, snap) -> None:
        """Evaluate one snapshot; fire (log) if the metric exceeds threshold.

        Args:
            unit_id: Device the snapshot belongs to; cooldowns are tracked
                per unit.
            snap: Snapshot object; the attribute named by ``self.metric`` is
                read from it and converted with ``float()``.
        """
        if not self.enabled:
            return

        raw = getattr(snap, self.metric, None)
        try:
            level = float(raw)
        except (TypeError, ValueError):
            return  # missing / non-numeric (e.g. "-.-")

        # Written as "not >" rather than "<=" so NaN is rejected too:
        # float("nan") parses successfully and every comparison with it is
        # False, which previously let a NaN reading fall through and fire.
        if not level > self.threshold_db:
            return

        # Cooldown on the event loop's monotonic clock. Absence from the
        # dict means "never fired"; a 0.0 default would wrongly suppress the
        # first alert whenever the clock value is below the cooldown.
        now = asyncio.get_running_loop().time()
        last = self._last_fired.get(unit_id)
        if last is not None and now - last < self.cooldown_s:
            return
        self._last_fired[unit_id] = now

        await self._dispatch(unit_id, level)

    async def _dispatch(self, unit_id: str, level: float) -> None:
        """POC dispatch: server-side log. Swap in email/SMS here later."""
        logger.warning(
            f"[ALERT] {unit_id}: {self.metric.upper()}={level:.1f} dB exceeded "
            f"threshold {self.threshold_db:.1f} dB"
        )
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton
|
||||||
|
alert_evaluator = AlertEvaluator()
|
||||||
+176
@@ -0,0 +1,176 @@
|
|||||||
|
"""
|
||||||
|
Per-device live monitor (fan-out hub).
|
||||||
|
|
||||||
|
ONE DOD poll loop per device, broadcast to many subscribers:
|
||||||
|
- browser WebSocket clients (live view) — they no longer each open their own
|
||||||
|
device stream, so the NL43's single-connection limit stops causing the
|
||||||
|
"second viewer sees nothing" contention.
|
||||||
|
- the alert evaluator (threshold alerts), which can keep a device's feed running
|
||||||
|
even with no browser attached.
|
||||||
|
- persistence (each snapshot is written to NL43Status, like the poller does).
|
||||||
|
|
||||||
|
The device's one TCP connection is respected: every poll goes through the same
|
||||||
|
per-device lock + connection pool in services.py, so the monitor, the background
|
||||||
|
poller, and on-demand commands all serialize safely.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Dict, Optional, Set
|
||||||
|
|
||||||
|
from app.database import SessionLocal
|
||||||
|
from app.models import NL43Config, NL43Status
|
||||||
|
from app.services import NL43Client, persist_snapshot
|
||||||
|
from app.alerts import alert_evaluator
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per
|
||||||
|
# poll) already paces the effective rate to a few seconds; this is the extra idle.
|
||||||
|
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0"))
|
||||||
|
|
||||||
|
|
||||||
|
def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict:
    """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced
    so it carries ln1/ln2 (which DRD cannot).

    Args:
        snap: Snapshot object; its measurement_state, counter, lp, leq, lmax,
            lmin, lpeak, ln1, ln2 and raw_payload attributes are copied as-is.
        unit_id: Device identifier placed in the payload.
        measurement_start_time: Passed through unchanged.

    Returns:
        A dict ready to be queued for subscribers, stamped with the current
        UTC time as a naive ISO-8601 string (no offset suffix).
    """
    # Local import keeps this function self-contained; the module only
    # imports the datetime class itself.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the string keeps the naive format that
    # consumers of this payload already receive.
    stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    return {
        "unit_id": unit_id,
        "timestamp": stamp,
        "measurement_state": snap.measurement_state,
        "measurement_start_time": measurement_start_time,
        "counter": snap.counter,
        "lp": snap.lp,
        "leq": snap.leq,
        "lmax": snap.lmax,
        "lmin": snap.lmin,
        "lpeak": snap.lpeak,
        "ln1": snap.ln1,
        "ln2": snap.ln2,
        "raw_payload": snap.raw_payload,
    }
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceMonitor:
    """Owns a single DOD poll loop for one device and fans each snapshot out to
    all subscribers. Runs while it has at least one browser subscriber OR the
    server-side keep-alive (alerting) flag is set."""

    def __init__(self, unit_id: str):
        self.unit_id = unit_id
        # One bounded queue per attached subscriber.
        self._subscribers: Set[asyncio.Queue] = set()
        # When True the feed keeps running with zero subscribers.
        self._keepalive = False
        # The poll-loop task, or None if it has never been started.
        self._task: Optional[asyncio.Task] = None
        # Guards subscriber-set / keep-alive changes and task start-up.
        self._lock = asyncio.Lock()

    @property
    def running(self) -> bool:
        """True while the poll-loop task exists and has not finished."""
        return self._task is not None and not self._task.done()

    def subscriber_count(self) -> int:
        """Number of currently attached subscriber queues."""
        return len(self._subscribers)

    def _has_demand(self) -> bool:
        """True if anything still wants the feed: a subscriber or keep-alive."""
        return bool(self._subscribers) or self._keepalive

    def _ensure_task(self) -> None:
        """Start the poll loop unless one is already alive."""
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    async def subscribe(self) -> asyncio.Queue:
        """Attach a new subscriber and return its queue (at most 5 pending frames)."""
        q: asyncio.Queue = asyncio.Queue(maxsize=5)
        async with self._lock:
            self._subscribers.add(q)
            self._ensure_task()
        return q

    async def unsubscribe(self, q: asyncio.Queue) -> None:
        """Detach a subscriber queue; unknown queues are ignored."""
        async with self._lock:
            self._subscribers.discard(q)

    async def set_keepalive(self, on: bool) -> None:
        """Set the keep-alive flag; turning it on also starts the feed if needed."""
        async with self._lock:
            self._keepalive = on
            if on:
                self._ensure_task()

    async def _run(self) -> None:
        """Poll loop: poll, broadcast, evaluate alerts, sleep — while demand exists.

        Each iteration is guarded so an unexpected error (for example while
        building the payload) is logged and the feed carries on. Without the
        guard the task would die silently while subscribers and the
        keep-alive flag still expect data.
        """
        logger.info(f"[MONITOR] {self.unit_id}: feed started")
        try:
            while self._has_demand():
                try:
                    await self._run_iteration()
                except Exception as e:
                    logger.warning(f"[MONITOR] {self.unit_id}: feed iteration failed: {e}")
                await asyncio.sleep(MONITOR_POLL_INTERVAL)
        finally:
            logger.info(f"[MONITOR] {self.unit_id}: feed stopped")

    async def _run_iteration(self) -> None:
        """One pass of the feed: poll, then broadcast and evaluate if a snapshot came back."""
        snap, mst = await self._poll_once()
        if snap is None:
            return
        payload = _snapshot_payload(snap, self.unit_id, mst)
        self._broadcast(payload)
        try:
            await alert_evaluator.evaluate(self.unit_id, snap)
        except Exception as e:
            logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")

    async def _poll_once(self):
        """One DOD poll: read, persist, return (snapshot, measurement_start_iso).

        Returns (None, None) when the unit has no config, TCP is disabled for
        it, or the poll/persist step raises (the error is logged).
        """
        # NOTE(review): the session calls below are synchronous inside a
        # coroutine — presumably they block the event loop while they run; confirm.
        db = SessionLocal()
        try:
            cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first()
            if not cfg or not cfg.tcp_enabled:
                return None, None
            client = NL43Client(
                cfg.host, cfg.tcp_port,
                ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password,
                ftp_port=cfg.ftp_port or 21,
            )
            snap = await client.request_dod()
            snap.unit_id = self.unit_id
            persist_snapshot(snap, db)
            db.commit()
            # Re-read the status row after the commit to pick up its
            # measurement_start_time for the payload.
            status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first()
            mst = (status.measurement_start_time.isoformat()
                   if status and status.measurement_start_time else None)
            return snap, mst
        except Exception as e:
            logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}")
            return None, None
        finally:
            db.close()

    def _broadcast(self, payload: dict) -> None:
        """Offer the payload to every subscriber queue without blocking."""
        # Iterate over a copy so the set may change while we fan out.
        for q in list(self._subscribers):
            try:
                q.put_nowait(payload)
            except asyncio.QueueFull:
                # Slow consumer — drop this frame rather than stall the whole feed.
                pass
|
||||||
|
|
||||||
|
|
||||||
|
class MonitorManager:
    """Holds one DeviceMonitor per unit_id, creating each lazily on first lookup."""

    def __init__(self):
        self._monitors: Dict[str, DeviceMonitor] = {}
        self._lock = asyncio.Lock()

    async def get(self, unit_id: str) -> DeviceMonitor:
        """Return the monitor registered for ``unit_id``, creating it if absent."""
        async with self._lock:
            existing = self._monitors.get(unit_id)
            if existing is not None:
                return existing
            created = DeviceMonitor(unit_id)
            self._monitors[unit_id] = created
            return created

    def status(self) -> dict:
        """Summarize every registered monitor, keyed by unit_id."""
        report = {}
        for unit_id, monitor in self._monitors.items():
            report[unit_id] = {
                "running": monitor.running,
                "subscribers": monitor.subscriber_count(),
                "keepalive": monitor._keepalive,
            }
        return report
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton
|
||||||
|
monitor_manager = MonitorManager()
|
||||||
@@ -246,6 +246,80 @@ async def system_resume():
|
|||||||
return {"status": "ok", "mode": "active", "message": "Polling resumed"}
|
return {"status": "ok", "mode": "active", "message": "Polling resumed"}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
@router.websocket("/{unit_id}/monitor")
async def monitor_stream(websocket: WebSocket, unit_id: str):
    """Subscribe a browser to the device's shared DOD feed.

    Any number of clients can attach without each opening its own device
    connection (one poll loop per device, fanned out). Same JSON shape as the
    DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10).

    The send loop waits on the next queued frame and the disconnect watcher
    together, so a client drop is noticed as soon as it happens, even while
    the feed is idle, and the subscription is always released on exit.
    """
    await websocket.accept()
    from app.monitor import monitor_manager

    monitor = await monitor_manager.get(unit_id)
    queue = await monitor.subscribe()
    logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)")

    async def _watch_disconnect():
        # Completes when the client disconnects, so an idle feed (no data) still
        # detects the drop and we don't leak a subscription that keeps the device
        # feed (and its connection) alive.
        try:
            while True:
                msg = await websocket.receive()
                if msg.get("type") == "websocket.disconnect":
                    return
        except Exception:
            return

    gone = asyncio.ensure_future(_watch_disconnect())
    next_frame = None
    try:
        while True:
            # Race "next frame available" against "client went away" instead
            # of polling the queue with a timeout.
            next_frame = asyncio.ensure_future(queue.get())
            done, _ = await asyncio.wait(
                {next_frame, gone}, return_when=asyncio.FIRST_COMPLETED
            )
            if gone in done:
                logger.info(f"Monitor subscriber disconnected for {unit_id}")
                break
            await websocket.send_json(next_frame.result())
    except WebSocketDisconnect:
        logger.info(f"Monitor subscriber disconnected for {unit_id}")
    except Exception as e:
        logger.warning(f"Monitor stream error for {unit_id}: {e}")
    finally:
        gone.cancel()
        # Cancelling an already-finished getter is a no-op; a pending one must
        # be cancelled so it does not outlive this handler.
        if next_frame is not None:
            next_frame.cancel()
        await monitor.unsubscribe(queue)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/{unit_id}/monitor/start")
async def monitor_start(unit_id: str):
    """Turn on the keep-alive for a device's feed so it keeps polling (and
    alerting keeps evaluating) with no browser attached. The flag lives in
    memory only, so it is lost on restart."""
    from app.monitor import monitor_manager

    feed = await monitor_manager.get(unit_id)
    await feed.set_keepalive(True)

    response = {"status": "ok", "unit_id": unit_id}
    response["running"] = feed.running
    response["keepalive"] = True
    return response
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/{unit_id}/monitor/stop")
async def monitor_stop(unit_id: str):
    """Clear the keep-alive flag; the feed then ends once its last browser
    subscriber has detached."""
    from app.monitor import monitor_manager

    feed = await monitor_manager.get(unit_id)
    await feed.set_keepalive(False)

    response = {"status": "ok", "unit_id": unit_id}
    response["keepalive"] = False
    return response
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/_monitor/status")
async def monitor_status():
    """Report each device monitor's state: running, subscriber count, keep-alive."""
    from app.monitor import monitor_manager

    monitors = monitor_manager.status()
    return {"status": "ok", "monitors": monitors}
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
|
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
|
|||||||
Reference in New Issue
Block a user