Compare commits
27 Commits
fefa9eace8
...
v0.4.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 89b6892656 | |||
| 43b8e53d2d | |||
| 6d1c426ee4 | |||
| ad6071b790 | |||
| cfdeada9d6 | |||
| b51fefca2b | |||
| 5bc542e92f | |||
| 1f5f1fb1f6 | |||
| b4cea2f287 | |||
| d1d694302c | |||
| 43e72ae3c3 | |||
| 9d34779171 | |||
| 87c06f1519 | |||
| ba622c67d8 | |||
| 6b1ec75396 | |||
| 9c43e68534 | |||
| aa3e088b64 | |||
| 8c17af4849 | |||
| b954eb8c89 | |||
| 0793e7df01 | |||
| 51dd6b682d | |||
| a7983d2958 | |||
| d6dd2e736b | |||
| af86cf713e | |||
| e3f9ca7f5b | |||
| 450509d210 | |||
| ad1a40e0aa |
@@ -5,6 +5,52 @@ All notable changes to SLMM (Sound Level Meter Manager) will be documented in th
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.4.0] - 2026-06-22
|
||||
|
||||
### Added
|
||||
|
||||
#### Live Monitor (fan-out feed)
|
||||
- **Per-device fan-out monitor** - one shared, cached live feed per device. Multiple clients (dashboards, portal, charts) subscribe to the same stream instead of each fighting for the NL-43's single TCP connection: one poller reads the device, all subscribers get the same frames.
|
||||
- **WebSocket monitor** - `WS /api/nl43/{unit_id}/monitor` delivers an instant first frame from cache, then live updates.
|
||||
- **Monitor control** - `POST /api/nl43/{unit_id}/monitor/{start|stop}`, `GET /api/nl43/_monitor/status`. A persistent `monitor_enabled` flag auto-starts the keepalive on boot.
|
||||
- **Adaptive polling** - poll rate adapts to demand; unreachable devices back off; a device-offline alert fires when a monitored unit drops.
|
||||
- **De-duplication** - the background poller skips units already covered by an active monitor (no double-polling); a heartbeat keeps the feed warm.
|
||||
- **Lower latency** - the monitor caches run state, roughly halving live-feed latency; fan-out emits an instant first frame + offline status to new clients.
|
||||
|
||||
#### Alert Engine
|
||||
- **Threshold rules** - per-device alert rules (metric + threshold + cooldown) with full CRUD: `POST/GET/PUT/DELETE /api/nl43/{unit_id}/alerts/rules[/{rule_id}]`.
|
||||
- **Events + state machine** - onset/clear tracking via `GET /api/nl43/{unit_id}/alerts/events`; acknowledge with `POST .../events/{event_id}/ack`. A `cooldown_s` is enforced between onsets.
|
||||
- **24/7 evaluation** - enabled rules pin the monitor on, so rules evaluate continuously even with no UI client connected.
|
||||
- **Resilience** - editing or deleting a rule resets its state and closes any open event; device-offline events are raised when a monitored unit goes unreachable.
|
||||
|
||||
#### Data & History
|
||||
- **Live-chart backfill** - a downsampled DOD trail is persisted to a new `nl43_readings` table, exposed via `GET /api/nl43/{unit_id}/history` so charts can backfill recent history on load.
|
||||
- **LN1/LN2 percentiles** - L1/L10 (configurable percentiles) surfaced through SLMM in the status and live-feed payloads.
|
||||
- **measurement_start_time** included in the cached `/status` response.
|
||||
|
||||
#### Device control
|
||||
- **Per-device disconnect** - `POST /api/nl43/{unit_id}/disconnect` drops a device's pooled connection.
|
||||
- **Deactivate / standby** - `POST /api/nl43/{unit_id}/deactivate` and global `POST /api/nl43/_system/standby` to quiesce polling/monitoring.
|
||||
|
||||
### Changed
|
||||
- **DRD streaming reuses the pooled connection** rather than opening a separate socket, avoiding contention with the persistent pool on a single-connection device.
|
||||
- **Connection pool** - idle-TTL / max-age checks can now be disabled; pool status is logged periodically.
|
||||
|
||||
### Fixed
|
||||
- **Measurement-start confirmation** - `/start` now recognizes the device's `Start` state. It previously waited for `Measure`, which never matched, so the start cycle ran the full retry loop and Terra-View's proxy timed out with a misleading "Unknown error" even though the device had started.
|
||||
- **Garbled reads** - corrupted measurement-state reads that produced phantom STOPPED/STARTED transitions are now ignored.
|
||||
- **DOD parsing** - corrected field parsing and stopped spurious measurement-time resets.
|
||||
- **Monitor WebSocket** - quieted a send-after-close race on client disconnect.
|
||||
|
||||
### Database
|
||||
- **New tables** (auto-created on startup via `Base.metadata.create_all`): `alert_rules`, `alert_events`, `nl43_readings`.
|
||||
- **Migrations for existing tables** (run once per database): `migrate_add_ln_percentiles.py` (LN1/LN2 on `nl43_status`), `migrate_add_monitor_enabled.py` (`monitor_enabled` on `nl43_config`).
|
||||
|
||||
### Notes
|
||||
- Pairs with the matching Terra-View `dev` build, which reads SLMM's `/monitor` fan-out feed for live SLM dashboards (L1/L10 lines, live-chart backfill). Ship the two together.
|
||||
|
||||
---
|
||||
|
||||
## [0.3.0] - 2026-02-17
|
||||
|
||||
### Added
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# SLMM - Sound Level Meter Manager
|
||||
|
||||
**Version 0.3.0**
|
||||
**Version 0.4.0**
|
||||
|
||||
Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols.
|
||||
|
||||
@@ -12,6 +12,9 @@ SLMM is a standalone backend module that provides REST API routing and command t
|
||||
|
||||
## Features
|
||||
|
||||
- **Live Monitor (fan-out)**: One shared cached live feed per device — many clients subscribe to the same stream instead of fighting over the meter's single TCP connection
|
||||
- **Alert Engine**: Per-device threshold rules with onset/clear events, cooldowns, acks, and 24/7 evaluation
|
||||
- **History & Percentiles**: Downsampled DOD trail + history endpoint for live-chart backfill; LN1/LN2 (L1/L10) percentiles surfaced through the feed
|
||||
- **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability
|
||||
- **Background Polling**: Continuous automatic polling of devices with configurable intervals
|
||||
- **Offline Detection**: Automatic device reachability tracking with failure counters
|
||||
@@ -44,6 +47,30 @@ SLMM is a standalone backend module that provides REST API routing and command t
|
||||
└──────────────┘
|
||||
```
|
||||
|
||||
### Live Monitor — Fan-Out Feed (v0.4.0)
|
||||
|
||||
The NL-43 allows only one TCP control connection at a time, so multiple clients
|
||||
polling the same device directly would contend for it. The monitor solves this
|
||||
with a single shared, cached feed per device:
|
||||
|
||||
- **One reader, many subscribers**: a single poller reads the device; every
|
||||
WebSocket subscriber (`WS /api/nl43/{unit_id}/monitor`) receives the same
|
||||
frames — an instant first frame from cache, then live updates.
|
||||
- **Persistent + auto-start**: a `monitor_enabled` flag keeps the feed running
|
||||
and auto-starts it on boot. Enabled alert rules pin the monitor on for 24/7
|
||||
evaluation even with no UI connected.
|
||||
- **Adaptive & deduplicated**: poll rate adapts to demand, unreachable devices
|
||||
back off, and the background poller skips units already covered by a monitor.
|
||||
|
||||
### Alert Engine (v0.4.0)
|
||||
|
||||
Per-device threshold alerting evaluated against the live feed:
|
||||
|
||||
- **Rules**: metric + threshold + `cooldown_s`, full CRUD per device
|
||||
- **Events**: onset/clear state machine, acknowledgement, and a device-offline
|
||||
alert when a monitored unit drops
|
||||
- **Robust**: editing/deleting a rule resets its state and closes open events
|
||||
|
||||
### Persistent TCP Connection Pool (v0.3.0)
|
||||
|
||||
SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems:
|
||||
@@ -145,8 +172,32 @@ Logs are written to:
|
||||
|--------|----------|-------------|
|
||||
| GET | `/api/nl43/{unit_id}/status` | Get cached measurement snapshot (updated by background poller) |
|
||||
| GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) |
|
||||
| GET | `/api/nl43/{unit_id}/history` | Downsampled DOD trail for live-chart backfill |
|
||||
| WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data |
|
||||
|
||||
### Live Monitor (fan-out feed)
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
|--------|----------|-------------|
|
||||
| WS | `/api/nl43/{unit_id}/monitor` | Subscribe to the shared cached live feed (instant first frame) |
|
||||
| POST | `/api/nl43/{unit_id}/monitor/start` | Start the device's monitor feed |
|
||||
| POST | `/api/nl43/{unit_id}/monitor/stop` | Stop the device's monitor feed |
|
||||
| GET | `/api/nl43/_monitor/status` | Global monitor status across devices |
|
||||
| POST | `/api/nl43/{unit_id}/disconnect` | Drop the device's pooled TCP connection |
|
||||
| POST | `/api/nl43/{unit_id}/deactivate` | Quiesce polling/monitoring for one device |
|
||||
| POST | `/api/nl43/_system/standby` | Global standby — quiesce all polling/monitoring |
|
||||
|
||||
### Alerts
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
|--------|----------|-------------|
|
||||
| GET | `/api/nl43/{unit_id}/alerts/rules` | List alert rules for a device |
|
||||
| POST | `/api/nl43/{unit_id}/alerts/rules` | Create an alert rule (metric, threshold, cooldown) |
|
||||
| PUT | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Update a rule (resets its state, closes open events) |
|
||||
| DELETE | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Delete a rule |
|
||||
| GET | `/api/nl43/{unit_id}/alerts/events` | List alert events (onset/clear) |
|
||||
| POST | `/api/nl43/{unit_id}/alerts/events/{event_id}/ack` | Acknowledge an event |
|
||||
|
||||
### Background Polling
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
@@ -273,11 +324,35 @@ Caches latest measurement snapshot:
|
||||
- `sd_remaining_mb`: Free SD card space (MB)
|
||||
- `sd_free_ratio`: SD card free space ratio
|
||||
- `raw_payload`: Raw device response data
|
||||
- `is_reachable`: Device reachability status (Boolean) ⭐ NEW
|
||||
- `consecutive_failures`: Count of consecutive poll failures ⭐ NEW
|
||||
- `last_poll_attempt`: Last time background poller attempted to poll ⭐ NEW
|
||||
- `last_success`: Last successful poll timestamp ⭐ NEW
|
||||
- `last_error`: Last error message (truncated to 500 chars) ⭐ NEW
|
||||
- `is_reachable`: Device reachability status (Boolean)
|
||||
- `consecutive_failures`: Count of consecutive poll failures
|
||||
- `last_poll_attempt`: Last time background poller attempted to poll
|
||||
- `last_success`: Last successful poll timestamp
|
||||
- `last_error`: Last error message (truncated to 500 chars)
|
||||
- `ln1` / `ln2`: LN1/LN2 (L1/L10) percentile levels ⭐ v0.4.0
|
||||
|
||||
### NL43Readings Table ⭐ v0.4.0
|
||||
Downsampled DOD trail backing the live-chart history endpoint (one row/minute,
|
||||
pruned to a retention window — viewing only, not the report source):
|
||||
- `id` (PK), `unit_id`, `timestamp`
|
||||
- `lp` / `leq` / `lmax` / `ln1` / `ln2`: cached level samples
|
||||
|
||||
### AlertRule Table ⭐ v0.4.0
|
||||
Per-device threshold alert rules:
|
||||
- `id` (PK), `unit_id`, `name`, `enabled`
|
||||
- `metric`, `comparison` (above/below), `threshold_db`, `clear_margin_db` (hysteresis)
|
||||
- `duration_s` (sustained), `cooldown_s` (min seconds between onsets)
|
||||
- `channels` / `recipients`, optional `schedule_start`/`schedule_end`/`schedule_days`
|
||||
|
||||
### AlertEvent Table ⭐ v0.4.0
|
||||
Alert onset/clear events for history, inbox, and acknowledgement:
|
||||
- `id` (PK), `unit_id`, `rule_id`, `rule_name`, `metric`, `threshold_db`
|
||||
- `onset_at` / `onset_value`, `peak_value`, `clear_at`, `status` (active/cleared)
|
||||
- `acknowledged_at` / `acknowledged_by`, `notes`
|
||||
|
||||
> New tables (`alert_rules`, `alert_events`, `nl43_readings`) auto-create on
|
||||
> startup. Existing-table columns ship with migrations:
|
||||
> `migrate_add_ln_percentiles.py`, `migrate_add_monitor_enabled.py`.
|
||||
|
||||
## Protocol Details
|
||||
|
||||
|
||||
+322
@@ -0,0 +1,322 @@
|
||||
"""
|
||||
Threshold alert engine.
|
||||
|
||||
Each unit can have any number of AlertRules. A rule is evaluated against the
|
||||
unit's live monitor snapshots via a small per-(unit, rule) state machine:
|
||||
|
||||
IDLE --(metric exceeds threshold for duration_s)--> ACTIVE (fire ONSET)
|
||||
ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR)
|
||||
|
||||
duration_s debounces both edges; clear_margin_db adds hysteresis so a level
|
||||
hovering at the threshold doesn't flap. Onset and clear are distinct events.
|
||||
|
||||
The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no
|
||||
real clock — so it can be unit-tested with a synthetic level series and a fake
|
||||
clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence,
|
||||
and dispatch. Dispatch is a server log for now (POC); the seam to POST events to
|
||||
a Terra-View webhook (email/SMS) is _dispatch().
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Local timezone offset for schedule windows (same env var services.py uses).
|
||||
_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5"))
|
||||
|
||||
# How long to cache a unit's rules before re-querying the DB (rules change rarely).
|
||||
_RULE_CACHE_TTL_S = 15.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class RuleState:
|
||||
"""In-memory runtime state for one (unit, rule)."""
|
||||
phase: str = "idle" # "idle" | "active"
|
||||
edge_since: Optional[float] = None # when the current edge condition began (clock time)
|
||||
peak: float = 0.0
|
||||
event_id: Optional[int] = None # the open AlertEvent row (for the clear update)
|
||||
last_onset: Optional[float] = None # time of the last onset (for cooldown)
|
||||
|
||||
|
||||
def _exceeds(value: float, rule) -> bool:
|
||||
if rule.comparison == "below":
|
||||
return value < rule.threshold_db
|
||||
return value > rule.threshold_db
|
||||
|
||||
|
||||
def _recovered(value: float, rule) -> bool:
|
||||
margin = rule.clear_margin_db or 0.0
|
||||
if rule.comparison == "below":
|
||||
return value > rule.threshold_db + margin
|
||||
return value < rule.threshold_db - margin
|
||||
|
||||
|
||||
def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]:
|
||||
"""Advance the state machine by one reading.
|
||||
|
||||
Pure: mutates `state`, returns 'onset' | 'clear' | None. `now` is injected so
|
||||
tests can drive a fake clock.
|
||||
"""
|
||||
duration = rule.duration_s or 0
|
||||
|
||||
if state.phase == "idle":
|
||||
if _exceeds(value, rule):
|
||||
if state.edge_since is None:
|
||||
state.edge_since = now
|
||||
if now - state.edge_since >= duration:
|
||||
# Cooldown: suppress a new onset within cooldown_s of the last one
|
||||
# (stops a repeatedly-breaching signal from flooding the history).
|
||||
# Hold edge_since so it fires the moment cooldown lapses if still
|
||||
# breaching — don't reset it here.
|
||||
cooldown = getattr(rule, "cooldown_s", 0) or 0
|
||||
if state.last_onset is not None and (now - state.last_onset) < cooldown:
|
||||
return None
|
||||
state.phase = "active"
|
||||
state.edge_since = None
|
||||
state.peak = value
|
||||
state.last_onset = now
|
||||
return "onset"
|
||||
else:
|
||||
state.edge_since = None
|
||||
return None
|
||||
|
||||
# active
|
||||
if rule.comparison == "below":
|
||||
state.peak = min(state.peak, value)
|
||||
else:
|
||||
state.peak = max(state.peak, value)
|
||||
|
||||
if _recovered(value, rule):
|
||||
if state.edge_since is None:
|
||||
state.edge_since = now
|
||||
if now - state.edge_since >= duration:
|
||||
state.phase = "idle"
|
||||
state.edge_since = None
|
||||
return "clear"
|
||||
else:
|
||||
state.edge_since = None
|
||||
return None
|
||||
|
||||
|
||||
def _in_window(now_minutes: int, start: str, end: str) -> bool:
|
||||
"""Is now_minutes (minutes since local midnight) within [start, end)?
|
||||
Handles wraparound windows like 22:00–07:00."""
|
||||
def _m(s: str) -> int:
|
||||
h, m = s.split(":")
|
||||
return int(h) * 60 + int(m)
|
||||
s, e = _m(start), _m(end)
|
||||
if s == e:
|
||||
return True
|
||||
if s < e:
|
||||
return s <= now_minutes < e
|
||||
return now_minutes >= s or now_minutes < e # wraparound
|
||||
|
||||
|
||||
class AlertEvaluator:
|
||||
def __init__(self):
|
||||
self._states: Dict[Tuple[str, int], RuleState] = {}
|
||||
self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules)
|
||||
self._offline_events: Dict[str, int] = {} # unit_id -> open connectivity AlertEvent id
|
||||
logger.info("[ALERT] rule-based evaluator ready")
|
||||
|
||||
async def evaluate(self, unit_id: str, snap) -> None:
|
||||
"""Evaluate every enabled rule for this unit against one snapshot."""
|
||||
rules = self._get_rules(unit_id)
|
||||
if not rules:
|
||||
return
|
||||
now = asyncio.get_running_loop().time()
|
||||
for rule in rules:
|
||||
if not self._in_schedule(rule):
|
||||
continue
|
||||
raw = getattr(snap, rule.metric, None)
|
||||
try:
|
||||
value = float(raw)
|
||||
except (TypeError, ValueError):
|
||||
continue # missing / non-numeric ("-.-")
|
||||
state = self._states.setdefault((unit_id, rule.id), RuleState())
|
||||
action = _evaluate_step(state, value, now, rule)
|
||||
if action == "onset":
|
||||
await self._on_onset(unit_id, rule, value, state)
|
||||
elif action == "clear":
|
||||
await self._on_clear(unit_id, rule, value, state)
|
||||
|
||||
# -- rule loading (cached) ----------------------------------------------
|
||||
|
||||
def _get_rules(self, unit_id: str) -> list:
|
||||
loop_now = asyncio.get_running_loop().time()
|
||||
cached = self._rule_cache.get(unit_id)
|
||||
if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S:
|
||||
return cached[1]
|
||||
rules = self._load_rules(unit_id)
|
||||
self._rule_cache[unit_id] = (loop_now, rules)
|
||||
return rules
|
||||
|
||||
def _load_rules(self, unit_id: str) -> list:
|
||||
from app.database import SessionLocal
|
||||
from app.models import AlertRule
|
||||
db = SessionLocal()
|
||||
try:
|
||||
return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all()
|
||||
except Exception as e:
|
||||
logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}")
|
||||
return []
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
def invalidate(self, unit_id: Optional[str] = None) -> None:
|
||||
"""Drop cached rules so a change is picked up immediately."""
|
||||
if unit_id is None:
|
||||
self._rule_cache.clear()
|
||||
else:
|
||||
self._rule_cache.pop(unit_id, None)
|
||||
|
||||
def forget_rule(self, unit_id: str, rule_id: int) -> None:
|
||||
"""Drop a rule's per-(unit, rule) state machine after the rule is edited or
|
||||
deleted, so a stale 'active' phase / open event_id from the old config
|
||||
doesn't bleed into the new one (mis-firing a clear or suppressing an onset)."""
|
||||
self._states.pop((unit_id, rule_id), None)
|
||||
|
||||
# -- scheduling ----------------------------------------------------------
|
||||
|
||||
def _in_schedule(self, rule) -> bool:
|
||||
if not rule.schedule_start or not rule.schedule_end:
|
||||
day_ok = self._day_ok(rule)
|
||||
return day_ok
|
||||
local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
|
||||
if not self._day_ok(rule, local):
|
||||
return False
|
||||
return _in_window(local.hour * 60 + local.minute, rule.schedule_start, rule.schedule_end)
|
||||
|
||||
@staticmethod
|
||||
def _day_ok(rule, local: Optional[datetime] = None) -> bool:
|
||||
if not rule.schedule_days:
|
||||
return True
|
||||
if local is None:
|
||||
local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
|
||||
allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""}
|
||||
return local.weekday() in allowed # Mon=0
|
||||
|
||||
# -- event persistence + dispatch ---------------------------------------
|
||||
|
||||
async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None:
|
||||
from app.database import SessionLocal
|
||||
from app.models import AlertEvent
|
||||
db = SessionLocal()
|
||||
try:
|
||||
evt = AlertEvent(
|
||||
rule_id=rule.id, unit_id=unit_id, rule_name=rule.name,
|
||||
metric=rule.metric, threshold_db=rule.threshold_db,
|
||||
onset_value=value, peak_value=value, status="active",
|
||||
)
|
||||
db.add(evt)
|
||||
db.commit()
|
||||
db.refresh(evt)
|
||||
state.event_id = evt.id
|
||||
except Exception as e:
|
||||
logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}")
|
||||
finally:
|
||||
db.close()
|
||||
await self._dispatch(
|
||||
"ONSET", unit_id, rule,
|
||||
f"{rule.metric.upper()}={value:.1f} dB "
|
||||
f"{'<' if rule.comparison == 'below' else '>'} {rule.threshold_db:.1f} dB"
|
||||
f"{f' for {rule.duration_s}s' if rule.duration_s else ''}",
|
||||
)
|
||||
|
||||
async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None:
|
||||
peak = state.peak
|
||||
from app.database import SessionLocal
|
||||
from app.models import AlertEvent
|
||||
db = SessionLocal()
|
||||
try:
|
||||
if state.event_id is not None:
|
||||
evt = db.query(AlertEvent).filter_by(id=state.event_id).first()
|
||||
if evt:
|
||||
evt.clear_at = datetime.utcnow()
|
||||
evt.peak_value = peak
|
||||
evt.status = "cleared"
|
||||
db.commit()
|
||||
except Exception as e:
|
||||
logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}")
|
||||
finally:
|
||||
db.close()
|
||||
state.event_id = None
|
||||
await self._dispatch(
|
||||
"CLEAR", unit_id, rule,
|
||||
f"recovered to {value:.1f} dB (peak {peak:.1f} dB)",
|
||||
)
|
||||
|
||||
# -- connectivity (device offline/online) -------------------------------
|
||||
#
|
||||
# Raised by the live monitor when it loses / regains contact with a device.
|
||||
# Persisted as an AlertEvent (sentinel rule_id=0, metric="connectivity") so it
|
||||
# lands in the same events/inbox/ack pipeline as threshold alerts. The in-memory
|
||||
# map dedupes; the DB query also dedupes across a process restart.
|
||||
|
||||
async def device_offline(self, unit_id: str) -> None:
|
||||
if unit_id in self._offline_events:
|
||||
return # already flagged offline
|
||||
from app.database import SessionLocal
|
||||
from app.models import AlertEvent
|
||||
db = SessionLocal()
|
||||
try:
|
||||
existing = db.query(AlertEvent).filter_by(
|
||||
unit_id=unit_id, metric="connectivity", status="active").first()
|
||||
if existing: # already open in the DB (e.g. carried across a restart)
|
||||
self._offline_events[unit_id] = existing.id
|
||||
return
|
||||
evt = AlertEvent(
|
||||
rule_id=0, unit_id=unit_id, rule_name="Device unreachable",
|
||||
metric="connectivity", threshold_db=0.0, status="active",
|
||||
)
|
||||
db.add(evt)
|
||||
db.commit()
|
||||
db.refresh(evt)
|
||||
self._offline_events[unit_id] = evt.id
|
||||
except Exception as e:
|
||||
logger.warning(f"[ALERT] failed to record offline for {unit_id}: {e}")
|
||||
finally:
|
||||
db.close()
|
||||
await self._dispatch_raw("OFFLINE", unit_id, "Device unreachable",
|
||||
"live monitor lost contact with the device")
|
||||
|
||||
async def device_online(self, unit_id: str) -> None:
|
||||
self._offline_events.pop(unit_id, None)
|
||||
from app.database import SessionLocal
|
||||
from app.models import AlertEvent
|
||||
db = SessionLocal()
|
||||
cleared = 0
|
||||
try:
|
||||
opened = db.query(AlertEvent).filter_by(
|
||||
unit_id=unit_id, metric="connectivity", status="active").all()
|
||||
for evt in opened:
|
||||
evt.clear_at = datetime.utcnow()
|
||||
evt.status = "cleared"
|
||||
cleared += 1
|
||||
if cleared:
|
||||
db.commit()
|
||||
except Exception as e:
|
||||
logger.warning(f"[ALERT] failed to record online for {unit_id}: {e}")
|
||||
finally:
|
||||
db.close()
|
||||
if cleared: # only announce recovery if it was actually flagged offline
|
||||
await self._dispatch_raw("ONLINE", unit_id, "Device recovered",
|
||||
"live monitor regained contact with the device")
|
||||
|
||||
# -- event persistence + dispatch ---------------------------------------
|
||||
|
||||
async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None:
|
||||
await self._dispatch_raw(kind, unit_id, rule.name, detail)
|
||||
|
||||
async def _dispatch_raw(self, kind: str, unit_id: str, name: str, detail: str) -> None:
|
||||
"""POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here."""
|
||||
logger.warning(f"[ALERT:{kind}] {unit_id} '{name}': {detail}")
|
||||
|
||||
|
||||
# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot)
|
||||
alert_evaluator = AlertEvaluator()
|
||||
@@ -8,6 +8,7 @@ for fast API access without querying devices on every request.
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
@@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in
|
||||
# standby (running but not polling and not holding device connections) — e.g. a
|
||||
# dev box that must not latch onto a device that a prod instance owns.
|
||||
POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true"
|
||||
|
||||
|
||||
class BackgroundPoller:
|
||||
"""
|
||||
@@ -39,6 +45,7 @@ class BackgroundPoller:
|
||||
self._logger = logger
|
||||
self._last_cleanup = None # Track last log cleanup time
|
||||
self._last_pool_log = None # Track last connection pool heartbeat log
|
||||
self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle)
|
||||
|
||||
async def start(self):
|
||||
"""Start the background polling task."""
|
||||
@@ -71,15 +78,48 @@ class BackgroundPoller:
|
||||
|
||||
self._logger.info("Background poller stopped")
|
||||
|
||||
def is_active(self) -> bool:
|
||||
"""Whether background polling is currently active (vs standby)."""
|
||||
return self._active
|
||||
|
||||
async def set_active(self, active: bool):
|
||||
"""Globally enable/disable polling at runtime.
|
||||
|
||||
When deactivated, the loop stays alive but polls nothing and releases all
|
||||
device connections, so this SLMM instance stops occupying the devices'
|
||||
single connection slots (e.g. so a prod instance can take over). Runtime
|
||||
state only — on restart the instance returns to SLMM_POLLING_ENABLED.
|
||||
"""
|
||||
self._active = active
|
||||
if active:
|
||||
self._logger.info("[SYSTEM] Background polling ACTIVATED")
|
||||
else:
|
||||
self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections")
|
||||
await self._release_all_connections()
|
||||
|
||||
async def _release_all_connections(self):
|
||||
"""Gracefully close every pooled device connection (no-op if none)."""
|
||||
from app.services import _connection_pool
|
||||
for device_key in list(_connection_pool.get_stats().get("connections", {})):
|
||||
await _connection_pool.discard(device_key)
|
||||
|
||||
async def _poll_loop(self):
|
||||
"""Main polling loop that runs continuously."""
|
||||
self._logger.info("Background polling loop started")
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
await self._poll_all_devices()
|
||||
except Exception as e:
|
||||
self._logger.error(f"Error in poll loop: {e}", exc_info=True)
|
||||
if self._active:
|
||||
try:
|
||||
await self._poll_all_devices()
|
||||
except Exception as e:
|
||||
self._logger.error(f"Error in poll loop: {e}", exc_info=True)
|
||||
else:
|
||||
# Standby: poll nothing, and keep holding no device connection slots
|
||||
# so another SLMM instance (e.g. prod) can talk to the devices.
|
||||
try:
|
||||
await self._release_all_connections()
|
||||
except Exception as e:
|
||||
self._logger.warning(f"Standby connection release failed: {e}")
|
||||
|
||||
# Run log cleanup once per hour
|
||||
try:
|
||||
@@ -138,10 +178,19 @@ class BackgroundPoller:
|
||||
now = datetime.utcnow()
|
||||
polled_count = 0
|
||||
|
||||
from app.monitor import monitor_manager
|
||||
|
||||
for cfg in configs:
|
||||
if not self._running:
|
||||
break
|
||||
|
||||
# Skip units with an active live monitor: it polls them at ~1Hz and
|
||||
# keeps the status cache fresh, so a redundant background poll would just
|
||||
# add load/lock-contention on the device's single connection.
|
||||
if monitor_manager.is_active(cfg.unit_id):
|
||||
self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active")
|
||||
continue
|
||||
|
||||
# Get current status
|
||||
status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first()
|
||||
|
||||
|
||||
+22
-3
@@ -38,6 +38,25 @@ async def lifespan(app: FastAPI):
|
||||
await poller.start()
|
||||
logger.info("Background poller started")
|
||||
|
||||
# Auto-start keepalive live monitors for units configured for 24/7 monitoring
|
||||
# (monitor_enabled). This is what keeps alerting running unattended across
|
||||
# restarts — without it a feed only runs while someone has the live view open.
|
||||
try:
|
||||
from app.monitor import monitor_manager
|
||||
from app.database import SessionLocal
|
||||
from app.models import NL43Config
|
||||
db = SessionLocal()
|
||||
try:
|
||||
units = db.query(NL43Config).filter_by(monitor_enabled=True, tcp_enabled=True).all()
|
||||
for cfg in units:
|
||||
m = await monitor_manager.get(cfg.unit_id)
|
||||
await m.set_keepalive(True)
|
||||
logger.info(f"Auto-started keepalive monitor for {cfg.unit_id}")
|
||||
finally:
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to auto-start monitors: {e}")
|
||||
|
||||
yield # Application runs
|
||||
|
||||
# Shutdown
|
||||
@@ -52,7 +71,7 @@ async def lifespan(app: FastAPI):
|
||||
app = FastAPI(
|
||||
title="SLMM NL43 Addon",
|
||||
description="Standalone module for NL43 configuration and status APIs with background polling",
|
||||
version="0.3.0",
|
||||
version="0.4.0",
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
@@ -76,12 +95,12 @@ app.include_router(routers.router)
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
def index(request: Request):
|
||||
return templates.TemplateResponse("index.html", {"request": request})
|
||||
return templates.TemplateResponse(request, "index.html")
|
||||
|
||||
|
||||
@app.get("/roster", response_class=HTMLResponse)
|
||||
def roster(request: Request):
|
||||
return templates.TemplateResponse("roster.html", {"request": request})
|
||||
return templates.TemplateResponse(request, "roster.html")
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
|
||||
+78
-1
@@ -1,4 +1,4 @@
|
||||
from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func
|
||||
from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func
|
||||
from app.database import Base
|
||||
|
||||
|
||||
@@ -23,6 +23,10 @@ class NL43Config(Base):
|
||||
poll_interval_seconds = Column(Integer, nullable=True, default=60) # Polling interval (10-3600 seconds)
|
||||
poll_enabled = Column(Boolean, default=True) # Enable/disable background polling for this device
|
||||
|
||||
# Live monitor (fan-out DOD feed). Keepalive runs it 24/7 even with no viewer,
|
||||
# which is what makes alerting continuous. On by default; toggleable from the UI.
|
||||
monitor_enabled = Column(Boolean, default=True)
|
||||
|
||||
|
||||
class NL43Status(Base):
|
||||
"""
|
||||
@@ -41,6 +45,8 @@ class NL43Status(Base):
|
||||
lmax = Column(String, nullable=True) # Maximum level
|
||||
lmin = Column(String, nullable=True) # Minimum level
|
||||
lpeak = Column(String, nullable=True) # Peak level
|
||||
ln1 = Column(String, nullable=True) # Percentile slot LN1 (configurable; device default L5, contract L1)
|
||||
ln2 = Column(String, nullable=True) # Percentile slot LN2 (configurable; device default L10)
|
||||
battery_level = Column(String, nullable=True)
|
||||
power_source = Column(String, nullable=True)
|
||||
sd_remaining_mb = Column(String, nullable=True)
|
||||
@@ -72,3 +78,74 @@ class DeviceLog(Base):
|
||||
level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR
|
||||
category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC
|
||||
message = Column(Text, nullable=False)
|
||||
|
||||
|
||||
class AlertRule(Base):
|
||||
"""A threshold-alert rule evaluated against a unit's live monitor feed.
|
||||
|
||||
Source-agnostic: today it runs over the DOD monitor; the same rule transfers
|
||||
unchanged if a unit's feed is later sourced from FTP intervals.
|
||||
"""
|
||||
|
||||
__tablename__ = "alert_rules"
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
unit_id = Column(String, index=True, nullable=False)
|
||||
name = Column(String, nullable=False, default="Alert")
|
||||
metric = Column(String, nullable=False, default="lp") # lp/leq/lmax/lmin/lpeak/ln1/ln2
|
||||
comparison = Column(String, nullable=False, default="above") # above | below
|
||||
threshold_db = Column(Float, nullable=False)
|
||||
duration_s = Column(Integer, nullable=False, default=0) # sustained seconds (0 = instant)
|
||||
clear_margin_db = Column(Float, nullable=False, default=2.0) # hysteresis band
|
||||
cooldown_s = Column(Integer, nullable=False, default=300) # min seconds between onsets
|
||||
# Optional time-of-day scoping (local time). schedule_start/end as "HH:MM";
|
||||
# null = always active. schedule_days = CSV of 0-6 (Mon=0); null = every day.
|
||||
schedule_start = Column(String, nullable=True)
|
||||
schedule_end = Column(String, nullable=True)
|
||||
schedule_days = Column(String, nullable=True)
|
||||
channels = Column(String, nullable=False, default="log") # CSV: log,email,sms
|
||||
recipients = Column(Text, nullable=True) # CSV of emails/phones
|
||||
enabled = Column(Boolean, default=True)
|
||||
created_at = Column(DateTime, default=func.now())
|
||||
|
||||
|
||||
class AlertEvent(Base):
|
||||
"""A fired alert (onset → clear), for history / inbox / acknowledgement."""
|
||||
|
||||
__tablename__ = "alert_events"
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
rule_id = Column(Integer, index=True, nullable=False)
|
||||
unit_id = Column(String, index=True, nullable=False)
|
||||
rule_name = Column(String, nullable=True)
|
||||
metric = Column(String, nullable=False)
|
||||
threshold_db = Column(Float, nullable=False)
|
||||
onset_at = Column(DateTime, default=func.now(), index=True)
|
||||
onset_value = Column(Float, nullable=True)
|
||||
peak_value = Column(Float, nullable=True)
|
||||
clear_at = Column(DateTime, nullable=True)
|
||||
status = Column(String, default="active") # active | cleared
|
||||
acknowledged_at = Column(DateTime, nullable=True)
|
||||
acknowledged_by = Column(String, nullable=True)
|
||||
notes = Column(Text, nullable=True)
|
||||
|
||||
|
||||
class NL43Reading(Base):
|
||||
"""Downsampled time-series of live-monitor readings, for the live-chart
|
||||
backfill (so a viewer sees recent trend on open, not a blank chart).
|
||||
|
||||
Viewing only — NOT the report source. Reports use the device's authoritative
|
||||
FTP .rnd intervals. This is a short, capped trail (one row/minute, pruned to
|
||||
a retention window) fed by the monitor's keepalive poll loop.
|
||||
"""
|
||||
|
||||
__tablename__ = "nl43_readings"
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
unit_id = Column(String, index=True, nullable=False)
|
||||
timestamp = Column(DateTime, default=func.now(), index=True)
|
||||
lp = Column(String, nullable=True)
|
||||
leq = Column(String, nullable=True)
|
||||
lmax = Column(String, nullable=True)
|
||||
ln1 = Column(String, nullable=True)
|
||||
ln2 = Column(String, nullable=True)
|
||||
|
||||
+322
@@ -0,0 +1,322 @@
|
||||
"""
|
||||
Per-device live monitor (fan-out hub).
|
||||
|
||||
ONE DOD poll loop per device, broadcast to many subscribers:
|
||||
- browser WebSocket clients (live view) — they no longer each open their own
|
||||
device stream, so the NL43's single-connection limit stops causing the
|
||||
"second viewer sees nothing" contention.
|
||||
- the alert evaluator (threshold alerts), which can keep a device's feed running
|
||||
even with no browser attached.
|
||||
- persistence (each snapshot is written to NL43Status, like the poller does).
|
||||
|
||||
The device's one TCP connection is respected: every poll goes through the same
|
||||
per-device lock + connection pool in services.py, so the monitor, the background
|
||||
poller, and on-demand commands all serialize safely.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Dict, Optional, Set
|
||||
|
||||
from app.database import SessionLocal
|
||||
from app.models import NL43Config, NL43Status
|
||||
from app.services import NL43Client, persist_snapshot
|
||||
from app.alerts import alert_evaluator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Extra idle between DOD polls WHEN A BROWSER IS WATCHING. The 1s device rate-limit
|
||||
# already paces consecutive DOD? commands, so this just needs to be small — the
|
||||
# rate-limit is the real floor (~1.25s/poll effective).
|
||||
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25"))
|
||||
|
||||
# Idle cadence when NO browser is subscribed and the feed is only kept alive for
|
||||
# alerting. Same data, ~8x fewer polls -> ~8x less cellular traffic on a metered
|
||||
# SIM (~1 GB/device/month at full rate -> ~125 MB). NOTE: this also sets the alert
|
||||
# sampling resolution when nobody is watching, so keep it <= the smallest alert
|
||||
# duration_s you rely on (default 10s comfortably catches a "sustained 30/60s" rule).
|
||||
MONITOR_IDLE_POLL_INTERVAL = float(os.getenv("MONITOR_IDLE_POLL_INTERVAL", "10"))
|
||||
|
||||
# Exponential backoff once the device is unreachable, so a powered-off / asleep /
|
||||
# out-of-signal device stops churning reconnects every cycle (log spam + a trickle
|
||||
# of wasted cellular data on failed SYNs). delay = min(BASE * 2**(fails-1), MAX),
|
||||
# reset to full-rate on the first good poll. While a browser is actively watching we
|
||||
# cap the backoff lower (WATCHED_MAX) so a recovery surfaces quickly for the viewer.
|
||||
MONITOR_BACKOFF_BASE_S = float(os.getenv("MONITOR_BACKOFF_BASE_S", "1"))
|
||||
MONITOR_BACKOFF_MAX_S = float(os.getenv("MONITOR_BACKOFF_MAX_S", "60"))
|
||||
MONITOR_BACKOFF_WATCHED_MAX_S = float(os.getenv("MONITOR_BACKOFF_WATCHED_MAX_S", "5"))
|
||||
|
||||
# How often to refresh the run state (Measure?). It changes rarely, so we cache it
|
||||
# and skip that second rate-limited command on most polls — roughly halving the
|
||||
# per-update latency (~2.5s -> ~1.3s).
|
||||
MONITOR_STATE_REFRESH_S = float(os.getenv("MONITOR_STATE_REFRESH_S", "30"))
|
||||
|
||||
# Downsampled trail for the live-chart backfill: store one reading per
|
||||
# TRAIL_SAMPLE_S and keep TRAIL_RETENTION_HOURS of it (pruned). Viewing only —
|
||||
# reports use the device's FTP .rnd data, not this.
|
||||
TRAIL_SAMPLE_S = float(os.getenv("MONITOR_TRAIL_SAMPLE_S", "60"))
|
||||
TRAIL_RETENTION_HOURS = float(os.getenv("MONITOR_TRAIL_RETENTION_HOURS", "24"))
|
||||
|
||||
# If nothing has been broadcast in this many seconds (e.g. device offline and
|
||||
# silent), send a keepalive frame so reverse proxies don't drop the idle WS.
|
||||
MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25"))
|
||||
|
||||
|
||||
def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict:
|
||||
"""Build the broadcast payload — same shape as the DRD stream, but DOD-sourced
|
||||
so it carries ln1/ln2 (which DRD cannot)."""
|
||||
return {
|
||||
"unit_id": unit_id,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"measurement_state": snap.measurement_state,
|
||||
"measurement_start_time": measurement_start_time,
|
||||
"counter": snap.counter,
|
||||
"lp": snap.lp,
|
||||
"leq": snap.leq,
|
||||
"lmax": snap.lmax,
|
||||
"lmin": snap.lmin,
|
||||
"lpeak": snap.lpeak,
|
||||
"ln1": snap.ln1,
|
||||
"ln2": snap.ln2,
|
||||
"raw_payload": snap.raw_payload,
|
||||
}
|
||||
|
||||
|
||||
class DeviceMonitor:
|
||||
"""Owns a single DOD poll loop for one device and fans each snapshot out to
|
||||
all subscribers. Runs while it has at least one browser subscriber OR the
|
||||
server-side keep-alive (alerting) flag is set."""
|
||||
|
||||
def __init__(self, unit_id: str):
|
||||
self.unit_id = unit_id
|
||||
self._subscribers: Set[asyncio.Queue] = set()
|
||||
self._keepalive = False
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._lock = asyncio.Lock()
|
||||
self._last_payload: Optional[dict] = None # replayed to new subscribers
|
||||
self._consec_fail = 0
|
||||
self._reachable = True # last broadcast reachability (for transition frames)
|
||||
self._cached_state: Optional[str] = None # run state, refreshed periodically
|
||||
self._last_state_refresh = 0.0
|
||||
self._last_trail_store = 0.0 # downsample throttle for the backfill trail
|
||||
|
||||
@property
|
||||
def running(self) -> bool:
|
||||
return self._task is not None and not self._task.done()
|
||||
|
||||
def subscriber_count(self) -> int:
|
||||
return len(self._subscribers)
|
||||
|
||||
def _has_demand(self) -> bool:
|
||||
return bool(self._subscribers) or self._keepalive
|
||||
|
||||
def _ensure_task(self) -> None:
|
||||
if self._task is None or self._task.done():
|
||||
self._task = asyncio.create_task(self._run())
|
||||
|
||||
async def subscribe(self) -> asyncio.Queue:
|
||||
q: asyncio.Queue = asyncio.Queue(maxsize=5)
|
||||
async with self._lock:
|
||||
self._subscribers.add(q)
|
||||
# Replay the last frame so a client connecting mid-stream sees data
|
||||
# (or the current 'unreachable' state) immediately, not after a poll.
|
||||
if self._last_payload is not None:
|
||||
try:
|
||||
q.put_nowait(self._last_payload)
|
||||
except asyncio.QueueFull:
|
||||
pass
|
||||
self._ensure_task()
|
||||
return q
|
||||
|
||||
async def unsubscribe(self, q: asyncio.Queue) -> None:
|
||||
async with self._lock:
|
||||
self._subscribers.discard(q)
|
||||
|
||||
async def set_keepalive(self, on: bool) -> None:
|
||||
async with self._lock:
|
||||
self._keepalive = on
|
||||
if on:
|
||||
self._ensure_task()
|
||||
|
||||
async def _run(self) -> None:
|
||||
logger.info(f"[MONITOR] {self.unit_id}: feed started")
|
||||
loop = asyncio.get_running_loop()
|
||||
last_send = loop.time()
|
||||
try:
|
||||
while self._has_demand():
|
||||
snap, mst = await self._poll_once()
|
||||
if snap is not None:
|
||||
if not self._reachable:
|
||||
# Recovered from an outage — clear the connectivity alert.
|
||||
try:
|
||||
await alert_evaluator.device_online(self.unit_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"[MONITOR] {self.unit_id}: online alert failed: {e}")
|
||||
self._consec_fail = 0
|
||||
self._reachable = True
|
||||
payload = _snapshot_payload(snap, self.unit_id, mst)
|
||||
payload["feed_status"] = "ok"
|
||||
self._broadcast(payload)
|
||||
last_send = loop.time()
|
||||
try:
|
||||
await alert_evaluator.evaluate(self.unit_id, snap)
|
||||
except Exception as e:
|
||||
logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")
|
||||
else:
|
||||
# Tell clients the device went offline — once, on transition, after a
|
||||
# few failures so a momentary blip doesn't flap the UI. Same edge
|
||||
# raises the device-offline alert.
|
||||
self._consec_fail += 1
|
||||
if self._reachable and self._consec_fail >= 3:
|
||||
self._reachable = False
|
||||
self._broadcast({
|
||||
"unit_id": self.unit_id,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"feed_status": "unreachable",
|
||||
})
|
||||
last_send = loop.time()
|
||||
try:
|
||||
await alert_evaluator.device_offline(self.unit_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"[MONITOR] {self.unit_id}: offline alert failed: {e}")
|
||||
|
||||
# Heartbeat: during quiet/offline stretches, send a keepalive so an
|
||||
# idle WS isn't dropped by a reverse proxy. Not cached (new subscribers
|
||||
# should still get the last real frame, not a heartbeat).
|
||||
if loop.time() - last_send >= MONITOR_HEARTBEAT_S:
|
||||
self._broadcast({
|
||||
"unit_id": self.unit_id,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"feed_status": "ok" if self._reachable else "unreachable",
|
||||
"heartbeat": True,
|
||||
}, cache=False)
|
||||
last_send = loop.time()
|
||||
|
||||
await asyncio.sleep(self._next_delay())
|
||||
finally:
|
||||
logger.info(f"[MONITOR] {self.unit_id}: feed stopped")
|
||||
|
||||
def _next_delay(self) -> float:
|
||||
"""Inter-poll delay: exponential backoff while unreachable, full-rate while a
|
||||
browser is watching, relaxed cadence when the feed is keepalive-only."""
|
||||
if self._consec_fail > 0:
|
||||
shift = min(self._consec_fail - 1, 6) # cap growth at 2**6 = 64x base
|
||||
delay = min(MONITOR_BACKOFF_BASE_S * (2 ** shift), MONITOR_BACKOFF_MAX_S)
|
||||
if self._subscribers:
|
||||
delay = min(delay, MONITOR_BACKOFF_WATCHED_MAX_S)
|
||||
return delay
|
||||
if self._subscribers:
|
||||
return MONITOR_POLL_INTERVAL # a browser is watching — smooth chart
|
||||
return MONITOR_IDLE_POLL_INTERVAL # keepalive-only (alerting) — save data
|
||||
|
||||
async def _poll_once(self):
|
||||
"""One DOD poll: read, persist, return (snapshot, measurement_start_iso)."""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first()
|
||||
if not cfg or not cfg.tcp_enabled:
|
||||
return None, None
|
||||
client = NL43Client(
|
||||
cfg.host, cfg.tcp_port,
|
||||
ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password,
|
||||
ftp_port=cfg.ftp_port or 21,
|
||||
)
|
||||
# Refresh the run state only every MONITOR_STATE_REFRESH_S; reuse the
|
||||
# cached state otherwise so most polls send just DOD? (one rate-limited
|
||||
# command) instead of DOD? + Measure?.
|
||||
now = asyncio.get_running_loop().time()
|
||||
refresh_state = (self._cached_state is None
|
||||
or now - self._last_state_refresh >= MONITOR_STATE_REFRESH_S)
|
||||
snap = await client.request_dod(
|
||||
measurement_state=None if refresh_state else self._cached_state
|
||||
)
|
||||
if refresh_state:
|
||||
self._cached_state = snap.measurement_state
|
||||
self._last_state_refresh = now
|
||||
snap.unit_id = self.unit_id
|
||||
persist_snapshot(snap, db)
|
||||
db.commit()
|
||||
# Append to the downsampled backfill trail (~one row per TRAIL_SAMPLE_S).
|
||||
if now - self._last_trail_store >= TRAIL_SAMPLE_S:
|
||||
self._last_trail_store = now
|
||||
self._store_trail(snap, db)
|
||||
status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first()
|
||||
mst = (status.measurement_start_time.isoformat()
|
||||
if status and status.measurement_start_time else None)
|
||||
return snap, mst
|
||||
except Exception as e:
|
||||
logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}")
|
||||
return None, None
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
def _store_trail(self, snap, db) -> None:
|
||||
"""Append one downsampled reading to the backfill trail and prune old rows."""
|
||||
from datetime import datetime, timedelta
|
||||
from app.models import NL43Reading
|
||||
try:
|
||||
db.add(NL43Reading(
|
||||
unit_id=self.unit_id, timestamp=datetime.utcnow(),
|
||||
lp=snap.lp, leq=snap.leq, lmax=snap.lmax, ln1=snap.ln1, ln2=snap.ln2,
|
||||
))
|
||||
cutoff = datetime.utcnow() - timedelta(hours=TRAIL_RETENTION_HOURS)
|
||||
db.query(NL43Reading).filter(
|
||||
NL43Reading.unit_id == self.unit_id,
|
||||
NL43Reading.timestamp < cutoff,
|
||||
).delete()
|
||||
db.commit()
|
||||
except Exception as e:
|
||||
logger.warning(f"[MONITOR] {self.unit_id}: trail store failed: {e}")
|
||||
|
||||
def _broadcast(self, payload: dict, cache: bool = True) -> None:
|
||||
if cache:
|
||||
self._last_payload = payload # replayed to new subscribers
|
||||
for q in list(self._subscribers):
|
||||
try:
|
||||
q.put_nowait(payload)
|
||||
except asyncio.QueueFull:
|
||||
# Slow consumer — drop this frame rather than stall the whole feed.
|
||||
pass
|
||||
|
||||
|
||||
class MonitorManager:
|
||||
"""Registry of per-device monitors (one per unit_id)."""
|
||||
|
||||
def __init__(self):
|
||||
self._monitors: Dict[str, DeviceMonitor] = {}
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def get(self, unit_id: str) -> DeviceMonitor:
|
||||
async with self._lock:
|
||||
m = self._monitors.get(unit_id)
|
||||
if m is None:
|
||||
m = DeviceMonitor(unit_id)
|
||||
self._monitors[unit_id] = m
|
||||
return m
|
||||
|
||||
def is_active(self, unit_id: str) -> bool:
|
||||
"""True if this unit has a running monitor feed (so the background poller
|
||||
can skip it — the monitor already polls it more often)."""
|
||||
m = self._monitors.get(unit_id)
|
||||
return m is not None and m.running
|
||||
|
||||
def status(self) -> dict:
|
||||
return {
|
||||
uid: {
|
||||
"running": m.running,
|
||||
"subscribers": m.subscriber_count(),
|
||||
"keepalive": m._keepalive,
|
||||
"reachable": m._reachable,
|
||||
# what cadence the loop is currently using, for observability
|
||||
"mode": ("backoff" if m._consec_fail > 0
|
||||
else "watched" if m._subscribers
|
||||
else "idle"),
|
||||
}
|
||||
for uid, m in self._monitors.items()
|
||||
}
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
monitor_manager = MonitorManager()
|
||||
+401
-2
@@ -11,7 +11,7 @@ import os
|
||||
import asyncio
|
||||
|
||||
from app.database import get_db
|
||||
from app.models import NL43Config, NL43Status
|
||||
from app.models import NL43Config, NL43Status, AlertRule, AlertEvent, NL43Reading
|
||||
from app.services import NL43Client, persist_snapshot
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -121,6 +121,392 @@ async def flush_connection_pool():
|
||||
return {"status": "ok", "message": "All cached connections closed"}
|
||||
|
||||
|
||||
@router.post("/{unit_id}/disconnect")
|
||||
async def disconnect_device(unit_id: str, db: Session = Depends(get_db)):
|
||||
"""Cleanly close SLMM's persistent TCP connection to a single device.
|
||||
|
||||
Gracefully closes (TCP FIN + wait_closed) the pooled connection for this
|
||||
device and removes it from the pool, freeing the NL43's single connection
|
||||
slot. Idempotent — a no-op if no connection is currently cached.
|
||||
|
||||
Note: this releases the *idle* pooled connection. It does not interrupt an
|
||||
in-progress DRD stream or an in-flight command (those have the socket
|
||||
checked out of the pool) — close the stream WebSocket to end a live stream.
|
||||
"""
|
||||
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
|
||||
if not cfg:
|
||||
raise HTTPException(status_code=404, detail="NL43 config not found")
|
||||
|
||||
from app.services import _connection_pool
|
||||
|
||||
device_key = f"{cfg.host}:{cfg.tcp_port}"
|
||||
had_conn = device_key in _connection_pool.get_stats().get("connections", {})
|
||||
|
||||
await _connection_pool.discard(device_key)
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"unit_id": unit_id,
|
||||
"device_key": device_key,
|
||||
"disconnected": had_conn,
|
||||
"message": "Connection closed" if had_conn else "No cached connection to close",
|
||||
}
|
||||
|
||||
|
||||
@router.post("/{unit_id}/deactivate")
|
||||
async def deactivate_device(unit_id: str, db: Session = Depends(get_db)):
|
||||
"""Make a single unit dormant: stop background polling for it AND drop its
|
||||
connection, freeing the device's connection slot. poll_enabled=False is
|
||||
persisted, so the unit stays dormant across restarts until /activate.
|
||||
"""
|
||||
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
|
||||
if not cfg:
|
||||
raise HTTPException(status_code=404, detail="NL43 config not found")
|
||||
|
||||
cfg.poll_enabled = False
|
||||
db.commit()
|
||||
|
||||
from app.services import _connection_pool, _get_device_lock
|
||||
|
||||
device_key = f"{cfg.host}:{cfg.tcp_port}"
|
||||
|
||||
# Wait briefly for any in-flight poll/command to finish (so its connection is
|
||||
# back in the pool), then drop it. If a long-lived stream holds the lock we
|
||||
# don't block forever — discard the pooled connection regardless.
|
||||
lock = await _get_device_lock(device_key)
|
||||
acquired = False
|
||||
try:
|
||||
await asyncio.wait_for(lock.acquire(), timeout=10.0)
|
||||
acquired = True
|
||||
except asyncio.TimeoutError:
|
||||
acquired = False
|
||||
try:
|
||||
await _connection_pool.discard(device_key)
|
||||
finally:
|
||||
if acquired:
|
||||
lock.release()
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"unit_id": unit_id,
|
||||
"poll_enabled": False,
|
||||
"message": "Polling disabled and connection closed for this unit",
|
||||
}
|
||||
|
||||
|
||||
@router.post("/{unit_id}/activate")
|
||||
async def activate_device(unit_id: str, db: Session = Depends(get_db)):
|
||||
"""Resume background polling for a unit previously deactivated."""
|
||||
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
|
||||
if not cfg:
|
||||
raise HTTPException(status_code=404, detail="NL43 config not found")
|
||||
|
||||
cfg.poll_enabled = True
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"unit_id": unit_id,
|
||||
"poll_enabled": True,
|
||||
"message": "Polling enabled for this unit",
|
||||
}
|
||||
|
||||
|
||||
@router.get("/_system/status")
|
||||
async def system_status():
|
||||
"""Report whether this SLMM instance is actively polling or in standby."""
|
||||
from app.background_poller import poller
|
||||
from app.services import _connection_pool
|
||||
return {
|
||||
"status": "ok",
|
||||
"mode": "active" if poller.is_active() else "standby",
|
||||
"polling_active": poller.is_active(),
|
||||
"active_connections": _connection_pool.get_stats().get("active_connections", 0),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/_system/standby")
|
||||
async def system_standby():
|
||||
"""Put this SLMM instance into standby: stop polling ALL devices and release
|
||||
every connection, so it stops occupying device slots (e.g. so a prod instance
|
||||
can take over). Runtime-only — on restart the instance returns to its
|
||||
SLMM_POLLING_ENABLED default.
|
||||
"""
|
||||
from app.background_poller import poller
|
||||
await poller.set_active(False)
|
||||
return {"status": "ok", "mode": "standby",
|
||||
"message": "Polling stopped and all device connections released"}
|
||||
|
||||
|
||||
@router.post("/_system/resume")
|
||||
async def system_resume():
|
||||
"""Resume polling after standby (global)."""
|
||||
from app.background_poller import poller
|
||||
await poller.set_active(True)
|
||||
return {"status": "ok", "mode": "active", "message": "Polling resumed"}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients
|
||||
# ============================================================================
|
||||
|
||||
@router.websocket("/{unit_id}/monitor")
|
||||
async def monitor_stream(websocket: WebSocket, unit_id: str):
|
||||
"""Subscribe a browser to the device's shared 1 Hz DOD feed.
|
||||
|
||||
Any number of clients can attach without each opening its own device
|
||||
connection (one poll loop per device, fanned out). Same JSON shape as the
|
||||
DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10).
|
||||
"""
|
||||
await websocket.accept()
|
||||
from app.monitor import monitor_manager
|
||||
|
||||
monitor = await monitor_manager.get(unit_id)
|
||||
queue = await monitor.subscribe()
|
||||
logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)")
|
||||
|
||||
async def _watch_disconnect():
|
||||
# Completes when the client disconnects, so an idle feed (no data) still
|
||||
# detects the drop and we don't leak a subscription that keeps the device
|
||||
# feed (and its connection) alive.
|
||||
try:
|
||||
while True:
|
||||
msg = await websocket.receive()
|
||||
if msg.get("type") == "websocket.disconnect":
|
||||
return
|
||||
except Exception:
|
||||
return
|
||||
|
||||
gone = asyncio.ensure_future(_watch_disconnect())
|
||||
try:
|
||||
while not gone.done():
|
||||
try:
|
||||
payload = await asyncio.wait_for(queue.get(), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
continue # re-check gone.done()
|
||||
if gone.done():
|
||||
break # client disconnected while we waited — don't send into a closing socket
|
||||
await websocket.send_json(payload)
|
||||
except WebSocketDisconnect:
|
||||
logger.info(f"Monitor subscriber disconnected for {unit_id}")
|
||||
except Exception as e:
|
||||
# A frame that races the close (client vanished mid-send) surfaces as
|
||||
# "Unexpected ASGI message 'websocket.send' after ... websocket.close".
|
||||
# That's expected on disconnect (the portal closes the socket on every tab
|
||||
# switch), not an error — log it quietly.
|
||||
msg = str(e)
|
||||
if "after sending" in msg or "websocket.close" in msg or "response already completed" in msg:
|
||||
logger.debug(f"Monitor stream for {unit_id} closed mid-send (client gone)")
|
||||
else:
|
||||
logger.warning(f"Monitor stream error for {unit_id}: {e}")
|
||||
finally:
|
||||
gone.cancel()
|
||||
await monitor.unsubscribe(queue)
|
||||
|
||||
|
||||
@router.post("/{unit_id}/monitor/start")
|
||||
async def monitor_start(unit_id: str, db: Session = Depends(get_db)):
|
||||
"""Enable 24/7 keepalive monitoring: persist monitor_enabled and start the feed
|
||||
now, so alerting evaluates continuously even with no viewer. Survives restarts
|
||||
(auto-started on boot from the persisted flag)."""
|
||||
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
|
||||
if cfg:
|
||||
cfg.monitor_enabled = True
|
||||
db.commit()
|
||||
from app.monitor import monitor_manager
|
||||
monitor = await monitor_manager.get(unit_id)
|
||||
await monitor.set_keepalive(True)
|
||||
return {"status": "ok", "unit_id": unit_id, "monitor_enabled": True, "running": monitor.running}
|
||||
|
||||
|
||||
@router.post("/{unit_id}/monitor/stop")
|
||||
async def monitor_stop(unit_id: str, db: Session = Depends(get_db)):
|
||||
"""Disable keepalive monitoring: persist monitor_enabled=False and drop the
|
||||
keepalive (the feed stops once no browser subscribers remain)."""
|
||||
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
|
||||
if cfg:
|
||||
cfg.monitor_enabled = False
|
||||
db.commit()
|
||||
from app.monitor import monitor_manager
|
||||
monitor = await monitor_manager.get(unit_id)
|
||||
await monitor.set_keepalive(False)
|
||||
return {"status": "ok", "unit_id": unit_id, "monitor_enabled": False}
|
||||
|
||||
|
||||
@router.get("/_monitor/status")
|
||||
async def monitor_status():
|
||||
"""Status of every device monitor (running, subscriber count, keep-alive)."""
|
||||
from app.monitor import monitor_manager
|
||||
return {"status": "ok", "monitors": monitor_manager.status()}
|
||||
|
||||
|
||||
@router.get("/{unit_id}/history")
|
||||
def get_monitor_history(unit_id: str, hours: float = 2.0, db: Session = Depends(get_db)):
|
||||
"""Recent downsampled monitor readings (the DOD trail) for the live-chart
|
||||
backfill. Viewing only — NOT the FTP report data."""
|
||||
from datetime import timedelta
|
||||
hours = max(0.1, min(hours, 48.0))
|
||||
cutoff = datetime.utcnow() - timedelta(hours=hours)
|
||||
rows = (db.query(NL43Reading)
|
||||
.filter(NL43Reading.unit_id == unit_id, NL43Reading.timestamp >= cutoff)
|
||||
.order_by(NL43Reading.timestamp.asc()).all())
|
||||
return {
|
||||
"status": "ok",
|
||||
"unit_id": unit_id,
|
||||
"hours": hours,
|
||||
"count": len(rows),
|
||||
"readings": [
|
||||
{
|
||||
"timestamp": r.timestamp.isoformat() if r.timestamp else None,
|
||||
"lp": r.lp, "leq": r.leq, "lmax": r.lmax, "ln1": r.ln1, "ln2": r.ln2,
|
||||
}
|
||||
for r in rows
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# ALERTS — threshold rules + fired events
|
||||
# ============================================================================
|
||||
|
||||
class AlertRulePayload(BaseModel):
|
||||
name: str = "Alert"
|
||||
metric: str = "lp" # lp/leq/lmax/lmin/lpeak/ln1/ln2
|
||||
comparison: str = "above" # above | below
|
||||
threshold_db: float
|
||||
duration_s: int = 0 # sustained seconds before firing (0 = instant)
|
||||
clear_margin_db: float = 2.0 # hysteresis band
|
||||
cooldown_s: int = 300
|
||||
schedule_start: str | None = None # "HH:MM" local; null = always
|
||||
schedule_end: str | None = None
|
||||
schedule_days: str | None = None # CSV of 0-6 (Mon=0); null = every day
|
||||
channels: str = "log"
|
||||
recipients: str | None = None
|
||||
enabled: bool = True
|
||||
|
||||
|
||||
def _rule_dict(r: AlertRule) -> dict:
|
||||
return {
|
||||
"id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric,
|
||||
"comparison": r.comparison, "threshold_db": r.threshold_db,
|
||||
"duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db,
|
||||
"cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start,
|
||||
"schedule_end": r.schedule_end, "schedule_days": r.schedule_days,
|
||||
"channels": r.channels, "recipients": r.recipients, "enabled": r.enabled,
|
||||
}
|
||||
|
||||
|
||||
def _event_dict(e: AlertEvent) -> dict:
|
||||
return {
|
||||
"id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id,
|
||||
"rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db,
|
||||
"onset_at": e.onset_at.isoformat() if e.onset_at else None,
|
||||
"onset_value": e.onset_value, "peak_value": e.peak_value,
|
||||
"clear_at": e.clear_at.isoformat() if e.clear_at else None,
|
||||
"status": e.status,
|
||||
"acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None,
|
||||
"acknowledged_by": e.acknowledged_by,
|
||||
}
|
||||
|
||||
|
||||
async def _sync_keepalive_to_rules(unit_id: str, db: Session):
|
||||
"""Keep a unit's monitor running while it has enabled alert rules, so the
|
||||
evaluator runs 24/7 even with no browser watching. Turns keepalive ON (and
|
||||
persists monitor_enabled so it survives a restart via the boot auto-start)
|
||||
when enabled rules exist; never turns it OFF — a device may be kept alive for
|
||||
other reasons, so operators control that on /admin/slmm."""
|
||||
has_enabled = (db.query(AlertRule)
|
||||
.filter_by(unit_id=unit_id, enabled=True).first() is not None)
|
||||
if not has_enabled:
|
||||
return
|
||||
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
|
||||
if cfg and not cfg.monitor_enabled:
|
||||
cfg.monitor_enabled = True
|
||||
db.commit()
|
||||
from app.monitor import monitor_manager
|
||||
m = await monitor_manager.get(unit_id)
|
||||
await m.set_keepalive(True)
|
||||
|
||||
|
||||
def _reset_rule_runtime(unit_id: str, rule_id: int, db: Session):
|
||||
"""After a rule edit/delete: drop its evaluator state machine and close any open
|
||||
event, so a stale 'active' phase doesn't mis-evaluate against the new config and
|
||||
the client portal doesn't stay 'in alarm' on a rule that changed or is gone."""
|
||||
from app.alerts import alert_evaluator
|
||||
alert_evaluator.forget_rule(unit_id, rule_id)
|
||||
now = datetime.utcnow()
|
||||
for evt in db.query(AlertEvent).filter_by(unit_id=unit_id, rule_id=rule_id, status="active").all():
|
||||
evt.clear_at = now
|
||||
evt.status = "cleared"
|
||||
db.commit()
|
||||
|
||||
|
||||
@router.post("/{unit_id}/alerts/rules")
|
||||
async def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)):
|
||||
rule = AlertRule(unit_id=unit_id, **payload.model_dump())
|
||||
db.add(rule)
|
||||
db.commit()
|
||||
db.refresh(rule)
|
||||
from app.alerts import alert_evaluator
|
||||
alert_evaluator.invalidate(unit_id)
|
||||
await _sync_keepalive_to_rules(unit_id, db)
|
||||
return {"status": "ok", "rule": _rule_dict(rule)}
|
||||
|
||||
|
||||
@router.get("/{unit_id}/alerts/rules")
|
||||
def list_alert_rules(unit_id: str, db: Session = Depends(get_db)):
|
||||
rules = db.query(AlertRule).filter_by(unit_id=unit_id).all()
|
||||
return {"status": "ok", "rules": [_rule_dict(r) for r in rules]}
|
||||
|
||||
|
||||
@router.put("/{unit_id}/alerts/rules/{rule_id}")
|
||||
async def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)):
|
||||
rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
|
||||
if not rule:
|
||||
raise HTTPException(status_code=404, detail="Alert rule not found")
|
||||
for field, value in payload.model_dump().items():
|
||||
setattr(rule, field, value)
|
||||
db.commit()
|
||||
db.refresh(rule)
|
||||
from app.alerts import alert_evaluator
|
||||
alert_evaluator.invalidate(unit_id)
|
||||
_reset_rule_runtime(unit_id, rule_id, db)
|
||||
await _sync_keepalive_to_rules(unit_id, db)
|
||||
return {"status": "ok", "rule": _rule_dict(rule)}
|
||||
|
||||
|
||||
@router.delete("/{unit_id}/alerts/rules/{rule_id}")
|
||||
async def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)):
|
||||
rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
|
||||
if not rule:
|
||||
raise HTTPException(status_code=404, detail="Alert rule not found")
|
||||
db.delete(rule)
|
||||
db.commit()
|
||||
from app.alerts import alert_evaluator
|
||||
alert_evaluator.invalidate(unit_id)
|
||||
_reset_rule_runtime(unit_id, rule_id, db) # close its open event so the portal doesn't stay red
|
||||
await _sync_keepalive_to_rules(unit_id, db) # no-op if no enabled rules remain
|
||||
return {"status": "ok", "deleted": rule_id}
|
||||
|
||||
|
||||
@router.get("/{unit_id}/alerts/events")
|
||||
def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)):
|
||||
events = (db.query(AlertEvent).filter_by(unit_id=unit_id)
|
||||
.order_by(AlertEvent.onset_at.desc()).limit(limit).all())
|
||||
return {"status": "ok", "events": [_event_dict(e) for e in events]}
|
||||
|
||||
|
||||
@router.post("/{unit_id}/alerts/events/{event_id}/ack")
|
||||
def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)):
|
||||
evt = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first()
|
||||
if not evt:
|
||||
raise HTTPException(status_code=404, detail="Alert event not found")
|
||||
evt.acknowledged_at = datetime.utcnow()
|
||||
evt.acknowledged_by = by
|
||||
db.commit()
|
||||
return {"status": "ok", "acknowledged": event_id}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
|
||||
# ============================================================================
|
||||
@@ -197,6 +583,7 @@ def get_roster(db: Session = Depends(get_db)):
|
||||
"web_enabled": cfg.web_enabled,
|
||||
"poll_enabled": cfg.poll_enabled,
|
||||
"poll_interval_seconds": cfg.poll_interval_seconds,
|
||||
"monitor_enabled": cfg.monitor_enabled,
|
||||
"status": None
|
||||
}
|
||||
|
||||
@@ -445,11 +832,14 @@ def get_status(unit_id: str, db: Session = Depends(get_db)):
|
||||
"unit_id": unit_id,
|
||||
"last_seen": status.last_seen.isoformat() if status.last_seen else None,
|
||||
"measurement_state": status.measurement_state,
|
||||
"measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None,
|
||||
"lp": status.lp,
|
||||
"leq": status.leq,
|
||||
"lmax": status.lmax,
|
||||
"lmin": status.lmin,
|
||||
"lpeak": status.lpeak,
|
||||
"ln1": status.ln1,
|
||||
"ln2": status.ln2,
|
||||
"battery_level": status.battery_level,
|
||||
"power_source": status.power_source,
|
||||
"sd_remaining_mb": status.sd_remaining_mb,
|
||||
@@ -472,6 +862,8 @@ class StatusPayload(BaseModel):
|
||||
lmax: str | None = None
|
||||
lmin: str | None = None
|
||||
lpeak: str | None = None
|
||||
ln1: str | None = None
|
||||
ln2: str | None = None
|
||||
battery_level: str | None = None
|
||||
power_source: str | None = None
|
||||
sd_remaining_mb: str | None = None
|
||||
@@ -499,11 +891,14 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge
|
||||
"unit_id": unit_id,
|
||||
"last_seen": status.last_seen.isoformat(),
|
||||
"measurement_state": status.measurement_state,
|
||||
"measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None,
|
||||
"lp": status.lp,
|
||||
"leq": status.leq,
|
||||
"lmax": status.lmax,
|
||||
"lmin": status.lmin,
|
||||
"lpeak": status.lpeak,
|
||||
"ln1": status.ln1,
|
||||
"ln2": status.ln2,
|
||||
"battery_level": status.battery_level,
|
||||
"power_source": status.power_source,
|
||||
"sd_remaining_mb": status.sd_remaining_mb,
|
||||
@@ -544,7 +939,7 @@ async def start_measurement(unit_id: str, db: Session = Depends(get_db)):
|
||||
db.expire_all()
|
||||
status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
|
||||
logger.info(f"State check: measurement_state={status.measurement_state if status else 'None'}, start_time={status.measurement_start_time if status else 'None'}")
|
||||
if status and status.measurement_state == "Measure" and status.measurement_start_time:
|
||||
if status and status.measurement_state in ("Start", "Measure") and status.measurement_start_time:
|
||||
logger.info(f"✓ Measurement state confirmed for {unit_id} with start time {status.measurement_start_time}")
|
||||
break
|
||||
|
||||
@@ -1205,6 +1600,8 @@ async def stream_live(websocket: WebSocket, unit_id: str):
|
||||
"lmax": snap.lmax, # Maximum level
|
||||
"lmin": snap.lmin, # Minimum level
|
||||
"lpeak": snap.lpeak, # Peak level
|
||||
"ln1": snap.ln1, # LN1 percentile (L1/L10 contract); null on DRD stream
|
||||
"ln2": snap.ln2, # LN2 percentile; null on DRD stream
|
||||
"raw_payload": snap.raw_payload,
|
||||
})
|
||||
except Exception as e:
|
||||
@@ -1876,6 +2273,8 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
|
||||
"lmax": status.lmax,
|
||||
"lmin": status.lmin,
|
||||
"lpeak": status.lpeak,
|
||||
"ln1": status.ln1,
|
||||
"ln2": status.ln2,
|
||||
"battery_level": status.battery_level,
|
||||
"power_source": status.power_source,
|
||||
"sd_remaining_mb": status.sd_remaining_mb,
|
||||
|
||||
+76
-31
@@ -46,6 +46,8 @@ class NL43Snapshot:
|
||||
lmax: Optional[str] = None # Maximum level
|
||||
lmin: Optional[str] = None # Minimum level
|
||||
lpeak: Optional[str] = None # Peak level
|
||||
ln1: Optional[str] = None # Percentile slot LN1 (configurable; device default L5, contract L1)
|
||||
ln2: Optional[str] = None # Percentile slot LN2 (configurable; device default L10)
|
||||
battery_level: Optional[str] = None
|
||||
power_source: Optional[str] = None
|
||||
sd_remaining_mb: Optional[str] = None
|
||||
@@ -69,10 +71,27 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
|
||||
|
||||
logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'")
|
||||
|
||||
# Device returns "Start" when measuring, "Stop" when stopped
|
||||
# Normalize to previous behavior for backward compatibility
|
||||
is_measuring = new_state == "Start"
|
||||
was_measuring = previous_state == "Start"
|
||||
# The device reports "Start" while measuring; the DOD path uses that string,
|
||||
# but the DRD stream path tags snapshots "Measure" (and the DOD fallback also
|
||||
# uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and
|
||||
# closing the live stream flips state "Start"->"Measure"->"Start", which the
|
||||
# old equality check misread as stop-then-start and reset measurement_start_time.
|
||||
#
|
||||
# Also: only act on RECOGNIZED states. A buffer desync on the shared connection
|
||||
# (e.g. right after a DRD/DOD test) can make a Measure? read return a stray,
|
||||
# garbled value; treating that as "not measuring" produced constant phantom
|
||||
# "STOPPED -> STARTED" log pairs and reset the timer. Ignore unknown reads.
|
||||
MEASURING_STATES = {"Start", "Measure"}
|
||||
STOPPED_STATES = {"Stop"}
|
||||
was_measuring = previous_state in MEASURING_STATES
|
||||
|
||||
if new_state in MEASURING_STATES:
|
||||
is_measuring = True
|
||||
elif new_state in STOPPED_STATES:
|
||||
is_measuring = False
|
||||
else:
|
||||
logger.warning(f"Ignoring unrecognized measurement state for {s.unit_id}: {new_state!r}")
|
||||
is_measuring = was_measuring # garbled/unknown read — no transition
|
||||
|
||||
if not was_measuring and is_measuring:
|
||||
# Measurement just started - record the start time
|
||||
@@ -95,13 +114,18 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
row.measurement_state = new_state
|
||||
# Only persist a recognized state so one garbled read can't poison the next
|
||||
# transition check (which would manufacture the phantom STOPPED/STARTED pair).
|
||||
if new_state in MEASURING_STATES or new_state in STOPPED_STATES:
|
||||
row.measurement_state = new_state
|
||||
row.counter = s.counter
|
||||
row.lp = s.lp
|
||||
row.leq = s.leq
|
||||
row.lmax = s.lmax
|
||||
row.lmin = s.lmin
|
||||
row.lpeak = s.lpeak
|
||||
row.ln1 = s.ln1
|
||||
row.ln2 = s.ln2
|
||||
row.battery_level = s.battery_level
|
||||
row.power_source = s.power_source
|
||||
row.sd_remaining_mb = s.sd_remaining_mb
|
||||
@@ -656,10 +680,12 @@ class NL43Client:
|
||||
else:
|
||||
raise ValueError(f"Unknown result code: {result_code}")
|
||||
|
||||
async def request_dod(self) -> NL43Snapshot:
|
||||
async def request_dod(self, measurement_state: Optional[str] = None) -> NL43Snapshot:
|
||||
"""Request DOD (Data Output Display) snapshot from device.
|
||||
|
||||
Returns parsed measurement data from the device display.
|
||||
Returns parsed measurement data from the device display. Pass
|
||||
measurement_state to reuse a cached run state and skip the extra Measure?
|
||||
round-trip (the state changes rarely); leave it None to query it.
|
||||
"""
|
||||
# _send_command now handles result code validation and returns the data line
|
||||
resp = await self._send_command("DOD?\r\n")
|
||||
@@ -682,31 +708,41 @@ class NL43Client:
|
||||
|
||||
logger.info(f"Parsed {len(parts)} data points from DOD response")
|
||||
|
||||
# Query actual measurement state (DOD doesn't include this information)
|
||||
try:
|
||||
measurement_state = await self.get_measurement_state()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}")
|
||||
measurement_state = "Measure"
|
||||
# DOD doesn't include the run state. Query it only when not supplied by the
|
||||
# caller — the monitor passes a cached state most cycles and refreshes it
|
||||
# occasionally, avoiding a second rate-limited command on every poll.
|
||||
if measurement_state is None:
|
||||
try:
|
||||
measurement_state = await self.get_measurement_state()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}")
|
||||
measurement_state = "Measure"
|
||||
|
||||
snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state)
|
||||
|
||||
# Parse known positions (based on NL43 communication guide - DRD format)
|
||||
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
|
||||
# Parse DOD positional fields. DOD's layout is DIFFERENT from DRD: it has NO
|
||||
# leading counter and it includes LE plus LN1–LN5. The device returns 4 channels
|
||||
# of 16 fields each — [Lp, Leq, LE, Lmax, Lmin, LN1, LN2, LN3, LN4, LN5, Lpeak,
|
||||
# LIeq, Leq_mov, Ltm5, over, under] — and channel 1 (parts[0:16]) is the main
|
||||
# display. The previous code reused the DRD map (treating parts[0] as a counter),
|
||||
# which shifted everything: Lp was reported as the counter, Leq as Lp, LE as Leq,
|
||||
# and LN1 as Lpeak (you could spot it because "Lpeak" came out < Lmax).
|
||||
try:
|
||||
# Capture d0 (counter) for timer synchronization
|
||||
if len(parts) >= 1:
|
||||
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
|
||||
snap.lp = parts[0] # Lp: instantaneous sound pressure level
|
||||
if len(parts) >= 2:
|
||||
snap.lp = parts[1] # d1: Instantaneous sound pressure level
|
||||
if len(parts) >= 3:
|
||||
snap.leq = parts[2] # d2: Equivalent continuous sound level
|
||||
snap.leq = parts[1] # Leq: equivalent continuous level
|
||||
# parts[2] = LE (sound exposure level) — not currently surfaced
|
||||
if len(parts) >= 4:
|
||||
snap.lmax = parts[3] # d3: Maximum level
|
||||
snap.lmax = parts[3] # Lmax
|
||||
if len(parts) >= 5:
|
||||
snap.lmin = parts[4] # d4: Minimum level
|
||||
snap.lmin = parts[4] # Lmin
|
||||
if len(parts) >= 11:
|
||||
snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak)
|
||||
if len(parts) >= 6:
|
||||
snap.lpeak = parts[5] # d5: Peak level
|
||||
snap.ln1 = parts[5] # LN1 percentile slot (device default L5; contract L1)
|
||||
if len(parts) >= 7:
|
||||
snap.ln2 = parts[6] # LN2 percentile slot (device default L10)
|
||||
except (IndexError, ValueError) as e:
|
||||
logger.warning(f"Error parsing DOD data points: {e}")
|
||||
|
||||
@@ -896,15 +932,20 @@ class NL43Client:
|
||||
# Acquire per-device lock - held for entire streaming session
|
||||
device_lock = await _get_device_lock(self.device_key)
|
||||
async with device_lock:
|
||||
# Evict any cached connection — streaming needs its own dedicated socket
|
||||
await _connection_pool.discard(self.device_key)
|
||||
await self._enforce_rate_limit()
|
||||
|
||||
logger.info(f"Starting DRD stream for {self.device_key}")
|
||||
|
||||
# Reuse the pooled connection instead of discard()+reopen. The NL43
|
||||
# allows only ONE TCP connection at a time, and on a cellular link the
|
||||
# device does not free its single slot fast enough for an immediate
|
||||
# reconnect — so a fresh connect times out (the DRD stream failure).
|
||||
# The per-device lock is held for the whole session, so it already
|
||||
# blocks the poller; reusing the warm socket keeps us at exactly one
|
||||
# connection and lets the stream start on the slot commands already use.
|
||||
try:
|
||||
reader, writer = await _connection_pool._open_connection(
|
||||
self.host, self.port, self.timeout
|
||||
reader, writer, from_cache = await _connection_pool.acquire(
|
||||
self.device_key, self.host, self.port, self.timeout
|
||||
)
|
||||
except ConnectionError:
|
||||
logger.error(f"DRD stream connection failed to {self.device_key}")
|
||||
@@ -981,16 +1022,20 @@ class NL43Client:
|
||||
break
|
||||
|
||||
finally:
|
||||
# Send SUB character to stop streaming
|
||||
# Stop streaming on the device (SUB = 0x1A), then return the warm
|
||||
# connection to the pool so subsequent commands reuse this single
|
||||
# socket instead of opening a second one. release() returns healthy
|
||||
# sockets to the pool and closes dead ones; the next acquire()
|
||||
# drains any residual stop output before reuse.
|
||||
try:
|
||||
writer.write(b"\x1A")
|
||||
await writer.drain()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
writer.close()
|
||||
with contextlib.suppress(Exception):
|
||||
await writer.wait_closed()
|
||||
await _connection_pool.release(
|
||||
self.device_key, reader, writer, self.host, self.port
|
||||
)
|
||||
|
||||
logger.info(f"DRD stream ended for {self.device_key}")
|
||||
|
||||
|
||||
Binary file not shown.
@@ -1,9 +0,0 @@
|
||||
2026-03-12 21:29:42,683 - app.main - INFO - Database tables initialized
|
||||
2026-03-12 21:29:42,684 - app.main - INFO - CORS allowed origins: ['*']
|
||||
2026-03-12 21:29:42,703 - app.main - INFO - Starting TCP connection pool cleanup task...
|
||||
2026-03-12 21:29:42,703 - app.services - INFO - Connection pool cleanup task started
|
||||
2026-03-12 21:29:42,703 - app.main - INFO - Starting background poller...
|
||||
2026-03-12 21:29:42,703 - app.background_poller - INFO - Background poller task created
|
||||
2026-03-12 21:29:42,703 - app.main - INFO - Background poller started
|
||||
2026-03-12 21:29:42,703 - app.background_poller - INFO - Background polling loop started
|
||||
2026-03-12 21:29:42,708 - app.background_poller - INFO - [POOL] No active connections in pool
|
||||
@@ -0,0 +1,58 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Migration script to add ln1 and ln2 percentile columns to the nl43_status table.
|
||||
|
||||
The NL-43 DOD response carries percentile slots LN1-LN5; the live SLM display
|
||||
(Terra-View) shows two of them (default L1/L10). This adds storage for the two
|
||||
surfaced slots. Run once per database to update existing schema.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
DB_PATH = Path(__file__).parent / "data" / "slmm.db"
|
||||
|
||||
|
||||
def migrate():
|
||||
"""Add ln1 and ln2 columns to the nl43_status table."""
|
||||
|
||||
if not DB_PATH.exists():
|
||||
print(f"Database not found at {DB_PATH}")
|
||||
print("No migration needed - database will be created with new schema")
|
||||
return
|
||||
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
cursor.execute("PRAGMA table_info(nl43_status)")
|
||||
columns = [row[1] for row in cursor.fetchall()]
|
||||
|
||||
if "ln1" in columns and "ln2" in columns:
|
||||
print("✓ ln1/ln2 columns already exist, no migration needed")
|
||||
return
|
||||
|
||||
if "ln1" not in columns:
|
||||
print("Adding ln1 column...")
|
||||
cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln1 TEXT")
|
||||
print("✓ Added ln1 column")
|
||||
|
||||
if "ln2" not in columns:
|
||||
print("Adding ln2 column...")
|
||||
cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln2 TEXT")
|
||||
print("✓ Added ln2 column")
|
||||
|
||||
conn.commit()
|
||||
print("\n✓ Migration completed successfully!")
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
print(f"✗ Migration failed: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
migrate()
|
||||
@@ -0,0 +1,48 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Migration: add monitor_enabled column to nl43_config.
|
||||
|
||||
Controls whether the live fan-out DOD monitor is kept alive 24/7 for a unit
|
||||
(which is what makes alerting continuous). Defaults to enabled. Run once per DB.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
DB_PATH = Path(__file__).parent / "data" / "slmm.db"
|
||||
|
||||
|
||||
def migrate():
|
||||
if not DB_PATH.exists():
|
||||
print(f"Database not found at {DB_PATH}")
|
||||
print("No migration needed - database will be created with new schema")
|
||||
return
|
||||
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cursor = conn.cursor()
|
||||
try:
|
||||
cursor.execute("PRAGMA table_info(nl43_config)")
|
||||
columns = [row[1] for row in cursor.fetchall()]
|
||||
|
||||
if "monitor_enabled" in columns:
|
||||
print("✓ monitor_enabled column already exists, no migration needed")
|
||||
return
|
||||
|
||||
print("Adding monitor_enabled column (default enabled)...")
|
||||
# SQLite stores booleans as 0/1; default 1 = enabled.
|
||||
cursor.execute("ALTER TABLE nl43_config ADD COLUMN monitor_enabled BOOLEAN DEFAULT 1")
|
||||
conn.commit()
|
||||
print("✓ Added monitor_enabled column")
|
||||
print("\n✓ Migration completed successfully!")
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
print(f"✗ Migration failed: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
migrate()
|
||||
@@ -0,0 +1,68 @@
|
||||
"""
|
||||
Synthetic unit test for the alert state machine — no DB, no device.
|
||||
|
||||
Drives `_evaluate_step` with a fake clock + a level series and checks that
|
||||
onset/clear fire with the right debounce + hysteresis. Run:
|
||||
|
||||
docker compose exec -T slmm python3 test_alert_evaluator.py
|
||||
# or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py
|
||||
"""
|
||||
|
||||
from types import SimpleNamespace
|
||||
from app.alerts import RuleState, _evaluate_step
|
||||
|
||||
|
||||
def rule(**kw):
|
||||
base = dict(threshold_db=85.0, duration_s=3, clear_margin_db=2.0, comparison="above")
|
||||
base.update(kw)
|
||||
return SimpleNamespace(**base)
|
||||
|
||||
|
||||
def run(series, r):
|
||||
st = RuleState()
|
||||
events = [(now, a) for value, now in series
|
||||
if (a := _evaluate_step(st, value, now, r))]
|
||||
return events, st
|
||||
|
||||
|
||||
def main():
|
||||
failures = 0
|
||||
|
||||
def check(label, cond, detail=""):
|
||||
nonlocal failures
|
||||
print(("PASS" if cond else "FAIL"), label, detail)
|
||||
if not cond:
|
||||
failures += 1
|
||||
|
||||
# 1) sustained exceedance -> onset after duration; recovery -> clear after duration
|
||||
r = rule(threshold_db=85, duration_s=3, clear_margin_db=2)
|
||||
ev, _ = run([(80, 0), (86, 1), (87, 2), (88, 3), (88, 4),
|
||||
(88, 5), (82, 6), (82, 7), (82, 8), (82, 9)], r)
|
||||
onsets = [t for t, a in ev if a == "onset"]
|
||||
clears = [t for t, a in ev if a == "clear"]
|
||||
check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev))
|
||||
|
||||
# 2) brief spike under duration -> no onset (debounce)
|
||||
ev, _ = run([(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)], rule(duration_s=3))
|
||||
check("2 brief spike debounced", ev == [], str(ev))
|
||||
|
||||
# 3) hysteresis: a dip into the margin (below threshold, above threshold-margin)
|
||||
# does NOT clear
|
||||
r = rule(threshold_db=85, duration_s=0, clear_margin_db=3)
|
||||
ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], r)
|
||||
check("3 hysteresis holds ACTIVE", ev == [(0, "onset")] and st.phase == "active",
|
||||
f"{ev} phase={st.phase}")
|
||||
|
||||
# 4) 'below' comparison (device too quiet) -> onset when value < threshold
|
||||
ev, _ = run([(30, 0), (15, 1)], rule(threshold_db=20, duration_s=0,
|
||||
clear_margin_db=2, comparison="below"))
|
||||
check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev))
|
||||
|
||||
print()
|
||||
print("ALL PASS" if failures == 0 else f"{failures} FAILURE(S)")
|
||||
return failures
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.exit(1 if main() else 0)
|
||||
Reference in New Issue
Block a user