diff --git a/app/alerts.py b/app/alerts.py
index 7c001e2..1fb84e7 100644
--- a/app/alerts.py
+++ b/app/alerts.py
@@ -1,71 +1,277 @@
 """
-Alert evaluation (POC).
+Threshold alert engine.
 
-Receives each monitor snapshot and fires an alert when a configured metric
-exceeds a threshold, with a cooldown so a sustained loud period doesn't spam.
+Each unit can have any number of AlertRules. A rule is evaluated against the
+unit's live monitor snapshots via a small per-(unit, rule) state machine:
 
-The RULE here is intentionally simple and swappable. Instantaneous Lp vs a
-sustained window vs L10 is still an open design decision — this evaluator is the
-single plug point for it. For the POC the rule is "instantaneous metric >
-threshold, rate-limited by a cooldown", and dispatch is just a server-side log.
-Wire email/SMS (likely via a Terra-View webhook) into _dispatch() later.
+  IDLE --(metric exceeds threshold for duration_s)--> ACTIVE (fire ONSET)
+  ACTIVE --(metric recovers past hysteresis for duration_s)--> IDLE (fire CLEAR)
 
-Config via env:
-- ALERT_ENABLED (default true)
-- ALERT_METRIC which snapshot field to test: lp/leq/lmax/ln1/ln2 (default lp)
-- ALERT_THRESHOLD_DB numeric dB threshold (default 85)
-- ALERT_COOLDOWN_SECONDS min seconds between alerts per unit (default 60)
+duration_s debounces both edges; clear_margin_db adds hysteresis so a level
+hovering at the threshold doesn't flap. Onset and clear are distinct events.
+cooldown_s is the minimum spacing between two onsets of the same (unit, rule).
+
+The state-machine logic (`_evaluate_step`) is intentionally pure — no DB, no
+real clock — so it can be unit-tested with a synthetic level series and a fake
+clock. The AlertEvaluator wraps it with rule loading, scheduling, persistence,
+and dispatch. Dispatch is a server log for now (POC); the seam to POST events to
+a Terra-View webhook (email/SMS) is _dispatch().
 """
 
 import asyncio
 import logging
 import os
-from typing import Dict
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from typing import Dict, List, Optional, Tuple
 
 logger = logging.getLogger(__name__)
 
+# Local timezone offset for schedule windows (same env var services.py uses).
+_TZ_OFFSET_HOURS = float(os.getenv("TIMEZONE_OFFSET", "-5"))
+
+# How long to cache a unit's rules before re-querying the DB (rules change rarely).
+_RULE_CACHE_TTL_S = 15.0
+
+
+@dataclass
+class RuleState:
+    """In-memory runtime state for one (unit, rule)."""
+    phase: str = "idle"  # "idle" | "active"
+    edge_since: Optional[float] = None  # when the current edge condition began (clock time)
+    peak: float = 0.0  # extreme value seen while active (max for "above", min for "below")
+    event_id: Optional[int] = None  # the open AlertEvent row (for the clear update)
+    last_onset: Optional[float] = None  # clock time of the latest ONSET (cooldown gate)
+
+
+def _exceeds(value: float, rule) -> bool:
+    """True when `value` is on the alarming side of the rule's threshold."""
+    if rule.comparison == "below":
+        return value < rule.threshold_db
+    return value > rule.threshold_db
+
+
+def _recovered(value: float, rule) -> bool:
+    """True when `value` is back past the threshold by at least clear_margin_db."""
+    margin = rule.clear_margin_db or 0.0
+    if rule.comparison == "below":
+        return value > rule.threshold_db + margin
+    return value < rule.threshold_db - margin
+
+
+def _evaluate_step(state: RuleState, value: float, now: float, rule) -> Optional[str]:
+    """Advance the state machine by one reading.
+
+    Pure: mutates `state`, returns 'onset' | 'clear' | None. `now` is injected so
+    tests can drive a fake clock.
+
+    An onset is additionally held back until rule.cooldown_s seconds have
+    passed since the previous onset of the same (unit, rule).
+    """
+    duration = rule.duration_s or 0
+    # cooldown_s is optional on the rule object (synthetic test rules omit it).
+    cooldown = getattr(rule, "cooldown_s", 0) or 0
+
+    if state.phase == "idle":
+        if _exceeds(value, rule):
+            if state.edge_since is None:
+                state.edge_since = now
+            cooling = (state.last_onset is not None
+                       and now - state.last_onset < cooldown)
+            # edge_since is kept while cooling, so the onset fires as soon as the
+            # cooldown lapses provided the exceedance is still ongoing.
+            if now - state.edge_since >= duration and not cooling:
+                state.phase = "active"
+                state.edge_since = None
+                state.peak = value
+                state.last_onset = now
+                return "onset"
+        else:
+            state.edge_since = None
+        return None
+
+    # active
+    if rule.comparison == "below":
+        state.peak = min(state.peak, value)
+    else:
+        state.peak = max(state.peak, value)
+
+    if _recovered(value, rule):
+        if state.edge_since is None:
+            state.edge_since = now
+        if now - state.edge_since >= duration:
+            state.phase = "idle"
+            state.edge_since = None
+            return "clear"
+    else:
+        state.edge_since = None
+    return None
+
+
+def _in_window(now_minutes: int, start: str, end: str) -> bool:
+    """Is now_minutes (minutes since local midnight) within [start, end)?
+    Handles wraparound windows like 22:00–07:00."""
+    def _m(s: str) -> int:
+        h, m = s.split(":")
+        return int(h) * 60 + int(m)
+    s, e = _m(start), _m(end)
+    if s == e:
+        return True
+    if s < e:
+        return s <= now_minutes < e
+    return now_minutes >= s or now_minutes < e  # wraparound
+
 
 class AlertEvaluator:
     def __init__(self):
