Files
terra-view/backend/services/sfm_events.py
T
serversdown df771a87de feat(sfm): wire SFM events into project-location detail page
Phase 1 of the SFM project/location integration. When viewing a vibration
monitoring location, operators now see the events that were actually
recorded there — fanned out across every seismograph that was ever
assigned to that location (handles mid-project unit swaps).

Backend:
- backend/services/sfm_events.py: new events_for_location() async helper.
  Walks UnitAssignment rows for the location (active + closed), intersects
  each assignment's [assigned_at, assigned_until] window with the requested
  filter, and concurrently queries SFM /db/events for each (serial, window)
  pair via httpx.AsyncClient.  Unions, sorts newest-first, computes summary
  stats (event count, peak PVS + when/who, last event, false-trigger count)
  over the full set, and trims to the user's display limit.  Over-fetches
  per-window (up to 5000) so stats stay accurate even with a small display
  limit.

- backend/routers/project_locations.py: new GET endpoint
  /api/projects/{project_id}/locations/{location_id}/events.  Validates
  project/location pairing (404 on mismatch).  SLM locations return an
  empty payload rather than 404 so the frontend can render gracefully.

Frontend:
- templates/vibration_location_detail.html: new "Events" tab on the
  location detail page.  KPI tiles (total / peak PVS / last event / false
  triggers), "Seismographs deployed at this location" assignment list
  (transparency: shows each assignment's date range and contributed event
  count), date / false-trigger / limit filters, and the paginated event
  table.  Lazy-loaded on first tab visit; manual refresh button.

Architectural notes:
- SFM remains the single source of truth for events.  No event sync; live
  HTTP per page load.
- UnitAssignment is the join key (not MonitoringSession).
- Events whose timestamp falls outside every assignment window are NOT
  surfaced here.  Those orphan events get a dedicated "Unattributed
  events" view on the per-unit detail page in Phase 2.

Out of scope (this commit):
- Phase 2 (per-unit history view) and Phase 3 (project-level roll-up)
  reuse this helper but ship separately.
- Phase 4 (deprecating deployment_records) is independent.
- Extracting the event-table JS to a shared file is a follow-up.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 21:57:14 +00:00

302 lines
11 KiB
Python

