From a7983d29589d8bcde3a6b6eac532b77dc78aad32 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 21:53:00 +0000 Subject: [PATCH 1/9] fix: correct DOD field parsing and stop measurement-time resets Two device-data bugs surfaced while scoping the live-feed work: 1. DOD parser misalignment. DOD's response has no leading counter and includes LE + LN1-LN5, but the parser reused the DRD field map (parts[0]=counter). That shifted everything: Lp was stored as the counter, Leq as Lp, LE as Leq, and LN1 as Lpeak (visible because "Lpeak" came out below Lmax, which is impossible). Parse DOD with its own map: Lp=0, Leq=1, Lmax=3, Lmin=4, Lpeak=10 (channel 1 = main). 2. measurement_start_time reset on every live-stream open/close. The DOD path tags state "Start"; the DRD stream path tags "Measure". The transition detector treated only "Start" as measuring, so opening the stream ("Start"->"Measure") read as a stop (cleared start time) and closing it ("Measure"->"Start") read as a start (reset to now). Every viewer reset the elapsed measurement time. Treat {"Start","Measure"} both as measuring. LN1/LN2 (L1/L10) parsing + model/serialization is the next step. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/services.py | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/app/services.py b/app/services.py index 9375c5c..33e947b 100644 --- a/app/services.py +++ b/app/services.py @@ -69,10 +69,16 @@ def persist_snapshot(s: NL43Snapshot, db: Session): logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'") - # Device returns "Start" when measuring, "Stop" when stopped - # Normalize to previous behavior for backward compatibility - is_measuring = new_state == "Start" - was_measuring = previous_state == "Start" + # The device reports "Start" while measuring; the DOD path uses that string, + # but the DRD stream path tags snapshots "Measure" (and the DOD fallback also + # uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and + # closing the live stream flips state "Start"->"Measure"->"Start", which the + # old equality check misread as stop-then-start and RESET measurement_start_time + # every single time (the "elapsed time keeps resetting / shows wrong value on + # another computer" bug — and each extra viewer made it worse). + MEASURING_STATES = {"Start", "Measure"} + is_measuring = new_state in MEASURING_STATES + was_measuring = previous_state in MEASURING_STATES if not was_measuring and is_measuring: # Measurement just started - record the start time @@ -691,22 +697,28 @@ class NL43Client: snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state) - # Parse known positions (based on NL43 communication guide - DRD format) - # DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ... + # Parse DOD positional fields. DOD's layout is DIFFERENT from DRD: it has NO + # leading counter and it includes LE plus LN1–LN5. The device returns 4 channels + # of 16 fields each — [Lp, Leq, LE, Lmax, Lmin, LN1, LN2, LN3, LN4, LN5, Lpeak, + # LIeq, Leq_mov, Ltm5, over, under] — and channel 1 (parts[0:16]) is the main + # display. The previous code reused the DRD map (treating parts[0] as a counter), + # which shifted everything: Lp was reported as the counter, Leq as Lp, LE as Leq, + # and LN1 as Lpeak (you could spot it because "Lpeak" came out < Lmax). try: - # Capture d0 (counter) for timer synchronization if len(parts) >= 1: - snap.counter = parts[0] # d0: Measurement interval counter (1-600) + snap.lp = parts[0] # Lp: instantaneous sound pressure level if len(parts) >= 2: - snap.lp = parts[1] # d1: Instantaneous sound pressure level - if len(parts) >= 3: - snap.leq = parts[2] # d2: Equivalent continuous sound level + snap.leq = parts[1] # Leq: equivalent continuous level + # parts[2] = LE (sound exposure level) — not currently surfaced if len(parts) >= 4: - snap.lmax = parts[3] # d3: Maximum level + snap.lmax = parts[3] # Lmax if len(parts) >= 5: - snap.lmin = parts[4] # d4: Minimum level - if len(parts) >= 6: - snap.lpeak = parts[5] # d5: Peak level + snap.lmin = parts[4] # Lmin + if len(parts) >= 11: + snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak) + # LN1/LN2 percentiles live at parts[5]/parts[6] (the L1/L10 display contract). + # Surfaced as snap.ln1/snap.ln2 once those fields are added to the snapshot + # dataclass + NL43Status model — next step on this branch. except (IndexError, ValueError) as e: logger.warning(f"Error parsing DOD data points: {e}") -- 2.52.0 From 51dd6b682da055e837f587663dfaf0c2692ba3a1 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 22:01:31 +0000 Subject: [PATCH 2/9] feat: surface LN1/LN2 (L1/L10) percentiles through SLMM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the SLMM side of the L1/L10 live-display contract. The NL-43's DOD response carries percentile slots LN1-LN5 (channel 1, parts[5]/[6]); parse the first two and expose them as ln1/ln2 end to end: - NL43Snapshot dataclass: ln1/ln2 fields - NL43Status model: ln1/ln2 columns (+ migrate_add_ln_percentiles.py) - DOD parser: snap.ln1=parts[5], snap.ln2=parts[6] - persist_snapshot writes them - all /status data dicts, StatusPayload, and the DRD stream payload emit ln1/ln2 (null on the DRD stream itself, which doesn't carry percentiles) Labels: device LN1 defaults to L5, not L1 — Terra-View defaults the label to L1/L10, so the device's Ln1/Ln2 slots must be set to 1%/10% for the labels to be accurate (dynamic label emission is a follow-up). Co-Authored-By: Claude Opus 4.8 (1M context) --- app/models.py | 2 ++ app/routers.py | 10 ++++++ app/services.py | 11 +++++-- migrate_add_ln_percentiles.py | 58 +++++++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 migrate_add_ln_percentiles.py diff --git a/app/models.py b/app/models.py index 4c86514..906043f 100644 --- a/app/models.py +++ b/app/models.py @@ -41,6 +41,8 @@ class NL43Status(Base): lmax = Column(String, nullable=True) # Maximum level lmin = Column(String, nullable=True) # Minimum level lpeak = Column(String, nullable=True) # Peak level + ln1 = Column(String, nullable=True) # Percentile slot LN1 (configurable; device default L5, contract L1) + ln2 = Column(String, nullable=True) # Percentile slot LN2 (configurable; device default L10) battery_level = Column(String, nullable=True) power_source = Column(String, nullable=True) sd_remaining_mb = Column(String, nullable=True) diff --git a/app/routers.py b/app/routers.py index a21c928..9ee6bae 100644 --- a/app/routers.py +++ b/app/routers.py @@ -450,6 +450,8 @@ def get_status(unit_id: str, db: Session = Depends(get_db)): "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, @@ -472,6 +474,8 @@ class StatusPayload(BaseModel): lmax: str | None = None lmin: str | None = None lpeak: str | None = None + ln1: str | None = None + ln2: str | None = None battery_level: str | None = None power_source: str | None = None sd_remaining_mb: str | None = None @@ -504,6 +508,8 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, @@ -1205,6 +1211,8 @@ async def stream_live(websocket: WebSocket, unit_id: str): "lmax": snap.lmax, # Maximum level "lmin": snap.lmin, # Minimum level "lpeak": snap.lpeak, # Peak level + "ln1": snap.ln1, # LN1 percentile (L1/L10 contract); null on DRD stream + "ln2": snap.ln2, # LN2 percentile; null on DRD stream "raw_payload": snap.raw_payload, }) except Exception as e: @@ -1876,6 +1884,8 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)): "lmax": status.lmax, "lmin": status.lmin, "lpeak": status.lpeak, + "ln1": status.ln1, + "ln2": status.ln2, "battery_level": status.battery_level, "power_source": status.power_source, "sd_remaining_mb": status.sd_remaining_mb, diff --git a/app/services.py b/app/services.py index 33e947b..5e15a06 100644 --- a/app/services.py +++ b/app/services.py @@ -46,6 +46,8 @@ class NL43Snapshot: lmax: Optional[str] = None # Maximum level lmin: Optional[str] = None # Minimum level lpeak: Optional[str] = None # Peak level + ln1: Optional[str] = None # Percentile slot LN1 (configurable; device default L5, contract L1) + ln2: Optional[str] = None # Percentile slot LN2 (configurable; device default L10) battery_level: Optional[str] = None power_source: Optional[str] = None sd_remaining_mb: Optional[str] = None @@ -108,6 +110,8 @@ def persist_snapshot(s: NL43Snapshot, db: Session): row.lmax = s.lmax row.lmin = s.lmin row.lpeak = s.lpeak + row.ln1 = s.ln1 + row.ln2 = s.ln2 row.battery_level = s.battery_level row.power_source = s.power_source row.sd_remaining_mb = s.sd_remaining_mb @@ -716,9 +720,10 @@ class NL43Client: snap.lmin = parts[4] # Lmin if len(parts) >= 11: snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak) - # LN1/LN2 percentiles live at parts[5]/parts[6] (the L1/L10 display contract). - # Surfaced as snap.ln1/snap.ln2 once those fields are added to the snapshot - # dataclass + NL43Status model — next step on this branch. + if len(parts) >= 6: + snap.ln1 = parts[5] # LN1 percentile slot (device default L5; contract L1) + if len(parts) >= 7: + snap.ln2 = parts[6] # LN2 percentile slot (device default L10) except (IndexError, ValueError) as e: logger.warning(f"Error parsing DOD data points: {e}") diff --git a/migrate_add_ln_percentiles.py b/migrate_add_ln_percentiles.py new file mode 100644 index 0000000..2e27b34 --- /dev/null +++ b/migrate_add_ln_percentiles.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Migration script to add ln1 and ln2 percentile columns to the nl43_status table. + +The NL-43 DOD response carries percentile slots LN1-LN5; the live SLM display +(Terra-View) shows two of them (default L1/L10). This adds storage for the two +surfaced slots. Run once per database to update existing schema. +""" + +import sqlite3 +import sys +from pathlib import Path + +DB_PATH = Path(__file__).parent / "data" / "slmm.db" + + +def migrate(): + """Add ln1 and ln2 columns to the nl43_status table.""" + + if not DB_PATH.exists(): + print(f"Database not found at {DB_PATH}") + print("No migration needed - database will be created with new schema") + return + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + try: + cursor.execute("PRAGMA table_info(nl43_status)") + columns = [row[1] for row in cursor.fetchall()] + + if "ln1" in columns and "ln2" in columns: + print("✓ ln1/ln2 columns already exist, no migration needed") + return + + if "ln1" not in columns: + print("Adding ln1 column...") + cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln1 TEXT") + print("✓ Added ln1 column") + + if "ln2" not in columns: + print("Adding ln2 column...") + cursor.execute("ALTER TABLE nl43_status ADD COLUMN ln2 TEXT") + print("✓ Added ln2 column") + + conn.commit() + print("\n✓ Migration completed successfully!") + + except Exception as e: + conn.rollback() + print(f"✗ Migration failed: {e}", file=sys.stderr) + sys.exit(1) + finally: + conn.close() + + +if __name__ == "__main__": + migrate() -- 2.52.0 From 0793e7df01d066541a1026ad80ee5f97b30199b8 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 22:40:56 +0000 Subject: [PATCH 3/9] feat: add per-device disconnect endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /api/nl43/{unit_id}/disconnect cleanly closes (TCP FIN + wait_closed) and drops the pooled connection for a single device, freeing the NL43's one connection slot. Previously only /_connections/flush existed, which tears down every device at once. Idempotent; no-op if nothing is cached. Releases the idle pooled connection only — an active DRD stream/command has the socket checked out of the pool, so close the stream WebSocket to end a live stream. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/routers.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/app/routers.py b/app/routers.py index 9ee6bae..98673de 100644 --- a/app/routers.py +++ b/app/routers.py @@ -121,6 +121,38 @@ async def flush_connection_pool(): return {"status": "ok", "message": "All cached connections closed"} +@router.post("/{unit_id}/disconnect") +async def disconnect_device(unit_id: str, db: Session = Depends(get_db)): + """Cleanly close SLMM's persistent TCP connection to a single device. + + Gracefully closes (TCP FIN + wait_closed) the pooled connection for this + device and removes it from the pool, freeing the NL43's single connection + slot. Idempotent — a no-op if no connection is currently cached. + + Note: this releases the *idle* pooled connection. It does not interrupt an + in-progress DRD stream or an in-flight command (those have the socket + checked out of the pool) — close the stream WebSocket to end a live stream. + """ + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + from app.services import _connection_pool + + device_key = f"{cfg.host}:{cfg.tcp_port}" + had_conn = device_key in _connection_pool.get_stats().get("connections", {}) + + await _connection_pool.discard(device_key) + + return { + "status": "ok", + "unit_id": unit_id, + "device_key": device_key, + "disconnected": had_conn, + "message": "Connection closed" if had_conn else "No cached connection to close", + } + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ -- 2.52.0 From b954eb8c89e47f7b29aecc2c2fffd3d9d5d448cb Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 22:45:52 +0000 Subject: [PATCH 4/9] feat: per-unit deactivate and global SLMM standby MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lets an instance stop occupying a device's single TCP connection slot so another instance (e.g. prod) can take over. Per-unit: - POST /api/nl43/{unit_id}/deactivate — poll_enabled=False (persisted) + drop the connection (waits up to 10s for in-flight ops via the device lock, then discards). Unit stays dormant across restarts. - POST /api/nl43/{unit_id}/activate — re-enable polling. Global standby: - POST /api/nl43/_system/standby — poller idles and releases ALL connections; the loop keeps re-releasing so the instance holds no slots. - POST /api/nl43/_system/resume — resume polling. - GET /api/nl43/_system/status — active vs standby + active_connections. - SLMM_POLLING_ENABLED=false starts an instance in standby (persistent way to keep a dev box from latching onto a prod-owned device). Co-Authored-By: Claude Opus 4.8 (1M context) --- app/background_poller.py | 48 +++++++++++++++++++-- app/routers.py | 93 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 4 deletions(-) diff --git a/app/background_poller.py b/app/background_poller.py index 64bcc60..071fc31 100644 --- a/app/background_poller.py +++ b/app/background_poller.py @@ -8,6 +8,7 @@ for fast API access without querying devices on every request. import asyncio import logging +import os from datetime import datetime, timedelta from typing import Optional @@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs logger = logging.getLogger(__name__) +# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in +# standby (running but not polling and not holding device connections) — e.g. a +# dev box that must not latch onto a device that a prod instance owns. +POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true" + class BackgroundPoller: """ @@ -39,6 +45,7 @@ class BackgroundPoller: self._logger = logger self._last_cleanup = None # Track last log cleanup time self._last_pool_log = None # Track last connection pool heartbeat log + self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle) async def start(self): """Start the background polling task.""" @@ -71,15 +78,48 @@ class BackgroundPoller: self._logger.info("Background poller stopped") + def is_active(self) -> bool: + """Whether background polling is currently active (vs standby).""" + return self._active + + async def set_active(self, active: bool): + """Globally enable/disable polling at runtime. + + When deactivated, the loop stays alive but polls nothing and releases all + device connections, so this SLMM instance stops occupying the devices' + single connection slots (e.g. so a prod instance can take over). Runtime + state only — on restart the instance returns to SLMM_POLLING_ENABLED. + """ + self._active = active + if active: + self._logger.info("[SYSTEM] Background polling ACTIVATED") + else: + self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections") + await self._release_all_connections() + + async def _release_all_connections(self): + """Gracefully close every pooled device connection (no-op if none).""" + from app.services import _connection_pool + for device_key in list(_connection_pool.get_stats().get("connections", {})): + await _connection_pool.discard(device_key) + async def _poll_loop(self): """Main polling loop that runs continuously.""" self._logger.info("Background polling loop started") while self._running: - try: - await self._poll_all_devices() - except Exception as e: - self._logger.error(f"Error in poll loop: {e}", exc_info=True) + if self._active: + try: + await self._poll_all_devices() + except Exception as e: + self._logger.error(f"Error in poll loop: {e}", exc_info=True) + else: + # Standby: poll nothing, and keep holding no device connection slots + # so another SLMM instance (e.g. prod) can talk to the devices. + try: + await self._release_all_connections() + except Exception as e: + self._logger.warning(f"Standby connection release failed: {e}") # Run log cleanup once per hour try: diff --git a/app/routers.py b/app/routers.py index 98673de..6b7cede 100644 --- a/app/routers.py +++ b/app/routers.py @@ -153,6 +153,99 @@ async def disconnect_device(unit_id: str, db: Session = Depends(get_db)): } +@router.post("/{unit_id}/deactivate") +async def deactivate_device(unit_id: str, db: Session = Depends(get_db)): + """Make a single unit dormant: stop background polling for it AND drop its + connection, freeing the device's connection slot. poll_enabled=False is + persisted, so the unit stays dormant across restarts until /activate. + """ + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + cfg.poll_enabled = False + db.commit() + + from app.services import _connection_pool, _get_device_lock + + device_key = f"{cfg.host}:{cfg.tcp_port}" + + # Wait briefly for any in-flight poll/command to finish (so its connection is + # back in the pool), then drop it. If a long-lived stream holds the lock we + # don't block forever — discard the pooled connection regardless. + lock = await _get_device_lock(device_key) + acquired = False + try: + await asyncio.wait_for(lock.acquire(), timeout=10.0) + acquired = True + except asyncio.TimeoutError: + acquired = False + try: + await _connection_pool.discard(device_key) + finally: + if acquired: + lock.release() + + return { + "status": "ok", + "unit_id": unit_id, + "poll_enabled": False, + "message": "Polling disabled and connection closed for this unit", + } + + +@router.post("/{unit_id}/activate") +async def activate_device(unit_id: str, db: Session = Depends(get_db)): + """Resume background polling for a unit previously deactivated.""" + cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first() + if not cfg: + raise HTTPException(status_code=404, detail="NL43 config not found") + + cfg.poll_enabled = True + db.commit() + + return { + "status": "ok", + "unit_id": unit_id, + "poll_enabled": True, + "message": "Polling enabled for this unit", + } + + +@router.get("/_system/status") +async def system_status(): + """Report whether this SLMM instance is actively polling or in standby.""" + from app.background_poller import poller + from app.services import _connection_pool + return { + "status": "ok", + "mode": "active" if poller.is_active() else "standby", + "polling_active": poller.is_active(), + "active_connections": _connection_pool.get_stats().get("active_connections", 0), + } + + +@router.post("/_system/standby") +async def system_standby(): + """Put this SLMM instance into standby: stop polling ALL devices and release + every connection, so it stops occupying device slots (e.g. so a prod instance + can take over). Runtime-only — on restart the instance returns to its + SLMM_POLLING_ENABLED default. + """ + from app.background_poller import poller + await poller.set_active(False) + return {"status": "ok", "mode": "standby", + "message": "Polling stopped and all device connections released"} + + +@router.post("/_system/resume") +async def system_resume(): + """Resume polling after standby (global).""" + from app.background_poller import poller + await poller.set_active(True) + return {"status": "ok", "mode": "active", "message": "Polling resumed"} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ -- 2.52.0 From 8c17af4849fa8ec4231ab55fd3bc7a2f5ded040e Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 22:50:18 +0000 Subject: [PATCH 5/9] fix: ignore garbled measurement-state reads (phantom STOPPED/STARTED) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A buffer desync on the shared persistent connection (commonly right after a DRD/DOD test) can make a Measure? read return a stray value. The state classifier treated anything not in {"Start","Measure"} as "not measuring", so a garbled read logged a phantom STOPPED, the next clean read logged STARTED, and that reset measurement_start_time — producing constant STOPPED/STARTED device-log pairs and a drifting elapsed timer. Now only recognized states drive transitions: {"Start","Measure"} = measuring, {"Stop"} = stopped, anything else = no change. Garbled reads are also not persisted as the cached state, so they can't poison the next transition check. Builds on the earlier Start<->Measure normalization. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/services.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/app/services.py b/app/services.py index 5e15a06..b5c68ea 100644 --- a/app/services.py +++ b/app/services.py @@ -75,13 +75,24 @@ def persist_snapshot(s: NL43Snapshot, db: Session): # but the DRD stream path tags snapshots "Measure" (and the DOD fallback also # uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and # closing the live stream flips state "Start"->"Measure"->"Start", which the - # old equality check misread as stop-then-start and RESET measurement_start_time - # every single time (the "elapsed time keeps resetting / shows wrong value on - # another computer" bug — and each extra viewer made it worse). + # old equality check misread as stop-then-start and reset measurement_start_time. + # + # Also: only act on RECOGNIZED states. A buffer desync on the shared connection + # (e.g. right after a DRD/DOD test) can make a Measure? read return a stray, + # garbled value; treating that as "not measuring" produced constant phantom + # "STOPPED -> STARTED" log pairs and reset the timer. Ignore unknown reads. MEASURING_STATES = {"Start", "Measure"} - is_measuring = new_state in MEASURING_STATES + STOPPED_STATES = {"Stop"} was_measuring = previous_state in MEASURING_STATES + if new_state in MEASURING_STATES: + is_measuring = True + elif new_state in STOPPED_STATES: + is_measuring = False + else: + logger.warning(f"Ignoring unrecognized measurement state for {s.unit_id}: {new_state!r}") + is_measuring = was_measuring # garbled/unknown read — no transition + if not was_measuring and is_measuring: # Measurement just started - record the start time row.measurement_start_time = datetime.utcnow() @@ -103,7 +114,10 @@ def persist_snapshot(s: NL43Snapshot, db: Session): except Exception: pass - row.measurement_state = new_state + # Only persist a recognized state so one garbled read can't poison the next + # transition check (which would manufacture the phantom STOPPED/STARTED pair). + if new_state in MEASURING_STATES or new_state in STOPPED_STATES: + row.measurement_state = new_state row.counter = s.counter row.lp = s.lp row.leq = s.leq -- 2.52.0 From aa3e088b64622ffb2adbb112430af2ef216746ce Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 23:27:05 +0000 Subject: [PATCH 6/9] feat: per-device live monitor (fan-out) + alert evaluator (POC) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The piece the live-view + alerting work was building toward. monitor.py — one DOD poll loop per device, broadcast to many subscribers: - browser WebSockets (fixes the single-connection "second viewer sees nothing" contention — browsers no longer each open a device stream) - the alert evaluator (can keep a feed running with no browser via /monitor/start, so alerting runs continuously) - persistence (each snapshot written like the poller) DOD-sourced, so the broadcast carries ln1/ln2 (which DRD cannot). All polls go through the existing per-device lock + pool, so it serializes safely with the background poller and on-demand commands. alerts.py — pluggable POC evaluator: fires (logs) when ALERT_METRIC exceeds ALERT_THRESHOLD_DB with an ALERT_COOLDOWN_SECONDS cooldown. The rule (instantaneous vs sustained vs L10) is the single swap point; dispatch is a server log for now (email/SMS later). Endpoints: - WS /api/nl43/{unit_id}/monitor subscribe to the shared feed - POST /api/nl43/{unit_id}/monitor/start keep feed alive w/o a browser - POST /api/nl43/{unit_id}/monitor/stop drop the keep-alive - GET /api/nl43/_monitor/status running/subscribers/keepalive WS endpoint races queue.get() against a disconnect watcher so an idle feed still detects client drop and doesn't leak a subscription. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/alerts.py | 71 ++++++++++++++++++++ app/monitor.py | 176 +++++++++++++++++++++++++++++++++++++++++++++++++ app/routers.py | 74 +++++++++++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 app/alerts.py create mode 100644 app/monitor.py diff --git a/app/alerts.py b/app/alerts.py new file mode 100644 index 0000000..7c001e2 --- /dev/null +++ b/app/alerts.py @@ -0,0 +1,71 @@ +""" +Alert evaluation (POC). + +Receives each monitor snapshot and fires an alert when a configured metric +exceeds a threshold, with a cooldown so a sustained loud period doesn't spam. + +The RULE here is intentionally simple and swappable. Instantaneous Lp vs a +sustained window vs L10 is still an open design decision — this evaluator is the +single plug point for it. For the POC the rule is "instantaneous metric > +threshold, rate-limited by a cooldown", and dispatch is just a server-side log. +Wire email/SMS (likely via a Terra-View webhook) into _dispatch() later. + +Config via env: +- ALERT_ENABLED (default true) +- ALERT_METRIC which snapshot field to test: lp/leq/lmax/ln1/ln2 (default lp) +- ALERT_THRESHOLD_DB numeric dB threshold (default 85) +- ALERT_COOLDOWN_SECONDS min seconds between alerts per unit (default 60) +""" + +import asyncio +import logging +import os +from typing import Dict + +logger = logging.getLogger(__name__) + + +class AlertEvaluator: + def __init__(self): + self.enabled = os.getenv("ALERT_ENABLED", "true").lower() == "true" + self.metric = os.getenv("ALERT_METRIC", "lp").lower() + self.threshold_db = float(os.getenv("ALERT_THRESHOLD_DB", "85")) + self.cooldown_s = float(os.getenv("ALERT_COOLDOWN_SECONDS", "60")) + self._last_fired: Dict[str, float] = {} + logger.info( + f"[ALERT] evaluator ready: enabled={self.enabled} metric={self.metric} " + f"threshold={self.threshold_db}dB cooldown={self.cooldown_s}s" + ) + + async def evaluate(self, unit_id: str, snap) -> None: + """Evaluate one snapshot; fire (log) if the metric exceeds threshold.""" + if not self.enabled: + return + + raw = getattr(snap, self.metric, None) + try: + level = float(raw) + except (TypeError, ValueError): + return # missing / non-numeric (e.g. "-.-") + + if level <= self.threshold_db: + return + + # Cooldown — use the event loop clock (Math.random/Date.now-free). + now = asyncio.get_running_loop().time() + if now - self._last_fired.get(unit_id, 0.0) < self.cooldown_s: + return + self._last_fired[unit_id] = now + + await self._dispatch(unit_id, level) + + async def _dispatch(self, unit_id: str, level: float) -> None: + """POC dispatch: server-side log. Swap in email/SMS here later.""" + logger.warning( + f"[ALERT] {unit_id}: {self.metric.upper()}={level:.1f} dB exceeded " + f"threshold {self.threshold_db:.1f} dB" + ) + + +# Module-level singleton +alert_evaluator = AlertEvaluator() diff --git a/app/monitor.py b/app/monitor.py new file mode 100644 index 0000000..10e190c --- /dev/null +++ b/app/monitor.py @@ -0,0 +1,176 @@ +""" +Per-device live monitor (fan-out hub). + +ONE DOD poll loop per device, broadcast to many subscribers: +- browser WebSocket clients (live view) — they no longer each open their own + device stream, so the NL43's single-connection limit stops causing the + "second viewer sees nothing" contention. +- the alert evaluator (threshold alerts), which can keep a device's feed running + even with no browser attached. +- persistence (each snapshot is written to NL43Status, like the poller does). + +The device's one TCP connection is respected: every poll goes through the same +per-device lock + connection pool in services.py, so the monitor, the background +poller, and on-demand commands all serialize safely. +""" + +import asyncio +import logging +import os +from datetime import datetime +from typing import Dict, Optional, Set + +from app.database import SessionLocal +from app.models import NL43Config, NL43Status +from app.services import NL43Client, persist_snapshot +from app.alerts import alert_evaluator + +logger = logging.getLogger(__name__) + +# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per +# poll) already paces the effective rate to a few seconds; this is the extra idle. +MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) + + +def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict: + """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced + so it carries ln1/ln2 (which DRD cannot).""" + return { + "unit_id": unit_id, + "timestamp": datetime.utcnow().isoformat(), + "measurement_state": snap.measurement_state, + "measurement_start_time": measurement_start_time, + "counter": snap.counter, + "lp": snap.lp, + "leq": snap.leq, + "lmax": snap.lmax, + "lmin": snap.lmin, + "lpeak": snap.lpeak, + "ln1": snap.ln1, + "ln2": snap.ln2, + "raw_payload": snap.raw_payload, + } + + +class DeviceMonitor: + """Owns a single DOD poll loop for one device and fans each snapshot out to + all subscribers. Runs while it has at least one browser subscriber OR the + server-side keep-alive (alerting) flag is set.""" + + def __init__(self, unit_id: str): + self.unit_id = unit_id + self._subscribers: Set[asyncio.Queue] = set() + self._keepalive = False + self._task: Optional[asyncio.Task] = None + self._lock = asyncio.Lock() + + @property + def running(self) -> bool: + return self._task is not None and not self._task.done() + + def subscriber_count(self) -> int: + return len(self._subscribers) + + def _has_demand(self) -> bool: + return bool(self._subscribers) or self._keepalive + + def _ensure_task(self) -> None: + if self._task is None or self._task.done(): + self._task = asyncio.create_task(self._run()) + + async def subscribe(self) -> asyncio.Queue: + q: asyncio.Queue = asyncio.Queue(maxsize=5) + async with self._lock: + self._subscribers.add(q) + self._ensure_task() + return q + + async def unsubscribe(self, q: asyncio.Queue) -> None: + async with self._lock: + self._subscribers.discard(q) + + async def set_keepalive(self, on: bool) -> None: + async with self._lock: + self._keepalive = on + if on: + self._ensure_task() + + async def _run(self) -> None: + logger.info(f"[MONITOR] {self.unit_id}: feed started") + try: + while self._has_demand(): + snap, mst = await self._poll_once() + if snap is not None: + payload = _snapshot_payload(snap, self.unit_id, mst) + self._broadcast(payload) + try: + await alert_evaluator.evaluate(self.unit_id, snap) + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") + await asyncio.sleep(MONITOR_POLL_INTERVAL) + finally: + logger.info(f"[MONITOR] {self.unit_id}: feed stopped") + + async def _poll_once(self): + """One DOD poll: read, persist, return (snapshot, measurement_start_iso).""" + db = SessionLocal() + try: + cfg = db.query(NL43Config).filter_by(unit_id=self.unit_id).first() + if not cfg or not cfg.tcp_enabled: + return None, None + client = NL43Client( + cfg.host, cfg.tcp_port, + ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password, + ftp_port=cfg.ftp_port or 21, + ) + snap = await client.request_dod() + snap.unit_id = self.unit_id + persist_snapshot(snap, db) + db.commit() + status = db.query(NL43Status).filter_by(unit_id=self.unit_id).first() + mst = (status.measurement_start_time.isoformat() + if status and status.measurement_start_time else None) + return snap, mst + except Exception as e: + logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}") + return None, None + finally: + db.close() + + def _broadcast(self, payload: dict) -> None: + for q in list(self._subscribers): + try: + q.put_nowait(payload) + except asyncio.QueueFull: + # Slow consumer — drop this frame rather than stall the whole feed. + pass + + +class MonitorManager: + """Registry of per-device monitors (one per unit_id).""" + + def __init__(self): + self._monitors: Dict[str, DeviceMonitor] = {} + self._lock = asyncio.Lock() + + async def get(self, unit_id: str) -> DeviceMonitor: + async with self._lock: + m = self._monitors.get(unit_id) + if m is None: + m = DeviceMonitor(unit_id) + self._monitors[unit_id] = m + return m + + def status(self) -> dict: + return { + uid: { + "running": m.running, + "subscribers": m.subscriber_count(), + "keepalive": m._keepalive, + } + for uid, m in self._monitors.items() + } + + +# Module-level singleton +monitor_manager = MonitorManager() diff --git a/app/routers.py b/app/routers.py index 6b7cede..1031d84 100644 --- a/app/routers.py +++ b/app/routers.py @@ -246,6 +246,80 @@ async def system_resume(): return {"status": "ok", "mode": "active", "message": "Polling resumed"} +# ============================================================================ +# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients +# ============================================================================ + +@router.websocket("/{unit_id}/monitor") +async def monitor_stream(websocket: WebSocket, unit_id: str): + """Subscribe a browser to the device's shared 1 Hz DOD feed. + + Any number of clients can attach without each opening its own device + connection (one poll loop per device, fanned out). Same JSON shape as the + DRD stream, but DOD-sourced so it includes ln1/ln2 (L1/L10). + """ + await websocket.accept() + from app.monitor import monitor_manager + + monitor = await monitor_manager.get(unit_id) + queue = await monitor.subscribe() + logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)") + + async def _watch_disconnect(): + # Completes when the client disconnects, so an idle feed (no data) still + # detects the drop and we don't leak a subscription that keeps the device + # feed (and its connection) alive. + try: + while True: + msg = await websocket.receive() + if msg.get("type") == "websocket.disconnect": + return + except Exception: + return + + gone = asyncio.ensure_future(_watch_disconnect()) + try: + while not gone.done(): + try: + payload = await asyncio.wait_for(queue.get(), timeout=1.0) + except asyncio.TimeoutError: + continue # re-check gone.done() + await websocket.send_json(payload) + except WebSocketDisconnect: + logger.info(f"Monitor subscriber disconnected for {unit_id}") + except Exception as e: + logger.warning(f"Monitor stream error for {unit_id}: {e}") + finally: + gone.cancel() + await monitor.unsubscribe(queue) + + +@router.post("/{unit_id}/monitor/start") +async def monitor_start(unit_id: str): + """Keep the device's feed running even with no browser attached, so alerting + evaluates continuously. Runtime-only (resets on restart).""" + from app.monitor import monitor_manager + monitor = await monitor_manager.get(unit_id) + await monitor.set_keepalive(True) + return {"status": "ok", "unit_id": unit_id, "running": monitor.running, "keepalive": True} + + +@router.post("/{unit_id}/monitor/stop") +async def monitor_stop(unit_id: str): + """Drop the keep-alive; the feed stops once no browser subscribers remain.""" + from app.monitor import monitor_manager + monitor = await monitor_manager.get(unit_id) + await monitor.set_keepalive(False) + return {"status": "ok", "unit_id": unit_id, "keepalive": False} + + +@router.get("/_monitor/status") +async def monitor_status(): + """Status of every device monitor (running, subscriber count, keep-alive).""" + from app.monitor import monitor_manager + return {"status": "ok", "monitors": monitor_manager.status()} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ -- 2.52.0 From 9c43e6853406100a5dd6c11052148c166fc9332e Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 01:04:03 +0000 Subject: [PATCH 7/9] =?UTF-8?q?feat:=20alert=20engine=20stage=201=20?= =?UTF-8?q?=E2=80=94=20rules,=20events,=20state=20machine,=20CRUD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the POC single-threshold check with a real per-rule engine over the live monitor feed. - AlertRule / AlertEvent tables (auto-created via create_all; no migration). Rule = {metric, comparison, threshold_db, duration_s, clear_margin_db, schedule, channels, recipients}. - alerts.py: per-(unit,rule) state machine IDLE->ACTIVE->IDLE with duration debounce (both edges) + clear_margin hysteresis; onset/clear are distinct events; optional nighttime schedule; rule cache w/ invalidation. The state-machine core (_evaluate_step) is pure (no DB/clock) for testing. - Dispatch is a server log (POC); _dispatch() is the seam for a Terra-View webhook (email/SMS) later. - CRUD: POST/GET/PUT/DELETE /{unit}/alerts/rules, GET /{unit}/alerts/events, POST /{unit}/alerts/events/{id}/ack. - test_alert_evaluator.py: synthetic level series proves onset debounce, spike rejection, hysteresis hold, and below-comparison (4/4 pass, no device). Source-agnostic: the same rules transfer unchanged if a unit's feed is later sourced from FTP intervals instead of the DOD monitor. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/alerts.py | 265 +++++++++++++++++++++++++++++++++------- app/models.py | 52 +++++++- app/routers.py | 107 +++++++++++++++- test_alert_evaluator.py | 68 +++++++++++ 4 files changed, 444 insertions(+), 48 deletions(-) create mode 100644 test_alert_evaluator.py diff --git a/app/alerts.py b/app/alerts.py index 7c001e2..1fb84e7 100644 --- a/app/alerts.py +++ b/app/alerts.py @@ -1,71 +1,244 @@ """ -Alert evaluation (POC). +Threshold alert engine. -Receives each monitor snapshot and fires an alert when a configured metric -exceeds a threshold, with a cooldown so a sustained loud period doesn't spam. +Each unit can have any number of AlertRules. A rule is evaluated against the +unit's live monitor snapshots via a small per-(unit, rule) state machine: -The RULE here is intentionally simple and swappable. Instantaneous Lp vs a -sustained window vs L10 is still an open design decision — this evaluator is the -single plug point for it. For the POC the rule is "instantaneous metric > -threshold, rate-limited by a cooldown", and dispatch is just a server-side log. -Wire email/SMS (likely via a Terra-View webhook) into _dispatch() later. + IDLE --(metric exceeds threshold for duration_s)--> ACTIVE (fire ONSET) + ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR) -Config via env: -- ALERT_ENABLED (default true) -- ALERT_METRIC which snapshot field to test: lp/leq/lmax/ln1/ln2 (default lp) -- ALERT_THRESHOLD_DB numeric dB threshold (default 85) -- ALERT_COOLDOWN_SECONDS min seconds between alerts per unit (default 60) +duration_s debounces both edges; clear_margin_db adds hysteresis so a level +hovering at the threshold doesn't flap. Onset and clear are distinct events. + +The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no +real clock — so it can be unit-tested with a synthetic level series and a fake +clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence, +and dispatch. Dispatch is a server log for now (POC); the seam to POST events to +a Terra-View webhook (email/SMS) is _dispatch(). """ import asyncio import logging import os -from typing import Dict +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple logger = logging.getLogger(__name__) +# Local timezone offset for schedule windows (same env var services.py uses). +_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5")) + +# How long to cache a unit's rules before re-querying the DB (rules change rarely). +_RULE_CACHE_TTL_S = 15.0 + + +@dataclass +class RuleState: + """In-memory runtime state for one (unit, rule).""" + phase: str = "idle" # "idle" | "active" + edge_since: Optional[float] = None # when the current edge condition began (clock time) + peak: float = 0.0 + event_id: Optional[int] = None # the open AlertEvent row (for the clear update) + + +def _exceeds(value: float, rule) -> bool: + if rule.comparison == "below": + return value < rule.threshold_db + return value > rule.threshold_db + + +def _recovered(value: float, rule) -> bool: + margin = rule.clear_margin_db or 0.0 + if rule.comparison == "below": + return value > rule.threshold_db + margin + return value < rule.threshold_db - margin + + +def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]: + """Advance the state machine by one reading. + + Pure: mutates `state`, returns 'onset' | 'clear' | None. `now` is injected so + tests can drive a fake clock. + """ + duration = rule.duration_s or 0 + + if state.phase == "idle": + if _exceeds(value, rule): + if state.edge_since is None: + state.edge_since = now + if now - state.edge_since >= duration: + state.phase = "active" + state.edge_since = None + state.peak = value + return "onset" + else: + state.edge_since = None + return None + + # active + if rule.comparison == "below": + state.peak = min(state.peak, value) + else: + state.peak = max(state.peak, value) + + if _recovered(value, rule): + if state.edge_since is None: + state.edge_since = now + if now - state.edge_since >= duration: + state.phase = "idle" + state.edge_since = None + return "clear" + else: + state.edge_since = None + return None + + +def _in_window(now_minutes: int, start: str, end: str) -> bool: + """Is now_minutes (minutes since local midnight) within [start, end)? + Handles wraparound windows like 22:00–07:00.""" + def _m(s: str) -> int: + h, m = s.split(":") + return int(h) * 60 + int(m) + s, e = _m(start), _m(end) + if s == e: + return True + if s < e: + return s <= now_minutes < e + return now_minutes >= s or now_minutes < e # wraparound + class AlertEvaluator: def __init__(self): - self.enabled = os.getenv("ALERT_ENABLED", "true").lower() == "true" - self.metric = os.getenv("ALERT_METRIC", "lp").lower() - self.threshold_db = float(os.getenv("ALERT_THRESHOLD_DB", "85")) - self.cooldown_s = float(os.getenv("ALERT_COOLDOWN_SECONDS", "60")) - self._last_fired: Dict[str, float] = {} - logger.info( - f"[ALERT] evaluator ready: enabled={self.enabled} metric={self.metric} " - f"threshold={self.threshold_db}dB cooldown={self.cooldown_s}s" - ) + self._states: Dict[Tuple[str, int], RuleState] = {} + self._rule_cache: Dict[str, Tuple[float, list]] = {} # unit_id -> (fetched_at, rules) + logger.info("[ALERT] rule-based evaluator ready") async def evaluate(self, unit_id: str, snap) -> None: - """Evaluate one snapshot; fire (log) if the metric exceeds threshold.""" - if not self.enabled: + """Evaluate every enabled rule for this unit against one snapshot.""" + rules = self._get_rules(unit_id) + if not rules: return - - raw = getattr(snap, self.metric, None) - try: - level = float(raw) - except (TypeError, ValueError): - return # missing / non-numeric (e.g. "-.-") - - if level <= self.threshold_db: - return - - # Cooldown — use the event loop clock (Math.random/Date.now-free). now = asyncio.get_running_loop().time() - if now - self._last_fired.get(unit_id, 0.0) < self.cooldown_s: - return - self._last_fired[unit_id] = now + for rule in rules: + if not self._in_schedule(rule): + continue + raw = getattr(snap, rule.metric, None) + try: + value = float(raw) + except (TypeError, ValueError): + continue # missing / non-numeric ("-.-") + state = self._states.setdefault((unit_id, rule.id), RuleState()) + action = _evaluate_step(state, value, now, rule) + if action == "onset": + await self._on_onset(unit_id, rule, value, state) + elif action == "clear": + await self._on_clear(unit_id, rule, value, state) - await self._dispatch(unit_id, level) + # -- rule loading (cached) ---------------------------------------------- - async def _dispatch(self, unit_id: str, level: float) -> None: - """POC dispatch: server-side log. Swap in email/SMS here later.""" - logger.warning( - f"[ALERT] {unit_id}: {self.metric.upper()}={level:.1f} dB exceeded " - f"threshold {self.threshold_db:.1f} dB" + def _get_rules(self, unit_id: str) -> list: + loop_now = asyncio.get_running_loop().time() + cached = self._rule_cache.get(unit_id) + if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S: + return cached[1] + rules = self._load_rules(unit_id) + self._rule_cache[unit_id] = (loop_now, rules) + return rules + + def _load_rules(self, unit_id: str) -> list: + from app.database import SessionLocal + from app.models import AlertRule + db = SessionLocal() + try: + return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all() + except Exception as e: + logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}") + return [] + finally: + db.close() + + def invalidate(self, unit_id: Optional[str] = None) -> None: + """Drop cached rules so a change is picked up immediately.""" + if unit_id is None: + self._rule_cache.clear() + else: + self._rule_cache.pop(unit_id, None) + + # -- scheduling ---------------------------------------------------------- + + def _in_schedule(self, rule) -> bool: + if not rule.schedule_start or not rule.schedule_end: + day_ok = self._day_ok(rule) + return day_ok + local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS) + if not self._day_ok(rule, local): + return False + return _in_window(local.hour * 60 + local.minute, rule.schedule_start, rule.schedule_end) + + @staticmethod + def _day_ok(rule, local: Optional[datetime] = None) -> bool: + if not rule.schedule_days: + return True + if local is None: + local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS) + allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""} + return local.weekday() in allowed # Mon=0 + + # -- event persistence + dispatch --------------------------------------- + + async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None: + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + evt = AlertEvent( + rule_id=rule.id, unit_id=unit_id, rule_name=rule.name, + metric=rule.metric, threshold_db=rule.threshold_db, + onset_value=value, peak_value=value, status="active", + ) + db.add(evt) + db.commit() + db.refresh(evt) + state.event_id = evt.id + except Exception as e: + logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}") + finally: + db.close() + await self._dispatch( + "ONSET", unit_id, rule, + f"{rule.metric.upper()}={value:.1f} dB " + f"{'<' if rule.comparison == 'below' else '>'} {rule.threshold_db:.1f} dB" + f"{f' for {rule.duration_s}s' if rule.duration_s else ''}", ) + async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None: + peak = state.peak + from app.database import SessionLocal + from app.models import AlertEvent + db = SessionLocal() + try: + if state.event_id is not None: + evt = db.query(AlertEvent).filter_by(id=state.event_id).first() + if evt: + evt.clear_at = datetime.utcnow() + evt.peak_value = peak + evt.status = "cleared" + db.commit() + except Exception as e: + logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}") + finally: + db.close() + state.event_id = None + await self._dispatch( + "CLEAR", unit_id, rule, + f"recovered to {value:.1f} dB (peak {peak:.1f} dB)", + ) -# Module-level singleton + async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None: + """POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here.""" + logger.warning(f"[ALERT:{kind}] {unit_id} '{rule.name}': {detail}") + + +# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot) alert_evaluator = AlertEvaluator() diff --git a/app/models.py b/app/models.py index 906043f..67e08ed 100644 --- a/app/models.py +++ b/app/models.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func +from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func from app.database import Base @@ -74,3 +74,53 @@ class DeviceLog(Base): level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC message = Column(Text, nullable=False) + + +class AlertRule(Base): + """A threshold-alert rule evaluated against a unit's live monitor feed. + + Source-agnostic: today it runs over the DOD monitor; the same rule transfers + unchanged if a unit's feed is later sourced from FTP intervals. + """ + + __tablename__ = "alert_rules" + + id = Column(Integer, primary_key=True, autoincrement=True) + unit_id = Column(String, index=True, nullable=False) + name = Column(String, nullable=False, default="Alert") + metric = Column(String, nullable=False, default="lp") # lp/leq/lmax/lmin/lpeak/ln1/ln2 + comparison = Column(String, nullable=False, default="above") # above | below + threshold_db = Column(Float, nullable=False) + duration_s = Column(Integer, nullable=False, default=0) # sustained seconds (0 = instant) + clear_margin_db = Column(Float, nullable=False, default=2.0) # hysteresis band + cooldown_s = Column(Integer, nullable=False, default=300) # min seconds between onsets + # Optional time-of-day scoping (local time). schedule_start/end as "HH:MM"; + # null = always active. schedule_days = CSV of 0-6 (Mon=0); null = every day. + schedule_start = Column(String, nullable=True) + schedule_end = Column(String, nullable=True) + schedule_days = Column(String, nullable=True) + channels = Column(String, nullable=False, default="log") # CSV: log,email,sms + recipients = Column(Text, nullable=True) # CSV of emails/phones + enabled = Column(Boolean, default=True) + created_at = Column(DateTime, default=func.now()) + + +class AlertEvent(Base): + """A fired alert (onset → clear), for history / inbox / acknowledgement.""" + + __tablename__ = "alert_events" + + id = Column(Integer, primary_key=True, autoincrement=True) + rule_id = Column(Integer, index=True, nullable=False) + unit_id = Column(String, index=True, nullable=False) + rule_name = Column(String, nullable=True) + metric = Column(String, nullable=False) + threshold_db = Column(Float, nullable=False) + onset_at = Column(DateTime, default=func.now(), index=True) + onset_value = Column(Float, nullable=True) + peak_value = Column(Float, nullable=True) + clear_at = Column(DateTime, nullable=True) + status = Column(String, default="active") # active | cleared + acknowledged_at = Column(DateTime, nullable=True) + acknowledged_by = Column(String, nullable=True) + notes = Column(Text, nullable=True) diff --git a/app/routers.py b/app/routers.py index 1031d84..f317e15 100644 --- a/app/routers.py +++ b/app/routers.py @@ -11,7 +11,7 @@ import os import asyncio from app.database import get_db -from app.models import NL43Config, NL43Status +from app.models import NL43Config, NL43Status, AlertRule, AlertEvent from app.services import NL43Client, persist_snapshot logger = logging.getLogger(__name__) @@ -320,6 +320,111 @@ async def monitor_status(): return {"status": "ok", "monitors": monitor_manager.status()} +# ============================================================================ +# ALERTS — threshold rules + fired events +# ============================================================================ + +class AlertRulePayload(BaseModel): + name: str = "Alert" + metric: str = "lp" # lp/leq/lmax/lmin/lpeak/ln1/ln2 + comparison: str = "above" # above | below + threshold_db: float + duration_s: int = 0 # sustained seconds before firing (0 = instant) + clear_margin_db: float = 2.0 # hysteresis band + cooldown_s: int = 300 + schedule_start: str | None = None # "HH:MM" local; null = always + schedule_end: str | None = None + schedule_days: str | None = None # CSV of 0-6 (Mon=0); null = every day + channels: str = "log" + recipients: str | None = None + enabled: bool = True + + +def _rule_dict(r: AlertRule) -> dict: + return { + "id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric, + "comparison": r.comparison, "threshold_db": r.threshold_db, + "duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db, + "cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start, + "schedule_end": r.schedule_end, "schedule_days": r.schedule_days, + "channels": r.channels, "recipients": r.recipients, "enabled": r.enabled, + } + + +def _event_dict(e: AlertEvent) -> dict: + return { + "id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id, + "rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db, + "onset_at": e.onset_at.isoformat() if e.onset_at else None, + "onset_value": e.onset_value, "peak_value": e.peak_value, + "clear_at": e.clear_at.isoformat() if e.clear_at else None, + "status": e.status, + "acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None, + "acknowledged_by": e.acknowledged_by, + } + + +@router.post("/{unit_id}/alerts/rules") +def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)): + rule = AlertRule(unit_id=unit_id, **payload.model_dump()) + db.add(rule) + db.commit() + db.refresh(rule) + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + return {"status": "ok", "rule": _rule_dict(rule)} + + +@router.get("/{unit_id}/alerts/rules") +def list_alert_rules(unit_id: str, db: Session = Depends(get_db)): + rules = db.query(AlertRule).filter_by(unit_id=unit_id).all() + return {"status": "ok", "rules": [_rule_dict(r) for r in rules]} + + +@router.put("/{unit_id}/alerts/rules/{rule_id}") +def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)): + rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first() + if not rule: + raise HTTPException(status_code=404, detail="Alert rule not found") + for field, value in payload.model_dump().items(): + setattr(rule, field, value) + db.commit() + db.refresh(rule) + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + return {"status": "ok", "rule": _rule_dict(rule)} + + +@router.delete("/{unit_id}/alerts/rules/{rule_id}") +def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)): + rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first() + if not rule: + raise HTTPException(status_code=404, detail="Alert rule not found") + db.delete(rule) + db.commit() + from app.alerts import alert_evaluator + alert_evaluator.invalidate(unit_id) + return {"status": "ok", "deleted": rule_id} + + +@router.get("/{unit_id}/alerts/events") +def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)): + events = (db.query(AlertEvent).filter_by(unit_id=unit_id) + .order_by(AlertEvent.onset_at.desc()).limit(limit).all()) + return {"status": "ok", "events": [_event_dict(e) for e in events]} + + +@router.post("/{unit_id}/alerts/events/{event_id}/ack") +def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)): + evt = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first() + if not evt: + raise HTTPException(status_code=404, detail="Alert event not found") + evt.acknowledged_at = datetime.utcnow() + evt.acknowledged_by = by + db.commit() + return {"status": "ok", "acknowledged": event_id} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ diff --git a/test_alert_evaluator.py b/test_alert_evaluator.py new file mode 100644 index 0000000..5bacc62 --- /dev/null +++ b/test_alert_evaluator.py @@ -0,0 +1,68 @@ +""" +Synthetic unit test for the alert state machine — no DB, no device. + +Drives `_evaluate_step` with a fake clock + a level series and checks that +onset/clear fire with the right debounce + hysteresis. Run: + + docker compose exec -T slmm python3 test_alert_evaluator.py + # or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py +""" + +from types import SimpleNamespace +from app.alerts import RuleState, _evaluate_step + + +def rule(**kw): + base = dict(threshold_db=85.0, duration_s=3, clear_margin_db=2.0, comparison="above") + base.update(kw) + return SimpleNamespace(**base) + + +def run(series, r): + st = RuleState() + events = [(now, a) for value, now in series + if (a := _evaluate_step(st, value, now, r))] + return events, st + + +def main(): + failures = 0 + + def check(label, cond, detail=""): + nonlocal failures + print(("PASS" if cond else "FAIL"), label, detail) + if not cond: + failures += 1 + + # 1) sustained exceedance -> onset after duration; recovery -> clear after duration + r = rule(threshold_db=85, duration_s=3, clear_margin_db=2) + ev, _ = run([(80, 0), (86, 1), (87, 2), (88, 3), (88, 4), + (88, 5), (82, 6), (82, 7), (82, 8), (82, 9)], r) + onsets = [t for t, a in ev if a == "onset"] + clears = [t for t, a in ev if a == "clear"] + check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev)) + + # 2) brief spike under duration -> no onset (debounce) + ev, _ = run([(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)], rule(duration_s=3)) + check("2 brief spike debounced", ev == [], str(ev)) + + # 3) hysteresis: a dip into the margin (below threshold, above threshold-margin) + # does NOT clear + r = rule(threshold_db=85, duration_s=0, clear_margin_db=3) + ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], r) + check("3 hysteresis holds ACTIVE", ev == [(0, "onset")] and st.phase == "active", + f"{ev} phase={st.phase}") + + # 4) 'below' comparison (device too quiet) -> onset when value < threshold + ev, _ = run([(30, 0), (15, 1)], rule(threshold_db=20, duration_s=0, + clear_margin_db=2, comparison="below")) + check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev)) + + print() + print("ALL PASS" if failures == 0 else f"{failures} FAILURE(S)") + return failures + + +if __name__ == "__main__": + import sys + sys.exit(1 if main() else 0) -- 2.52.0 From 6b1ec753964aa1a3cb65248655ceb66cac65af63 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 17:13:21 +0000 Subject: [PATCH 8/9] =?UTF-8?q?feat:=20harden=20fan-out=20for=20live=20cli?= =?UTF-8?q?ents=20=E2=80=94=20instant=20first=20frame=20+=20offline=20stat?= =?UTF-8?q?us?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For multiple clients connecting to a live feed (e.g. the client portal): - cache the last broadcast frame and replay it to a new subscriber on connect, so a client sees data immediately instead of waiting a full poll cycle. - broadcast a {"feed_status":"unreachable"} frame once on transition (after 3 consecutive poll failures) so clients can render an offline state instead of a frozen chart; data frames now carry "feed_status":"ok". The cached frame reflects current state, so a client connecting while offline gets "unreachable" right away too. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/monitor.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/app/monitor.py b/app/monitor.py index 10e190c..1177893 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -63,6 +63,9 @@ class DeviceMonitor: self._keepalive = False self._task: Optional[asyncio.Task] = None self._lock = asyncio.Lock() + self._last_payload: Optional[dict] = None # replayed to new subscribers + self._consec_fail = 0 + self._reachable = True # last broadcast reachability (for transition frames) @property def running(self) -> bool: @@ -82,6 +85,13 @@ class DeviceMonitor: q: asyncio.Queue = asyncio.Queue(maxsize=5) async with self._lock: self._subscribers.add(q) + # Replay the last frame so a client connecting mid-stream sees data + # (or the current 'unreachable' state) immediately, not after a poll. + if self._last_payload is not None: + try: + q.put_nowait(self._last_payload) + except asyncio.QueueFull: + pass self._ensure_task() return q @@ -101,12 +111,26 @@ class DeviceMonitor: while self._has_demand(): snap, mst = await self._poll_once() if snap is not None: + self._consec_fail = 0 + self._reachable = True payload = _snapshot_payload(snap, self.unit_id, mst) + payload["feed_status"] = "ok" self._broadcast(payload) try: await alert_evaluator.evaluate(self.unit_id, snap) except Exception as e: logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}") + else: + # Tell clients the device went offline — once, on transition, after a + # few failures so a momentary blip doesn't flap the UI. + self._consec_fail += 1 + if self._reachable and self._consec_fail >= 3: + self._reachable = False + self._broadcast({ + "unit_id": self.unit_id, + "timestamp": datetime.utcnow().isoformat(), + "feed_status": "unreachable", + }) await asyncio.sleep(MONITOR_POLL_INTERVAL) finally: logger.info(f"[MONITOR] {self.unit_id}: feed stopped") @@ -138,6 +162,7 @@ class DeviceMonitor: db.close() def _broadcast(self, payload: dict) -> None: + self._last_payload = payload # cache for replay to new subscribers for q in list(self._subscribers): try: q.put_nowait(payload) -- 2.52.0 From ba622c67d848de1a7c525d09a8c5feff2e0aebf8 Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 9 Jun 2026 17:33:29 +0000 Subject: [PATCH 9/9] feat: monitor heartbeat + background poller skips active-monitored units MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Heartbeat: if nothing has been broadcast in MONITOR_HEARTBEAT_S (default 25s) — e.g. device offline and silent — send a non-cached keepalive frame so a reverse proxy (NPM) doesn't drop the idle WS. New subscribers still get the last real frame, not a heartbeat. - Poller-skip: the 60s background poller now skips any unit with a running monitor (MonitorManager.is_active). The monitor already polls it ~1Hz and keeps the status cache fresh, so the background poll was redundant and just added load/lock-contention on the device's single connection (and churn, which matters for the cellular wedge). Trade-off: the FTP start-time sync (only in the poller) doesn't run while a unit is actively monitored — fine, since reports take the authoritative start time from the FTP .rnd data. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/background_poller.py | 9 +++++++++ app/monitor.py | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/app/background_poller.py b/app/background_poller.py index 071fc31..0148671 100644 --- a/app/background_poller.py +++ b/app/background_poller.py @@ -178,10 +178,19 @@ class BackgroundPoller: now = datetime.utcnow() polled_count = 0 + from app.monitor import monitor_manager + for cfg in configs: if not self._running: break + # Skip units with an active live monitor: it polls them at ~1Hz and + # keeps the status cache fresh, so a redundant background poll would just + # add load/lock-contention on the device's single connection. + if monitor_manager.is_active(cfg.unit_id): + self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active") + continue + # Get current status status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first() diff --git a/app/monitor.py b/app/monitor.py index 1177893..47c21be 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -31,6 +31,10 @@ logger = logging.getLogger(__name__) # poll) already paces the effective rate to a few seconds; this is the extra idle. MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) +# If nothing has been broadcast in this many seconds (e.g. device offline and +# silent), send a keepalive frame so reverse proxies don't drop the idle WS. +MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25")) + def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict: """Build the broadcast payload — same shape as the DRD stream, but DOD-sourced @@ -107,6 +111,8 @@ class DeviceMonitor: async def _run(self) -> None: logger.info(f"[MONITOR] {self.unit_id}: feed started") + loop = asyncio.get_running_loop() + last_send = loop.time() try: while self._has_demand(): snap, mst = await self._poll_once() @@ -116,6 +122,7 @@ class DeviceMonitor: payload = _snapshot_payload(snap, self.unit_id, mst) payload["feed_status"] = "ok" self._broadcast(payload) + last_send = loop.time() try: await alert_evaluator.evaluate(self.unit_id, snap) except Exception as e: @@ -131,6 +138,20 @@ class DeviceMonitor: "timestamp": datetime.utcnow().isoformat(), "feed_status": "unreachable", }) + last_send = loop.time() + + # Heartbeat: during quiet/offline stretches, send a keepalive so an + # idle WS isn't dropped by a reverse proxy. Not cached (new subscribers + # should still get the last real frame, not a heartbeat). + if loop.time() - last_send >= MONITOR_HEARTBEAT_S: + self._broadcast({ + "unit_id": self.unit_id, + "timestamp": datetime.utcnow().isoformat(), + "feed_status": "ok" if self._reachable else "unreachable", + "heartbeat": True, + }, cache=False) + last_send = loop.time() + await asyncio.sleep(MONITOR_POLL_INTERVAL) finally: logger.info(f"[MONITOR] {self.unit_id}: feed stopped") @@ -161,8 +182,9 @@ class DeviceMonitor: finally: db.close() - def _broadcast(self, payload: dict) -> None: - self._last_payload = payload # cache for replay to new subscribers + def _broadcast(self, payload: dict, cache: bool = True) -> None: + if cache: + self._last_payload = payload # replayed to new subscribers for q in list(self._subscribers): try: q.put_nowait(payload) @@ -186,6 +208,12 @@ class MonitorManager: self._monitors[unit_id] = m return m + def is_active(self, unit_id: str) -> bool: + """True if this unit has a running monitor feed (so the background poller + can skip it — the monitor already polls it more often).""" + m = self._monitors.get(unit_id) + return m is not None and m.running + def status(self) -> dict: return { uid: { -- 2.52.0