Files
terra-view/backend/services/snapshot.py
T
serversdown 449e031589 feat(status): use SFM event forwards as primary seismograph last-seen, heartbeat as backup
emit_status_snapshot() now consults SFM /db/units (cached 15s) before
falling back to Emitter.last_seen for each seismograph. The fresher of
the two wins and the choice is recorded in a new per-unit
last_seen_source field ("sfm" | "heartbeat" | "none"). sfm_reachable is
exposed alongside so the UI can show degraded state.

Fallback is transparent: if SFM is unreachable or has no record for a
serial, the watcher heartbeat path takes over and the unit just shows
the HB badge instead of SFM. No schema changes; SLMs are untouched
(they don't go through SFM); modems inherit source from their pair.

active_table.html grows a small "SFM" / "HB" badge next to the age
column so operators can see at a glance which path is currently
driving each unit's status.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 22:58:34 +00:00

362 lines
15 KiB
Python

from datetime import datetime, timezone
import logging
import os
import threading
import time
from typing import Optional
import httpx
from sqlalchemy.orm import Session
from backend.database import get_db_session
from backend.models import Emitter, RosterUnit, IgnoredUnit
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Base URL of the SFM service. The SFM_BASE_URL environment variable
# overrides the local default.
SFM_BASE_URL = os.environ.get("SFM_BASE_URL", "http://localhost:8200")

# Small in-process cache for the SFM /db/units response. The dashboard polls
# /api/status-snapshot every 10s and we don't want one SFM roundtrip per
# poll; a 15s TTL keeps the cache mostly hot with only occasional refreshes.
_SFM_CACHE_TTL_SECONDS = 15.0

# Guards every read and write of _sfm_cache.
_sfm_cache_lock = threading.Lock()

# fetched_at: time.monotonic() value of the last fetch attempt.
# data:       {serial: last_seen} mapping, or None before the first attempt.
# reachable:  whether that last attempt got a usable answer from SFM.
_sfm_cache: dict = dict(fetched_at=0.0, data=None, reachable=False)
def _parse_sfm_timestamp(ts_str: Optional[str]) -> Optional[datetime]:
    """Parse a timestamp from SFM /db/units into an aware UTC datetime.

    SFM /db/units returns naive ISO timestamps (no tz suffix). Those are
    treated as UTC, mirroring how the watcher heartbeat stores
    Emitter.last_seen. Timestamps that do carry an offset (including a
    trailing "Z") are converted to UTC so every value handed back is
    directly comparable and serialises with a +00:00 offset.

    Args:
        ts_str: ISO-8601 timestamp string, or None.

    Returns:
        A timezone-aware datetime in UTC, or None when the input is empty,
        is not a string, or cannot be parsed.
    """
    # A non-string value (e.g. an epoch number) must not raise here: the
    # caller parses many rows in one loop and a single bad row should be
    # skipped, not abort the whole batch.
    if not ts_str or not isinstance(ts_str, str):
        return None
    try:
        # datetime.fromisoformat() only accepts a literal "Z" suffix on
        # newer Python versions, so rewrite it as an explicit offset.
        ts = datetime.fromisoformat(ts_str.strip().replace("Z", "+00:00"))
    except ValueError:
        return None
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)
def fetch_sfm_unit_last_seen() -> tuple[dict[str, datetime], bool]:
    """Return ({serial: last_seen_utc}, sfm_reachable).

    The result of the last fetch attempt (successful or not) is cached for
    _SFM_CACHE_TTL_SECONDS. On any HTTP error this returns ({}, False) so
    callers transparently fall back to the watcher-heartbeat path.

    Individual rows that are malformed (not an object, missing serial,
    missing or unparseable last_seen) are skipped; they do not mark SFM as
    unreachable or discard the remaining rows.

    Returns:
        A tuple of the serial -> last-seen mapping and a flag telling
        whether SFM answered with a usable payload. The mapping is the
        cached object itself; callers should treat it as read-only.
    """
    now = time.monotonic()
    with _sfm_cache_lock:
        cached = _sfm_cache["data"]
        if cached is not None and (now - _sfm_cache["fetched_at"]) < _SFM_CACHE_TTL_SECONDS:
            return cached, _sfm_cache["reachable"]

    # The HTTP call is made outside the lock so a slow SFM does not block
    # other threads that only want to read the cache.
    data: dict[str, datetime] = {}
    reachable = False
    try:
        with httpx.Client(timeout=4.0) as client:
            resp = client.get(f"{SFM_BASE_URL}/db/units")
            resp.raise_for_status()
            payload = resp.json() or []
        if isinstance(payload, list):
            for row in payload:
                if not isinstance(row, dict):
                    continue
                serial = row.get("serial")
                raw_ts = row.get("last_seen")
                # Only strings are handed to the parser so one odd value
                # cannot raise and poison the whole batch.
                ts = _parse_sfm_timestamp(raw_ts) if isinstance(raw_ts, str) else None
                if serial and ts is not None:
                    data[serial] = ts
            reachable = True
        else:
            log.warning(
                "SFM /db/units parse error: expected a JSON list, got %s",
                type(payload).__name__,
            )
    except httpx.HTTPError as e:
        log.warning("SFM /db/units unreachable for status snapshot: %s", e)
    except Exception as e:  # noqa: BLE001 — defensive against malformed payload
        log.warning("SFM /db/units parse error: %s", e)

    # Failures are cached too, so an SFM outage costs at most one timed-out
    # request per TTL window instead of one per dashboard poll.
    with _sfm_cache_lock:
        _sfm_cache["fetched_at"] = now
        _sfm_cache["data"] = data
        _sfm_cache["reachable"] = reachable
    return data, reachable
