# Patch metadata carried over verbatim from the flattened diff that embeds this
# module:
#   diff --git a/app/alerts.py b/app/alerts.py
#   new file mode 100644
#   index 0000000..1fb84e7
#   --- /dev/null
#   +++ b/app/alerts.py
#   @@ -0,0 +1,244 @@
"""
Threshold alert engine.

Each unit can have any number of AlertRules. A rule is evaluated against the
unit's live monitor snapshots via a small per-(unit, rule) state machine:

    IDLE --(metric exceeds threshold for duration_s)--> ACTIVE   (fire ONSET)
    ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR)

duration_s debounces both edges; clear_margin_db adds hysteresis so a level
hovering at the threshold doesn't flap. Onset and clear are distinct events.

The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no
real clock — so it can be unit-tested with a synthetic level series and a fake
clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence,
and dispatch. Dispatch is a server log for now (POC); the seam to POST events to
a Terra-View webhook (email/SMS) is _dispatch().

Two guarantees the evaluator adds on top of the state machine:

* Rules are isolated from each other: a rule that raises (for example because
  its schedule strings are malformed) is logged and skipped, and the remaining
  rules of the unit are still evaluated.
* While a rule is outside its schedule its pending debounce timer is dropped,
  so readings separated by a closed window are never counted as one sustained
  edge.
"""

import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# Local timezone offset for schedule windows (same env var services.py uses).
_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5"))

# How long to cache a unit's rules before re-querying the DB (rules change rarely).
_RULE_CACHE_TTL_S = 15.0

# NOTE(review): nothing in this module reads a rule's cooldown_s — confirm
# whether a minimum spacing between onsets is meant to be enforced here.


def _utc_now() -> datetime:
    """Return the current UTC time as a naive datetime (same value utcnow() gave)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _local_now() -> datetime:
    """Return the current time shifted by the fixed TIMEZONE_OFFSET (naive)."""
    return _utc_now() + timedelta(hours=_TZ_OFFSET_HOURS)


@dataclass
class RuleState:
    """In-memory runtime state for one (unit, rule)."""
    phase: str = "idle"                 # "idle" | "active"
    edge_since: Optional[float] = None  # when the current edge condition began (clock time)
    peak: float = 0.0                   # extreme seen while active (max for above, min for below)
    event_id: Optional[int] = None      # the open AlertEvent row (for the clear update)


def _exceeds(value: float, rule) -> bool:
    """True when `value` is on the alerting side of the rule's threshold.

    Any comparison other than "below" is treated as "above".
    """
    if rule.comparison == "below":
        return value < rule.threshold_db
    return value > rule.threshold_db


def _recovered(value: float, rule) -> bool:
    """True when `value` is back past the threshold by at least clear_margin_db."""
    margin = rule.clear_margin_db or 0.0
    if rule.comparison == "below":
        return value > rule.threshold_db + margin
    return value < rule.threshold_db - margin


def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]:
    """Advance the state machine by one reading.

    Pure: mutates `state`, returns 'onset' | 'clear' | None. `now` is injected so
    tests can drive a fake clock.
    """
    duration = rule.duration_s or 0

    if state.phase == "idle":
        if not _exceeds(value, rule):
            # Condition broken: the debounce window starts over.
            state.edge_since = None
            return None
        if state.edge_since is None:
            state.edge_since = now
        if now - state.edge_since >= duration:
            state.phase = "active"
            state.edge_since = None
            state.peak = value
            return "onset"
        return None

    # active: keep tracking the worst value for the eventual CLEAR report.
    if rule.comparison == "below":
        state.peak = min(state.peak, value)
    else:
        state.peak = max(state.peak, value)

    if not _recovered(value, rule):
        state.edge_since = None
        return None
    if state.edge_since is None:
        state.edge_since = now
    if now - state.edge_since >= duration:
        state.phase = "idle"
        state.edge_since = None
        return "clear"
    return None


def _in_window(now_minutes: int, start: str, end: str) -> bool:
    """Is now_minutes (minutes since local midnight) within [start, end)?

    Handles wraparound windows like 22:00–07:00. Equal start and end mean the
    whole day. Raises ValueError if either bound is not "H:M"-shaped.
    """
    def _m(s: str) -> int:
        h, m = s.split(":")
        return int(h) * 60 + int(m)

    s, e = _m(start), _m(end)
    if s == e:
        return True
    if s < e:
        return s <= now_minutes < e
    return now_minutes >= s or now_minutes < e  # wraparound


class AlertEvaluator:
    """Evaluates a unit's enabled rules against each monitor snapshot.

    Holds the per-(unit, rule) RuleState objects and a short-lived per-unit rule
    cache; persists ONSET/CLEAR as AlertEvent rows and hands them to _dispatch().
    """

    def __init__(self):
        self._states: Dict[Tuple[str, int], RuleState] = {}
        self._rule_cache: Dict[str, Tuple[float, list]] = {}  # unit_id -> (fetched_at, rules)
        logger.info("[ALERT] rule-based evaluator ready")

    async def evaluate(self, unit_id: str, snap) -> None:
        """Evaluate every enabled rule for this unit against one snapshot.

        A failure while handling one rule is logged and does not prevent the
        remaining rules from being evaluated.
        """
        rules = self._get_rules(unit_id)
        if not rules:
            return
        now = asyncio.get_running_loop().time()
        for rule in rules:
            try:
                await self._evaluate_rule(unit_id, rule, snap, now)
            except Exception as e:
                # Boundary handler: one bad rule must not silence the others.
                logger.warning(
                    f"[ALERT] rule {getattr(rule, 'id', '?')} for {unit_id} failed: {e}"
                )

    async def _evaluate_rule(self, unit_id: str, rule, snap, now: float) -> None:
        """Run one rule against one snapshot and act on the resulting edge."""
        key = (unit_id, rule.id)
        if not self._in_schedule(rule):
            # Outside the window nothing is measured, so a half-elapsed debounce
            # timer must not survive until the window reopens.
            parked = self._states.get(key)
            if parked is not None:
                parked.edge_since = None
            return

        raw = getattr(snap, rule.metric, None)
        try:
            value = float(raw)
        except (TypeError, ValueError):
            return  # missing / non-numeric ("-.-")

        state = self._states.setdefault(key, RuleState())
        action = _evaluate_step(state, value, now, rule)
        if action == "onset":
            await self._on_onset(unit_id, rule, value, state)
        elif action == "clear":
            await self._on_clear(unit_id, rule, value, state)

    # -- rule loading (cached) ----------------------------------------------

    def _get_rules(self, unit_id: str) -> list:
        """Return the unit's enabled rules, re-querying at most every _RULE_CACHE_TTL_S."""
        loop_now = asyncio.get_running_loop().time()
        cached = self._rule_cache.get(unit_id)
        if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S:
            return cached[1]
        rules = self._load_rules(unit_id)
        self._rule_cache[unit_id] = (loop_now, rules)
        return rules

    def _load_rules(self, unit_id: str) -> list:
        """Query the enabled AlertRule rows for a unit; returns [] if the query fails."""
        from app.database import SessionLocal
        from app.models import AlertRule
        db = SessionLocal()
        try:
            return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all()
        except Exception as e:
            logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}")
            return []
        finally:
            db.close()

    def invalidate(self, unit_id: Optional[str] = None) -> None:
        """Drop cached rules so a change is picked up immediately."""
        if unit_id is None:
            self._rule_cache.clear()
        else:
            self._rule_cache.pop(unit_id, None)

    # -- scheduling ----------------------------------------------------------

    def _in_schedule(self, rule) -> bool:
        """True when the rule applies right now (day-of-week and time-of-day)."""
        local = _local_now()
        if not self._day_ok(rule, local):
            return False
        if not rule.schedule_start or not rule.schedule_end:
            return True  # no time window configured: the day check decides
        return _in_window(local.hour * 60 + local.minute, rule.schedule_start, rule.schedule_end)

    @staticmethod
    def _day_ok(rule, local: Optional[datetime] = None) -> bool:
        """True when `local` (default: now) falls on one of the rule's schedule_days.

        schedule_days is a CSV of weekday numbers (Mon=0); empty means every day.
        Raises ValueError if an entry is not an integer.
        """
        if not rule.schedule_days:
            return True
        if local is None:
            local = _local_now()
        allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""}
        return local.weekday() in allowed  # Mon=0

    # -- event persistence + dispatch ---------------------------------------

    async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None:
        """Record a new 'active' AlertEvent (best effort) and dispatch ONSET."""
        from app.database import SessionLocal
        from app.models import AlertEvent
        db = SessionLocal()
        try:
            evt = AlertEvent(
                rule_id=rule.id, unit_id=unit_id, rule_name=rule.name,
                metric=rule.metric, threshold_db=rule.threshold_db,
                onset_value=value, peak_value=value, status="active",
            )
            db.add(evt)
            db.commit()
            db.refresh(evt)
            state.event_id = evt.id
        except Exception as e:
            logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}")
        finally:
            db.close()

        op = "<" if rule.comparison == "below" else ">"
        sustained = f" for {rule.duration_s}s" if rule.duration_s else ""
        await self._dispatch(
            "ONSET", unit_id, rule,
            f"{rule.metric.upper()}={value:.1f} dB {op} {rule.threshold_db:.1f} dB{sustained}",
        )

    async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None:
        """Close the open AlertEvent (best effort) and dispatch CLEAR."""
        peak = state.peak
        from app.database import SessionLocal
        from app.models import AlertEvent
        db = SessionLocal()
        try:
            if state.event_id is not None:
                evt = db.query(AlertEvent).filter_by(id=state.event_id).first()
                if evt:
                    evt.clear_at = _utc_now()
                    evt.peak_value = peak
                    evt.status = "cleared"
                    db.commit()
        except Exception as e:
            logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}")
        finally:
            db.close()
        state.event_id = None
        await self._dispatch(
            "CLEAR", unit_id, rule,
            f"recovered to {value:.1f} dB (peak {peak:.1f} dB)",
        )

    async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None:
        """POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here."""
        logger.warning(f"[ALERT:{kind}] {unit_id} '{rule.name}': {detail}")


# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot)
alert_evaluator = AlertEvaluator()

# Patch metadata carried over verbatim (the diff for the next file begins on the
# same flattened line that ends this module):
#   diff --git a/app/background_poller.py b/app/background_poller.py
#   index 64bcc60..0148671 100644
#   --- a/app/background_poller.py
#   +++ b/app/background_poller.py
#   @@ -8,6 +8,7 @@ for fast API access without querying devices on every request.
#    import asyncio
#    import logging
#   +import os
#    from datetime import datetime, timedelta
#    from typing import Optional
#   @@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs
#    logger = logging.getLogger(__name__)
#   +# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in
#   +# standby (running but not polling and not holding device connections) — e.g. a
#   +# dev box that must not latch onto a device that a prod instance owns.
+POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true" + class BackgroundPoller: """ @@ -39,6 +45,7 @@ class BackgroundPoller: self._logger = logger self._last_cleanup = None # Track last log cleanup time self._last_pool_log = None # Track last connection pool heartbeat log + self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle) async def start(self): """Start the background polling task.""" @@ -71,15 +78,48 @@ class BackgroundPoller: self._logger.info("Background poller stopped") + def is_active(self) -> bool: + """Whether background polling is currently active (vs standby).""" + return self._active + + async def set_active(self, active: bool): + """Globally enable/disable polling at runtime. + + When deactivated, the loop stays alive but polls nothing and releases all + device connections, so this SLMM instance stops occupying the devices' + single connection slots (e.g. so a prod instance can take over). Runtime + state only — on restart the instance returns to SLMM_POLLING_ENABLED. 
+ """ + self._active = active + if active: + self._logger.info("[SYSTEM] Background polling ACTIVATED") + else: + self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections") + await self._release_all_connections() + + async def _release_all_connections(self): + """Gracefully close every pooled device connection (no-op if none).""" + from app.services import _connection_pool + for device_key in list(_connection_pool.get_stats().get("connections", {})): + await _connection_pool.discard(device_key) + async def _poll_loop(self): """Main polling loop that runs continuously.""" self._logger.info("Background polling loop started") while self._running: - try: - await self._poll_all_devices() - except Exception as e: - self._logger.error(f"Error in poll loop: {e}", exc_info=True) + if self._active: + try: + await self._poll_all_devices() + except Exception as e: + self._logger.error(f"Error in poll loop: {e}", exc_info=True) + else: + # Standby: poll nothing, and keep holding no device connection slots + # so another SLMM instance (e.g. prod) can talk to the devices. + try: + await self._release_all_connections() + except Exception as e: + self._logger.warning(f"Standby connection release failed: {e}") # Run log cleanup once per hour try: @@ -138,10 +178,19 @@ class BackgroundPoller: now = datetime.utcnow() polled_count = 0 + from app.monitor import monitor_manager + for cfg in configs: if not self._running: break + # Skip units with an active live monitor: it polls them at ~1Hz and + # keeps the status cache fresh, so a redundant background poll would just + # add load/lock-contention on the device's single connection. 
+ if monitor_manager.is_active(cfg.unit_id): + self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active") + continue + # Get current status status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first() diff --git a/app/models.py b/app/models.py index 4c86514..67e08ed 100644 --- a/app/models.py +++ b/app/models.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func +from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func from app.database import Base @@ -41,6 +41,8 @@ class NL43Status(Base): lmax = Column(String, nullable=True) # Maximum level lmin = Column(String, nullable=True) # Minimum level lpeak = Column(String, nullable=True) # Peak level + ln1 = Column(String, nullable=True) # Percentile slot LN1 (configurable; device default L5, contract L1) + ln2 = Column(String, nullable=True) # Percentile slot LN2 (configurable; device default L10) battery_level = Column(String, nullable=True) power_source = Column(String, nullable=True) sd_remaining_mb = Column(String, nullable=True) @@ -72,3 +74,53 @@ class DeviceLog(Base): level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC message = Column(Text, nullable=False) + + +class AlertRule(Base): + """A threshold-alert rule evaluated against a unit's live monitor feed. + + Source-agnostic: today it runs over the DOD monitor; the same rule transfers + unchanged if a unit's feed is later sourced from FTP intervals. 
+ """ + + __tablename__ = "alert_rules" + + id = Column(Integer, primary_key=True, autoincrement=True) + unit_id = Column(String, index=True, nullable=False) + name = Column(String, nullable=False, default="Alert") + metric = Column(String, nullable=False, default="lp") # lp/leq/lmax/lmin/lpeak/ln1/ln2 + comparison = Column(String, nullable=False, default="above") # above | below + threshold_db = Column(Float, nullable=False) + duration_s = Column(Integer, nullable=False, default=0) # sustained seconds (0 = instant) + clear_margin_db = Column(Float, nullable=False, default=2.0) # hysteresis band + cooldown_s = Column(Integer, nullable=False, default=300) # min seconds between onsets + # Optional time-of-day scoping (local time). schedule_start/end as "HH:MM"; + # null = always active. schedule_days = CSV of 0-6 (Mon=0); null = every day. + schedule_start = Column(String, nullable=True) + schedule_end = Column(String, nullable=True) + schedule_days = Column(String, nullable=True) + channels = Column(String, nullable=False, default="log") # CSV: log,email,sms + recipients = Column(Text, nullable=True) # CSV of emails/phones + enabled = Column(Boolean, default=True) + created_at = Column(DateTime, default=func.now()) + + +class AlertEvent(Base): + """A fired alert (onset → clear), for history / inbox / acknowledgement.""" + + __tablename__ = "alert_events" + + id = Column(Integer, primary_key=True, autoincrement=True) + rule_id = Column(Integer, index=True, nullable=False) + unit_id = Column(String, index=True, nullable=False) + rule_name = Column(String, nullable=True) + metric = Column(String, nullable=False) + threshold_db = Column(Float, nullable=False) + onset_at = Column(DateTime, default=func.now(), index=True) + onset_value = Column(Float, nullable=True) + peak_value = Column(Float, nullable=True) + clear_at = Column(DateTime, nullable=True) + status = Column(String, default="active") # active | cleared + acknowledged_at = Column(DateTime, nullable=True) + 
acknowledged_by = Column(String, nullable=True) + notes = Column(Text, nullable=True) diff --git a/app/monitor.py b/app/monitor.py new file mode 100644 index 0000000..47c21be --- /dev/null +++ b/app/monitor.py @@ -0,0 +1,229 @@ +""" +Per-device live monitor (fan-out hub). + +ONE DOD poll loop per device, broadcast to many subscribers: +- browser WebSocket clients (live view) — they no longer each open their own + device stream, so the NL43's single-connection limit stops causing the + "second viewer sees nothing" contention. +- the alert evaluator (threshold alerts), which can keep a device's feed running + even with no browser attached. +- persistence (each snapshot is written to NL43Status, like the poller does). + +The device's one TCP connection is respected: every poll goes through the same +per-device lock + connection pool in services.py, so the monitor, the background +poller, and on-demand commands all serialize safely. +""" + +import asyncio +import logging +import os +from datetime import datetime +from typing import Dict, Optional, Set + +from app.database import SessionLocal +from app.models import NL43Config, NL43Status +from app.services import NL43Client, persist_snapshot +from app.alerts import alert_evaluator + +logger = logging.getLogger(__name__) + +# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per +# poll) already paces the effective rate to a few seconds; this is the extra idle. +MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) + +# If nothing has been broadcast in this many seconds (e.g. device offline and +# silent), send a keepalive frame so reverse proxies don't drop the idle WS. 
MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25"))


