449e031589
emit_status_snapshot() now consults SFM /db/units (cached for 15 s) in
addition to Emitter.last_seen for each seismograph. The fresher of the
two timestamps wins, and the choice is recorded in a new per-unit
last_seen_source field ("sfm" | "heartbeat" | "none"). An sfm_reachable
flag is exposed alongside so the UI can show a degraded state.
Fallback is transparent: if SFM is unreachable or has no record for a
serial, the watcher-heartbeat path takes over and the unit simply shows
the HB badge instead of the SFM badge. No schema changes. SLMs are
untouched (they don't go through SFM), and modems inherit their source
from the paired device.
active_table.html gains a small "SFM" / "HB" badge next to the age
column so operators can see at a glance which path is currently
driving each unit's status.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
362 lines
15 KiB
Python
362 lines
15 KiB
Python
from datetime import datetime, timezone
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.database import get_db_session
|
|
from backend.models import Emitter, RosterUnit, IgnoredUnit
|
|
|
|
log = logging.getLogger(__name__)

# Base URL of the SFM service. Any trailing slash is stripped so that
# f"{SFM_BASE_URL}/db/units" never yields a double slash ("//db/units")
# when the env var is configured as e.g. "http://host:8200/".
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200").rstrip("/")

# Tiny module-level cache: /api/status-snapshot is polled every 10s by the
# dashboard, and we don't want to hammer SFM with one /db/units roundtrip per
# call. 15s TTL keeps the cache mostly hot, with occasional refreshes.
_SFM_CACHE_TTL_SECONDS = 15.0
# Guards reads/writes of _sfm_cache; the HTTP request itself is made outside it.
_sfm_cache_lock = threading.Lock()
# "fetched_at" holds a time.monotonic() value; "data" stays None until the
# first fetch completes, which forces that first call to hit SFM.
_sfm_cache: dict = {"fetched_at": 0.0, "data": None, "reachable": False}
|
|
|
|
|
|
def _parse_sfm_timestamp(ts_str: Optional[str]) -> Optional[datetime]:
    """Parse an SFM ``last_seen`` value into a timezone-aware UTC datetime.

    SFM /db/units returns naive ISO timestamps (no tz suffix). Those are
    treated as UTC, mirroring how the watcher heartbeat stores
    Emitter.last_seen. A trailing ``Z`` or an explicit offset is honoured,
    and the result is normalised to UTC so it compares and serialises
    consistently with heartbeat timestamps.

    Returns None for empty, non-string, or unparseable input instead of
    raising, so one bad row cannot break the caller.
    """
    if not ts_str or not isinstance(ts_str, str):
        return None
    try:
        # fromisoformat() only accepts a "Z" suffix natively on Python 3.11+.
        ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
    except ValueError:
        return None
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)
|
|
|
|
|
|
def fetch_sfm_unit_last_seen() -> tuple[dict[str, datetime], bool]:
    """Return ({serial: last_seen_utc}, sfm_reachable).

    Results, including failures, are cached for _SFM_CACHE_TTL_SECONDS so
    frequent dashboard polling does not cause one SFM round-trip per call.

    On a transport/HTTP error, an invalid JSON body, or a body that is not a
    JSON list, returns ({}, False) so callers transparently fall back to the
    watcher-heartbeat path. Individual malformed rows (non-objects, missing
    or non-scalar serial, missing or unparseable last_seen) are skipped
    rather than discarding the whole payload. If a serial appears more than
    once, its most recent timestamp is kept.
    """
    now = time.monotonic()
    with _sfm_cache_lock:
        if _sfm_cache["data"] is not None and (now - _sfm_cache["fetched_at"]) < _SFM_CACHE_TTL_SECONDS:
            return _sfm_cache["data"], _sfm_cache["reachable"]

    data: dict[str, datetime] = {}
    reachable = False
    try:
        with httpx.Client(timeout=4.0) as client:
            resp = client.get(f"{SFM_BASE_URL}/db/units")
            resp.raise_for_status()
            payload = resp.json()
    except httpx.HTTPError as e:
        log.warning("SFM /db/units unreachable for status snapshot: %s", e)
    except ValueError as e:
        # Response body was not valid JSON.
        log.warning("SFM /db/units parse error: %s", e)
    except Exception as e:  # noqa: BLE001 — last-resort guard (e.g. invalid URL) so the snapshot never breaks
        log.warning("SFM /db/units request failed: %s", e)
    else:
        payload = payload or []
        if not isinstance(payload, list):
            log.warning(
                "SFM /db/units parse error: expected a list, got %s",
                type(payload).__name__,
            )
        else:
            for row in payload:
                if not isinstance(row, dict):
                    continue
                serial = row.get("serial")
                raw_ts = row.get("last_seen")
                # JSON arrays/objects are unhashable and cannot be serials;
                # non-string timestamps cannot be ISO-parsed.
                if not serial or isinstance(serial, (list, dict)) or not isinstance(raw_ts, str):
                    continue
                ts = _parse_sfm_timestamp(raw_ts)
                if ts is None:
                    continue
                previous = data.get(serial)
                if previous is None or ts > previous:
                    data[serial] = ts
            reachable = True

    with _sfm_cache_lock:
        _sfm_cache["fetched_at"] = now
        _sfm_cache["data"] = data
        _sfm_cache["reachable"] = reachable
    return data, reachable
