From 449e031589ec92415e7956df27c0584aaaa89d88 Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 13 May 2026 22:58:34 +0000 Subject: [PATCH] feat(status): use SFM event forwards as primary seismograph last-seen, heartbeat as backup emit_status_snapshot() now consults SFM /db/units (cached 15s) before falling back to Emitter.last_seen for each seismograph. The fresher of the two wins and the choice is recorded in a new per-unit last_seen_source field ("sfm" | "heartbeat" | "none"). sfm_reachable is exposed alongside so the UI can show degraded state. Fallback is transparent: if SFM is unreachable or has no record for a serial, the watcher heartbeat path takes over and the unit just shows the HB badge instead of SFM. No schema changes; SLMs are untouched (they don't go through SFM); modems inherit source from their pair. active_table.html grows a small "SFM" / "HB" badge next to the age column so operators can see at a glance which path is currently driving each unit's status. Co-Authored-By: Claude Opus 4.7 --- backend/services/snapshot.py | 126 +++++++++++++++++++++++++-- templates/partials/active_table.html | 9 +- 2 files changed, 125 insertions(+), 10 deletions(-) diff --git a/backend/services/snapshot.py b/backend/services/snapshot.py index da54f65..f01cef6 100644 --- a/backend/services/snapshot.py +++ b/backend/services/snapshot.py @@ -1,9 +1,77 @@ from datetime import datetime, timezone +import logging +import os +import threading +import time +from typing import Optional + +import httpx from sqlalchemy.orm import Session from backend.database import get_db_session from backend.models import Emitter, RosterUnit, IgnoredUnit +log = logging.getLogger(__name__) + +SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200") + +# Tiny module-level cache: /api/status-snapshot is polled every 10s by the +# dashboard, and we don't want to hammer SFM with one /db/units roundtrip per +# call. 15s TTL keeps the cache mostly hot, with occasional refreshes. +_SFM_CACHE_TTL_SECONDS = 15.0 +_sfm_cache_lock = threading.Lock() +_sfm_cache: dict = {"fetched_at": 0.0, "data": None, "reachable": False} + + +def _parse_sfm_timestamp(ts_str: Optional[str]) -> Optional[datetime]: + """SFM /db/units returns naive ISO timestamps (no tz suffix). Treat them + as UTC, mirroring how the watcher heartbeat stores Emitter.last_seen.""" + if not ts_str: + return None + try: + ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) + except ValueError: + return None + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + return ts + + +def fetch_sfm_unit_last_seen() -> tuple[dict[str, datetime], bool]: + """Return ({serial: last_seen_utc}, sfm_reachable). + + Cached for _SFM_CACHE_TTL_SECONDS. On any HTTP error returns ({}, False) + so callers transparently fall back to the watcher-heartbeat path. + """ + now = time.monotonic() + with _sfm_cache_lock: + if _sfm_cache["data"] is not None and (now - _sfm_cache["fetched_at"]) < _SFM_CACHE_TTL_SECONDS: + return _sfm_cache["data"], _sfm_cache["reachable"] + + data: dict[str, datetime] = {} + reachable = False + try: + with httpx.Client(timeout=4.0) as client: + resp = client.get(f"{SFM_BASE_URL}/db/units") + resp.raise_for_status() + payload = resp.json() or [] + for row in payload: + serial = row.get("serial") + ts = _parse_sfm_timestamp(row.get("last_seen")) + if serial and ts is not None: + data[serial] = ts + reachable = True + except httpx.HTTPError as e: + log.warning("SFM /db/units unreachable for status snapshot: %s", e) + except Exception as e: # noqa: BLE001 — defensive against malformed payload + log.warning("SFM /db/units parse error: %s", e) + + with _sfm_cache_lock: + _sfm_cache["fetched_at"] = now + _sfm_cache["data"] = data + _sfm_cache["reachable"] = reachable + return data, reachable + def ensure_utc(dt): if dt is None: @@ -69,6 +137,11 @@ def emit_status_snapshot(): emitters = {e.id: e for e in db.query(Emitter).all()} ignored = {i.id for i in db.query(IgnoredUnit).all()} + # SFM event-forwards are now the primary "last seen" signal for + # seismographs. Watcher heartbeats stay as a backup — if SFM is down + # or hasn't seen a serial, we fall back to Emitter.last_seen. + sfm_last_seen_map, sfm_reachable = fetch_sfm_unit_last_seen() + units = {} # --- Merge roster entries first --- @@ -93,24 +166,49 @@ def emit_status_snapshot(): last_seen = None fname = "" else: - if e: - last_seen = ensure_utc(e.last_seen) - # RECALCULATE status based on current time, not stored value + device_type = r.device_type or "seismograph" + emitter_last_seen = ensure_utc(e.last_seen) if e else None + fname = e.last_file if e else "" + + # SFM-primary, heartbeat-backup logic — only for seismographs. + # (SLMs / modems aren't forwarded into SFM's events store.) + sfm_last_seen = sfm_last_seen_map.get(unit_id) if device_type == "seismograph" else None + + if sfm_last_seen and emitter_last_seen: + # Both sources reported — use whichever is more recent. + if sfm_last_seen >= emitter_last_seen: + last_seen = sfm_last_seen + last_seen_source = "sfm" + else: + last_seen = emitter_last_seen + last_seen_source = "heartbeat" + elif sfm_last_seen: + last_seen = sfm_last_seen + last_seen_source = "sfm" + elif emitter_last_seen: + last_seen = emitter_last_seen + # If SFM was reachable but doesn't have this serial, it + # means the unit is calling home to the watcher but not + # being forwarded — still a working state for now. + last_seen_source = "heartbeat" + else: + last_seen = None + last_seen_source = "none" + + if last_seen is not None: status = calculate_status(last_seen, status_ok_threshold, status_pending_threshold) age = format_age(last_seen) - fname = e.last_file else: - # Rostered but no emitter data status = "Missing" - last_seen = None age = "N/A" - fname = "" units[unit_id] = { "id": unit_id, "status": status, "age": age, "last": last_seen.isoformat() if last_seen else None, + "last_seen_source": last_seen_source, + "sfm_reachable": sfm_reachable, "fname": fname, "deployed": r.deployed, "note": r.note or "", @@ -136,14 +234,23 @@ def emit_status_snapshot(): # --- Add unexpected emitter-only units --- for unit_id, e in emitters.items(): if unit_id not in roster: - last_seen = ensure_utc(e.last_seen) + emitter_last_seen = ensure_utc(e.last_seen) + sfm_last_seen = sfm_last_seen_map.get(unit_id) + if sfm_last_seen and (not emitter_last_seen or sfm_last_seen >= emitter_last_seen): + last_seen = sfm_last_seen + last_seen_source = "sfm" + else: + last_seen = emitter_last_seen + last_seen_source = "heartbeat" # RECALCULATE status for unknown units too status = calculate_status(last_seen, status_ok_threshold, status_pending_threshold) units[unit_id] = { "id": unit_id, "status": status, "age": format_age(last_seen), - "last": last_seen.isoformat(), + "last": last_seen.isoformat() if last_seen else None, + "last_seen_source": last_seen_source, + "sfm_reachable": sfm_reachable, "fname": e.last_file, "deployed": False, # default "note": "", @@ -192,6 +299,7 @@ def emit_status_snapshot(): unit_data["status"] = paired_unit.get("status", "Missing") unit_data["age"] = paired_unit.get("age", "N/A") unit_data["last"] = paired_unit.get("last") + unit_data["last_seen_source"] = paired_unit.get("last_seen_source", "none") unit_data["derived_from"] = paired_unit_id # Separate buckets for UI diff --git a/templates/partials/active_table.html b/templates/partials/active_table.html index e72a4d5..085766c 100644 --- a/templates/partials/active_table.html +++ b/templates/partials/active_table.html @@ -36,7 +36,14 @@ -
+
+ {% if unit.last_seen_source == 'sfm' %} + SFM + {% elif unit.last_seen_source == 'heartbeat' %} + HB + {% endif %} {{ unit.age }}