def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict:
    """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced
    so it carries ln1/ln2 (which DRD cannot).

    `snap` must expose measurement_state, counter, lp, leq, lmax, lmin, lpeak,
    ln1, ln2 and raw_payload; the timestamp is the current UTC time (ISO 8601).
    """
    return {
        "unit_id": unit_id,
        "timestamp": datetime.utcnow().isoformat(),
        "measurement_state": snap.measurement_state,
        "measurement_start_time": measurement_start_time,
        "counter": snap.counter,
        "lp": snap.lp,
        "leq": snap.leq,
        "lmax": snap.lmax,
        "lmin": snap.lmin,
        "lpeak": snap.lpeak,
        "ln1": snap.ln1,
        "ln2": snap.ln2,
        "raw_payload": snap.raw_payload,
    }


class DeviceMonitor:
    """Owns a single DOD poll loop for one device and fans each snapshot out to
    all subscribers. Runs while it has at least one browser subscriber OR the
    server-side keep-alive (alerting) flag is set."""

    def __init__(self, unit_id: str):
        self.unit_id = unit_id
        self._subscribers: Set[asyncio.Queue] = set()
        self._keepalive = False
        self._task: Optional[asyncio.Task] = None
        self._lock = asyncio.Lock()
        self._last_payload: Optional[dict] = None  # replayed to new subscribers
        self._consec_fail = 0
        self._reachable = True  # last broadcast reachability (for transition frames)

    @property
    def running(self) -> bool:
        """True while the poll-loop task exists and has not finished."""
        return self._task is not None and not self._task.done()

    def subscriber_count(self) -> int:
        """Number of currently attached subscriber queues."""
        return len(self._subscribers)

    def _has_demand(self) -> bool:
        """The feed is wanted while anyone subscribes or keep-alive is set."""
        return bool(self._subscribers) or self._keepalive

    def _ensure_task(self) -> None:
        """Start the poll loop unless one is already running."""
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    async def subscribe(self) -> asyncio.Queue:
        """Attach a new bounded subscriber queue and make sure the feed runs."""
        q: asyncio.Queue = asyncio.Queue(maxsize=5)
        async with self._lock:
            self._subscribers.add(q)
            # Replay the last frame so a client connecting mid-stream sees data
            # (or the current 'unreachable' state) immediately, not after a poll.
            if self._last_payload is not None:
                try:
                    q.put_nowait(self._last_payload)
                except asyncio.QueueFull:
                    pass
            self._ensure_task()
        return q

    async def unsubscribe(self, q: asyncio.Queue) -> None:
        """Detach a subscriber queue; the loop stops by itself once demand is gone."""
        async with self._lock:
            self._subscribers.discard(q)

    async def set_keepalive(self, on: bool) -> None:
        """Set the server-side keep-alive flag; turning it on starts the feed."""
        async with self._lock:
            self._keepalive = on
            if on:
                self._ensure_task()

    async def _run(self) -> None:
        """Poll loop: runs until there is neither a subscriber nor keep-alive.

        An unexpected error inside one iteration is logged and the loop carries
        on, so a single bad poll cannot end a feed that alerting still depends
        on. Task cancellation still propagates (it is not an Exception).
        """
        logger.info(f"[MONITOR] {self.unit_id}: feed started")
        loop = asyncio.get_running_loop()
        last_send = loop.time()
        try:
            while self._has_demand():
                try:
                    if await self._poll_and_publish():
                        last_send = loop.time()
                except Exception as e:
                    logger.error(
                        f"[MONITOR] {self.unit_id}: feed iteration failed: {e}",
                        exc_info=True,
                    )

                # Heartbeat: during quiet/offline stretches, send a keepalive so an
                # idle WS isn't dropped by a reverse proxy. Not cached (new subscribers
                # should still get the last real frame, not a heartbeat).
                if loop.time() - last_send >= MONITOR_HEARTBEAT_S:
                    self._broadcast({
                        "unit_id": self.unit_id,
                        "timestamp": datetime.utcnow().isoformat(),
                        "feed_status": "ok" if self._reachable else "unreachable",
                        "heartbeat": True,
                    }, cache=False)
                    last_send = loop.time()

                await asyncio.sleep(MONITOR_POLL_INTERVAL)
        finally:
            logger.info(f"[MONITOR] {self.unit_id}: feed stopped")

    async def _poll_and_publish(self) -> bool:
        """Do one poll and fan the outcome out; return True if a frame was broadcast."""
        snap, mst = await self._poll_once()
        if snap is None:
            # Tell clients the device went offline — once, on transition, after a
            # few failures so a momentary blip doesn't flap the UI.
            self._consec_fail += 1
            if self._reachable and self._consec_fail >= 3:
                self._reachable = False
                self._broadcast({
                    "unit_id": self.unit_id,
                    "timestamp": datetime.utcnow().isoformat(),
                    "feed_status": "unreachable",
                })
                return True
            return False

        self._consec_fail = 0
        self._reachable = True
        payload = _snapshot_payload(snap, self.unit_id, mst)
        payload["feed_status"] = "ok"
        self._broadcast(payload)
        try:
            await alert_evaluator.evaluate(self.unit_id, snap)
        except Exception as e:
            logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")
        return True

    async def _poll_once(self):
        """One DOD poll: read, persist, return (snapshot, measurement_start_iso).

        Returns (None, None) when the unit has no config, TCP is disabled, or
        the poll/persist step raised.
        """
        db = SessionLocal()
        try:
            cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first()
            if not cfg or not cfg.tcp_enabled:
                return None, None
            client = NL43Client(
                cfg.host, cfg.tcp_port,
                ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password,
                ftp_port=cfg.ftp_port or 21,
            )
            snap = await client.request_dod()
            snap.unit_id = self.unit_id
            persist_snapshot(snap, db)
            db.commit()
            status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first()
            mst = (status.measurement_start_time.isoformat()
                   if status and status.measurement_start_time else None)
            return snap, mst
        except Exception as e:
            logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}")
            return None, None
        finally:
            db.close()

    def _broadcast(self, payload: dict, cache: bool = True) -> None:
        """Queue `payload` for every subscriber without ever blocking the feed.

        With cache=True the payload also becomes the frame replayed to new
        subscribers. If a subscriber's queue is full, its oldest queued frame is
        evicted to make room, so the newest frame — possibly the one-shot
        'unreachable' transition — is the one that survives.
        """
        if cache:
            self._last_payload = payload  # replayed to new subscribers
        for q in list(self._subscribers):
            try:
                q.put_nowait(payload)
            except asyncio.QueueFull:
                # Slow consumer — make room rather than stall the whole feed.
                try:
                    q.get_nowait()
                    q.put_nowait(payload)
                except (asyncio.QueueEmpty, asyncio.QueueFull):
                    pass


