feat(status): use SFM event forwards as primary seismograph last-seen, heartbeat as backup
emit_status_snapshot() now consults SFM /db/units (cached 15s) before
falling back to Emitter.last_seen for each seismograph. The fresher of
the two wins and the choice is recorded in a new per-unit
last_seen_source field ("sfm" | "heartbeat" | "none"). sfm_reachable is
exposed alongside so the UI can show degraded state.
Fallback is transparent: if SFM is unreachable or has no record for a
serial, the watcher heartbeat path takes over and the unit just shows
the HB badge instead of SFM. No schema changes; SLMs are untouched
(they don't go through SFM); modems inherit source from their pair.
active_table.html grows a small "SFM" / "HB" badge next to the age
column so operators can see at a glance which path is currently
driving each unit's status.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,9 +1,77 @@
|
||||
from datetime import datetime, timezone
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from backend.database import get_db_session
|
||||
from backend.models import Emitter, RosterUnit, IgnoredUnit
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")
|
||||
|
||||
# Tiny module-level cache: /api/status-snapshot is polled every 10s by the
|
||||
# dashboard, and we don't want to hammer SFM with one /db/units roundtrip per
|
||||
# call. 15s TTL keeps the cache mostly hot, with occasional refreshes.
|
||||
_SFM_CACHE_TTL_SECONDS = 15.0
|
||||
_sfm_cache_lock = threading.Lock()
|
||||
_sfm_cache: dict = {"fetched_at": 0.0, "data": None, "reachable": False}
|
||||
|
||||
|
||||
def _parse_sfm_timestamp(ts_str: Optional[str]) -> Optional[datetime]:
|
||||
"""SFM /db/units returns naive ISO timestamps (no tz suffix). Treat them
|
||||
as UTC, mirroring how the watcher heartbeat stores Emitter.last_seen."""
|
||||
if not ts_str:
|
||||
return None
|
||||
try:
|
||||
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
if ts.tzinfo is None:
|
||||
ts = ts.replace(tzinfo=timezone.utc)
|
||||
return ts
|
||||
|
||||
|
||||
def fetch_sfm_unit_last_seen() -> tuple[dict[str, datetime], bool]:
|
||||
"""Return ({serial: last_seen_utc}, sfm_reachable).
|
||||
|
||||
Cached for _SFM_CACHE_TTL_SECONDS. On any HTTP error returns ({}, False)
|
||||
so callers transparently fall back to the watcher-heartbeat path.
|
||||
"""
|
||||
now = time.monotonic()
|
||||
with _sfm_cache_lock:
|
||||
if _sfm_cache["data"] is not None and (now - _sfm_cache["fetched_at"]) < _SFM_CACHE_TTL_SECONDS:
|
||||
return _sfm_cache["data"], _sfm_cache["reachable"]
|
||||
|
||||
data: dict[str, datetime] = {}
|
||||
reachable = False
|
||||
try:
|
||||
with httpx.Client(timeout=4.0) as client:
|
||||
resp = client.get(f"{SFM_BASE_URL}/db/units")
|
||||
resp.raise_for_status()
|
||||
payload = resp.json() or []
|
||||
for row in payload:
|
||||
serial = row.get("serial")
|
||||
ts = _parse_sfm_timestamp(row.get("last_seen"))
|
||||
if serial and ts is not None:
|
||||
data[serial] = ts
|
||||
reachable = True
|
||||
except httpx.HTTPError as e:
|
||||
log.warning("SFM /db/units unreachable for status snapshot: %s", e)
|
||||
except Exception as e: # noqa: BLE001 — defensive against malformed payload
|
||||
log.warning("SFM /db/units parse error: %s", e)
|
||||
|
||||
with _sfm_cache_lock:
|
||||
_sfm_cache["fetched_at"] = now
|
||||
_sfm_cache["data"] = data
|
||||
_sfm_cache["reachable"] = reachable
|
||||
return data, reachable
|
||||
|
||||
|
||||
def ensure_utc(dt):
|
||||
if dt is None:
|
||||
@@ -69,6 +137,11 @@ def emit_status_snapshot():
|
||||
emitters = {e.id: e for e in db.query(Emitter).all()}
|
||||
ignored = {i.id for i in db.query(IgnoredUnit).all()}
|
||||
|
||||
# SFM event-forwards are now the primary "last seen" signal for
|
||||
# seismographs. Watcher heartbeats stay as a backup — if SFM is down
|
||||
# or hasn't seen a serial, we fall back to Emitter.last_seen.
|
||||
sfm_last_seen_map, sfm_reachable = fetch_sfm_unit_last_seen()
|
||||
|
||||
units = {}
|
||||
|
||||
# --- Merge roster entries first ---
|
||||
@@ -93,24 +166,49 @@ def emit_status_snapshot():
|
||||
last_seen = None
|
||||
fname = ""
|
||||
else:
|
||||
if e:
|
||||
last_seen = ensure_utc(e.last_seen)
|
||||
# RECALCULATE status based on current time, not stored value
|
||||
device_type = r.device_type or "seismograph"
|
||||
emitter_last_seen = ensure_utc(e.last_seen) if e else None
|
||||
fname = e.last_file if e else ""
|
||||
|
||||
# SFM-primary, heartbeat-backup logic — only for seismographs.
|
||||
# (SLMs / modems aren't forwarded into SFM's events store.)
|
||||
sfm_last_seen = sfm_last_seen_map.get(unit_id) if device_type == "seismograph" else None
|
||||
|
||||
if sfm_last_seen and emitter_last_seen:
|
||||
# Both sources reported — use whichever is more recent.
|
||||
if sfm_last_seen >= emitter_last_seen:
|
||||
last_seen = sfm_last_seen
|
||||
last_seen_source = "sfm"
|
||||
else:
|
||||
last_seen = emitter_last_seen
|
||||
last_seen_source = "heartbeat"
|
||||
elif sfm_last_seen:
|
||||
last_seen = sfm_last_seen
|
||||
last_seen_source = "sfm"
|
||||
elif emitter_last_seen:
|
||||
last_seen = emitter_last_seen
|
||||
# If SFM was reachable but doesn't have this serial, it
|
||||
# means the unit is calling home to the watcher but not
|
||||
# being forwarded — still a working state for now.
|
||||
last_seen_source = "heartbeat"
|
||||
else:
|
||||
last_seen = None
|
||||
last_seen_source = "none"
|
||||
|
||||
if last_seen is not None:
|
||||
status = calculate_status(last_seen, status_ok_threshold, status_pending_threshold)
|
||||
age = format_age(last_seen)
|
||||
fname = e.last_file
|
||||
else:
|
||||
# Rostered but no emitter data
|
||||
status = "Missing"
|
||||
last_seen = None
|
||||
age = "N/A"
|
||||
fname = ""
|
||||
|
||||
units[unit_id] = {
|
||||
"id": unit_id,
|
||||
"status": status,
|
||||
"age": age,
|
||||
"last": last_seen.isoformat() if last_seen else None,
|
||||
"last_seen_source": last_seen_source,
|
||||
"sfm_reachable": sfm_reachable,
|
||||
"fname": fname,
|
||||
"deployed": r.deployed,
|
||||
"note": r.note or "",
|
||||
@@ -136,14 +234,23 @@ def emit_status_snapshot():
|
||||
# --- Add unexpected emitter-only units ---
|
||||
for unit_id, e in emitters.items():
|
||||
if unit_id not in roster:
|
||||
last_seen = ensure_utc(e.last_seen)
|
||||
emitter_last_seen = ensure_utc(e.last_seen)
|
||||
sfm_last_seen = sfm_last_seen_map.get(unit_id)
|
||||
if sfm_last_seen and (not emitter_last_seen or sfm_last_seen >= emitter_last_seen):
|
||||
last_seen = sfm_last_seen
|
||||
last_seen_source = "sfm"
|
||||
else:
|
||||
last_seen = emitter_last_seen
|
||||
last_seen_source = "heartbeat"
|
||||
# RECALCULATE status for unknown units too
|
||||
status = calculate_status(last_seen, status_ok_threshold, status_pending_threshold)
|
||||
units[unit_id] = {
|
||||
"id": unit_id,
|
||||
"status": status,
|
||||
"age": format_age(last_seen),
|
||||
"last": last_seen.isoformat(),
|
||||
"last": last_seen.isoformat() if last_seen else None,
|
||||
"last_seen_source": last_seen_source,
|
||||
"sfm_reachable": sfm_reachable,
|
||||
"fname": e.last_file,
|
||||
"deployed": False, # default
|
||||
"note": "",
|
||||
@@ -192,6 +299,7 @@ def emit_status_snapshot():
|
||||
unit_data["status"] = paired_unit.get("status", "Missing")
|
||||
unit_data["age"] = paired_unit.get("age", "N/A")
|
||||
unit_data["last"] = paired_unit.get("last")
|
||||
unit_data["last_seen_source"] = paired_unit.get("last_seen_source", "none")
|
||||
unit_data["derived_from"] = paired_unit_id
|
||||
|
||||
# Separate buckets for UI
|
||||
|
||||
Reference in New Issue
Block a user