Files
terra-view/backend/services/sfm_events.py
serversdown e15481884a feat(nav,stats): Events sidebar entry + 'Overall Peak' excludes false triggers
Two related operator-facing improvements after the nav reorg.

1) Events as a top-level sidebar entry.

The /sfm page (fleet-wide event database) was demoted to Settings →
Developer in the previous reorg.  Bringing it back to main nav as
"Events" — operators do reach for the cross-project, sortable
event list, so it earns a top-level slot.

Sidebar now (7 items):
  Dashboard · Devices · Projects · Events · Tools · Job Planner · Settings

Settings → Developer card pointing at /sfm is removed.  /sfm page
title/subtitle updated from "SFM Event Data" to just "Events".  URL
unchanged.

2) "Peak PVS" KPI tile becomes "Overall Peak" and excludes false
   triggers from the calculation.

When operators ask "what's the biggest event at this location/unit/
project?" they mean the biggest REAL event, not the biggest sensor
glitch.  A single mis-flagged false trigger could otherwise dominate
the tile (the 14.13 in/s spike at Loc 1 was a prime example).

backend/services/sfm_events.py:
- _compute_stats() skips false_trigger=True events when computing
  peak_pvs / peak_pvs_at / peak_pvs_serial.  Continues counting them
  in false_trigger_count so the separate "False Triggers" tile still
  reflects what got filtered out.  last_event unchanged (recency, not
  magnitude).
- Same change automatically propagates to events_for_unit() and
  vibration_summary_for_project() — the former calls _compute_stats()
  directly, the latter reaches it through events_for_location().

Templates: "Peak PVS" → "Overall Peak" in 3 KPI tile locations
(vibration_location_detail.html, partials/projects/vibration_summary.html,
unit_detail.html).  The physical-quantity name "Peak Vector Sum" in the
event-detail modal stays — that's the actual physics term, not a summary
stat.

Verified end-to-end: Overall Peak renders on real data; peak event
false_trigger flag confirmed False.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 16:13:37 +00:00

593 lines
21 KiB
Python

"""
SFM events service — bridge between terra-view's UnitAssignment time-windows
and the SFM (seismo-relay) events store.
Architecture:
1. Terra-view owns the *assignment graph*: which seismograph was at which
monitoring location during which time window (UnitAssignment rows).
2. SFM owns the *events store*: triggered waveform events keyed by
(serial, timestamp), forwarded from Blastware ACH by series3-watcher.
3. This module fans out the assignments for a given location, queries SFM
for the events emitted by each (serial, window) pair concurrently, and
unions/sorts/paginates the results.
SFM remains the single source of truth for events. Terra-view does not
copy events into its own DB; every query hits SFM live.
The events_for_location helper is also reused by Phase 3 (project-level
roll-up) to aggregate across every location in a project.
"""
from __future__ import annotations
import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Optional
import httpx
from sqlalchemy.orm import Session
from backend.models import UnitAssignment, RosterUnit, MonitoringLocation, Project
log = logging.getLogger("backend.services.sfm_events")

# Base URL of the SFM (seismo-relay) HTTP API. Overridable through the
# SFM_BASE_URL environment variable; defaults to localhost:8200.
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")

# Per-request timeout, in seconds, for SFM /db/events calls. SFM is expected
# to be local on the docker network, so this should rarely bite; raise it if
# timeouts start appearing in the logs.
_SFM_TIMEOUT_SECONDS: float = 10.0