class MonitorManager:
    """Registry of per-device monitors (one per unit_id)."""

    def __init__(self):
        self._monitors: Dict[str, DeviceMonitor] = {}
        self._lock = asyncio.Lock()

    async def get(self, unit_id: str) -> DeviceMonitor:
        """Return the unit's monitor, creating (but not starting) it on first use."""
        async with self._lock:
            m = self._monitors.get(unit_id)
            if m is None:
                m = DeviceMonitor(unit_id)
                self._monitors[unit_id] = m
            return m

    def is_active(self, unit_id: str) -> bool:
        """True if this unit has a running monitor feed (so the background poller
        can skip it — the monitor already polls it more often)."""
        m = self._monitors.get(unit_id)
        return m is not None and m.running

    def status(self) -> dict:
        """Per-unit summary: running flag, subscriber count, keep-alive flag."""
        return {
            uid: {
                "running": m.running,
                "subscribers": m.subscriber_count(),
                "keepalive": m._keepalive,
            }
            for uid, m in self._monitors.items()
        }


# Module-level singleton
monitor_manager = MonitorManager()

# Patch metadata carried over verbatim (the diff for the next file, and the head
# of its first added function, begin on the same flattened line that ends this
# module):
#   diff --git a/app/routers.py b/app/routers.py
#   index a21c928..f317e15 100644
#   --- a/app/routers.py
#   +++ b/app/routers.py
#   @@ -11,7 +11,7 @@ import os
#    import asyncio
#    from app.database import get_db
#   -from app.models import NL43Config, NL43Status
#   +from app.models import NL43Config, NL43Status, AlertRule, AlertEvent
#    from app.services import NL43Client, persist_snapshot
#    logger = logging.getLogger(__name__)
#   @@ -121,6 +121,310 @@ async def flush_connection_pool():
#    return {"status": "ok", "message": "All cached connections closed"}
#   +@router.post("/{unit_id}/disconnect")
#   +async def disconnect_device(unit_id: str, db: Session = Depends(get_db)):
#   +    """Cleanly close SLMM's persistent TCP connection to a single device.
#   +
#   +    Gracefully closes (TCP FIN + wait_closed) the pooled connection for this
#   +    device and removes it from the pool, freeing the NL43's single connection
#   +    slot. Idempotent — a no-op if no connection is currently cached.
#   +
#   +    Note: this releases the *idle* pooled connection.
@router.post("/{unit_id}/disconnect")
async def disconnect_device(unit_id: str, db: Session = Depends(get_db)):
    """Cleanly close SLMM's persistent TCP connection to a single device.

    Gracefully closes (TCP FIN + wait_closed) the pooled connection for this
    device and removes it from the pool, freeing the NL43's single connection
    slot. Idempotent — a no-op if no connection is currently cached.

    Note: this releases the *idle* pooled connection. It does not interrupt an
    in-progress DRD stream or an in-flight command (those have the socket
    checked out of the pool) — close the stream WebSocket to end a live stream.
    """
    cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
    if not cfg:
        raise HTTPException(status_code=404, detail="NL43 config not found")

    from app.services import _connection_pool

    device_key = f"{cfg.host}:{cfg.tcp_port}"
    had_conn = device_key in _connection_pool.get_stats().get("connections", {})

    await _connection_pool.discard(device_key)

    return {
        "status": "ok",
        "unit_id": unit_id,
        "device_key": device_key,
        "disconnected": had_conn,
        "message": "Connection closed" if had_conn else "No cached connection to close",
    }