def ensure_utc(dt):
    """Return *dt* as a timezone-aware UTC datetime.

    None is passed through unchanged. A naive datetime is assumed to be in
    UTC already and just gets the UTC tzinfo attached; an aware datetime is
    converted to UTC.
    """
    if dt is None:
        return None
    is_naive = dt.tzinfo is None
    return dt.replace(tzinfo=timezone.utc) if is_naive else dt.astimezone(timezone.utc)
def format_age(last_seen):
    """Format how long ago *last_seen* was as "<hours>h <minutes>m".

    Args:
        last_seen: datetime of the last contact (naive values are treated
            as UTC), or a falsy value when the unit was never seen.

    Returns:
        "N/A" when there is no timestamp, otherwise a string such as
        "3h 12m". A timestamp in the future is reported as "0h 0m".
    """
    if not last_seen:
        return "N/A"
    last_seen = ensure_utc(last_seen)
    now = datetime.now(timezone.utc)
    # Clamp at zero: with clock skew between hosts the timestamp can be
    # slightly ahead of local time, and floor division on a negative
    # duration would render misleading values such as "-1h 59m".
    elapsed_seconds = max(int((now - last_seen).total_seconds()), 0)
    hours, remainder = divmod(elapsed_seconds, 3600)
    return f"{hours}h {remainder // 60}m"
def calculate_status(last_seen, status_ok_threshold=12, status_pending_threshold=24):
    """
    Classify a unit by the time elapsed since it was last seen.

    Args:
        last_seen: datetime of last seen (UTC); falsy when never seen
        status_ok_threshold: hours before status becomes Pending (default 12)
        status_pending_threshold: hours before status becomes Missing (default 24)

    Returns:
        "OK", "Pending", or "Missing"
    """
    if not last_seen:
        return "Missing"

    seen_at = ensure_utc(last_seen)
    elapsed = datetime.now(timezone.utc) - seen_at
    hours_ago = elapsed.total_seconds() / 3600

    # Check the larger threshold first; both comparisons are strict, so a
    # unit sitting exactly on a threshold keeps the better status.
    if hours_ago > status_pending_threshold:
        return "Missing"
    if hours_ago > status_ok_threshold:
        return "Pending"
    return "OK"
def _resolve_last_seen(sfm_last_seen, emitter_last_seen):
    """Pick the timestamp that drives a unit's status and name its source.

    Args:
        sfm_last_seen: aware datetime reported by SFM, or None.
        emitter_last_seen: aware datetime from the watcher heartbeat, or None.

    Returns:
        (last_seen, source) where source is "sfm", "heartbeat" or "none".
        When both timestamps exist the more recent one wins; a tie goes
        to SFM.
    """
    if sfm_last_seen is not None and (
        emitter_last_seen is None or sfm_last_seen >= emitter_last_seen
    ):
        return sfm_last_seen, "sfm"
    if emitter_last_seen is not None:
        return emitter_last_seen, "heartbeat"
    return None, "none"


