merge drd-fix into dev #5
+244
@@ -0,0 +1,244 @@
|
||||
"""
|
||||
Threshold alert engine.
|
||||
|
||||
Each unit can have any number of AlertRules. A rule is evaluated against the
|
||||
unit's live monitor snapshots via a small per-(unit, rule) state machine:
|
||||
|
||||
IDLE --(metric exceeds threshold for duration_s)--> ACTIVE (fire ONSET)
|
||||
ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR)
|
||||
|
||||
duration_s debounces both edges; clear_margin_db adds hysteresis so a level
|
||||
hovering at the threshold doesn't flap. Onset and clear are distinct events.
|
||||
|
||||
The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no
|
||||
real clock — so it can be unit-tested with a synthetic level series and a fake
|
||||
clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence,
|
||||
and dispatch. Dispatch is a server log for now (POC); the seam to POST events to
|
||||
a Terra-View webhook (email/SMS) is _dispatch().
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Local timezone offset for schedule windows (same env var services.py uses).
|
||||
_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5"))
|
||||
|
||||
# How long to cache a unit's rules before re-querying the DB (rules change rarely).
|
||||
_RULE_CACHE_TTL_S = 15.0
|
||||
|
||||
|
||||
@dataclass
class RuleState:
    """Runtime (in-memory only) state of the state machine for one (unit, rule).

    Instances are mutated in place by `_evaluate_step` as readings arrive.
    """

    # Which side of the state machine we are on: "idle" or "active".
    phase: str = "idle"
    # Clock time at which the edge condition currently being debounced began;
    # None when no edge is pending.
    edge_since: Optional[float] = None
    # Extreme reading tracked while the alert is active.
    peak: float = 0.0
    # Id of the open AlertEvent row, kept so the clear can update that row.
    event_id: Optional[int] = None
|
||||
|
||||
|
||||
def _exceeds(value: float, rule) -> bool:
|
||||
if rule.comparison == "below":
|
||||
return value < rule.threshold_db
|
||||
return value > rule.threshold_db
|
||||
|
||||
|
||||
def _recovered(value: float, rule) -> bool:
|
||||
margin = rule.clear_margin_db or 0.0
|
||||
if rule.comparison == "below":
|
||||
return value > rule.threshold_db + margin
|
||||
return value < rule.threshold_db - margin
|
||||
|
||||
|
||||
def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]:
    """Feed one reading into the (unit, rule) state machine.

    Mutates `state` in place and reports the transition that happened, if any.
    No DB and no real clock are touched: `now` is supplied by the caller, so
    tests can drive the machine with a synthetic series and a fake clock.

    Args:
        state: runtime state for this (unit, rule); updated in place.
        value: the metric reading to evaluate.
        now: current clock time, in the same units as `rule.duration_s`.
        rule: object exposing `comparison`, `threshold_db`, `clear_margin_db`
            and `duration_s`.

    Returns:
        "onset" when the rule just became active, "clear" when it just
        returned to idle, otherwise None.
    """
    hold_s = rule.duration_s or 0
    was_idle = state.phase == "idle"

    if was_idle:
        edge_seen = _exceeds(value, rule)
    else:
        # While active, keep the most extreme reading (lowest for "below"
        # rules, highest otherwise) before checking for recovery.
        pick = min if rule.comparison == "below" else max
        state.peak = pick(state.peak, value)
        edge_seen = _recovered(value, rule)

    if not edge_seen:
        # The condition broke, so any debounce in progress starts over.
        state.edge_since = None
        return None

    if state.edge_since is None:
        state.edge_since = now

    if now - state.edge_since >= hold_s:
        # Condition held for the whole debounce period: flip the phase.
        state.edge_since = None
        if was_idle:
            state.phase = "active"
            state.peak = value
            return "onset"
        state.phase = "idle"
        return "clear"

    return None
|
||||
|
||||
|
||||
def _in_window(now_minutes: int, start: str, end: str) -> bool:
|
||||
"""Is now_minutes (minutes since local midnight) within [start, end)?
|
||||
Handles wraparound windows like 22:00–07:00."""
|
||||
def _m(s: str) -> int:
|
||||
h, m = s.split(":")
|
||||
return int(h) * 60 + int(m)
|
||||
s, e = _m(start), _m(end)
|
||||
if s == e:
|
||||
return True
|
||||
if s < e:
|
||||
return s <= now_minutes < e
|
||||
return now_minutes >= s or now_minutes < e # wraparound
|
||||
|
||||
|
||||
class AlertEvaluator:
    """Evaluate each unit's enabled AlertRules against live monitor snapshots.

    Wraps the pure state machine (`_evaluate_step`) with rule loading (cached
    per unit for `_RULE_CACHE_TTL_S` seconds), schedule gating, AlertEvent
    persistence and dispatch.
    """

    def __init__(self):
        # Runtime state per (unit_id, rule.id); entries are created lazily.
        self._states: Dict[Tuple[str, int], RuleState] = {}
        # unit_id -> (fetched_at, rules)
        self._rule_cache: Dict[str, Tuple[float, list]] = {}
        logger.info("[ALERT] rule-based evaluator ready")

    async def evaluate(self, unit_id: str, snap) -> None:
        """Evaluate every enabled rule for this unit against one snapshot.

        Rules outside their schedule, and readings that are missing or not
        numeric, are skipped. Onset/clear transitions are persisted and
        dispatched.
        """
        rules = self._get_rules(unit_id)
        if not rules:
            return
        now = asyncio.get_running_loop().time()
        for rule in rules:
            if not self._in_schedule(rule):
                # Drop any half-finished debounce. Otherwise the stale
                # edge_since survives until the rule is next in schedule, where
                # the first matching reading would look "sustained" for hours
                # and fire instantly, bypassing duration_s.
                pending = self._states.get((unit_id, rule.id))
                if pending is not None:
                    pending.edge_since = None
                continue
            raw = getattr(snap, rule.metric, None)
            try:
                value = float(raw)
            except (TypeError, ValueError):
                continue  # missing / non-numeric ("-.-")
            state = self._states.setdefault((unit_id, rule.id), RuleState())
            action = _evaluate_step(state, value, now, rule)
            if action == "onset":
                await self._on_onset(unit_id, rule, value, state)
            elif action == "clear":
                await self._on_clear(unit_id, rule, value, state)

    # -- rule loading (cached) ----------------------------------------------

    def _get_rules(self, unit_id: str) -> list:
        """Return the unit's enabled rules, re-querying once the cache entry expires."""
        loop_now = asyncio.get_running_loop().time()
        cached = self._rule_cache.get(unit_id)
        if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S:
            return cached[1]
        rules = self._load_rules(unit_id)
        self._rule_cache[unit_id] = (loop_now, rules)
        return rules

    def _load_rules(self, unit_id: str) -> list:
        """Query the enabled AlertRule rows for a unit; returns [] on any failure."""
        from app.database import SessionLocal
        from app.models import AlertRule

        db = SessionLocal()
        try:
            return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all()
        except Exception as e:
            logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}")
            return []
        finally:
            db.close()

    def invalidate(self, unit_id: Optional[str] = None) -> None:
        """Drop cached rules so a change is picked up immediately.

        With no `unit_id` the whole cache is cleared.
        """
        if unit_id is None:
            self._rule_cache.clear()
        else:
            self._rule_cache.pop(unit_id, None)

    # -- scheduling ----------------------------------------------------------

    def _in_schedule(self, rule) -> bool:
        """Return True when the rule applies right now (day-of-week and time window)."""
        if not rule.schedule_start or not rule.schedule_end:
            # No time window configured: only the day filter applies.
            return self._day_ok(rule)
        local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
        if not self._day_ok(rule, local):
            return False
        return _in_window(local.hour * 60 + local.minute, rule.schedule_start, rule.schedule_end)

    @staticmethod
    def _day_ok(rule, local: Optional[datetime] = None) -> bool:
        """Return True when the local weekday is allowed by `rule.schedule_days`.

        `schedule_days` is a CSV of weekday numbers (Mon=0); empty means every
        day. `local` defaults to UTC now shifted by `_TZ_OFFSET_HOURS`.
        """
        if not rule.schedule_days:
            return True
        if local is None:
            local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
        allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""}
        return local.weekday() in allowed  # Mon=0

    # -- event persistence + dispatch ---------------------------------------

    async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None:
        """Record a new AlertEvent row for the onset, then dispatch it.

        A persistence failure is logged and does not prevent the dispatch.
        """
        from app.database import SessionLocal
        from app.models import AlertEvent

        db = SessionLocal()
        try:
            evt = AlertEvent(
                rule_id=rule.id, unit_id=unit_id, rule_name=rule.name,
                metric=rule.metric, threshold_db=rule.threshold_db,
                onset_value=value, peak_value=value, status="active",
            )
            db.add(evt)
            db.commit()
            db.refresh(evt)
            # Remember the row so the matching clear can close it.
            state.event_id = evt.id
        except Exception as e:
            logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}")
        finally:
            db.close()

        op = "<" if rule.comparison == "below" else ">"
        sustained = f" for {rule.duration_s}s" if rule.duration_s else ""
        await self._dispatch(
            "ONSET", unit_id, rule,
            f"{rule.metric.upper()}={value:.1f} dB {op} {rule.threshold_db:.1f} dB{sustained}",
        )

    async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None:
        """Close the open AlertEvent row (if one was recorded), then dispatch the clear.

        A persistence failure is logged and does not prevent the dispatch.
        """
        peak = state.peak
        from app.database import SessionLocal
        from app.models import AlertEvent

        db = SessionLocal()
        try:
            if state.event_id is not None:
                evt = db.query(AlertEvent).filter_by(id=state.event_id).first()
                if evt:
                    evt.clear_at = datetime.utcnow()
                    evt.peak_value = peak
                    evt.status = "cleared"
                    db.commit()
        except Exception as e:
            logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}")
        finally:
            db.close()
        state.event_id = None
        await self._dispatch(
            "CLEAR", unit_id, rule,
            f"recovered to {value:.1f} dB (peak {peak:.1f} dB)",
        )

    async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None:
        """POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here."""
        logger.warning(f"[ALERT:{kind}] {unit_id} '{rule.name}': {detail}")