@router.post("/{unit_id}/deactivate")
async def deactivate_device(unit_id: str, db: Session = Depends(get_db)):
    """Make a single unit dormant: stop background polling for it AND drop its
    connection, freeing the device's connection slot. poll_enabled=False is
    persisted, so the unit stays dormant across restarts until /activate.
    """
    cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
    if not cfg:
        raise HTTPException(status_code=404, detail="NL43 config not found")

    cfg.poll_enabled = False
    db.commit()

    from app.services import _connection_pool, _get_device_lock

    device_key = f"{cfg.host}:{cfg.tcp_port}"

    # Wait briefly for any in-flight poll/command to finish (so its connection is
    # back in the pool), then drop it. If a long-lived stream holds the lock we
    # don't block forever — discard the pooled connection regardless.
    lock = await _get_device_lock(device_key)
    acquired = False
    try:
        await asyncio.wait_for(lock.acquire(), timeout=10.0)
        acquired = True
    except asyncio.TimeoutError:
        acquired = False
    try:
        await _connection_pool.discard(device_key)
    finally:
        if acquired:
            lock.release()

    return {
        "status": "ok",
        "unit_id": unit_id,
        "poll_enabled": False,
        "message": "Polling disabled and connection closed for this unit",
    }


@router.post("/{unit_id}/activate")
async def activate_device(unit_id: str, db: Session = Depends(get_db)):
    """Resume background polling for a unit previously deactivated."""
    cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
    if not cfg:
        raise HTTPException(status_code=404, detail="NL43 config not found")

    cfg.poll_enabled = True
    db.commit()

    return {
        "status": "ok",
        "unit_id": unit_id,
        "poll_enabled": True,
        "message": "Polling enabled for this unit",
    }


@router.get("/_system/status")
async def system_status():
    """Report whether this SLMM instance is actively polling or in standby."""
    from app.background_poller import poller
    from app.services import _connection_pool
    return {
        "status": "ok",
        "mode": "active" if poller.is_active() else "standby",
        "polling_active": poller.is_active(),
        "active_connections": _connection_pool.get_stats().get("active_connections", 0),
    }


@router.post("/_system/standby")
async def system_standby():
    """Put this SLMM instance into standby: stop polling ALL devices and release
    every connection, so it stops occupying device slots (e.g. so a prod instance
    can take over). Runtime-only — on restart the instance returns to its
    SLMM_POLLING_ENABLED default.
    """
    from app.background_poller import poller
    await poller.set_active(False)
    return {"status": "ok", "mode": "standby",
            "message": "Polling stopped and all device connections released"}


@router.post("/_system/resume")
async def system_resume():
    """Resume polling after standby (global)."""
    from app.background_poller import poller
    await poller.set_active(True)
    return {"status": "ok", "mode": "active", "message": "Polling resumed"}


# ============================================================================
# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients
# ============================================================================