# Maximum number of events requested per (serial, window) SFM call. Keep in
# sync with SFM's own /db/events limit cap (currently 5000). This is
# independent of the user-facing display limit: callers over-fetch up to this
# ceiling so summary stats are accurate, then trim the list they display.
_SFM_FETCH_CEILING: int = 5_000
# ── Helpers ───────────────────────────────────────────────────────────────────
def _iso_utc(dt: Optional[datetime]) -> Optional[str]:
    """Format ``dt`` as the ``YYYY-MM-DD HH:MM:SS`` string SFM /db/events expects.

    SFM parses naive ISO strings as UTC, so a timezone-aware input is first
    converted to UTC and then stripped of its tzinfo. A naive input is
    formatted as-is (callers are expected to pass UTC). Sub-second precision
    is dropped. ``None`` passes straight through.
    """
    if dt is None:
        return None
    if dt.tzinfo is None:
        naive_utc = dt
    else:
        naive_utc = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return naive_utc.isoformat(sep=" ", timespec="seconds")
def _intersect_window(
    assignment_start: datetime,
    assignment_end: Optional[datetime],
    filter_from: Optional[datetime],
    filter_to: Optional[datetime],
    now: datetime,
) -> Optional[tuple[datetime, datetime]]:
    """Clip an assignment window to the requested filter window.

    An open-ended assignment (``assignment_end`` is None, i.e.
    assigned_until=NULL) is treated as running until ``now``. A missing
    filter bound means "unbounded on that side".

    Returns ``(effective_start, effective_end)``, or None when the clipped
    window is empty (no overlap, or the two only touch at a single instant).
    """
    window_start = assignment_start
    window_end = assignment_end or now
    if filter_from:
        window_start = max(window_start, filter_from)
    if filter_to:
        window_end = min(window_end, filter_to)
    # One check covers every "no overlap" case: the assignment ending before
    # the filter starts, starting after the filter ends, or an inverted window.
    if window_end <= window_start:
        return None
    return window_start, window_end
async def _fetch_events_for_serial(
    client: httpx.AsyncClient,
    serial: str,
    *,
    from_dt: datetime,
    to_dt: datetime,
    false_trigger: Optional[bool],
    limit: int,
) -> list[dict]:
    """Issue one SFM ``/db/events`` call for a single (serial, window) pair.

    Best-effort: an HTTP/transport error, a non-JSON body, or a JSON body of
    an unexpected shape is logged and yields an empty list. One bad window
    therefore cannot abort a concurrent fan-out over many windows.

    Args:
        client: shared async HTTP client used for the request.
        serial: seismograph serial to query.
        from_dt / to_dt: window bounds, sent via ``_iso_utc``.
        false_trigger: when not None, sent as a "true"/"false" filter.
        limit: maximum number of events to request.

    Returns:
        The event dicts from the response's ``events`` list, with the
        ``waveform_blob`` and ``a5_pickle_filename`` fields removed.
    """
    params: dict[str, str] = {
        "serial": serial,
        "from_dt": _iso_utc(from_dt) or "",
        "to_dt": _iso_utc(to_dt) or "",
        "limit": str(limit),
    }
    if false_trigger is not None:
        params["false_trigger"] = "true" if false_trigger else "false"

    try:
        resp = await client.get(f"{SFM_BASE_URL}/db/events", params=params)
        resp.raise_for_status()
    except httpx.HTTPError as e:
        log.warning("SFM /db/events failed for serial=%s: %s", serial, e)
        return []

    # A 2xx response whose body is not JSON (e.g. an HTML error page from a
    # proxy) makes .json() raise ValueError (json.JSONDecodeError). Treat it
    # like any other failed call instead of letting it escape.
    try:
        payload = resp.json()
    except ValueError as e:
        log.warning("SFM /db/events returned non-JSON for serial=%s: %s", serial, e)
        return []
    if not isinstance(payload, dict):
        log.warning(
            "SFM /db/events returned unexpected payload type %s for serial=%s",
            type(payload).__name__, serial,
        )
        return []

    raw_events = payload.get("events") or []
    if not isinstance(raw_events, list):
        log.warning(
            "SFM /db/events 'events' is %s, not a list, for serial=%s",
            type(raw_events).__name__, serial,
        )
        return []
    events = [ev for ev in raw_events if isinstance(ev, dict)]

    # Drop the large per-event fields that the list views never render;
    # SFM includes waveform_blob by default.
    for ev in events:
        ev.pop("waveform_blob", None)
        ev.pop("a5_pickle_filename", None)
    return events