def emit_status_snapshot():
    """
    Merge roster (what we *intend*) with emitter data (what is *actually happening*).

    Status is recalculated based on current time to ensure accuracy. For
    seismographs the last-seen timestamp is the fresher of the SFM record
    and the watcher heartbeat; each unit records which one was used in
    "last_seen_source" ("sfm" | "heartbeat" | "none").

    Returns:
        A dict with a "timestamp", the full "units" mapping, the per-bucket
        mappings ("active", "benched", "allocated", "retired",
        "out_for_calibration", "unknown") and a "summary" of counts.
    """
    db = get_db_session()
    try:
        # Get user preferences for status thresholds
        from backend.models import UserPreferences
        prefs = db.query(UserPreferences).filter_by(id=1).first()
        status_ok_threshold = prefs.status_ok_threshold_hours if prefs else 12
        status_pending_threshold = prefs.status_pending_threshold_hours if prefs else 24

        roster = {r.id: r for r in db.query(RosterUnit).all()}
        emitters = {e.id: e for e in db.query(Emitter).all()}
        ignored = {i.id for i in db.query(IgnoredUnit).all()}

        # SFM event-forwards are now the primary "last seen" signal for
        # seismographs. Watcher heartbeats stay as a backup — if SFM is down
        # or hasn't seen a serial, we fall back to Emitter.last_seen.
        sfm_last_seen_map, sfm_reachable = fetch_sfm_unit_last_seen()

        units = {}

        # --- Merge roster entries first ---
        for unit_id, r in roster.items():
            e = emitters.get(unit_id)
            device_type = r.device_type or "seismograph"

            # Defaults for units that are not live (retired, out for
            # calibration, allocated). last_seen_source must be set on every
            # path: otherwise it is unbound for the first unit or carries
            # over the previous unit's value.
            age = "N/A"
            last_seen = None
            last_seen_source = "none"
            fname = ""

            if r.retired:
                # Retired units get separated later
                status = "Retired"
            elif r.out_for_calibration:
                # Out for calibration units get separated later
                status = "Out for Calibration"
            elif getattr(r, 'allocated', False) and not r.deployed:
                # Allocated: staged for an upcoming job, not yet physically deployed
                status = "Allocated"
            else:
                emitter_last_seen = ensure_utc(e.last_seen) if e else None
                fname = e.last_file if e else ""

                # SFM-primary, heartbeat-backup logic — only for seismographs.
                # (SLMs / modems aren't forwarded into SFM's events store.)
                sfm_last_seen = (
                    sfm_last_seen_map.get(unit_id) if device_type == "seismograph" else None
                )
                # If SFM was reachable but doesn't have this serial, the
                # unit is calling home to the watcher but not being
                # forwarded — still a working state, reported as "heartbeat".
                last_seen, last_seen_source = _resolve_last_seen(
                    sfm_last_seen, emitter_last_seen
                )

                # Both helpers handle a missing timestamp themselves
                # ("Missing" / "N/A").
                status = calculate_status(last_seen, status_ok_threshold, status_pending_threshold)
                age = format_age(last_seen)

            units[unit_id] = {
                "id": unit_id,
                "status": status,
                "age": age,
                "last": last_seen.isoformat() if last_seen else None,
                "last_seen_source": last_seen_source,
                "sfm_reachable": sfm_reachable,
                "fname": fname,
                "deployed": r.deployed,
                "note": r.note or "",
                "retired": r.retired,
                "out_for_calibration": r.out_for_calibration or False,
                "allocated": getattr(r, 'allocated', False) or False,
                "allocated_to_project_id": getattr(r, 'allocated_to_project_id', None) or "",
                # Device type and type-specific fields
                "device_type": device_type,
                "last_calibrated": r.last_calibrated.isoformat() if r.last_calibrated else None,
                "next_calibration_due": r.next_calibration_due.isoformat() if r.next_calibration_due else None,
                "deployed_with_modem_id": r.deployed_with_modem_id,
                "deployed_with_unit_id": r.deployed_with_unit_id,
                "ip_address": r.ip_address,
                "phone_number": r.phone_number,
                "hardware_model": r.hardware_model,
                # Location for mapping
                "location": r.location or "",
                "address": r.address or "",
                "coordinates": r.coordinates or "",
            }

        # --- Add unexpected emitter-only units ---
        for unit_id, e in emitters.items():
            if unit_id in roster:
                continue

            # Same selection rule as roster units, so a unit with no
            # timestamp from either side is reported as "none" rather than
            # being mislabelled "heartbeat".
            last_seen, last_seen_source = _resolve_last_seen(
                sfm_last_seen_map.get(unit_id), ensure_utc(e.last_seen)
            )

            # RECALCULATE status for unknown units too
            status = calculate_status(last_seen, status_ok_threshold, status_pending_threshold)

            units[unit_id] = {
                "id": unit_id,
                "status": status,
                "age": format_age(last_seen),
                "last": last_seen.isoformat() if last_seen else None,
                "last_seen_source": last_seen_source,
                "sfm_reachable": sfm_reachable,
                "fname": e.last_file,
                "deployed": False,  # default
                "note": "",
                "retired": False,
                "out_for_calibration": False,
                "allocated": False,
                "allocated_to_project_id": "",
                # Device type and type-specific fields (defaults for unknown units)
                "device_type": "seismograph",  # default
                "last_calibrated": None,
                "next_calibration_due": None,
                "deployed_with_modem_id": None,
                "deployed_with_unit_id": None,
                "ip_address": None,
                "phone_number": None,
                "hardware_model": None,
                # Location fields
                "location": "",
                "address": "",
                "coordinates": "",
            }

        # --- Derive modem status from paired devices ---
        # Modems don't have their own check-in system, so we inherit status
        # from whatever device they're paired with (seismograph or SLM).
        # Check both directions: modem.deployed_with_unit_id OR device.deployed_with_modem_id

        # Reverse index modem id -> first roster device that references it,
        # built once instead of rescanning the roster for every modem.
        # setdefault keeps the first match in roster order.
        device_by_modem = {}
        for other_id, other_roster in roster.items():
            modem_id = other_roster.deployed_with_modem_id
            if modem_id is not None:
                device_by_modem.setdefault(modem_id, other_id)

        for unit_id, unit_data in units.items():
            if unit_data.get("device_type") != "modem" or unit_data.get("retired"):
                continue

            roster_unit = roster.get(unit_id)
            if roster_unit and roster_unit.deployed_with_unit_id:
                # First choice: the modem's own deployed_with_unit_id
                paired_unit_id = roster_unit.deployed_with_unit_id
            else:
                # Fallback: a device that lists this modem in deployed_with_modem_id
                paired_unit_id = device_by_modem.get(unit_id)

            if not paired_unit_id:
                continue
            paired_unit = units.get(paired_unit_id)
            if not paired_unit:
                continue

            # Inherit status from paired device
            unit_data["status"] = paired_unit.get("status", "Missing")
            unit_data["age"] = paired_unit.get("age", "N/A")
            unit_data["last"] = paired_unit.get("last")
            unit_data["last_seen_source"] = paired_unit.get("last_seen_source", "none")
            unit_data["derived_from"] = paired_unit_id

        # Separate buckets for UI
        active_units = {
            uid: u for uid, u in units.items()
            if not u["retired"] and not u["out_for_calibration"] and u["deployed"] and uid not in ignored
        }
        benched_units = {
            uid: u for uid, u in units.items()
            if not u["retired"] and not u["out_for_calibration"] and not u["allocated"] and not u["deployed"] and uid not in ignored
        }
        allocated_units = {
            uid: u for uid, u in units.items()
            if not u["retired"] and not u["out_for_calibration"] and u["allocated"] and not u["deployed"] and uid not in ignored
        }
        retired_units = {
            uid: u for uid, u in units.items()
            if u["retired"]
        }
        out_for_calibration_units = {
            uid: u for uid, u in units.items()
            if u["out_for_calibration"]
        }
        # Unknown units - emitters that aren't in the roster and aren't ignored
        unknown_units = {
            uid: u for uid, u in units.items()
            if uid not in roster and uid not in ignored
        }

        return {
            # Naive UTC ISO string, the same format datetime.utcnow()
            # produced, without calling the deprecated utcnow().
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "units": units,
            "active": active_units,
            "benched": benched_units,
            "allocated": allocated_units,
            "retired": retired_units,
            "out_for_calibration": out_for_calibration_units,
            "unknown": unknown_units,
            "summary": {
                "total": len(active_units) + len(benched_units) + len(allocated_units),
                "active": len(active_units),
                "benched": len(benched_units),
                "allocated": len(allocated_units),
                "retired": len(retired_units),
                "out_for_calibration": len(out_for_calibration_units),
                "unknown": len(unknown_units),
                # Status counts only for deployed units (active_units)
                "ok": sum(1 for u in active_units.values() if u["status"] == "OK"),
                "pending": sum(1 for u in active_units.values() if u["status"] == "Pending"),
                "missing": sum(1 for u in active_units.values() if u["status"] == "Missing"),
            }
        }
    finally:
        db.close()