@router.websocket("/{unit_id}/monitor")
async def monitor_stream(websocket: WebSocket, unit_id: str):
    """Subscribe a browser to the device's shared 1 Hz DOD feed.

    Any number of clients can attach without each opening its own device
    connection (one poll loop per device, fanned out). Same JSON shape as the
    DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10).
    """
    await websocket.accept()
    from app.monitor import monitor_manager

    monitor = await monitor_manager.get(unit_id)
    queue = await monitor.subscribe()
    logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)")

    async def _watch_disconnect():
        # Completes when the client disconnects, so an idle feed (no data) still
        # detects the drop and we don't leak a subscription that keeps the device
        # feed (and its connection) alive.
        try:
            while True:
                msg = await websocket.receive()
                if msg.get("type") == "websocket.disconnect":
                    return
        except Exception:
            return

    gone = asyncio.ensure_future(_watch_disconnect())
    try:
        while not gone.done():
            try:
                payload = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue  # re-check gone.done()
            await websocket.send_json(payload)
    except WebSocketDisconnect:
        logger.info(f"Monitor subscriber disconnected for {unit_id}")
    except Exception as e:
        logger.warning(f"Monitor stream error for {unit_id}: {e}")
    finally:
        gone.cancel()
        await monitor.unsubscribe(queue)


@router.post("/{unit_id}/monitor/start")
async def monitor_start(unit_id: str):
    """Keep the device's feed running even with no browser attached, so alerting
    evaluates continuously. Runtime-only (resets on restart)."""
    from app.monitor import monitor_manager
    monitor = await monitor_manager.get(unit_id)
    await monitor.set_keepalive(True)
    return {"status": "ok", "unit_id": unit_id, "running": monitor.running, "keepalive": True}


@router.post("/{unit_id}/monitor/stop")
async def monitor_stop(unit_id: str):
    """Drop the keep-alive; the feed stops once no browser subscribers remain."""
    from app.monitor import monitor_manager
    monitor = await monitor_manager.get(unit_id)
    await monitor.set_keepalive(False)
    return {"status": "ok", "unit_id": unit_id, "keepalive": False}


@router.get("/_monitor/status")
async def monitor_status():
    """Status of every device monitor (running, subscriber count, keep-alive)."""
    from app.monitor import monitor_manager
    return {"status": "ok", "monitors": monitor_manager.status()}


# ============================================================================
# ALERTS — threshold rules + fired events
# ============================================================================

# Values the alert evaluator understands. Anything else would be stored but then
# either never fire (unknown metric) or silently behave as "above" (unknown
# comparison), so it is rejected at the API boundary instead.
_ALERT_METRICS = ("lp", "leq", "lmax", "lmin", "lpeak", "ln1", "ln2")
_ALERT_COMPARISONS = ("above", "below")

# Upper bound for one page of alert events.
_MAX_EVENT_LIMIT = 500


class AlertRulePayload(BaseModel):
    """Request body for creating or replacing an alert rule."""
    name: str = "Alert"
    metric: str = "lp"                     # lp/leq/lmax/lmin/lpeak/ln1/ln2
    comparison: str = "above"              # above | below
    threshold_db: float
    duration_s: int = 0                    # sustained seconds before firing (0 = instant)
    clear_margin_db: float = 2.0           # hysteresis band
    cooldown_s: int = 300
    schedule_start: str | None = None      # "HH:MM" local; null = always
    schedule_end: str | None = None
    schedule_days: str | None = None       # CSV of 0-6 (Mon=0); null = every day
    channels: str = "log"
    recipients: str | None = None
    enabled: bool = True


def _is_hhmm(text: str) -> bool:
    """True for an "H:M"/"HH:MM" clock time with hour 0-23 and minute 0-59."""
    hours, sep, minutes = text.partition(":")
    if sep != ":" or not hours.isdecimal() or not minutes.isdecimal():
        return False
    return int(hours) < 24 and int(minutes) < 60


def _validate_rule_payload(payload: AlertRulePayload) -> None:
    """Reject rule payloads the evaluator could not run correctly.

    Raises:
        HTTPException: 422 with a message naming the offending field.
    """
    problems = []
    if payload.metric not in _ALERT_METRICS:
        problems.append(f"metric must be one of {', '.join(_ALERT_METRICS)}")
    if payload.comparison not in _ALERT_COMPARISONS:
        problems.append("comparison must be 'above' or 'below'")
    if payload.duration_s < 0:
        problems.append("duration_s must be >= 0")
    if payload.cooldown_s < 0:
        problems.append("cooldown_s must be >= 0")
    if payload.clear_margin_db < 0:
        problems.append("clear_margin_db must be >= 0")

    start, end = payload.schedule_start, payload.schedule_end
    if bool(start) != bool(end):
        problems.append("schedule_start and schedule_end must be given together")
    for label, clock in (("schedule_start", start), ("schedule_end", end)):
        if clock and not _is_hhmm(clock):
            problems.append(f"{label} must be HH:MM")

    if payload.schedule_days:
        days = [d.strip() for d in payload.schedule_days.split(",") if d.strip() != ""]
        if any(d not in ("0", "1", "2", "3", "4", "5", "6") for d in days):
            problems.append("schedule_days must be a CSV of 0-6 (Mon=0)")

    if problems:
        raise HTTPException(status_code=422, detail="; ".join(problems))


def _rule_dict(r: AlertRule) -> dict:
    """Serialize an AlertRule row for API responses."""
    return {
        "id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric,
        "comparison": r.comparison, "threshold_db": r.threshold_db,
        "duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db,
        "cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start,
        "schedule_end": r.schedule_end, "schedule_days": r.schedule_days,
        "channels": r.channels, "recipients": r.recipients, "enabled": r.enabled,
    }


def _event_dict(e: AlertEvent) -> dict:
    """Serialize an AlertEvent row for API responses (datetimes as ISO strings)."""
    return {
        "id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id,
        "rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db,
        "onset_at": e.onset_at.isoformat() if e.onset_at else None,
        "onset_value": e.onset_value, "peak_value": e.peak_value,
        "clear_at": e.clear_at.isoformat() if e.clear_at else None,
        "status": e.status,
        "acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None,
        "acknowledged_by": e.acknowledged_by,
    }


@router.post("/{unit_id}/alerts/rules")
def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)):
    """Create a rule for the unit and drop the evaluator's cached rules for it."""
    _validate_rule_payload(payload)
    rule = AlertRule(unit_id=unit_id, **payload.model_dump())
    db.add(rule)
    db.commit()
    db.refresh(rule)
    from app.alerts import alert_evaluator
    alert_evaluator.invalidate(unit_id)
    return {"status": "ok", "rule": _rule_dict(rule)}