|
||||
|
||||
|
||||
# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot)
|
||||
alert_evaluator = AlertEvaluator()
|
||||
@@ -8,6 +8,7 @@ for fast API access without querying devices on every request.
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
@@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in
|
||||
# standby (running but not polling and not holding device connections) — e.g. a
|
||||
# dev box that must not latch onto a device that a prod instance owns.
|
||||
POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true"
|
||||
|
||||
|
||||
class BackgroundPoller:
|
||||
"""
|
||||
@@ -39,6 +45,7 @@ class BackgroundPoller:
|
||||
self._logger = logger
|
||||
self._last_cleanup = None # Track last log cleanup time
|
||||
self._last_pool_log = None # Track last connection pool heartbeat log
|
||||
self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle)
|
||||
|
||||
async def start(self):
|
||||
"""Start the background polling task."""
|
||||
@@ -71,15 +78,48 @@ class BackgroundPoller:
|
||||
|
||||
self._logger.info("Background poller stopped")
|
||||
|
||||
def is_active(self) -> bool:
    """Report the global polling flag: True while polling, False in standby."""
    polling_on = self._active
    return polling_on
|
||||
|
||||
async def set_active(self, active: bool):
    """Switch background polling on or off for the whole instance at runtime.

    Turning it off leaves the poll loop alive but idle and closes every pooled
    device connection, so this SLMM instance no longer occupies the devices'
    single connection slots (e.g. so a prod instance can take over). The flag
    is runtime state only — after a restart the instance is back on its
    SLMM_POLLING_ENABLED default.
    """
    self._active = active
    if not active:
        self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections")
        await self._release_all_connections()
        return
    self._logger.info("[SYSTEM] Background polling ACTIVATED")
|
||||
|
||||
async def _release_all_connections(self):
    """Discard every connection the pool currently reports (no-op if none)."""
    from app.services import _connection_pool

    pooled = _connection_pool.get_stats().get("connections", {})
    # Snapshot the keys first: discarding may change the pool while we iterate.
    for key in tuple(pooled):
        await _connection_pool.discard(key)
|
||||
|
||||
async def _poll_loop(self):
|
||||
"""Main polling loop that runs continuously."""
|
||||
self._logger.info("Background polling loop started")
|
||||
|
||||
while self._running:
|
||||
if self._active:
|
||||
try:
|
||||
await self._poll_all_devices()
|
||||
except Exception as e:
|
||||
self._logger.error(f"Error in poll loop: {e}", exc_info=True)
|
||||
else:
|
||||
# Standby: poll nothing, and keep holding no device connection slots
|
||||
# so another SLMM instance (e.g. prod) can talk to the devices.
|
||||
try:
|
||||
await self._release_all_connections()
|
||||
except Exception as e:
|
||||
self._logger.warning(f"Standby connection release failed: {e}")
|
||||
|
||||
# Run log cleanup once per hour
|
||||
try:
|
||||
@@ -138,10 +178,19 @@ class BackgroundPoller:
|
||||
now = datetime.utcnow()
|
||||
polled_count = 0
|
||||
|
||||
from app.monitor import monitor_manager
|
||||
|
||||
for cfg in configs:
|
||||
if not self._running:
|
||||
break
|
||||
|
||||
# Skip units with an active live monitor: it polls them at ~1Hz and
|
||||
# keeps the status cache fresh, so a redundant background poll would just
|
||||
# add load/lock-contention on the device's single connection.
|
||||
if monitor_manager.is_active(cfg.unit_id):
|
||||
self._logger.debug(f"Skipping {cfg.unit_id} — live monitor active")
|
||||
continue
|
||||
|
||||
# Get current status
|
||||
status = db.query(NL43Status).filter_by(unit_id=cfg.unit_id).first()
|
||||
|
||||
|
||||
+53
-1
@@ -1,4 +1,4 @@
|
||||
from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func
|
||||
from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func
|
||||
from app.database import Base
|
||||
|
||||
|
||||
@@ -41,6 +41,8 @@ class NL43Status(Base):
|
||||
lmax = Column(String, nullable=True) # Maximum level
|
||||
lmin = Column(String, nullable=True) # Minimum level
|
||||
lpeak = Column(String, nullable=True) # Peak level
|
||||
ln1 = Column(String, nullable=True) # Percentile slot LN1 (configurable; device default L5, contract L1)
|
||||
ln2 = Column(String, nullable=True) # Percentile slot LN2 (configurable; device default L10)
|
||||
battery_level = Column(String, nullable=True)
|
||||
power_source = Column(String, nullable=True)
|
||||
sd_remaining_mb = Column(String, nullable=True)
|
||||
@@ -72,3 +74,53 @@ class DeviceLog(Base):
|
||||
level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR
|
||||
category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC
|
||||
message = Column(Text, nullable=False)
|
||||
|
||||
|
||||
class AlertRule(Base):
    """One threshold-alert rule, evaluated against a unit's live monitor feed.

    Source-agnostic: today it runs over the DOD monitor; the same rule transfers
    unchanged if a unit's feed is later sourced from FTP intervals.
    """

    __tablename__ = "alert_rules"

    id = Column(Integer, primary_key=True, autoincrement=True)
    unit_id = Column(String, index=True, nullable=False)
    name = Column(String, nullable=False, default="Alert")

    # -- what to compare -----------------------------------------------------
    # Snapshot field to read: lp/leq/lmax/lmin/lpeak/ln1/ln2
    metric = Column(String, nullable=False, default="lp")
    # Direction of the alarm: above | below
    comparison = Column(String, nullable=False, default="above")
    threshold_db = Column(Float, nullable=False)

    # -- debounce / hysteresis -----------------------------------------------
    # Seconds the condition must be sustained (0 = instant)
    duration_s = Column(Integer, nullable=False, default=0)
    # Hysteresis band applied when clearing
    clear_margin_db = Column(Float, nullable=False, default=2.0)
    # Minimum seconds between onsets
    cooldown_s = Column(Integer, nullable=False, default=300)

    # -- optional time-of-day scoping (local time) ---------------------------
    # schedule_start/end are "HH:MM"; null = always active.
    # schedule_days is a CSV of 0-6 (Mon=0); null = every day.
    schedule_start = Column(String, nullable=True)
    schedule_end = Column(String, nullable=True)
    schedule_days = Column(String, nullable=True)

    # -- delivery ------------------------------------------------------------
    # CSV of channels: log,email,sms
    channels = Column(String, nullable=False, default="log")
    # CSV of emails/phones
    recipients = Column(Text, nullable=True)

    enabled = Column(Boolean, default=True)
    created_at = Column(DateTime, default=func.now())
|
||||
|
||||
|
||||
class AlertEvent(Base):
    """History row for one fired alert, from onset to clear.

    Also carries the acknowledgement fields used for an inbox-style workflow.
    """

    __tablename__ = "alert_events"

    id = Column(Integer, primary_key=True, autoincrement=True)

    # -- which rule fired, copied at onset time ------------------------------
    rule_id = Column(Integer, index=True, nullable=False)
    unit_id = Column(String, index=True, nullable=False)
    rule_name = Column(String, nullable=True)
    metric = Column(String, nullable=False)
    threshold_db = Column(Float, nullable=False)

    # -- lifecycle -----------------------------------------------------------
    onset_at = Column(DateTime, default=func.now(), index=True)
    onset_value = Column(Float, nullable=True)
    peak_value = Column(Float, nullable=True)
    clear_at = Column(DateTime, nullable=True)
    # active | cleared
    status = Column(String, default="active")

    # -- acknowledgement -----------------------------------------------------
    acknowledged_at = Column(DateTime, nullable=True)
    acknowledged_by = Column(String, nullable=True)
    notes = Column(Text, nullable=True)
|
||||
|
||||
+229
@@ -0,0 +1,229 @@
|
||||
"""
|
||||
Per-device live monitor (fan-out hub).
|
||||
|
||||
ONE DOD poll loop per device, broadcast to many subscribers:
|
||||
- browser WebSocket clients (live view) — they no longer each open their own
|
||||
device stream, so the NL43's single-connection limit stops causing the
|
||||
"second viewer sees nothing" contention.
|
||||
- the alert evaluator (threshold alerts), which can keep a device's feed running
|
||||
even with no browser attached.
|
||||
- persistence (each snapshot is written to NL43Status, like the poller does).
|
||||
|
||||
The device's one TCP connection is respected: every poll goes through the same
|
||||
per-device lock + connection pool in services.py, so the monitor, the background
|
||||
poller, and on-demand commands all serialize safely.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Dict, Optional, Set
|
||||
|
||||
from app.database import SessionLocal
|
||||
from app.models import NL43Config, NL43Status
|
||||
from app.services import NL43Client, persist_snapshot
|
||||
from app.alerts import alert_evaluator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per
|
||||
# poll) already paces the effective rate to a few seconds; this is the extra idle.
|
||||
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0"))
|
||||
|
||||
# If nothing has been broadcast in this many seconds (e.g. device offline and
|
||||
# silent), send a keepalive frame so reverse proxies don't drop the idle WS.
|
||||
MONITOR_HEARTBEAT_S = float(os.getenv("MONITOR_HEARTBEAT_S", "25"))
|
||||
|
||||
|
||||
def _snapshot_payload(snap, unit_id: str, measurement_start_time) -> dict:
|
||||
"""Build the broadcast payload — same shape as the DRD stream, but DOD-sourced
|
||||
so it carries ln1/ln2 (which DRD cannot)."""
|
||||
return {
|
||||
"unit_id": unit_id,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"measurement_state": snap.measurement_state,
|
||||
"measurement_start_time": measurement_start_time,
|
||||
"counter": snap.counter,
|
||||
"lp": snap.lp,
|
||||
"leq": snap.leq,
|
||||
"lmax": snap.lmax,
|
||||
"lmin": snap.lmin,
|
||||
"lpeak": snap.lpeak,
|
||||
"ln1": snap.ln1,
|
||||
"ln2": snap.ln2,
|
||||
"raw_payload": snap.raw_payload,
|
||||
}
|
||||
|
||||
|
||||
class DeviceMonitor:
    """Single DOD poll loop for one device, fanned out to every subscriber.

    The loop keeps running while there is demand: at least one subscriber
    queue, or the server-side keep-alive (alerting) flag.
    """

    def __init__(self, unit_id: str):
        self.unit_id = unit_id
        self._subscribers: Set[asyncio.Queue] = set()
        self._keepalive = False
        self._task: Optional[asyncio.Task] = None
        self._lock = asyncio.Lock()
        # Last cached frame; replayed to subscribers that join mid-stream.
        self._last_payload: Optional[dict] = None
        self._consec_fail = 0
        # Reachability as last broadcast (used to emit transition frames once).
        self._reachable = True

    @property
    def running(self) -> bool:
        """True while the poll-loop task exists and has not finished."""
        task = self._task
        return task is not None and not task.done()

    def subscriber_count(self) -> int:
        """Number of subscriber queues currently attached."""
        return len(self._subscribers)

    def _has_demand(self) -> bool:
        """True when a subscriber is attached or keep-alive is set."""
        if self._subscribers:
            return True
        return self._keepalive

    def _ensure_task(self) -> None:
        """Start the poll loop unless one is already running."""
        if not self.running:
            self._task = asyncio.create_task(self._run())

    async def subscribe(self) -> asyncio.Queue:
        """Attach a new bounded queue and make sure the feed is running.

        The last cached frame, if any, is queued immediately so a client
        connecting mid-stream sees data (or the current 'unreachable' state)
        without waiting for the next poll.
        """
        queue: asyncio.Queue = asyncio.Queue(maxsize=5)
        async with self._lock:
            self._subscribers.add(queue)
            latest = self._last_payload
            if latest is not None:
                try:
                    queue.put_nowait(latest)
                except asyncio.QueueFull:
                    pass
            self._ensure_task()
        return queue

    async def unsubscribe(self, q: asyncio.Queue) -> None:
        """Detach a queue; unknown queues are ignored."""
        async with self._lock:
            self._subscribers.discard(q)

    async def set_keepalive(self, on: bool) -> None:
        """Set the keep-alive flag; turning it on also starts the feed if needed."""
        async with self._lock:
            self._keepalive = on
            if on:
                self._ensure_task()

    def _status_frame(self, feed_status: str, **extra) -> dict:
        """Build a data-less frame carrying only unit, timestamp and feed status."""
        frame = {
            "unit_id": self.unit_id,
            "timestamp": datetime.utcnow().isoformat(),
            "feed_status": feed_status,
        }
        frame.update(extra)
        return frame

    async def _run(self) -> None:
        """Poll loop: poll, broadcast, evaluate alerts, heartbeat, sleep — while there is demand."""
        logger.info(f"[MONITOR] {self.unit_id}: feed started")
        clock = asyncio.get_running_loop().time
        last_send = clock()
        try:
            while self._has_demand():
                snap, mst = await self._poll_once()
                if snap is None:
                    # Tell clients the device went offline — once, on transition,
                    # after a few failures so a momentary blip doesn't flap the UI.
                    self._consec_fail += 1
                    if self._reachable and self._consec_fail >= 3:
                        self._reachable = False
                        self._broadcast(self._status_frame("unreachable"))
                        last_send = clock()
                else:
                    self._consec_fail = 0
                    self._reachable = True
                    payload = _snapshot_payload(snap, self.unit_id, mst)
                    payload["feed_status"] = "ok"
                    self._broadcast(payload)
                    last_send = clock()
                    # An alert-evaluation failure must not take the feed down.
                    try:
                        await alert_evaluator.evaluate(self.unit_id, snap)
                    except Exception as e:
                        logger.warning(f"[MONITOR] {self.unit_id}: alert eval failed: {e}")

                # Heartbeat: during quiet/offline stretches, send a keepalive so
                # an idle WS isn't dropped by a reverse proxy. Not cached, so new
                # subscribers still get the last real frame rather than a heartbeat.
                if clock() - last_send >= MONITOR_HEARTBEAT_S:
                    status = "ok" if self._reachable else "unreachable"
                    self._broadcast(self._status_frame(status, heartbeat=True), cache=False)
                    last_send = clock()

                await asyncio.sleep(MONITOR_POLL_INTERVAL)
        finally:
            logger.info(f"[MONITOR] {self.unit_id}: feed stopped")

    async def _poll_once(self):
        """One DOD poll: read, persist, return (snapshot, measurement_start_iso).

        Returns (None, None) when the unit has no config, TCP is disabled, or
        anything in the poll raises (the error is logged).
        """
        session = SessionLocal()
        try:
            cfg = session.query(NL43Config).filter_by(unit_id=self.unit_id).first()
            if not cfg or not cfg.tcp_enabled:
                return None, None
            client = NL43Client(
                cfg.host,
                cfg.tcp_port,
                ftp_username=cfg.ftp_username,
                ftp_password=cfg.ftp_password,
                ftp_port=cfg.ftp_port or 21,
            )
            snap = await client.request_dod()
            snap.unit_id = self.unit_id
            persist_snapshot(snap, session)
            session.commit()
            status = session.query(NL43Status).filter_by(unit_id=self.unit_id).first()
            if status and status.measurement_start_time:
                mst = status.measurement_start_time.isoformat()
            else:
                mst = None
            return snap, mst
        except Exception as e:
            logger.warning(f"[MONITOR] {self.unit_id}: poll failed: {e}")
            return None, None
        finally:
            session.close()

    def _broadcast(self, payload: dict, cache: bool = True) -> None:
        """Offer `payload` to every subscriber queue without blocking.

        With `cache` set, the payload also becomes the frame replayed to new
        subscribers.
        """
        if cache:
            self._last_payload = payload
        for queue in list(self._subscribers):
            try:
                queue.put_nowait(payload)
            except asyncio.QueueFull:
                # Slow consumer — drop this frame rather than stall the whole feed.
                pass
