From a7983d29589d8bcde3a6b6eac532b77dc78aad32 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 21:53:00 +0000 Subject: [PATCH 01/20] fix: correct DOD field parsing and stop measurement-time resets Two device-data bugs surfaced while scoping the live-feed work: 1. DOD parser misalignment. DOD's response has no leading counter and includes LE + LN1-LN5, but the parser reused the DRD field map (parts[0]=counter). That shifted everything: Lp was stored as the counter, Leq as Lp, LE as Leq, and LN1 as Lpeak (visible because "Lpeak" came out below Lmax, which is impossible). Parse DOD with its own map: Lp=0, Leq=1, Lmax=3, Lmin=4, Lpeak=10 (channel 1 = main). 2. measurement_start_time reset on every live-stream open/close. The DOD path tags state "Start"; the DRD stream path tags "Measure". The transition detector treated only "Start" as measuring, so opening the stream ("Start"->"Measure") read as a stop (cleared start time) and closing it ("Measure"->"Start") read as a start (reset to now). Every viewer reset the elapsed measurement time. Treat {"Start","Measure"} both as measuring. LN1/LN2 (L1/L10) parsing + model/serialization is the next step. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/services.py | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/app/services.py b/app/services.py index 9375c5c..33e947b 100644 --- a/app/services.py +++ b/app/services.py @@ -69,10 +69,16 @@ def persist_snapshot(s: NL43Snapshot, db: Session): logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'") - # Device returns "Start" when measuring, "Stop" when stopped - # Normalize to previous behavior for backward compatibility - is_measuring = new_state == "Start" - was_measuring = previous_state == "Start" + # The device reports "Start" while measuring; the DOD path uses that string, + # but the DRD stream path tags snapshots "Measure" (and the DOD fallback also + # uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and + # closing the live stream flips state "Start"->"Measure"->"Start", which the + # old equality check misread as stop-then-start and RESET measurement_start_time + # every single time (the "elapsed time keeps resetting / shows wrong value on + # another computer" bug — and each extra viewer made it worse). + MEASURING_STATES = {"Start", "Measure"} + is_measuring = new_state in MEASURING_STATES + was_measuring = previous_state in MEASURING_STATES if not was_measuring and is_measuring: # Measurement just started - record the start time @@ -691,22 +697,28 @@ class NL43Client: snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state) - # Parse known positions (based on NL43 communication guide - DRD format) - # DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ... + # Parse DOD positional fields. DOD's layout is DIFFERENT from DRD: it has NO + # leading counter and it includes LE plus LN1–LN5. The device returns 4 channels + # of 16 fields each — [Lp, Leq, LE, Lmax, Lmin, LN1, LN2, LN3, LN4, LN5, Lpeak, + # LIeq, Leq_mov, Ltm5, over, under] — and channel 1 (parts[0:16]) is the main + # display. The previous code reused the DRD map (treating parts[0] as a counter), + # which shifted everything: Lp was reported as the counter, Leq as Lp, LE as Leq, + # and LN1 as Lpeak (you could spot it because "Lpeak" came out < Lmax). try: - # Capture d0 (counter) for timer synchronization if len(parts) >= 1: - snap.counter = parts[0] # d0: Measurement interval counter (1-600) + snap.lp = parts[0] # Lp: instantaneous sound pressure level if len(parts) >= 2: - snap.lp = parts[1] # d1: Instantaneous sound pressure level - if len(parts) >= 3: - snap.leq = parts[2] # d2: Equivalent continuous sound level + snap.leq = parts[1] # Leq: equivalent continuous level + # parts[2] = LE (sound exposure level) — not currently surfaced if len(parts) >= 4: - snap.lmax = parts[3] # d3: Maximum level + snap.lmax = parts[3] # Lmax if len(parts) >= 5: - snap.lmin = parts[4] # d4: Minimum level - if len(parts) >= 6: - snap.lpeak = parts[5] # d5: Peak level + snap.lmin = parts[4] # Lmin + if len(parts) >= 11: + snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak) + # LN1/LN2 percentiles live at parts[5]/parts[6] (the L1/L10 display contract). + # Surfaced as snap.ln1/snap.ln2 once those fields are added to the snapshot + # dataclass + NL43Status model — next step on this branch. except (IndexError, ValueError) as e: logger.warning(f"Error parsing DOD data points: {e}") -- 2.52.0 From 51dd6b682da055e837f587663dfaf0c2692ba3a1 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 22:01:31 +0000 Subject: [PATCH 02/20] feat: surface LN1/LN2 (L1/L10) percentiles through SLMM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the SLMM side of the L1/L10 live-display contract. The NL-43's DOD response carries percentile slots LN1-LN5 (channel 1, parts[5]/[6]); parse the first two and expose them as ln1/ln2 end to end: - NL43Snapshot dataclass: ln1/ln2 fields - NL43Status model: ln1/ln2 columns (+ migrate_add_ln_percentiles.py) - DOD parser: snap.ln1=parts[5], snap.ln2=parts[6] - persist_snapshot writes them - all /status data dicts, StatusPayload, and the DRD stream payload emit ln1/ln2 (null on the DRD stream itself, which doesn't carry percentiles) Labels: device LN1 defaults to L5, not L1 — Terra-View defaults the label to L1/L10, so the device's Ln1/Ln2 slots must be set to 1%/10% for the labels to be accurate (dynamic label emission is a follow-up). Co-Authored-By: Claude Opus 4.8 (1M context) --- app/models.py | 2 ++ app/routers.py | 10 ++++++ app/services.py | 11 +++++-- migrate_add_ln_percentiles.py | 58 +++++++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 migrate_add_ln_percentiles.py diff --git a/app/models.py b/app/models.py index 4c86514..906043f 100644 --- a/app/models.py +++ b/app/models.py @@ -41,6 +41,8 @@ class NL43Status(Base): lmax = Column(String, nullable=True) # Maximum level lmin = Column(String, nullable=True) # Minimum level lpeak = Column(String, nullable=True) # Peak level + ln1 = Column(String, nullable=True) # Percentile slot LN1 (configurable; device default L5, contract L1) + ln2 = Column(String, nullable=True) # Percentile slot LN2 (configurable; device default L10) battery_level = Column(String, nullable=True) power_source = Column(String, nullable=True) sd_remaining_mb = Column(String, nullable=True) diff --git a/app/routers.py b/app/routers.py index a21c928..9ee6bae 100644 --- a/app/routers.py +++ b/app/routers.py @@ -450,6 +450,8 @@ def get_status(unit_id: str, db: Session = Depends(get_db)): "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, @@ -472,6 +474,8 @@ class StatusPayload(BaseModel): lmax: str | None = None lmin: str | None = None lpeak: str | None = None + ln1: str | None = None + ln2: str | None = None battery_level: str | None = None power_source: str | None = None sd_remaining_mb: str | None = None @@ -504,6 +508,8 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, @@ -1205,6 +1211,8 @@ async def stream_live(websocket: WebSocket, unit_id: str): "lmax": snap.lmax, # Maximum level "lmin": snap.lmin, # Minimum level "lpeak": snap.lpeak, # Peak level + "ln1": snap.ln1, # LN1 percentile (L1/L10 contract); null on DRD stream + "ln2": snap.ln2, # LN2 percentile; null on DRD stream "raw_payload": snap.raw_payload, }) except Exception as e: @@ -1876,6 +1884,8 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)): "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, diff --git a/app/services.py b/app/services.py index 33e947b..5e15a06 100644 --- a/app/services.py +++ b/app/services.py @@ -46,6 +46,8 @@ class NL43Snapshot: lmax: Optional[str] = None # Maximum level lmin: Optional[str] = None # Minimum level lpeak: Optional[str] = None # Peak level + ln1: Optional[str] = None # Percentile slot LN1 (configurable; device default L5, contract L1) + ln2: Optional[str] = None # Percentile slot LN2 (configurable; device default L10) battery_level: Optional[str] = None power_source: Optional[str] = None sd_remaining_mb: Optional[str] = None @@ -108,6 +110,8 @@ def persist_snapshot(s: NL43Snapshot, db: Session): row.lmax = s.lmax row.lmin = s.lmin row.lpeak = s.lpeak + row.ln1 = s.ln1 + row.ln2 = s.ln2 row.battery_level = s.battery_level row.power_source = s.power_source row.sd_remaining_mb = s.sd_remaining_mb @@ -716,9 +720,10 @@ class NL43Client: snap.lmin = parts[4] # Lmin if len(parts) >= 11: snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak) - # LN1/LN2 percentiles live at parts[5]/parts[6] (the L1/L10 display contract). - # Surfaced as snap.ln1/snap.ln2 once those fields are added to the snapshot - # dataclass + NL43Status model — next step on this branch. + if len(parts) >= 6: + snap.ln1 = parts[5] # LN1 percentile slot (device default L5; contract L1) + if len(parts) >= 7: + snap.ln2 = parts[6] # LN2 percentile slot (device default L10) except (IndexError, ValueError) as e: logger.warning(f"Error parsing DOD data points: {e}") diff --git a/migrate_add_ln_percentiles.py b/migrate_add_ln_percentiles.py new file mode 100644 index 0000000..2e27b34 --- /dev/null +++ b/migrate_add_ln_percentiles.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Migration script to add ln1 and ln2 percentile columns to the nl43_status table. + +The NL-43 DOD response carries percentile slots LN1-LN5; the live SLM display +(Terra-View) shows two of them (default L1/L10). This adds storage for the two +surfaced slots. Run once per database to update existing schema. +""" + +import sqlite3 +import sys +from pathlib import Path + +DB_PATH = Path(__file__).parent / "data" / "slmm.db" + + +def migrate(): + """Add ln1 and ln2 columns to the nl43_status table.""" + + if not DB_PATH.exists(): + print(f"Database not found at {DB_PATH}") + print("No migration needed - database will be created with new schema") + return + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + try: + cursor.execute("PRAGMA table_info(nl43_status)") + columns = [row[1] for row in cursor.fetchall()] + + if "ln1" in columns and "ln2" in columns: + print("✓ ln1/ln2 columns already exist, no migration needed") + return + + if "ln1" not in columns: + print("Adding ln1 column...") + cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln1 TEXT") + print("✓ Added ln1 column") + + if "ln2" not in columns: + print("Adding ln2 column...") + cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln2 TEXT") + print("✓ Added ln2 column") + + conn.commit() + print("\n✓ Migration completed successfully!") + + except Exception as e: + conn.rollback() + print(f"✗ Migration failed: {e}", file=sys.stderr) + sys.exit(1) + finally: + conn.close() + + +if __name__ == "__main__": + migrate() -- 2.52.0 From 0793e7df01d066541a1026ad80ee5f97b30199b8 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 22:40:56 +0000 Subject: [PATCH 03/20] feat: add per-device disconnect endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /api/nl43/{unit_id}/disconnect cleanly closes (TCP FIN + wait_closed) and drops the pooled connection for a single device, freeing the NL43's one connection slot. Previously only /_connections/flush existed, which tears down every device at once. Idempotent; no-op if nothing is cached. Releases the idle pooled connection only — an active DRD stream/command has the socket checked out of the pool, so close the stream WebSocket to end a live stream. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/routers.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/app/routers.py b/app/routers.py index 9ee6bae..98673de 100644 --- a/app/routers.py +++ b/app/routers.py @@ -121,6 +121,38 @@ async def flush_connection_pool(): return {"status": "ok", "message": "All cached connections closed"} +@router.post("/{unit_id}/disconnect") +async def disconnect_device(unit_id: str, db: Session = Depends(get_db)): + """Cleanly close SLMM's persistent TCP connection to a single device. + + Gracefully closes (TCP FIN + wait_closed) the pooled connection for this + device and removes it from the pool, freeing the NL43's single connection + slot. Idempotent — a no-op if no connection is currently cached. + + Note: this releases the *idle* pooled connection. It does not interrupt an + in-progress DRD stream or an in-flight command (those have the socket + checked out of the pool) — close the stream WebSocket to end a live stream. + """ + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + from app.services import _connection_pool + + device_key = f"{cfg.host}:{cfg.tcp_port}" + had_conn = device_key in _connection_pool.get_stats().get("connections", {}) + + await _connection_pool.discard(device_key) + + return { + "status": "ok", + "unit_id": unit_id, + "device_key": device_key, + "disconnected": had_conn, + "message": "Connection closed" if had_conn else "No cached connection to close", + } + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ -- 2.52.0 From b954eb8c89e47f7b29aecc2c2fffd3d9d5d448cb Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 22:45:52 +0000 Subject: [PATCH 04/20] feat: per-unit deactivate and global SLMM standby MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lets an instance stop occupying a device's single TCP connection slot so another instance (e.g. prod) can take over. Per-unit: - POST /api/nl43/{unit_id}/deactivate — poll_enabled=False (persisted) + drop the connection (waits up to 10s for in-flight ops via the device lock, then discards). Unit stays dormant across restarts. - POST /api/nl43/{unit_id}/activate — re-enable polling. Global standby: - POST /api/nl43/_system/standby — poller idles and releases ALL connections; the loop keeps re-releasing so the instance holds no slots. - POST /api/nl43/_system/resume — resume polling. - GET /api/nl43/_system/status — active vs standby + active_connections. - SLMM_POLLING_ENABLED=false starts an instance in standby (persistent way to keep a dev box from latching onto a prod-owned device). Co-Authored-By: Claude Opus 4.8 (1M context) --- app/background_poller.py | 48 +++++++++++++++++++-- app/routers.py | 93 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 4 deletions(-) diff --git a/app/background_poller.py b/app/background_poller.py index 64bcc60..071fc31 100644 --- a/app/background_poller.py +++ b/app/background_poller.py @@ -8,6 +8,7 @@ for fast API access without querying devices on every request. import asyncio import logging +import os from datetime import datetime, timedelta from typing import Optional @@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs logger = logging.getLogger(__name__) +# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in +# standby (running but not polling and not holding device connections) — e.g. a +# dev box that must not latch onto a device that a prod instance owns. +POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true" + class BackgroundPoller: """ @@ -39,6 +45,7 @@ class BackgroundPoller: self._logger = logger self._last_cleanup = None # Track last log cleanup time self._last_pool_log = None # Track last connection pool heartbeat log + self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle) async def start(self): """Start the background polling task.""" @@ -71,15 +78,48 @@ class BackgroundPoller: self._logger.info("Background poller stopped") + def is_active(self) -> bool: + """Whether background polling is currently active (vs standby).""" + return self._active + + async def set_active(self, active: bool): + """Globally enable/disable polling at runtime. + + When deactivated, the loop stays alive but polls nothing and releases all + device connections, so this SLMM instance stops occupying the devices' + single connection slots (e.g. so a prod instance can take over). Runtime + state only — on restart the instance returns to SLMM_POLLING_ENABLED. + """ + self._active = active + if active: + self._logger.info("[SYSTEM] Background polling ACTIVATED") + else: + self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections") + await self._release_all_connections() + + async def _release_all_connections(self): + """Gracefully close every pooled device connection (no-op if none).""" + from app.services import _connection_pool + for device_key in list(_connection_pool.get_stats().get("connections", {})): + await _connection_pool.discard(device_key) + async def _poll_loop(self): """Main polling loop that runs continuously.""" self._logger.info("Background polling loop started") while self._running: - try: - await self._poll_all_devices() - except Exception as e: - self._logger.error(f"Error in poll loop: {e}", exc_info=True) + if self._active: + try: + await self._poll_all_devices() + except Exception as e: + self._logger.error(f"Error in poll loop: {e}", exc_info=True) + else: + # Standby: poll nothing, and keep holding no device connection slots + # so another SLMM instance (e.g. prod) can talk to the devices. + try: + await self._release_all_connections() + except Exception as e: + self._logger.warning(f"Standby connection release failed: {e}") # Run log cleanup once per hour try: diff --git a/app/routers.py b/app/routers.py index 98673de..6b7cede 100644 --- a/app/routers.py +++ b/app/routers.py @@ -153,6 +153,99 @@ async def disconnect_device(unit_id: str, db: Session = Depends(get_db)): } +@router.post("/{unit_id}/deactivate") +async def deactivate_device(unit_id: str, db: Session = Depends(get_db)): + """Make a single unit dormant: stop background polling for it AND drop its + connection, freeing the device's connection slot. poll_enabled=False is + persisted, so the unit stays dormant across restarts until /activate. + """ + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + cfg.poll_enabled = False + db.commit() + + from app.services import _connection_pool, _get_device_lock + + device_key = f"{cfg.host}:{cfg.tcp_port}" + + # Wait briefly for any in-flight poll/command to finish (so its connection is + # back in the pool), then drop it. If a long-lived stream holds the lock we + # don't block forever — discard the pooled connection regardless. + lock = await _get_device_lock(device_key) + acquired = False + try: + await asyncio.wait_for(lock.acquire(), timeout=10.0) + acquired = True + except asyncio.TimeoutError: + acquired = False + try: + await _connection_pool.discard(device_key) + finally: + if acquired: + lock.release() + + return { + "status": "ok", + "unit_id": unit_id, + "poll_enabled": False, + "message": "Polling disabled and connection closed for this unit", + } + + +@router.post("/{unit_id}/activate") +async def activate_device(unit_id: str, db: Session = Depends(get_db)): + """Resume background polling for a unit previously deactivated.""" + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + cfg.poll_enabled = True + db.commit() + + return { + "status": "ok", + "unit_id": unit_id, + "poll_enabled": True, + "message": "Polling enabled for this unit", + } + + +@router.get("/_system/status") +async def system_status(): + """Report whether this SLMM instance is actively polling or in standby.""" + from app.background_poller import poller + from app.services import _connection_pool + return { + "status": "ok", + "mode": "active" if poller.is_active() else "standby", + "polling_active": poller.is_active(), + "active_connections": _connection_pool.get_stats().get("active_connections", 0), + } + + +@router.post("/_system/standby") +async def system_standby(): + """Put this SLMM instance into standby: stop polling ALL devices and release + every connection, so it stops occupying device slots (e.g. so a prod instance + can take over). Runtime-only — on restart the instance returns to its + SLMM_POLLING_ENABLED default. + """ + from app.background_poller import poller + await poller.set_active(False) + return {"status": "ok", "mode": "standby", + "message": "Polling stopped and all device connections released"} + + +@router.post("/_system/resume") +async def system_resume(): + """Resume polling after standby (global).""" + from app.background_poller import poller + await poller.set_active(True) + return {"status": "ok", "mode": "active", "message": "Polling resumed"} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ -- 2.52.0 From 8c17af4849fa8ec4231ab55fd3bc7a2f5ded040e Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 22:50:18 +0000 Subject: [PATCH 05/20] fix: ignore garbled measurement-state reads (phantom STOPPED/STARTED) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A buffer desync on the shared persistent connection (commonly right after a DRD/DOD test) can make a Measure? read return a stray value. The state classifier treated anything not in {"Start","Measure"} as "not measuring", so a garbled read logged a phantom STOPPED, the next clean read logged STARTED, and that reset measurement_start_time — producing constant STOPPED/STARTED device-log pairs and a drifting elapsed timer. Now only recognized states drive transitions: {"Start","Measure"} = measuring, {"Stop"} = stopped, anything else = no change. Garbled reads are also not persisted as the cached state, so they can't poison the next transition check. Builds on the earlier Start<->Measure normalization. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/services.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/app/services.py b/app/services.py index 5e15a06..b5c68ea 100644 --- a/app/services.py +++ b/app/services.py @@ -75,13 +75,24 @@ def persist_snapshot(s: NL43Snapshot, db: Session): # but the DRD stream path tags snapshots "Measure" (and the DOD fallback also # uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and # closing the live stream flips state "Start"->"Measure"->"Start", which the - # old equality check misread as stop-then-start and RESET measurement_start_time - # every single time (the "elapsed time keeps resetting / shows wrong value on - # another computer" bug — and each extra viewer made it worse). + # old equality check misread as stop-then-start and reset measurement_start_time. + # + # Also: only act on RECOGNIZED states. A buffer desync on the shared connection + # (e.g. right after a DRD/DOD test) can make a Measure? read return a stray, + # garbled value; treating that as "not measuring" produced constant phantom + # "STOPPED -> STARTED" log pairs and reset the timer. Ignore unknown reads. MEASURING_STATES = {"Start", "Measure"} - is_measuring = new_state in MEASURING_STATES + STOPPED_STATES = {"Stop"} was_measuring = previous_state in MEASURING_STATES + if new_state in MEASURING_STATES: + is_measuring = True + elif new_state in STOPPED_STATES: + is_measuring = False + else: + logger.warning(f"Ignoring unrecognized measurement state for {s.unit_id}: {new_state!r}") + is_measuring = was_measuring # garbled/unknown read — no transition + if not was_measuring and is_measuring: # Measurement just started - record the start time row.measurement_start_time = datetime.utcnow() @@ -103,7 +114,10 @@ def persist_snapshot(s: NL43Snapshot, db: Session): except Exception: pass - row.measurement_state = new_state + # Only persist a recognized state so one garbled read can't poison the next + # transition check (which would manufacture the phantom STOPPED/STARTED pair). + if new_state in MEASURING_STATES or new_state in STOPPED_STATES: + row.measurement_state = new_state row.counter = s.counter row.lp = s.lp row.leq = s.leq -- 2.52.0 From aa3e088b64622ffb2adbb112430af2ef216746ce Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 23:27:05 +0000 Subject: [PATCH 06/20] feat: per-device live monitor (fan-out) + alert evaluator (POC) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The piece the live-view + alerting work was building toward. monitor.py — one DOD poll loop per device, broadcast to many subscribers: - browser WebSockets (fixes the single-connection "second viewer sees nothing" contention — browsers no longer each open a device stream) - the alert evaluator (can keep a feed running with no browser via /monitor/start, so alerting runs continuously) - persistence (each snapshot written like the poller) DOD-sourced, so the broadcast carries ln1/ln2 (which DRD cannot). All polls go through the existing per-device lock + pool, so it serializes safely with the background poller and on-demand commands. alerts.py — pluggable POC evaluator: fires (logs) when ALERT_METRIC exceeds ALERT_THRESHOLD_DB with an ALERT_COOLDOWN_SECONDS cooldown. The rule (instantaneous vs sustained vs L10) is the single swap point; dispatch is a server log for now (email/SMS later). Endpoints: - WS /api/nl43/{unit_id}/monitor subscribe to the shared feed - POST /api/nl43/{unit_id}/monitor/start keep feed alive w/o a browser - POST /api/nl43/{unit_id}/monitor/stop drop the keep-alive - GET /api/nl43/_monitor/status running/subscribers/keepalive WS endpoint races queue.get() against a disconnect watcher so an idle feed still detects client drop and doesn't leak a subscription. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/alerts.py | 71 ++++++++++++++++++++ app/monitor.py | 176 +++++++++++++++++++++++++++++++++++++++++++++++++ app/routers.py | 74 +++++++++++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 app/alerts.py create mode 100644 app/monitor.py diff --git a/app/alerts.py b/app/alerts.py new file mode 100644 index 0000000..7c001e2 --- /dev/null +++ b/app/alerts.py @@ -0,0 +1,71 @@ +""" +Alert evaluation (POC). + +Receives each monitor snapshot and fires an alert when a configured metric +exceeds a threshold, with a cooldown so a sustained loud period doesn't spam. + +The RULE here is intentionally simple and swappable. Instantaneous Lp vs a +sustained window vs L10 is still an open design decision — this evaluator is the +single plug point for it. For the POC the rule is "instantaneous metric > +threshold, rate-limited by a cooldown", and dispatch is just a server-side log. +Wire email/SMS (likely via a Terra-View webhook) into _dispatch() later. + +Config via env: +- ALERT_ENABLED (default true) +- ALERT_METRIC which snapshot field to test: lp/leq/lmax/ln1/ln2 (default lp) +- ALERT_THRESHOLD_DB numeric dB threshold (default 85) +- ALERT_COOLDOWN_SECONDS min seconds between alerts per unit (default 60) +""" + +import asyncio +import logging +import os +from typing import Dict + +logger = logging.getLogger(__name__) + + +class AlertEvaluator: + def __init__(self): + self.enabled = os.getenv("ALERT_ENABLED", "true").lower() == "true" + self.metric = os.getenv("ALERT_METRIC", "lp").lower() + self.threshold_db = float(os.getenv("ALERT_THRESHOLD_DB", "85")) + self.cooldown_s = float(os.getenv("ALERT_COOLDOWN_SECONDS", "60")) + self._last_fired: Dict[str, float] = {} + logger.info( + f"[ALERT] evaluator ready: enabled={self.enabled} metric={self.metric} " + f"threshold={self.threshold_db}dB cooldown={self.cooldown_s}s" + ) + + async def evaluate(self, unit_id: str, snap) -> None: + """Evaluate one snapshot; fire (log) if the metric exceeds threshold.""" + if not self.enabled: + return + + raw = getattr(snap, self.metric, None) + try: + level = float(raw) + except (TypeError, ValueError): + return # missing / non-numeric (e.g. "-.-") + + if level <= self.threshold_db: + return + + # Cooldown — use the event loop clock (Math.random/Date.now-free). + now = asyncio.get_running_loop().time() + if now - self._last_fired.get(unit_id, 0.0) < self.cooldown_s: + return + self._last_fired[unit_id] = now + + await self._dispatch(unit_id, level) + + async def _dispatch(self, unit_id: str, level: float) -> None: + """POC dispatch: server-side log. Swap in email/SMS here later.""" + logger.warning( + f"[ALERT] {unit_id}: {self.metric.upper()}={level:.1f} dB exceeded " + f"threshold {self.threshold_db:.1f} dB" + ) + + +# Module-level singleton +alert_evaluator = AlertEvaluator() diff --git a/app/monitor.py b/app/monitor.py new file mode 100644 index 0000000..10e190c --- /dev/null +++ b/app/monitor.py @@ -0,0 +1,176 @@ +""" +Per-device live monitor (fan-out hub). + +ONE DOD poll loop per device, broadcast to many subscribers: +- browser WebSocket clients (live view) — they no longer each open their own + device stream, so the NL43's single-connection limit stops causing the + "second viewer sees nothing" contention. +- the alert evaluator (threshold alerts), which can keep a device's feed running + even with no browser attached. +- persistence (each snapshot is written to NL43Status, like the poller does). + +The device's one TCP connection is respected: every poll goes through the same +per-device lock + connection pool in services.py, so the monitor, the background +poller, and on-demand commands all serialize safely. +""" + +import asyncio +import logging +import os +from datetime import datetime +from typing import Dict, Optional, Set + +from app.database import SessionLocal +from app.models import NL43Config, NL43Status +from app.services import NL43Client, persist_snapshot +from app.alerts import alert_evaluator + +logger = logging.getLogger(__name__) + +# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per +# poll) already paces the effective rate to a few seconds; this is the extra idle. +MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) + + +def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict: + """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced + so it carries ln1/ln2 (which DRD cannot).""" + return { + "unit_id": unit_id, + "timestamp": datetime.utcnow().isoformat(), + "measurement_state": snap.measurement_state, + "measurement_start_time": measurement_start_time, + "counter": snap.counter, + "lp": snap.lp, + "leq": snap.leq, + "lmax": snap.lmax, + "lmin": snap.lmin, + "lpeak": snap.lpeak, + "ln1": snap.ln1, + "ln2": snap.ln2, + "raw_payload": snap.raw_payload, + } + + +class DeviceMonitor: + """Owns a single DOD poll loop for one device and fans each snapshot out to + all subscribers. Runs while it has at least one browser subscriber OR the + server-side keep-alive (alerting) flag is set.""" + + def __init__(self, unit_id: str): + self.unit_id = unit_id + self._subscribers: Set[asyncio.Queue] = set() + self._keepalive = False + self._task: Optional[asyncio.Task] = None + self._lock = asyncio.Lock() + + @property + def running(self) -> bool: + return self._task is not None and not self._task.done() + + def subscriber_count(self) -> int: + return len(self._subscribers) + + def _has_demand(self) -> bool: + return bool(self._subscribers) or self._keepalive + + def _ensure_task(self) -> None: + if self._task is None or self._task.done(): + self._task = asyncio.create_task(self._run()) + + async def subscribe(self) -> asyncio.Queue: + q: asyncio.Queue = asyncio.Queue(maxsize=5) + async with self._lock: + self._subscribers.add(q) + self._ensure_task() + return q + + async def unsubscribe(self, q: asyncio.Queue) -> None: + async with self._lock: + self._subscribers.discard(q) + + async def set_keepalive(self, on: bool) -> None: + async with self._lock: + self._keepalive = on + if on: + self._ensure_task() + + async def _run(self) -> None: + logger.info(f"[MONITOR] {self.unit_id}: feed started") + try: + while self._has_demand(): + snap, mst = await self._poll_once() + if snap is not None: + payload = _snapshot_payload(snap, self.unit_id, mst) + self._broadcast(payload) + try: + await alert_evaluator.evaluate(self.unit_id, snap) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") + await asyncio.sleep(MONITOR_POLL_INTERVAL) + finally: + logger.info(f"[MONITOR] {self.unit_id}: feed stopped") + + async def _poll_once(self): + """One DOD poll: read, persist, return (snapshot, measurement_start_iso).""" + db = SessionLocal() + try: + cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first() + if not cfg or not cfg.tcp_enabled: + return None, None + client = NL43Client( + cfg.host, cfg.tcp_port, + ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password, + ftp_port=cfg.ftp_port or 21, + ) + snap = await client.request_dod() + snap.unit_id = self.unit_id + persist_snapshot(snap, db) + db.commit() + status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first() + mst = (status.measurement_start_time.isoformat() + if status and status.measurement_start_time else None) + return snap, mst + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}") + return None, None + finally: + db.close() + + def _broadcast(self, payload: dict) -> None: + for q in list(self._subscribers): + try: + q.put_nowait(payload) + except asyncio.QueueFull: + # Slow consumer — drop this frame rather than stall the whole feed. + pass + + +class MonitorManager: + """Registry of per-device monitors (one per unit_id).""" + + def __init__(self): + self._monitors: Dict[str, DeviceMonitor] = {} + self._lock = asyncio.Lock() + + async def get(self, unit_id: str) -> DeviceMonitor: + async with self._lock: + m = self._monitors.get(unit_id) + if m is None: + m = DeviceMonitor(unit_id) + self._monitors[unit_id] = m + return m + + def status(self) -> dict: + return { + uid: { + "running": m.running, + "subscribers": m.subscriber_count(), + "keepalive": m._keepalive, + } + for uid, m in self._monitors.items() + } + + +# Module-level singleton +monitor_manager = MonitorManager() diff --git a/app/routers.py b/app/routers.py index 6b7cede..1031d84 100644 --- a/app/routers.py +++ b/app/routers.py @@ -246,6 +246,80 @@ async def system_resume(): return {"status": "ok", "mode": "active", "message": "Polling resumed"} +# ============================================================================ +# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients +# ============================================================================ + +@router.websocket("/{unit_id}/monitor") +async def monitor_stream(websocket: WebSocket, unit_id: str): + """Subscribe a browser to the device's shared 1 Hz DOD feed. + + Any number of clients can attach without each opening its own device + connection (one poll loop per device, fanned out). Same JSON shape as the + DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10). + """ + await websocket.accept() + from app.monitor import monitor_manager + + monitor = await monitor_manager.get(unit_id) + queue = await monitor.subscribe() + logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)") + + async def _watch_disconnect(): + # Completes when the client disconnects, so an idle feed (no data) still + # detects the drop and we don't leak a subscription that keeps the device + # feed (and its connection) alive. + try: + while True: + msg = await websocket.receive() + if msg.get("type") == "websocket.disconnect": + return + except Exception: + return + + gone = asyncio.ensure_future(_watch_disconnect()) + try: + while not gone.done(): + try: + payload = await asyncio.wait_for(queue.get(), timeout=1.0) + except asyncio.TimeoutError: + continue # re-check gone.done() + await websocket.send_json(payload) + except WebSocketDisconnect: + logger.info(f"Monitor subscriber disconnected for {unit_id}") + except Exception as e: + logger.warning(f"Monitor stream error for {unit_id}: {e}") + finally: + gone.cancel() + await monitor.unsubscribe(queue) + + +@router.post("/{unit_id}/monitor/start") +async def monitor_start(unit_id: str): + """Keep the device's feed running even with no browser attached, so alerting + evaluates continuously. Runtime-only (resets on restart).""" + from app.monitor import monitor_manager + monitor = await monitor_manager.get(unit_id) + await monitor.set_keepalive(True) + return {"status": "ok", "unit_id": unit_id, "running": monitor.running, "keepalive": True} + + +@router.post("/{unit_id}/monitor/stop") +async def monitor_stop(unit_id: str): + """Drop the keep-alive; the feed stops once no browser subscribers remain.""" + from app.monitor import monitor_manager + monitor = await monitor_manager.get(unit_id) + await monitor.set_keepalive(False) + return {"status": "ok", "unit_id": unit_id, "keepalive": False} + + +@router.get("/_monitor/status") +async def monitor_status(): + """Status of every device monitor (running, subscriber count, keep-alive).""" + from app.monitor import monitor_manager + return {"status": "ok", "monitors": monitor_manager.status()} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ -- 2.52.0 From 9c43e6853406100a5dd6c11052148c166fc9332e Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 01:04:03 +0000 Subject: [PATCH 07/20] =?UTF-8?q?feat:=20alert=20engine=20stage=201=20?= =?UTF-8?q?=E2=80=94=20rules,=20events,=20state=20machine,=20CRUD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the POC single-threshold check with a real per-rule engine over the live monitor feed. - AlertRule / AlertEvent tables (auto-created via create_all; no migration). Rule = {metric, comparison, threshold_db, duration_s, clear_margin_db, schedule, channels, recipients}. - alerts.py: per-(unit,rule) state machine IDLE->ACTIVE->IDLE with duration debounce (both edges) + clear_margin hysteresis; onset/clear are distinct events; optional nighttime schedule; rule cache w/ invalidation. The state-machine core (_evaluate_step) is pure (no DB/clock) for testing. - Dispatch is a server log (POC); _dispatch() is the seam for a Terra-View webhook (email/SMS) later. - CRUD: POST/GET/PUT/DELETE /{unit}/alerts/rules, GET /{unit}/alerts/events, POST /{unit}/alerts/events/{id}/ack. - test_alert_evaluator.py: synthetic level series proves onset debounce, spike rejection, hysteresis hold, and below-comparison (4/4 pass, no device). Source-agnostic: the same rules transfer unchanged if a unit's feed is later sourced from FTP intervals instead of the DOD monitor. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/alerts.py | 265 +++++++++++++++++++++++++++++++++------- app/models.py | 52 +++++++- app/routers.py | 107 +++++++++++++++- test_alert_evaluator.py | 68 +++++++++++ 4 files changed, 444 insertions(+), 48 deletions(-) create mode 100644 test_alert_evaluator.py diff --git a/app/alerts.py b/app/alerts.py index 7c001e2..1fb84e7 100644 --- a/app/alerts.py +++ b/app/alerts.py @@ -1,71 +1,244 @@ """ -Alert evaluation (POC). +Threshold alert engine. -Receives each monitor snapshot and fires an alert when a configured metric -exceeds a threshold, with a cooldown so a sustained loud period doesn't spam. +Each unit can have any number of AlertRules. A rule is evaluated against the +unit's live monitor snapshots via a small per-(unit, rule) state machine: -The RULE here is intentionally simple and swappable. Instantaneous Lp vs a -sustained window vs L10 is still an open design decision — this evaluator is the -single plug point for it. For the POC the rule is "instantaneous metric > -threshold, rate-limited by a cooldown", and dispatch is just a server-side log. -Wire email/SMS (likely via a Terra-View webhook) into _dispatch() later. + IDLE --(metric exceeds threshold for duration_s)--> ACTIVE (fire ONSET) + ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR) -Config via env: -- ALERT_ENABLED (default true) -- ALERT_METRIC which snapshot field to test: lp/leq/lmax/ln1/ln2 (default lp) -- ALERT_THRESHOLD_DB numeric dB threshold (default 85) -- ALERT_COOLDOWN_SECONDS min seconds between alerts per unit (default 60) +duration_s debounces both edges; clear_margin_db adds hysteresis so a level +hovering at the threshold doesn't flap. Onset and clear are distinct events. + +The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no +real clock — so it can be unit-tested with a synthetic level series and a fake +clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence, +and dispatch. Dispatch is a server log for now (POC); the seam to POST events to +a Terra-View webhook (email/SMS) is _dispatch(). """ import asyncio import logging import os -from typing import Dict +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple logger = logging.getLogger(__name__) +# Local timezone offset for schedule windows (same env var services.py uses). +_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5")) + +# How long to cache a unit's rules before re-querying the DB (rules change rarely). +_RULE_CACHE_TTL_S = 15.0 + + +@dataclass +class RuleState: + """In-memory runtime state for one (unit, rule).""" + phase: str = "idle" # "idle" | "active" + edge_since: Optional[float] = None # when the current edge condition began (clock time) + peak: float = 0.0 + event_id: Optional[int] = None # the open AlertEvent row (for the clear update) + + +def _exceeds(value: float, rule) -> bool: + if rule.comparison == "below": + return value < rule.threshold_db + return value > rule.threshold_db + + +def _recovered(value: float, rule) -> bool: + margin = rule.clear_margin_db or 0.0 + if rule.comparison == "below": + return value > rule.threshold_db + margin + return value < rule.threshold_db - margin + + +def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]: + """Advance the state machine by one reading. + + Pure: mutates `state`, returns 'onset' | 'clear' | None. `now` is injected so + tests can drive a fake clock. + """ + duration = rule.duration_s or 0 + + if state.phase == "idle": + if _exceeds(value, rule): + if state.edge_since is None: + state.edge_since = now + if now - state.edge_since >= duration: + state.phase = "active" + state.edge_since = None + state.peak = value + return "onset" + else: + state.edge_since = None + return None + + # active + if rule.comparison == "below": + state.peak = min(state.peak, value) + else: + state.peak = max(state.peak, value) + + if _recovered(value, rule): + if state.edge_since is None: + state.edge_since = now + if now - state.edge_since >= duration: + state.phase = "idle" + state.edge_since = None + return "clear" + else: + state.edge_since = None + return None + + +def _in_window(now_minutes: int, start: str, end: str) -> bool: + """Is now_minutes (minutes since local midnight) within [start, end)? + Handles wraparound windows like 22:00–07:00.""" + def _m(s: str) -> int: + h, m = s.split(":") + return int(h) * 60 + int(m) + s, e = _m(start), _m(end) + if s == e: + return True + if s < e: + return s <= now_minutes < e + return now_minutes >= s or now_minutes < e # wraparound + class AlertEvaluator: def __init__(self): - self.enabled = os.getenv("ALERT_ENABLED", "true").lower() == "true" - self.metric = os.getenv("ALERT_METRIC", "lp").lower() - self.threshold_db = float(os.getenv("ALERT_THRESHOLD_DB", "85")) - self.cooldown_s = float(os.getenv("ALERT_COOLDOWN_SECONDS", "60")) - self._last_fired: Dict[str, float] = {} - logger.info( - f"[ALERT] evaluator ready: enabled={self.enabled} metric={self.metric} " - f"threshold={self.threshold_db}dB cooldown={self.cooldown_s}s" - ) + self._states: Dict[Tuple[str, int], RuleState] = {} + self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules) + logger.info("[ALERT] rule-based evaluator ready") async def evaluate(self, unit_id: str, snap) -> None: - """Evaluate one snapshot; fire (log) if the metric exceeds threshold.""" - if not self.enabled: + """Evaluate every enabled rule for this unit against one snapshot.""" + rules = self._get_rules(unit_id) + if not rules: return - - raw = getattr(snap, self.metric, None) - try: - level = float(raw) - except (TypeError, ValueError): - return # missing / non-numeric (e.g. "-.-") - - if level <= self.threshold_db: - return - - # Cooldown — use the event loop clock (Math.random/Date.now-free). now = asyncio.get_running_loop().time() - if now - self._last_fired.get(unit_id, 0.0) < self.cooldown_s: - return - self._last_fired[unit_id] = now + for rule in rules: + if not self._in_schedule(rule): + continue + raw = getattr(snap, rule.metric, None) + try: + value = float(raw) + except (TypeError, ValueError): + continue # missing / non-numeric ("-.-") + state = self._states.setdefault((unit_id, rule.id), RuleState()) + action = _evaluate_step(state, value, now, rule) + if action == "onset": + await self._on_onset(unit_id, rule, value, state) + elif action == "clear": + await self._on_clear(unit_id, rule, value, state) - await self._dispatch(unit_id, level) + # -- rule loading (cached) ---------------------------------------------- - async def _dispatch(self, unit_id: str, level: float) -> None: - """POC dispatch: server-side log. Swap in email/SMS here later.""" - logger.warning( - f"[ALERT] {unit_id}: {self.metric.upper()}={level:.1f} dB exceeded " - f"threshold {self.threshold_db:.1f} dB" + def _get_rules(self, unit_id: str) -> list: + loop_now = asyncio.get_running_loop().time() + cached = self._rule_cache.get(unit_id) + if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S: + return cached[1] + rules = self._load_rules(unit_id) + self._rule_cache[unit_id] = (loop_now, rules) + return rules + + def _load_rules(self, unit_id: str) -> list: + from app.database import SessionLocal + from app.models import AlertRule + db = SessionLocal() + try: + return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all() + except Exception as e: + logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}") + return [] + finally: + db.close() + + def invalidate(self, unit_id: Optional[str] = None) -> None: + """Drop cached rules so a change is picked up immediately.""" + if unit_id is None: + self._rule_cache.clear() + else: + self._rule_cache.pop(unit_id, None) + + # -- scheduling ---------------------------------------------------------- + + def _in_schedule(self, rule) -> bool: + if not rule.schedule_start or not rule.schedule_end: + day_ok = self._day_ok(rule) + return day_ok + local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS) + if not self._day_ok(rule, local): + return False + return _in_window(local.hour * 60 + local.minute, rule.schedule_start, rule.schedule_end) + + @staticmethod + def _day_ok(rule, local: Optional[datetime] = None) -> bool: + if not rule.schedule_days: + return True + if local is None: + local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS) + allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""} + return local.weekday() in allowed # Mon=0 + + # -- event persistence + dispatch --------------------------------------- + + async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None: + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + evt = AlertEvent( + rule_id=rule.id, unit_id=unit_id, rule_name=rule.name, + metric=rule.metric, threshold_db=rule.threshold_db, + onset_value=value, peak_value=value, status="active", + ) + db.add(evt) + db.commit() + db.refresh(evt) + state.event_id = evt.id + except Exception as e: + logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}") + finally: + db.close() + await self._dispatch( + "ONSET", unit_id, rule, + f"{rule.metric.upper()}={value:.1f} dB " + f"{'<' if rule.comparison == 'below' else '>'} {rule.threshold_db:.1f} dB" + f"{f' for {rule.duration_s}s' if rule.duration_s else ''}", ) + async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None: + peak = state.peak + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + if state.event_id is not None: + evt = db.query(AlertEvent).filter_by(id=state.event_id).first() + if evt: + evt.clear_at = datetime.utcnow() + evt.peak_value = peak + evt.status = "cleared" + db.commit() + except Exception as e: + logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}") + finally: + db.close() + state.event_id = None + await self._dispatch( + "CLEAR", unit_id, rule, + f"recovered to {value:.1f} dB (peak {peak:.1f} dB)", + ) -# Module-level singleton + async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None: + """POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here.""" + logger.warning(f"[ALERT:{kind}] {unit_id} '{rule.name}': {detail}") + + +# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot) alert_evaluator = AlertEvaluator() diff --git a/app/models.py b/app/models.py index 906043f..67e08ed 100644 --- a/app/models.py +++ b/app/models.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func +from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func from app.database import Base @@ -74,3 +74,53 @@ class DeviceLog(Base): level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC message = Column(Text, nullable=False) + + +class AlertRule(Base): + """A threshold-alert rule evaluated against a unit's live monitor feed. + + Source-agnostic: today it runs over the DOD monitor; the same rule transfers + unchanged if a unit's feed is later sourced from FTP intervals. + """ + + __tablename__ = "alert_rules" + + id = Column(Integer, primary_key=True, autoincrement=True) + unit_id = Column(String, index=True, nullable=False) + name = Column(String, nullable=False, default="Alert") + metric = Column(String, nullable=False, default="lp") # lp/leq/lmax/lmin/lpeak/ln1/ln2 + comparison = Column(String, nullable=False, default="above") # above | below + threshold_db = Column(Float, nullable=False) + duration_s = Column(Integer, nullable=False, default=0) # sustained seconds (0 = instant) + clear_margin_db = Column(Float, nullable=False, default=2.0) # hysteresis band + cooldown_s = Column(Integer, nullable=False, default=300) # min seconds between onsets + # Optional time-of-day scoping (local time). schedule_start/end as "HH:MM"; + # null = always active. schedule_days = CSV of 0-6 (Mon=0); null = every day. + schedule_start = Column(String, nullable=True) + schedule_end = Column(String, nullable=True) + schedule_days = Column(String, nullable=True) + channels = Column(String, nullable=False, default="log") # CSV: log,email,sms + recipients = Column(Text, nullable=True) # CSV of emails/phones + enabled = Column(Boolean, default=True) + created_at = Column(DateTime, default=func.now()) + + +class AlertEvent(Base): + """A fired alert (onset → clear), for history / inbox / acknowledgement.""" + + __tablename__ = "alert_events" + + id = Column(Integer, primary_key=True, autoincrement=True) + rule_id = Column(Integer, index=True, nullable=False) + unit_id = Column(String, index=True, nullable=False) + rule_name = Column(String, nullable=True) + metric = Column(String, nullable=False) + threshold_db = Column(Float, nullable=False) + onset_at = Column(DateTime, default=func.now(), index=True) + onset_value = Column(Float, nullable=True) + peak_value = Column(Float, nullable=True) + clear_at = Column(DateTime, nullable=True) + status = Column(String, default="active") # active | cleared + acknowledged_at = Column(DateTime, nullable=True) + acknowledged_by = Column(String, nullable=True) + notes = Column(Text, nullable=True) diff --git a/app/routers.py b/app/routers.py index 1031d84..f317e15 100644 --- a/app/routers.py +++ b/app/routers.py @@ -11,7 +11,7 @@ import os import asyncio from app.database import get_db -from app.models import NL43Config, NL43Status +from app.models import NL43Config, NL43Status, AlertRule, AlertEvent from app.services import NL43Client, persist_snapshot logger = logging.getLogger(__name__) @@ -320,6 +320,111 @@ async def monitor_status(): return {"status": "ok", "monitors": monitor_manager.status()} +# ============================================================================ +# ALERTS — threshold rules + fired events +# ============================================================================ + +class AlertRulePayload(BaseModel): + name: str = "Alert" + metric: str = "lp" # lp/leq/lmax/lmin/lpeak/ln1/ln2 + comparison: str = "above" # above | below + threshold_db: float + duration_s: int = 0 # sustained seconds before firing (0 = instant) + clear_margin_db: float = 2.0 # hysteresis band + cooldown_s: int = 300 + schedule_start: str | None = None # "HH:MM" local; null = always + schedule_end: str | None = None + schedule_days: str | None = None # CSV of 0-6 (Mon=0); null = every day + channels: str = "log" + recipients: str | None = None + enabled: bool = True + + +def _rule_dict(r: AlertRule) -> dict: + return { + "id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric, + "comparison": r.comparison, "threshold_db": r.threshold_db, + "duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db, + "cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start, + "schedule_end": r.schedule_end, "schedule_days": r.schedule_days, + "channels": r.channels, "recipients": r.recipients, "enabled": r.enabled, + } + + +def _event_dict(e: AlertEvent) -> dict: + return { + "id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id, + "rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db, + "onset_at": e.onset_at.isoformat() if e.onset_at else None, + "onset_value": e.onset_value, "peak_value": e.peak_value, + "clear_at": e.clear_at.isoformat() if e.clear_at else None, + "status": e.status, + "acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None, + "acknowledged_by": e.acknowledged_by, + } + + +@router.post("/{unit_id}/alerts/rules") +def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)): + rule = AlertRule(unit_id=unit_id, **payload.model_dump()) + db.add(rule) + db.commit() + db.refresh(rule) + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + return {"status": "ok", "rule": _rule_dict(rule)} + + +@router.get("/{unit_id}/alerts/rules") +def list_alert_rules(unit_id: str, db: Session = Depends(get_db)): + rules = db.query(AlertRule).filter_by(unit_id=unit_id).all() + return {"status": "ok", "rules": [_rule_dict(r) for r in rules]} + + +@router.put("/{unit_id}/alerts/rules/{rule_id}") +def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)): + rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first() + if not rule: + raise HTTPException(status_code=404, detail="Alert rule not found") + for field, value in payload.model_dump().items(): + setattr(rule, field, value) + db.commit() + db.refresh(rule) + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + return {"status": "ok", "rule": _rule_dict(rule)} + + +@router.delete("/{unit_id}/alerts/rules/{rule_id}") +def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)): + rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first() + if not rule: + raise HTTPException(status_code=404, detail="Alert rule not found") + db.delete(rule) + db.commit() + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + return {"status": "ok", "deleted": rule_id} + + +@router.get("/{unit_id}/alerts/events") +def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)): + events = (db.query(AlertEvent).filter_by(unit_id=unit_id) + .order_by(AlertEvent.onset_at.desc()).limit(limit).all()) + return {"status": "ok", "events": [_event_dict(e) for e in events]} + + +@router.post("/{unit_id}/alerts/events/{event_id}/ack") +def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)): + evt = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first() + if not evt: + raise HTTPException(status_code=404, detail="Alert event not found") + evt.acknowledged_at = datetime.utcnow() + evt.acknowledged_by = by + db.commit() + return {"status": "ok", "acknowledged": event_id} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ diff --git a/test_alert_evaluator.py b/test_alert_evaluator.py new file mode 100644 index 0000000..5bacc62 --- /dev/null +++ b/test_alert_evaluator.py @@ -0,0 +1,68 @@ +""" +Synthetic unit test for the alert state machine — no DB, no device. + +Drives `_evaluate_step` with a fake clock + a level series and checks that +onset/clear fire with the right debounce + hysteresis. Run: + + docker compose exec -T slmm python3 test_alert_evaluator.py + # or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py +""" + +from types import SimpleNamespace +from app.alerts import RuleState, _evaluate_step + + +def rule(**kw): + base = dict(threshold_db=85.0, duration_s=3, clear_margin_db=2.0, comparison="above") + base.update(kw) + return SimpleNamespace(**base) + + +def run(series, r): + st = RuleState() + events = [(now, a) for value, now in series + if (a := _evaluate_step(st, value, now, r))] + return events, st + + +def main(): + failures = 0 + + def check(label, cond, detail=""): + nonlocal failures + print(("PASS" if cond else "FAIL"), label, detail) + if not cond: + failures += 1 + + # 1) sustained exceedance -> onset after duration; recovery -> clear after duration + r = rule(threshold_db=85, duration_s=3, clear_margin_db=2) + ev, _ = run([(80, 0), (86, 1), (87, 2), (88, 3), (88, 4), + (88, 5), (82, 6), (82, 7), (82, 8), (82, 9)], r) + onsets = [t for t, a in ev if a == "onset"] + clears = [t for t, a in ev if a == "clear"] + check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev)) + + # 2) brief spike under duration -> no onset (debounce) + ev, _ = run([(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)], rule(duration_s=3)) + check("2 brief spike debounced", ev == [], str(ev)) + + # 3) hysteresis: a dip into the margin (below threshold, above threshold-margin) + # does NOT clear + r = rule(threshold_db=85, duration_s=0, clear_margin_db=3) + ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], r) + check("3 hysteresis holds ACTIVE", ev == [(0, "onset")] and st.phase == "active", + f"{ev} phase={st.phase}") + + # 4) 'below' comparison (device too quiet) -> onset when value < threshold + ev, _ = run([(30, 0), (15, 1)], rule(threshold_db=20, duration_s=0, + clear_margin_db=2, comparison="below")) + check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev)) + + print() + print("ALL PASS" if failures == 0 else f"{failures} FAILURE(S)") + return failures + + +if __name__ == "__main__": + import sys + sys.exit(1 if main() else 0) -- 2.52.0 From 6b1ec753964aa1a3cb65248655ceb66cac65af63 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 17:13:21 +0000 Subject: [PATCH 08/20] =?UTF-8?q?feat:=20harden=20fan-out=20for=20live=20c?= =?UTF-8?q?lients=20=E2=80=94=20instant=20first=20frame=20+=20offline=20st?= =?UTF-8?q?atus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For multiple clients connecting to a live feed (e.g. the client portal): - cache the last broadcast frame and replay it to a new subscriber on connect, so a client sees data immediately instead of waiting a full poll cycle. - broadcast a {"feed_status":"unreachable"} frame once on transition (after 3 consecutive poll failures) so clients can render an offline state instead of a frozen chart; data frames now carry "feed_status":"ok". The cached frame reflects current state, so a client connecting while offline gets "unreachable" right away too. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/monitor.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/app/monitor.py b/app/monitor.py index 10e190c..1177893 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -63,6 +63,9 @@ class DeviceMonitor: self._keepalive = False self._task: Optional[asyncio.Task] = None self._lock = asyncio.Lock() + self._last_payload: Optional[dict] = None # replayed to new subscribers + self._consec_fail = 0 + self._reachable = True # last broadcast reachability (for transition frames) @property def running(self) -> bool: @@ -82,6 +85,13 @@ class DeviceMonitor: q: asyncio.Queue = asyncio.Queue(maxsize=5) async with self._lock: self._subscribers.add(q) + # Replay the last frame so a client connecting mid-stream sees data + # (or the current 'unreachable' state) immediately, not after a poll. + if self._last_payload is not None: + try: + q.put_nowait(self._last_payload) + except asyncio.QueueFull: + pass self._ensure_task() return q @@ -101,12 +111,26 @@ class DeviceMonitor: while self._has_demand(): snap, mst = await self._poll_once() if snap is not None: + self._consec_fail = 0 + self._reachable = True payload = _snapshot_payload(snap, self.unit_id, mst) + payload["feed_status"] = "ok" self._broadcast(payload) try: await alert_evaluator.evaluate(self.unit_id, snap) except Exception as e: logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") + else: + # Tell clients the device went offline — once, on transition, after a + # few failures so a momentary blip doesn't flap the UI. + self._consec_fail += 1 + if self._reachable and self._consec_fail >= 3: + self._reachable = False + self._broadcast({ + "unit_id": self.unit_id, + "timestamp": datetime.utcnow().isoformat(), + "feed_status": "unreachable", + }) await asyncio.sleep(MONITOR_POLL_INTERVAL) finally: logger.info(f"[MONITOR] {self.unit_id}: feed stopped") @@ -138,6 +162,7 @@ class DeviceMonitor: db.close() def _broadcast(self, payload: dict) -> None: + self._last_payload = payload # cache for replay to new subscribers for q in list(self._subscribers): try: q.put_nowait(payload) -- 2.52.0 From ba622c67d848de1a7c525d09a8c5feff2e0aebf8 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 17:33:29 +0000 Subject: [PATCH 09/20] feat: monitor heartbeat + background poller skips active-monitored units MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Heartbeat: if nothing has been broadcast in MONITOR_HEARTBEAT_S (default 25s) — e.g. device offline and silent — send a non-cached keepalive frame so a reverse proxy (NPM) doesn't drop the idle WS. New subscribers still get the last real frame, not a heartbeat. - Poller-skip: the 60s background poller now skips any unit with a running monitor (MonitorManager.is_active). The monitor already polls it ~1Hz and keeps the status cache fresh, so the background poll was redundant and just added load/lock-contention on the device's single connection (and churn, which matters for the cellular wedge). Trade-off: the FTP start-time sync (only in the poller) doesn't run while a unit is actively monitored — fine, since reports take the authoritative start time from the FTP .rnd data. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/background_poller.py | 9 +++++++++ app/monitor.py | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/app/background_poller.py b/app/background_poller.py index 071fc31..0148671 100644 --- a/app/background_poller.py +++ b/app/background_poller.py @@ -178,10 +178,19 @@ class BackgroundPoller: now = datetime.utcnow() polled_count = 0 + from app.monitor import monitor_manager + for cfg in configs: if not self._running: break + # Skip units with an active live monitor: it polls them at ~1Hz and + # keeps the status cache fresh, so a redundant background poll would just + # add load/lock-contention on the device's single connection. + if monitor_manager.is_active(cfg.unit_id): + self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active") + continue + # Get current status status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first() diff --git a/app/monitor.py b/app/monitor.py index 1177893..47c21be 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -31,6 +31,10 @@ logger = logging.getLogger(__name__) # poll) already paces the effective rate to a few seconds; this is the extra idle. MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) +# If nothing has been broadcast in this many seconds (e.g. device offline and +# silent), send a keepalive frame so reverse proxies don't drop the idle WS. +MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25")) + def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict: """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced @@ -107,6 +111,8 @@ class DeviceMonitor: async def _run(self) -> None: logger.info(f"[MONITOR] {self.unit_id}: feed started") + loop = asyncio.get_running_loop() + last_send = loop.time() try: while self._has_demand(): snap, mst = await self._poll_once() @@ -116,6 +122,7 @@ class DeviceMonitor: payload = _snapshot_payload(snap, self.unit_id, mst) payload["feed_status"] = "ok" self._broadcast(payload) + last_send = loop.time() try: await alert_evaluator.evaluate(self.unit_id, snap) except Exception as e: @@ -131,6 +138,20 @@ class DeviceMonitor: "timestamp": datetime.utcnow().isoformat(), "feed_status": "unreachable", }) + last_send = loop.time() + + # Heartbeat: during quiet/offline stretches, send a keepalive so an + # idle WS isn't dropped by a reverse proxy. Not cached (new subscribers + # should still get the last real frame, not a heartbeat). + if loop.time() - last_send >= MONITOR_HEARTBEAT_S: + self._broadcast({ + "unit_id": self.unit_id, + "timestamp": datetime.utcnow().isoformat(), + "feed_status": "ok" if self._reachable else "unreachable", + "heartbeat": True, + }, cache=False) + last_send = loop.time() + await asyncio.sleep(MONITOR_POLL_INTERVAL) finally: logger.info(f"[MONITOR] {self.unit_id}: feed stopped") @@ -161,8 +182,9 @@ class DeviceMonitor: finally: db.close() - def _broadcast(self, payload: dict) -> None: - self._last_payload = payload # cache for replay to new subscribers + def _broadcast(self, payload: dict, cache: bool = True) -> None: + if cache: + self._last_payload = payload # replayed to new subscribers for q in list(self._subscribers): try: q.put_nowait(payload) @@ -186,6 +208,12 @@ class MonitorManager: self._monitors[unit_id] = m return m + def is_active(self, unit_id: str) -> bool: + """True if this unit has a running monitor feed (so the background poller + can skip it — the monitor already polls it more often).""" + m = self._monitors.get(unit_id) + return m is not None and m.running + def status(self) -> dict: return { uid: { -- 2.52.0 From 9d34779171694270b302df18dd835f0192de2507 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 18:52:13 +0000 Subject: [PATCH 10/20] perf: monitor caches run state, ~halving live-feed latency Each monitor poll was sending DOD? + Measure? (two commands), and the NL43 enforces >=1s between commands, so updates were ~2.5s apart. The run state changes rarely, so cache it and refresh via Measure? only every MONITOR_STATE_REFRESH_S (default 30s); most polls now send just DOD? (one rate-limited command) -> ~1.3s/update. Also trim MONITOR_POLL_INTERVAL to 0.25s since the device rate-limit is the real pacer. request_dod() gains an optional measurement_state arg: when supplied it reuses that state and skips the Measure? round-trip; None preserves the old query-every-time behavior. ~1Hz is the device floor for DOD (the >=1s command spacing); DRD's 10Hz push isn't reachable via polling, but ~1s is a normal cadence for SLM levels. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/monitor.py | 26 ++++++++++++++++++++++---- app/services.py | 21 +++++++++++++-------- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/app/monitor.py b/app/monitor.py index 47c21be..5f7c8b3 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -27,9 +27,14 @@ from app.alerts import alert_evaluator logger = logging.getLogger(__name__) -# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per -# poll) already paces the effective rate to a few seconds; this is the extra idle. -MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) +# Extra idle between DOD polls. The 1s device rate-limit already paces consecutive +# DOD? commands, so this just needs to be small — the rate-limit is the real floor. +MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25")) + +# How often to refresh the run state (Measure?). It changes rarely, so we cache it +# and skip that second rate-limited command on most polls — roughly halving the +# per-update latency (~2.5s -> ~1.3s). +MONITOR_STATE_REFRESH_S = float(os.getenv("MONITOR_STATE_REFRESH_S", "30")) # If nothing has been broadcast in this many seconds (e.g. device offline and # silent), send a keepalive frame so reverse proxies don't drop the idle WS. @@ -70,6 +75,8 @@ class DeviceMonitor: self._last_payload: Optional[dict] = None # replayed to new subscribers self._consec_fail = 0 self._reachable = True # last broadcast reachability (for transition frames) + self._cached_state: Optional[str] = None # run state, refreshed periodically + self._last_state_refresh = 0.0 @property def running(self) -> bool: @@ -168,7 +175,18 @@ class DeviceMonitor: ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password, ftp_port=cfg.ftp_port or 21, ) - snap = await client.request_dod() + # Refresh the run state only every MONITOR_STATE_REFRESH_S; reuse the + # cached state otherwise so most polls send just DOD? (one rate-limited + # command) instead of DOD? + Measure?. + now = asyncio.get_running_loop().time() + refresh_state = (self._cached_state is None + or now - self._last_state_refresh >= MONITOR_STATE_REFRESH_S) + snap = await client.request_dod( + measurement_state=None if refresh_state else self._cached_state + ) + if refresh_state: + self._cached_state = snap.measurement_state + self._last_state_refresh = now snap.unit_id = self.unit_id persist_snapshot(snap, db) db.commit() diff --git a/app/services.py b/app/services.py index b5c68ea..c608226 100644 --- a/app/services.py +++ b/app/services.py @@ -680,10 +680,12 @@ class NL43Client: else: raise ValueError(f"Unknown result code: {result_code}") - async def request_dod(self) -> NL43Snapshot: + async def request_dod(self, measurement_state: Optional[str] = None) -> NL43Snapshot: """Request DOD (Data Output Display) snapshot from device. - Returns parsed measurement data from the device display. + Returns parsed measurement data from the device display. Pass + measurement_state to reuse a cached run state and skip the extra Measure? + round-trip (the state changes rarely); leave it None to query it. """ # _send_command now handles result code validation and returns the data line resp = await self._send_command("DOD?\r\n") @@ -706,12 +708,15 @@ class NL43Client: logger.info(f"Parsed {len(parts)} data points from DOD response") - # Query actual measurement state (DOD doesn't include this information) - try: - measurement_state = await self.get_measurement_state() - except Exception as e: - logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}") - measurement_state = "Measure" + # DOD doesn't include the run state. Query it only when not supplied by the + # caller — the monitor passes a cached state most cycles and refreshes it + # occasionally, avoiding a second rate-limited command on every poll. + if measurement_state is None: + try: + measurement_state = await self.get_measurement_state() + except Exception as e: + logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}") + measurement_state = "Measure" snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state) -- 2.52.0 From 43e72ae3c331c94133675409bcfe0db21601c83b Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 19:27:25 +0000 Subject: [PATCH 11/20] feat: persistent monitor_enabled flag + auto-start keepalive on boot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Makes live monitoring (and therefore alerting) genuinely 24/7 and restart-surviving, instead of runtime-only keepalive. - NL43Config.monitor_enabled (default True) + migrate_add_monitor_enabled.py. - On startup, auto-start keepalive monitors for every monitor_enabled + tcp_enabled unit — so feeds/alerts resume after a restart with no manual step. - /monitor/start and /monitor/stop now PERSIST monitor_enabled (start=True, stop=False) in addition to applying keepalive at runtime, so the toggle sticks. Roster output includes monitor_enabled for the admin UI to read. On by default: configure a unit -> it's monitored 24/7 unless toggled off. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/main.py | 19 ++++++++++++++ app/models.py | 4 +++ app/routers.py | 25 +++++++++++++----- migrate_add_monitor_enabled.py | 48 ++++++++++++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 7 deletions(-) create mode 100644 migrate_add_monitor_enabled.py diff --git a/app/main.py b/app/main.py index 4db5847..74f94c7 100644 --- a/app/main.py +++ b/app/main.py @@ -38,6 +38,25 @@ async def lifespan(app: FastAPI): await poller.start() logger.info("Background poller started") + # Auto-start keepalive live monitors for units configured for 24/7 monitoring + # (monitor_enabled). This is what keeps alerting running unattended across + # restarts — without it a feed only runs while someone has the live view open. + try: + from app.monitor import monitor_manager + from app.database import SessionLocal + from app.models import NL43Config + db = SessionLocal() + try: + units = db.query(NL43Config).filter_by(monitor_enabled=True, tcp_enabled=True).all() + for cfg in units: + m = await monitor_manager.get(cfg.unit_id) + await m.set_keepalive(True) + logger.info(f"Auto-started keepalive monitor for {cfg.unit_id}") + finally: + db.close() + except Exception as e: + logger.error(f"Failed to auto-start monitors: {e}") + yield # Application runs # Shutdown diff --git a/app/models.py b/app/models.py index 67e08ed..026aad6 100644 --- a/app/models.py +++ b/app/models.py @@ -23,6 +23,10 @@ class NL43Config(Base): poll_interval_seconds = Column(Integer, nullable=True, default=60) # Polling interval (10-3600 seconds) poll_enabled = Column(Boolean, default=True) # Enable/disable background polling for this device + # Live monitor (fan-out DOD feed). Keepalive runs it 24/7 even with no viewer, + # which is what makes alerting continuous. On by default; toggleable from the UI. + monitor_enabled = Column(Boolean, default=True) + class NL43Status(Base): """ diff --git a/app/routers.py b/app/routers.py index f317e15..80aeaaa 100644 --- a/app/routers.py +++ b/app/routers.py @@ -295,22 +295,32 @@ async def monitor_stream(websocket: WebSocket, unit_id: str): @router.post("/{unit_id}/monitor/start") -async def monitor_start(unit_id: str): - """Keep the device's feed running even with no browser attached, so alerting - evaluates continuously. Runtime-only (resets on restart).""" +async def monitor_start(unit_id: str, db: Session = Depends(get_db)): + """Enable 24/7 keepalive monitoring: persist monitor_enabled and start the feed + now, so alerting evaluates continuously even with no viewer. Survives restarts + (auto-started on boot from the persisted flag).""" + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if cfg: + cfg.monitor_enabled = True + db.commit() from app.monitor import monitor_manager monitor = await monitor_manager.get(unit_id) await monitor.set_keepalive(True) - return {"status": "ok", "unit_id": unit_id, "running": monitor.running, "keepalive": True} + return {"status": "ok", "unit_id": unit_id, "monitor_enabled": True, "running": monitor.running} @router.post("/{unit_id}/monitor/stop") -async def monitor_stop(unit_id: str): - """Drop the keep-alive; the feed stops once no browser subscribers remain.""" +async def monitor_stop(unit_id: str, db: Session = Depends(get_db)): + """Disable keepalive monitoring: persist monitor_enabled=False and drop the + keepalive (the feed stops once no browser subscribers remain).""" + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if cfg: + cfg.monitor_enabled = False + db.commit() from app.monitor import monitor_manager monitor = await monitor_manager.get(unit_id) await monitor.set_keepalive(False) - return {"status": "ok", "unit_id": unit_id, "keepalive": False} + return {"status": "ok", "unit_id": unit_id, "monitor_enabled": False} @router.get("/_monitor/status") @@ -501,6 +511,7 @@ def get_roster(db: Session = Depends(get_db)): "web_enabled": cfg.web_enabled, "poll_enabled": cfg.poll_enabled, "poll_interval_seconds": cfg.poll_interval_seconds, + "monitor_enabled": cfg.monitor_enabled, "status": None } diff --git a/migrate_add_monitor_enabled.py b/migrate_add_monitor_enabled.py new file mode 100644 index 0000000..8853e61 --- /dev/null +++ b/migrate_add_monitor_enabled.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +""" +Migration: add monitor_enabled column to nl43_config. + +Controls whether the live fan-out DOD monitor is kept alive 24/7 for a unit +(which is what makes alerting continuous). Defaults to enabled. Run once per DB. +""" + +import sqlite3 +import sys +from pathlib import Path + +DB_PATH = Path(__file__).parent / "data" / "slmm.db" + + +def migrate(): + if not DB_PATH.exists(): + print(f"Database not found at {DB_PATH}") + print("No migration needed - database will be created with new schema") + return + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + try: + cursor.execute("PRAGMA table_info(nl43_config)") + columns = [row[1] for row in cursor.fetchall()] + + if "monitor_enabled" in columns: + print("✓ monitor_enabled column already exists, no migration needed") + return + + print("Adding monitor_enabled column (default enabled)...") + # SQLite stores booleans as 0/1; default 1 = enabled. + cursor.execute("ALTER TABLE nl43_config ADD COLUMN monitor_enabled BOOLEAN DEFAULT 1") + conn.commit() + print("✓ Added monitor_enabled column") + print("\n✓ Migration completed successfully!") + + except Exception as e: + conn.rollback() + print(f"✗ Migration failed: {e}", file=sys.stderr) + sys.exit(1) + finally: + conn.close() + + +if __name__ == "__main__": + migrate() -- 2.52.0 From d1d694302c9e05b60c31248f8cec0e557fc94ef9 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 19:58:30 +0000 Subject: [PATCH 12/20] feat: downsampled DOD trail + history endpoint for live-chart backfill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit So a viewer sees recent trend on open instead of a blank chart. Viewing only — reports still use the device's FTP .rnd data. - NL43Reading table (auto-creates; no migration): unit_id, timestamp, lp/leq/lmax/ln1/ln2. - Monitor stores one downsampled reading per MONITOR_TRAIL_SAMPLE_S (default 60s) from its keepalive poll loop, pruning rows older than MONITOR_TRAIL_RETENTION_HOURS (default 24h). ~1440 rows/unit max. - GET /api/nl43/{unit}/history?hours=N -> the trail for the last N hours (clamped 0.1-48h), oldest-first. Because keepalive runs 24/7, the trail fills continuously, so the history is there whenever someone opens the live view. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/models.py | 21 +++++++++++++++++++++ app/monitor.py | 29 +++++++++++++++++++++++++++++ app/routers.py | 27 ++++++++++++++++++++++++++- 3 files changed, 76 insertions(+), 1 deletion(-) diff --git a/app/models.py b/app/models.py index 026aad6..c60ec6b 100644 --- a/app/models.py +++ b/app/models.py @@ -128,3 +128,24 @@ class AlertEvent(Base): acknowledged_at = Column(DateTime, nullable=True) acknowledged_by = Column(String, nullable=True) notes = Column(Text, nullable=True) + + +class NL43Reading(Base): + """Downsampled time-series of live-monitor readings, for the live-chart + backfill (so a viewer sees recent trend on open, not a blank chart). + + Viewing only — NOT the report source. Reports use the device's authoritative + FTP .rnd intervals. This is a short, capped trail (one row/minute, pruned to + a retention window) fed by the monitor's keepalive poll loop. + """ + + __tablename__ = "nl43_readings" + + id = Column(Integer, primary_key=True, autoincrement=True) + unit_id = Column(String, index=True, nullable=False) + timestamp = Column(DateTime, default=func.now(), index=True) + lp = Column(String, nullable=True) + leq = Column(String, nullable=True) + lmax = Column(String, nullable=True) + ln1 = Column(String, nullable=True) + ln2 = Column(String, nullable=True) diff --git a/app/monitor.py b/app/monitor.py index 5f7c8b3..a03dca5 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -36,6 +36,12 @@ MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25")) # per-update latency (~2.5s -> ~1.3s). MONITOR_STATE_REFRESH_S = float(os.getenv("MONITOR_STATE_REFRESH_S", "30")) +# Downsampled trail for the live-chart backfill: store one reading per +# TRAIL_SAMPLE_S and keep TRAIL_RETENTION_HOURS of it (pruned). Viewing only — +# reports use the device's FTP .rnd data, not this. +TRAIL_SAMPLE_S = float(os.getenv("MONITOR_TRAIL_SAMPLE_S", "60")) +TRAIL_RETENTION_HOURS = float(os.getenv("MONITOR_TRAIL_RETENTION_HOURS", "24")) + # If nothing has been broadcast in this many seconds (e.g. device offline and # silent), send a keepalive frame so reverse proxies don't drop the idle WS. MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25")) @@ -77,6 +83,7 @@ class DeviceMonitor: self._reachable = True # last broadcast reachability (for transition frames) self._cached_state: Optional[str] = None # run state, refreshed periodically self._last_state_refresh = 0.0 + self._last_trail_store = 0.0 # downsample throttle for the backfill trail @property def running(self) -> bool: @@ -190,6 +197,10 @@ class DeviceMonitor: snap.unit_id = self.unit_id persist_snapshot(snap, db) db.commit() + # Append to the downsampled backfill trail (~one row per TRAIL_SAMPLE_S). + if now - self._last_trail_store >= TRAIL_SAMPLE_S: + self._last_trail_store = now + self._store_trail(snap, db) status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first() mst = (status.measurement_start_time.isoformat() if status and status.measurement_start_time else None) @@ -200,6 +211,24 @@ class DeviceMonitor: finally: db.close() + def _store_trail(self, snap, db) -> None: + """Append one downsampled reading to the backfill trail and prune old rows.""" + from datetime import datetime, timedelta + from app.models import NL43Reading + try: + db.add(NL43Reading( + unit_id=self.unit_id, timestamp=datetime.utcnow(), + lp=snap.lp, leq=snap.leq, lmax=snap.lmax, ln1=snap.ln1, ln2=snap.ln2, + )) + cutoff = datetime.utcnow() - timedelta(hours=TRAIL_RETENTION_HOURS) + db.query(NL43Reading).filter( + NL43Reading.unit_id == self.unit_id, + NL43Reading.timestamp < cutoff, + ).delete() + db.commit() + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: trail store failed: {e}") + def _broadcast(self, payload: dict, cache: bool = True) -> None: if cache: self._last_payload = payload # replayed to new subscribers diff --git a/app/routers.py b/app/routers.py index 80aeaaa..aae9a23 100644 --- a/app/routers.py +++ b/app/routers.py @@ -11,7 +11,7 @@ import os import asyncio from app.database import get_db -from app.models import NL43Config, NL43Status, AlertRule, AlertEvent +from app.models import NL43Config, NL43Status, AlertRule, AlertEvent, NL43Reading from app.services import NL43Client, persist_snapshot logger = logging.getLogger(__name__) @@ -330,6 +330,31 @@ async def monitor_status(): return {"status": "ok", "monitors": monitor_manager.status()} +@router.get("/{unit_id}/history") +def get_monitor_history(unit_id: str, hours: float = 2.0, db: Session = Depends(get_db)): + """Recent downsampled monitor readings (the DOD trail) for the live-chart + backfill. Viewing only — NOT the FTP report data.""" + from datetime import timedelta + hours = max(0.1, min(hours, 48.0)) + cutoff = datetime.utcnow() - timedelta(hours=hours) + rows = (db.query(NL43Reading) + .filter(NL43Reading.unit_id == unit_id, NL43Reading.timestamp >= cutoff) + .order_by(NL43Reading.timestamp.asc()).all()) + return { + "status": "ok", + "unit_id": unit_id, + "hours": hours, + "count": len(rows), + "readings": [ + { + "timestamp": r.timestamp.isoformat() if r.timestamp else None, + "lp": r.lp, "leq": r.leq, "lmax": r.lmax, "ln1": r.ln1, "ln2": r.ln2, + } + for r in rows + ], + } + + # ============================================================================ # ALERTS — threshold rules + fired events # ============================================================================ -- 2.52.0 From b4cea2f287d6c46cebfebbacfd82d76d791872b2 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 22:57:45 +0000 Subject: [PATCH 13/20] feat: include measurement_start_time in cached /status response So consumers (e.g. the command center) can read the elapsed-time clock from the cached status instead of a fresh device /live read. Added to both the GET and POST /status data dicts. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/routers.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/routers.py b/app/routers.py index aae9a23..be83f41 100644 --- a/app/routers.py +++ b/app/routers.py @@ -785,6 +785,7 @@ def get_status(unit_id: str, db: Session = Depends(get_db)): "unit_id": unit_id, "last_seen": status.last_seen.isoformat() if status.last_seen else None, "measurement_state": status.measurement_state, + "measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None, "lp": status.lp, "leq": status.leq, "lmax": status.lmax, @@ -843,6 +844,7 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge "unit_id": unit_id, "last_seen": status.last_seen.isoformat(), "measurement_state": status.measurement_state, + "measurement_start_time": status.measurement_start_time.isoformat() if status.measurement_start_time else None, "lp": status.lp, "leq": status.leq, "lmax": status.lmax, -- 2.52.0 From 1f5f1fb1f62f3e1a99f05351cf8bd4849c577588 Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 10 Jun 2026 06:47:20 +0000 Subject: [PATCH 14/20] feat(monitor): adaptive poll rate, unreachable backoff, device-offline alert Three changes to cut wasted device/cellular load and surface outages: - Adaptive interval: full-rate (~1.25s) while a browser is subscribed for a smooth chart; relaxed cadence (MONITOR_IDLE_POLL_INTERVAL, default 10s) when the feed is keepalive-only (alerting). ~8x fewer polls with no viewer -> ~8x less cellular traffic on a metered SIM. Note: idle interval also sets the alert sampling resolution when nobody is watching. - Exponential backoff when the device is unreachable (1->2->...->60s cap), reset on the first good poll, so a dead/asleep device stops churning reconnects (log spam + wasted SYN traffic). Capped at 5s while a browser is watching so a recovery still surfaces quickly. - Device-offline alert: the reachable->unreachable transition raises a connectivity AlertEvent (sentinel rule_id=0, metric="connectivity") through the existing evaluator/dispatch seam; recovery clears it. Deduped in memory and via the DB (so a restart mid-outage doesn't duplicate the event). MonitorManager.status() now reports reachable + current mode (watched/idle/ backoff) for observability. Co-Authored-By: Claude Opus 4.8 --- app/alerts.py | 65 +++++++++++++++++++++++++++++++++++++++++++++++++- app/monitor.py | 54 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 114 insertions(+), 5 deletions(-) diff --git a/app/alerts.py b/app/alerts.py index 1fb84e7..f5eb142 100644 --- a/app/alerts.py +++ b/app/alerts.py @@ -112,6 +112,7 @@ class AlertEvaluator: def __init__(self): self._states: Dict[Tuple[str, int], RuleState] = {} self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules) + self._offline_events: Dict[str, int] = {} # unit_id -> open connectivity AlertEvent id logger.info("[ALERT] rule-based evaluator ready") async def evaluate(self, unit_id: str, snap) -> None: @@ -235,9 +236,71 @@ class AlertEvaluator: f"recovered to {value:.1f} dB (peak {peak:.1f} dB)", ) + # -- connectivity (device offline/online) ------------------------------- + # + # Raised by the live monitor when it loses / regains contact with a device. + # Persisted as an AlertEvent (sentinel rule_id=0, metric="connectivity") so it + # lands in the same events/inbox/ack pipeline as threshold alerts. The in-memory + # map dedupes; the DB query also dedupes across a process restart. + + async def device_offline(self, unit_id: str) -> None: + if unit_id in self._offline_events: + return # already flagged offline + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + existing = db.query(AlertEvent).filter_by( + unit_id=unit_id, metric="connectivity", status="active").first() + if existing: # already open in the DB (e.g. carried across a restart) + self._offline_events[unit_id] = existing.id + return + evt = AlertEvent( + rule_id=0, unit_id=unit_id, rule_name="Device unreachable", + metric="connectivity", threshold_db=0.0, status="active", + ) + db.add(evt) + db.commit() + db.refresh(evt) + self._offline_events[unit_id] = evt.id + except Exception as e: + logger.warning(f"[ALERT] failed to record offline for {unit_id}: {e}") + finally: + db.close() + await self._dispatch_raw("OFFLINE", unit_id, "Device unreachable", + "live monitor lost contact with the device") + + async def device_online(self, unit_id: str) -> None: + self._offline_events.pop(unit_id, None) + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + cleared = 0 + try: + opened = db.query(AlertEvent).filter_by( + unit_id=unit_id, metric="connectivity", status="active").all() + for evt in opened: + evt.clear_at = datetime.utcnow() + evt.status = "cleared" + cleared += 1 + if cleared: + db.commit() + except Exception as e: + logger.warning(f"[ALERT] failed to record online for {unit_id}: {e}") + finally: + db.close() + if cleared: # only announce recovery if it was actually flagged offline + await self._dispatch_raw("ONLINE", unit_id, "Device recovered", + "live monitor regained contact with the device") + + # -- event persistence + dispatch --------------------------------------- + async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None: + await self._dispatch_raw(kind, unit_id, rule.name, detail) + + async def _dispatch_raw(self, kind: str, unit_id: str, name: str, detail: str) -> None: """POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here.""" - logger.warning(f"[ALERT:{kind}] {unit_id} '{rule.name}': {detail}") + logger.warning(f"[ALERT:{kind}] {unit_id} '{name}': {detail}") # Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot) diff --git a/app/monitor.py b/app/monitor.py index a03dca5..25539bc 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -27,10 +27,27 @@ from app.alerts import alert_evaluator logger = logging.getLogger(__name__) -# Extra idle between DOD polls. The 1s device rate-limit already paces consecutive -# DOD? commands, so this just needs to be small — the rate-limit is the real floor. +# Extra idle between DOD polls WHEN A BROWSER IS WATCHING. The 1s device rate-limit +# already paces consecutive DOD? commands, so this just needs to be small — the +# rate-limit is the real floor (~1.25s/poll effective). MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25")) +# Idle cadence when NO browser is subscribed and the feed is only kept alive for +# alerting. Same data, ~8x fewer polls -> ~8x less cellular traffic on a metered +# SIM (~1 GB/device/month at full rate -> ~125 MB). NOTE: this also sets the alert +# sampling resolution when nobody is watching, so keep it <= the smallest alert +# duration_s you rely on (default 10s comfortably catches a "sustained 30/60s" rule). +MONITOR_IDLE_POLL_INTERVAL = float(os.getenv("MONITOR_IDLE_POLL_INTERVAL", "10")) + +# Exponential backoff once the device is unreachable, so a powered-off / asleep / +# out-of-signal device stops churning reconnects every cycle (log spam + a trickle +# of wasted cellular data on failed SYNs). delay = min(BASE * 2**(fails-1), MAX), +# reset to full-rate on the first good poll. While a browser is actively watching we +# cap the backoff lower (WATCHED_MAX) so a recovery surfaces quickly for the viewer. +MONITOR_BACKOFF_BASE_S = float(os.getenv("MONITOR_BACKOFF_BASE_S", "1")) +MONITOR_BACKOFF_MAX_S = float(os.getenv("MONITOR_BACKOFF_MAX_S", "60")) +MONITOR_BACKOFF_WATCHED_MAX_S = float(os.getenv("MONITOR_BACKOFF_WATCHED_MAX_S", "5")) + # How often to refresh the run state (Measure?). It changes rarely, so we cache it # and skip that second rate-limited command on most polls — roughly halving the # per-update latency (~2.5s -> ~1.3s). @@ -131,6 +148,12 @@ class DeviceMonitor: while self._has_demand(): snap, mst = await self._poll_once() if snap is not None: + if not self._reachable: + # Recovered from an outage — clear the connectivity alert. + try: + await alert_evaluator.device_online(self.unit_id) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: online alert failed: {e}") self._consec_fail = 0 self._reachable = True payload = _snapshot_payload(snap, self.unit_id, mst) @@ -143,7 +166,8 @@ class DeviceMonitor: logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") else: # Tell clients the device went offline — once, on transition, after a - # few failures so a momentary blip doesn't flap the UI. + # few failures so a momentary blip doesn't flap the UI. Same edge + # raises the device-offline alert. self._consec_fail += 1 if self._reachable and self._consec_fail >= 3: self._reachable = False @@ -153,6 +177,10 @@ class DeviceMonitor: "feed_status": "unreachable", }) last_send = loop.time() + try: + await alert_evaluator.device_offline(self.unit_id) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: offline alert failed: {e}") # Heartbeat: during quiet/offline stretches, send a keepalive so an # idle WS isn't dropped by a reverse proxy. Not cached (new subscribers @@ -166,10 +194,23 @@ class DeviceMonitor: }, cache=False) last_send = loop.time() - await asyncio.sleep(MONITOR_POLL_INTERVAL) + await asyncio.sleep(self._next_delay()) finally: logger.info(f"[MONITOR] {self.unit_id}: feed stopped") + def _next_delay(self) -> float: + """Inter-poll delay: exponential backoff while unreachable, full-rate while a + browser is watching, relaxed cadence when the feed is keepalive-only.""" + if self._consec_fail > 0: + shift = min(self._consec_fail - 1, 6) # cap growth at 2**6 = 64x base + delay = min(MONITOR_BACKOFF_BASE_S * (2 ** shift), MONITOR_BACKOFF_MAX_S) + if self._subscribers: + delay = min(delay, MONITOR_BACKOFF_WATCHED_MAX_S) + return delay + if self._subscribers: + return MONITOR_POLL_INTERVAL # a browser is watching — smooth chart + return MONITOR_IDLE_POLL_INTERVAL # keepalive-only (alerting) — save data + async def _poll_once(self): """One DOD poll: read, persist, return (snapshot, measurement_start_iso).""" db = SessionLocal() @@ -267,6 +308,11 @@ class MonitorManager: "running": m.running, "subscribers": m.subscriber_count(), "keepalive": m._keepalive, + "reachable": m._reachable, + # what cadence the loop is currently using, for observability + "mode": ("backoff" if m._consec_fail > 0 + else "watched" if m._subscribers + else "idle"), } for uid, m in self._monitors.items() } -- 2.52.0 From 5bc542e92f5859676a7b8bb3bc17dd8a1bb4d41f Mon Sep 17 00:00:00 2001 From: serversdown Date: Thu, 11 Jun 2026 03:29:16 +0000 Subject: [PATCH 15/20] fix(monitor): quiet send-after-close race on WS disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a monitor subscriber disconnects mid-frame (the client portal closes its stream on every tab switch via the Page Visibility guard), the loop could pull a queued payload during the 1s wait and then send_json into an already-closing socket -> "Unexpected ASGI message 'websocket.send' after ... websocket.close", logged as a WARNING on every disconnect. Re-check gone.done() after the queue wait and break before sending; treat the residual send-after-close as expected (debug, not warning). No behavior change — the connection was already closing as intended; this just stops the log spam. Co-Authored-By: Claude Opus 4.8 --- app/routers.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/app/routers.py b/app/routers.py index be83f41..ae869bb 100644 --- a/app/routers.py +++ b/app/routers.py @@ -284,11 +284,21 @@ async def monitor_stream(websocket: WebSocket, unit_id: str): payload = await asyncio.wait_for(queue.get(), timeout=1.0) except asyncio.TimeoutError: continue # re-check gone.done() + if gone.done(): + break # client disconnected while we waited — don't send into a closing socket await websocket.send_json(payload) except WebSocketDisconnect: logger.info(f"Monitor subscriber disconnected for {unit_id}") except Exception as e: - logger.warning(f"Monitor stream error for {unit_id}: {e}") + # A frame that races the close (client vanished mid-send) surfaces as + # "Unexpected ASGI message 'websocket.send' after ... websocket.close". + # That's expected on disconnect (the portal closes the socket on every tab + # switch), not an error — log it quietly. + msg = str(e) + if "after sending" in msg or "websocket.close" in msg or "response already completed" in msg: + logger.debug(f"Monitor stream for {unit_id} closed mid-send (client gone)") + else: + logger.warning(f"Monitor stream error for {unit_id}: {e}") finally: gone.cancel() await monitor.unsubscribe(queue) -- 2.52.0 From b51fefca2b491b6425a747886985f57c34d761e7 Mon Sep 17 00:00:00 2001 From: serversdown Date: Thu, 11 Jun 2026 19:36:16 +0000 Subject: [PATCH 16/20] feat(alerts): enabled rules pin the monitor on (24/7 evaluation) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The evaluator only runs inside the monitor loop, so a rule on an idle device never fired. Now creating/updating/deleting an alert rule calls _sync_keepalive_to_rules: if the unit has any enabled rule, persist NL43Config.monitor_enabled=True (so the boot auto-start re-enables it after a restart) and turn on runtime keepalive. Never auto-OFF — a device may be kept alive for other reasons; operators control that on /admin/slmm. Alert CRUD endpoints are now async to await the monitor manager. Co-Authored-By: Claude Opus 4.8 --- app/routers.py | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/app/routers.py b/app/routers.py index ae869bb..ac9d72b 100644 --- a/app/routers.py +++ b/app/routers.py @@ -409,14 +409,34 @@ def _event_dict(e: AlertEvent) -> dict: } +async def _sync_keepalive_to_rules(unit_id: str, db: Session): + """Keep a unit's monitor running while it has enabled alert rules, so the + evaluator runs 24/7 even with no browser watching. Turns keepalive ON (and + persists monitor_enabled so it survives a restart via the boot auto-start) + when enabled rules exist; never turns it OFF — a device may be kept alive for + other reasons, so operators control that on /admin/slmm.""" + has_enabled = (db.query(AlertRule) + .filter_by(unit_id=unit_id, enabled=True).first() is not None) + if not has_enabled: + return + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if cfg and not cfg.monitor_enabled: + cfg.monitor_enabled = True + db.commit() + from app.monitor import monitor_manager + m = await monitor_manager.get(unit_id) + await m.set_keepalive(True) + + @router.post("/{unit_id}/alerts/rules") -def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)): +async def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)): rule = AlertRule(unit_id=unit_id, **payload.model_dump()) db.add(rule) db.commit() db.refresh(rule) from app.alerts import alert_evaluator alert_evaluator.invalidate(unit_id) + await _sync_keepalive_to_rules(unit_id, db) return {"status": "ok", "rule": _rule_dict(rule)} @@ -427,7 +447,7 @@ def list_alert_rules(unit_id: str, db: Session = Depends(get_db)): @router.put("/{unit_id}/alerts/rules/{rule_id}") -def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)): +async def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)): rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first() if not rule: raise HTTPException(status_code=404, detail="Alert rule not found") @@ -437,11 +457,12 @@ def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: db.refresh(rule) from app.alerts import alert_evaluator alert_evaluator.invalidate(unit_id) + await _sync_keepalive_to_rules(unit_id, db) return {"status": "ok", "rule": _rule_dict(rule)} @router.delete("/{unit_id}/alerts/rules/{rule_id}") -def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)): +async def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)): rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first() if not rule: raise HTTPException(status_code=404, detail="Alert rule not found") @@ -449,6 +470,7 @@ def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)) db.commit() from app.alerts import alert_evaluator alert_evaluator.invalidate(unit_id) + await _sync_keepalive_to_rules(unit_id, db) # no-op if no enabled rules remain return {"status": "ok", "deleted": rule_id} -- 2.52.0 From cfdeada9d6e100d2b79f17571f732015f96047ed Mon Sep 17 00:00:00 2001 From: serversdown Date: Thu, 11 Jun 2026 22:47:39 +0000 Subject: [PATCH 17/20] fix(alerts): enforce cooldown_s between onsets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cooldown_s was stored + shown in the UI but never read, so a repeatedly-breaching signal (e.g. intermittent traffic noise) would flood the alert history with an event per spike. The evaluator now suppresses a new onset within cooldown_s of the last, holding the edge so it fires the moment the window lapses if still breaching. Hysteresis still gates clears. getattr-guarded so partial rule fixtures don't crash. Verified: existing 4 evaluator tests pass; cooldown scenario (onset → clear → suppressed re-breach → onset after window) passes. Co-Authored-By: Claude Opus 4.8 --- app/alerts.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/app/alerts.py b/app/alerts.py index f5eb142..cb89e2d 100644 --- a/app/alerts.py +++ b/app/alerts.py @@ -40,6 +40,7 @@ class RuleState: edge_since: Optional[float] = None # when the current edge condition began (clock time) peak: float = 0.0 event_id: Optional[int] = None # the open AlertEvent row (for the clear update) + last_onset: Optional[float] = None # time of the last onset (for cooldown) def _exceeds(value: float, rule) -> bool: @@ -68,9 +69,17 @@ def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional if state.edge_since is None: state.edge_since = now if now - state.edge_since >= duration: + # Cooldown: suppress a new onset within cooldown_s of the last one + # (stops a repeatedly-breaching signal from flooding the history). + # Hold edge_since so it fires the moment cooldown lapses if still + # breaching — don't reset it here. + cooldown = getattr(rule, "cooldown_s", 0) or 0 + if state.last_onset is not None and (now - state.last_onset) < cooldown: + return None state.phase = "active" state.edge_since = None state.peak = value + state.last_onset = now return "onset" else: state.edge_since = None -- 2.52.0 From ad6071b79031d237c39cfaeed32573ee1b131554 Mon Sep 17 00:00:00 2001 From: serversdown Date: Thu, 11 Jun 2026 23:40:52 +0000 Subject: [PATCH 18/20] fix(alerts): reset rule state + close open event on rule edit/delete MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit invalidate() only dropped the rule cache, not the per-(unit,rule) state machine — so editing a rule's metric/threshold left a stale 'active' phase that mis-evaluated against the new config (spurious clear, or suppressed onset), and deleting an in-alarm rule left an open AlertEvent that kept the client portal stuck "in alarm" forever. update/delete now call _reset_rule_runtime: forget_rule() drops the state machine and any open event for that rule is closed. Verified: existing evaluator tests + cooldown scenario still pass; compiles. Co-Authored-By: Claude Opus 4.8 --- app/alerts.py | 6 ++++++ app/routers.py | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/app/alerts.py b/app/alerts.py index cb89e2d..062730c 100644 --- a/app/alerts.py +++ b/app/alerts.py @@ -175,6 +175,12 @@ class AlertEvaluator: else: self._rule_cache.pop(unit_id, None) + def forget_rule(self, unit_id: str, rule_id: int) -> None: + """Drop a rule's per-(unit, rule) state machine after the rule is edited or + deleted, so a stale 'active' phase / open event_id from the old config + doesn't bleed into the new one (mis-firing a clear or suppressing an onset).""" + self._states.pop((unit_id, rule_id), None) + # -- scheduling ---------------------------------------------------------- def _in_schedule(self, rule) -> bool: diff --git a/app/routers.py b/app/routers.py index ac9d72b..7764b48 100644 --- a/app/routers.py +++ b/app/routers.py @@ -428,6 +428,19 @@ async def _sync_keepalive_to_rules(unit_id: str, db: Session): await m.set_keepalive(True) +def _reset_rule_runtime(unit_id: str, rule_id: int, db: Session): + """After a rule edit/delete: drop its evaluator state machine and close any open + event, so a stale 'active' phase doesn't mis-evaluate against the new config and + the client portal doesn't stay 'in alarm' on a rule that changed or is gone.""" + from app.alerts import alert_evaluator + alert_evaluator.forget_rule(unit_id, rule_id) + now = datetime.utcnow() + for evt in db.query(AlertEvent).filter_by(unit_id=unit_id, rule_id=rule_id, status="active").all(): + evt.clear_at = now + evt.status = "cleared" + db.commit() + + @router.post("/{unit_id}/alerts/rules") async def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)): rule = AlertRule(unit_id=unit_id, **payload.model_dump()) @@ -457,6 +470,7 @@ async def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayloa db.refresh(rule) from app.alerts import alert_evaluator alert_evaluator.invalidate(unit_id) + _reset_rule_runtime(unit_id, rule_id, db) await _sync_keepalive_to_rules(unit_id, db) return {"status": "ok", "rule": _rule_dict(rule)} @@ -470,6 +484,7 @@ async def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(ge db.commit() from app.alerts import alert_evaluator alert_evaluator.invalidate(unit_id) + _reset_rule_runtime(unit_id, rule_id, db) # close its open event so the portal doesn't stay red await _sync_keepalive_to_rules(unit_id, db) # no-op if no enabled rules remain return {"status": "ok", "deleted": rule_id} -- 2.52.0 From 6d1c426ee431d202b01299ec5ef348f967cda727 Mon Sep 17 00:00:00 2001 From: serversdown Date: Sun, 21 Jun 2026 20:22:39 +0000 Subject: [PATCH 19/20] fix: recognize 'Start' state when confirming measurement start The /start handler waited for measurement_state == "Measure", but the device reports "Start" while measuring. The confirmation check therefore never matched, so the post-start status loop always ran its full 3x DOD retry cycle over cellular, pushing the call past ~10s. That blew past the Terra-View proxy's request timeout and surfaced to users as a misleading "Unknown error" even though the unit had already started recording. Match the device's actual reported state (and stay consistent with persist_snapshot's MEASURING_STATES handling) so /start confirms on the first attempt and returns promptly. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/routers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/routers.py b/app/routers.py index 7764b48..ac455ad 100644 --- a/app/routers.py +++ b/app/routers.py @@ -939,7 +939,7 @@ async def start_measurement(unit_id: str, db: Session = Depends(get_db)): db.expire_all() status = db.query(NL43Status).filter_by(unit_id=unit_id).first() logger.info(f"State check: measurement_state={status.measurement_state if status else 'None'}, start_time={status.measurement_start_time if status else 'None'}") - if status and status.measurement_state == "Measure" and status.measurement_start_time: + if status and status.measurement_state in ("Start", "Measure") and status.measurement_start_time: logger.info(f"✓ Measurement state confirmed for {unit_id} with start time {status.measurement_start_time}") break -- 2.52.0 From 43b8e53d2d7961ffb68c0313ce46c7feb7d1c8e5 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 22 Jun 2026 20:54:43 +0000 Subject: [PATCH 20/20] chore: version bump --- CHANGELOG.md | 46 +++++++++++++++++++++++++++ README.md | 87 ++++++++++++++++++++++++++++++++++++++++++++++++---- app/main.py | 2 +- 3 files changed, 128 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d83bab1..35fcc12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,52 @@ All notable changes to SLMM (Sound Level Meter Manager) will be documented in th The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.4.0] - 2026-06-22 + +### Added + +#### Live Monitor (fan-out feed) +- **Per-device fan-out monitor** - one shared, cached live feed per device. Multiple clients (dashboards, portal, charts) subscribe to the same stream instead of each fighting for the NL-43's single TCP connection: one poller reads the device, all subscribers get the same frames. +- **WebSocket monitor** - `WS /api/nl43/{unit_id}/monitor` delivers an instant first frame from cache, then live updates. +- **Monitor control** - `POST /api/nl43/{unit_id}/monitor/{start|stop}`, `GET /api/nl43/_monitor/status`. A persistent `monitor_enabled` flag auto-starts the keepalive on boot. +- **Adaptive polling** - poll rate adapts to demand; unreachable devices back off; a device-offline alert fires when a monitored unit drops. +- **De-duplication** - the background poller skips units already covered by an active monitor (no double-polling); a heartbeat keeps the feed warm. +- **Lower latency** - the monitor caches run state, roughly halving live-feed latency; fan-out emits an instant first frame + offline status to new clients. + +#### Alert Engine +- **Threshold rules** - per-device alert rules (metric + threshold + cooldown) with full CRUD: `POST/GET/PUT/DELETE /api/nl43/{unit_id}/alerts/rules[/{rule_id}]`. +- **Events + state machine** - onset/clear tracking via `GET /api/nl43/{unit_id}/alerts/events`; acknowledge with `POST .../events/{event_id}/ack`. A `cooldown_s` is enforced between onsets. +- **24/7 evaluation** - enabled rules pin the monitor on, so rules evaluate continuously even with no UI client connected. +- **Resilience** - editing or deleting a rule resets its state and closes any open event; device-offline events are raised when a monitored unit goes unreachable. + +#### Data & History +- **Live-chart backfill** - a downsampled DOD trail is persisted to a new `nl43_readings` table, exposed via `GET /api/nl43/{unit_id}/history` so charts can backfill recent history on load. +- **LN1/LN2 percentiles** - L1/L10 (configurable percentiles) surfaced through SLMM in the status and live-feed payloads. +- **measurement_start_time** included in the cached `/status` response. + +#### Device control +- **Per-device disconnect** - `POST /api/nl43/{unit_id}/disconnect` drops a device's pooled connection. +- **Deactivate / standby** - `POST /api/nl43/{unit_id}/deactivate` and global `POST /api/nl43/_system/standby` to quiesce polling/monitoring. + +### Changed +- **DRD streaming reuses the pooled connection** rather than opening a separate socket, avoiding contention with the persistent pool on a single-connection device. +- **Connection pool** - idle-TTL / max-age checks can now be disabled; pool status is logged periodically. + +### Fixed +- **Measurement-start confirmation** - `/start` now recognizes the device's `Start` state. It previously waited for `Measure`, which never matched, so the start cycle ran the full retry loop and Terra-View's proxy timed out with a misleading "Unknown error" even though the device had started. +- **Garbled reads** - corrupted measurement-state reads that produced phantom STOPPED/STARTED transitions are now ignored. +- **DOD parsing** - corrected field parsing and stopped spurious measurement-time resets. +- **Monitor WebSocket** - quieted a send-after-close race on client disconnect. + +### Database +- **New tables** (auto-created on startup via `Base.metadata.create_all`): `alert_rules`, `alert_events`, `nl43_readings`. +- **Migrations for existing tables** (run once per database): `migrate_add_ln_percentiles.py` (LN1/LN2 on `nl43_status`), `migrate_add_monitor_enabled.py` (`monitor_enabled` on `nl43_config`). + +### Notes +- Pairs with the matching Terra-View `dev` build, which reads SLMM's `/monitor` fan-out feed for live SLM dashboards (L1/L10 lines, live-chart backfill). Ship the two together. + +--- + ## [0.3.0] - 2026-02-17 ### Added diff --git a/README.md b/README.md index 441a1e6..115c645 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # SLMM - Sound Level Meter Manager -**Version 0.3.0** +**Version 0.4.0** Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols. @@ -12,6 +12,9 @@ SLMM is a standalone backend module that provides REST API routing and command t ## Features +- **Live Monitor (fan-out)**: One shared cached live feed per device — many clients subscribe to the same stream instead of fighting over the meter's single TCP connection +- **Alert Engine**: Per-device threshold rules with onset/clear events, cooldowns, acks, and 24/7 evaluation +- **History & Percentiles**: Downsampled DOD trail + history endpoint for live-chart backfill; LN1/LN2 (L1/L10) percentiles surfaced through the feed - **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability - **Background Polling**: Continuous automatic polling of devices with configurable intervals - **Offline Detection**: Automatic device reachability tracking with failure counters @@ -44,6 +47,30 @@ SLMM is a standalone backend module that provides REST API routing and command t └──────────────┘ ``` +### Live Monitor — Fan-Out Feed (v0.4.0) + +The NL-43 allows only one TCP control connection at a time, so multiple clients +polling the same device directly would contend for it. The monitor solves this +with a single shared, cached feed per device: + +- **One reader, many subscribers**: a single poller reads the device; every + WebSocket subscriber (`WS /api/nl43/{unit_id}/monitor`) receives the same + frames — an instant first frame from cache, then live updates. +- **Persistent + auto-start**: a `monitor_enabled` flag keeps the feed running + and auto-starts it on boot. Enabled alert rules pin the monitor on for 24/7 + evaluation even with no UI connected. +- **Adaptive & deduplicated**: poll rate adapts to demand, unreachable devices + back off, and the background poller skips units already covered by a monitor. + +### Alert Engine (v0.4.0) + +Per-device threshold alerting evaluated against the live feed: + +- **Rules**: metric + threshold + `cooldown_s`, full CRUD per device +- **Events**: onset/clear state machine, acknowledgement, and a device-offline + alert when a monitored unit drops +- **Robust**: editing/deleting a rule resets its state and closes open events + ### Persistent TCP Connection Pool (v0.3.0) SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems: @@ -145,8 +172,32 @@ Logs are written to: |--------|----------|-------------| | GET | `/api/nl43/{unit_id}/status` | Get cached measurement snapshot (updated by background poller) | | GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) | +| GET | `/api/nl43/{unit_id}/history` | Downsampled DOD trail for live-chart backfill | | WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data | +### Live Monitor (fan-out feed) + +| Method | Endpoint | Description | +|--------|----------|-------------| +| WS | `/api/nl43/{unit_id}/monitor` | Subscribe to the shared cached live feed (instant first frame) | +| POST | `/api/nl43/{unit_id}/monitor/start` | Start the device's monitor feed | +| POST | `/api/nl43/{unit_id}/monitor/stop` | Stop the device's monitor feed | +| GET | `/api/nl43/_monitor/status` | Global monitor status across devices | +| POST | `/api/nl43/{unit_id}/disconnect` | Drop the device's pooled TCP connection | +| POST | `/api/nl43/{unit_id}/deactivate` | Quiesce polling/monitoring for one device | +| POST | `/api/nl43/_system/standby` | Global standby — quiesce all polling/monitoring | + +### Alerts + +| Method | Endpoint | Description | +|--------|----------|-------------| +| GET | `/api/nl43/{unit_id}/alerts/rules` | List alert rules for a device | +| POST | `/api/nl43/{unit_id}/alerts/rules` | Create an alert rule (metric, threshold, cooldown) | +| PUT | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Update a rule (resets its state, closes open events) | +| DELETE | `/api/nl43/{unit_id}/alerts/rules/{rule_id}` | Delete a rule | +| GET | `/api/nl43/{unit_id}/alerts/events` | List alert events (onset/clear) | +| POST | `/api/nl43/{unit_id}/alerts/events/{event_id}/ack` | Acknowledge an event | + ### Background Polling | Method | Endpoint | Description | @@ -273,11 +324,35 @@ Caches latest measurement snapshot: - `sd_remaining_mb`: Free SD card space (MB) - `sd_free_ratio`: SD card free space ratio - `raw_payload`: Raw device response data -- `is_reachable`: Device reachability status (Boolean) ⭐ NEW -- `consecutive_failures`: Count of consecutive poll failures ⭐ NEW -- `last_poll_attempt`: Last time background poller attempted to poll ⭐ NEW -- `last_success`: Last successful poll timestamp ⭐ NEW -- `last_error`: Last error message (truncated to 500 chars) ⭐ NEW +- `is_reachable`: Device reachability status (Boolean) +- `consecutive_failures`: Count of consecutive poll failures +- `last_poll_attempt`: Last time background poller attempted to poll +- `last_success`: Last successful poll timestamp +- `last_error`: Last error message (truncated to 500 chars) +- `ln1` / `ln2`: LN1/LN2 (L1/L10) percentile levels ⭐ v0.4.0 + +### NL43Readings Table ⭐ v0.4.0 +Downsampled DOD trail backing the live-chart history endpoint (one row/minute, +pruned to a retention window — viewing only, not the report source): +- `id` (PK), `unit_id`, `timestamp` +- `lp` / `leq` / `lmax` / `ln1` / `ln2`: cached level samples + +### AlertRule Table ⭐ v0.4.0 +Per-device threshold alert rules: +- `id` (PK), `unit_id`, `name`, `enabled` +- `metric`, `comparison` (above/below), `threshold_db`, `clear_margin_db` (hysteresis) +- `duration_s` (sustained), `cooldown_s` (min seconds between onsets) +- `channels` / `recipients`, optional `schedule_start`/`schedule_end`/`schedule_days` + +### AlertEvent Table ⭐ v0.4.0 +Alert onset/clear events for history, inbox, and acknowledgement: +- `id` (PK), `unit_id`, `rule_id`, `rule_name`, `metric`, `threshold_db` +- `onset_at` / `onset_value`, `peak_value`, `clear_at`, `status` (active/cleared) +- `acknowledged_at` / `acknowledged_by`, `notes` + +> New tables (`alert_rules`, `alert_events`, `nl43_readings`) auto-create on +> startup. Existing-table columns ship with migrations: +> `migrate_add_ln_percentiles.py`, `migrate_add_monitor_enabled.py`. ## Protocol Details diff --git a/app/main.py b/app/main.py index 74f94c7..d0b6145 100644 --- a/app/main.py +++ b/app/main.py @@ -71,7 +71,7 @@ async def lifespan(app: FastAPI): app = FastAPI( title="SLMM NL43 Addon", description="Standalone module for NL43 configuration and status APIs with background polling", - version="0.3.0", + version="0.4.0", lifespan=lifespan, ) -- 2.52.0