# ── Public API ────────────────────────────────────────────────────────────────
def _assignment_used_entry(a: UnitAssignment, events_in_window: int) -> dict:
    """Build one row of the operator-facing ``assignments_used`` report."""
    return {
        "unit_id": a.unit_id,
        "assignment_id": a.id,
        "assigned_at": _iso_utc(a.assigned_at),
        "assigned_until": _iso_utc(a.assigned_until),
        "events_in_window": events_in_window,
        "status": a.status,
    }


async def events_for_location(
    db: Session,
    location_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Fan out UnitAssignment rows for `location_id` and union SFM events.

    Returns:
        {
          "events": [merged event dicts, newest first, capped at limit],
          "count": total events found across all windows (pre-cap; each
                   window is itself capped at _SFM_FETCH_CEILING),
          "stats": {event_count, peak_pvs, peak_pvs_at, peak_pvs_serial,
                    last_event, false_trigger_count}  (see _compute_stats),
          "assignments_used": [{unit_id, assignment_id, assigned_at,
                                assigned_until, events_in_window, status},
                               ...],
        }

    Every row of ``assignments_used`` has the same keys, whether or not any
    assignment overlapped the filter window.

    The "events outside any assignment window" rule (Phase 1 design decision):
    events whose timestamp falls outside every assignment window are simply
    not fetched — we only ask SFM for events inside the intersected windows.
    Those orphan events surface under the per-unit detail page in Phase 2.
    """
    # A negative slice bound would silently drop the OLDEST events instead of
    # capping, so treat any negative limit as 0.
    limit = max(limit, 0)

    # 1. Fetch all seismograph assignments (active + closed) for the location.
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.location_id == location_id)
        .filter(UnitAssignment.device_type == "seismograph")
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )
    if not assignments:
        return {
            "events": [],
            "count": 0,
            "stats": _empty_stats(),
            "assignments_used": [],
        }

    now = datetime.utcnow()

    # 2. Intersect each assignment with the filter range; drop non-overlapping.
    fetch_specs: list[tuple[UnitAssignment, datetime, datetime]] = []
    for a in assignments:
        window = _intersect_window(a.assigned_at, a.assigned_until, from_dt, to_dt, now)
        if window is not None:
            fetch_specs.append((a, window[0], window[1]))

    if not fetch_specs:
        return {
            "events": [],
            "count": 0,
            "stats": _empty_stats(),
            "assignments_used": [_assignment_used_entry(a, 0) for a in assignments],
        }

    # 3. Concurrent SFM fetches. Over-fetch (up to _SFM_FETCH_CEILING per
    #    window) so summary stats reflect the whole filter window, not just
    #    what fits in the caller's display limit.
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
        per_window_lists = await asyncio.gather(
            *(
                _fetch_events_for_serial(
                    client,
                    serial=a.unit_id,
                    from_dt=start,
                    to_dt=end,
                    false_trigger=false_trigger,
                    limit=_SFM_FETCH_CEILING,
                )
                for a, start, end in fetch_specs
            ),
            return_exceptions=False,
        )

    # 4. Per-assignment event counts (transparency for the operator). A window
    #    that returns exactly the ceiling was probably truncated by SFM, which
    #    would make count/stats undercount — make that visible in the logs.
    spec_event_counts: dict[str, int] = {}
    for (a, start, end), evs in zip(fetch_specs, per_window_lists):
        spec_event_counts[a.id] = len(evs)
        if len(evs) >= _SFM_FETCH_CEILING:
            log.warning(
                "SFM returned %d events (fetch ceiling) for serial=%s window %s..%s; "
                "stats for location=%s may be truncated",
                len(evs), a.unit_id, _iso_utc(start), _iso_utc(end), location_id,
            )

    # 5. Union and sort newest-first. Stats use the full merged set; only the
    #    returned event list is capped.
    merged = [ev for evs in per_window_lists for ev in evs]
    merged.sort(key=lambda e: e.get("timestamp") or "", reverse=True)

    # 6. Report every assignment in chronological order, including ones that
    #    fell outside the filter window (count=0).
    return {
        "events": merged[:limit],
        "count": len(merged),
        "stats": _compute_stats(merged),
        "assignments_used": [
            _assignment_used_entry(a, spec_event_counts.get(a.id, 0))
            for a in assignments
        ],
    }
# ── Per-unit (cross-project) view ─────────────────────────────────────────────
def _parse_event_ts(ts_str: Optional[str]) -> Optional[datetime]:
    """Parse an SFM event timestamp into a naive-UTC datetime, or None.

    Accepts either a "T" or a space separator. A string carrying a UTC offset
    (e.g. "+00:00", or "Z" where fromisoformat supports it) is converted to
    UTC and stripped of tzinfo, so it can be compared against the naive
    assignment datetimes (which this module compares with naive
    datetime.utcnow()). Missing, non-string, or unparseable values give None.
    """
    if not isinstance(ts_str, str) or not ts_str:
        return None
    try:
        ts = datetime.fromisoformat(ts_str.replace(" ", "T"))
    except ValueError:
        return None
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
    return ts


async def events_for_unit(
    db: Session,
    unit_id: str,
    *,
    bucket: str = "all",  # "all" | "attributed" | "unattributed"
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Return events for a unit annotated with their assignment attribution.

    Unlike events_for_location (which queries SFM per assignment window), this
    helper queries SFM for ALL events for the serial within the optional
    [from_dt, to_dt] filter (defaulting to 1970-01-01 .. now), then walks each
    event against the unit's UnitAssignment intervals to compute attribution.

    Bucket semantics:
      - "all":          every event, attributed or not (also used for any
                        unrecognised bucket value)
      - "attributed":   events that fall inside at least one assignment window
      - "unattributed": events with no overlapping assignment (the diagnostic
                        bucket — operator should fix assignment dates to
                        attribute these)

    Each event gets an extra ``attribution`` field:
        {assignment_id, location_id, location_name, project_id, project_name,
         assigned_at, assigned_until} or None
    Unattributed events with a parseable timestamp also get a
    ``nearest_assignment`` field with the same shape plus ``delta_days``
    (signed; negative = event before that assignment's start).

    Returns:
        {"events": [selected bucket, newest first, capped at limit],
         "count": events in the selected bucket (pre-cap),
         "stats": _compute_stats over ALL fetched events, plus
                  "unattributed_count",
         "assignments_total": number of assignments for this unit}
    """
    # A negative slice bound would silently drop the oldest events.
    limit = max(limit, 0)

    # 1. Pull all assignments for this unit (any device_type — caller has
    #    already filtered by seismograph in the route). Earliest start first,
    #    so attribution prefers the chronologically-first overlap when there
    #    are simultaneous assignments at different locations (rare but possible).
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.unit_id == unit_id)
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )

    # Resolve location + project names once.
    loc_ids = {a.location_id for a in assignments}
    proj_ids = {a.project_id for a in assignments}
    loc_map = {
        loc.id: loc for loc in db.query(MonitoringLocation).filter(
            MonitoringLocation.id.in_(loc_ids)
        ).all()
    } if loc_ids else {}
    proj_map = {
        proj.id: proj for proj in db.query(Project).filter(
            Project.id.in_(proj_ids)
        ).all()
    } if proj_ids else {}

    now = datetime.utcnow()
    # (assignment, start, effective_end); open-ended assignments run until now.
    windows = [(a, a.assigned_at, a.assigned_until or now) for a in assignments]

    def _attr_dict(a: UnitAssignment) -> dict:
        loc = loc_map.get(a.location_id)
        proj = proj_map.get(a.project_id)
        return {
            "assignment_id": a.id,
            "location_id": a.location_id,
            "location_name": loc.name if loc else None,
            "project_id": a.project_id,
            "project_name": proj.name if proj else None,
            "assigned_at": _iso_utc(a.assigned_at),
            "assigned_until": _iso_utc(a.assigned_until),
        }

    # 2. Fetch all events for this serial in one shot.
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
        events = await _fetch_events_for_serial(
            client,
            serial=unit_id,
            from_dt=from_dt or datetime(1970, 1, 1),
            to_dt=to_dt or now,
            false_trigger=false_trigger,
            limit=_SFM_FETCH_CEILING,
        )

    # 3. Attribute each event to the first overlapping window.
    #    O(N * M), but both are small in practice.
    for ev in events:
        ts = _parse_event_ts(ev.get("timestamp"))
        if ts is None:
            ev["attribution"] = None
            continue

        matched = next((a for a, start, end in windows if start <= ts <= end), None)
        if matched is not None:
            ev["attribution"] = _attr_dict(matched)
            continue

        ev["attribution"] = None
        if not windows:
            continue
        # Diagnostic: the assignment whose nearest boundary is closest in time.
        nearest, n_start, n_end = min(
            windows,
            key=lambda w, _ts=ts: min(
                abs((_ts - w[1]).total_seconds()),
                abs((_ts - w[2]).total_seconds()),
            ),
        )
        # Signed delta from that boundary (negative = event BEFORE the start).
        if ts < n_start:
            delta_seconds = (ts - n_start).total_seconds()
        elif ts > n_end:
            delta_seconds = (ts - n_end).total_seconds()
        else:
            delta_seconds = 0
        ev["nearest_assignment"] = {
            **_attr_dict(nearest),
            "delta_days": round(delta_seconds / 86400, 1),
        }

    # 4. Sort newest-first BEFORE bucketing, so the stats below always see
    #    the same ordering regardless of the selected bucket (peak ties are
    #    resolved by list order inside _compute_stats).
    events.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
    if bucket == "attributed":
        filtered = [e for e in events if e.get("attribution") is not None]
    elif bucket == "unattributed":
        filtered = [e for e in events if e.get("attribution") is None]
    else:
        filtered = events

    # 5. Stats over the ENTIRE event set (not the bucket) so the
    #    unattributed_count tile is meaningful whichever bucket is selected.
    base_stats = _compute_stats(events)
    base_stats["unattributed_count"] = sum(
        1 for e in events if e.get("attribution") is None
    )

    return {
        "events": filtered[:limit],
        "count": len(filtered),
        "stats": base_stats,
        "assignments_total": len(assignments),
    }