@router.get("/{unit_id}/alerts/rules")
def list_alert_rules(unit_id: str, db: Session = Depends(get_db)):
    """List every rule of the unit (enabled or not)."""
    rules = db.query(AlertRule).filter_by(unit_id=unit_id).all()
    return {"status": "ok", "rules": [_rule_dict(r) for r in rules]}


@router.put("/{unit_id}/alerts/rules/{rule_id}")
def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)):
    """Replace all fields of an existing rule; 404 if it is not this unit's rule."""
    rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
    if not rule:
        raise HTTPException(status_code=404, detail="Alert rule not found")
    _validate_rule_payload(payload)
    for field, value in payload.model_dump().items():
        setattr(rule, field, value)
    db.commit()
    db.refresh(rule)
    from app.alerts import alert_evaluator
    alert_evaluator.invalidate(unit_id)
    return {"status": "ok", "rule": _rule_dict(rule)}


@router.delete("/{unit_id}/alerts/rules/{rule_id}")
def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)):
    """Delete a rule; 404 if it is not this unit's rule."""
    rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
    if not rule:
        raise HTTPException(status_code=404, detail="Alert rule not found")
    db.delete(rule)
    db.commit()
    from app.alerts import alert_evaluator
    alert_evaluator.invalidate(unit_id)
    return {"status": "ok", "deleted": rule_id}


@router.get("/{unit_id}/alerts/events")
def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)):
    """Return the unit's most recent alert events, newest onset first.

    `limit` is clamped to 1.._MAX_EVENT_LIMIT so a negative or huge value can
    neither error out in the database nor dump the whole table.
    """
    page = max(1, min(limit, _MAX_EVENT_LIMIT))
    events = (db.query(AlertEvent).filter_by(unit_id=unit_id)
              .order_by(AlertEvent.onset_at.desc()).limit(page).all())
    return {"status": "ok", "events": [_event_dict(e) for e in events]}


@router.post("/{unit_id}/alerts/events/{event_id}/ack")
def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)):
    """Stamp an event as acknowledged (now, UTC) by the optional `by` name."""
    evt = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first()
    if not evt:
        raise HTTPException(status_code=404, detail="Alert event not found")
    evt.acknowledged_at = datetime.utcnow()
    evt.acknowledged_by = by
    db.commit()
    return {"status": "ok", "acknowledged": event_id}


