"""
Per-device live monitor (fan-out hub).

ONE DOD poll loop per device, broadcast to many subscribers:
  - browser WebSocket clients (live view) — they no longer each open their own
    device stream, so the NL43's single-connection limit stops causing the
    "second viewer sees nothing" contention.
  - the alert evaluator (threshold alerts), which can keep a device's feed
    running even with no browser attached.
  - persistence (each snapshot is written to NL43Status, like the poller does).

The device's one TCP connection is respected: every poll goes through the same
per-device lock + connection pool in services.py, so the monitor, the background
poller, and on-demand commands all serialize safely.
"""

import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Dict, Optional, Set

from app.database import SessionLocal
from app.models import NL43Config, NL43Status
from app.services import NL43Client, persist_snapshot
from app.alerts import alert_evaluator

logger = logging.getLogger(__name__)

# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per
# poll) already paces the effective rate to a few seconds; this is the extra idle.
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0"))


def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict:
    """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced
    so it carries ln1/ln2 (which DRD cannot).

    Args:
        snap: Snapshot object; its measurement attributes are copied verbatim.
        unit_id: Device identifier placed in the payload.
        measurement_start_time: Value passed through unchanged under the
            ``measurement_start_time`` key.

    Returns:
        A dict ready to be handed to subscriber queues.
    """
    # Naive-UTC ISO string. Built through the timezone-aware API because
    # datetime.utcnow() is deprecated since Python 3.12; dropping tzinfo keeps
    # the emitted string free of a "+00:00" suffix.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    return {
        "unit_id": unit_id,
        "timestamp": timestamp,
        "measurement_state": snap.measurement_state,
        "measurement_start_time": measurement_start_time,
        "counter": snap.counter,
        "lp": snap.lp,
        "leq": snap.leq,
        "lmax": snap.lmax,
        "lmin": snap.lmin,
        "lpeak": snap.lpeak,
        "ln1": snap.ln1,
        "ln2": snap.ln2,
        "raw_payload": snap.raw_payload,
    }


class DeviceMonitor:
    """Owns a single DOD poll loop for one device and fans each snapshot out to
    all subscribers. Runs while it has at least one browser subscriber OR the
    server-side keep-alive (alerting) flag is set."""

    def __init__(self, unit_id: str):
        self.unit_id = unit_id
        # One bounded queue per subscriber; see subscribe().
        self._subscribers: Set[asyncio.Queue] = set()
        # When True the loop keeps running even with zero subscribers.
        self._keepalive = False
        self._task: Optional[asyncio.Task] = None
        # Guards subscriber/keepalive changes and task start-up.
        self._lock = asyncio.Lock()

    @property
    def running(self) -> bool:
        """True while the poll task exists and has not finished."""
        return self._task is not None and not self._task.done()

    def subscriber_count(self) -> int:
        """Return the number of currently registered subscriber queues."""
        return len(self._subscribers)

    def _has_demand(self) -> bool:
        """True if any subscriber is attached or keep-alive is set."""
        return bool(self._subscribers) or self._keepalive

    def _ensure_task(self) -> None:
        """Start the poll loop task unless one is already running."""
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    async def subscribe(self) -> asyncio.Queue:
        """Register a new subscriber and make sure the poll loop is running.

        Returns:
            A queue (maxsize 5) onto which each broadcast payload is put.
        """
        q: asyncio.Queue = asyncio.Queue(maxsize=5)
        async with self._lock:
            self._subscribers.add(q)
            self._ensure_task()
        return q

    async def unsubscribe(self, q: asyncio.Queue) -> None:
        """Remove a subscriber queue; unknown queues are ignored."""
        async with self._lock:
            self._subscribers.discard(q)

    async def set_keepalive(self, on: bool) -> None:
        """Set the keep-alive flag, starting the poll loop when turned on."""
        async with self._lock:
            self._keepalive = on
            if on:
                self._ensure_task()

    async def _run(self) -> None:
        """Poll loop: poll, broadcast, evaluate alerts, sleep — while demand exists."""
        logger.info("[MONITOR] %s: feed started", self.unit_id)
        try:
            while self._has_demand():
                snap, mst = await self._poll_once()
                if snap is not None:
                    payload = _snapshot_payload(snap, self.unit_id, mst)
                    self._broadcast(payload)
                    try:
                        await alert_evaluator.evaluate(self.unit_id, snap)
                    except Exception as e:
                        logger.warning(
                            "[MONITOR] %s: alert eval failed: %s", self.unit_id, e
                        )
                await asyncio.sleep(MONITOR_POLL_INTERVAL)
        except Exception:
            # The task is fire-and-forget and its reference is retained in
            # self._task, so log the traceback here rather than relying on
            # asyncio's "exception was never retrieved" report.
            logger.exception("[MONITOR] %s: feed crashed", self.unit_id)
            raise
        finally:
            logger.info("[MONITOR] %s: feed stopped", self.unit_id)

    async def _poll_once(self):
        """One DOD poll: read, persist, return (snapshot, measurement_start_iso).

        Returns ``(None, None)`` when the device has no config, has TCP
        disabled, or when any step raises (the error is logged as a warning).
        """
        db = SessionLocal()
        try:
            cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first()
            if not cfg or not cfg.tcp_enabled:
                return None, None
            client = NL43Client(
                cfg.host,
                cfg.tcp_port,
                ftp_username=cfg.ftp_username,
                ftp_password=cfg.ftp_password,
                ftp_port=cfg.ftp_port or 21,
            )
            snap = await client.request_dod()
            snap.unit_id = self.unit_id
            persist_snapshot(snap, db)
            db.commit()
            # Re-read the status row after the commit to pick up the
            # measurement start time for the payload.
            status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first()
            mst = (
                status.measurement_start_time.isoformat()
                if status and status.measurement_start_time
                else None
            )
            return snap, mst
        except Exception as e:
            logger.warning("[MONITOR] %s: poll failed: %s", self.unit_id, e)
            return None, None
        finally:
            db.close()

    def _broadcast(self, payload: dict) -> None:
        """Offer the payload to every subscriber queue without blocking."""
        # Iterate over a copy so the set can change while we walk it.
        for q in list(self._subscribers):
            try:
                q.put_nowait(payload)
            except asyncio.QueueFull:
                # Slow consumer — drop this frame rather than stall the whole feed.
                pass


class MonitorManager:
    """Registry of per-device monitors (one per unit_id)."""

    def __init__(self):
        self._monitors: Dict[str, DeviceMonitor] = {}
        self._lock = asyncio.Lock()

    async def get(self, unit_id: str) -> DeviceMonitor:
        """Return the monitor for ``unit_id``, creating it on first request."""
        async with self._lock:
            m = self._monitors.get(unit_id)
            if m is None:
                m = DeviceMonitor(unit_id)
                self._monitors[unit_id] = m
            return m

    def status(self) -> dict:
        """Return a per-unit summary: running flag, subscriber count, keepalive."""
        return {
            uid: {
                "running": m.running,
                "subscribers": m.subscriber_count(),
                "keepalive": m._keepalive,
            }
            for uid, m in self._monitors.items()
        }


# Module-level singleton
monitor_manager = MonitorManager()