# ── Project-level roll-up (aggregates across all vibration locations) ─────────
async def vibration_summary_for_project(
    db: Session,
    project_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
) -> dict:
    """Roll up SFM events across every vibration location in a project.

    Each location is summarised via events_for_location (false triggers
    included in the fetch). Counts are summed across locations. The overall
    peak and latest event are taken from whichever location reports the
    largest / latest value; on a tie, the earlier location in query order
    is kept.

    Returns:
        {
          "project_id": str,
          "total_events": int,
          "peak_pvs": float | None,
          "peak_pvs_at": ISO timestamp | None,
          "peak_pvs_location_id": str | None,
          "peak_pvs_location_name": str | None,
          "last_event": ISO timestamp | None,
          "false_trigger_count": int,
          "per_location": [
              {"location_id", "location_name", "event_count",
               "peak_pvs", "last_event"},
              ...  # sorted by event_count DESC (stable)
          ],
          "vibration_location_count": int,
        }
    """
    vib_locations = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project_id)
        .filter(MonitoringLocation.location_type == "vibration")
        .all()
    )

    summary: dict = {
        "project_id": project_id,
        "total_events": 0,
        "peak_pvs": None,
        "peak_pvs_at": None,
        "peak_pvs_location_id": None,
        "peak_pvs_location_name": None,
        "last_event": None,
        "false_trigger_count": 0,
        "per_location": [],
        "vibration_location_count": len(vib_locations),
    }
    if not vib_locations:
        return summary

    # Nested fan-out: one events_for_location per location, each of which
    # fans out again over that location's assignments; asyncio.gather runs
    # them concurrently. limit=1 because only the stats are consumed here.
    location_results = await asyncio.gather(
        *[
            events_for_location(
                db,
                loc.id,
                from_dt=from_dt,
                to_dt=to_dt,
                false_trigger=None,
                limit=1,
            )
            for loc in vib_locations
        ],
        return_exceptions=False,
    )

    rows = summary["per_location"]
    for loc, result in zip(vib_locations, location_results):
        loc_stats = result.get("stats", {}) or {}
        n_events = loc_stats.get("event_count", 0) or 0
        loc_last = loc_stats.get("last_event")
        loc_peak = loc_stats.get("peak_pvs")

        summary["total_events"] += n_events
        summary["false_trigger_count"] += loc_stats.get("false_trigger_count", 0) or 0

        current_last = summary["last_event"]
        if loc_last and (current_last is None or loc_last > current_last):
            summary["last_event"] = loc_last

        current_peak = summary["peak_pvs"]
        if loc_peak is not None and (current_peak is None or loc_peak > current_peak):
            summary.update(
                peak_pvs=loc_peak,
                peak_pvs_at=loc_stats.get("peak_pvs_at"),
                peak_pvs_location_id=loc.id,
                peak_pvs_location_name=loc.name,
            )

        rows.append({
            "location_id": loc.id,
            "location_name": loc.name,
            "event_count": n_events,
            "peak_pvs": loc_peak,
            "last_event": loc_last,
        })

    rows.sort(key=lambda row: row["event_count"], reverse=True)
    return summary
