diff --git a/CHANGELOG.md b/CHANGELOG.md index d83bab1..35fcc12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,52 @@ All notable changes to SLMM (Sound Level Meter Manager) will be documented in th The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.4.0] - 2026-06-22 + +### Added + +#### Live Monitor (fan-out feed) +- **Per-device fan-out monitor** - one shared, cached live feed per device. Multiple clients (dashboards, portal, charts) subscribe to the same stream instead of each fighting for the NL-43's single TCP connection: one poller reads the device, all subscribers get the same frames. +- **WebSocket monitor** - `WS /api/nl43/{unit_id}/monitor` delivers an instant first frame from cache, then live updates. +- **Monitor control** - `POST /api/nl43/{unit_id}/monitor/{start|stop}`, `GET /api/nl43/_monitor/status`. A persistent `monitor_enabled` flag auto-starts the keepalive on boot. +- **Adaptive polling** - poll rate adapts to demand; unreachable devices back off; a device-offline alert fires when a monitored unit drops. +- **De-duplication** - the background poller skips units already covered by an active monitor (no double-polling); a heartbeat keeps the feed warm. +- **Lower latency** - the monitor caches run state, roughly halving live-feed latency; fan-out emits an instant first frame + offline status to new clients. + +#### Alert Engine +- **Threshold rules** - per-device alert rules (metric + threshold + cooldown) with full CRUD: `POST/GET/PUT/DELETE /api/nl43/{unit_id}/alerts/rules[/{rule_id}]`. +- **Events + state machine** - onset/clear tracking via `GET /api/nl43/{unit_id}/alerts/events`; acknowledge with `POST .../events/{event_id}/ack`. A `cooldown_s` is enforced between onsets. +- **24/7 evaluation** - enabled rules pin the monitor on, so rules evaluate continuously even with no UI client connected. +- **Resilience** - editing or deleting a rule resets its state and closes any open event; device-offline events are raised when a monitored unit goes unreachable. + +#### Data & History +- **Live-chart backfill** - a downsampled DOD trail is persisted to a new `nl43_readings` table, exposed via `GET /api/nl43/{unit_id}/history` so charts can backfill recent history on load. +- **LN1/LN2 percentiles** - L1/L10 (configurable percentiles) surfaced through SLMM in the status and live-feed payloads. +- **measurement_start_time** included in the cached `/status` response. + +#### Device control +- **Per-device disconnect** - `POST /api/nl43/{unit_id}/disconnect` drops a device's pooled connection. +- **Deactivate / standby** - `POST /api/nl43/{unit_id}/deactivate` and global `POST /api/nl43/_system/standby` to quiesce polling/monitoring. + +### Changed +- **DRD streaming reuses the pooled connection** rather than opening a separate socket, avoiding contention with the persistent pool on a single-connection device. +- **Connection pool** - idle-TTL / max-age checks can now be disabled; pool status is logged periodically. + +### Fixed +- **Measurement-start confirmation** - `/start` now recognizes the device's `Start` state. It previously waited for `Measure`, which never matched, so the start cycle ran the full retry loop and Terra-View's proxy timed out with a misleading "Unknown error" even though the device had started. +- **Garbled reads** - corrupted measurement-state reads that produced phantom STOPPED/STARTED transitions are now ignored. +- **DOD parsing** - corrected field parsing and stopped spurious measurement-time resets. +- **Monitor WebSocket** - quieted a send-after-close race on client disconnect. + +### Database +- **New tables** (auto-created on startup via `Base.metadata.create_all`): `alert_rules`, `alert_events`, `nl43_readings`. +- **Migrations for existing tables** (run once per database): `migrate_add_ln_percentiles.py` (LN1/LN2 on `nl43_status`), `migrate_add_monitor_enabled.py` (`monitor_enabled` on `nl43_config`). + +### Notes +- Pairs with the matching Terra-View `dev` build, which reads SLMM's `/monitor` fan-out feed for live SLM dashboards (L1/L10 lines, live-chart backfill). Ship the two together. + +--- + ## [0.3.0] - 2026-02-17 ### Added diff --git a/README.md b/README.md index 441a1e6..115c645 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # SLMM - Sound Level Meter Manager -**Version 0.3.0** +**Version 0.4.0** Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols. @@ -12,6 +12,9 @@ SLMM is a standalone backend module that provides REST API routing and command t ## Features +- **Live Monitor (fan-out)**: One shared cached live feed per device — many clients subscribe to the same stream instead of fighting over the meter's single TCP connection +- **Alert Engine**: Per-device threshold rules with onset/clear events, cooldowns, acks, and 24/7 evaluation +- **History & Percentiles**: Downsampled DOD trail + history endpoint for live-chart backfill; LN1/LN2 (L1/L10) percentiles surfaced through the feed - **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability - **Background Polling**: Continuous automatic polling of devices with configurable intervals - **Offline Detection**: Automatic device reachability tracking with failure counters @@ -44,6 +47,30 @@ SLMM is a standalone backend module that provides REST API routing and command t └──────────────┘ ``` +### Live Monitor — Fan-Out Feed (v0.4.0) + +The NL-43 allows only one TCP control connection at a time, so multiple clients +polling the same device directly would contend for it. The monitor solves this +with a single shared, cached feed per device: + +- **One reader, many subscribers**: a single poller reads the device; every + WebSocket subscriber (`WS /api/nl43/{unit_id}/monitor`) receives the same + frames — an instant first frame from cache, then live updates. +- **Persistent + auto-start**: a `monitor_enabled` flag keeps the feed running + and auto-starts it on boot. Enabled alert rules pin the monitor on for 24/7 + evaluation even with no UI connected. +- **Adaptive & deduplicated**: poll rate adapts to demand, unreachable devices + back off, and the background poller skips units already covered by a monitor. + +### Alert Engine (v0.4.0) + +Per-device threshold alerting evaluated against the live feed: + +- **Rules**: metric + threshold + `cooldown_s`, full CRUD per device +- **Events**: onset/clear state machine, acknowledgement, and a device-offline + alert when a monitored unit drops +- **Robust**: editing/deleting a rule resets its state and closes open events + ### Persistent TCP Connection Pool (v0.3.0) SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems: @@ -145,8 +172,32 @@ Logs are written to: |--------|----------|-------------| | GET | `/api/nl43/{unit_id}/status` | Get cached measurement snapshot (updated by background poller) | | GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) | +| GET | `/api/nl43/{unit_id}/history` | Downsampled DOD trail for live-chart backfill | | WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data | +### Live Monitor (fan-out feed) + +| Method | Endpoint | Description | +|--------|----------|-------------| +| WS | `/api/nl43/{unit_id}/monitor` | Subscribe to the shared cached live feed (instant first frame) | +| POST | `/api/nl43/{unit_id}/monitor/start` | Start the device's monitor feed | +| POST | `/api/nl43/{unit_id}/monitor/stop` | Stop the device's monitor feed | +| GET | `/api/nl43/_monitor/status` | Global monitor status across devices | +| POST | `/api/nl43/{unit_id}/disconnect` | Drop the device's pooled TCP connection | +| POST | `/api/nl43/{unit_id}/deactivate` | Quiesce polling/monitoring for one device | +| POST | `/api/nl43/_system/standby` | Global standby — quiesce all polling/monitoring | + +### Alerts + +| Method | Endpoint | Description | +|--------|----------|-------------| +| GET | `/api/nl43/{unit_id}/alerts/rules` | List alert rules for a device | +| POST | `/api/nl43/{unit_id}/alerts/rules` | Create an alert rule (metric, threshold, cooldown) | +| PUT | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Update a rule (resets its state, closes open events) | +| DELETE | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Delete a rule | +| GET | `/api/nl43/{unit_id}/alerts/events` | List alert events (onset/clear) | +| POST | `/api/nl43/{unit_id}/alerts/events/{event_id}/ack` | Acknowledge an event | + ### Background Polling | Method | Endpoint | Description | @@ -273,11 +324,35 @@ Caches latest measurement snapshot: - `sd_remaining_mb`: Free SD card space (MB) - `sd_free_ratio`: SD card free space ratio - `raw_payload`: Raw device response data -- `is_reachable`: Device reachability status (Boolean) ⭐ NEW -- `consecutive_failures`: Count of consecutive poll failures ⭐ NEW -- `last_poll_attempt`: Last time background poller attempted to poll ⭐ NEW -- `last_success`: Last successful poll timestamp ⭐ NEW -- `last_error`: Last error message (truncated to 500 chars) ⭐ NEW +- `is_reachable`: Device reachability status (Boolean) +- `consecutive_failures`: Count of consecutive poll failures +- `last_poll_attempt`: Last time background poller attempted to poll +- `last_success`: Last successful poll timestamp +- `last_error`: Last error message (truncated to 500 chars) +- `ln1` / `ln2`: LN1/LN2 (L1/L10) percentile levels ⭐ v0.4.0 + +### NL43Readings Table ⭐ v0.4.0 +Downsampled DOD trail backing the live-chart history endpoint (one row/minute, +pruned to a retention window — viewing only, not the report source): +- `id` (PK), `unit_id`, `timestamp` +- `lp` / `leq` / `lmax` / `ln1` / `ln2`: cached level samples + +### AlertRule Table ⭐ v0.4.0 +Per-device threshold alert rules: +- `id` (PK), `unit_id`, `name`, `enabled` +- `metric`, `comparison` (above/below), `threshold_db`, `clear_margin_db` (hysteresis) +- `duration_s` (sustained), `cooldown_s` (min seconds between onsets) +- `channels` / `recipients`, optional `schedule_start`/`schedule_end`/`schedule_days` + +### AlertEvent Table ⭐ v0.4.0 +Alert onset/clear events for history, inbox, and acknowledgement: +- `id` (PK), `unit_id`, `rule_id`, `rule_name`, `metric`, `threshold_db` +- `onset_at` / `onset_value`, `peak_value`, `clear_at`, `status` (active/cleared) +- `acknowledged_at` / `acknowledged_by`, `notes` + +> New tables (`alert_rules`, `alert_events`, `nl43_readings`) auto-create on +> startup. Existing-table columns ship with migrations: +> `migrate_add_ln_percentiles.py`, `migrate_add_monitor_enabled.py`. ## Protocol Details diff --git a/app/alerts.py b/app/alerts.py new file mode 100644 index 0000000..062730c --- /dev/null +++ b/app/alerts.py @@ -0,0 +1,322 @@ +""" +Threshold alert engine. + +Each unit can have any number of AlertRules. A rule is evaluated against the +unit's live monitor snapshots via a small per-(unit, rule) state machine: + + IDLE --(metric exceeds threshold for duration_s)--> ACTIVE (fire ONSET) + ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR) + +duration_s debounces both edges; clear_margin_db adds hysteresis so a level +hovering at the threshold doesn't flap. Onset and clear are distinct events. + +The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no +real clock — so it can be unit-tested with a synthetic level series and a fake +clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence, +and dispatch. Dispatch is a server log for now (POC); the seam to POST events to +a Terra-View webhook (email/SMS) is _dispatch(). +""" + +import asyncio +import logging +import os +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# Local timezone offset for schedule windows (same env var services.py uses). +_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5")) + +# How long to cache a unit's rules before re-querying the DB (rules change rarely). +_RULE_CACHE_TTL_S = 15.0 + + +@dataclass +class RuleState: + """In-memory runtime state for one (unit, rule).""" + phase: str = "idle" # "idle" | "active" + edge_since: Optional[float] = None # when the current edge condition began (clock time) + peak: float = 0.0 + event_id: Optional[int] = None # the open AlertEvent row (for the clear update) + last_onset: Optional[float] = None # time of the last onset (for cooldown) + + +def _exceeds(value: float, rule) -> bool: + if rule.comparison == "below": + return value < rule.threshold_db + return value > rule.threshold_db + + +def _recovered(value: float, rule) -> bool: + margin = rule.clear_margin_db or 0.0 + if rule.comparison == "below": + return value > rule.threshold_db + margin + return value < rule.threshold_db - margin + + +def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]: + """Advance the state machine by one reading. + + Pure: mutates `state`, returns 'onset' | 'clear' | None. `now` is injected so + tests can drive a fake clock. + """ + duration = rule.duration_s or 0 + + if state.phase == "idle": + if _exceeds(value, rule): + if state.edge_since is None: + state.edge_since = now + if now - state.edge_since >= duration: + # Cooldown: suppress a new onset within cooldown_s of the last one + # (stops a repeatedly-breaching signal from flooding the history). + # Hold edge_since so it fires the moment cooldown lapses if still + # breaching — don't reset it here. + cooldown = getattr(rule, "cooldown_s", 0) or 0 + if state.last_onset is not None and (now - state.last_onset) < cooldown: + return None + state.phase = "active" + state.edge_since = None + state.peak = value + state.last_onset = now + return "onset" + else: + state.edge_since = None + return None + + # active + if rule.comparison == "below": + state.peak = min(state.peak, value) + else: + state.peak = max(state.peak, value) + + if _recovered(value, rule): + if state.edge_since is None: + state.edge_since = now + if now - state.edge_since >= duration: + state.phase = "idle" + state.edge_since = None + return "clear" + else: + state.edge_since = None + return None + + +def _in_window(now_minutes: int, start: str, end: str) -> bool: + """Is now_minutes (minutes since local midnight) within [start, end)? + Handles wraparound windows like 22:00–07:00.""" + def _m(s: str) -> int: + h, m = s.split(":") + return int(h) * 60 + int(m) + s, e = _m(start), _m(end) + if s == e: + return True + if s < e: + return s <= now_minutes < e + return now_minutes >= s or now_minutes < e # wraparound + + +class AlertEvaluator: + def __init__(self): + self._states: Dict[Tuple[str, int], RuleState] = {} + self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules) + self._offline_events: Dict[str, int] = {} # unit_id -> open connectivity AlertEvent id + logger.info("[ALERT] rule-based evaluator ready") + + async def evaluate(self, unit_id: str, snap) -> None: + """Evaluate every enabled rule for this unit against one snapshot.""" + rules = self._get_rules(unit_id) + if not rules: + return + now = asyncio.get_running_loop().time() + for rule in rules: + if not self._in_schedule(rule): + continue + raw = getattr(snap, rule.metric, None) + try: + value = float(raw) + except (TypeError, ValueError): + continue # missing / non-numeric ("-.-") + state = self._states.setdefault((unit_id, rule.id), RuleState()) + action = _evaluate_step(state, value, now, rule) + if action == "onset": + await self._on_onset(unit_id, rule, value, state) + elif action == "clear": + await self._on_clear(unit_id, rule, value, state) + + # -- rule loading (cached) ---------------------------------------------- + + def _get_rules(self, unit_id: str) -> list: + loop_now = asyncio.get_running_loop().time() + cached = self._rule_cache.get(unit_id) + if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S: + return cached[1] + rules = self._load_rules(unit_id) + self._rule_cache[unit_id] = (loop_now, rules) + return rules + + def _load_rules(self, unit_id: str) -> list: + from app.database import SessionLocal + from app.models import AlertRule + db = SessionLocal() + try: + return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all() + except Exception as e: + logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}") + return [] + finally: + db.close() + + def invalidate(self, unit_id: Optional[str] = None) -> None: + """Drop cached rules so a change is picked up immediately.""" + if unit_id is None: + self._rule_cache.clear() + else: + self._rule_cache.pop(unit_id, None) + + def forget_rule(self, unit_id: str, rule_id: int) -> None: + """Drop a rule's per-(unit, rule) state machine after the rule is edited or + deleted, so a stale 'active' phase / open event_id from the old config + doesn't bleed into the new one (mis-firing a clear or suppressing an onset).""" + self._states.pop((unit_id, rule_id), None) + + # -- scheduling ---------------------------------------------------------- + + def _in_schedule(self, rule) -> bool: + if not rule.schedule_start or not rule.schedule_end: + day_ok = self._day_ok(rule) + return day_ok + local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS) + if not self._day_ok(rule, local): + return False + return _in_window(local.hour * 60 + local.minute, rule.schedule_start, rule.schedule_end) + + @staticmethod + def _day_ok(rule, local: Optional[datetime] = None) -> bool: + if not rule.schedule_days: + return True + if local is None: + local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS) + allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""} + return local.weekday() in allowed # Mon=0 + + # -- event persistence + dispatch --------------------------------------- + + async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None: + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + evt = AlertEvent( + rule_id=rule.id, unit_id=unit_id, rule_name=rule.name, + metric=rule.metric, threshold_db=rule.threshold_db, + onset_value=value, peak_value=value, status="active", + ) + db.add(evt) + db.commit() + db.refresh(evt) + state.event_id = evt.id + except Exception as e: + logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}") + finally: + db.close() + await self._dispatch( + "ONSET", unit_id, rule, + f"{rule.metric.upper()}={value:.1f} dB " + f"{'<' if rule.comparison == 'below' else '>'} {rule.threshold_db:.1f} dB" + f"{f' for {rule.duration_s}s' if rule.duration_s else ''}", + ) + + async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None: + peak = state.peak + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + if state.event_id is not None: + evt = db.query(AlertEvent).filter_by(id=state.event_id).first() + if evt: + evt.clear_at = datetime.utcnow() + evt.peak_value = peak + evt.status = "cleared" + db.commit() + except Exception as e: + logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}") + finally: + db.close() + state.event_id = None + await self._dispatch( + "CLEAR", unit_id, rule, + f"recovered to {value:.1f} dB (peak {peak:.1f} dB)", + ) + + # -- connectivity (device offline/online) ------------------------------- + # + # Raised by the live monitor when it loses / regains contact with a device. + # Persisted as an AlertEvent (sentinel rule_id=0, metric="connectivity") so it + # lands in the same events/inbox/ack pipeline as threshold alerts. The in-memory + # map dedupes; the DB query also dedupes across a process restart. + + async def device_offline(self, unit_id: str) -> None: + if unit_id in self._offline_events: + return # already flagged offline + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + existing = db.query(AlertEvent).filter_by( + unit_id=unit_id, metric="connectivity", status="active").first() + if existing: # already open in the DB (e.g. carried across a restart) + self._offline_events[unit_id] = existing.id + return + evt = AlertEvent( + rule_id=0, unit_id=unit_id, rule_name="Device unreachable", + metric="connectivity", threshold_db=0.0, status="active", + ) + db.add(evt) + db.commit() + db.refresh(evt) + self._offline_events[unit_id] = evt.id + except Exception as e: + logger.warning(f"[ALERT] failed to record offline for {unit_id}: {e}") + finally: + db.close() + await self._dispatch_raw("OFFLINE", unit_id, "Device unreachable", + "live monitor lost contact with the device") + + async def device_online(self, unit_id: str) -> None: + self._offline_events.pop(unit_id, None) + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + cleared = 0 + try: + opened = db.query(AlertEvent).filter_by( + unit_id=unit_id, metric="connectivity", status="active").all() + for evt in opened: + evt.clear_at = datetime.utcnow() + evt.status = "cleared" + cleared += 1 + if cleared: + db.commit() + except Exception as e: + logger.warning(f"[ALERT] failed to record online for {unit_id}: {e}") + finally: + db.close() + if cleared: # only announce recovery if it was actually flagged offline + await self._dispatch_raw("ONLINE", unit_id, "Device recovered", + "live monitor regained contact with the device") + + # -- event persistence + dispatch --------------------------------------- + + async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None: + await self._dispatch_raw(kind, unit_id, rule.name, detail) + + async def _dispatch_raw(self, kind: str, unit_id: str, name: str, detail: str) -> None: + """POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here.""" + logger.warning(f"[ALERT:{kind}] {unit_id} '{name}': {detail}") + + +# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot) +alert_evaluator = AlertEvaluator() diff --git a/app/background_poller.py b/app/background_poller.py index 64bcc60..0148671 100644 --- a/app/background_poller.py +++ b/app/background_poller.py @@ -8,6 +8,7 @@ for fast API access without querying devices on every request. import asyncio import logging +import os from datetime import datetime, timedelta from typing import Optional @@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs logger = logging.getLogger(__name__) +# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in +# standby (running but not polling and not holding device connections) — e.g. a +# dev box that must not latch onto a device that a prod instance owns. +POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true" + class BackgroundPoller: """ @@ -39,6 +45,7 @@ class BackgroundPoller: self._logger = logger self._last_cleanup = None # Track last log cleanup time self._last_pool_log = None # Track last connection pool heartbeat log + self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle) async def start(self): """Start the background polling task.""" @@ -71,15 +78,48 @@ class BackgroundPoller: self._logger.info("Background poller stopped") + def is_active(self) -> bool: + """Whether background polling is currently active (vs standby).""" + return self._active + + async def set_active(self, active: bool): + """Globally enable/disable polling at runtime. + + When deactivated, the loop stays alive but polls nothing and releases all + device connections, so this SLMM instance stops occupying the devices' + single connection slots (e.g. so a prod instance can take over). Runtime + state only — on restart the instance returns to SLMM_POLLING_ENABLED. + """ + self._active = active + if active: + self._logger.info("[SYSTEM] Background polling ACTIVATED") + else: + self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections") + await self._release_all_connections() + + async def _release_all_connections(self): + """Gracefully close every pooled device connection (no-op if none).""" + from app.services import _connection_pool + for device_key in list(_connection_pool.get_stats().get("connections", {})): + await _connection_pool.discard(device_key) + async def _poll_loop(self): """Main polling loop that runs continuously.""" self._logger.info("Background polling loop started") while self._running: - try: - await self._poll_all_devices() - except Exception as e: - self._logger.error(f"Error in poll loop: {e}", exc_info=True) + if self._active: + try: + await self._poll_all_devices() + except Exception as e: + self._logger.error(f"Error in poll loop: {e}", exc_info=True) + else: + # Standby: poll nothing, and keep holding no device connection slots + # so another SLMM instance (e.g. prod) can talk to the devices. + try: + await self._release_all_connections() + except Exception as e: + self._logger.warning(f"Standby connection release failed: {e}") # Run log cleanup once per hour try: @@ -138,10 +178,19 @@ class BackgroundPoller: now = datetime.utcnow() polled_count = 0 + from app.monitor import monitor_manager + for cfg in configs: if not self._running: break + # Skip units with an active live monitor: it polls them at ~1Hz and + # keeps the status cache fresh, so a redundant background poll would just + # add load/lock-contention on the device's single connection. + if monitor_manager.is_active(cfg.unit_id): + self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active") + continue + # Get current status status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first() diff --git a/app/main.py b/app/main.py index 4db5847..d0b6145 100644 --- a/app/main.py +++ b/app/main.py @@ -38,6 +38,25 @@ async def lifespan(app: FastAPI): await poller.start() logger.info("Background poller started") + # Auto-start keepalive live monitors for units configured for 24/7 monitoring + # (monitor_enabled). This is what keeps alerting running unattended across + # restarts — without it a feed only runs while someone has the live view open. + try: + from app.monitor import monitor_manager + from app.database import SessionLocal + from app.models import NL43Config + db = SessionLocal() + try: + units = db.query(NL43Config).filter_by(monitor_enabled=True, tcp_enabled=True).all() + for cfg in units: + m = await monitor_manager.get(cfg.unit_id) + await m.set_keepalive(True) + logger.info(f"Auto-started keepalive monitor for {cfg.unit_id}") + finally: + db.close() + except Exception as e: + logger.error(f"Failed to auto-start monitors: {e}") + yield # Application runs # Shutdown @@ -52,7 +71,7 @@ async def lifespan(app: FastAPI): app = FastAPI( title="SLMM NL43 Addon", description="Standalone module for NL43 configuration and status APIs with background polling", - version="0.3.0", + version="0.4.0", lifespan=lifespan, ) diff --git a/app/models.py b/app/models.py index 4c86514..c60ec6b 100644 --- a/app/models.py +++ b/app/models.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func +from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func from app.database import Base @@ -23,6 +23,10 @@ class NL43Config(Base): poll_interval_seconds = Column(Integer, nullable=True, default=60) # Polling interval (10-3600 seconds) poll_enabled = Column(Boolean, default=True) # Enable/disable background polling for this device + # Live monitor (fan-out DOD feed). Keepalive runs it 24/7 even with no viewer, + # which is what makes alerting continuous. On by default; toggleable from the UI. + monitor_enabled = Column(Boolean, default=True) + class NL43Status(Base): """ @@ -41,6 +45,8 @@ class NL43Status(Base): lmax = Column(String, nullable=True) # Maximum level lmin = Column(String, nullable=True) # Minimum level lpeak = Column(String, nullable=True) # Peak level + ln1 = Column(String, nullable=True) # Percentile slot LN1 (configurable; device default L5, contract L1) + ln2 = Column(String, nullable=True) # Percentile slot LN2 (configurable; device default L10) battery_level = Column(String, nullable=True) power_source = Column(String, nullable=True) sd_remaining_mb = Column(String, nullable=True) @@ -72,3 +78,74 @@ class DeviceLog(Base): level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC message = Column(Text, nullable=False) + + +class AlertRule(Base): + """A threshold-alert rule evaluated against a unit's live monitor feed. + + Source-agnostic: today it runs over the DOD monitor; the same rule transfers + unchanged if a unit's feed is later sourced from FTP intervals. + """ + + __tablename__ = "alert_rules" + + id = Column(Integer, primary_key=True, autoincrement=True) + unit_id = Column(String, index=True, nullable=False) + name = Column(String, nullable=False, default="Alert") + metric = Column(String, nullable=False, default="lp") # lp/leq/lmax/lmin/lpeak/ln1/ln2 + comparison = Column(String, nullable=False, default="above") # above | below + threshold_db = Column(Float, nullable=False) + duration_s = Column(Integer, nullable=False, default=0) # sustained seconds (0 = instant) + clear_margin_db = Column(Float, nullable=False, default=2.0) # hysteresis band + cooldown_s = Column(Integer, nullable=False, default=300) # min seconds between onsets + # Optional time-of-day scoping (local time). schedule_start/end as "HH:MM"; + # null = always active. schedule_days = CSV of 0-6 (Mon=0); null = every day. + schedule_start = Column(String, nullable=True) + schedule_end = Column(String, nullable=True) + schedule_days = Column(String, nullable=True) + channels = Column(String, nullable=False, default="log") # CSV: log,email,sms + recipients = Column(Text, nullable=True) # CSV of emails/phones + enabled = Column(Boolean, default=True) + created_at = Column(DateTime, default=func.now()) + + +class AlertEvent(Base): + """A fired alert (onset → clear), for history / inbox / acknowledgement.""" + + __tablename__ = "alert_events" + + id = Column(Integer, primary_key=True, autoincrement=True) + rule_id = Column(Integer, index=True, nullable=False) + unit_id = Column(String, index=True, nullable=False) + rule_name = Column(String, nullable=True) + metric = Column(String, nullable=False) + threshold_db = Column(Float, nullable=False) + onset_at = Column(DateTime, default=func.now(), index=True) + onset_value = Column(Float, nullable=True) + peak_value = Column(Float, nullable=True) + clear_at = Column(DateTime, nullable=True) + status = Column(String, default="active") # active | cleared + acknowledged_at = Column(DateTime, nullable=True) + acknowledged_by = Column(String, nullable=True) + notes = Column(Text, nullable=True) + + +class NL43Reading(Base): + """Downsampled time-series of live-monitor readings, for the live-chart + backfill (so a viewer sees recent trend on open, not a blank chart). + + Viewing only — NOT the report source. Reports use the device's authoritative + FTP .rnd intervals. This is a short, capped trail (one row/minute, pruned to + a retention window) fed by the monitor's keepalive poll loop. + """ + + __tablename__ = "nl43_readings" + + id = Column(Integer, primary_key=True, autoincrement=True) + unit_id = Column(String, index=True, nullable=False) + timestamp = Column(DateTime, default=func.now(), index=True) + lp = Column(String, nullable=True) + leq = Column(String, nullable=True) + lmax = Column(String, nullable=True) + ln1 = Column(String, nullable=True) + ln2 = Column(String, nullable=True) diff --git a/app/monitor.py b/app/monitor.py new file mode 100644 index 0000000..25539bc --- /dev/null +++ b/app/monitor.py @@ -0,0 +1,322 @@ +""" +Per-device live monitor (fan-out hub). + +ONE DOD poll loop per device, broadcast to many subscribers: +- browser WebSocket clients (live view) — they no longer each open their own + device stream, so the NL43's single-connection limit stops causing the + "second viewer sees nothing" contention. +- the alert evaluator (threshold alerts), which can keep a device's feed running + even with no browser attached. +- persistence (each snapshot is written to NL43Status, like the poller does). + +The device's one TCP connection is respected: every poll goes through the same +per-device lock + connection pool in services.py, so the monitor, the background +poller, and on-demand commands all serialize safely. +""" + +import asyncio +import logging +import os +from datetime import datetime +from typing import Dict, Optional, Set + +from app.database import SessionLocal +from app.models import NL43Config, NL43Status +from app.services import NL43Client, persist_snapshot +from app.alerts import alert_evaluator + +logger = logging.getLogger(__name__) + +# Extra idle between DOD polls WHEN A BROWSER IS WATCHING. The 1s device rate-limit +# already paces consecutive DOD? commands, so this just needs to be small — the +# rate-limit is the real floor (~1.25s/poll effective). +MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25")) + +# Idle cadence when NO browser is subscribed and the feed is only kept alive for +# alerting. Same data, ~8x fewer polls -> ~8x less cellular traffic on a metered +# SIM (~1 GB/device/month at full rate -> ~125 MB). NOTE: this also sets the alert +# sampling resolution when nobody is watching, so keep it <= the smallest alert +# duration_s you rely on (default 10s comfortably catches a "sustained 30/60s" rule). +MONITOR_IDLE_POLL_INTERVAL = float(os.getenv("MONITOR_IDLE_POLL_INTERVAL", "10")) + +# Exponential backoff once the device is unreachable, so a powered-off / asleep / +# out-of-signal device stops churning reconnects every cycle (log spam + a trickle +# of wasted cellular data on failed SYNs). delay = min(BASE * 2**(fails-1), MAX), +# reset to full-rate on the first good poll. While a browser is actively watching we +# cap the backoff lower (WATCHED_MAX) so a recovery surfaces quickly for the viewer. +MONITOR_BACKOFF_BASE_S = float(os.getenv("MONITOR_BACKOFF_BASE_S", "1")) +MONITOR_BACKOFF_MAX_S = float(os.getenv("MONITOR_BACKOFF_MAX_S", "60")) +MONITOR_BACKOFF_WATCHED_MAX_S = float(os.getenv("MONITOR_BACKOFF_WATCHED_MAX_S", "5")) + +# How often to refresh the run state (Measure?). It changes rarely, so we cache it +# and skip that second rate-limited command on most polls — roughly halving the +# per-update latency (~2.5s -> ~1.3s). +MONITOR_STATE_REFRESH_S = float(os.getenv("MONITOR_STATE_REFRESH_S", "30")) + +# Downsampled trail for the live-chart backfill: store one reading per +# TRAIL_SAMPLE_S and keep TRAIL_RETENTION_HOURS of it (pruned). Viewing only — +# reports use the device's FTP .rnd data, not this. +TRAIL_SAMPLE_S = float(os.getenv("MONITOR_TRAIL_SAMPLE_S", "60")) +TRAIL_RETENTION_HOURS = float(os.getenv("MONITOR_TRAIL_RETENTION_HOURS", "24")) + +# If nothing has been broadcast in this many seconds (e.g. device offline and +# silent), send a keepalive frame so reverse proxies don't drop the idle WS. +MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25")) + + +def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict: + """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced + so it carries ln1/ln2 (which DRD cannot).""" + return { + "unit_id": unit_id, + "timestamp": datetime.utcnow().isoformat(), + "measurement_state": snap.measurement_state, + "measurement_start_time": measurement_start_time, + "counter": snap.counter, + "lp": snap.lp, + "leq": snap.leq, + "lmax": snap.lmax, + "lmin": snap.lmin, + "lpeak": snap.lpeak, + "ln1": snap.ln1, + "ln2": snap.ln2, + "raw_payload": snap.raw_payload, + } + + +class DeviceMonitor: + """Owns a single DOD poll loop for one device and fans each snapshot out to + all subscribers. Runs while it has at least one browser subscriber OR the + server-side keep-alive (alerting) flag is set.""" + + def __init__(self, unit_id: str): + self.unit_id = unit_id + self._subscribers: Set[asyncio.Queue] = set() + self._keepalive = False + self._task: Optional[asyncio.Task] = None + self._lock = asyncio.Lock() + self._last_payload: Optional[dict] = None # replayed to new subscribers + self._consec_fail = 0 + self._reachable = True # last broadcast reachability (for transition frames) + self._cached_state: Optional[str] = None # run state, refreshed periodically + self._last_state_refresh = 0.0 + self._last_trail_store = 0.0 # downsample throttle for the backfill trail + + @property + def running(self) -> bool: + return self._task is not None and not self._task.done() + + def subscriber_count(self) -> int: + return len(self._subscribers) + + def _has_demand(self) -> bool: + return bool(self._subscribers) or self._keepalive + + def _ensure_task(self) -> None: + if self._task is None or self._task.done(): + self._task = asyncio.create_task(self._run()) + + async def subscribe(self) -> asyncio.Queue: + q: asyncio.Queue = asyncio.Queue(maxsize=5) + async with self._lock: + self._subscribers.add(q) + # Replay the last frame so a client connecting mid-stream sees data + # (or the current 'unreachable' state) immediately, not after a poll. + if self._last_payload is not None: + try: + q.put_nowait(self._last_payload) + except asyncio.QueueFull: + pass + self._ensure_task() + return q + + async def unsubscribe(self, q: asyncio.Queue) -> None: + async with self._lock: + self._subscribers.discard(q) + + async def set_keepalive(self, on: bool) -> None: + async with self._lock: + self._keepalive = on + if on: + self._ensure_task() + + async def _run(self) -> None: + logger.info(f"[MONITOR] {self.unit_id}: feed started") + loop = asyncio.get_running_loop() + last_send = loop.time() + try: + while self._has_demand(): + snap, mst = await self._poll_once() + if snap is not None: + if not self._reachable: + # Recovered from an outage — clear the connectivity alert. + try: + await alert_evaluator.device_online(self.unit_id) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: online alert failed: {e}") + self._consec_fail = 0 + self._reachable = True + payload = _snapshot_payload(snap, self.unit_id, mst) + payload["feed_status"] = "ok" + self._broadcast(payload) + last_send = loop.time() + try: + await alert_evaluator.evaluate(self.unit_id, snap) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") + else: + # Tell clients the device went offline — once, on transition, after a + # few failures so a momentary blip doesn't flap the UI. Same edge + # raises the device-offline alert. + self._consec_fail += 1 + if self._reachable and self._consec_fail >= 3: + self._reachable = False + self._broadcast({ + "unit_id": self.unit_id, + "timestamp": datetime.utcnow().isoformat(), + "feed_status": "unreachable", + }) + last_send = loop.time() + try: + await alert_evaluator.device_offline(self.unit_id) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: offline alert failed: {e}") + + # Heartbeat: during quiet/offline stretches, send a keepalive so an + # idle WS isn't dropped by a reverse proxy. Not cached (new subscribers + # should still get the last real frame, not a heartbeat). + if loop.time() - last_send >= MONITOR_HEARTBEAT_S: + self._broadcast({ + "unit_id": self.unit_id, + "timestamp": datetime.utcnow().isoformat(), + "feed_status": "ok" if self._reachable else "unreachable", + "heartbeat": True, + }, cache=False) + last_send = loop.time() + + await asyncio.sleep(self._next_delay()) + finally: + logger.info(f"[MONITOR] {self.unit_id}: feed stopped") + + def _next_delay(self) -> float: + """Inter-poll delay: exponential backoff while unreachable, full-rate while a + browser is watching, relaxed cadence when the feed is keepalive-only.""" + if self._consec_fail > 0: + shift = min(self._consec_fail - 1, 6) # cap growth at 2**6 = 64x base + delay = min(MONITOR_BACKOFF_BASE_S * (2 ** shift), MONITOR_BACKOFF_MAX_S) + if self._subscribers: + delay = min(delay, MONITOR_BACKOFF_WATCHED_MAX_S) + return delay + if self._subscribers: + return MONITOR_POLL_INTERVAL # a browser is watching — smooth chart + return MONITOR_IDLE_POLL_INTERVAL # keepalive-only (alerting) — save data + + async def _poll_once(self): + """One DOD poll: read, persist, return (snapshot, measurement_start_iso).""" + db = SessionLocal() + try: + cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first() + if not cfg or not cfg.tcp_enabled: + return None, None + client = NL43Client( + cfg.host, cfg.tcp_port, + ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password, + ftp_port=cfg.ftp_port or 21, + ) + # Refresh the run state only every MONITOR_STATE_REFRESH_S; reuse the + # cached state otherwise so most polls send just DOD? (one rate-limited + # command) instead of DOD? + Measure?. + now = asyncio.get_running_loop().time() + refresh_state = (self._cached_state is None + or now - self._last_state_refresh >= MONITOR_STATE_REFRESH_S) + snap = await client.request_dod( + measurement_state=None if refresh_state else self._cached_state + ) + if refresh_state: + self._cached_state = snap.measurement_state + self._last_state_refresh = now + snap.unit_id = self.unit_id + persist_snapshot(snap, db) + db.commit() + # Append to the downsampled backfill trail (~one row per TRAIL_SAMPLE_S). + if now - self._last_trail_store >= TRAIL_SAMPLE_S: + self._last_trail_store = now + self._store_trail(snap, db) + status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first() + mst = (status.measurement_start_time.isoformat() + if status and status.measurement_start_time else None) + return snap, mst + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}") + return None, None + finally: + db.close() + + def _store_trail(self, snap, db) -> None: + """Append one downsampled reading to the backfill trail and prune old rows.""" + from datetime import datetime, timedelta + from app.models import NL43Reading + try: + db.add(NL43Reading( + unit_id=self.unit_id, timestamp=datetime.utcnow(), + lp=snap.lp, leq=snap.leq, lmax=snap.lmax, ln1=snap.ln1, ln2=snap.ln2, + )) + cutoff = datetime.utcnow() - timedelta(hours=TRAIL_RETENTION_HOURS) + db.query(NL43Reading).filter( + NL43Reading.unit_id == self.unit_id, + NL43Reading.timestamp < cutoff, + ).delete() + db.commit() + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: trail store failed: {e}") + + def _broadcast(self, payload: dict, cache: bool = True) -> None: + if cache: + self._last_payload = payload # replayed to new subscribers + for q in list(self._subscribers): + try: + q.put_nowait(payload) + except asyncio.QueueFull: + # Slow consumer — drop this frame rather than stall the whole feed. + pass + + +class MonitorManager: + """Registry of per-device monitors (one per unit_id).""" + + def __init__(self): + self._monitors: Dict[str, DeviceMonitor] = {} + self._lock = asyncio.Lock() + + async def get(self, unit_id: str) -> DeviceMonitor: + async with self._lock: + m = self._monitors.get(unit_id) + if m is None: + m = DeviceMonitor(unit_id) + self._monitors[unit_id] = m + return m + + def is_active(self, unit_id: str) -> bool: + """True if this unit has a running monitor feed (so the background poller + can skip it — the monitor already polls it more often).""" + m = self._monitors.get(unit_id) + return m is not None and m.running + + def status(self) -> dict: + return { + uid: { + "running": m.running, + "subscribers": m.subscriber_count(), + "keepalive": m._keepalive, + "reachable": m._reachable, + # what cadence the loop is currently using, for observability + "mode": ("backoff" if m._consec_fail > 0 + else "watched" if m._subscribers + else "idle"), + } + for uid, m in self._monitors.items() + } + + +# Module-level singleton +monitor_manager = MonitorManager() diff --git a/app/routers.py b/app/routers.py index a21c928..ac455ad 100644 --- a/app/routers.py +++ b/app/routers.py @@ -11,7 +11,7 @@ import os import asyncio from app.database import get_db -from app.models import NL43Config, NL43Status +from app.models import NL43Config, NL43Status, AlertRule, AlertEvent, NL43Reading from app.services import NL43Client, persist_snapshot logger = logging.getLogger(__name__) @@ -121,6 +121,392 @@ async def flush_connection_pool(): return {"status": "ok", "message": "All cached connections closed"} +@router.post("/{unit_id}/disconnect") +async def disconnect_device(unit_id: str, db: Session = Depends(get_db)): + """Cleanly close SLMM's persistent TCP connection to a single device. + + Gracefully closes (TCP FIN + wait_closed) the pooled connection for this + device and removes it from the pool, freeing the NL43's single connection + slot. Idempotent — a no-op if no connection is currently cached. + + Note: this releases the *idle* pooled connection. It does not interrupt an + in-progress DRD stream or an in-flight command (those have the socket + checked out of the pool) — close the stream WebSocket to end a live stream. + """ + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + from app.services import _connection_pool + + device_key = f"{cfg.host}:{cfg.tcp_port}" + had_conn = device_key in _connection_pool.get_stats().get("connections", {}) + + await _connection_pool.discard(device_key) + + return { + "status": "ok", + "unit_id": unit_id, + "device_key": device_key, + "disconnected": had_conn, + "message": "Connection closed" if had_conn else "No cached connection to close", + } + + +@router.post("/{unit_id}/deactivate") +async def deactivate_device(unit_id: str, db: Session = Depends(get_db)): + """Make a single unit dormant: stop background polling for it AND drop its + connection, freeing the device's connection slot. poll_enabled=False is + persisted, so the unit stays dormant across restarts until /activate. + """ + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + cfg.poll_enabled = False + db.commit() + + from app.services import _connection_pool, _get_device_lock + + device_key = f"{cfg.host}:{cfg.tcp_port}" + + # Wait briefly for any in-flight poll/command to finish (so its connection is + # back in the pool), then drop it. If a long-lived stream holds the lock we + # don't block forever — discard the pooled connection regardless. + lock = await _get_device_lock(device_key) + acquired = False + try: + await asyncio.wait_for(lock.acquire(), timeout=10.0) + acquired = True + except asyncio.TimeoutError: + acquired = False + try: + await _connection_pool.discard(device_key) + finally: + if acquired: + lock.release() + + return { + "status": "ok", + "unit_id": unit_id, + "poll_enabled": False, + "message": "Polling disabled and connection closed for this unit", + } + + +@router.post("/{unit_id}/activate") +async def activate_device(unit_id: str, db: Session = Depends(get_db)): + """Resume background polling for a unit previously deactivated.""" + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + cfg.poll_enabled = True + db.commit() + + return { + "status": "ok", + "unit_id": unit_id, + "poll_enabled": True, + "message": "Polling enabled for this unit", + } + + +@router.get("/_system/status") +async def system_status(): + """Report whether this SLMM instance is actively polling or in standby.""" + from app.background_poller import poller + from app.services import _connection_pool + return { + "status": "ok", + "mode": "active" if poller.is_active() else "standby", + "polling_active": poller.is_active(), + "active_connections": _connection_pool.get_stats().get("active_connections", 0), + } + + +@router.post("/_system/standby") +async def system_standby(): + """Put this SLMM instance into standby: stop polling ALL devices and release + every connection, so it stops occupying device slots (e.g. so a prod instance + can take over). Runtime-only — on restart the instance returns to its + SLMM_POLLING_ENABLED default. + """ + from app.background_poller import poller + await poller.set_active(False) + return {"status": "ok", "mode": "standby", + "message": "Polling stopped and all device connections released"} + + +@router.post("/_system/resume") +async def system_resume(): + """Resume polling after standby (global).""" + from app.background_poller import poller + await poller.set_active(True) + return {"status": "ok", "mode": "active", "message": "Polling resumed"} + + +# ============================================================================ +# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients +# ============================================================================ + +@router.websocket("/{unit_id}/monitor") +async def monitor_stream(websocket: WebSocket, unit_id: str): + """Subscribe a browser to the device's shared 1 Hz DOD feed. + + Any number of clients can attach without each opening its own device + connection (one poll loop per device, fanned out). Same JSON shape as the + DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10). + """ + await websocket.accept() + from app.monitor import monitor_manager + + monitor = await monitor_manager.get(unit_id) + queue = await monitor.subscribe() + logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)") + + async def _watch_disconnect(): + # Completes when the client disconnects, so an idle feed (no data) still + # detects the drop and we don't leak a subscription that keeps the device + # feed (and its connection) alive. + try: + while True: + msg = await websocket.receive() + if msg.get("type") == "websocket.disconnect": + return + except Exception: + return + + gone = asyncio.ensure_future(_watch_disconnect()) + try: + while not gone.done(): + try: + payload = await asyncio.wait_for(queue.get(), timeout=1.0) + except asyncio.TimeoutError: + continue # re-check gone.done() + if gone.done(): + break # client disconnected while we waited — don't send into a closing socket + await websocket.send_json(payload) + except WebSocketDisconnect: + logger.info(f"Monitor subscriber disconnected for {unit_id}") + except Exception as e: + # A frame that races the close (client vanished mid-send) surfaces as + # "Unexpected ASGI message 'websocket.send' after ... websocket.close". + # That's expected on disconnect (the portal closes the socket on every tab + # switch), not an error — log it quietly. + msg = str(e) + if "after sending" in msg or "websocket.close" in msg or "response already completed" in msg: + logger.debug(f"Monitor stream for {unit_id} closed mid-send (client gone)") + else: + logger.warning(f"Monitor stream error for {unit_id}: {e}") + finally: + gone.cancel() + await monitor.unsubscribe(queue) + + +@router.post("/{unit_id}/monitor/start") +async def monitor_start(unit_id: str, db: Session = Depends(get_db)): + """Enable 24/7 keepalive monitoring: persist monitor_enabled and start the feed + now, so alerting evaluates continuously even with no viewer. Survives restarts + (auto-started on boot from the persisted flag).""" + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if cfg: + cfg.monitor_enabled = True + db.commit() + from app.monitor import monitor_manager + monitor = await monitor_manager.get(unit_id) + await monitor.set_keepalive(True) + return {"status": "ok", "unit_id": unit_id, "monitor_enabled": True, "running": monitor.running} + + +@router.post("/{unit_id}/monitor/stop") +async def monitor_stop(unit_id: str, db: Session = Depends(get_db)): + """Disable keepalive monitoring: persist monitor_enabled=False and drop the + keepalive (the feed stops once no browser subscribers remain).""" + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if cfg: + cfg.monitor_enabled = False + db.commit() + from app.monitor import monitor_manager + monitor = await monitor_manager.get(unit_id) + await monitor.set_keepalive(False) + return {"status": "ok", "unit_id": unit_id, "monitor_enabled": False} + + +@router.get("/_monitor/status") +async def monitor_status(): + """Status of every device monitor (running, subscriber count, keep-alive).""" + from app.monitor import monitor_manager + return {"status": "ok", "monitors": monitor_manager.status()} + + +@router.get("/{unit_id}/history") +def get_monitor_history(unit_id: str, hours: float = 2.0, db: Session = Depends(get_db)): + """Recent downsampled monitor readings (the DOD trail) for the live-chart + backfill. Viewing only — NOT the FTP report data.""" + from datetime import timedelta + hours = max(0.1, min(hours, 48.0)) + cutoff = datetime.utcnow() - timedelta(hours=hours) + rows = (db.query(NL43Reading) + .filter(NL43Reading.unit_id == unit_id, NL43Reading.timestamp >= cutoff) + .order_by(NL43Reading.timestamp.asc()).all()) + return { + "status": "ok", + "unit_id": unit_id, + "hours": hours, + "count": len(rows), + "readings": [ + { + "timestamp": r.timestamp.isoformat() if r.timestamp else None, + "lp": r.lp, "leq": r.leq, "lmax": r.lmax, "ln1": r.ln1, "ln2": r.ln2, + } + for r in rows + ], + } + + +# ============================================================================ +# ALERTS — threshold rules + fired events +# ============================================================================ + +class AlertRulePayload(BaseModel): + name: str = "Alert" + metric: str = "lp" # lp/leq/lmax/lmin/lpeak/ln1/ln2 + comparison: str = "above" # above | below + threshold_db: float + duration_s: int = 0 # sustained seconds before firing (0 = instant) + clear_margin_db: float = 2.0 # hysteresis band + cooldown_s: int = 300 + schedule_start: str | None = None # "HH:MM" local; null = always + schedule_end: str | None = None + schedule_days: str | None = None # CSV of 0-6 (Mon=0); null = every day + channels: str = "log" + recipients: str | None = None + enabled: bool = True + + +def _rule_dict(r: AlertRule) -> dict: + return { + "id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric, + "comparison": r.comparison, "threshold_db": r.threshold_db, + "duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db, + "cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start, + "schedule_end": r.schedule_end, "schedule_days": r.schedule_days, + "channels": r.channels, "recipients": r.recipients, "enabled": r.enabled, + } + + +def _event_dict(e: AlertEvent) -> dict: + return { + "id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id, + "rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db, + "onset_at": e.onset_at.isoformat() if e.onset_at else None, + "onset_value": e.onset_value, "peak_value": e.peak_value, + "clear_at": e.clear_at.isoformat() if e.clear_at else None, + "status": e.status, + "acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None, + "acknowledged_by": e.acknowledged_by, + } + + +async def _sync_keepalive_to_rules(unit_id: str, db: Session): + """Keep a unit's monitor running while it has enabled alert rules, so the + evaluator runs 24/7 even with no browser watching. Turns keepalive ON (and + persists monitor_enabled so it survives a restart via the boot auto-start) + when enabled rules exist; never turns it OFF — a device may be kept alive for + other reasons, so operators control that on /admin/slmm.""" + has_enabled = (db.query(AlertRule) + .filter_by(unit_id=unit_id, enabled=True).first() is not None) + if not has_enabled: + return + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if cfg and not cfg.monitor_enabled: + cfg.monitor_enabled = True + db.commit() + from app.monitor import monitor_manager + m = await monitor_manager.get(unit_id) + await m.set_keepalive(True) + + +def _reset_rule_runtime(unit_id: str, rule_id: int, db: Session): + """After a rule edit/delete: drop its evaluator state machine and close any open + event, so a stale 'active' phase doesn't mis-evaluate against the new config and + the client portal doesn't stay 'in alarm' on a rule that changed or is gone.""" + from app.alerts import alert_evaluator + alert_evaluator.forget_rule(unit_id, rule_id) + now = datetime.utcnow() + for evt in db.query(AlertEvent).filter_by(unit_id=unit_id, rule_id=rule_id, status="active").all(): + evt.clear_at = now + evt.status = "cleared" + db.commit() + + +@router.post("/{unit_id}/alerts/rules") +async def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)): + rule = AlertRule(unit_id=unit_id, **payload.model_dump()) + db.add(rule) + db.commit() + db.refresh(rule) + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + await _sync_keepalive_to_rules(unit_id, db) + return {"status": "ok", "rule": _rule_dict(rule)} + + +@router.get("/{unit_id}/alerts/rules") +def list_alert_rules(unit_id: str, db: Session = Depends(get_db)): + rules = db.query(AlertRule).filter_by(unit_id=unit_id).all() + return {"status": "ok", "rules": [_rule_dict(r) for r in rules]} + + +@router.put("/{unit_id}/alerts/rules/{rule_id}") +async def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)): + rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first() + if not rule: + raise HTTPException(status_code=404, detail="Alert rule not found") + for field, value in payload.model_dump().items(): + setattr(rule, field, value) + db.commit() + db.refresh(rule) + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + _reset_rule_runtime(unit_id, rule_id, db) + await _sync_keepalive_to_rules(unit_id, db) + return {"status": "ok", "rule": _rule_dict(rule)} + + +@router.delete("/{unit_id}/alerts/rules/{rule_id}") +async def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)): + rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first() + if not rule: + raise HTTPException(status_code=404, detail="Alert rule not found") + db.delete(rule) + db.commit() + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + _reset_rule_runtime(unit_id, rule_id, db) # close its open event so the portal doesn't stay red + await _sync_keepalive_to_rules(unit_id, db) # no-op if no enabled rules remain + return {"status": "ok", "deleted": rule_id} + + +@router.get("/{unit_id}/alerts/events") +def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)): + events = (db.query(AlertEvent).filter_by(unit_id=unit_id) + .order_by(AlertEvent.onset_at.desc()).limit(limit).all()) + return {"status": "ok", "events": [_event_dict(e) for e in events]} + + +@router.post("/{unit_id}/alerts/events/{event_id}/ack") +def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)): + evt = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first() + if not evt: + raise HTTPException(status_code=404, detail="Alert event not found") + evt.acknowledged_at = datetime.utcnow() + evt.acknowledged_by = by + db.commit() + return {"status": "ok", "acknowledged": event_id} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ @@ -197,6 +583,7 @@ def get_roster(db: Session = Depends(get_db)): "web_enabled": cfg.web_enabled, "poll_enabled": cfg.poll_enabled, "poll_interval_seconds": cfg.poll_interval_seconds, + "monitor_enabled": cfg.monitor_enabled, "status": None } @@ -445,11 +832,14 @@ def get_status(unit_id: str, db: Session = Depends(get_db)): "unit_id": unit_id, "last_seen": status.last_seen.isoformat() if status.last_seen else None, "measurement_state": status.measurement_state, + "measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None, "lp": status.lp, "leq": status.leq, "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, @@ -472,6 +862,8 @@ class StatusPayload(BaseModel): lmax: str | None = None lmin: str | None = None lpeak: str | None = None + ln1: str | None = None + ln2: str | None = None battery_level: str | None = None power_source: str | None = None sd_remaining_mb: str | None = None @@ -499,11 +891,14 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge "unit_id": unit_id, "last_seen": status.last_seen.isoformat(), "measurement_state": status.measurement_state, + "measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None, "lp": status.lp, "leq": status.leq, "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, @@ -544,7 +939,7 @@ async def start_measurement(unit_id: str, db: Session = Depends(get_db)): db.expire_all() status = db.query(NL43Status).filter_by(unit_id=unit_id).first() logger.info(f"State check: measurement_state={status.measurement_state if status else 'None'}, start_time={status.measurement_start_time if status else 'None'}") - if status and status.measurement_state == "Measure" and status.measurement_start_time: + if status and status.measurement_state in ("Start", "Measure") and status.measurement_start_time: logger.info(f"✓ Measurement state confirmed for {unit_id} with start time {status.measurement_start_time}") break @@ -1205,6 +1600,8 @@ async def stream_live(websocket: WebSocket, unit_id: str): "lmax": snap.lmax, # Maximum level "lmin": snap.lmin, # Minimum level "lpeak": snap.lpeak, # Peak level + "ln1": snap.ln1, # LN1 percentile (L1/L10 contract); null on DRD stream + "ln2": snap.ln2, # LN2 percentile; null on DRD stream "raw_payload": snap.raw_payload, }) except Exception as e: @@ -1876,6 +2273,8 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)): "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, diff --git a/app/services.py b/app/services.py index 9375c5c..c608226 100644 --- a/app/services.py +++ b/app/services.py @@ -46,6 +46,8 @@ class NL43Snapshot: lmax: Optional[str] = None # Maximum level lmin: Optional[str] = None # Minimum level lpeak: Optional[str] = None # Peak level + ln1: Optional[str] = None # Percentile slot LN1 (configurable; device default L5, contract L1) + ln2: Optional[str] = None # Percentile slot LN2 (configurable; device default L10) battery_level: Optional[str] = None power_source: Optional[str] = None sd_remaining_mb: Optional[str] = None @@ -69,10 +71,27 @@ def persist_snapshot(s: NL43Snapshot, db: Session): logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'") - # Device returns "Start" when measuring, "Stop" when stopped - # Normalize to previous behavior for backward compatibility - is_measuring = new_state == "Start" - was_measuring = previous_state == "Start" + # The device reports "Start" while measuring; the DOD path uses that string, + # but the DRD stream path tags snapshots "Measure" (and the DOD fallback also + # uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and + # closing the live stream flips state "Start"->"Measure"->"Start", which the + # old equality check misread as stop-then-start and reset measurement_start_time. + # + # Also: only act on RECOGNIZED states. A buffer desync on the shared connection + # (e.g. right after a DRD/DOD test) can make a Measure? read return a stray, + # garbled value; treating that as "not measuring" produced constant phantom + # "STOPPED -> STARTED" log pairs and reset the timer. Ignore unknown reads. + MEASURING_STATES = {"Start", "Measure"} + STOPPED_STATES = {"Stop"} + was_measuring = previous_state in MEASURING_STATES + + if new_state in MEASURING_STATES: + is_measuring = True + elif new_state in STOPPED_STATES: + is_measuring = False + else: + logger.warning(f"Ignoring unrecognized measurement state for {s.unit_id}: {new_state!r}") + is_measuring = was_measuring # garbled/unknown read — no transition if not was_measuring and is_measuring: # Measurement just started - record the start time @@ -95,13 +114,18 @@ def persist_snapshot(s: NL43Snapshot, db: Session): except Exception: pass - row.measurement_state = new_state + # Only persist a recognized state so one garbled read can't poison the next + # transition check (which would manufacture the phantom STOPPED/STARTED pair). + if new_state in MEASURING_STATES or new_state in STOPPED_STATES: + row.measurement_state = new_state row.counter = s.counter row.lp = s.lp row.leq = s.leq row.lmax = s.lmax row.lmin = s.lmin row.lpeak = s.lpeak + row.ln1 = s.ln1 + row.ln2 = s.ln2 row.battery_level = s.battery_level row.power_source = s.power_source row.sd_remaining_mb = s.sd_remaining_mb @@ -656,10 +680,12 @@ class NL43Client: else: raise ValueError(f"Unknown result code: {result_code}") - async def request_dod(self) -> NL43Snapshot: + async def request_dod(self, measurement_state: Optional[str] = None) -> NL43Snapshot: """Request DOD (Data Output Display) snapshot from device. - Returns parsed measurement data from the device display. + Returns parsed measurement data from the device display. Pass + measurement_state to reuse a cached run state and skip the extra Measure? + round-trip (the state changes rarely); leave it None to query it. """ # _send_command now handles result code validation and returns the data line resp = await self._send_command("DOD?\r\n") @@ -682,31 +708,41 @@ class NL43Client: logger.info(f"Parsed {len(parts)} data points from DOD response") - # Query actual measurement state (DOD doesn't include this information) - try: - measurement_state = await self.get_measurement_state() - except Exception as e: - logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}") - measurement_state = "Measure" + # DOD doesn't include the run state. Query it only when not supplied by the + # caller — the monitor passes a cached state most cycles and refreshes it + # occasionally, avoiding a second rate-limited command on every poll. + if measurement_state is None: + try: + measurement_state = await self.get_measurement_state() + except Exception as e: + logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}") + measurement_state = "Measure" snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state) - # Parse known positions (based on NL43 communication guide - DRD format) - # DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ... + # Parse DOD positional fields. DOD's layout is DIFFERENT from DRD: it has NO + # leading counter and it includes LE plus LN1–LN5. The device returns 4 channels + # of 16 fields each — [Lp, Leq, LE, Lmax, Lmin, LN1, LN2, LN3, LN4, LN5, Lpeak, + # LIeq, Leq_mov, Ltm5, over, under] — and channel 1 (parts[0:16]) is the main + # display. The previous code reused the DRD map (treating parts[0] as a counter), + # which shifted everything: Lp was reported as the counter, Leq as Lp, LE as Leq, + # and LN1 as Lpeak (you could spot it because "Lpeak" came out < Lmax). try: - # Capture d0 (counter) for timer synchronization if len(parts) >= 1: - snap.counter = parts[0] # d0: Measurement interval counter (1-600) + snap.lp = parts[0] # Lp: instantaneous sound pressure level if len(parts) >= 2: - snap.lp = parts[1] # d1: Instantaneous sound pressure level - if len(parts) >= 3: - snap.leq = parts[2] # d2: Equivalent continuous sound level + snap.leq = parts[1] # Leq: equivalent continuous level + # parts[2] = LE (sound exposure level) — not currently surfaced if len(parts) >= 4: - snap.lmax = parts[3] # d3: Maximum level + snap.lmax = parts[3] # Lmax if len(parts) >= 5: - snap.lmin = parts[4] # d4: Minimum level + snap.lmin = parts[4] # Lmin + if len(parts) >= 11: + snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak) if len(parts) >= 6: - snap.lpeak = parts[5] # d5: Peak level + snap.ln1 = parts[5] # LN1 percentile slot (device default L5; contract L1) + if len(parts) >= 7: + snap.ln2 = parts[6] # LN2 percentile slot (device default L10) except (IndexError, ValueError) as e: logger.warning(f"Error parsing DOD data points: {e}") diff --git a/migrate_add_ln_percentiles.py b/migrate_add_ln_percentiles.py new file mode 100644 index 0000000..2e27b34 --- /dev/null +++ b/migrate_add_ln_percentiles.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Migration script to add ln1 and ln2 percentile columns to the nl43_status table. + +The NL-43 DOD response carries percentile slots LN1-LN5; the live SLM display +(Terra-View) shows two of them (default L1/L10). This adds storage for the two +surfaced slots. Run once per database to update existing schema. +""" + +import sqlite3 +import sys +from pathlib import Path + +DB_PATH = Path(__file__).parent / "data" / "slmm.db" + + +def migrate(): + """Add ln1 and ln2 columns to the nl43_status table.""" + + if not DB_PATH.exists(): + print(f"Database not found at {DB_PATH}") + print("No migration needed - database will be created with new schema") + return + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + try: + cursor.execute("PRAGMA table_info(nl43_status)") + columns = [row[1] for row in cursor.fetchall()] + + if "ln1" in columns and "ln2" in columns: + print("✓ ln1/ln2 columns already exist, no migration needed") + return + + if "ln1" not in columns: + print("Adding ln1 column...") + cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln1 TEXT") + print("✓ Added ln1 column") + + if "ln2" not in columns: + print("Adding ln2 column...") + cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln2 TEXT") + print("✓ Added ln2 column") + + conn.commit() + print("\n✓ Migration completed successfully!") + + except Exception as e: + conn.rollback() + print(f"✗ Migration failed: {e}", file=sys.stderr) + sys.exit(1) + finally: + conn.close() + + +if __name__ == "__main__": + migrate() diff --git a/migrate_add_monitor_enabled.py b/migrate_add_monitor_enabled.py new file mode 100644 index 0000000..8853e61 --- /dev/null +++ b/migrate_add_monitor_enabled.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +""" +Migration: add monitor_enabled column to nl43_config. + +Controls whether the live fan-out DOD monitor is kept alive 24/7 for a unit +(which is what makes alerting continuous). Defaults to enabled. Run once per DB. +""" + +import sqlite3 +import sys +from pathlib import Path + +DB_PATH = Path(__file__).parent / "data" / "slmm.db" + + +def migrate(): + if not DB_PATH.exists(): + print(f"Database not found at {DB_PATH}") + print("No migration needed - database will be created with new schema") + return + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + try: + cursor.execute("PRAGMA table_info(nl43_config)") + columns = [row[1] for row in cursor.fetchall()] + + if "monitor_enabled" in columns: + print("✓ monitor_enabled column already exists, no migration needed") + return + + print("Adding monitor_enabled column (default enabled)...") + # SQLite stores booleans as 0/1; default 1 = enabled. + cursor.execute("ALTER TABLE nl43_config ADD COLUMN monitor_enabled BOOLEAN DEFAULT 1") + conn.commit() + print("✓ Added monitor_enabled column") + print("\n✓ Migration completed successfully!") + + except Exception as e: + conn.rollback() + print(f"✗ Migration failed: {e}", file=sys.stderr) + sys.exit(1) + finally: + conn.close() + + +if __name__ == "__main__": + migrate() diff --git a/test_alert_evaluator.py b/test_alert_evaluator.py new file mode 100644 index 0000000..5bacc62 --- /dev/null +++ b/test_alert_evaluator.py @@ -0,0 +1,68 @@ +""" +Synthetic unit test for the alert state machine — no DB, no device. + +Drives `_evaluate_step` with a fake clock + a level series and checks that +onset/clear fire with the right debounce + hysteresis. Run: + + docker compose exec -T slmm python3 test_alert_evaluator.py + # or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py +""" + +from types import SimpleNamespace +from app.alerts import RuleState, _evaluate_step + + +def rule(**kw): + base = dict(threshold_db=85.0, duration_s=3, clear_margin_db=2.0, comparison="above") + base.update(kw) + return SimpleNamespace(**base) + + +def run(series, r): + st = RuleState() + events = [(now, a) for value, now in series + if (a := _evaluate_step(st, value, now, r))] + return events, st + + +def main(): + failures = 0 + + def check(label, cond, detail=""): + nonlocal failures + print(("PASS" if cond else "FAIL"), label, detail) + if not cond: + failures += 1 + + # 1) sustained exceedance -> onset after duration; recovery -> clear after duration + r = rule(threshold_db=85, duration_s=3, clear_margin_db=2) + ev, _ = run([(80, 0), (86, 1), (87, 2), (88, 3), (88, 4), + (88, 5), (82, 6), (82, 7), (82, 8), (82, 9)], r) + onsets = [t for t, a in ev if a == "onset"] + clears = [t for t, a in ev if a == "clear"] + check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev)) + + # 2) brief spike under duration -> no onset (debounce) + ev, _ = run([(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)], rule(duration_s=3)) + check("2 brief spike debounced", ev == [], str(ev)) + + # 3) hysteresis: a dip into the margin (below threshold, above threshold-margin) + # does NOT clear + r = rule(threshold_db=85, duration_s=0, clear_margin_db=3) + ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], r) + check("3 hysteresis holds ACTIVE", ev == [(0, "onset")] and st.phase == "active", + f"{ev} phase={st.phase}") + + # 4) 'below' comparison (device too quiet) -> onset when value < threshold + ev, _ = run([(30, 0), (15, 1)], rule(threshold_db=20, duration_s=0, + clear_margin_db=2, comparison="below")) + check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev)) + + print() + print("ALL PASS" if failures == 0 else f"{failures} FAILURE(S)") + return failures + + +if __name__ == "__main__": + import sys + sys.exit(1 if main() else 0)