|
|
|
|
|
|
def ensure_utc(dt):
    """Return *dt* as a timezone-aware UTC datetime.

    Naive values are assumed to already be in UTC and are only tagged with
    the UTC tzinfo; aware values are converted to UTC. None is passed
    through unchanged.
    """
    if dt is None:
        return dt
    is_naive = dt.tzinfo is None
    return dt.replace(tzinfo=timezone.utc) if is_naive else dt.astimezone(timezone.utc)
|
|
|
|
|
|
def format_age(last_seen):
    """Return how long ago *last_seen* was, formatted as "<hours>h <minutes>m".

    Naive datetimes are treated as UTC (via ensure_utc). Falsy input yields
    "N/A". A timestamp in the future — e.g. from clock skew between SFM and
    this host — is clamped to "0h 0m" instead of rendering a negative age
    such as "-1h 59m".
    """
    if not last_seen:
        return "N/A"
    last_seen = ensure_utc(last_seen)
    elapsed = (datetime.now(timezone.utc) - last_seen).total_seconds()
    # Floor division on a negative delta would yield "-1h 59m"; clamp first.
    elapsed = max(elapsed, 0.0)
    hours, remainder = divmod(int(elapsed), 3600)
    return f"{hours}h {remainder // 60}m"
|
|
|
|
|
|
def calculate_status(last_seen, status_ok_threshold=12, status_pending_threshold=24):
    """
    Classify a unit by how many hours have passed since it was last seen.

    Args:
        last_seen: datetime the unit was last seen (naive values are treated as UTC)
        status_ok_threshold: hours after which the status becomes Pending (default 12)
        status_pending_threshold: hours after which the status becomes Missing (default 24)

    Returns:
        "OK", "Pending", or "Missing" ("Missing" when last_seen is falsy)
    """
    if not last_seen:
        return "Missing"

    seen_utc = ensure_utc(last_seen)
    elapsed_hours = (datetime.now(timezone.utc) - seen_utc).total_seconds() / 3600

    # The Missing limit is checked before the Pending limit so the result is
    # well-defined even if the two thresholds are configured out of order.
    for limit, label in ((status_pending_threshold, "Missing"), (status_ok_threshold, "Pending")):
        if elapsed_hours > limit:
            return label
    return "OK"
|
|
|
|
|
|
def _pick_last_seen(sfm_last_seen, emitter_last_seen):
    """Pick the fresher of the SFM and watcher-heartbeat timestamps.

    Args:
        sfm_last_seen: UTC-aware datetime from SFM /db/units, or None.
        emitter_last_seen: UTC-aware datetime from Emitter.last_seen, or None.

    Returns:
        (last_seen, source), where source is "sfm", "heartbeat", or "none".
        SFM wins ties; when neither source has a value the result is
        (None, "none").
    """
    if sfm_last_seen and (not emitter_last_seen or sfm_last_seen >= emitter_last_seen):
        return sfm_last_seen, "sfm"
    if emitter_last_seen:
        # Includes the case where SFM is reachable but has no record of this
        # serial: the unit is calling home to the watcher but not being
        # forwarded — still treated as a working state.
        return emitter_last_seen, "heartbeat"
    return None, "none"


