27 Commits

Author SHA1 Message Date
serversdown 89b6892656 Merge pull request 'Update to v 0.4.0' (#6) from dev into main
Reviewed-on: #6
2026-06-22 18:07:36 -04:00
serversdown 43b8e53d2d chore: version bump 2026-06-22 20:54:43 +00:00
serversdown 6d1c426ee4 fix: recognize 'Start' state when confirming measurement start
The /start handler waited for measurement_state == "Measure", but the
device reports "Start" while measuring. The confirmation check therefore
never matched, so the post-start status loop always ran its full 3x DOD
retry cycle over cellular, pushing the call past ~10s. That blew past the
Terra-View proxy's request timeout and surfaced to users as a misleading
"Unknown error" even though the unit had already started recording.

Match the device's actual reported state (and stay consistent with
persist_snapshot's MEASURING_STATES handling) so /start confirms on the
first attempt and returns promptly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-21 20:22:39 +00:00
serversdown ad6071b790 fix(alerts): reset rule state + close open event on rule edit/delete
invalidate() only dropped the rule cache, not the per-(unit,rule) state machine —
so editing a rule's metric/threshold left a stale 'active' phase that mis-evaluated
against the new config (spurious clear, or suppressed onset), and deleting an
in-alarm rule left an open AlertEvent that kept the client portal stuck "in alarm"
forever. update/delete now call _reset_rule_runtime: forget_rule() drops the state
machine and any open event for that rule is closed.

Verified: existing evaluator tests + cooldown scenario still pass; compiles.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 23:40:52 +00:00
serversdown cfdeada9d6 fix(alerts): enforce cooldown_s between onsets
cooldown_s was stored + shown in the UI but never read, so a repeatedly-breaching
signal (e.g. intermittent traffic noise) would flood the alert history with an
event per spike. The evaluator now suppresses a new onset within cooldown_s of the
last, holding the edge so it fires the moment the window lapses if still breaching.
Hysteresis still gates clears. getattr-guarded so partial rule fixtures don't crash.

Verified: existing 4 evaluator tests pass; cooldown scenario (onset → clear →
suppressed re-breach → onset after window) passes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 22:47:39 +00:00
serversdown b51fefca2b feat(alerts): enabled rules pin the monitor on (24/7 evaluation)
The evaluator only runs inside the monitor loop, so a rule on an idle device
never fired. Now creating/updating/deleting an alert rule calls
_sync_keepalive_to_rules: if the unit has any enabled rule, persist
NL43Config.monitor_enabled=True (so the boot auto-start re-enables it after a
restart) and turn on runtime keepalive. Never auto-OFF — a device may be kept
alive for other reasons; operators control that on /admin/slmm. Alert CRUD
endpoints are now async to await the monitor manager.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 19:36:16 +00:00
serversdown 5bc542e92f fix(monitor): quiet send-after-close race on WS disconnect
When a monitor subscriber disconnects mid-frame (the client portal closes its
stream on every tab switch via the Page Visibility guard), the loop could pull a
queued payload during the 1s wait and then send_json into an already-closing
socket -> "Unexpected ASGI message 'websocket.send' after ... websocket.close",
logged as a WARNING on every disconnect.

Re-check gone.done() after the queue wait and break before sending; treat the
residual send-after-close as expected (debug, not warning). No behavior change —
the connection was already closing as intended; this just stops the log spam.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 03:29:16 +00:00
serversdown 1f5f1fb1f6 feat(monitor): adaptive poll rate, unreachable backoff, device-offline alert
Three changes to cut wasted device/cellular load and surface outages:

- Adaptive interval: full-rate (~1.25s) while a browser is subscribed for a
  smooth chart; relaxed cadence (MONITOR_IDLE_POLL_INTERVAL, default 10s) when
  the feed is keepalive-only (alerting). ~8x fewer polls with no viewer ->
  ~8x less cellular traffic on a metered SIM. Note: idle interval also sets the
  alert sampling resolution when nobody is watching.
- Exponential backoff when the device is unreachable (1->2->...->60s cap),
  reset on the first good poll, so a dead/asleep device stops churning
  reconnects (log spam + wasted SYN traffic). Capped at 5s while a browser is
  watching so a recovery still surfaces quickly.
- Device-offline alert: the reachable->unreachable transition raises a
  connectivity AlertEvent (sentinel rule_id=0, metric="connectivity") through
  the existing evaluator/dispatch seam; recovery clears it. Deduped in memory
  and via the DB (so a restart mid-outage doesn't duplicate the event).

MonitorManager.status() now reports reachable + current mode (watched/idle/
backoff) for observability.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 06:47:20 +00:00
serversdown b4cea2f287 feat: include measurement_start_time in cached /status response
So consumers (e.g. the command center) can read the elapsed-time clock from
the cached status instead of a fresh device /live read. Added to both the
GET and POST /status data dicts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 22:57:45 +00:00
serversdown d1d694302c feat: downsampled DOD trail + history endpoint for live-chart backfill
So a viewer sees recent trend on open instead of a blank chart. Viewing
only — reports still use the device's FTP .rnd data.

- NL43Reading table (auto-creates; no migration): unit_id, timestamp,
  lp/leq/lmax/ln1/ln2.
- Monitor stores one downsampled reading per MONITOR_TRAIL_SAMPLE_S
  (default 60s) from its keepalive poll loop, pruning rows older than
  MONITOR_TRAIL_RETENTION_HOURS (default 24h). ~1440 rows/unit max.
- GET /api/nl43/{unit}/history?hours=N -> the trail for the last N hours
  (clamped 0.1-48h), oldest-first.

Because keepalive runs 24/7, the trail fills continuously, so the history
is there whenever someone opens the live view.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 19:58:30 +00:00
serversdown 43e72ae3c3 feat: persistent monitor_enabled flag + auto-start keepalive on boot
Makes live monitoring (and therefore alerting) genuinely 24/7 and
restart-surviving, instead of runtime-only keepalive.

- NL43Config.monitor_enabled (default True) + migrate_add_monitor_enabled.py.
- On startup, auto-start keepalive monitors for every monitor_enabled +
  tcp_enabled unit — so feeds/alerts resume after a restart with no manual step.
- /monitor/start and /monitor/stop now PERSIST monitor_enabled (start=True,
  stop=False) in addition to applying keepalive at runtime, so the toggle
  sticks. Roster output includes monitor_enabled for the admin UI to read.

On by default: configure a unit -> it's monitored 24/7 unless toggled off.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 19:27:25 +00:00
serversdown 9d34779171 perf: monitor caches run state, ~halving live-feed latency
Each monitor poll was sending DOD? + Measure? (two commands), and the NL43
enforces >=1s between commands, so updates were ~2.5s apart. The run state
changes rarely, so cache it and refresh via Measure? only every
MONITOR_STATE_REFRESH_S (default 30s); most polls now send just DOD? (one
rate-limited command) -> ~1.3s/update. Also trim MONITOR_POLL_INTERVAL to
0.25s since the device rate-limit is the real pacer.

request_dod() gains an optional measurement_state arg: when supplied it
reuses that state and skips the Measure? round-trip; None preserves the old
query-every-time behavior.

~1Hz is the device floor for DOD (the >=1s command spacing); DRD's 10Hz
push isn't reachable via polling, but ~1s is a normal cadence for SLM levels.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 18:52:13 +00:00
serversdown 87c06f1519 Merge pull request 'merge drd-fix into dev' (#5) from feat/drd-fix into dev
Reviewed-on: #5
2026-06-09 14:21:16 -04:00
serversdown ba622c67d8 feat: monitor heartbeat + background poller skips active-monitored units
- Heartbeat: if nothing has been broadcast in MONITOR_HEARTBEAT_S (default
  25s) — e.g. device offline and silent — send a non-cached keepalive frame
  so a reverse proxy (NPM) doesn't drop the idle WS. New subscribers still
  get the last real frame, not a heartbeat.
- Poller-skip: the 60s background poller now skips any unit with a running
  monitor (MonitorManager.is_active). The monitor already polls it ~1Hz and
  keeps the status cache fresh, so the background poll was redundant and just
  added load/lock-contention on the device's single connection (and churn,
  which matters for the cellular wedge). Trade-off: the FTP start-time sync
  (only in the poller) doesn't run while a unit is actively monitored — fine,
  since reports take the authoritative start time from the FTP .rnd data.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 17:33:29 +00:00
serversdown 6b1ec75396 feat: harden fan-out for live clients — instant first frame + offline status
For multiple clients connecting to a live feed (e.g. the client portal):
- cache the last broadcast frame and replay it to a new subscriber on
  connect, so a client sees data immediately instead of waiting a full
  poll cycle.
- broadcast a {"feed_status":"unreachable"} frame once on transition (after
  3 consecutive poll failures) so clients can render an offline state
  instead of a frozen chart; data frames now carry "feed_status":"ok".
  The cached frame reflects current state, so a client connecting while
  offline gets "unreachable" right away too.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 17:13:21 +00:00
serversdown 9c43e68534 feat: alert engine stage 1 — rules, events, state machine, CRUD
Replaces the POC single-threshold check with a real per-rule engine over
the live monitor feed.

- AlertRule / AlertEvent tables (auto-created via create_all; no migration).
  Rule = {metric, comparison, threshold_db, duration_s, clear_margin_db,
  schedule, channels, recipients}.
- alerts.py: per-(unit,rule) state machine IDLE->ACTIVE->IDLE with duration
  debounce (both edges) + clear_margin hysteresis; onset/clear are distinct
  events; optional nighttime schedule; rule cache w/ invalidation. The
  state-machine core (_evaluate_step) is pure (no DB/clock) for testing.
- Dispatch is a server log (POC); _dispatch() is the seam for a Terra-View
  webhook (email/SMS) later.
- CRUD: POST/GET/PUT/DELETE /{unit}/alerts/rules, GET /{unit}/alerts/events,
  POST /{unit}/alerts/events/{id}/ack.
- test_alert_evaluator.py: synthetic level series proves onset debounce,
  spike rejection, hysteresis hold, and below-comparison (4/4 pass, no device).

Source-agnostic: the same rules transfer unchanged if a unit's feed is later
sourced from FTP intervals instead of the DOD monitor.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 01:04:03 +00:00
serversdown aa3e088b64 feat: per-device live monitor (fan-out) + alert evaluator (POC)
The piece the live-view + alerting work was building toward.

monitor.py — one DOD poll loop per device, broadcast to many subscribers:
- browser WebSockets (fixes the single-connection "second viewer sees
  nothing" contention — browsers no longer each open a device stream)
- the alert evaluator (can keep a feed running with no browser via
  /monitor/start, so alerting runs continuously)
- persistence (each snapshot written like the poller)
DOD-sourced, so the broadcast carries ln1/ln2 (which DRD cannot). All polls
go through the existing per-device lock + pool, so it serializes safely with
the background poller and on-demand commands.

alerts.py — pluggable POC evaluator: fires (logs) when ALERT_METRIC exceeds
ALERT_THRESHOLD_DB with an ALERT_COOLDOWN_SECONDS cooldown. The rule
(instantaneous vs sustained vs L10) is the single swap point; dispatch is a
server log for now (email/SMS later).

Endpoints:
- WS   /api/nl43/{unit_id}/monitor          subscribe to the shared feed
- POST /api/nl43/{unit_id}/monitor/start    keep feed alive w/o a browser
- POST /api/nl43/{unit_id}/monitor/stop     drop the keep-alive
- GET  /api/nl43/_monitor/status            running/subscribers/keepalive

WS endpoint races queue.get() against a disconnect watcher so an idle feed
still detects client drop and doesn't leak a subscription.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 23:27:05 +00:00
serversdown 8c17af4849 fix: ignore garbled measurement-state reads (phantom STOPPED/STARTED)
A buffer desync on the shared persistent connection (commonly right after
a DRD/DOD test) can make a Measure? read return a stray value. The state
classifier treated anything not in {"Start","Measure"} as "not measuring",
so a garbled read logged a phantom STOPPED, the next clean read logged
STARTED, and that reset measurement_start_time — producing constant
STOPPED/STARTED device-log pairs and a drifting elapsed timer.

Now only recognized states drive transitions: {"Start","Measure"} =
measuring, {"Stop"} = stopped, anything else = no change. Garbled reads
are also not persisted as the cached state, so they can't poison the next
transition check. Builds on the earlier Start<->Measure normalization.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 22:50:18 +00:00
serversdown b954eb8c89 feat: per-unit deactivate and global SLMM standby
Lets an instance stop occupying a device's single TCP connection slot so
another instance (e.g. prod) can take over.

Per-unit:
- POST /api/nl43/{unit_id}/deactivate — poll_enabled=False (persisted) +
  drop the connection (waits up to 10s for in-flight ops via the device
  lock, then discards). Unit stays dormant across restarts.
- POST /api/nl43/{unit_id}/activate — re-enable polling.

Global standby:
- POST /api/nl43/_system/standby — poller idles and releases ALL
  connections; the loop keeps re-releasing so the instance holds no slots.
- POST /api/nl43/_system/resume — resume polling.
- GET  /api/nl43/_system/status — active vs standby + active_connections.
- SLMM_POLLING_ENABLED=false starts an instance in standby (persistent
  way to keep a dev box from latching onto a prod-owned device).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 22:45:52 +00:00
serversdown 0793e7df01 feat: add per-device disconnect endpoint
POST /api/nl43/{unit_id}/disconnect cleanly closes (TCP FIN + wait_closed)
and drops the pooled connection for a single device, freeing the NL43's
one connection slot. Previously only /_connections/flush existed, which
tears down every device at once.

Idempotent; no-op if nothing is cached. Releases the idle pooled
connection only — an active DRD stream/command has the socket checked out
of the pool, so close the stream WebSocket to end a live stream.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 22:40:56 +00:00
serversdown 51dd6b682d feat: surface LN1/LN2 (L1/L10) percentiles through SLMM
Completes the SLMM side of the L1/L10 live-display contract. The NL-43's
DOD response carries percentile slots LN1-LN5 (channel 1, parts[5]/[6]);
parse the first two and expose them as ln1/ln2 end to end:

- NL43Snapshot dataclass: ln1/ln2 fields
- NL43Status model: ln1/ln2 columns (+ migrate_add_ln_percentiles.py)
- DOD parser: snap.ln1=parts[5], snap.ln2=parts[6]
- persist_snapshot writes them
- all /status data dicts, StatusPayload, and the DRD stream payload emit
  ln1/ln2 (null on the DRD stream itself, which doesn't carry percentiles)

Labels: device LN1 defaults to L5, not L1 — Terra-View defaults the label
to L1/L10, so the device's Ln1/Ln2 slots must be set to 1%/10% for the
labels to be accurate (dynamic label emission is a follow-up).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 22:01:31 +00:00
serversdown a7983d2958 fix: correct DOD field parsing and stop measurement-time resets
Two device-data bugs surfaced while scoping the live-feed work:

1. DOD parser misalignment. DOD's response has no leading counter and
   includes LE + LN1-LN5, but the parser reused the DRD field map
   (parts[0]=counter). That shifted everything: Lp was stored as the
   counter, Leq as Lp, LE as Leq, and LN1 as Lpeak (visible because
   "Lpeak" came out below Lmax, which is impossible). Parse DOD with its
   own map: Lp=0, Leq=1, Lmax=3, Lmin=4, Lpeak=10 (channel 1 = main).

2. measurement_start_time reset on every live-stream open/close. The DOD
   path tags state "Start"; the DRD stream path tags "Measure". The
   transition detector treated only "Start" as measuring, so opening the
   stream ("Start"->"Measure") read as a stop (cleared start time) and
   closing it ("Measure"->"Start") read as a start (reset to now). Every
   viewer reset the elapsed measurement time. Treat {"Start","Measure"}
   both as measuring.

LN1/LN2 (L1/L10) parsing + model/serialization is the next step.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 21:53:00 +00:00
serversdown d6dd2e736b Merge pull request 'fix: improve connection pool idle and max age checks to allow disabling' (#3) from dev-persistent into main
Reviewed-on: #3
2026-06-08 16:56:33 -04:00
serversdown af86cf713e fix: reuse pooled TCP connection for DRD streaming
stream_drd() discarded the pooled connection and forced a fresh connect.
The NL43 allows only one TCP connection at a time; over a cellular link
the device does not free its single slot fast enough for an immediate
reconnect, so the fresh connect times out — the live DRD stream fails
while start/stop commands (which reuse the warm pooled socket) keep
working. This surfaced once the persistent connection pool was enabled
(TCP_PERSISTENT_ENABLED=true).

Stream over the already-open pooled connection via acquire() instead of
discard()+_open_connection(), and release() it back to the pool on exit
(after sending SUB to stop the stream) so commands keep reusing the same
single socket. The per-device lock is held for the whole streaming
session, so the poller can't touch the socket concurrently.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 19:00:35 +00:00
serversdown e3f9ca7f5b fix: use request-first TemplateResponse signature
Modern Starlette requires `request` as the first positional arg to
TemplateResponse. The old `TemplateResponse(name, context)` form caused
the context dict to be passed as the template name, which Jinja2 then
tried to use as a cache key -> TypeError: unhashable type: 'dict' (500
on GET / and /roster).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 17:59:39 +00:00
serversdown 450509d210 stop tracking dev runtime data 2026-03-12 22:46:37 +00:00
serversdown ad1a40e0aa Merge pull request 'v0.3.0, persistent polling update. Persistent TCP connection pool with all features Connection pool diagnostics (API + UI) All 6 new environment variables Changes to health check, diagnostics, and DRD streaming Technical architecture details and cellular' (#2) from dev-persistent into main
Reviewed-on: #2
2026-02-16 21:57:37 -05:00
14 changed files with 1575 additions and 56 deletions
+46
View File
@@ -5,6 +5,52 @@ All notable changes to SLMM (Sound Level Meter Manager) will be documented in th
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.4.0] - 2026-06-22
### Added
#### Live Monitor (fan-out feed)
- **Per-device fan-out monitor** - one shared, cached live feed per device. Multiple clients (dashboards, portal, charts) subscribe to the same stream instead of each fighting for the NL-43's single TCP connection: one poller reads the device, all subscribers get the same frames.
- **WebSocket monitor** - `WS /api/nl43/{unit_id}/monitor` delivers an instant first frame from cache, then live updates.
- **Monitor control** - `POST /api/nl43/{unit_id}/monitor/{start|stop}`, `GET /api/nl43/_monitor/status`. A persistent `monitor_enabled` flag auto-starts the keepalive on boot.
- **Adaptive polling** - poll rate adapts to demand; unreachable devices back off; a device-offline alert fires when a monitored unit drops.
- **De-duplication** - the background poller skips units already covered by an active monitor (no double-polling); a heartbeat keeps the feed warm.
- **Lower latency** - the monitor caches run state, roughly halving live-feed latency; fan-out emits an instant first frame + offline status to new clients.
#### Alert Engine
- **Threshold rules** - per-device alert rules (metric + threshold + cooldown) with full CRUD: `POST/GET/PUT/DELETE /api/nl43/{unit_id}/alerts/rules[/{rule_id}]`.
- **Events + state machine** - onset/clear tracking via `GET /api/nl43/{unit_id}/alerts/events`; acknowledge with `POST .../events/{event_id}/ack`. A `cooldown_s` is enforced between onsets.
- **24/7 evaluation** - enabled rules pin the monitor on, so rules evaluate continuously even with no UI client connected.
- **Resilience** - editing or deleting a rule resets its state and closes any open event; device-offline events are raised when a monitored unit goes unreachable.
#### Data & History
- **Live-chart backfill** - a downsampled DOD trail is persisted to a new `nl43_readings` table, exposed via `GET /api/nl43/{unit_id}/history` so charts can backfill recent history on load.
- **LN1/LN2 percentiles** - L1/L10 (configurable percentiles) surfaced through SLMM in the status and live-feed payloads.
- **measurement_start_time** included in the cached `/status` response.
#### Device control
- **Per-device disconnect** - `POST /api/nl43/{unit_id}/disconnect` drops a device's pooled connection.
- **Deactivate / standby** - `POST /api/nl43/{unit_id}/deactivate` and global `POST /api/nl43/_system/standby` to quiesce polling/monitoring.
### Changed
- **DRD streaming reuses the pooled connection** rather than opening a separate socket, avoiding contention with the persistent pool on a single-connection device.
- **Connection pool** - idle-TTL / max-age checks can now be disabled; pool status is logged periodically.
### Fixed
- **Measurement-start confirmation** - `/start` now recognizes the device's `Start` state. It previously waited for `Measure`, which never matched, so the start cycle ran the full retry loop and Terra-View's proxy timed out with a misleading "Unknown error" even though the device had started.
- **Garbled reads** - corrupted measurement-state reads that produced phantom STOPPED/STARTED transitions are now ignored.
- **DOD parsing** - corrected field parsing and stopped spurious measurement-time resets.
- **Monitor WebSocket** - quieted a send-after-close race on client disconnect.
### Database
- **New tables** (auto-created on startup via `Base.metadata.create_all`): `alert_rules`, `alert_events`, `nl43_readings`.
- **Migrations for existing tables** (run once per database): `migrate_add_ln_percentiles.py` (LN1/LN2 on `nl43_status`), `migrate_add_monitor_enabled.py` (`monitor_enabled` on `nl43_config`).
### Notes
- Pairs with the matching Terra-View `dev` build, which reads SLMM's `/monitor` fan-out feed for live SLM dashboards (L1/L10 lines, live-chart backfill). Ship the two together.
---
## [0.3.0] - 2026-02-17 ## [0.3.0] - 2026-02-17
### Added ### Added
+81 -6
View File
@@ -1,6 +1,6 @@
# SLMM - Sound Level Meter Manager # SLMM - Sound Level Meter Manager
**Version 0.3.0** **Version 0.4.0**
Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols. Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols.
@@ -12,6 +12,9 @@ SLMM is a standalone backend module that provides REST API routing and command t
## Features ## Features
- **Live Monitor (fan-out)**: One shared cached live feed per device — many clients subscribe to the same stream instead of fighting over the meter's single TCP connection
- **Alert Engine**: Per-device threshold rules with onset/clear events, cooldowns, acks, and 24/7 evaluation
- **History & Percentiles**: Downsampled DOD trail + history endpoint for live-chart backfill; LN1/LN2 (L1/L10) percentiles surfaced through the feed
- **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability - **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability
- **Background Polling**: Continuous automatic polling of devices with configurable intervals - **Background Polling**: Continuous automatic polling of devices with configurable intervals
- **Offline Detection**: Automatic device reachability tracking with failure counters - **Offline Detection**: Automatic device reachability tracking with failure counters
@@ -44,6 +47,30 @@ SLMM is a standalone backend module that provides REST API routing and command t
└──────────────┘ └──────────────┘
``` ```
### Live Monitor — Fan-Out Feed (v0.4.0)
The NL-43 allows only one TCP control connection at a time, so multiple clients
polling the same device directly would contend for it. The monitor solves this
with a single shared, cached feed per device:
- **One reader, many subscribers**: a single poller reads the device; every
WebSocket subscriber (`WS /api/nl43/{unit_id}/monitor`) receives the same
frames — an instant first frame from cache, then live updates.
- **Persistent + auto-start**: a `monitor_enabled` flag keeps the feed running
and auto-starts it on boot. Enabled alert rules pin the monitor on for 24/7
evaluation even with no UI connected.
- **Adaptive & deduplicated**: poll rate adapts to demand, unreachable devices
back off, and the background poller skips units already covered by a monitor.
### Alert Engine (v0.4.0)
Per-device threshold alerting evaluated against the live feed:
- **Rules**: metric + threshold + `cooldown_s`, full CRUD per device
- **Events**: onset/clear state machine, acknowledgement, and a device-offline
alert when a monitored unit drops
- **Robust**: editing/deleting a rule resets its state and closes open events
### Persistent TCP Connection Pool (v0.3.0) ### Persistent TCP Connection Pool (v0.3.0)
SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems: SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems:
@@ -145,8 +172,32 @@ Logs are written to:
|--------|----------|-------------| |--------|----------|-------------|
| GET | `/api/nl43/{unit_id}/status` | Get cached measurement snapshot (updated by background poller) | | GET | `/api/nl43/{unit_id}/status` | Get cached measurement snapshot (updated by background poller) |
| GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) | | GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) |
| GET | `/api/nl43/{unit_id}/history` | Downsampled DOD trail for live-chart backfill |
| WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data | | WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data |
### Live Monitor (fan-out feed)
| Method | Endpoint | Description |
|--------|----------|-------------|
| WS | `/api/nl43/{unit_id}/monitor` | Subscribe to the shared cached live feed (instant first frame) |
| POST | `/api/nl43/{unit_id}/monitor/start` | Start the device's monitor feed |
| POST | `/api/nl43/{unit_id}/monitor/stop` | Stop the device's monitor feed |
| GET | `/api/nl43/_monitor/status` | Global monitor status across devices |
| POST | `/api/nl43/{unit_id}/disconnect` | Drop the device's pooled TCP connection |
| POST | `/api/nl43/{unit_id}/deactivate` | Quiesce polling/monitoring for one device |
| POST | `/api/nl43/_system/standby` | Global standby — quiesce all polling/monitoring |
### Alerts
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/nl43/{unit_id}/alerts/rules` | List alert rules for a device |
| POST | `/api/nl43/{unit_id}/alerts/rules` | Create an alert rule (metric, threshold, cooldown) |
| PUT | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Update a rule (resets its state, closes open events) |
| DELETE | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Delete a rule |
| GET | `/api/nl43/{unit_id}/alerts/events` | List alert events (onset/clear) |
| POST | `/api/nl43/{unit_id}/alerts/events/{event_id}/ack` | Acknowledge an event |
### Background Polling ### Background Polling
| Method | Endpoint | Description | | Method | Endpoint | Description |
@@ -273,11 +324,35 @@ Caches latest measurement snapshot:
- `sd_remaining_mb`: Free SD card space (MB) - `sd_remaining_mb`: Free SD card space (MB)
- `sd_free_ratio`: SD card free space ratio - `sd_free_ratio`: SD card free space ratio
- `raw_payload`: Raw device response data - `raw_payload`: Raw device response data
- `is_reachable`: Device reachability status (Boolean) ⭐ NEW - `is_reachable`: Device reachability status (Boolean)
- `consecutive_failures`: Count of consecutive poll failures ⭐ NEW - `consecutive_failures`: Count of consecutive poll failures
- `last_poll_attempt`: Last time background poller attempted to poll ⭐ NEW - `last_poll_attempt`: Last time background poller attempted to poll
- `last_success`: Last successful poll timestamp ⭐ NEW - `last_success`: Last successful poll timestamp
- `last_error`: Last error message (truncated to 500 chars) ⭐ NEW - `last_error`: Last error message (truncated to 500 chars)
- `ln1` / `ln2`: LN1/LN2 (L1/L10) percentile levels ⭐ v0.4.0
### NL43Readings Table ⭐ v0.4.0
Downsampled DOD trail backing the live-chart history endpoint (one row/minute,
pruned to a retention window — viewing only, not the report source):
- `id` (PK), `unit_id`, `timestamp`
- `lp` / `leq` / `lmax` / `ln1` / `ln2`: cached level samples
### AlertRule Table ⭐ v0.4.0
Per-device threshold alert rules:
- `id` (PK), `unit_id`, `name`, `enabled`
- `metric`, `comparison` (above/below), `threshold_db`, `clear_margin_db` (hysteresis)
- `duration_s` (sustained), `cooldown_s` (min seconds between onsets)
- `channels` / `recipients`, optional `schedule_start`/`schedule_end`/`schedule_days`
### AlertEvent Table ⭐ v0.4.0
Alert onset/clear events for history, inbox, and acknowledgement:
- `id` (PK), `unit_id`, `rule_id`, `rule_name`, `metric`, `threshold_db`
- `onset_at` / `onset_value`, `peak_value`, `clear_at`, `status` (active/cleared)
- `acknowledged_at` / `acknowledged_by`, `notes`
> New tables (`alert_rules`, `alert_events`, `nl43_readings`) auto-create on
> startup. Existing-table columns ship with migrations:
> `migrate_add_ln_percentiles.py`, `migrate_add_monitor_enabled.py`.
## Protocol Details ## Protocol Details
+322
View File
@@ -0,0 +1,322 @@
"""
Threshold alert engine.
Each unit can have any number of AlertRules. A rule is evaluated against the
unit's live monitor snapshots via a small per-(unit, rule) state machine:
IDLE --(metric exceeds threshold for duration_s)--> ACTIVE (fire ONSET)
ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR)
duration_s debounces both edges; clear_margin_db adds hysteresis so a level
hovering at the threshold doesn't flap. Onset and clear are distinct events.
The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no
real clock — so it can be unit-tested with a synthetic level series and a fake
clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence,
and dispatch. Dispatch is a server log for now (POC); the seam to POST events to
a Terra-View webhook (email/SMS) is _dispatch().
"""
import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Local timezone offset for schedule windows (same env var services.py uses).
_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5"))
# How long to cache a unit's rules before re-querying the DB (rules change rarely).
_RULE_CACHE_TTL_S = 15.0
@dataclass
class RuleState:
"""In-memory runtime state for one (unit, rule)."""
phase: str = "idle" # "idle" | "active"
edge_since: Optional[float] = None # when the current edge condition began (clock time)
peak: float = 0.0
event_id: Optional[int] = None # the open AlertEvent row (for the clear update)
last_onset: Optional[float] = None # time of the last onset (for cooldown)
def _exceeds(value: float, rule) -> bool:
if rule.comparison == "below":
return value < rule.threshold_db
return value > rule.threshold_db
def _recovered(value: float, rule) -> bool:
margin = rule.clear_margin_db or 0.0
if rule.comparison == "below":
return value > rule.threshold_db + margin
return value < rule.threshold_db - margin
def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]:
"""Advance the state machine by one reading.
Pure: mutates `state`, returns 'onset' | 'clear' | None. `now` is injected so
tests can drive a fake clock.
"""
duration = rule.duration_s or 0
if state.phase == "idle":
if _exceeds(value, rule):
if state.edge_since is None:
state.edge_since = now
if now - state.edge_since >= duration:
# Cooldown: suppress a new onset within cooldown_s of the last one
# (stops a repeatedly-breaching signal from flooding the history).
# Hold edge_since so it fires the moment cooldown lapses if still
# breaching — don't reset it here.
cooldown = getattr(rule, "cooldown_s", 0) or 0
if state.last_onset is not None and (now - state.last_onset) < cooldown:
return None
state.phase = "active"
state.edge_since = None
state.peak = value
state.last_onset = now
return "onset"
else:
state.edge_since = None
return None
# active
if rule.comparison == "below":
state.peak = min(state.peak, value)
else:
state.peak = max(state.peak, value)
if _recovered(value, rule):
if state.edge_since is None:
state.edge_since = now
if now - state.edge_since >= duration:
state.phase = "idle"
state.edge_since = None
return "clear"
else:
state.edge_since = None
return None
def _in_window(now_minutes: int, start: str, end: str) -> bool:
"""Is now_minutes (minutes since local midnight) within [start, end)?
Handles wraparound windows like 22:0007:00."""
def _m(s: str) -> int:
h, m = s.split(":")
return int(h) * 60 + int(m)
s, e = _m(start), _m(end)
if s == e:
return True
if s < e:
return s <= now_minutes < e
return now_minutes >= s or now_minutes < e # wraparound
class AlertEvaluator:
def __init__(self):
self._states: Dict[Tuple[str, int], RuleState] = {}
self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules)
self._offline_events: Dict[str, int] = {} # unit_id -> open connectivity AlertEvent id
logger.info("[ALERT] rule-based evaluator ready")
async def evaluate(self, unit_id: str, snap) -> None:
"""Evaluate every enabled rule for this unit against one snapshot."""
rules = self._get_rules(unit_id)
if not rules:
return
now = asyncio.get_running_loop().time()
for rule in rules:
if not self._in_schedule(rule):
continue
raw = getattr(snap, rule.metric, None)
try:
value = float(raw)
except (TypeError, ValueError):
continue # missing / non-numeric ("-.-")
state = self._states.setdefault((unit_id, rule.id), RuleState())
action = _evaluate_step(state, value, now, rule)
if action == "onset":
await self._on_onset(unit_id, rule, value, state)
elif action == "clear":
await self._on_clear(unit_id, rule, value, state)
# -- rule loading (cached) ----------------------------------------------
def _get_rules(self, unit_id: str) -> list:
loop_now = asyncio.get_running_loop().time()
cached = self._rule_cache.get(unit_id)
if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S:
return cached[1]
rules = self._load_rules(unit_id)
self._rule_cache[unit_id] = (loop_now, rules)
return rules
def _load_rules(self, unit_id: str) -> list:
from app.database import SessionLocal
from app.models import AlertRule
db = SessionLocal()
try:
return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all()
except Exception as e:
logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}")
return []
finally:
db.close()
def invalidate(self, unit_id: Optional[str] = None) -> None:
"""Drop cached rules so a change is picked up immediately."""
if unit_id is None:
self._rule_cache.clear()
else:
self._rule_cache.pop(unit_id, None)
def forget_rule(self, unit_id: str, rule_id: int) -> None:
"""Drop a rule's per-(unit, rule) state machine after the rule is edited or
deleted, so a stale 'active' phase / open event_id from the old config
doesn't bleed into the new one (mis-firing a clear or suppressing an onset)."""
self._states.pop((unit_id, rule_id), None)
# -- scheduling ----------------------------------------------------------
def _in_schedule(self, rule) -> bool:
if not rule.schedule_start or not rule.schedule_end:
day_ok = self._day_ok(rule)
return day_ok
local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
if not self._day_ok(rule, local):
return False
return _in_window(local.hour * 60 + local.minute, rule.schedule_start, rule.schedule_end)
@staticmethod
def _day_ok(rule, local: Optional[datetime] = None) -> bool:
if not rule.schedule_days:
return True
if local is None:
local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""}
return local.weekday() in allowed # Mon=0
# -- event persistence + dispatch ---------------------------------------
async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None:
from app.database import SessionLocal
from app.models import AlertEvent
db = SessionLocal()
try:
evt = AlertEvent(
rule_id=rule.id, unit_id=unit_id, rule_name=rule.name,
metric=rule.metric, threshold_db=rule.threshold_db,
onset_value=value, peak_value=value, status="active",
)
db.add(evt)
db.commit()
db.refresh(evt)
state.event_id = evt.id
except Exception as e:
logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}")
finally:
db.close()
await self._dispatch(
"ONSET", unit_id, rule,
f"{rule.metric.upper()}={value:.1f} dB "
f"{'<' if rule.comparison == 'below' else '>'} {rule.threshold_db:.1f} dB"
f"{f' for {rule.duration_s}s' if rule.duration_s else ''}",
)
async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None:
peak = state.peak
from app.database import SessionLocal
from app.models import AlertEvent
db = SessionLocal()
try:
if state.event_id is not None:
evt = db.query(AlertEvent).filter_by(id=state.event_id).first()
if evt:
evt.clear_at = datetime.utcnow()
evt.peak_value = peak
evt.status = "cleared"
db.commit()
except Exception as e:
logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}")
finally:
db.close()
state.event_id = None
await self._dispatch(
"CLEAR", unit_id, rule,
f"recovered to {value:.1f} dB (peak {peak:.1f} dB)",
)
# -- connectivity (device offline/online) -------------------------------
#
# Raised by the live monitor when it loses / regains contact with a device.
# Persisted as an AlertEvent (sentinel rule_id=0, metric="connectivity") so it
# lands in the same events/inbox/ack pipeline as threshold alerts. The in-memory
# map dedupes; the DB query also dedupes across a process restart.
async def device_offline(self, unit_id: str) -> None:
if unit_id in self._offline_events:
return # already flagged offline
from app.database import SessionLocal
from app.models import AlertEvent
db = SessionLocal()
try:
existing = db.query(AlertEvent).filter_by(
unit_id=unit_id, metric="connectivity", status="active").first()
if existing: # already open in the DB (e.g. carried across a restart)
self._offline_events[unit_id] = existing.id
return
evt = AlertEvent(
rule_id=0, unit_id=unit_id, rule_name="Device unreachable",
metric="connectivity", threshold_db=0.0, status="active",
)
db.add(evt)
db.commit()
db.refresh(evt)
self._offline_events[unit_id] = evt.id
except Exception as e:
logger.warning(f"[ALERT] failed to record offline for {unit_id}: {e}")
finally:
db.close()
await self._dispatch_raw("OFFLINE", unit_id, "Device unreachable",
"live monitor lost contact with the device")
async def device_online(self, unit_id: str) -> None:
self._offline_events.pop(unit_id, None)
from app.database import SessionLocal
from app.models import AlertEvent
db = SessionLocal()
cleared = 0
try:
opened = db.query(AlertEvent).filter_by(
unit_id=unit_id, metric="connectivity", status="active").all()
for evt in opened:
evt.clear_at = datetime.utcnow()
evt.status = "cleared"
cleared += 1
if cleared:
db.commit()
except Exception as e:
logger.warning(f"[ALERT] failed to record online for {unit_id}: {e}")
finally:
db.close()
if cleared: # only announce recovery if it was actually flagged offline
await self._dispatch_raw("ONLINE", unit_id, "Device recovered",
"live monitor regained contact with the device")
# -- event persistence + dispatch ---------------------------------------
async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None:
await self._dispatch_raw(kind, unit_id, rule.name, detail)
async def _dispatch_raw(self, kind: str, unit_id: str, name: str, detail: str) -> None:
"""POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here."""
logger.warning(f"[ALERT:{kind}] {unit_id} '{name}': {detail}")
# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot)
alert_evaluator = AlertEvaluator()
+53 -4
View File
@@ -8,6 +8,7 @@ for fast API access without querying devices on every request.
import asyncio import asyncio
import logging import logging
import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional from typing import Optional
@@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in
# standby (running but not polling and not holding device connections) — e.g. a
# dev box that must not latch onto a device that a prod instance owns.
POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true"
class BackgroundPoller: class BackgroundPoller:
""" """
@@ -39,6 +45,7 @@ class BackgroundPoller:
self._logger = logger self._logger = logger
self._last_cleanup = None # Track last log cleanup time self._last_cleanup = None # Track last log cleanup time
self._last_pool_log = None # Track last connection pool heartbeat log self._last_pool_log = None # Track last connection pool heartbeat log
self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle)
async def start(self): async def start(self):
"""Start the background polling task.""" """Start the background polling task."""
@@ -71,15 +78,48 @@ class BackgroundPoller:
self._logger.info("Background poller stopped") self._logger.info("Background poller stopped")
def is_active(self) -> bool:
"""Whether background polling is currently active (vs standby)."""
return self._active
async def set_active(self, active: bool):
"""Globally enable/disable polling at runtime.
When deactivated, the loop stays alive but polls nothing and releases all
device connections, so this SLMM instance stops occupying the devices'
single connection slots (e.g. so a prod instance can take over). Runtime
state only — on restart the instance returns to SLMM_POLLING_ENABLED.
"""
self._active = active
if active:
self._logger.info("[SYSTEM] Background polling ACTIVATED")
else:
self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections")
await self._release_all_connections()
async def _release_all_connections(self):
"""Gracefully close every pooled device connection (no-op if none)."""
from app.services import _connection_pool
for device_key in list(_connection_pool.get_stats().get("connections", {})):
await _connection_pool.discard(device_key)
async def _poll_loop(self): async def _poll_loop(self):
"""Main polling loop that runs continuously.""" """Main polling loop that runs continuously."""
self._logger.info("Background polling loop started") self._logger.info("Background polling loop started")
while self._running: while self._running:
try: if self._active:
await self._poll_all_devices() try:
except Exception as e: await self._poll_all_devices()
self._logger.error(f"Error in poll loop: {e}", exc_info=True) except Exception as e:
self._logger.error(f"Error in poll loop: {e}", exc_info=True)
else:
# Standby: poll nothing, and keep holding no device connection slots
# so another SLMM instance (e.g. prod) can talk to the devices.
try:
await self._release_all_connections()
except Exception as e:
self._logger.warning(f"Standby connection release failed: {e}")
# Run log cleanup once per hour # Run log cleanup once per hour
try: try:
@@ -138,10 +178,19 @@ class BackgroundPoller:
now = datetime.utcnow() now = datetime.utcnow()
polled_count = 0 polled_count = 0
from app.monitor import monitor_manager
for cfg in configs: for cfg in configs:
if not self._running: if not self._running:
break break
# Skip units with an active live monitor: it polls them at ~1Hz and
# keeps the status cache fresh, so a redundant background poll would just
# add load/lock-contention on the device's single connection.
if monitor_manager.is_active(cfg.unit_id):
self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active")
continue
# Get current status # Get current status
status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first() status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first()
+22 -3
View File
@@ -38,6 +38,25 @@ async def lifespan(app: FastAPI):
await poller.start() await poller.start()
logger.info("Background poller started") logger.info("Background poller started")
# Auto-start keepalive live monitors for units configured for 24/7 monitoring
# (monitor_enabled). This is what keeps alerting running unattended across
# restarts — without it a feed only runs while someone has the live view open.
try:
from app.monitor import monitor_manager
from app.database import SessionLocal
from app.models import NL43Config
db = SessionLocal()
try:
units = db.query(NL43Config).filter_by(monitor_enabled=True, tcp_enabled=True).all()
for cfg in units:
m = await monitor_manager.get(cfg.unit_id)
await m.set_keepalive(True)
logger.info(f"Auto-started keepalive monitor for {cfg.unit_id}")
finally:
db.close()
except Exception as e:
logger.error(f"Failed to auto-start monitors: {e}")
yield # Application runs yield # Application runs
# Shutdown # Shutdown
@@ -52,7 +71,7 @@ async def lifespan(app: FastAPI):
app = FastAPI( app = FastAPI(
title="SLMM NL43 Addon", title="SLMM NL43 Addon",
description="Standalone module for NL43 configuration and status APIs with background polling", description="Standalone module for NL43 configuration and status APIs with background polling",
version="0.3.0", version="0.4.0",
lifespan=lifespan, lifespan=lifespan,
) )
@@ -76,12 +95,12 @@ app.include_router(routers.router)
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
def index(request: Request): def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request}) return templates.TemplateResponse(request, "index.html")
@app.get("/roster", response_class=HTMLResponse) @app.get("/roster", response_class=HTMLResponse)
def roster(request: Request): def roster(request: Request):
return templates.TemplateResponse("roster.html", {"request": request}) return templates.TemplateResponse(request, "roster.html")
@app.get("/health") @app.get("/health")
+78 -1
View File
@@ -1,4 +1,4 @@
from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func
from app.database import Base from app.database import Base
@@ -23,6 +23,10 @@ class NL43Config(Base):
poll_interval_seconds = Column(Integer, nullable=True, default=60) # Polling interval (10-3600 seconds) poll_interval_seconds = Column(Integer, nullable=True, default=60) # Polling interval (10-3600 seconds)
poll_enabled = Column(Boolean, default=True) # Enable/disable background polling for this device poll_enabled = Column(Boolean, default=True) # Enable/disable background polling for this device
# Live monitor (fan-out DOD feed). Keepalive runs it 24/7 even with no viewer,
# which is what makes alerting continuous. On by default; toggleable from the UI.
monitor_enabled = Column(Boolean, default=True)
class NL43Status(Base): class NL43Status(Base):
""" """
@@ -41,6 +45,8 @@ class NL43Status(Base):
lmax = Column(String, nullable=True) # Maximum level lmax = Column(String, nullable=True) # Maximum level
lmin = Column(String, nullable=True) # Minimum level lmin = Column(String, nullable=True) # Minimum level
lpeak = Column(String, nullable=True) # Peak level lpeak = Column(String, nullable=True) # Peak level
ln1 = Column(String, nullable=True) # Percentile slot LN1 (configurable; device default L5, contract L1)
ln2 = Column(String, nullable=True) # Percentile slot LN2 (configurable; device default L10)
battery_level = Column(String, nullable=True) battery_level = Column(String, nullable=True)
power_source = Column(String, nullable=True) power_source = Column(String, nullable=True)
sd_remaining_mb = Column(String, nullable=True) sd_remaining_mb = Column(String, nullable=True)
@@ -72,3 +78,74 @@ class DeviceLog(Base):
level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR
category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC
message = Column(Text, nullable=False) message = Column(Text, nullable=False)
class AlertRule(Base):
"""A threshold-alert rule evaluated against a unit's live monitor feed.
Source-agnostic: today it runs over the DOD monitor; the same rule transfers
unchanged if a unit's feed is later sourced from FTP intervals.
"""
__tablename__ = "alert_rules"
id = Column(Integer, primary_key=True, autoincrement=True)
unit_id = Column(String, index=True, nullable=False)
name = Column(String, nullable=False, default="Alert")
metric = Column(String, nullable=False, default="lp") # lp/leq/lmax/lmin/lpeak/ln1/ln2
comparison = Column(String, nullable=False, default="above") # above | below
threshold_db = Column(Float, nullable=False)
duration_s = Column(Integer, nullable=False, default=0) # sustained seconds (0 = instant)
clear_margin_db = Column(Float, nullable=False, default=2.0) # hysteresis band
cooldown_s = Column(Integer, nullable=False, default=300) # min seconds between onsets
# Optional time-of-day scoping (local time). schedule_start/end as "HH:MM";
# null = always active. schedule_days = CSV of 0-6 (Mon=0); null = every day.
schedule_start = Column(String, nullable=True)
schedule_end = Column(String, nullable=True)
schedule_days = Column(String, nullable=True)
channels = Column(String, nullable=False, default="log") # CSV: log,email,sms
recipients = Column(Text, nullable=True) # CSV of emails/phones
enabled = Column(Boolean, default=True)
created_at = Column(DateTime, default=func.now())
class AlertEvent(Base):
"""A fired alert (onset → clear), for history / inbox / acknowledgement."""
__tablename__ = "alert_events"
id = Column(Integer, primary_key=True, autoincrement=True)
rule_id = Column(Integer, index=True, nullable=False)
unit_id = Column(String, index=True, nullable=False)
rule_name = Column(String, nullable=True)
metric = Column(String, nullable=False)
threshold_db = Column(Float, nullable=False)
onset_at = Column(DateTime, default=func.now(), index=True)
onset_value = Column(Float, nullable=True)
peak_value = Column(Float, nullable=True)
clear_at = Column(DateTime, nullable=True)
status = Column(String, default="active") # active | cleared
acknowledged_at = Column(DateTime, nullable=True)
acknowledged_by = Column(String, nullable=True)
notes = Column(Text, nullable=True)
class NL43Reading(Base):
"""Downsampled time-series of live-monitor readings, for the live-chart
backfill (so a viewer sees recent trend on open, not a blank chart).
Viewing only — NOT the report source. Reports use the device's authoritative
FTP .rnd intervals. This is a short, capped trail (one row/minute, pruned to
a retention window) fed by the monitor's keepalive poll loop.
"""
__tablename__ = "nl43_readings"
id = Column(Integer, primary_key=True, autoincrement=True)
unit_id = Column(String, index=True, nullable=False)
timestamp = Column(DateTime, default=func.now(), index=True)
lp = Column(String, nullable=True)
leq = Column(String, nullable=True)
lmax = Column(String, nullable=True)
ln1 = Column(String, nullable=True)
ln2 = Column(String, nullable=True)
+322
View File
@@ -0,0 +1,322 @@
"""
Per-device live monitor (fan-out hub).
ONE DOD poll loop per device, broadcast to many subscribers:
- browser WebSocket clients (live view) — they no longer each open their own
device stream, so the NL43's single-connection limit stops causing the
"second viewer sees nothing" contention.
- the alert evaluator (threshold alerts), which can keep a device's feed running
even with no browser attached.
- persistence (each snapshot is written to NL43Status, like the poller does).
The device's one TCP connection is respected: every poll goes through the same
per-device lock + connection pool in services.py, so the monitor, the background
poller, and on-demand commands all serialize safely.
"""
import asyncio
import logging
import os
from datetime import datetime
from typing import Dict, Optional, Set
from app.database import SessionLocal
from app.models import NL43Config, NL43Status
from app.services import NL43Client, persist_snapshot
from app.alerts import alert_evaluator
logger = logging.getLogger(__name__)
# Extra idle between DOD polls WHEN A BROWSER IS WATCHING. The 1s device rate-limit
# already paces consecutive DOD? commands, so this just needs to be small — the
# rate-limit is the real floor (~1.25s/poll effective).
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25"))
# Idle cadence when NO browser is subscribed and the feed is only kept alive for
# alerting. Same data, ~8x fewer polls -> ~8x less cellular traffic on a metered
# SIM (~1 GB/device/month at full rate -> ~125 MB). NOTE: this also sets the alert
# sampling resolution when nobody is watching, so keep it <= the smallest alert
# duration_s you rely on (default 10s comfortably catches a "sustained 30/60s" rule).
MONITOR_IDLE_POLL_INTERVAL = float(os.getenv("MONITOR_IDLE_POLL_INTERVAL", "10"))
# Exponential backoff once the device is unreachable, so a powered-off / asleep /
# out-of-signal device stops churning reconnects every cycle (log spam + a trickle
# of wasted cellular data on failed SYNs). delay = min(BASE * 2**(fails-1), MAX),
# reset to full-rate on the first good poll. While a browser is actively watching we
# cap the backoff lower (WATCHED_MAX) so a recovery surfaces quickly for the viewer.
MONITOR_BACKOFF_BASE_S = float(os.getenv("MONITOR_BACKOFF_BASE_S", "1"))
MONITOR_BACKOFF_MAX_S = float(os.getenv("MONITOR_BACKOFF_MAX_S", "60"))
MONITOR_BACKOFF_WATCHED_MAX_S = float(os.getenv("MONITOR_BACKOFF_WATCHED_MAX_S", "5"))
# How often to refresh the run state (Measure?). It changes rarely, so we cache it
# and skip that second rate-limited command on most polls — roughly halving the
# per-update latency (~2.5s -> ~1.3s).
MONITOR_STATE_REFRESH_S = float(os.getenv("MONITOR_STATE_REFRESH_S", "30"))
# Downsampled trail for the live-chart backfill: store one reading per
# TRAIL_SAMPLE_S and keep TRAIL_RETENTION_HOURS of it (pruned). Viewing only —
# reports use the device's FTP .rnd data, not this.
TRAIL_SAMPLE_S = float(os.getenv("MONITOR_TRAIL_SAMPLE_S", "60"))
TRAIL_RETENTION_HOURS = float(os.getenv("MONITOR_TRAIL_RETENTION_HOURS", "24"))
# If nothing has been broadcast in this many seconds (e.g. device offline and
# silent), send a keepalive frame so reverse proxies don't drop the idle WS.
MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25"))
def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict:
"""Build the broadcast payload — same shape as the DRD stream, but DOD-sourced
so it carries ln1/ln2 (which DRD cannot)."""
return {
"unit_id": unit_id,
"timestamp": datetime.utcnow().isoformat(),
"measurement_state": snap.measurement_state,
"measurement_start_time": measurement_start_time,
"counter": snap.counter,
"lp": snap.lp,
"leq": snap.leq,
"lmax": snap.lmax,
"lmin": snap.lmin,
"lpeak": snap.lpeak,
"ln1": snap.ln1,
"ln2": snap.ln2,
"raw_payload": snap.raw_payload,
}
class DeviceMonitor:
"""Owns a single DOD poll loop for one device and fans each snapshot out to
all subscribers. Runs while it has at least one browser subscriber OR the
server-side keep-alive (alerting) flag is set."""
def __init__(self, unit_id: str):
self.unit_id = unit_id
self._subscribers: Set[asyncio.Queue] = set()
self._keepalive = False
self._task: Optional[asyncio.Task] = None
self._lock = asyncio.Lock()
self._last_payload: Optional[dict] = None # replayed to new subscribers
self._consec_fail = 0
self._reachable = True # last broadcast reachability (for transition frames)
self._cached_state: Optional[str] = None # run state, refreshed periodically
self._last_state_refresh = 0.0
self._last_trail_store = 0.0 # downsample throttle for the backfill trail
@property
def running(self) -> bool:
return self._task is not None and not self._task.done()
def subscriber_count(self) -> int:
return len(self._subscribers)
def _has_demand(self) -> bool:
return bool(self._subscribers) or self._keepalive
def _ensure_task(self) -> None:
if self._task is None or self._task.done():
self._task = asyncio.create_task(self._run())
async def subscribe(self) -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=5)
async with self._lock:
self._subscribers.add(q)
# Replay the last frame so a client connecting mid-stream sees data
# (or the current 'unreachable' state) immediately, not after a poll.
if self._last_payload is not None:
try:
q.put_nowait(self._last_payload)
except asyncio.QueueFull:
pass
self._ensure_task()
return q
async def unsubscribe(self, q: asyncio.Queue) -> None:
async with self._lock:
self._subscribers.discard(q)
async def set_keepalive(self, on: bool) -> None:
async with self._lock:
self._keepalive = on
if on:
self._ensure_task()
async def _run(self) -> None:
logger.info(f"[MONITOR] {self.unit_id}: feed started")
loop = asyncio.get_running_loop()
last_send = loop.time()
try:
while self._has_demand():
snap, mst = await self._poll_once()
if snap is not None:
if not self._reachable:
# Recovered from an outage — clear the connectivity alert.
try:
await alert_evaluator.device_online(self.unit_id)
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: online alert failed: {e}")
self._consec_fail = 0
self._reachable = True
payload = _snapshot_payload(snap, self.unit_id, mst)
payload["feed_status"] = "ok"
self._broadcast(payload)
last_send = loop.time()
try:
await alert_evaluator.evaluate(self.unit_id, snap)
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")
else:
# Tell clients the device went offline — once, on transition, after a
# few failures so a momentary blip doesn't flap the UI. Same edge
# raises the device-offline alert.
self._consec_fail += 1
if self._reachable and self._consec_fail >= 3:
self._reachable = False
self._broadcast({
"unit_id": self.unit_id,
"timestamp": datetime.utcnow().isoformat(),
"feed_status": "unreachable",
})
last_send = loop.time()
try:
await alert_evaluator.device_offline(self.unit_id)
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: offline alert failed: {e}")
# Heartbeat: during quiet/offline stretches, send a keepalive so an
# idle WS isn't dropped by a reverse proxy. Not cached (new subscribers
# should still get the last real frame, not a heartbeat).
if loop.time() - last_send >= MONITOR_HEARTBEAT_S:
self._broadcast({
"unit_id": self.unit_id,
"timestamp": datetime.utcnow().isoformat(),
"feed_status": "ok" if self._reachable else "unreachable",
"heartbeat": True,
}, cache=False)
last_send = loop.time()
await asyncio.sleep(self._next_delay())
finally:
logger.info(f"[MONITOR] {self.unit_id}: feed stopped")
def _next_delay(self) -> float:
"""Inter-poll delay: exponential backoff while unreachable, full-rate while a
browser is watching, relaxed cadence when the feed is keepalive-only."""
if self._consec_fail > 0:
shift = min(self._consec_fail - 1, 6) # cap growth at 2**6 = 64x base
delay = min(MONITOR_BACKOFF_BASE_S * (2 ** shift), MONITOR_BACKOFF_MAX_S)
if self._subscribers:
delay = min(delay, MONITOR_BACKOFF_WATCHED_MAX_S)
return delay
if self._subscribers:
return MONITOR_POLL_INTERVAL # a browser is watching — smooth chart
return MONITOR_IDLE_POLL_INTERVAL # keepalive-only (alerting) — save data
async def _poll_once(self):
"""One DOD poll: read, persist, return (snapshot, measurement_start_iso)."""
db = SessionLocal()
try:
cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first()
if not cfg or not cfg.tcp_enabled:
return None, None
client = NL43Client(
cfg.host, cfg.tcp_port,
ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password,
ftp_port=cfg.ftp_port or 21,
)
# Refresh the run state only every MONITOR_STATE_REFRESH_S; reuse the
# cached state otherwise so most polls send just DOD? (one rate-limited
# command) instead of DOD? + Measure?.
now = asyncio.get_running_loop().time()
refresh_state = (self._cached_state is None
or now - self._last_state_refresh >= MONITOR_STATE_REFRESH_S)
snap = await client.request_dod(
measurement_state=None if refresh_state else self._cached_state
)
if refresh_state:
self._cached_state = snap.measurement_state
self._last_state_refresh = now
snap.unit_id = self.unit_id
persist_snapshot(snap, db)
db.commit()
# Append to the downsampled backfill trail (~one row per TRAIL_SAMPLE_S).
if now - self._last_trail_store >= TRAIL_SAMPLE_S:
self._last_trail_store = now
self._store_trail(snap, db)
status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first()
mst = (status.measurement_start_time.isoformat()
if status and status.measurement_start_time else None)
return snap, mst
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}")
return None, None
finally:
db.close()
def _store_trail(self, snap, db) -> None:
"""Append one downsampled reading to the backfill trail and prune old rows."""
from datetime import datetime, timedelta
from app.models import NL43Reading
try:
db.add(NL43Reading(
unit_id=self.unit_id, timestamp=datetime.utcnow(),
lp=snap.lp, leq=snap.leq, lmax=snap.lmax, ln1=snap.ln1, ln2=snap.ln2,
))
cutoff = datetime.utcnow() - timedelta(hours=TRAIL_RETENTION_HOURS)
db.query(NL43Reading).filter(
NL43Reading.unit_id == self.unit_id,
NL43Reading.timestamp < cutoff,
).delete()
db.commit()
except Exception as e:
logger.warning(f"[MONITOR] {self.unit_id}: trail store failed: {e}")
def _broadcast(self, payload: dict, cache: bool = True) -> None:
if cache:
self._last_payload = payload # replayed to new subscribers
for q in list(self._subscribers):
try:
q.put_nowait(payload)
except asyncio.QueueFull:
# Slow consumer — drop this frame rather than stall the whole feed.
pass
class MonitorManager:
"""Registry of per-device monitors (one per unit_id)."""
def __init__(self):
self._monitors: Dict[str, DeviceMonitor] = {}
self._lock = asyncio.Lock()
async def get(self, unit_id: str) -> DeviceMonitor:
async with self._lock:
m = self._monitors.get(unit_id)
if m is None:
m = DeviceMonitor(unit_id)
self._monitors[unit_id] = m
return m
def is_active(self, unit_id: str) -> bool:
"""True if this unit has a running monitor feed (so the background poller
can skip it — the monitor already polls it more often)."""
m = self._monitors.get(unit_id)
return m is not None and m.running
def status(self) -> dict:
return {
uid: {
"running": m.running,
"subscribers": m.subscriber_count(),
"keepalive": m._keepalive,
"reachable": m._reachable,
# what cadence the loop is currently using, for observability
"mode": ("backoff" if m._consec_fail > 0
else "watched" if m._subscribers
else "idle"),
}
for uid, m in self._monitors.items()
}
# Module-level singleton
monitor_manager = MonitorManager()
+401 -2
View File
@@ -11,7 +11,7 @@ import os
import asyncio import asyncio
from app.database import get_db from app.database import get_db
from app.models import NL43Config, NL43Status from app.models import NL43Config, NL43Status, AlertRule, AlertEvent, NL43Reading
from app.services import NL43Client, persist_snapshot from app.services import NL43Client, persist_snapshot
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -121,6 +121,392 @@ async def flush_connection_pool():
return {"status": "ok", "message": "All cached connections closed"} return {"status": "ok", "message": "All cached connections closed"}
@router.post("/{unit_id}/disconnect")
async def disconnect_device(unit_id: str, db: Session = Depends(get_db)):
"""Cleanly close SLMM's persistent TCP connection to a single device.
Gracefully closes (TCP FIN + wait_closed) the pooled connection for this
device and removes it from the pool, freeing the NL43's single connection
slot. Idempotent — a no-op if no connection is currently cached.
Note: this releases the *idle* pooled connection. It does not interrupt an
in-progress DRD stream or an in-flight command (those have the socket
checked out of the pool) — close the stream WebSocket to end a live stream.
"""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
from app.services import _connection_pool
device_key = f"{cfg.host}:{cfg.tcp_port}"
had_conn = device_key in _connection_pool.get_stats().get("connections", {})
await _connection_pool.discard(device_key)
return {
"status": "ok",
"unit_id": unit_id,
"device_key": device_key,
"disconnected": had_conn,
"message": "Connection closed" if had_conn else "No cached connection to close",
}
@router.post("/{unit_id}/deactivate")
async def deactivate_device(unit_id: str, db: Session = Depends(get_db)):
"""Make a single unit dormant: stop background polling for it AND drop its
connection, freeing the device's connection slot. poll_enabled=False is
persisted, so the unit stays dormant across restarts until /activate.
"""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
cfg.poll_enabled = False
db.commit()
from app.services import _connection_pool, _get_device_lock
device_key = f"{cfg.host}:{cfg.tcp_port}"
# Wait briefly for any in-flight poll/command to finish (so its connection is
# back in the pool), then drop it. If a long-lived stream holds the lock we
# don't block forever — discard the pooled connection regardless.
lock = await _get_device_lock(device_key)
acquired = False
try:
await asyncio.wait_for(lock.acquire(), timeout=10.0)
acquired = True
except asyncio.TimeoutError:
acquired = False
try:
await _connection_pool.discard(device_key)
finally:
if acquired:
lock.release()
return {
"status": "ok",
"unit_id": unit_id,
"poll_enabled": False,
"message": "Polling disabled and connection closed for this unit",
}
@router.post("/{unit_id}/activate")
async def activate_device(unit_id: str, db: Session = Depends(get_db)):
"""Resume background polling for a unit previously deactivated."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
cfg.poll_enabled = True
db.commit()
return {
"status": "ok",
"unit_id": unit_id,
"poll_enabled": True,
"message": "Polling enabled for this unit",
}
@router.get("/_system/status")
async def system_status():
"""Report whether this SLMM instance is actively polling or in standby."""
from app.background_poller import poller
from app.services import _connection_pool
return {
"status": "ok",
"mode": "active" if poller.is_active() else "standby",
"polling_active": poller.is_active(),
"active_connections": _connection_pool.get_stats().get("active_connections", 0),
}
@router.post("/_system/standby")
async def system_standby():
"""Put this SLMM instance into standby: stop polling ALL devices and release
every connection, so it stops occupying device slots (e.g. so a prod instance
can take over). Runtime-only — on restart the instance returns to its
SLMM_POLLING_ENABLED default.
"""
from app.background_poller import poller
await poller.set_active(False)
return {"status": "ok", "mode": "standby",
"message": "Polling stopped and all device connections released"}
@router.post("/_system/resume")
async def system_resume():
"""Resume polling after standby (global)."""
from app.background_poller import poller
await poller.set_active(True)
return {"status": "ok", "mode": "active", "message": "Polling resumed"}
# ============================================================================
# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients
# ============================================================================
@router.websocket("/{unit_id}/monitor")
async def monitor_stream(websocket: WebSocket, unit_id: str):
"""Subscribe a browser to the device's shared 1 Hz DOD feed.
Any number of clients can attach without each opening its own device
connection (one poll loop per device, fanned out). Same JSON shape as the
DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10).
"""
await websocket.accept()
from app.monitor import monitor_manager
monitor = await monitor_manager.get(unit_id)
queue = await monitor.subscribe()
logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)")
async def _watch_disconnect():
# Completes when the client disconnects, so an idle feed (no data) still
# detects the drop and we don't leak a subscription that keeps the device
# feed (and its connection) alive.
try:
while True:
msg = await websocket.receive()
if msg.get("type") == "websocket.disconnect":
return
except Exception:
return
gone = asyncio.ensure_future(_watch_disconnect())
try:
while not gone.done():
try:
payload = await asyncio.wait_for(queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue # re-check gone.done()
if gone.done():
break # client disconnected while we waited — don't send into a closing socket
await websocket.send_json(payload)
except WebSocketDisconnect:
logger.info(f"Monitor subscriber disconnected for {unit_id}")
except Exception as e:
# A frame that races the close (client vanished mid-send) surfaces as
# "Unexpected ASGI message 'websocket.send' after ... websocket.close".
# That's expected on disconnect (the portal closes the socket on every tab
# switch), not an error — log it quietly.
msg = str(e)
if "after sending" in msg or "websocket.close" in msg or "response already completed" in msg:
logger.debug(f"Monitor stream for {unit_id} closed mid-send (client gone)")
else:
logger.warning(f"Monitor stream error for {unit_id}: {e}")
finally:
gone.cancel()
await monitor.unsubscribe(queue)
@router.post("/{unit_id}/monitor/start")
async def monitor_start(unit_id: str, db: Session = Depends(get_db)):
"""Enable 24/7 keepalive monitoring: persist monitor_enabled and start the feed
now, so alerting evaluates continuously even with no viewer. Survives restarts
(auto-started on boot from the persisted flag)."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if cfg:
cfg.monitor_enabled = True
db.commit()
from app.monitor import monitor_manager
monitor = await monitor_manager.get(unit_id)
await monitor.set_keepalive(True)
return {"status": "ok", "unit_id": unit_id, "monitor_enabled": True, "running": monitor.running}
@router.post("/{unit_id}/monitor/stop")
async def monitor_stop(unit_id: str, db: Session = Depends(get_db)):
"""Disable keepalive monitoring: persist monitor_enabled=False and drop the
keepalive (the feed stops once no browser subscribers remain)."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if cfg:
cfg.monitor_enabled = False
db.commit()
from app.monitor import monitor_manager
monitor = await monitor_manager.get(unit_id)
await monitor.set_keepalive(False)
return {"status": "ok", "unit_id": unit_id, "monitor_enabled": False}
@router.get("/_monitor/status")
async def monitor_status():
"""Status of every device monitor (running, subscriber count, keep-alive)."""
from app.monitor import monitor_manager
return {"status": "ok", "monitors": monitor_manager.status()}
@router.get("/{unit_id}/history")
def get_monitor_history(unit_id: str, hours: float = 2.0, db: Session = Depends(get_db)):
"""Recent downsampled monitor readings (the DOD trail) for the live-chart
backfill. Viewing only — NOT the FTP report data."""
from datetime import timedelta
hours = max(0.1, min(hours, 48.0))
cutoff = datetime.utcnow() - timedelta(hours=hours)
rows = (db.query(NL43Reading)
.filter(NL43Reading.unit_id == unit_id, NL43Reading.timestamp >= cutoff)
.order_by(NL43Reading.timestamp.asc()).all())
return {
"status": "ok",
"unit_id": unit_id,
"hours": hours,
"count": len(rows),
"readings": [
{
"timestamp": r.timestamp.isoformat() if r.timestamp else None,
"lp": r.lp, "leq": r.leq, "lmax": r.lmax, "ln1": r.ln1, "ln2": r.ln2,
}
for r in rows
],
}
# ============================================================================
# ALERTS — threshold rules + fired events
# ============================================================================
class AlertRulePayload(BaseModel):
name: str = "Alert"
metric: str = "lp" # lp/leq/lmax/lmin/lpeak/ln1/ln2
comparison: str = "above" # above | below
threshold_db: float
duration_s: int = 0 # sustained seconds before firing (0 = instant)
clear_margin_db: float = 2.0 # hysteresis band
cooldown_s: int = 300
schedule_start: str | None = None # "HH:MM" local; null = always
schedule_end: str | None = None
schedule_days: str | None = None # CSV of 0-6 (Mon=0); null = every day
channels: str = "log"
recipients: str | None = None
enabled: bool = True
def _rule_dict(r: AlertRule) -> dict:
return {
"id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric,
"comparison": r.comparison, "threshold_db": r.threshold_db,
"duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db,
"cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start,
"schedule_end": r.schedule_end, "schedule_days": r.schedule_days,
"channels": r.channels, "recipients": r.recipients, "enabled": r.enabled,
}
def _event_dict(e: AlertEvent) -> dict:
return {
"id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id,
"rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db,
"onset_at": e.onset_at.isoformat() if e.onset_at else None,
"onset_value": e.onset_value, "peak_value": e.peak_value,
"clear_at": e.clear_at.isoformat() if e.clear_at else None,
"status": e.status,
"acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None,
"acknowledged_by": e.acknowledged_by,
}
async def _sync_keepalive_to_rules(unit_id: str, db: Session):
"""Keep a unit's monitor running while it has enabled alert rules, so the
evaluator runs 24/7 even with no browser watching. Turns keepalive ON (and
persists monitor_enabled so it survives a restart via the boot auto-start)
when enabled rules exist; never turns it OFF — a device may be kept alive for
other reasons, so operators control that on /admin/slmm."""
has_enabled = (db.query(AlertRule)
.filter_by(unit_id=unit_id, enabled=True).first() is not None)
if not has_enabled:
return
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if cfg and not cfg.monitor_enabled:
cfg.monitor_enabled = True
db.commit()
from app.monitor import monitor_manager
m = await monitor_manager.get(unit_id)
await m.set_keepalive(True)
def _reset_rule_runtime(unit_id: str, rule_id: int, db: Session):
"""After a rule edit/delete: drop its evaluator state machine and close any open
event, so a stale 'active' phase doesn't mis-evaluate against the new config and
the client portal doesn't stay 'in alarm' on a rule that changed or is gone."""
from app.alerts import alert_evaluator
alert_evaluator.forget_rule(unit_id, rule_id)
now = datetime.utcnow()
for evt in db.query(AlertEvent).filter_by(unit_id=unit_id, rule_id=rule_id, status="active").all():
evt.clear_at = now
evt.status = "cleared"
db.commit()
@router.post("/{unit_id}/alerts/rules")
async def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)):
rule = AlertRule(unit_id=unit_id, **payload.model_dump())
db.add(rule)
db.commit()
db.refresh(rule)
from app.alerts import alert_evaluator
alert_evaluator.invalidate(unit_id)
await _sync_keepalive_to_rules(unit_id, db)
return {"status": "ok", "rule": _rule_dict(rule)}
@router.get("/{unit_id}/alerts/rules")
def list_alert_rules(unit_id: str, db: Session = Depends(get_db)):
rules = db.query(AlertRule).filter_by(unit_id=unit_id).all()
return {"status": "ok", "rules": [_rule_dict(r) for r in rules]}
@router.put("/{unit_id}/alerts/rules/{rule_id}")
async def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)):
rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
if not rule:
raise HTTPException(status_code=404, detail="Alert rule not found")
for field, value in payload.model_dump().items():
setattr(rule, field, value)
db.commit()
db.refresh(rule)
from app.alerts import alert_evaluator
alert_evaluator.invalidate(unit_id)
_reset_rule_runtime(unit_id, rule_id, db)
await _sync_keepalive_to_rules(unit_id, db)
return {"status": "ok", "rule": _rule_dict(rule)}
@router.delete("/{unit_id}/alerts/rules/{rule_id}")
async def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)):
rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
if not rule:
raise HTTPException(status_code=404, detail="Alert rule not found")
db.delete(rule)
db.commit()
from app.alerts import alert_evaluator
alert_evaluator.invalidate(unit_id)
_reset_rule_runtime(unit_id, rule_id, db) # close its open event so the portal doesn't stay red
await _sync_keepalive_to_rules(unit_id, db) # no-op if no enabled rules remain
return {"status": "ok", "deleted": rule_id}
@router.get("/{unit_id}/alerts/events")
def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)):
events = (db.query(AlertEvent).filter_by(unit_id=unit_id)
.order_by(AlertEvent.onset_at.desc()).limit(limit).all())
return {"status": "ok", "events": [_event_dict(e) for e in events]}
@router.post("/{unit_id}/alerts/events/{event_id}/ack")
def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)):
evt = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first()
if not evt:
raise HTTPException(status_code=404, detail="Alert event not found")
evt.acknowledged_at = datetime.utcnow()
evt.acknowledged_by = by
db.commit()
return {"status": "ok", "acknowledged": event_id}
# ============================================================================ # ============================================================================
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
# ============================================================================ # ============================================================================
@@ -197,6 +583,7 @@ def get_roster(db: Session = Depends(get_db)):
"web_enabled": cfg.web_enabled, "web_enabled": cfg.web_enabled,
"poll_enabled": cfg.poll_enabled, "poll_enabled": cfg.poll_enabled,
"poll_interval_seconds": cfg.poll_interval_seconds, "poll_interval_seconds": cfg.poll_interval_seconds,
"monitor_enabled": cfg.monitor_enabled,
"status": None "status": None
} }
@@ -445,11 +832,14 @@ def get_status(unit_id: str, db: Session = Depends(get_db)):
"unit_id": unit_id, "unit_id": unit_id,
"last_seen": status.last_seen.isoformat() if status.last_seen else None, "last_seen": status.last_seen.isoformat() if status.last_seen else None,
"measurement_state": status.measurement_state, "measurement_state": status.measurement_state,
"measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None,
"lp": status.lp, "lp": status.lp,
"leq": status.leq, "leq": status.leq,
"lmax": status.lmax, "lmax": status.lmax,
"lmin": status.lmin, "lmin": status.lmin,
"lpeak": status.lpeak, "lpeak": status.lpeak,
"ln1": status.ln1,
"ln2": status.ln2,
"battery_level": status.battery_level, "battery_level": status.battery_level,
"power_source": status.power_source, "power_source": status.power_source,
"sd_remaining_mb": status.sd_remaining_mb, "sd_remaining_mb": status.sd_remaining_mb,
@@ -472,6 +862,8 @@ class StatusPayload(BaseModel):
lmax: str | None = None lmax: str | None = None
lmin: str | None = None lmin: str | None = None
lpeak: str | None = None lpeak: str | None = None
ln1: str | None = None
ln2: str | None = None
battery_level: str | None = None battery_level: str | None = None
power_source: str | None = None power_source: str | None = None
sd_remaining_mb: str | None = None sd_remaining_mb: str | None = None
@@ -499,11 +891,14 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge
"unit_id": unit_id, "unit_id": unit_id,
"last_seen": status.last_seen.isoformat(), "last_seen": status.last_seen.isoformat(),
"measurement_state": status.measurement_state, "measurement_state": status.measurement_state,
"measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None,
"lp": status.lp, "lp": status.lp,
"leq": status.leq, "leq": status.leq,
"lmax": status.lmax, "lmax": status.lmax,
"lmin": status.lmin, "lmin": status.lmin,
"lpeak": status.lpeak, "lpeak": status.lpeak,
"ln1": status.ln1,
"ln2": status.ln2,
"battery_level": status.battery_level, "battery_level": status.battery_level,
"power_source": status.power_source, "power_source": status.power_source,
"sd_remaining_mb": status.sd_remaining_mb, "sd_remaining_mb": status.sd_remaining_mb,
@@ -544,7 +939,7 @@ async def start_measurement(unit_id: str, db: Session = Depends(get_db)):
db.expire_all() db.expire_all()
status = db.query(NL43Status).filter_by(unit_id=unit_id).first() status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
logger.info(f"State check: measurement_state={status.measurement_state if status else 'None'}, start_time={status.measurement_start_time if status else 'None'}") logger.info(f"State check: measurement_state={status.measurement_state if status else 'None'}, start_time={status.measurement_start_time if status else 'None'}")
if status and status.measurement_state == "Measure" and status.measurement_start_time: if status and status.measurement_state in ("Start", "Measure") and status.measurement_start_time:
logger.info(f"✓ Measurement state confirmed for {unit_id} with start time {status.measurement_start_time}") logger.info(f"✓ Measurement state confirmed for {unit_id} with start time {status.measurement_start_time}")
break break
@@ -1205,6 +1600,8 @@ async def stream_live(websocket: WebSocket, unit_id: str):
"lmax": snap.lmax, # Maximum level "lmax": snap.lmax, # Maximum level
"lmin": snap.lmin, # Minimum level "lmin": snap.lmin, # Minimum level
"lpeak": snap.lpeak, # Peak level "lpeak": snap.lpeak, # Peak level
"ln1": snap.ln1, # LN1 percentile (L1/L10 contract); null on DRD stream
"ln2": snap.ln2, # LN2 percentile; null on DRD stream
"raw_payload": snap.raw_payload, "raw_payload": snap.raw_payload,
}) })
except Exception as e: except Exception as e:
@@ -1876,6 +2273,8 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
"lmax": status.lmax, "lmax": status.lmax,
"lmin": status.lmin, "lmin": status.lmin,
"lpeak": status.lpeak, "lpeak": status.lpeak,
"ln1": status.ln1,
"ln2": status.ln2,
"battery_level": status.battery_level, "battery_level": status.battery_level,
"power_source": status.power_source, "power_source": status.power_source,
"sd_remaining_mb": status.sd_remaining_mb, "sd_remaining_mb": status.sd_remaining_mb,
+76 -31
View File
@@ -46,6 +46,8 @@ class NL43Snapshot:
lmax: Optional[str] = None # Maximum level lmax: Optional[str] = None # Maximum level
lmin: Optional[str] = None # Minimum level lmin: Optional[str] = None # Minimum level
lpeak: Optional[str] = None # Peak level lpeak: Optional[str] = None # Peak level
ln1: Optional[str] = None # Percentile slot LN1 (configurable; device default L5, contract L1)
ln2: Optional[str] = None # Percentile slot LN2 (configurable; device default L10)
battery_level: Optional[str] = None battery_level: Optional[str] = None
power_source: Optional[str] = None power_source: Optional[str] = None
sd_remaining_mb: Optional[str] = None sd_remaining_mb: Optional[str] = None
@@ -69,10 +71,27 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'") logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'")
# Device returns "Start" when measuring, "Stop" when stopped # The device reports "Start" while measuring; the DOD path uses that string,
# Normalize to previous behavior for backward compatibility # but the DRD stream path tags snapshots "Measure" (and the DOD fallback also
is_measuring = new_state == "Start" # uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and
was_measuring = previous_state == "Start" # closing the live stream flips state "Start"->"Measure"->"Start", which the
# old equality check misread as stop-then-start and reset measurement_start_time.
#
# Also: only act on RECOGNIZED states. A buffer desync on the shared connection
# (e.g. right after a DRD/DOD test) can make a Measure? read return a stray,
# garbled value; treating that as "not measuring" produced constant phantom
# "STOPPED -> STARTED" log pairs and reset the timer. Ignore unknown reads.
MEASURING_STATES = {"Start", "Measure"}
STOPPED_STATES = {"Stop"}
was_measuring = previous_state in MEASURING_STATES
if new_state in MEASURING_STATES:
is_measuring = True
elif new_state in STOPPED_STATES:
is_measuring = False
else:
logger.warning(f"Ignoring unrecognized measurement state for {s.unit_id}: {new_state!r}")
is_measuring = was_measuring # garbled/unknown read — no transition
if not was_measuring and is_measuring: if not was_measuring and is_measuring:
# Measurement just started - record the start time # Measurement just started - record the start time
@@ -95,13 +114,18 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
except Exception: except Exception:
pass pass
row.measurement_state = new_state # Only persist a recognized state so one garbled read can't poison the next
# transition check (which would manufacture the phantom STOPPED/STARTED pair).
if new_state in MEASURING_STATES or new_state in STOPPED_STATES:
row.measurement_state = new_state
row.counter = s.counter row.counter = s.counter
row.lp = s.lp row.lp = s.lp
row.leq = s.leq row.leq = s.leq
row.lmax = s.lmax row.lmax = s.lmax
row.lmin = s.lmin row.lmin = s.lmin
row.lpeak = s.lpeak row.lpeak = s.lpeak
row.ln1 = s.ln1
row.ln2 = s.ln2
row.battery_level = s.battery_level row.battery_level = s.battery_level
row.power_source = s.power_source row.power_source = s.power_source
row.sd_remaining_mb = s.sd_remaining_mb row.sd_remaining_mb = s.sd_remaining_mb
@@ -656,10 +680,12 @@ class NL43Client:
else: else:
raise ValueError(f"Unknown result code: {result_code}") raise ValueError(f"Unknown result code: {result_code}")
async def request_dod(self) -> NL43Snapshot: async def request_dod(self, measurement_state: Optional[str] = None) -> NL43Snapshot:
"""Request DOD (Data Output Display) snapshot from device. """Request DOD (Data Output Display) snapshot from device.
Returns parsed measurement data from the device display. Returns parsed measurement data from the device display. Pass
measurement_state to reuse a cached run state and skip the extra Measure?
round-trip (the state changes rarely); leave it None to query it.
""" """
# _send_command now handles result code validation and returns the data line # _send_command now handles result code validation and returns the data line
resp = await self._send_command("DOD?\r\n") resp = await self._send_command("DOD?\r\n")
@@ -682,31 +708,41 @@ class NL43Client:
logger.info(f"Parsed {len(parts)} data points from DOD response") logger.info(f"Parsed {len(parts)} data points from DOD response")
# Query actual measurement state (DOD doesn't include this information) # DOD doesn't include the run state. Query it only when not supplied by the
try: # caller — the monitor passes a cached state most cycles and refreshes it
measurement_state = await self.get_measurement_state() # occasionally, avoiding a second rate-limited command on every poll.
except Exception as e: if measurement_state is None:
logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}") try:
measurement_state = "Measure" measurement_state = await self.get_measurement_state()
except Exception as e:
logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}")
measurement_state = "Measure"
snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state) snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state)
# Parse known positions (based on NL43 communication guide - DRD format) # Parse DOD positional fields. DOD's layout is DIFFERENT from DRD: it has NO
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ... # leading counter and it includes LE plus LN1LN5. The device returns 4 channels
# of 16 fields each — [Lp, Leq, LE, Lmax, Lmin, LN1, LN2, LN3, LN4, LN5, Lpeak,
# LIeq, Leq_mov, Ltm5, over, under] — and channel 1 (parts[0:16]) is the main
# display. The previous code reused the DRD map (treating parts[0] as a counter),
# which shifted everything: Lp was reported as the counter, Leq as Lp, LE as Leq,
# and LN1 as Lpeak (you could spot it because "Lpeak" came out < Lmax).
try: try:
# Capture d0 (counter) for timer synchronization
if len(parts) >= 1: if len(parts) >= 1:
snap.counter = parts[0] # d0: Measurement interval counter (1-600) snap.lp = parts[0] # Lp: instantaneous sound pressure level
if len(parts) >= 2: if len(parts) >= 2:
snap.lp = parts[1] # d1: Instantaneous sound pressure level snap.leq = parts[1] # Leq: equivalent continuous level
if len(parts) >= 3: # parts[2] = LE (sound exposure level) — not currently surfaced
snap.leq = parts[2] # d2: Equivalent continuous sound level
if len(parts) >= 4: if len(parts) >= 4:
snap.lmax = parts[3] # d3: Maximum level snap.lmax = parts[3] # Lmax
if len(parts) >= 5: if len(parts) >= 5:
snap.lmin = parts[4] # d4: Minimum level snap.lmin = parts[4] # Lmin
if len(parts) >= 11:
snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak)
if len(parts) >= 6: if len(parts) >= 6:
snap.lpeak = parts[5] # d5: Peak level snap.ln1 = parts[5] # LN1 percentile slot (device default L5; contract L1)
if len(parts) >= 7:
snap.ln2 = parts[6] # LN2 percentile slot (device default L10)
except (IndexError, ValueError) as e: except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DOD data points: {e}") logger.warning(f"Error parsing DOD data points: {e}")
@@ -896,15 +932,20 @@ class NL43Client:
# Acquire per-device lock - held for entire streaming session # Acquire per-device lock - held for entire streaming session
device_lock = await _get_device_lock(self.device_key) device_lock = await _get_device_lock(self.device_key)
async with device_lock: async with device_lock:
# Evict any cached connection — streaming needs its own dedicated socket
await _connection_pool.discard(self.device_key)
await self._enforce_rate_limit() await self._enforce_rate_limit()
logger.info(f"Starting DRD stream for {self.device_key}") logger.info(f"Starting DRD stream for {self.device_key}")
# Reuse the pooled connection instead of discard()+reopen. The NL43
# allows only ONE TCP connection at a time, and on a cellular link the
# device does not free its single slot fast enough for an immediate
# reconnect — so a fresh connect times out (the DRD stream failure).
# The per-device lock is held for the whole session, so it already
# blocks the poller; reusing the warm socket keeps us at exactly one
# connection and lets the stream start on the slot commands already use.
try: try:
reader, writer = await _connection_pool._open_connection( reader, writer, from_cache = await _connection_pool.acquire(
self.host, self.port, self.timeout self.device_key, self.host, self.port, self.timeout
) )
except ConnectionError: except ConnectionError:
logger.error(f"DRD stream connection failed to {self.device_key}") logger.error(f"DRD stream connection failed to {self.device_key}")
@@ -981,16 +1022,20 @@ class NL43Client:
break break
finally: finally:
# Send SUB character to stop streaming # Stop streaming on the device (SUB = 0x1A), then return the warm
# connection to the pool so subsequent commands reuse this single
# socket instead of opening a second one. release() returns healthy
# sockets to the pool and closes dead ones; the next acquire()
# drains any residual stop output before reuse.
try: try:
writer.write(b"\x1A") writer.write(b"\x1A")
await writer.drain() await writer.drain()
except Exception: except Exception:
pass pass
writer.close() await _connection_pool.release(
with contextlib.suppress(Exception): self.device_key, reader, writer, self.host, self.port
await writer.wait_closed() )
logger.info(f"DRD stream ended for {self.device_key}") logger.info(f"DRD stream ended for {self.device_key}")
BIN
View File
Binary file not shown.
-9
View File
@@ -1,9 +0,0 @@
2026-03-12 21:29:42,683 - app.main - INFO - Database tables initialized
2026-03-12 21:29:42,684 - app.main - INFO - CORS allowed origins: ['*']
2026-03-12 21:29:42,703 - app.main - INFO - Starting TCP connection pool cleanup task...
2026-03-12 21:29:42,703 - app.services - INFO - Connection pool cleanup task started
2026-03-12 21:29:42,703 - app.main - INFO - Starting background poller...
2026-03-12 21:29:42,703 - app.background_poller - INFO - Background poller task created
2026-03-12 21:29:42,703 - app.main - INFO - Background poller started
2026-03-12 21:29:42,703 - app.background_poller - INFO - Background polling loop started
2026-03-12 21:29:42,708 - app.background_poller - INFO - [POOL] No active connections in pool
+58
View File
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Migration script to add ln1 and ln2 percentile columns to the nl43_status table.
The NL-43 DOD response carries percentile slots LN1-LN5; the live SLM display
(Terra-View) shows two of them (default L1/L10). This adds storage for the two
surfaced slots. Run once per database to update existing schema.
"""
import sqlite3
import sys
from pathlib import Path
DB_PATH = Path(__file__).parent / "data" / "slmm.db"
def migrate():
"""Add ln1 and ln2 columns to the nl43_status table."""
if not DB_PATH.exists():
print(f"Database not found at {DB_PATH}")
print("No migration needed - database will be created with new schema")
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("PRAGMA table_info(nl43_status)")
columns = [row[1] for row in cursor.fetchall()]
if "ln1" in columns and "ln2" in columns:
print("✓ ln1/ln2 columns already exist, no migration needed")
return
if "ln1" not in columns:
print("Adding ln1 column...")
cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln1 TEXT")
print("✓ Added ln1 column")
if "ln2" not in columns:
print("Adding ln2 column...")
cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln2 TEXT")
print("✓ Added ln2 column")
conn.commit()
print("\n✓ Migration completed successfully!")
except Exception as e:
conn.rollback()
print(f"✗ Migration failed: {e}", file=sys.stderr)
sys.exit(1)
finally:
conn.close()
if __name__ == "__main__":
migrate()
+48
View File
@@ -0,0 +1,48 @@
#!/usr/bin/env python3
"""
Migration: add monitor_enabled column to nl43_config.
Controls whether the live fan-out DOD monitor is kept alive 24/7 for a unit
(which is what makes alerting continuous). Defaults to enabled. Run once per DB.
"""
import sqlite3
import sys
from pathlib import Path
DB_PATH = Path(__file__).parent / "data" / "slmm.db"
def migrate():
if not DB_PATH.exists():
print(f"Database not found at {DB_PATH}")
print("No migration needed - database will be created with new schema")
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("PRAGMA table_info(nl43_config)")
columns = [row[1] for row in cursor.fetchall()]
if "monitor_enabled" in columns:
print("✓ monitor_enabled column already exists, no migration needed")
return
print("Adding monitor_enabled column (default enabled)...")
# SQLite stores booleans as 0/1; default 1 = enabled.
cursor.execute("ALTER TABLE nl43_config ADD COLUMN monitor_enabled BOOLEAN DEFAULT 1")
conn.commit()
print("✓ Added monitor_enabled column")
print("\n✓ Migration completed successfully!")
except Exception as e:
conn.rollback()
print(f"✗ Migration failed: {e}", file=sys.stderr)
sys.exit(1)
finally:
conn.close()
if __name__ == "__main__":
migrate()
+68
View File
@@ -0,0 +1,68 @@
"""
Synthetic unit test for the alert state machine no DB, no device.
Drives `_evaluate_step` with a fake clock + a level series and checks that
onset/clear fire with the right debounce + hysteresis. Run:
docker compose exec -T slmm python3 test_alert_evaluator.py
# or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py
"""
from types import SimpleNamespace
from app.alerts import RuleState, _evaluate_step
def rule(**kw):
base = dict(threshold_db=85.0, duration_s=3, clear_margin_db=2.0, comparison="above")
base.update(kw)
return SimpleNamespace(**base)
def run(series, r):
st = RuleState()
events = [(now, a) for value, now in series
if (a := _evaluate_step(st, value, now, r))]
return events, st
def main():
failures = 0
def check(label, cond, detail=""):
nonlocal failures
print(("PASS" if cond else "FAIL"), label, detail)
if not cond:
failures += 1
# 1) sustained exceedance -> onset after duration; recovery -> clear after duration
r = rule(threshold_db=85, duration_s=3, clear_margin_db=2)
ev, _ = run([(80, 0), (86, 1), (87, 2), (88, 3), (88, 4),
(88, 5), (82, 6), (82, 7), (82, 8), (82, 9)], r)
onsets = [t for t, a in ev if a == "onset"]
clears = [t for t, a in ev if a == "clear"]
check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev))
# 2) brief spike under duration -> no onset (debounce)
ev, _ = run([(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)], rule(duration_s=3))
check("2 brief spike debounced", ev == [], str(ev))
# 3) hysteresis: a dip into the margin (below threshold, above threshold-margin)
# does NOT clear
r = rule(threshold_db=85, duration_s=0, clear_margin_db=3)
ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], r)
check("3 hysteresis holds ACTIVE", ev == [(0, "onset")] and st.phase == "active",
f"{ev} phase={st.phase}")
# 4) 'below' comparison (device too quiet) -> onset when value < threshold
ev, _ = run([(30, 0), (15, 1)], rule(threshold_db=20, duration_s=0,
clear_margin_db=2, comparison="below"))
check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev))
print()
print("ALL PASS" if failures == 0 else f"{failures} FAILURE(S)")
return failures
if __name__ == "__main__":
import sys
sys.exit(1 if main() else 0)