# ── Stats helpers ─────────────────────────────────────────────────────────────
def _empty_stats() -> dict:
    """Return the stats dict for an empty event set.

    Has the same keys, in the same order, as ``_compute_stats``: the
    counters are 0 and every other field is None. A new dict is built on
    each call, so callers may safely mutate the result.
    """
    return dict(
        event_count=0,
        peak_pvs=None,
        peak_pvs_at=None,
        peak_pvs_serial=None,
        last_event=None,
        false_trigger_count=0,
    )
def _compute_stats(events: list[dict]) -> dict:
    """Summarise a merged event list in a single O(N) pass.

    ``peak_pvs`` (shown to operators as "Overall Peak") and its companions
    ``peak_pvs_at`` / ``peak_pvs_serial`` ignore events whose
    ``false_trigger`` flag is truthy. The tile should show the largest REAL
    event, not the biggest sensor glitch. Flagged events still count toward
    ``event_count`` and ``false_trigger_count``, so operators can see what
    was filtered out. They also still feed ``last_event``, which is about
    activity recency rather than magnitude. On a tie for the peak, the first
    event encountered in list order wins. An empty list yields
    ``_empty_stats()``.
    """
    if not events:
        return _empty_stats()

    top_pvs = top_at = top_serial = None
    newest_ts = None
    flagged = 0

    for event in events:
        stamp = event.get("timestamp")
        if event.get("false_trigger"):
            flagged += 1
        else:
            candidate = event.get("peak_vector_sum")
            beats_current = candidate is not None and (top_pvs is None or candidate > top_pvs)
            if beats_current:
                top_pvs, top_at, top_serial = candidate, stamp, event.get("serial")
        if stamp and (newest_ts is None or stamp > newest_ts):
            newest_ts = stamp

    return {
        "event_count": len(events),
        "peak_pvs": top_pvs,
        "peak_pvs_at": top_at,
        "peak_pvs_serial": top_serial,
        "last_event": newest_ts,
        "false_trigger_count": flagged,
    }