def _derive_modem_status(units, roster):
    """Copy liveness fields onto each non-retired modem from its paired device.

    Modems don't have their own check-in system, so they inherit "status",
    "age", "last" and "last_seen_source" from the device they are paired
    with, and record the partner id in "derived_from". The pairing comes
    from the modem's own deployed_with_unit_id when set; otherwise it is the
    first roster unit (in roster order) whose deployed_with_modem_id equals
    the modem id. Modems without a resolvable partner are left unchanged.
    Mutates *units* in place.
    """
    # Reverse index built once instead of rescanning the roster per modem.
    # setdefault keeps the FIRST roster unit per modem id, matching a linear
    # scan that stops at the first match.
    modem_to_unit = {}
    for other_id, other_roster in roster.items():
        modem_to_unit.setdefault(other_roster.deployed_with_modem_id, other_id)

    for unit_id, unit_data in units.items():
        if unit_data.get("device_type") != "modem" or unit_data.get("retired"):
            continue

        roster_unit = roster.get(unit_id)
        if roster_unit and roster_unit.deployed_with_unit_id:
            paired_unit_id = roster_unit.deployed_with_unit_id
        else:
            paired_unit_id = modem_to_unit.get(unit_id)
        if not paired_unit_id:
            continue

        paired_unit = units.get(paired_unit_id)
        if not paired_unit:
            continue

        # Inherit status from paired device
        unit_data["status"] = paired_unit.get("status", "Missing")
        unit_data["age"] = paired_unit.get("age", "N/A")
        unit_data["last"] = paired_unit.get("last")
        unit_data["last_seen_source"] = paired_unit.get("last_seen_source", "none")
        unit_data["derived_from"] = paired_unit_id


