Merge pull request 'Update to v 0.4.0' (#6) from dev into main

Reviewed-on: #6
This commit was merged in pull request #6.
This commit is contained in:
2026-06-22 18:07:36 -04:00
12 changed files with 1556 additions and 37 deletions
+46
View File
@@ -5,6 +5,52 @@ All notable changes to SLMM (Sound Level Meter Manager) will be documented in th
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.4.0] - 2026-06-22
### Added
#### Live Monitor (fan-out feed)
- **Per-device fan-out monitor** - one shared, cached live feed per device. Multiple clients (dashboards, portal, charts) subscribe to the same stream instead of each fighting for the NL-43's single TCP connection: one poller reads the device, all subscribers get the same frames.
- **WebSocket monitor** - `WS /api/nl43/{unit_id}/monitor` delivers an instant first frame from cache, then live updates.
- **Monitor control** - `POST /api/nl43/{unit_id}/monitor/{start|stop}`, `GET /api/nl43/_monitor/status`. A persistent `monitor_enabled` flag auto-starts the keepalive on boot.
- **Adaptive polling** - poll rate adapts to demand; unreachable devices back off; a device-offline alert fires when a monitored unit drops.
- **De-duplication** - the background poller skips units already covered by an active monitor (no double-polling); a heartbeat keeps the feed warm.
- **Lower latency** - the monitor caches run state, roughly halving live-feed latency; fan-out emits an instant first frame + offline status to new clients.
#### Alert Engine
- **Threshold rules** - per-device alert rules (metric + threshold + cooldown) with full CRUD: `POST/GET/PUT/DELETE /api/nl43/{unit_id}/alerts/rules[/{rule_id}]`.
- **Events + state machine** - onset/clear tracking via `GET /api/nl43/{unit_id}/alerts/events`; acknowledge with `POST .../events/{event_id}/ack`. A `cooldown_s` is enforced between onsets.
- **24/7 evaluation** - enabled rules pin the monitor on, so rules evaluate continuously even with no UI client connected.
- **Resilience** - editing or deleting a rule resets its state and closes any open event; device-offline events are raised when a monitored unit goes unreachable.
#### Data & History
- **Live-chart backfill** - a downsampled DOD trail is persisted to a new `nl43_readings` table, exposed via `GET /api/nl43/{unit_id}/history` so charts can backfill recent history on load.
- **LN1/LN2 percentiles** - L1/L10 (configurable percentiles) surfaced through SLMM in the status and live-feed payloads.
- **measurement_start_time** included in the cached `/status` response.
#### Device control
- **Per-device disconnect** - `POST /api/nl43/{unit_id}/disconnect` drops a device's pooled connection.
- **Deactivate / standby** - `POST /api/nl43/{unit_id}/deactivate` and global `POST /api/nl43/_system/standby` to quiesce polling/monitoring.
### Changed
- **DRD streaming reuses the pooled connection** rather than opening a separate socket, avoiding contention with the persistent pool on a single-connection device.
- **Connection pool** - idle-TTL / max-age checks can now be disabled; pool status is logged periodically.
### Fixed
- **Measurement-start confirmation** - `/start` now recognizes the device's `Start` state. It previously waited for `Measure`, which never matched, so the start cycle ran the full retry loop and Terra-View's proxy timed out with a misleading "Unknown error" even though the device had started.
- **Garbled reads** - corrupted measurement-state reads that produced phantom STOPPED/STARTED transitions are now ignored.
- **DOD parsing** - corrected field parsing and stopped spurious measurement-time resets.
- **Monitor WebSocket** - quieted a send-after-close race on client disconnect.
### Database
- **New tables** (auto-created on startup via `Base.metadata.create_all`): `alert_rules`, `alert_events`, `nl43_readings`.
- **Migrations for existing tables** (run once per database): `migrate_add_ln_percentiles.py` (LN1/LN2 on `nl43_status`), `migrate_add_monitor_enabled.py` (`monitor_enabled` on `nl43_config`).
### Notes
- Pairs with the matching Terra-View `dev` build, which reads SLMM's `/monitor` fan-out feed for live SLM dashboards (L1/L10 lines, live-chart backfill). Ship the two together.
---
## [0.3.0] - 2026-02-17
### Added
+81 -6
View File
@@ -1,6 +1,6 @@
# SLMM - Sound Level Meter Manager
**Version 0.3.0**
**Version 0.4.0**
Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols.
@@ -12,6 +12,9 @@ SLMM is a standalone backend module that provides REST API routing and command t
## Features
- **Live Monitor (fan-out)**: One shared cached live feed per device — many clients subscribe to the same stream instead of fighting over the meter's single TCP connection
- **Alert Engine**: Per-device threshold rules with onset/clear events, cooldowns, acks, and 24/7 evaluation
- **History & Percentiles**: Downsampled DOD trail + history endpoint for live-chart backfill; LN1/LN2 (L1/L10) percentiles surfaced through the feed
- **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability
- **Background Polling**: Continuous automatic polling of devices with configurable intervals
- **Offline Detection**: Automatic device reachability tracking with failure counters
@@ -44,6 +47,30 @@ SLMM is a standalone backend module that provides REST API routing and command t
└──────────────┘
```
### Live Monitor — Fan-Out Feed (v0.4.0)
The NL-43 allows only one TCP control connection at a time, so multiple clients
polling the same device directly would contend for it. The monitor solves this
with a single shared, cached feed per device:
- **One reader, many subscribers**: a single poller reads the device; every
WebSocket subscriber (`WS /api/nl43/{unit_id}/monitor`) receives the same
frames — an instant first frame from cache, then live updates.
- **Persistent + auto-start**: a `monitor_enabled` flag keeps the feed running
and auto-starts it on boot. Enabled alert rules pin the monitor on for 24/7
evaluation even with no UI connected.
- **Adaptive & deduplicated**: poll rate adapts to demand, unreachable devices
back off, and the background poller skips units already covered by a monitor.
### Alert Engine (v0.4.0)
Per-device threshold alerting evaluated against the live feed:
- **Rules**: metric + threshold + `cooldown_s`, full CRUD per device
- **Events**: onset/clear state machine, acknowledgement, and a device-offline
alert when a monitored unit drops
- **Robust**: editing/deleting a rule resets its state and closes open events
### Persistent TCP Connection Pool (v0.3.0)
SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems:
@@ -145,8 +172,32 @@ Logs are written to:
|--------|----------|-------------|
| GET | `/api/nl43/{unit_id}/status` | Get cached measurement snapshot (updated by background poller) |
| GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) |
| GET | `/api/nl43/{unit_id}/history` | Downsampled DOD trail for live-chart backfill |
| WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data |
### Live Monitor (fan-out feed)
| Method | Endpoint | Description |
|--------|----------|-------------|
| WS | `/api/nl43/{unit_id}/monitor` | Subscribe to the shared cached live feed (instant first frame) |
| POST | `/api/nl43/{unit_id}/monitor/start` | Start the device's monitor feed |
| POST | `/api/nl43/{unit_id}/monitor/stop` | Stop the device's monitor feed |
| GET | `/api/nl43/_monitor/status` | Global monitor status across devices |
| POST | `/api/nl43/{unit_id}/disconnect` | Drop the device's pooled TCP connection |
| POST | `/api/nl43/{unit_id}/deactivate` | Quiesce polling/monitoring for one device |
| POST | `/api/nl43/_system/standby` | Global standby — quiesce all polling/monitoring |
### Alerts
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/nl43/{unit_id}/alerts/rules` | List alert rules for a device |
| POST | `/api/nl43/{unit_id}/alerts/rules` | Create an alert rule (metric, threshold, cooldown) |
| PUT | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Update a rule (resets its state, closes open events) |
| DELETE | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Delete a rule |
| GET | `/api/nl43/{unit_id}/alerts/events` | List alert events (onset/clear) |
| POST | `/api/nl43/{unit_id}/alerts/events/{event_id}/ack` | Acknowledge an event |
### Background Polling
| Method | Endpoint | Description |
@@ -273,11 +324,35 @@ Caches latest measurement snapshot:
- `sd_remaining_mb`: Free SD card space (MB)
- `sd_free_ratio`: SD card free space ratio
- `raw_payload`: Raw device response data
- `is_reachable`: Device reachability status (Boolean) ⭐ NEW
- `consecutive_failures`: Count of consecutive poll failures ⭐ NEW
- `last_poll_attempt`: Last time background poller attempted to poll ⭐ NEW
- `last_success`: Last successful poll timestamp ⭐ NEW
- `last_error`: Last error message (truncated to 500 chars) ⭐ NEW
- `is_reachable`: Device reachability status (Boolean)
- `consecutive_failures`: Count of consecutive poll failures
- `last_poll_attempt`: Last time background poller attempted to poll
- `last_success`: Last successful poll timestamp
- `last_error`: Last error message (truncated to 500 chars)
- `ln1` / `ln2`: LN1/LN2 (L1/L10) percentile levels ⭐ v0.4.0
### NL43Readings Table ⭐ v0.4.0
Downsampled DOD trail backing the live-chart history endpoint (one row/minute,
pruned to a retention window — viewing only, not the report source):
- `id` (PK), `unit_id`, `timestamp`
- `lp` / `leq` / `lmax` / `ln1` / `ln2`: cached level samples
### AlertRule Table ⭐ v0.4.0
Per-device threshold alert rules:
- `id` (PK), `unit_id`, `name`, `enabled`
- `metric`, `comparison` (above/below), `threshold_db`, `clear_margin_db` (hysteresis)
- `duration_s` (sustained), `cooldown_s` (min seconds between onsets)
- `channels` / `recipients`, optional `schedule_start`/`schedule_end`/`schedule_days`
### AlertEvent Table ⭐ v0.4.0
Alert onset/clear events for history, inbox, and acknowledgement:
- `id` (PK), `unit_id`, `rule_id`, `rule_name`, `metric`, `threshold_db`
- `onset_at` / `onset_value`, `peak_value`, `clear_at`, `status` (active/cleared)
- `acknowledged_at` / `acknowledged_by`, `notes`
> New tables (`alert_rules`, `alert_events`, `nl43_readings`) auto-create on
> startup. Existing-table columns ship with migrations:
> `migrate_add_ln_percentiles.py`, `migrate_add_monitor_enabled.py`.
## Protocol Details
+322
View File
@@ -0,0 +1,322 @@
"""
Threshold alert engine.
Each unit can have any number of AlertRules. A rule is evaluated against the
unit's live monitor snapshots via a small per-(unit, rule) state machine:
IDLE --(metric exceeds threshold for duration_s)--> ACTIVE (fire ONSET)
ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR)
duration_s debounces both edges; clear_margin_db adds hysteresis so a level
hovering at the threshold doesn't flap. Onset and clear are distinct events.
The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no
real clock — so it can be unit-tested with a synthetic level series and a fake
clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence,
and dispatch. Dispatch is a server log for now (POC); the seam to POST events to
a Terra-View webhook (email/SMS) is _dispatch().
"""
import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Local timezone offset for schedule windows (same env var services.py uses).
_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5"))
# How long to cache a unit's rules before re-querying the DB (rules change rarely).
_RULE_CACHE_TTL_S = 15.0
@dataclass
class RuleState:
"""In-memory runtime state for one (unit, rule)."""
phase: str = "idle" # "idle" | "active"
edge_since: Optional[float] = None # when the current edge condition began (clock time)
peak: float = 0.0
event_id: Optional[int] = None # the open AlertEvent row (for the clear update)
last_onset: Optional[float] = None # time of the last onset (for cooldown)
def _exceeds(value: float, rule) -> bool:
if rule.comparison == "below":
return value < rule.threshold_db
return value > rule.threshold_db
def _recovered(value: float, rule) -> bool:
margin = rule.clear_margin_db or 0.0
if rule.comparison == "below":
return value > rule.threshold_db + margin
return value < rule.threshold_db - margin
def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]:
"""Advance the state machine by one reading.
Pure: mutates `state`, returns 'onset' | 'clear' | None. `now` is injected so
tests can drive a fake clock.
"""
duration = rule.duration_s or 0
if state.phase == "idle":
if _exceeds(value, rule):
if state.edge_since is None:
state.edge_since = now
if now - state.edge_since >= duration:
# Cooldown: suppress a new onset within cooldown_s of the last one
# (stops a repeatedly-breaching signal from flooding the history).
# Hold edge_since so it fires the moment cooldown lapses if still
# breaching — don't reset it here.
cooldown = getattr(rule, "cooldown_s", 0) or 0
if state.last_onset is not None and (now - state.last_onset) < cooldown:
return None
state.phase = "active"
state.edge_since = None
state.peak = value
state.last_onset = now
return "onset"
else:
state.edge_since = None
return None
# active
if rule.comparison == "below":
state.peak = min(state.peak, value)
else:
state.peak = max(state.peak, value)
if _recovered(value, rule):
if state.edge_since is None:
state.edge_since = now
if now - state.edge_since >= duration:
state.phase = "idle"
state.edge_since = None
return "clear"
else:
state.edge_since = None
return None
def _in_window(now_minutes: int, start: str, end: str) -> bool:
"""Is now_minutes (minutes since local midnight) within [start, end)?
Handles wraparound windows like 22:0007:00."""
def _m(s: str) -> int:
h, m = s.split(":")
return int(h) * 60 + int(m)
s, e = _m(start), _m(end)
if s == e:
return True
if s < e:
return s <= now_minutes < e
return now_minutes >= s or now_minutes < e # wraparound
class AlertEvaluator:
def __init__(self):
self._states: Dict[Tuple[str, int], RuleState] = {}
self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules)
self._offline_events: Dict[str, int] = {} # unit_id -> open connectivity AlertEvent id
logger.info("[ALERT] rule-based evaluator ready")
async def evaluate(self, unit_id: str, snap) -> None:
"""Evaluate every enabled rule for this unit against one snapshot."""
rules = self._get_rules(unit_id)
if not rules:
return
now = asyncio.get_running_loop().time()
for rule in rules:
if not self._in_schedule(rule):
continue
raw = getattr(snap, rule.metric, None)
try:
value = float(raw)
except (TypeError, ValueError):
continue # missing / non-numeric ("-.-")
state = self._states.setdefault((unit_id, rule.id), RuleState())
action = _evaluate_step(state, value, now, rule)
if action == "onset":
await self._on_onset(unit_id, rule, value, state)
elif action == "clear":
await self._on_clear(unit_id, rule, value, state)
# -- rule loading (cached) ----------------------------------------------
def _get_rules(self, unit_id: str) -> list:
loop_now = asyncio.get_running_loop().time()
cached = self._rule_cache.get(unit_id)
if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S:
return cached[1]
rules = self._load_rules(unit_id)
self._rule_cache[unit_id] = (loop_now, rules)
return rules
def _load_rules(self, unit_id: str) -> list:
from app.database import SessionLocal
from app.models import AlertRule
db = SessionLocal()
try:
return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all()
except Exception as e:
logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}")
return []
finally:
db.close()
def invalidate(self, unit_id: Optional[str] = None) -> None:
"""Drop cached rules so a change is picked up immediately."""
if unit_id is None:
self._rule_cache.clear()
else:
self._rule_cache.pop(unit_id, None)
def forget_rule(self, unit_id: str, rule_id: int) -> None:
"""Drop a rule's per-(unit, rule) state machine after the rule is edited or
deleted, so a stale 'active' phase / open event_id from the old config
doesn't bleed into the new one (mis-firing a clear or suppressing an onset)."""
self._states.pop((unit_id, rule_id), None)
# -- scheduling ----------------------------------------------------------
def _in_schedule(self, rule) -> bool:
if not rule.schedule_start or not rule.schedule_end:
day_ok = self._day_ok(rule)
return day_ok
local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
if not self._day_ok(rule, local):
return False
return _in_window(local.hour * 60 + local.minute, rule.schedule_start, rule.schedule_end)
@staticmethod
def _day_ok(rule, local: Optional[datetime] = None) -> bool:
if not rule.schedule_days:
return True
if local is None:
local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""}
return local.weekday() in allowed # Mon=0
# -- event persistence + dispatch ---------------------------------------
async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None:
from app.database import SessionLocal
from app.models import AlertEvent
db = SessionLocal()
try:
evt = AlertEvent(
rule_id=rule.id, unit_id=unit_id, rule_name=rule.name,
metric=rule.metric, threshold_db=rule.threshold_db,
onset_value=value, peak_value=value, status="active",
)
db.add(evt)
db.commit()
db.refresh(evt)
state.event_id = evt.id
except Exception as e:
logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}")
finally:
db.close()
await self._dispatch(
"ONSET", unit_id, rule,
f"{rule.metric.upper()}={value:.1f} dB "
f"{'<' if rule.comparison == 'below' else '>'} {rule.threshold_db:.1f} dB"
f"{f' for {rule.duration_s}s' if rule.duration_s else ''}",
)
async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None:
peak = state.peak
from app.database import SessionLocal
from app.models import AlertEvent
db = SessionLocal()
try:
if state.event_id is not None:
evt = db.query(AlertEvent).filter_by(id=state.event_id).first()
if evt:
evt.clear_at = datetime.utcnow()
evt.peak_value = peak
evt.status = "cleared"
db.commit()
except Exception as e:
logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}")
finally:
db.close()
state.event_id = None
await self._dispatch(
"CLEAR", unit_id, rule,
f"recovered to {value:.1f} dB (peak {peak:.1f} dB)",
)
# -- connectivity (device offline/online) -------------------------------
#
# Raised by the live monitor when it loses / regains contact with a device.
# Persisted as an AlertEvent (sentinel rule_id=0, metric="connectivity") so it
# lands in the same events/inbox/ack pipeline as threshold alerts. The in-memory
# map dedupes; the DB query also dedupes across a process restart.
async def device_offline(self, unit_id: str) -> None:
if unit_id in self._offline_events:
return # already flagged offline
from app.database import SessionLocal
from app.models import AlertEvent
db = SessionLocal()
try:
existing = db.query(AlertEvent).filter_by(
unit_id=unit_id, metric="connectivity", status="active").first()
if existing: # already open in the DB (e.g. carried across a restart)
self._offline_events[unit_id] = existing.id
return
evt = AlertEvent(
rule_id=0, unit_id=unit_id, rule_name="Device unreachable",
metric="connectivity", threshold_db=0.0, status="active",
)
db.add(evt)
db.commit()
db.refresh(evt)
self._offline_events[unit_id] = evt.id
except Exception as e:
logger.warning(f"[ALERT] failed to record offline for {unit_id}: {e}")
finally:
db.close()
await self._dispatch_raw("OFFLINE", unit_id, "Device unreachable",
"live monitor lost contact with the device")
async def device_online(self, unit_id: str) -> None:
self._offline_events.pop(unit_id, None)
from app.database import SessionLocal
from app.models import AlertEvent
db = SessionLocal()
cleared = 0
try:
opened = db.query(AlertEvent).filter_by(
unit_id=unit_id, metric="connectivity", status="active").all()
for evt in opened:
evt.clear_at = datetime.utcnow()
evt.status = "cleared"
cleared += 1
if cleared:
db.commit()
except Exception as e:
logger.warning(f"[ALERT] failed to record online for {unit_id}: {e}")
finally:
db.close()
if cleared: # only announce recovery if it was actually flagged offline
await self._dispatch_raw("ONLINE", unit_id, "Device recovered",
"live monitor regained contact with the device")
# -- event persistence + dispatch ---------------------------------------
async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None:
await self._dispatch_raw(kind, unit_id, rule.name, detail)
async def _dispatch_raw(self, kind: str, unit_id: str, name: str, detail: str) -> None:
"""POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here."""
logger.warning(f"[ALERT:{kind}] {unit_id} '{name}': {detail}")
# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot)
alert_evaluator = AlertEvaluator()
+49
View File
@@ -8,6 +8,7 @@ for fast API access without querying devices on every request.
import asyncio
import logging
import os
from datetime import datetime, timedelta
from typing import Optional
@@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs
logger = logging.getLogger(__name__)
# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in
# standby (running but not polling and not holding device connections) — e.g. a
# dev box that must not latch onto a device that a prod instance owns.
POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true"
class BackgroundPoller:
"""
@@ -39,6 +45,7 @@ class BackgroundPoller:
self._logger = logger
self._last_cleanup = None # Track last log cleanup time
self._last_pool_log = None # Track last connection pool heartbeat log
self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle)
async def start(self):
"""Start the background polling task."""
@@ -71,15 +78,48 @@ class BackgroundPoller:
self._logger.info("Background poller stopped")
def is_active(self) -> bool:
"""Whether background polling is currently active (vs standby)."""
return self._active
async def set_active(self, active: bool):
"""Globally enable/disable polling at runtime.
When deactivated, the loop stays alive but polls nothing and releases all
device connections, so this SLMM instance stops occupying the devices'
single connection slots (e.g. so a prod instance can take over). Runtime
state only — on restart the instance returns to SLMM_POLLING_ENABLED.
"""
self._active = active
if active:
self._logger.info("[SYSTEM] Background polling ACTIVATED")
else:
self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections")
await self._release_all_connections()
async def _release_all_connections(self):
"""Gracefully close every pooled device connection (no-op if none)."""
from app.services import _connection_pool
for device_key in list(_connection_pool.get_stats().get("connections", {})):
await _connection_pool.discard(device_key)
async def _poll_loop(self):
"""Main polling loop that runs continuously."""
self._logger.info("Background polling loop started")
while self._running:
if self._active:
try:
await self._poll_all_devices()
except Exception as e:
self._logger.error(f"Error in poll loop: {e}", exc_info=True)
else:
# Standby: poll nothing, and keep holding no device connection slots
# so another SLMM instance (e.g. prod) can talk to the devices.
try:
await self._release_all_connections()
except Exception as e:
self._logger.warning(f"Standby connection release failed: {e}")
# Run log cleanup once per hour
try:
@@ -138,10 +178,19 @@ class BackgroundPoller:
now = datetime.utcnow()
polled_count = 0
from app.monitor import monitor_manager
for cfg in configs:
if not self._running:
break
# Skip units with an active live monitor: it polls them at ~1Hz and
# keeps the status cache fresh, so a redundant background poll would just
# add load/lock-contention on the device's single connection.
if monitor_manager.is_active(cfg.unit_id):
self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active")
continue
# Get current status
status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first()
+20 -1
View File
@@ -38,6 +38,25 @@ async def lifespan(app: FastAPI):
await poller.start()
logger.info("Background poller started")
# Auto-start keepalive live monitors for units configured for 24/7 monitoring
# (monitor_enabled). This is what keeps alerting running unattended across
# restarts — without it a feed only runs while someone has the live view open.
try:
from app.monitor import monitor_manager
from app.database import SessionLocal
from app.models import NL43Config
db = SessionLocal()
try:
units = db.query(NL43Config).filter_by(monitor_enabled=True, tcp_enabled=True).all()
for cfg in units:
m = await monitor_manager.get(cfg.unit_id)
await m.set_keepalive(True)
logger.info(f"Auto-started keepalive monitor for {cfg.unit_id}")
finally:
db.close()
except Exception as e:
logger.error(f"Failed to auto-start monitors: {e}")
yield # Application runs
# Shutdown
@@ -52,7 +71,7 @@ async def lifespan(app: FastAPI):
app = FastAPI(
title="SLMM NL43 Addon",
description="Standalone module for NL43 configuration and status APIs with background polling",
version="0.3.0",
version="0.4.0",
lifespan=lifespan,
)
+78 -1
View File
@@ -1,4 +1,4 @@
from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func
from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func
from app.database import Base
@@ -23,6 +23,10 @@ class NL43Config(Base):
poll_interval_seconds = Column(Integer, nullable=True, default=60) # Polling interval (10-3600 seconds)
poll_enabled = Column(Boolean, default=True) # Enable/disable background polling for this device
# Live monitor (fan-out DOD feed). Keepalive runs it 24/7 even with no viewer,
# which is what makes alerting continuous. On by default; toggleable from the UI.
monitor_enabled = Column(Boolean, default=True)
class NL43Status(Base):
"""
@@ -41,6 +45,8 @@ class NL43Status(Base):
lmax = Column(String, nullable=True) # Maximum level
lmin = Column(String, nullable=True) # Minimum level
lpeak = Column(String, nullable=True) # Peak level
ln1 = Column(String, nullable=True) # Percentile slot LN1 (configurable; device default L5, contract L1)
ln2 = Column(String, nullable=True) # Percentile slot LN2 (configurable; device default L10)
battery_level = Column(String, nullable=True)
power_source = Column(String, nullable=True)
sd_remaining_mb = Column(String, nullable=True)
@@ -72,3 +78,74 @@ class DeviceLog(Base):
level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR
category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC
message = Column(Text, nullable=False)
class AlertRule(Base):
"""A threshold-alert rule evaluated against a unit's live monitor feed.
Source-agnostic: today it runs over the DOD monitor; the same rule transfers
unchanged if a unit's feed is later sourced from FTP intervals.
"""
__tablename__ = "alert_rules"
id = Column(Integer, primary_key=True, autoincrement=True)
unit_id = Column(String, index=True, nullable=False)
name = Column(String, nullable=False, default="Alert")
metric = Column(String, nullable=False, default="lp") # lp/leq/lmax/lmin/lpeak/ln1/ln2
comparison = Column(String, nullable=False, default="above") # above | below
threshold_db = Column(Float, nullable=False)
duration_s = Column(Integer, nullable=False, default=0) # sustained seconds (0 = instant)
clear_margin_db = Column(Float, nullable=False, default=2.0) # hysteresis band
cooldown_s = Column(Integer, nullable=False, default=300) # min seconds between onsets
# Optional time-of-day scoping (local time). schedule_start/end as "HH:MM";
# null = always active. schedule_days = CSV of 0-6 (Mon=0); null = every day.
schedule_start = Column(String, nullable=True)
schedule_end = Column(String, nullable=True)
schedule_days = Column(String, nullable=True)
channels = Column(String, nullable=False, default="log") # CSV: log,email,sms
recipients = Column(Text, nullable=True) # CSV of emails/phones
enabled = Column(Boolean, default=True)
created_at = Column(DateTime, default=func.now())
class AlertEvent(Base):
"""A fired alert (onset → clear), for history / inbox / acknowledgement."""
__tablename__ = "alert_events"
id = Column(Integer, primary_key=True, autoincrement=True)
rule_id = Column(Integer, index=True, nullable=False)
unit_id = Column(String, index=True, nullable=False)
rule_name = Column(String, nullable=True)
metric = Column(String, nullable=False)
threshold_db = Column(Float, nullable=False)
onset_at = Column(DateTime, default=func.now(), index=True)
onset_value = Column(Float, nullable=True)
peak_value = Column(Float, nullable=True)
clear_at = Column(DateTime, nullable=True)
status = Column(String, default="active") # active | cleared
acknowledged_at = Column(DateTime, nullable=True)
acknowledged_by = Column(String, nullable=True)
notes = Column(Text, nullable=True)
class NL43Reading(Base):
"""Downsampled time-series of live-monitor readings, for the live-chart
backfill (so a viewer sees recent trend on open, not a blank chart).
Viewing only — NOT the report source. Reports use the device's authoritative
FTP .rnd intervals. This is a short, capped trail (one row/minute, pruned to
a retention window) fed by the monitor's keepalive poll loop.
"""
__tablename__ = "nl43_readings"
id = Column(Integer, primary_key=True, autoincrement=True)
unit_id = Column(String, index=True, nullable=False)
timestamp = Column(DateTime, default=func.now(), index=True)
lp = Column(String, nullable=True)
leq = Column(String, nullable=True)
lmax = Column(String, nullable=True)
ln1 = Column(String, nullable=True)
ln2 = Column(String, nullable=True)
+322
View File
@@ -0,0 +1,322 @@
"""
Per-device live monitor (fan-out hub).
ONE DOD poll loop per device, broadcast to many subscribers:
- browser WebSocket clients (live view) — they no longer each open their own
device stream, so the NL43's single-connection limit stops causing the
"second viewer sees nothing" contention.
- the alert evaluator (threshold alerts), which can keep a device's feed running
even with no browser attached.
- persistence (each snapshot is written to NL43Status, like the poller does).
The device's one TCP connection is respected: every poll goes through the same
per-device lock + connection pool in services.py, so the monitor, the background
poller, and on-demand commands all serialize safely.
"""
import asyncio
import logging
import os
from datetime import datetime
from typing import Dict, Optional, Set
from app.database import SessionLocal
from app.models import NL43Config, NL43Status
from app.services import NL43Client, persist_snapshot
from app.alerts import alert_evaluator
logger = logging.getLogger(__name__)
# Extra idle between DOD polls WHEN A BROWSER IS WATCHING. The 1s device rate-limit
# already paces consecutive DOD? commands, so this just needs to be small — the
# rate-limit is the real floor (~1.25s/poll effective).
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25"))
# Idle cadence when NO browser is subscribed and the feed is only kept alive for
# alerting. Same data, ~8x fewer polls -> ~8x less cellular traffic on a metered
# SIM (~1 GB/device/month at full rate -> ~125 MB). NOTE: this also sets the alert
# sampling resolution when nobody is watching, so keep it <= the smallest alert
# duration_s you rely on (default 10s comfortably catches a "sustained 30/60s" rule).
MONITOR_IDLE_POLL_INTERVAL = float(os.getenv("MONITOR_IDLE_POLL_INTERVAL", "10"))
# Exponential backoff once the device is unreachable, so a powered-off / asleep /
# out-of-signal device stops churning reconnects every cycle (log spam + a trickle
# of wasted cellular data on failed SYNs). delay = min(BASE * 2**(fails-1), MAX),
# reset to full-rate on the first good poll. While a browser is actively watching we
# cap the backoff lower (WATCHED_MAX) so a recovery surfaces quickly for the viewer.
MONITOR_BACKOFF_BASE_S = float(os.getenv("MONITOR_BACKOFF_BASE_S", "1"))
MONITOR_BACKOFF_MAX_S = float(os.getenv("MONITOR_BACKOFF_MAX_S", "60"))
MONITOR_BACKOFF_WATCHED_MAX_S = float(os.getenv("MONITOR_BACKOFF_WATCHED_MAX_S", "5"))
# How often to refresh the run state (Measure?). It changes rarely, so we cache it
# and skip that second rate-limited command on most polls — roughly halving the
# per-update latency (~2.5s -> ~1.3s).
MONITOR_STATE_REFRESH_S = float(os.getenv("MONITOR_STATE_REFRESH_S", "30"))
# Downsampled trail for the live-chart backfill: store one reading per
# TRAIL_SAMPLE_S and keep TRAIL_RETENTION_HOURS of it (pruned). Viewing only —
# reports use the device's FTP .rnd data, not this.
TRAIL_SAMPLE_S = float(os.getenv("MONITOR_TRAIL_SAMPLE_S", "60"))
TRAIL_RETENTION_HOURS = float(os.getenv("MONITOR_TRAIL_RETENTION_HOURS", "24"))
# If nothing has been broadcast in this many seconds (e.g. device offline and
# silent), send a keepalive frame so reverse proxies don't drop the idle WS.
MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25"))
def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict:
"""Build the broadcast payload — same shape as the DRD stream, but DOD-sourced
so it carries ln1/ln2 (which DRD cannot)."""
return {
"unit_id": unit_id,
"timestamp": datetime.utcnow().isoformat(),
"measurement_state": snap.measurement_state,
"measurement_start_time": measurement_start_time,
"counter": snap.counter,
"lp": snap.lp,
"leq": snap.leq,
"lmax": snap.lmax,
"lmin": snap.lmin,
"lpeak": snap.lpeak,
"ln1": snap.ln1,
"ln2": snap.ln2,
"raw_payload": snap.raw_payload,
}
class DeviceMonitor:
"""Owns a single DOD poll loop for one device and fans each snapshot out to
all subscribers. Runs while it has at least one browser subscriber OR the
server-side keep-alive (alerting) flag is set."""
def __init__(self, unit_id: str):
self.unit_id = unit_id
self._subscribers: Set[asyncio.Queue] = set()
self._keepalive = False
self._task: Optional[asyncio.Task] = None
self._lock = asyncio.Lock()
self._last_payload: Optional[dict] = None # replayed to new subscribers
self._consec_fail = 0
self._reachable = True # last broadcast reachability (for transition frames)
self._cached_state: Optional[str] = None # run state, refreshed periodically
self._last_state_refresh = 0.0
self._last_trail_store = 0.0 # downsample throttle for the backfill trail
@property
def running(self) -> bool:
return self._task is not None and not self._task.done()
def subscriber_count(self) -> int:
return len(self._subscribers)
def _has_demand(self) -> bool:
return bool(self._subscribers) or self._keepalive
def _ensure_task(self) -> None:
if self._task is None or self._task.done():
self._task = asyncio.create_task(self._run())
async def subscribe(self) -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=5)
async with self._lock:
self._subscribers.add(q)
# Replay the last frame so a client connecting mid-stream sees data
# (or the current 'unreachable' state) immediately, not after a poll.
if self._last_payload is not None:
try:
q.put_nowait(self._last_payload)
except asyncio.QueueFull:
pass
self._ensure_task()
return q
async def unsubscribe(self, q: asyncio.Queue) -> None:
async with self._lock:
self._subscribers.discard(q)
async def set_keepalive(self, on: bool) -> None:
async with self._lock:
self._keepalive = on
if on:
self._ensure_task()
async def _run(self) -> None:
logger.info(f"[MONITOR] {self.unit_id}: feed started")
loop = asyncio.get_running_loop()
last_send = loop.time()
try:
while self._has_demand():
snap, mst = await self._poll_once()
if snap is not None:
if not self._reachable:
# Recovered from an outage — clear the connectivity alert.
try:
await alert_evaluator.device_online(self.unit_id)
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: online alert failed: {e}")
self._consec_fail = 0
self._reachable = True
payload = _snapshot_payload(snap, self.unit_id, mst)
payload["feed_status"] = "ok"
self._broadcast(payload)
last_send = loop.time()
try:
await alert_evaluator.evaluate(self.unit_id, snap)
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")
else:
# Tell clients the device went offline — once, on transition, after a
# few failures so a momentary blip doesn't flap the UI. Same edge
# raises the device-offline alert.
self._consec_fail += 1
if self._reachable and self._consec_fail >= 3:
self._reachable = False
self._broadcast({
"unit_id": self.unit_id,
"timestamp": datetime.utcnow().isoformat(),
"feed_status": "unreachable",
})
last_send = loop.time()
try:
await alert_evaluator.device_offline(self.unit_id)
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: offline alert failed: {e}")
# Heartbeat: during quiet/offline stretches, send a keepalive so an
# idle WS isn't dropped by a reverse proxy. Not cached (new subscribers
# should still get the last real frame, not a heartbeat).
if loop.time() - last_send >= MONITOR_HEARTBEAT_S:
self._broadcast({
"unit_id": self.unit_id,
"timestamp": datetime.utcnow().isoformat(),
"feed_status": "ok" if self._reachable else "unreachable",
"heartbeat": True,
}, cache=False)
last_send = loop.time()
await asyncio.sleep(self._next_delay())
finally:
logger.info(f"[MONITOR] {self.unit_id}: feed stopped")
def _next_delay(self) -> float:
"""Inter-poll delay: exponential backoff while unreachable, full-rate while a
browser is watching, relaxed cadence when the feed is keepalive-only."""
if self._consec_fail > 0:
shift = min(self._consec_fail - 1, 6) # cap growth at 2**6 = 64x base
delay = min(MONITOR_BACKOFF_BASE_S * (2 ** shift), MONITOR_BACKOFF_MAX_S)
if self._subscribers:
delay = min(delay, MONITOR_BACKOFF_WATCHED_MAX_S)
return delay
if self._subscribers:
return MONITOR_POLL_INTERVAL # a browser is watching — smooth chart
return MONITOR_IDLE_POLL_INTERVAL # keepalive-only (alerting) — save data
async def _poll_once(self):
"""One DOD poll: read, persist, return (snapshot, measurement_start_iso)."""
db = SessionLocal()
try:
cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first()
if not cfg or not cfg.tcp_enabled:
return None, None
client = NL43Client(
cfg.host, cfg.tcp_port,
ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password,
ftp_port=cfg.ftp_port or 21,
)
# Refresh the run state only every MONITOR_STATE_REFRESH_S; reuse the
# cached state otherwise so most polls send just DOD? (one rate-limited
# command) instead of DOD? + Measure?.
now = asyncio.get_running_loop().time()
refresh_state = (self._cached_state is None
or now - self._last_state_refresh >= MONITOR_STATE_REFRESH_S)
snap = await client.request_dod(
measurement_state=None if refresh_state else self._cached_state
)
if refresh_state:
self._cached_state = snap.measurement_state
self._last_state_refresh = now
snap.unit_id = self.unit_id
persist_snapshot(snap, db)
db.commit()
# Append to the downsampled backfill trail (~one row per TRAIL_SAMPLE_S).
if now - self._last_trail_store >= TRAIL_SAMPLE_S:
self._last_trail_store = now
self._store_trail(snap, db)
status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first()
mst = (status.measurement_start_time.isoformat()
if status and status.measurement_start_time else None)
return snap, mst
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}")
return None, None
finally:
db.close()
def _store_trail(self, snap, db) -> None:
"""Append one downsampled reading to the backfill trail and prune old rows."""
from datetime import datetime, timedelta
from app.models import NL43Reading
try:
db.add(NL43Reading(
unit_id=self.unit_id, timestamp=datetime.utcnow(),
lp=snap.lp, leq=snap.leq, lmax=snap.lmax, ln1=snap.ln1, ln2=snap.ln2,
))
cutoff = datetime.utcnow() - timedelta(hours=TRAIL_RETENTION_HOURS)
db.query(NL43Reading).filter(
NL43Reading.unit_id == self.unit_id,
NL43Reading.timestamp < cutoff,
).delete()
db.commit()
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: trail store failed: {e}")
def _broadcast(self, payload: dict, cache: bool = True) -> None:
if cache:
self._last_payload = payload # replayed to new subscribers
for q in list(self._subscribers):
try:
q.put_nowait(payload)
except asyncio.QueueFull:
# Slow consumer — drop this frame rather than stall the whole feed.
pass
class MonitorManager:
"""Registry of per-device monitors (one per unit_id)."""
def __init__(self):
self._monitors: Dict[str, DeviceMonitor] = {}
self._lock = asyncio.Lock()
async def get(self, unit_id: str) -> DeviceMonitor:
async with self._lock:
m = self._monitors.get(unit_id)
if m is None:
m = DeviceMonitor(unit_id)
self._monitors[unit_id] = m
return m
def is_active(self, unit_id: str) -> bool:
"""True if this unit has a running monitor feed (so the background poller
can skip it — the monitor already polls it more often)."""
m = self._monitors.get(unit_id)
return m is not None and m.running
def status(self) -> dict:
return {
uid: {
"running": m.running,
"subscribers": m.subscriber_count(),
"keepalive": m._keepalive,
"reachable": m._reachable,
# what cadence the loop is currently using, for observability
"mode": ("backoff" if m._consec_fail > 0
else "watched" if m._subscribers
else "idle"),
}
for uid, m in self._monitors.items()
}
# Module-level singleton
monitor_manager = MonitorManager()
+401 -2
View File
@@ -11,7 +11,7 @@ import os
import asyncio
from app.database import get_db
from app.models import NL43Config, NL43Status
from app.models import NL43Config, NL43Status, AlertRule, AlertEvent, NL43Reading
from app.services import NL43Client, persist_snapshot
logger = logging.getLogger(__name__)
@@ -121,6 +121,392 @@ async def flush_connection_pool():
return {"status": "ok", "message": "All cached connections closed"}
@router.post("/{unit_id}/disconnect")
async def disconnect_device(unit_id: str, db: Session = Depends(get_db)):
"""Cleanly close SLMM's persistent TCP connection to a single device.
Gracefully closes (TCP FIN + wait_closed) the pooled connection for this
device and removes it from the pool, freeing the NL43's single connection
slot. Idempotent a no-op if no connection is currently cached.
Note: this releases the *idle* pooled connection. It does not interrupt an
in-progress DRD stream or an in-flight command (those have the socket
checked out of the pool) close the stream WebSocket to end a live stream.
"""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
from app.services import _connection_pool
device_key = f"{cfg.host}:{cfg.tcp_port}"
had_conn = device_key in _connection_pool.get_stats().get("connections", {})
await _connection_pool.discard(device_key)
return {
"status": "ok",
"unit_id": unit_id,
"device_key": device_key,
"disconnected": had_conn,
"message": "Connection closed" if had_conn else "No cached connection to close",
}
@router.post("/{unit_id}/deactivate")
async def deactivate_device(unit_id: str, db: Session = Depends(get_db)):
"""Make a single unit dormant: stop background polling for it AND drop its
connection, freeing the device's connection slot. poll_enabled=False is
persisted, so the unit stays dormant across restarts until /activate.
"""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
cfg.poll_enabled = False
db.commit()
from app.services import _connection_pool, _get_device_lock
device_key = f"{cfg.host}:{cfg.tcp_port}"
# Wait briefly for any in-flight poll/command to finish (so its connection is
# back in the pool), then drop it. If a long-lived stream holds the lock we
# don't block forever — discard the pooled connection regardless.
lock = await _get_device_lock(device_key)
acquired = False
try:
await asyncio.wait_for(lock.acquire(), timeout=10.0)
acquired = True
except asyncio.TimeoutError:
acquired = False
try:
await _connection_pool.discard(device_key)
finally:
if acquired:
lock.release()
return {
"status": "ok",
"unit_id": unit_id,
"poll_enabled": False,
"message": "Polling disabled and connection closed for this unit",
}
@router.post("/{unit_id}/activate")
async def activate_device(unit_id: str, db: Session = Depends(get_db)):
"""Resume background polling for a unit previously deactivated."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
cfg.poll_enabled = True
db.commit()
return {
"status": "ok",
"unit_id": unit_id,
"poll_enabled": True,
"message": "Polling enabled for this unit",
}
@router.get("/_system/status")
async def system_status():
"""Report whether this SLMM instance is actively polling or in standby."""
from app.background_poller import poller
from app.services import _connection_pool
return {
"status": "ok",
"mode": "active" if poller.is_active() else "standby",
"polling_active": poller.is_active(),
"active_connections": _connection_pool.get_stats().get("active_connections", 0),
}
@router.post("/_system/standby")
async def system_standby():
"""Put this SLMM instance into standby: stop polling ALL devices and release
every connection, so it stops occupying device slots (e.g. so a prod instance
can take over). Runtime-only on restart the instance returns to its
SLMM_POLLING_ENABLED default.
"""
from app.background_poller import poller
await poller.set_active(False)
return {"status": "ok", "mode": "standby",
"message": "Polling stopped and all device connections released"}
@router.post("/_system/resume")
async def system_resume():
"""Resume polling after standby (global)."""
from app.background_poller import poller
await poller.set_active(True)
return {"status": "ok", "mode": "active", "message": "Polling resumed"}
# ============================================================================
# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients
# ============================================================================
@router.websocket("/{unit_id}/monitor")
async def monitor_stream(websocket: WebSocket, unit_id: str):
"""Subscribe a browser to the device's shared 1 Hz DOD feed.
Any number of clients can attach without each opening its own device
connection (one poll loop per device, fanned out). Same JSON shape as the
DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10).
"""
await websocket.accept()
from app.monitor import monitor_manager
monitor = await monitor_manager.get(unit_id)
queue = await monitor.subscribe()
logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)")
async def _watch_disconnect():
# Completes when the client disconnects, so an idle feed (no data) still
# detects the drop and we don't leak a subscription that keeps the device
# feed (and its connection) alive.
try:
while True:
msg = await websocket.receive()
if msg.get("type") == "websocket.disconnect":
return
except Exception:
return
gone = asyncio.ensure_future(_watch_disconnect())
try:
while not gone.done():
try:
payload = await asyncio.wait_for(queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue # re-check gone.done()
if gone.done():
break # client disconnected while we waited — don't send into a closing socket
await websocket.send_json(payload)
except WebSocketDisconnect:
logger.info(f"Monitor subscriber disconnected for {unit_id}")
except Exception as e:
# A frame that races the close (client vanished mid-send) surfaces as
# "Unexpected ASGI message 'websocket.send' after ... websocket.close".
# That's expected on disconnect (the portal closes the socket on every tab
# switch), not an error — log it quietly.
msg = str(e)
if "after sending" in msg or "websocket.close" in msg or "response already completed" in msg:
logger.debug(f"Monitor stream for {unit_id} closed mid-send (client gone)")
else:
logger.warning(f"Monitor stream error for {unit_id}: {e}")
finally:
gone.cancel()
await monitor.unsubscribe(queue)
@router.post("/{unit_id}/monitor/start")
async def monitor_start(unit_id: str, db: Session = Depends(get_db)):
"""Enable 24/7 keepalive monitoring: persist monitor_enabled and start the feed
now, so alerting evaluates continuously even with no viewer. Survives restarts
(auto-started on boot from the persisted flag)."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if cfg:
cfg.monitor_enabled = True
db.commit()
from app.monitor import monitor_manager
monitor = await monitor_manager.get(unit_id)
await monitor.set_keepalive(True)
return {"status": "ok", "unit_id": unit_id, "monitor_enabled": True, "running": monitor.running}
@router.post("/{unit_id}/monitor/stop")
async def monitor_stop(unit_id: str, db: Session = Depends(get_db)):
"""Disable keepalive monitoring: persist monitor_enabled=False and drop the
keepalive (the feed stops once no browser subscribers remain)."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if cfg:
cfg.monitor_enabled = False
db.commit()
from app.monitor import monitor_manager
monitor = await monitor_manager.get(unit_id)
await monitor.set_keepalive(False)
return {"status": "ok", "unit_id": unit_id, "monitor_enabled": False}
@router.get("/_monitor/status")
async def monitor_status():
"""Status of every device monitor (running, subscriber count, keep-alive)."""
from app.monitor import monitor_manager
return {"status": "ok", "monitors": monitor_manager.status()}
@router.get("/{unit_id}/history")
def get_monitor_history(unit_id: str, hours: float = 2.0, db: Session = Depends(get_db)):
"""Recent downsampled monitor readings (the DOD trail) for the live-chart
backfill. Viewing only NOT the FTP report data."""
from datetime import timedelta
hours = max(0.1, min(hours, 48.0))
cutoff = datetime.utcnow() - timedelta(hours=hours)
rows = (db.query(NL43Reading)
.filter(NL43Reading.unit_id == unit_id, NL43Reading.timestamp >= cutoff)
.order_by(NL43Reading.timestamp.asc()).all())
return {
"status": "ok",
"unit_id": unit_id,
"hours": hours,
"count": len(rows),
"readings": [
{
"timestamp": r.timestamp.isoformat() if r.timestamp else None,
"lp": r.lp, "leq": r.leq, "lmax": r.lmax, "ln1": r.ln1, "ln2": r.ln2,
}
for r in rows
],
}
# ============================================================================
# ALERTS — threshold rules + fired events
# ============================================================================
class AlertRulePayload(BaseModel):
name: str = "Alert"
metric: str = "lp" # lp/leq/lmax/lmin/lpeak/ln1/ln2
comparison: str = "above" # above | below
threshold_db: float
duration_s: int = 0 # sustained seconds before firing (0 = instant)
clear_margin_db: float = 2.0 # hysteresis band
cooldown_s: int = 300
schedule_start: str | None = None # "HH:MM" local; null = always
schedule_end: str | None = None
schedule_days: str | None = None # CSV of 0-6 (Mon=0); null = every day
channels: str = "log"
recipients: str | None = None
enabled: bool = True
def _rule_dict(r: AlertRule) -> dict:
return {
"id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric,
"comparison": r.comparison, "threshold_db": r.threshold_db,
"duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db,
"cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start,
"schedule_end": r.schedule_end, "schedule_days": r.schedule_days,
"channels": r.channels, "recipients": r.recipients, "enabled": r.enabled,
}
def _event_dict(e: AlertEvent) -> dict:
return {
"id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id,
"rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db,
"onset_at": e.onset_at.isoformat() if e.onset_at else None,
"onset_value": e.onset_value, "peak_value": e.peak_value,
"clear_at": e.clear_at.isoformat() if e.clear_at else None,
"status": e.status,
"acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None,
"acknowledged_by": e.acknowledged_by,
}
async def _sync_keepalive_to_rules(unit_id: str, db: Session):
"""Keep a unit's monitor running while it has enabled alert rules, so the
evaluator runs 24/7 even with no browser watching. Turns keepalive ON (and
persists monitor_enabled so it survives a restart via the boot auto-start)
when enabled rules exist; never turns it OFF a device may be kept alive for
other reasons, so operators control that on /admin/slmm."""
has_enabled = (db.query(AlertRule)
.filter_by(unit_id=unit_id, enabled=True).first() is not None)
if not has_enabled:
return
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if cfg and not cfg.monitor_enabled:
cfg.monitor_enabled = True
db.commit()
from app.monitor import monitor_manager
m = await monitor_manager.get(unit_id)
await m.set_keepalive(True)
def _reset_rule_runtime(unit_id: str, rule_id: int, db: Session):
"""After a rule edit/delete: drop its evaluator state machine and close any open
event, so a stale 'active' phase doesn't mis-evaluate against the new config and
the client portal doesn't stay 'in alarm' on a rule that changed or is gone."""
from app.alerts import alert_evaluator
alert_evaluator.forget_rule(unit_id, rule_id)
now = datetime.utcnow()
for evt in db.query(AlertEvent).filter_by(unit_id=unit_id, rule_id=rule_id, status="active").all():
evt.clear_at = now
evt.status = "cleared"
db.commit()
@router.post("/{unit_id}/alerts/rules")
async def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)):
rule = AlertRule(unit_id=unit_id, **payload.model_dump())
db.add(rule)
db.commit()
db.refresh(rule)
from app.alerts import alert_evaluator
alert_evaluator.invalidate(unit_id)
await _sync_keepalive_to_rules(unit_id, db)
return {"status": "ok", "rule": _rule_dict(rule)}
@router.get("/{unit_id}/alerts/rules")
def list_alert_rules(unit_id: str, db: Session = Depends(get_db)):
rules = db.query(AlertRule).filter_by(unit_id=unit_id).all()
return {"status": "ok", "rules": [_rule_dict(r) for r in rules]}
@router.put("/{unit_id}/alerts/rules/{rule_id}")
async def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)):
rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
if not rule:
raise HTTPException(status_code=404, detail="Alert rule not found")
for field, value in payload.model_dump().items():
setattr(rule, field, value)
db.commit()
db.refresh(rule)
from app.alerts import alert_evaluator
alert_evaluator.invalidate(unit_id)
_reset_rule_runtime(unit_id, rule_id, db)
await _sync_keepalive_to_rules(unit_id, db)
return {"status": "ok", "rule": _rule_dict(rule)}
@router.delete("/{unit_id}/alerts/rules/{rule_id}")
async def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)):
rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
if not rule:
raise HTTPException(status_code=404, detail="Alert rule not found")
db.delete(rule)
db.commit()
from app.alerts import alert_evaluator
alert_evaluator.invalidate(unit_id)
_reset_rule_runtime(unit_id, rule_id, db) # close its open event so the portal doesn't stay red
await _sync_keepalive_to_rules(unit_id, db) # no-op if no enabled rules remain
return {"status": "ok", "deleted": rule_id}
@router.get("/{unit_id}/alerts/events")
def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)):
events = (db.query(AlertEvent).filter_by(unit_id=unit_id)
.order_by(AlertEvent.onset_at.desc()).limit(limit).all())
return {"status": "ok", "events": [_event_dict(e) for e in events]}
@router.post("/{unit_id}/alerts/events/{event_id}/ack")
def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)):
evt = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first()
if not evt:
raise HTTPException(status_code=404, detail="Alert event not found")
evt.acknowledged_at = datetime.utcnow()
evt.acknowledged_by = by
db.commit()
return {"status": "ok", "acknowledged": event_id}
# ============================================================================
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
# ============================================================================
@@ -197,6 +583,7 @@ def get_roster(db: Session = Depends(get_db)):
"web_enabled": cfg.web_enabled,
"poll_enabled": cfg.poll_enabled,
"poll_interval_seconds": cfg.poll_interval_seconds,
"monitor_enabled": cfg.monitor_enabled,
"status": None
}
@@ -445,11 +832,14 @@ def get_status(unit_id: str, db: Session = Depends(get_db)):
"unit_id": unit_id,
"last_seen": status.last_seen.isoformat() if status.last_seen else None,
"measurement_state": status.measurement_state,
"measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None,
"lp": status.lp,
"leq": status.leq,
"lmax": status.lmax,
"lmin": status.lmin,
"lpeak": status.lpeak,
"ln1": status.ln1,
"ln2": status.ln2,
"battery_level": status.battery_level,
"power_source": status.power_source,
"sd_remaining_mb": status.sd_remaining_mb,
@@ -472,6 +862,8 @@ class StatusPayload(BaseModel):
lmax: str | None = None
lmin: str | None = None
lpeak: str | None = None
ln1: str | None = None
ln2: str | None = None
battery_level: str | None = None
power_source: str | None = None
sd_remaining_mb: str | None = None
@@ -499,11 +891,14 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge
"unit_id": unit_id,
"last_seen": status.last_seen.isoformat(),
"measurement_state": status.measurement_state,
"measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None,
"lp": status.lp,
"leq": status.leq,
"lmax": status.lmax,
"lmin": status.lmin,
"lpeak": status.lpeak,
"ln1": status.ln1,
"ln2": status.ln2,
"battery_level": status.battery_level,
"power_source": status.power_source,
"sd_remaining_mb": status.sd_remaining_mb,
@@ -544,7 +939,7 @@ async def start_measurement(unit_id: str, db: Session = Depends(get_db)):
db.expire_all()
status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
logger.info(f"State check: measurement_state={status.measurement_state if status else 'None'}, start_time={status.measurement_start_time if status else 'None'}")
if status and status.measurement_state == "Measure" and status.measurement_start_time:
if status and status.measurement_state in ("Start", "Measure") and status.measurement_start_time:
logger.info(f"✓ Measurement state confirmed for {unit_id} with start time {status.measurement_start_time}")
break
@@ -1205,6 +1600,8 @@ async def stream_live(websocket: WebSocket, unit_id: str):
"lmax": snap.lmax, # Maximum level
"lmin": snap.lmin, # Minimum level
"lpeak": snap.lpeak, # Peak level
"ln1": snap.ln1, # LN1 percentile (L1/L10 contract); null on DRD stream
"ln2": snap.ln2, # LN2 percentile; null on DRD stream
"raw_payload": snap.raw_payload,
})
except Exception as e:
@@ -1876,6 +2273,8 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
"lmax": status.lmax,
"lmin": status.lmin,
"lpeak": status.lpeak,
"ln1": status.ln1,
"ln2": status.ln2,
"battery_level": status.battery_level,
"power_source": status.power_source,
"sd_remaining_mb": status.sd_remaining_mb,
+53 -17
View File
@@ -46,6 +46,8 @@ class NL43Snapshot:
lmax: Optional[str] = None # Maximum level
lmin: Optional[str] = None # Minimum level
lpeak: Optional[str] = None # Peak level
ln1: Optional[str] = None # Percentile slot LN1 (configurable; device default L5, contract L1)
ln2: Optional[str] = None # Percentile slot LN2 (configurable; device default L10)
battery_level: Optional[str] = None
power_source: Optional[str] = None
sd_remaining_mb: Optional[str] = None
@@ -69,10 +71,27 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'")
# Device returns "Start" when measuring, "Stop" when stopped
# Normalize to previous behavior for backward compatibility
is_measuring = new_state == "Start"
was_measuring = previous_state == "Start"
# The device reports "Start" while measuring; the DOD path uses that string,
# but the DRD stream path tags snapshots "Measure" (and the DOD fallback also
# uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and
# closing the live stream flips state "Start"->"Measure"->"Start", which the
# old equality check misread as stop-then-start and reset measurement_start_time.
#
# Also: only act on RECOGNIZED states. A buffer desync on the shared connection
# (e.g. right after a DRD/DOD test) can make a Measure? read return a stray,
# garbled value; treating that as "not measuring" produced constant phantom
# "STOPPED -> STARTED" log pairs and reset the timer. Ignore unknown reads.
MEASURING_STATES = {"Start", "Measure"}
STOPPED_STATES = {"Stop"}
was_measuring = previous_state in MEASURING_STATES
if new_state in MEASURING_STATES:
is_measuring = True
elif new_state in STOPPED_STATES:
is_measuring = False
else:
logger.warning(f"Ignoring unrecognized measurement state for {s.unit_id}: {new_state!r}")
is_measuring = was_measuring # garbled/unknown read — no transition
if not was_measuring and is_measuring:
# Measurement just started - record the start time
@@ -95,6 +114,9 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
except Exception:
pass
# Only persist a recognized state so one garbled read can't poison the next
# transition check (which would manufacture the phantom STOPPED/STARTED pair).
if new_state in MEASURING_STATES or new_state in STOPPED_STATES:
row.measurement_state = new_state
row.counter = s.counter
row.lp = s.lp
@@ -102,6 +124,8 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
row.lmax = s.lmax
row.lmin = s.lmin
row.lpeak = s.lpeak
row.ln1 = s.ln1
row.ln2 = s.ln2
row.battery_level = s.battery_level
row.power_source = s.power_source
row.sd_remaining_mb = s.sd_remaining_mb
@@ -656,10 +680,12 @@ class NL43Client:
else:
raise ValueError(f"Unknown result code: {result_code}")
async def request_dod(self) -> NL43Snapshot:
async def request_dod(self, measurement_state: Optional[str] = None) -> NL43Snapshot:
"""Request DOD (Data Output Display) snapshot from device.
Returns parsed measurement data from the device display.
Returns parsed measurement data from the device display. Pass
measurement_state to reuse a cached run state and skip the extra Measure?
round-trip (the state changes rarely); leave it None to query it.
"""
# _send_command now handles result code validation and returns the data line
resp = await self._send_command("DOD?\r\n")
@@ -682,7 +708,10 @@ class NL43Client:
logger.info(f"Parsed {len(parts)} data points from DOD response")
# Query actual measurement state (DOD doesn't include this information)
# DOD doesn't include the run state. Query it only when not supplied by the
# caller — the monitor passes a cached state most cycles and refreshes it
# occasionally, avoiding a second rate-limited command on every poll.
if measurement_state is None:
try:
measurement_state = await self.get_measurement_state()
except Exception as e:
@@ -691,22 +720,29 @@ class NL43Client:
snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state)
# Parse known positions (based on NL43 communication guide - DRD format)
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
# Parse DOD positional fields. DOD's layout is DIFFERENT from DRD: it has NO
# leading counter and it includes LE plus LN1LN5. The device returns 4 channels
# of 16 fields each — [Lp, Leq, LE, Lmax, Lmin, LN1, LN2, LN3, LN4, LN5, Lpeak,
# LIeq, Leq_mov, Ltm5, over, under] — and channel 1 (parts[0:16]) is the main
# display. The previous code reused the DRD map (treating parts[0] as a counter),
# which shifted everything: Lp was reported as the counter, Leq as Lp, LE as Leq,
# and LN1 as Lpeak (you could spot it because "Lpeak" came out < Lmax).
try:
# Capture d0 (counter) for timer synchronization
if len(parts) >= 1:
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
snap.lp = parts[0] # Lp: instantaneous sound pressure level
if len(parts) >= 2:
snap.lp = parts[1] # d1: Instantaneous sound pressure level
if len(parts) >= 3:
snap.leq = parts[2] # d2: Equivalent continuous sound level
snap.leq = parts[1] # Leq: equivalent continuous level
# parts[2] = LE (sound exposure level) — not currently surfaced
if len(parts) >= 4:
snap.lmax = parts[3] # d3: Maximum level
snap.lmax = parts[3] # Lmax
if len(parts) >= 5:
snap.lmin = parts[4] # d4: Minimum level
snap.lmin = parts[4] # Lmin
if len(parts) >= 11:
snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak)
if len(parts) >= 6:
snap.lpeak = parts[5] # d5: Peak level
snap.ln1 = parts[5] # LN1 percentile slot (device default L5; contract L1)
if len(parts) >= 7:
snap.ln2 = parts[6] # LN2 percentile slot (device default L10)
except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DOD data points: {e}")
+58
View File
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Migration script to add ln1 and ln2 percentile columns to the nl43_status table.
The NL-43 DOD response carries percentile slots LN1-LN5; the live SLM display
(Terra-View) shows two of them (default L1/L10). This adds storage for the two
surfaced slots. Run once per database to update existing schema.
"""
import sqlite3
import sys
from pathlib import Path
DB_PATH = Path(__file__).parent / "data" / "slmm.db"
def migrate():
"""Add ln1 and ln2 columns to the nl43_status table."""
if not DB_PATH.exists():
print(f"Database not found at {DB_PATH}")
print("No migration needed - database will be created with new schema")
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("PRAGMA table_info(nl43_status)")
columns = [row[1] for row in cursor.fetchall()]
if "ln1" in columns and "ln2" in columns:
print("✓ ln1/ln2 columns already exist, no migration needed")
return
if "ln1" not in columns:
print("Adding ln1 column...")
cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln1 TEXT")
print("✓ Added ln1 column")
if "ln2" not in columns:
print("Adding ln2 column...")
cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln2 TEXT")
print("✓ Added ln2 column")
conn.commit()
print("\n✓ Migration completed successfully!")
except Exception as e:
conn.rollback()
print(f"✗ Migration failed: {e}", file=sys.stderr)
sys.exit(1)
finally:
conn.close()
if __name__ == "__main__":
migrate()
+48
View File
@@ -0,0 +1,48 @@
#!/usr/bin/env python3
"""
Migration: add monitor_enabled column to nl43_config.
Controls whether the live fan-out DOD monitor is kept alive 24/7 for a unit
(which is what makes alerting continuous). Defaults to enabled. Run once per DB.
"""
import sqlite3
import sys
from pathlib import Path
DB_PATH = Path(__file__).parent / "data" / "slmm.db"
def migrate():
if not DB_PATH.exists():
print(f"Database not found at {DB_PATH}")
print("No migration needed - database will be created with new schema")
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("PRAGMA table_info(nl43_config)")
columns = [row[1] for row in cursor.fetchall()]
if "monitor_enabled" in columns:
print("✓ monitor_enabled column already exists, no migration needed")
return
print("Adding monitor_enabled column (default enabled)...")
# SQLite stores booleans as 0/1; default 1 = enabled.
cursor.execute("ALTER TABLE nl43_config ADD COLUMN monitor_enabled BOOLEAN DEFAULT 1")
conn.commit()
print("✓ Added monitor_enabled column")
print("\n✓ Migration completed successfully!")
except Exception as e:
conn.rollback()
print(f"✗ Migration failed: {e}", file=sys.stderr)
sys.exit(1)
finally:
conn.close()
if __name__ == "__main__":
migrate()
+68
View File
@@ -0,0 +1,68 @@
"""
Synthetic unit test for the alert state machine no DB, no device.
Drives `_evaluate_step` with a fake clock + a level series and checks that
onset/clear fire with the right debounce + hysteresis. Run:
docker compose exec -T slmm python3 test_alert_evaluator.py
# or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py
"""
from types import SimpleNamespace
from app.alerts import RuleState, _evaluate_step
def rule(**kw):
base = dict(threshold_db=85.0, duration_s=3, clear_margin_db=2.0, comparison="above")
base.update(kw)
return SimpleNamespace(**base)
def run(series, r):
st = RuleState()
events = [(now, a) for value, now in series
if (a := _evaluate_step(st, value, now, r))]
return events, st
def main():
failures = 0
def check(label, cond, detail=""):
nonlocal failures
print(("PASS" if cond else "FAIL"), label, detail)
if not cond:
failures += 1
# 1) sustained exceedance -> onset after duration; recovery -> clear after duration
r = rule(threshold_db=85, duration_s=3, clear_margin_db=2)
ev, _ = run([(80, 0), (86, 1), (87, 2), (88, 3), (88, 4),
(88, 5), (82, 6), (82, 7), (82, 8), (82, 9)], r)
onsets = [t for t, a in ev if a == "onset"]
clears = [t for t, a in ev if a == "clear"]
check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev))
# 2) brief spike under duration -> no onset (debounce)
ev, _ = run([(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)], rule(duration_s=3))
check("2 brief spike debounced", ev == [], str(ev))
# 3) hysteresis: a dip into the margin (below threshold, above threshold-margin)
# does NOT clear
r = rule(threshold_db=85, duration_s=0, clear_margin_db=3)
ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], r)
check("3 hysteresis holds ACTIVE", ev == [(0, "onset")] and st.phase == "active",
f"{ev} phase={st.phase}")
# 4) 'below' comparison (device too quiet) -> onset when value < threshold
ev, _ = run([(30, 0), (15, 1)], rule(threshold_db=20, duration_s=0,
clear_margin_db=2, comparison="below"))
check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev))
print()
print("ALL PASS" if failures == 0 else f"{failures} FAILURE(S)")
return failures
if __name__ == "__main__":
import sys
sys.exit(1 if main() else 0)