|
||||
|
||||
|
||||
class MonitorManager:
    """Keeps one DeviceMonitor per unit_id and hands them out on request."""

    def __init__(self):
        self._monitors: Dict[str, DeviceMonitor] = {}
        self._lock = asyncio.Lock()

    async def get(self, unit_id: str) -> DeviceMonitor:
        """Return the unit's monitor, creating and registering it on first use."""
        async with self._lock:
            monitor = self._monitors.get(unit_id)
            if monitor is None:
                monitor = DeviceMonitor(unit_id)
                self._monitors[unit_id] = monitor
            return monitor

    def is_active(self, unit_id: str) -> bool:
        """Tell whether the unit has a monitor whose feed is currently running.

        The background poller uses this to skip units the monitor already polls.
        """
        monitor = self._monitors.get(unit_id)
        return monitor is not None and monitor.running

    def status(self) -> dict:
        """Summarise every registered monitor, keyed by unit_id."""
        report = {}
        for uid, monitor in self._monitors.items():
            report[uid] = {
                "running": monitor.running,
                "subscribers": monitor.subscriber_count(),
                "keepalive": monitor._keepalive,
            }
        return report
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
monitor_manager = MonitorManager()
|
||||
+315
-1
@@ -11,7 +11,7 @@ import os
|
||||
import asyncio
|
||||
|
||||
from app.database import get_db
|
||||
from app.models import NL43Config, NL43Status
|
||||
from app.models import NL43Config, NL43Status, AlertRule, AlertEvent
|
||||
from app.services import NL43Client, persist_snapshot
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -121,6 +121,310 @@ async def flush_connection_pool():
|
||||
return {"status": "ok", "message": "All cached connections closed"}
|
||||
|
||||
|
||||
@router.post("/{unit_id}/disconnect")
async def disconnect_device(unit_id: str, db: Session = Depends(get_db)):
    """Close SLMM's pooled TCP connection to one device and drop it from the pool.

    This frees the NL43's single connection slot. Idempotent — when nothing is
    cached for the device the call is a no-op and reports `disconnected: false`.

    Note: only the *idle* pooled connection is released. An in-progress DRD
    stream or in-flight command has the socket checked out of the pool and is
    not interrupted — close the stream WebSocket to end a live stream.

    Raises:
        HTTPException: 404 when the unit has no NL43 config.
    """
    cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
    if not cfg:
        raise HTTPException(status_code=404, detail="NL43 config not found")

    from app.services import _connection_pool

    device_key = f"{cfg.host}:{cfg.tcp_port}"
    # Check before discarding so the response can say whether anything was closed.
    pooled = _connection_pool.get_stats().get("connections", {})
    had_conn = device_key in pooled

    await _connection_pool.discard(device_key)

    if had_conn:
        message = "Connection closed"
    else:
        message = "No cached connection to close"
    return {
        "status": "ok",
        "unit_id": unit_id,
        "device_key": device_key,
        "disconnected": had_conn,
        "message": message,
    }