def emit_status_snapshot():
    """
    Merge roster (what we *intend*) with emitter data (what is *actually happening*).

    Status is recalculated against the current time on every call. For
    seismographs, "last seen" is the fresher of SFM /db/units and the watcher
    heartbeat (Emitter.last_seen); other roster device types use the
    heartbeat only, and modems then inherit from their paired device.

    Returns:
        dict with "timestamp" (naive UTC ISO string), "units" (every unit by
        id), the bucketed views "active", "benched", "allocated", "retired",
        "out_for_calibration" and "unknown", and a "summary" of counts.
    """
    db = get_db_session()
    try:
        # Get user preferences for status thresholds. Fall back to the
        # defaults when there is no preferences row or a value is None,
        # since calculate_status cannot compare hours against None.
        from backend.models import UserPreferences
        prefs = db.query(UserPreferences).filter_by(id=1).first()
        status_ok_threshold = 12
        status_pending_threshold = 24
        if prefs:
            if prefs.status_ok_threshold_hours is not None:
                status_ok_threshold = prefs.status_ok_threshold_hours
            if prefs.status_pending_threshold_hours is not None:
                status_pending_threshold = prefs.status_pending_threshold_hours

        roster = {r.id: r for r in db.query(RosterUnit).all()}
        emitters = {e.id: e for e in db.query(Emitter).all()}
        ignored = {i.id for i in db.query(IgnoredUnit).all()}

        # SFM event-forwards are the primary "last seen" signal for
        # seismographs. Watcher heartbeats stay as a backup — if SFM is down
        # or hasn't seen a serial, we fall back to Emitter.last_seen.
        sfm_last_seen_map, sfm_reachable = fetch_sfm_unit_last_seen()

        units = {}

        # --- Merge roster entries first ---
        for unit_id, r in roster.items():
            # Reset per-unit values on every iteration. Retired / calibration /
            # allocated units skip the last-seen lookup, so without this they
            # would reuse the previous unit's last_seen_source (or raise
            # UnboundLocalError when they are the first unit processed).
            last_seen = None
            last_seen_source = "none"
            age = "N/A"
            fname = ""

            if r.retired:
                # Retired units get separated later
                status = "Retired"
            elif r.out_for_calibration:
                # Out for calibration units get separated later
                status = "Out for Calibration"
            elif getattr(r, 'allocated', False) and not r.deployed:
                # Allocated: staged for an upcoming job, not yet physically deployed
                status = "Allocated"
            else:
                e = emitters.get(unit_id)
                device_type = r.device_type or "seismograph"
                emitter_last_seen = ensure_utc(e.last_seen) if e else None
                fname = e.last_file if e else ""

                # SFM-primary, heartbeat-backup logic — only for seismographs.
                # (SLMs / modems aren't forwarded into SFM's events store.)
                sfm_last_seen = sfm_last_seen_map.get(unit_id) if device_type == "seismograph" else None
                last_seen, last_seen_source = _pick_last_seen(sfm_last_seen, emitter_last_seen)

                if last_seen is not None:
                    status = calculate_status(last_seen, status_ok_threshold, status_pending_threshold)
                    age = format_age(last_seen)
                else:
                    status = "Missing"

            units[unit_id] = {
                "id": unit_id,
                "status": status,
                "age": age,
                "last": last_seen.isoformat() if last_seen else None,
                "last_seen_source": last_seen_source,
                "sfm_reachable": sfm_reachable,
                "fname": fname,
                "deployed": r.deployed,
                "note": r.note or "",
                "retired": r.retired,
                "out_for_calibration": r.out_for_calibration or False,
                "allocated": getattr(r, 'allocated', False) or False,
                "allocated_to_project_id": getattr(r, 'allocated_to_project_id', None) or "",
                # Device type and type-specific fields
                "device_type": r.device_type or "seismograph",
                "last_calibrated": r.last_calibrated.isoformat() if r.last_calibrated else None,
                "next_calibration_due": r.next_calibration_due.isoformat() if r.next_calibration_due else None,
                "deployed_with_modem_id": r.deployed_with_modem_id,
                "deployed_with_unit_id": r.deployed_with_unit_id,
                "ip_address": r.ip_address,
                "phone_number": r.phone_number,
                "hardware_model": r.hardware_model,
                # Location for mapping
                "location": r.location or "",
                "address": r.address or "",
                "coordinates": r.coordinates or "",
            }

        # --- Add unexpected emitter-only units ---
        for unit_id, e in emitters.items():
            if unit_id in roster:
                continue
            # Unknown units default to seismograph, so SFM is consulted too.
            # With neither timestamp available the source is "none".
            last_seen, last_seen_source = _pick_last_seen(
                sfm_last_seen_map.get(unit_id), ensure_utc(e.last_seen)
            )
            units[unit_id] = {
                "id": unit_id,
                # RECALCULATE status for unknown units too
                "status": calculate_status(last_seen, status_ok_threshold, status_pending_threshold),
                "age": format_age(last_seen),
                "last": last_seen.isoformat() if last_seen else None,
                "last_seen_source": last_seen_source,
                "sfm_reachable": sfm_reachable,
                "fname": e.last_file,
                "deployed": False,  # default
                "note": "",
                "retired": False,
                "out_for_calibration": False,
                "allocated": False,
                "allocated_to_project_id": "",
                # Device type and type-specific fields (defaults for unknown units)
                "device_type": "seismograph",  # default
                "last_calibrated": None,
                "next_calibration_due": None,
                "deployed_with_modem_id": None,
                "deployed_with_unit_id": None,
                "ip_address": None,
                "phone_number": None,
                "hardware_model": None,
                # Location fields
                "location": "",
                "address": "",
                "coordinates": "",
            }

        # --- Derive modem status from paired devices ---
        _derive_modem_status(units, roster)

        # Separate buckets for UI
        active_units = {
            uid: u for uid, u in units.items()
            if not u["retired"] and not u["out_for_calibration"] and u["deployed"] and uid not in ignored
        }

        benched_units = {
            uid: u for uid, u in units.items()
            if not u["retired"] and not u["out_for_calibration"] and not u["allocated"] and not u["deployed"] and uid not in ignored
        }

        allocated_units = {
            uid: u for uid, u in units.items()
            if not u["retired"] and not u["out_for_calibration"] and u["allocated"] and not u["deployed"] and uid not in ignored
        }

        retired_units = {
            uid: u for uid, u in units.items()
            if u["retired"]
        }

        out_for_calibration_units = {
            uid: u for uid, u in units.items()
            if u["out_for_calibration"]
        }

        # Unknown units - emitters that aren't in the roster and aren't ignored
        unknown_units = {
            uid: u for uid, u in units.items()
            if uid not in roster and uid not in ignored
        }

        return {
            # Same naive-UTC ISO string that datetime.utcnow() produced;
            # utcnow() itself is deprecated since Python 3.12.
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "units": units,
            "active": active_units,
            "benched": benched_units,
            "allocated": allocated_units,
            "retired": retired_units,
            "out_for_calibration": out_for_calibration_units,
            "unknown": unknown_units,
            "summary": {
                "total": len(active_units) + len(benched_units) + len(allocated_units),
                "active": len(active_units),
                "benched": len(benched_units),
                "allocated": len(allocated_units),
                "retired": len(retired_units),
                "out_for_calibration": len(out_for_calibration_units),
                "unknown": len(unknown_units),
                # Status counts only for deployed units (active_units)
                "ok": sum(1 for u in active_units.values() if u["status"] == "OK"),
                "pending": sum(1 for u in active_units.values() if u["status"] == "Pending"),
                "missing": sum(1 for u in active_units.values() if u["status"] == "Missing"),
            }
        }
    finally:
        db.close()
|