-        self.enabled = os.getenv("ALERT_ENABLED", "true").lower() == "true"
-        self.metric = os.getenv("ALERT_METRIC", "lp").lower()
-        self.threshold_db = float(os.getenv("ALERT_THRESHOLD_DB", "85"))
-        self.cooldown_s = float(os.getenv("ALERT_COOLDOWN_SECONDS", "60"))
-        self._last_fired: Dict[str, float] = {}
-        logger.info(
-            f"[ALERT] evaluator ready: enabled={self.enabled} metric={self.metric} "
-            f"threshold={self.threshold_db}dB cooldown={self.cooldown_s}s"
-        )
+        self._states: Dict[Tuple[str, int], RuleState] = {}
+        self._rule_cache: Dict[str, Tuple[float, list]] = {}  # unit_id -> (fetched_at, rules)
+        logger.info("[ALERT] rule-based evaluator ready")
 
     async def evaluate(self, unit_id: str, snap) -> None:
-        """Evaluate one snapshot; fire (log) if the metric exceeds threshold."""
-        if not self.enabled:
+        """Evaluate every enabled rule for this unit against one snapshot.
+
+        Rules outside their schedule window, and readings whose metric is
+        missing or non-numeric, are skipped for this snapshot.
+        """
+        rules = self._get_rules(unit_id)
+        if not rules:
             return
-
-        raw = getattr(snap, self.metric, None)
-        try:
-            level = float(raw)
-        except (TypeError, ValueError):
-            return  # missing / non-numeric (e.g. "-.-")
-
-        if level <= self.threshold_db:
-            return
-
-        # Cooldown — use the event loop clock (Math.random/Date.now-free).
         now = asyncio.get_running_loop().time()
-        if now - self._last_fired.get(unit_id, 0.0) < self.cooldown_s:
-            return
-        self._last_fired[unit_id] = now
+        for rule in rules:
+            key = (unit_id, rule.id)
+            if not self._in_schedule(rule):
+                # Drop any half-elapsed debounce so time spent outside the
+                # window is never counted as "sustained" on re-entry.
+                paused = self._states.get(key)
+                if paused is not None:
+                    paused.edge_since = None
+                continue
+            raw = getattr(snap, rule.metric, None)
+            try:
+                value = float(raw)
+            except (TypeError, ValueError):
+                continue  # missing / non-numeric ("-.-")
+            state = self._states.setdefault(key, RuleState())
+            action = _evaluate_step(state, value, now, rule)
+            if action == "onset":
+                await self._on_onset(unit_id, rule, value, state)
+            elif action == "clear":
+                await self._on_clear(unit_id, rule, value, state)
 