|
||||
|
||||
|
||||
@router.post("/{unit_id}/deactivate")
async def deactivate_device(unit_id: str, db: Session = Depends(get_db)):
    """Make one unit dormant: disable its background polling and drop its connection.

    `poll_enabled=False` is committed to the config row, so the unit stays
    dormant across restarts until /activate is called.

    Raises:
        HTTPException: 404 when the unit has no NL43 config.
    """
    cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
    if not cfg:
        raise HTTPException(status_code=404, detail="NL43 config not found")

    cfg.poll_enabled = False
    db.commit()

    from app.services import _connection_pool, _get_device_lock

    device_key = f"{cfg.host}:{cfg.tcp_port}"

    # Give any in-flight poll/command up to 10 s to finish (so its connection is
    # back in the pool). If a long-lived stream keeps the lock we do not wait
    # forever — the pooled connection is discarded either way.
    lock = await _get_device_lock(device_key)
    try:
        await asyncio.wait_for(lock.acquire(), timeout=10.0)
    except asyncio.TimeoutError:
        holds_lock = False
    else:
        holds_lock = True

    try:
        await _connection_pool.discard(device_key)
    finally:
        if holds_lock:
            lock.release()

    return {
        "status": "ok",
        "unit_id": unit_id,
        "poll_enabled": False,
        "message": "Polling disabled and connection closed for this unit",
    }