"""
SFM events service — bridge between terra-view's UnitAssignment time-windows
and the SFM (seismo-relay) events store.
Architecture:
1. Terra-view owns the *assignment graph*: which seismograph was at which
monitoring location during which time window (UnitAssignment rows).
2. SFM owns the *events store*: triggered waveform events keyed by
(serial, timestamp), forwarded from Blastware ACH by series3-watcher.
3. This module fans out the assignments for a given location, queries SFM
for the events emitted by each (serial, window) pair concurrently, and
unions/sorts/paginates the results.
SFM remains the single source of truth for events. Terra-view does not
copy events into its own DB; every query hits SFM live.
The events_for_location helper is also reused by Phase 3 (project-level
roll-up) to aggregate across every location in a project.
"""
from __future__ import annotations
import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Optional
import httpx
from sqlalchemy.orm import Session
from backend.models import UnitAssignment, RosterUnit
log = logging.getLogger("backend.services.sfm_events")
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")
# Per-request timeout when calling SFM /db/events. SFM is local on the
# docker network so this should be fast; bump if you start seeing timeouts.
_SFM_TIMEOUT_SECONDS = 10.0
# Max events we ever fetch per (serial, window) call to SFM. Must match
# SFM's own /db/events max limit (currently 5000). The user-facing display
# limit is independent — we over-fetch up to this cap so summary stats are
# accurate, then trim the displayed list to the requested limit.
_SFM_FETCH_CEILING = 5000
# ── Helpers ───────────────────────────────────────────────────────────────────
def _iso_utc(dt: Optional[datetime]) -> Optional[str]:
"""Render a datetime in the ISO format SFM /db/events expects."""
if dt is None:
return None
# SFM parses naive ISO strings as UTC; strip tzinfo for consistency.
if dt.tzinfo is not None:
dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
return dt.isoformat(sep=" ", timespec="seconds")
def _intersect_window(
assignment_start: datetime,
assignment_end: Optional[datetime],
filter_from: Optional[datetime],
filter_to: Optional[datetime],
now: datetime,
) -> Optional[tuple[datetime, datetime]]:
"""Intersect an assignment window with the requested filter window.
Returns (effective_start, effective_end) or None if there's no overlap.
Open-ended assignments (assigned_until=NULL) are bounded by `now`.
"""
a_end = assignment_end or now
if filter_from and a_end <= filter_from:
return None
if filter_to and assignment_start >= filter_to:
return None
start = max(assignment_start, filter_from) if filter_from else assignment_start
end = min(a_end, filter_to) if filter_to else a_end
if end <= start:
return None
return (start, end)
async def _fetch_events_for_serial(
client: httpx.AsyncClient,
serial: str,
*,
from_dt: datetime,
to_dt: datetime,
false_trigger: Optional[bool],
limit: int,
) -> list[dict]:
"""Issue one /db/events call to SFM for one (serial, window) pair."""
params: dict[str, str] = {
"serial": serial,
"from_dt": _iso_utc(from_dt) or "",
"to_dt": _iso_utc(to_dt) or "",
"limit": str(limit),
}
if false_trigger is not None:
params["false_trigger"] = "true" if false_trigger else "false"
try:
resp = await client.get(f"{SFM_BASE_URL}/db/events", params=params)
resp.raise_for_status()
except httpx.HTTPError as e:
log.warning("SFM /db/events failed for serial=%s: %s", serial, e)
return []
payload = resp.json()
events = payload.get("events", []) or []
# Strip waveform_blob if present — it's the big per-event binary and we
# don't render it in the list view. SFM returns it by default.
for ev in events:
ev.pop("waveform_blob", None)
ev.pop("a5_pickle_filename", None)
return events
# ── Public API ────────────────────────────────────────────────────────────────
async def events_for_location(
db: Session,
location_id: str,
*,
from_dt: Optional[datetime] = None,
to_dt: Optional[datetime] = None,
false_trigger: Optional[bool] = None,
limit: int = 500,
) -> dict:
"""Fan out UnitAssignment rows for `location_id` and union SFM events.
Returns:
{
"events": [merged event dicts, newest first, capped at limit],
"count": total events found across all windows (pre-cap),
"stats": {event_count, peak_pvs, peak_pvs_at,
last_event, false_trigger_count},
"assignments_used": [{unit_id, assigned_at, assigned_until,
events_in_window}, ...],
}
The "events outside any assignment window" rule (Phase 1 design decision):
events whose timestamp falls outside every assignment window are simply
not fetched — we only ask SFM for events inside the intersected windows.
Those orphan events surface under the per-unit detail page in Phase 2.
"""
# 1. Fetch all assignments (active + closed) for the location.
assignments = (
db.query(UnitAssignment)
.filter(UnitAssignment.location_id == location_id)
.filter(UnitAssignment.device_type == "seismograph")
.order_by(UnitAssignment.assigned_at.asc())
.all()
)
if not assignments:
return {
"events": [],
"count": 0,
"stats": _empty_stats(),
"assignments_used": [],
}
now = datetime.utcnow()
# 2. For each assignment, compute the effective (start, end) window after
# intersecting with the requested filter range. Drop assignments that
# don't overlap the filter window.
fetch_specs: list[tuple[UnitAssignment, datetime, datetime]] = []
for a in assignments:
window = _intersect_window(a.assigned_at, a.assigned_until, from_dt, to_dt, now)
if window is not None:
fetch_specs.append((a, window[0], window[1]))
if not fetch_specs:
return {
"events": [],
"count": 0,
"stats": _empty_stats(),
"assignments_used": [
{
"unit_id": a.unit_id,
"assigned_at": _iso_utc(a.assigned_at),
"assigned_until": _iso_utc(a.assigned_until),
"events_in_window": 0,
}
for a in assignments
],
}
# 3. Concurrent SFM fetches. We over-fetch (up to _SFM_FETCH_CEILING per
# window) so summary stats reflect the true peak/last/count across the
# full filter window, not just what fits in the user's display limit.
# The displayed event list is trimmed to `limit` after merge.
async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
per_window_lists = await asyncio.gather(
*(
_fetch_events_for_serial(
client,
serial=a.unit_id,
from_dt=start,
to_dt=end,
false_trigger=false_trigger,
limit=_SFM_FETCH_CEILING,
)
for a, start, end in fetch_specs
),
return_exceptions=False,
)
# 4. Build the per-assignment event counts (transparency for the operator).
spec_event_counts: dict[str, int] = {}
for (a, _start, _end), evs in zip(fetch_specs, per_window_lists):
spec_event_counts[a.id] = len(evs)
# 5. Union, sort newest-first, cap.
merged: list[dict] = []
for evs in per_window_lists:
merged.extend(evs)
merged.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
total_count = len(merged)
capped = merged[:limit]
# 6. Compute summary stats over the full merged set (not the capped one).
stats = _compute_stats(merged)
# 7. Build the assignments_used report (every assignment, in chronological
# order, with its event count — even ones that fell outside the filter
# window so the operator sees them but with count=0).
assignments_used = []
for a in assignments:
assignments_used.append(
{
"unit_id": a.unit_id,
"assignment_id": a.id,
"assigned_at": _iso_utc(a.assigned_at),
"assigned_until": _iso_utc(a.assigned_until),
"events_in_window": spec_event_counts.get(a.id, 0),
"status": a.status,
}
)
return {
"events": capped,
"count": total_count,
"stats": stats,
"assignments_used": assignments_used,
}
# ── Stats helpers ─────────────────────────────────────────────────────────────
def _empty_stats() -> dict:
return {
"event_count": 0,
"peak_pvs": None,
"peak_pvs_at": None,
"peak_pvs_serial": None,
"last_event": None,
"false_trigger_count": 0,
}
def _compute_stats(events: list[dict]) -> dict:
"""Roll up summary stats from a merged event list. Cheap O(N) pass."""
if not events:
return _empty_stats()
peak_pvs = None
peak_pvs_at = None
peak_pvs_serial = None
last_event = None
false_trigger_count = 0
for ev in events:
pvs = ev.get("peak_vector_sum")
if pvs is not None and (peak_pvs is None or pvs > peak_pvs):
peak_pvs = pvs
peak_pvs_at = ev.get("timestamp")
peak_pvs_serial = ev.get("serial")
ts = ev.get("timestamp")
if ts and (last_event is None or ts > last_event):
last_event = ts
if ev.get("false_trigger"):
false_trigger_count += 1
return {
"event_count": len(events),
"peak_pvs": peak_pvs,
"peak_pvs_at": peak_pvs_at,
"peak_pvs_serial": peak_pvs_serial,
"last_event": last_event,
"false_trigger_count": false_trigger_count,
}