-        await self._dispatch(unit_id, level)
+    # -- rule loading (cached) ----------------------------------------------
 
-    async def _dispatch(self, unit_id: str, level: float) -> None:
-        """POC dispatch: server-side log. Swap in email/SMS here later."""
-        logger.warning(
-            f"[ALERT] {unit_id}: {self.metric.upper()}={level:.1f} dB exceeded "
-            f"threshold {self.threshold_db:.1f} dB"
+    def _get_rules(self, unit_id: str) -> list:
+        loop_now = asyncio.get_running_loop().time()
+        cached = self._rule_cache.get(unit_id)
+        if cached and loop_now - cached[0] < _RULE_CACHE_TTL_S:
+            return cached[1]
+        rules = self._load_rules(unit_id)
+        self._rule_cache[unit_id] = (loop_now, rules)
+        return rules
+
+    def _load_rules(self, unit_id: str) -> list:
+        from app.database import SessionLocal
+        from app.models import AlertRule
+        db = SessionLocal()
+        try:
+            return db.query(AlertRule).filter_by(unit_id=unit_id, enabled=True).all()
+        except Exception as e:
+            logger.warning(f"[ALERT] failed to load rules for {unit_id}: {e}")
+            return []
+        finally:
+            db.close()
+
+    def invalidate(self, unit_id: Optional[str] = None) -> None:
+        """Drop cached rules so a change is picked up immediately."""
+        if unit_id is None:
+            self._rule_cache.clear()
+        else:
+            self._rule_cache.pop(unit_id, None)
+
+    # -- scheduling ----------------------------------------------------------
+
+    def _in_schedule(self, rule) -> bool:
+        """Is the rule's optional day / time-of-day window open right now?
+
+        A schedule that cannot be parsed fails open (the rule stays armed)
+        instead of raising out of evaluate() or silently muting the alert.
+        """
+        local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
+        try:
+            if not self._day_ok(rule, local):
+                return False
+            if not rule.schedule_start or not rule.schedule_end:
+                return True
+            return _in_window(local.hour * 60 + local.minute,
+                              rule.schedule_start, rule.schedule_end)
+        except ValueError:
+            logger.debug(f"[ALERT] rule {rule.id}: unparseable schedule; treating as always on")
+            return True
+
+    @staticmethod
+    def _day_ok(rule, local: Optional[datetime] = None) -> bool:
+        if not rule.schedule_days:
+            return True
+        if local is None:
+            local = datetime.utcnow() + timedelta(hours=_TZ_OFFSET_HOURS)
+        allowed = {int(d) for d in str(rule.schedule_days).split(",") if d.strip() != ""}
+        return local.weekday() in allowed  # Mon=0
+
+    # -- event persistence + dispatch ---------------------------------------
+
+    async def _on_onset(self, unit_id: str, rule, value: float, state: RuleState) -> None:
+        from app.database import SessionLocal
+        from app.models import AlertEvent
+        db = SessionLocal()
+        try:
+            evt = AlertEvent(
+                rule_id=rule.id, unit_id=unit_id, rule_name=rule.name,
+                metric=rule.metric, threshold_db=rule.threshold_db,
+                onset_value=value, peak_value=value, status="active",
+            )
+            db.add(evt)
+            db.commit()
+            db.refresh(evt)
+            state.event_id = evt.id
+        except Exception as e:
+            logger.warning(f"[ALERT] failed to record onset for {unit_id}: {e}")
+        finally:
+            db.close()
+        await self._dispatch(
+            "ONSET", unit_id, rule,
+            f"{rule.metric.upper()}={value:.1f} dB "
+            f"{'<' if rule.comparison == 'below' else '>'} {rule.threshold_db:.1f} dB"
+            f"{f' for {rule.duration_s}s' if rule.duration_s else ''}",
         )
 
+    async def _on_clear(self, unit_id: str, rule, value: float, state: RuleState) -> None:
+        peak = state.peak
+        from app.database import SessionLocal
+        from app.models import AlertEvent
+        db = SessionLocal()
+        try:
+            if state.event_id is not None:
+                evt = db.query(AlertEvent).filter_by(id=state.event_id).first()
+                if evt:
+                    evt.clear_at = datetime.utcnow()
+                    evt.peak_value = peak
+                    evt.status = "cleared"
+                    db.commit()
+        except Exception as e:
+            logger.warning(f"[ALERT] failed to record clear for {unit_id}: {e}")
+        finally:
+            db.close()
+        state.event_id = None
+        await self._dispatch(
+            "CLEAR", unit_id, rule,
+            f"recovered to {value:.1f} dB (peak {peak:.1f} dB)",
+        )
 
-# Module-level singleton
+    async def _dispatch(self, kind: str, unit_id: str, rule, detail: str) -> None:
+        """POC dispatch: server log. Swap in a Terra-View webhook (email/SMS) here."""
+        logger.warning(f"[ALERT:{kind}] {unit_id} '{rule.name}': {detail}")
+
+
+# Module-level singleton (the monitor calls alert_evaluator.evaluate per snapshot)
 alert_evaluator = AlertEvaluator()
diff --git a/app/models.py b/app/models.py
index 906043f..67e08ed 100644
--- a/app/models.py
+++ b/app/models.py
@@ -1,4 +1,4 @@
-from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func
+from sqlalchemy import Column, String, DateTime, Boolean, Integer, Float, Text, func
 from app.database import Base
 
 
@@ -74,3 +74,53 @@ class DeviceLog(Base):
     level = Column(String, default="INFO")  # DEBUG, INFO, WARNING, ERROR
     category = Column(String, default="GENERAL")  # TCP, FTP, POLL, COMMAND, STATE, SYNC
     message = Column(Text, nullable=False)
+
+
+class AlertRule(Base):
+    """A threshold-alert rule evaluated against a unit's live monitor feed.
+
+    Source-agnostic: today it runs over the DOD monitor; the same rule transfers
+    unchanged if a unit's feed is later sourced from FTP intervals.
+    """
+
+    __tablename__ = "alert_rules"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    unit_id = Column(String, index=True, nullable=False)
+    name = Column(String, nullable=False, default="Alert")
+    metric = Column(String, nullable=False, default="lp")  # lp/leq/lmax/lmin/lpeak/ln1/ln2
+    comparison = Column(String, nullable=False, default="above")  # above | below
+    threshold_db = Column(Float, nullable=False)
+    duration_s = Column(Integer, nullable=False, default=0)  # sustained seconds (0 = instant)
+    clear_margin_db = Column(Float, nullable=False, default=2.0)  # hysteresis band
+    cooldown_s = Column(Integer, nullable=False, default=300)  # min seconds between onsets
+    # Optional time-of-day scoping (local time). schedule_start/end as "HH:MM";
+    # null = always active. schedule_days = CSV of 0-6 (Mon=0); null = every day.
+    schedule_start = Column(String, nullable=True)
+    schedule_end = Column(String, nullable=True)
+    schedule_days = Column(String, nullable=True)
+    channels = Column(String, nullable=False, default="log")  # CSV: log,email,sms
+    recipients = Column(Text, nullable=True)  # CSV of emails/phones
+    enabled = Column(Boolean, default=True)
+    created_at = Column(DateTime, default=func.now())
+
+
+class AlertEvent(Base):
+    """A fired alert (onset → clear), for history / inbox / acknowledgement."""
+
+    __tablename__ = "alert_events"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    rule_id = Column(Integer, index=True, nullable=False)
+    unit_id = Column(String, index=True, nullable=False)
+    rule_name = Column(String, nullable=True)
+    metric = Column(String, nullable=False)
+    threshold_db = Column(Float, nullable=False)
+    onset_at = Column(DateTime, default=func.now(), index=True)
+    onset_value = Column(Float, nullable=True)
+    peak_value = Column(Float, nullable=True)
+    clear_at = Column(DateTime, nullable=True)
+    status = Column(String, default="active")  # active | cleared
+    acknowledged_at = Column(DateTime, nullable=True)
+    acknowledged_by = Column(String, nullable=True)
+    notes = Column(Text, nullable=True)
diff --git a/app/routers.py b/app/routers.py
index 1031d84..f317e15 100644
--- a/app/routers.py
+++ b/app/routers.py
@@ -11,7 +11,7 @@ import os
 import asyncio
 
 from app.database import get_db
-from app.models import NL43Config, NL43Status
+from app.models import NL43Config, NL43Status, AlertRule, AlertEvent
 from app.services import NL43Client, persist_snapshot
 
 logger = logging.getLogger(__name__)
@@ -320,6 +320,155 @@ async def monitor_status():
     return {"status": "ok", "monitors": monitor_manager.status()}
 
 
+# ============================================================================
+# ALERTS — threshold rules + fired events
+# ============================================================================
+
+class AlertRulePayload(BaseModel):
+    name: str = "Alert"
+    metric: str = "lp"  # lp/leq/lmax/lmin/lpeak/ln1/ln2
+    comparison: str = "above"  # above | below
+    threshold_db: float
+    duration_s: int = 0  # sustained seconds before firing (0 = instant)
+    clear_margin_db: float = 2.0  # hysteresis band
+    cooldown_s: int = 300
+    schedule_start: str | None = None  # "HH:MM" local; null = always
+    schedule_end: str | None = None
+    schedule_days: str | None = None  # CSV of 0-6 (Mon=0); null = every day
+    channels: str = "log"
+    recipients: str | None = None
+    enabled: bool = True
+
+
+_ALERT_METRICS = ("lp", "leq", "lmax", "lmin", "lpeak", "ln1", "ln2")
+
+
+def _is_hhmm(text: str) -> bool:
+    """True when `text` is a 24h "HH:MM" string the alert scheduler can parse."""
+    hours, sep, minutes = text.partition(":")
+    if not (sep and hours.isdecimal() and minutes.isdecimal()):
+        return False
+    return int(hours) < 24 and int(minutes) < 60
+
+
+def _validate_rule_payload(payload: AlertRulePayload) -> None:
+    """Raise HTTP 422 for rule fields the evaluator cannot act on.
+
+    The evaluator reads these values on every snapshot; a typo'd metric or
+    comparison would otherwise yield a rule that silently never fires (or fires
+    with the wrong sense), and a malformed schedule cannot be parsed at all.
+    """
+    problems = []
+    if payload.metric not in _ALERT_METRICS:
+        problems.append(f"metric must be one of {', '.join(_ALERT_METRICS)}")
+    if payload.comparison not in ("above", "below"):
+        problems.append("comparison must be 'above' or 'below'")
+    if payload.duration_s < 0 or payload.cooldown_s < 0 or payload.clear_margin_db < 0:
+        problems.append("duration_s, cooldown_s and clear_margin_db must be >= 0")
+    # Empty strings count as "unset", matching the evaluator's truthiness checks.
+    if bool(payload.schedule_start) != bool(payload.schedule_end):
+        problems.append("schedule_start and schedule_end must be set together")
+    for label in ("schedule_start", "schedule_end"):
+        text = getattr(payload, label)
+        if text and not _is_hhmm(text):
+            problems.append(f"{label} must be HH:MM (24h)")
+    if payload.schedule_days:
+        days = [d.strip() for d in payload.schedule_days.split(",") if d.strip()]
+        if not days or any(d not in ("0", "1", "2", "3", "4", "5", "6") for d in days):
+            problems.append("schedule_days must be a CSV of 0-6 (Mon=0)")
+    if problems:
+        raise HTTPException(status_code=422, detail="; ".join(problems))
+
+
+def _rule_dict(r: AlertRule) -> dict:
+    return {
+        "id": r.id, "unit_id": r.unit_id, "name": r.name, "metric": r.metric,
+        "comparison": r.comparison, "threshold_db": r.threshold_db,
+        "duration_s": r.duration_s, "clear_margin_db": r.clear_margin_db,
+        "cooldown_s": r.cooldown_s, "schedule_start": r.schedule_start,
+        "schedule_end": r.schedule_end, "schedule_days": r.schedule_days,
+        "channels": r.channels, "recipients": r.recipients, "enabled": r.enabled,
+    }
+
+
+def _event_dict(e: AlertEvent) -> dict:
+    return {
+        "id": e.id, "rule_id": e.rule_id, "unit_id": e.unit_id,
+        "rule_name": e.rule_name, "metric": e.metric, "threshold_db": e.threshold_db,
+        "onset_at": e.onset_at.isoformat() if e.onset_at else None,
+        "onset_value": e.onset_value, "peak_value": e.peak_value,
+        "clear_at": e.clear_at.isoformat() if e.clear_at else None,
+        "status": e.status,
+        "acknowledged_at": e.acknowledged_at.isoformat() if e.acknowledged_at else None,
+        "acknowledged_by": e.acknowledged_by,
+    }
+
+
+@router.post("/{unit_id}/alerts/rules")
+def create_alert_rule(unit_id: str, payload: AlertRulePayload, db: Session = Depends(get_db)):
+    _validate_rule_payload(payload)
+    rule = AlertRule(unit_id=unit_id, **payload.model_dump())
+    db.add(rule)
+    db.commit()
+    db.refresh(rule)
+    from app.alerts import alert_evaluator
+    alert_evaluator.invalidate(unit_id)
+    return {"status": "ok", "rule": _rule_dict(rule)}
+
+
+@router.get("/{unit_id}/alerts/rules")
+def list_alert_rules(unit_id: str, db: Session = Depends(get_db)):
+    rules = db.query(AlertRule).filter_by(unit_id=unit_id).all()
+    return {"status": "ok", "rules": [_rule_dict(r) for r in rules]}
+
+
+@router.put("/{unit_id}/alerts/rules/{rule_id}")
+def update_alert_rule(unit_id: str, rule_id: int, payload: AlertRulePayload, db: Session = Depends(get_db)):
+    _validate_rule_payload(payload)
+    rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
+    if not rule:
+        raise HTTPException(status_code=404, detail="Alert rule not found")
+    for field, value in payload.model_dump().items():
+        setattr(rule, field, value)
+    db.commit()
+    db.refresh(rule)
+    from app.alerts import alert_evaluator
+    alert_evaluator.invalidate(unit_id)
+    return {"status": "ok", "rule": _rule_dict(rule)}
+
+
+@router.delete("/{unit_id}/alerts/rules/{rule_id}")
+def delete_alert_rule(unit_id: str, rule_id: int, db: Session = Depends(get_db)):
+    rule = db.query(AlertRule).filter_by(id=rule_id, unit_id=unit_id).first()
+    if not rule:
+        raise HTTPException(status_code=404, detail="Alert rule not found")
+    db.delete(rule)
+    db.commit()
+    from app.alerts import alert_evaluator
+    alert_evaluator.invalidate(unit_id)
+    return {"status": "ok", "deleted": rule_id}
+
+
+@router.get("/{unit_id}/alerts/events")
+def list_alert_events(unit_id: str, limit: int = 50, db: Session = Depends(get_db)):
+    # Keep the page size sane: never zero/negative and never unbounded.
+    limit = max(1, min(limit, 500))
+    events = (db.query(AlertEvent).filter_by(unit_id=unit_id)
+              .order_by(AlertEvent.onset_at.desc()).limit(limit).all())
+    return {"status": "ok", "events": [_event_dict(e) for e in events]}
+
+
+@router.post("/{unit_id}/alerts/events/{event_id}/ack")
+def ack_alert_event(unit_id: str, event_id: int, by: str | None = None, db: Session = Depends(get_db)):
+    evt = db.query(AlertEvent).filter_by(id=event_id, unit_id=unit_id).first()
+    if not evt:
+        raise HTTPException(status_code=404, detail="Alert event not found")
+    evt.acknowledged_at = datetime.utcnow()
+    evt.acknowledged_by = by
+    db.commit()
+    return {"status": "ok", "acknowledged": event_id}
+
+
+# 
============================================================================
 # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
 # ============================================================================
diff --git a/test_alert_evaluator.py b/test_alert_evaluator.py
new file mode 100644
index 0000000..5bacc62
--- /dev/null
+++ b/test_alert_evaluator.py
@@ -0,0 +1,83 @@
+"""
+Synthetic unit test for the alert state machine — no DB, no device.
+
+Drives `_evaluate_step` with a fake clock + a level series and checks that
+onset/clear fire with the right debounce + hysteresis. Run:
+
+    docker compose exec -T slmm python3 test_alert_evaluator.py
+    # or, if app.alerts imports cleanly standalone: python3 test_alert_evaluator.py
+"""
+
+from types import SimpleNamespace
+from app.alerts import RuleState, _evaluate_step, _in_window
+
+
+def rule(**kw):
+    base = dict(threshold_db=85.0, duration_s=3, clear_margin_db=2.0, comparison="above")
+    base.update(kw)
+    return SimpleNamespace(**base)
+
+
+def run(series, r):
+    st = RuleState()
+    events = [(now, a) for value, now in series
+              if (a := _evaluate_step(st, value, now, r))]
+    return events, st
+
+
+def main():
+    failures = 0
+
+    def check(label, cond, detail=""):
+        nonlocal failures
+        print(("PASS" if cond else "FAIL"), label, detail)
+        if not cond:
+            failures += 1
+
+    # 1) sustained exceedance -> onset after duration; recovery -> clear after duration
+    r = rule(threshold_db=85, duration_s=3, clear_margin_db=2)
+    ev, _ = run([(80, 0), (86, 1), (87, 2), (88, 3), (88, 4),
+                 (88, 5), (82, 6), (82, 7), (82, 8), (82, 9)], r)
+    onsets = [t for t, a in ev if a == "onset"]
+    clears = [t for t, a in ev if a == "clear"]
+    check("1 sustained onset@4 / clear@9", onsets == [4] and clears == [9], str(ev))
+
+    # 2) brief spike under duration -> no onset (debounce)
+    ev, _ = run([(80, 0), (90, 1), (90, 2), (80, 3), (80, 4)], rule(duration_s=3))
+    check("2 brief spike debounced", ev == [], str(ev))
+
+    # 3) hysteresis: a dip into the margin (below threshold, above threshold-margin)
+    #    does NOT clear
+    r = rule(threshold_db=85, duration_s=0, clear_margin_db=3)
+    ev, st = run([(86, 0), (84, 1), (84, 2), (84, 3)], r)
+    check("3 hysteresis holds ACTIVE", ev == [(0, "onset")] and st.phase == "active",
+          f"{ev} phase={st.phase}")
+
+    # 4) 'below' comparison (device too quiet) -> onset when value < threshold
+    ev, _ = run([(30, 0), (15, 1)], rule(threshold_db=20, duration_s=0,
+                                         clear_margin_db=2, comparison="below"))
+    check("4 below-comparison onset@1", ev == [(1, "onset")], str(ev))
+
+    # 5) 'below' recovery: must climb past threshold + margin; peak tracks the minimum
+    ev, st = run([(30, 0), (15, 1), (21, 2), (23, 3)],
+                 rule(threshold_db=20, duration_s=0, clear_margin_db=2, comparison="below"))
+    check("5 below-comparison clear@3",
+          ev == [(1, "onset"), (3, "clear")] and st.peak == 15,
+          f"{ev} peak={st.peak}")
+
+    # 6) schedule window: [start, end), overnight wraparound, start == end is all day
+    check("6 schedule window",
+          _in_window(23 * 60, "22:00", "07:00")
+          and _in_window(6 * 60, "22:00", "07:00")
+          and not _in_window(12 * 60, "22:00", "07:00")
+          and not _in_window(7 * 60, "22:00", "07:00")
+          and _in_window(0, "08:00", "08:00"))
+
+    print()
+    print("ALL PASS" if failures == 0 else f"{failures} FAILURE(S)")
+    return failures
+
+
+if __name__ == "__main__":
+    import sys
+    sys.exit(1 if main() else 0)