|
||||
|
||||
|
||||
@router.post("/{unit_id}/activate")
async def activate_device(unit_id: str, db: Session = Depends(get_db)):
    """Re-enable background polling for a unit by committing poll_enabled=True.

    Raises:
        HTTPException: 404 when the unit has no NL43 config.
    """
    cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
    if not cfg:
        raise HTTPException(status_code=404, detail="NL43 config not found")

    cfg.poll_enabled = True
    db.commit()

    response = {"status": "ok", "unit_id": unit_id}
    response["poll_enabled"] = True
    response["message"] = "Polling enabled for this unit"
    return response
|
||||
|
||||
|
||||
@router.get("/_system/status")
async def system_status():
    """Report whether this SLMM instance is polling or in standby, plus the
    connection pool's `active_connections` figure (0 when the pool omits it)."""
    from app.background_poller import poller
    from app.services import _connection_pool

    polling = poller.is_active()
    pool_stats = _connection_pool.get_stats()
    return {
        "status": "ok",
        "mode": "active" if polling else "standby",
        "polling_active": polling,
        "active_connections": pool_stats.get("active_connections", 0),
    }
|
||||
|
||||
|
||||
@router.post("/_system/standby")
async def system_standby():
    """Switch this SLMM instance to standby.

    Background polling of ALL devices stops and every pooled connection is
    released, so the instance no longer occupies device slots (e.g. so a prod
    instance can take over). Runtime-only — after a restart the instance is
    back on its SLMM_POLLING_ENABLED default.
    """
    from app.background_poller import poller

    await poller.set_active(False)
    return {
        "status": "ok",
        "mode": "standby",
        "message": "Polling stopped and all device connections released",
    }
|
||||
|
||||
|
||||
@router.post("/_system/resume")
async def system_resume():
    """Leave standby: turn global background polling back on."""
    from app.background_poller import poller

    await poller.set_active(True)
    return {
        "status": "ok",
        "mode": "active",
        "message": "Polling resumed",
    }
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# LIVE MONITOR (fan-out) — one DOD feed per device, broadcast to many clients
|
||||
# ============================================================================
|
||||
|
||||
@router.websocket("/{unit_id}/monitor")
async def monitor_stream(websocket: WebSocket, unit_id: str):
    """Attach a browser to the device's shared DOD feed.

    Any number of clients can subscribe without each opening its own device
    connection (one poll loop per device, fanned out). Frames have the same
    JSON shape as the DRD stream, but are DOD-sourced and so include ln1/ln2
    (L1/L10).
    """
    await websocket.accept()
    from app.monitor import monitor_manager

    monitor = await monitor_manager.get(unit_id)
    queue = await monitor.subscribe()
    logger.info(f"Monitor subscriber attached for {unit_id} ({monitor.subscriber_count()} total)")

    async def _watch_disconnect():
        # Finishes as soon as the client goes away, so even a feed that is
        # sending nothing notices the drop and we don't leak a subscription
        # that keeps the device feed (and its connection) alive.
        try:
            msg = await websocket.receive()
            while msg.get("type") != "websocket.disconnect":
                msg = await websocket.receive()
        except Exception:
            pass

    gone = asyncio.create_task(_watch_disconnect())
    try:
        while not gone.done():
            try:
                payload = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                # Nothing queued within a second: loop round to re-check `gone`.
                continue
            await websocket.send_json(payload)
    except WebSocketDisconnect:
        logger.info(f"Monitor subscriber disconnected for {unit_id}")
    except Exception as e:
        logger.warning(f"Monitor stream error for {unit_id}: {e}")
    finally:
        gone.cancel()
        await monitor.unsubscribe(queue)
|
||||
|
||||
|
||||
@router.post("/{unit_id}/monitor/start")
async def monitor_start(unit_id: str):
    """Enable keep-alive on the device's monitor feed.

    With keep-alive on, the feed keeps running even with no browser attached,
    so alerting evaluates continuously. Runtime-only (resets on restart).

    Args:
        unit_id: Identifier of the device whose monitor is kept alive.

    Returns:
        A dict reporting the monitor's ``running`` flag after the change.
    """
    from app.monitor import monitor_manager

    device_monitor = await monitor_manager.get(unit_id)
    await device_monitor.set_keepalive(True)

    # `running` is read only after keep-alive has been applied.
    result = {
        "status": "ok",
        "unit_id": unit_id,
        "running": device_monitor.running,
        "keepalive": True,
    }
    return result
|
||||
|
||||
|
||||
@router.post("/{unit_id}/monitor/stop")
async def monitor_stop(unit_id: str):
    """Disable keep-alive on the device's monitor feed.

    The feed then stops once no browser subscribers remain.

    Args:
        unit_id: Identifier of the device whose keep-alive is dropped.
    """
    from app.monitor import monitor_manager

    device_monitor = await monitor_manager.get(unit_id)
    await device_monitor.set_keepalive(False)

    result = {"status": "ok", "unit_id": unit_id, "keepalive": False}
    return result