#
============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ @@ -450,6 +754,8 @@ def get_status(unit_id: str, db: Session = Depends(get_db)): "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, @@ -472,6 +778,8 @@ class StatusPayload(BaseModel): lmax: str | None = None lmin: str | None = None lpeak: str | None = None + ln1: str | None = None + ln2: str | None = None battery_level: str | None = None power_source: str | None = None sd_remaining_mb: str | None = None @@ -504,6 +812,8 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, @@ -1205,6 +1515,8 @@ async def stream_live(websocket: WebSocket, unit_id: str): "lmax": snap.lmax, # Maximum level "lmin": snap.lmin, # Minimum level "lpeak": snap.lpeak, # Peak level + "ln1": snap.ln1, # LN1 percentile (L1/L10 contract); null on DRD stream + "ln2": snap.ln2, # LN2 percentile; null on DRD stream "raw_payload": snap.raw_payload, }) except Exception as e: @@ -1876,6 +2188,8 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)): "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, diff --git a/app/services.py b/app/services.py index 9375c5c..b5c68ea 100644 --- a/app/services.py +++ b/app/services.py @@ -46,6 +46,8 @@ class NL43Snapshot: lmax: 
Optional[str] = None # Maximum level lmin: Optional[str] = None # Minimum level lpeak: Optional[str] = None # Peak level + ln1: Optional[str] = None # Percentile slot LN1 (configurable; device default L5, contract L1) + ln2: Optional[str] = None # Percentile slot LN2 (configurable; device default L10) battery_level: Optional[str] = None power_source: Optional[str] = None sd_remaining_mb: Optional[str] = None @@ -69,10 +71,27 @@ def persist_snapshot(s: NL43Snapshot, db: Session): logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'") - # Device returns "Start" when measuring, "Stop" when stopped - # Normalize to previous behavior for backward compatibility - is_measuring = new_state == "Start" - was_measuring = previous_state == "Start" + # The device reports "Start" while measuring; the DOD path uses that string, + # but the DRD stream path tags snapshots "Measure" (and the DOD fallback also + # uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and + # closing the live stream flips state "Start"->"Measure"->"Start", which the + # old equality check misread as stop-then-start and reset measurement_start_time. + # + # Also: only act on RECOGNIZED states. A buffer desync on the shared connection + # (e.g. right after a DRD/DOD test) can make a Measure? read return a stray, + # garbled value; treating that as "not measuring" produced constant phantom + # "STOPPED -> STARTED" log pairs and reset the timer. Ignore unknown reads. 
+ MEASURING_STATES = {"Start", "Measure"} + STOPPED_STATES = {"Stop"} + was_measuring = previous_state in MEASURING_STATES + + if new_state in MEASURING_STATES: + is_measuring = True + elif new_state in STOPPED_STATES: + is_measuring = False + else: + logger.warning(f"Ignoring unrecognized measurement state for {s.unit_id}: {new_state!r}") + is_measuring = was_measuring # garbled/unknown read — no transition if not was_measuring and is_measuring: # Measurement just started - record the start time @@ -95,13 +114,18 @@ def persist_snapshot(s: NL43Snapshot, db: Session): except Exception: pass - row.measurement_state = new_state + # Only persist a recognized state so one garbled read can't poison the next + # transition check (which would manufacture the phantom STOPPED/STARTED pair). + if new_state in MEASURING_STATES or new_state in STOPPED_STATES: + row.measurement_state = new_state row.counter = s.counter row.lp = s.lp row.leq = s.leq row.lmax = s.lmax row.lmin = s.lmin row.lpeak = s.lpeak + row.ln1 = s.ln1 + row.ln2 = s.ln2 row.battery_level = s.battery_level row.power_source = s.power_source row.sd_remaining_mb = s.sd_remaining_mb @@ -691,22 +715,29 @@ class NL43Client: snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state) - # Parse known positions (based on NL43 communication guide - DRD format) - # DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ... + # Parse DOD positional fields. DOD's layout is DIFFERENT from DRD: it has NO + # leading counter and it includes LE plus LN1–LN5. The device returns 4 channels + # of 16 fields each — [Lp, Leq, LE, Lmax, Lmin, LN1, LN2, LN3, LN4, LN5, Lpeak, + # LIeq, Leq_mov, Ltm5, over, under] — and channel 1 (parts[0:16]) is the main + # display. 
The previous code reused the DRD map (treating parts[0] as a counter), + # which shifted everything: Lp was reported as the counter, Leq as Lp, LE as Leq, + # and LN1 as Lpeak (you could spot it because "Lpeak" came out < Lmax). try: - # Capture d0 (counter) for timer synchronization if len(parts) >= 1: - snap.counter = parts[0] # d0: Measurement interval counter (1-600) + snap.lp = parts[0] # Lp: instantaneous sound pressure level if len(parts) >= 2: - snap.lp = parts[1] # d1: Instantaneous sound pressure level - if len(parts) >= 3: - snap.leq = parts[2] # d2: Equivalent continuous sound level + snap.leq = parts[1] # Leq: equivalent continuous level + # parts[2] = LE (sound exposure level) — not currently surfaced if len(parts) >= 4: - snap.lmax = parts[3] # d3: Maximum level + snap.lmax = parts[3] # Lmax if len(parts) >= 5: - snap.lmin = parts[4] # d4: Minimum level + snap.lmin = parts[4] # Lmin + if len(parts) >= 11: + snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak) if len(parts) >= 6: - snap.lpeak = parts[5] # d5: Peak level + snap.ln1 = parts[5] # LN1 percentile slot (device default L5; contract L1) + if len(parts) >= 7: + snap.ln2 = parts[6] # LN2 percentile slot (device default L10) except (IndexError, ValueError) as e: logger.warning(f"Error parsing DOD data points: {e}") diff --git a/migrate_add_ln_percentiles.py b/migrate_add_ln_percentiles.py new file mode 100644 index 0000000..2e27b34 --- /dev/null +++ b/migrate_add_ln_percentiles.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Migration script to add ln1 and ln2 percentile columns to the nl43_status table. + +The NL-43 DOD response carries percentile slots LN1-LN5; the live SLM display +(Terra-View) shows two of them (default L1/L10). This adds storage for the two +surfaced slots. Run once per database to update existing schema. 
+""" + +import sqlite3 +import sys +from pathlib import Path + +DB_PATH = Path(__file__).parent / "data" / "slmm.db" + + +def migrate(): + """Add ln1 and ln2 columns to the nl43_status table.""" + + if not DB_PATH.exists(): + print(f"Database not found at {DB_PATH}") + print("No migration needed - database will be created with new schema") + return + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + try: + cursor.execute("PRAGMA table_info(nl43_status)") + columns = [row[1] for row in cursor.fetchall()] + + if "ln1" in columns and "ln2" in columns: + print("✓ ln1/ln2 columns already exist, no migration needed") + return + + if "ln1" not in columns: + print("Adding ln1 column...") + cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln1 TEXT") + print("✓ Added ln1 column") + + if "ln2" not in columns: + print("Adding ln2 column...") + cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln2 TEXT") + print("✓ Added ln2 column") + + conn.commit() + print("\n✓ Migration completed successfully!") + + except Exception as e: + conn.rollback() + print(f"✗ Migration failed: {e}", file=sys.stderr) + sys.exit(1) + finally: + conn.close() + + +if __name__ == "__main__": + migrate() diff --git a/test_alert_evaluator.py b/test_alert_evaluator.py new file mode 100644 index 0000000..5bacc62 --- /dev/null +++ b/test_alert_evaluator.py @@ -0,0 +1,68 @@ +""" +Synthetic unit test for the alert state machine — no DB, no device. + +Drives `_evaluate_step` with a fake clock + a level series and checks that +onset/clear fire with the right debounce + hysteresis. 
Run: + + docker compose exec -T slmm python3 test_alert_evaluator.py + # or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py +""" + +from types import SimpleNamespace +from app.alerts import RuleState, _evaluate_step + + +def rule(**kw): + base = dict(threshold_db=85.0, duration_s=3, clear_margin_db=2.0, comparison="above") + base.update(kw) + return SimpleNamespace(**base) + + +def run(series, r): + st = RuleState() + events = [(now, a) for value, now in series + if (a := _evaluate_step(st, value, now, r))] + return events, st + + +def main(): + failures = 0 + + def check(label, cond, detail=""): + nonlocal failures + print(("PASS" if cond else "FAIL"), label, detail) + if not cond: + failures += 1 + + # 1) sustained exceedance -> onset after duration; recovery -> clear after duration + r = rule(threshold_db=85, duration_s=3, clear_margin_db=2) + ev, _ = run([(80, 0), (86, 1), (87, 2), (88, 3), (88, 4), + (88, 5), (82, 6), (82, 7), (82, 8), (82, 9)], r) + onsets = [t for t, a in ev if a == "onset"] + clears = [t for t, a in ev if a == "clear"] + check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev)) + + # 2) brief spike under duration -> no onset (debounce) + ev, _ = run([(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)], rule(duration_s=3)) + check("2 brief spike debounced", ev == [], str(ev)) + + # 3) hysteresis: a dip into the margin (below threshold, above threshold-margin) + # does NOT clear + r = rule(threshold_db=85, duration_s=0, clear_margin_db=3) + ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], r) + check("3 hysteresis holds ACTIVE", ev == [(0, "onset")] and st.phase == "active", + f"{ev} phase={st.phase}") + + # 4) 'below' comparison (device too quiet) -> onset when value < threshold + ev, _ = run([(30, 0), (15, 1)], rule(threshold_db=20, duration_s=0, + clear_margin_db=2, comparison="below")) + check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev)) + + print() + print("ALL PASS" if failures 
== 0 else f"{failures} FAILURE(S)") + return failures + + +if __name__ == "__main__": + import sys + sys.exit(1 if main() else 0)