|
||||
|
||||
|
||||
@router.get("/_monitor/status")
async def monitor_status():
    """Report the status of every device monitor.

    The per-monitor details (running, subscriber count, keep-alive) come
    straight from ``monitor_manager.status()``.
    """
    from app.monitor import monitor_manager

    snapshot = monitor_manager.status()
    return {"status": "ok", "monitors": snapshot}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# ALERTS — threshold rules + fired events
|
||||
# ============================================================================
|
||||
|
||||
class AlertRulePayload(BaseModel):
    # Request body accepted by the alert-rule create and update endpoints.
    # Documented with comments (not a docstring) so the generated schema is
    # left exactly as it was.

    name: str = "Alert"

    # Level the rule watches: lp/leq/lmax/lmin/lpeak/ln1/ln2
    metric: str = "lp"

    # Direction of the comparison: above | below
    comparison: str = "above"

    # Threshold in dB; the only field without a default.
    threshold_db: float

    # Sustained seconds before firing (0 = instant)
    duration_s: int = 0

    # Hysteresis band
    clear_margin_db: float = 2.0

    cooldown_s: int = 300

    # Schedule window start, "HH:MM" local; null = always
    schedule_start: str | None = None
    schedule_end: str | None = None

    # CSV of 0-6 (Mon=0); null = every day
    schedule_days: str | None = None

    channels: str = "log"
    recipients: str | None = None
    enabled: bool = True
|
||||
|
||||
|
||||
def _rule_dict(r: AlertRule) -> dict:
|
||||
return {
|
||||
"id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric,
|
||||
"comparison": r.comparison, "threshold_db": r.threshold_db,
|
||||
"duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db,
|
||||
"cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start,
|
||||
"schedule_end": r.schedule_end, "schedule_days": r.schedule_days,
|
||||
"channels": r.channels, "recipients": r.recipients, "enabled": r.enabled,
|
||||
}
|
||||
|
||||
|
||||
def _event_dict(e: AlertEvent) -> dict:
|
||||
return {
|
||||
"id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id,
|
||||
"rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db,
|
||||
"onset_at": e.onset_at.isoformat() if e.onset_at else None,
|
||||
"onset_value": e.onset_value, "peak_value": e.peak_value,
|
||||
"clear_at": e.clear_at.isoformat() if e.clear_at else None,
|
||||
"status": e.status,
|
||||
"acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None,
|
||||
"acknowledged_by": e.acknowledged_by,
|
||||
}
|
||||
|
||||
|
||||
@router.post("/{unit_id}/alerts/rules")
def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)):
    """Create an alert rule for a unit and return it serialized.

    Args:
        unit_id: Unit the new rule belongs to.
        payload: Rule fields; every field is passed through to the row.
        db: Database session supplied by the ``get_db`` dependency.
    """
    rule_fields = payload.model_dump()
    new_rule = AlertRule(unit_id=unit_id, **rule_fields)

    db.add(new_rule)
    db.commit()
    db.refresh(new_rule)

    # Tell the evaluator this unit's rules changed, after the commit.
    from app.alerts import alert_evaluator
    alert_evaluator.invalidate(unit_id)

    return {"status": "ok", "rule": _rule_dict(new_rule)}
|
||||
|
||||
|
||||
@router.get("/{unit_id}/alerts/rules")
def list_alert_rules(unit_id: str, db: Session = Depends(get_db)):
    """Return every alert rule stored for the given unit.

    Args:
        unit_id: Unit whose rules are listed.
        db: Database session supplied by the ``get_db`` dependency.
    """
    unit_rules = db.query(AlertRule).filter_by(unit_id=unit_id).all()
    serialized = list(map(_rule_dict, unit_rules))
    return {"status": "ok", "rules": serialized}
|
||||
|
||||
|
||||
@router.put("/{unit_id}/alerts/rules/{rule_id}")
def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)):
    """Overwrite an existing alert rule with the fields in ``payload``.

    Every payload field is written onto the row, so fields omitted by the
    client are set to the payload model's defaults.

    Args:
        unit_id: Unit that must own the rule.
        rule_id: Primary key of the rule to update.
        payload: Replacement field values.
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when no rule matches both ``rule_id`` and ``unit_id``.
    """
    existing = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
    if not existing:
        raise HTTPException(status_code=404, detail="Alert rule not found")

    replacement = payload.model_dump()
    for attr_name in replacement:
        setattr(existing, attr_name, replacement[attr_name])

    db.commit()
    db.refresh(existing)

    # Tell the evaluator this unit's rules changed, after the commit.
    from app.alerts import alert_evaluator
    alert_evaluator.invalidate(unit_id)

    return {"status": "ok", "rule": _rule_dict(existing)}
|
||||
|
||||
|
||||
@router.delete("/{unit_id}/alerts/rules/{rule_id}")
def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)):
    """Delete one alert rule belonging to a unit.

    Args:
        unit_id: Unit that must own the rule.
        rule_id: Primary key of the rule to delete.
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when no rule matches both ``rule_id`` and ``unit_id``.
    """
    doomed = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Alert rule not found")

    db.delete(doomed)
    db.commit()

    # Tell the evaluator this unit's rules changed, after the commit.
    from app.alerts import alert_evaluator
    alert_evaluator.invalidate(unit_id)

    return {"status": "ok", "deleted": rule_id}
|
||||
|
||||
|
||||
@router.get("/{unit_id}/alerts/events")
def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)):
    """Return a unit's alert events, most recent onset first.

    Args:
        unit_id: Unit whose events are listed.
        limit: Maximum number of events to return (passed to the query as-is).
        db: Database session supplied by the ``get_db`` dependency.
    """
    unit_events = db.query(AlertEvent).filter_by(unit_id=unit_id)
    newest_first = unit_events.order_by(AlertEvent.onset_at.desc())
    page = newest_first.limit(limit).all()

    return {"status": "ok", "events": list(map(_event_dict, page))}
|
||||
|
||||
|
||||
@router.post("/{unit_id}/alerts/events/{event_id}/ack")
def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)):
    """Mark an alert event as acknowledged.

    Stamps the event with the current ``datetime.utcnow()`` value and the
    optional acknowledger name, then commits.

    Args:
        unit_id: Unit that must own the event.
        event_id: Primary key of the event to acknowledge.
        by: Who acknowledged it; stored as given, may be ``None``.
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when no event matches both ``event_id`` and ``unit_id``.
    """
    event = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first()
    if not event:
        raise HTTPException(status_code=404, detail="Alert event not found")

    acknowledged_when = datetime.utcnow()
    event.acknowledged_at = acknowledged_when
    event.acknowledged_by = by
    db.commit()

    return {"status": "ok", "acknowledged": event_id}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
|
||||
# ============================================================================
|
||||
@@ -450,6 +754,8 @@ def get_status(unit_id: str, db: Session = Depends(get_db)):
|
||||
"lmax": status.lmax,
|
||||
"lmin": status.lmin,
|
||||
"lpeak": status.lpeak,
|
||||
"ln1": status.ln1,
|
||||
"ln2": status.ln2,
|
||||
"battery_level": status.battery_level,
|
||||
"power_source": status.power_source,
|
||||
"sd_remaining_mb": status.sd_remaining_mb,
|
||||
@@ -472,6 +778,8 @@ class StatusPayload(BaseModel):
|
||||
lmax: str | None = None
|
||||
lmin: str | None = None
|
||||
lpeak: str | None = None
|
||||
ln1: str | None = None
|
||||
ln2: str | None = None
|
||||
battery_level: str | None = None
|
||||
power_source: str | None = None
|
||||
sd_remaining_mb: str | None = None
|
||||
@@ -504,6 +812,8 @@ def upsert_status(unit_id: str, payload: StatusPayload, db: Session = Depends(ge
|
||||
"lmax": status.lmax,
|
||||
"lmin": status.lmin,
|
||||
"lpeak": status.lpeak,
|
||||
"ln1": status.ln1,
|
||||
"ln2": status.ln2,
|
||||
"battery_level": status.battery_level,
|
||||
"power_source": status.power_source,
|
||||
"sd_remaining_mb": status.sd_remaining_mb,
|
||||
@@ -1205,6 +1515,8 @@ async def stream_live(websocket: WebSocket, unit_id: str):
|
||||
"lmax": snap.lmax, # Maximum level
|
||||
"lmin": snap.lmin, # Minimum level
|
||||
"lpeak": snap.lpeak, # Peak level
|
||||
"ln1": snap.ln1, # LN1 percentile (L1/L10 contract); null on DRD stream
|
||||
"ln2": snap.ln2, # LN2 percentile; null on DRD stream
|
||||
"raw_payload": snap.raw_payload,
|
||||
})
|
||||
except Exception as e:
|
||||
@@ -1876,6 +2188,8 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
|
||||
"lmax": status.lmax,
|
||||
"lmin": status.lmin,
|
||||
"lpeak": status.lpeak,
|
||||
"ln1": status.ln1,
|
||||
"ln2": status.ln2,
|
||||
"battery_level": status.battery_level,
|
||||
"power_source": status.power_source,
|
||||
"sd_remaining_mb": status.sd_remaining_mb,
|
||||
|
||||
+45
-14
@@ -46,6 +46,8 @@ class NL43Snapshot:
|
||||
lmax: Optional[str] = None # Maximum level
|
||||
lmin: Optional[str] = None # Minimum level
|
||||
lpeak: Optional[str] = None # Peak level
|
||||
ln1: Optional[str] = None # Percentile slot LN1 (configurable; device default L5, contract L1)
|
||||
ln2: Optional[str] = None # Percentile slot LN2 (configurable; device default L10)
|
||||
battery_level: Optional[str] = None
|
||||
power_source: Optional[str] = None
|
||||
sd_remaining_mb: Optional[str] = None
|
||||
@@ -69,10 +71,27 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
|
||||
|
||||
logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'")
|
||||
|
||||
# Device returns "Start" when measuring, "Stop" when stopped
|
||||
# Normalize to previous behavior for backward compatibility
|
||||
is_measuring = new_state == "Start"
|
||||
was_measuring = previous_state == "Start"
|
||||
# The device reports "Start" while measuring; the DOD path uses that string,
|
||||
# but the DRD stream path tags snapshots "Measure" (and the DOD fallback also
|
||||
# uses "Measure"). Treat ALL of these as "measuring" — otherwise opening and
|
||||
# closing the live stream flips state "Start"->"Measure"->"Start", which the
|
||||
# old equality check misread as stop-then-start and reset measurement_start_time.
|
||||
#
|
||||
# Also: only act on RECOGNIZED states. A buffer desync on the shared connection
|
||||
# (e.g. right after a DRD/DOD test) can make a Measure? read return a stray,
|
||||
# garbled value; treating that as "not measuring" produced constant phantom
|
||||
# "STOPPED -> STARTED" log pairs and reset the timer. Ignore unknown reads.
|
||||
MEASURING_STATES = {"Start", "Measure"}
|
||||
STOPPED_STATES = {"Stop"}
|
||||
was_measuring = previous_state in MEASURING_STATES
|
||||
|
||||
if new_state in MEASURING_STATES:
|
||||
is_measuring = True
|
||||
elif new_state in STOPPED_STATES:
|
||||
is_measuring = False
|
||||
else:
|
||||
logger.warning(f"Ignoring unrecognized measurement state for {s.unit_id}: {new_state!r}")
|
||||
is_measuring = was_measuring # garbled/unknown read — no transition
|
||||
|
||||
if not was_measuring and is_measuring:
|
||||
# Measurement just started - record the start time
|
||||
@@ -95,6 +114,9 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Only persist a recognized state so one garbled read can't poison the next
|
||||
# transition check (which would manufacture the phantom STOPPED/STARTED pair).
|
||||
if new_state in MEASURING_STATES or new_state in STOPPED_STATES:
|
||||
row.measurement_state = new_state
|
||||
row.counter = s.counter
|
||||
row.lp = s.lp
|
||||
@@ -102,6 +124,8 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
|
||||
row.lmax = s.lmax
|
||||
row.lmin = s.lmin
|
||||
row.lpeak = s.lpeak
|
||||
row.ln1 = s.ln1
|
||||
row.ln2 = s.ln2
|
||||
row.battery_level = s.battery_level
|
||||
row.power_source = s.power_source
|
||||
row.sd_remaining_mb = s.sd_remaining_mb
|
||||
@@ -691,22 +715,29 @@ class NL43Client:
|
||||
|
||||
snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state)
|
||||
|
||||
# Parse known positions (based on NL43 communication guide - DRD format)
|
||||
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
|
||||
# Parse DOD positional fields. DOD's layout is DIFFERENT from DRD: it has NO
|
||||
# leading counter and it includes LE plus LN1–LN5. The device returns 4 channels
|
||||
# of 16 fields each — [Lp, Leq, LE, Lmax, Lmin, LN1, LN2, LN3, LN4, LN5, Lpeak,
|
||||
# LIeq, Leq_mov, Ltm5, over, under] — and channel 1 (parts[0:16]) is the main
|
||||
# display. The previous code reused the DRD map (treating parts[0] as a counter),
|
||||
# which shifted everything: Lp was reported as the counter, Leq as Lp, LE as Leq,
|
||||
# and LN1 as Lpeak (you could spot it because "Lpeak" came out < Lmax).
|
||||
try:
|
||||
# Capture d0 (counter) for timer synchronization
|
||||
if len(parts) >= 1:
|
||||
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
|
||||
snap.lp = parts[0] # Lp: instantaneous sound pressure level
|
||||
if len(parts) >= 2:
|
||||
snap.lp = parts[1] # d1: Instantaneous sound pressure level
|
||||
if len(parts) >= 3:
|
||||
snap.leq = parts[2] # d2: Equivalent continuous sound level
|
||||
snap.leq = parts[1] # Leq: equivalent continuous level
|
||||
# parts[2] = LE (sound exposure level) — not currently surfaced
|
||||
if len(parts) >= 4:
|
||||
snap.lmax = parts[3] # d3: Maximum level
|
||||
snap.lmax = parts[3] # Lmax
|
||||
if len(parts) >= 5:
|
||||
snap.lmin = parts[4] # d4: Minimum level
|
||||
snap.lmin = parts[4] # Lmin
|
||||
if len(parts) >= 11:
|
||||
snap.lpeak = parts[10] # Lpeak (parts[5] is LN1, NOT Lpeak)
|
||||
if len(parts) >= 6:
|
||||
snap.lpeak = parts[5] # d5: Peak level
|
||||
snap.ln1 = parts[5] # LN1 percentile slot (device default L5; contract L1)
|
||||
if len(parts) >= 7:
|
||||
snap.ln2 = parts[6] # LN2 percentile slot (device default L10)
|
||||
except (IndexError, ValueError) as e:
|
||||
logger.warning(f"Error parsing DOD data points: {e}")
|
||||
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Migration script to add ln1 and ln2 percentile columns to the nl43_status table.
|
||||
|
||||
The NL-43 DOD response carries percentile slots LN1-LN5; the live SLM display
|
||||
(Terra-View) shows two of them (default L1/L10). This adds storage for the two
|
||||
surfaced slots. Run once per database to update existing schema.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
DB_PATH = Path(__file__).parent / "data" / "slmm.db"


def migrate(db_path: "Path | str | None" = None):
    """Add the ln1 and ln2 TEXT columns to the nl43_status table.

    The migration is idempotent: columns that already exist are left alone,
    and a missing database file is treated as "nothing to do".

    Args:
        db_path: SQLite database file to migrate. Defaults to the module-level
            ``DB_PATH`` when omitted, so existing ``migrate()`` callers are
            unaffected; pass a path to migrate some other database.

    Raises:
        SystemExit: With exit code 1 if any statement fails. Pending changes
            are rolled back first and the error is printed to stderr.
    """
    target = DB_PATH if db_path is None else Path(db_path)

    if not target.exists():
        print(f"Database not found at {target}")
        print("No migration needed - database will be created with new schema")
        return

    conn = sqlite3.connect(target)
    try:
        cursor = conn.cursor()

        # PRAGMA table_info yields one row per column; index 1 is its name.
        cursor.execute("PRAGMA table_info(nl43_status)")
        existing = {row[1] for row in cursor.fetchall()}

        missing = [name for name in ("ln1", "ln2") if name not in existing]
        if not missing:
            print("✓ ln1/ln2 columns already exist, no migration needed")
            return

        for name in missing:
            print(f"Adding {name} column...")
            # `name` only ever comes from the literal tuple above, never from
            # external input, so interpolating it into the DDL is safe.
            cursor.execute(f"ALTER TABLE nl43_status ADD COLUMN {name} TEXT")
            print(f"✓ Added {name} column")

        conn.commit()
        print("\n✓ Migration completed successfully!")

    except Exception as e:
        conn.rollback()
        print(f"✗ Migration failed: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Runs on every exit path, including the early return above.
        conn.close()


if __name__ == "__main__":
    migrate()
|
||||
@@ -0,0 +1,68 @@
|
||||
"""
|
||||
Synthetic unit test for the alert state machine — no DB, no device.
|
||||
|
||||
Drives `_evaluate_step` with a fake clock + a level series and checks that
|
||||
onset/clear fire with the right debounce + hysteresis. Run:
|
||||
|
||||
docker compose exec -T slmm python3 test_alert_evaluator.py
|
||||
# or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py
|
||||
"""
|
||||
|
||||
from types import SimpleNamespace
|
||||
from app.alerts import RuleState, _evaluate_step
|
||||
|
||||
|
||||
def rule(**kw):
    """Build a stand-in rule object for the evaluator tests.

    Starts from a default "above 85 dB for 3 s, 2 dB margin" rule; any keyword
    argument overrides or extends those attributes.
    """
    defaults = {
        "threshold_db": 85.0,
        "duration_s": 3,
        "clear_margin_db": 2.0,
        "comparison": "above",
    }
    return SimpleNamespace(**{**defaults, **kw})
|
||||
|
||||
|
||||
def run(series, r):
    """Feed a level series through the state machine and collect its events.

    Args:
        series: Iterable of ``(value, now)`` pairs, evaluated in order.
        r: Rule object passed unchanged to ``_evaluate_step``.

    Returns:
        ``(events, state)`` where ``events`` lists ``(now, action)`` for every
        step whose result was truthy, and ``state`` is the final RuleState.
    """
    state = RuleState()
    fired = []
    for value, now in series:
        action = _evaluate_step(state, value, now, r)
        if action:
            fired.append((now, action))
    return fired, state
|
||||
|
||||
|
||||
def main():
    """Run the synthetic state-machine scenarios and report the results.

    Prints one PASS/FAIL line per scenario followed by a summary line.

    Returns:
        The number of failed scenarios (0 when everything passed).
    """
    failed_labels = []

    def check(label, cond, detail=""):
        # Print the verdict, and remember the label when the check failed.
        verdict = "PASS" if cond else "FAIL"
        print(verdict, label, detail)
        if not cond:
            failed_labels.append(label)

    # 1) sustained exceedance -> onset after duration; recovery -> clear after duration
    sustained_rule = rule(threshold_db=85, duration_s=3, clear_margin_db=2)
    sustained_series = [
        (80, 0), (86, 1), (87, 2), (88, 3), (88, 4),
        (88, 5), (82, 6), (82, 7), (82, 8), (82, 9),
    ]
    ev, _ = run(sustained_series, sustained_rule)
    onsets = [t for t, a in ev if a == "onset"]
    clears = [t for t, a in ev if a == "clear"]
    check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev))

    # 2) brief spike under duration -> no onset (debounce)
    spike_series = [(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)]
    ev, _ = run(spike_series, rule(duration_s=3))
    check("2 brief spike debounced", ev == [], str(ev))

    # 3) hysteresis: a dip into the margin (below threshold, above threshold-margin)
    #    does NOT clear
    hysteresis_rule = rule(threshold_db=85, duration_s=0, clear_margin_db=3)
    ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], hysteresis_rule)
    held_active = ev == [(0, "onset")] and st.phase == "active"
    check("3 hysteresis holds ACTIVE", held_active, f"{ev} phase={st.phase}")

    # 4) 'below' comparison (device too quiet) -> onset when value < threshold
    quiet_rule = rule(threshold_db=20, duration_s=0,
                      clear_margin_db=2, comparison="below")
    ev, _ = run([(30, 0), (15, 1)], quiet_rule)
    check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev))

    failures = len(failed_labels)
    print()
    if failures == 0:
        print("ALL PASS")
    else:
        print(f"{failures} FAILURE(S)")
    return failures


if __name__ == "__main__":
    import sys
    exit_code = 1 if main() else 0
    sys.exit(exit_code)
|
||||
Reference in New Issue
Block a user