63bd6ad8a2
Phase 3 of the SFM integration. Adds a "Project-wide vibration events"
KPI card to the Vibration tab of every project detail page, summarising
event activity across all of that project's vibration MonitoringLocations.
Backend:
- backend/services/sfm_events.py: vibration_summary_for_project() helper.
Concurrently fans out events_for_location() across every vibration
location in the project; aggregates total events, peak PVS (with the
location it occurred at), last-event timestamp, false-trigger count;
and produces a per-location breakdown sorted by event count.
- backend/routers/project_locations.py: new GET
/api/projects/{p}/vibration_summary endpoint returning an HTML partial
(HTMX-friendly, matching the locations-list HTMX pattern already used on
this page).
Frontend:
- templates/partials/projects/vibration_summary.html: new partial with
four KPI tiles (total, peak PVS + linked location + date, last event,
false triggers) and a "Top locations by activity" mini-list showing
the top 5 by event count. Empty-state copy when the project has no
vibration locations yet.
- templates/projects/detail.html: HTMX-load the new summary above the
locations list inside the Vibration tab.
Verified against terra-view-alpha: 24 events across "Loc 1 - 78 poop
street", peak PVS 14.1351 in/s.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
583 lines
21 KiB
Python
"""
|
|
SFM events service — bridge between terra-view's UnitAssignment time-windows
|
|
and the SFM (seismo-relay) events store.
|
|
|
|
Architecture:
|
|
1. Terra-view owns the *assignment graph*: which seismograph was at which
|
|
monitoring location during which time window (UnitAssignment rows).
|
|
2. SFM owns the *events store*: triggered waveform events keyed by
|
|
(serial, timestamp), forwarded from Blastware ACH by series3-watcher.
|
|
3. This module fans out the assignments for a given location, queries SFM
|
|
for the events emitted by each (serial, window) pair concurrently, and
|
|
unions/sorts/paginates the results.
|
|
|
|
SFM remains the single source of truth for events. Terra-view does not
|
|
copy events into its own DB; every query hits SFM live.
|
|
|
|
The events_for_location helper is also reused by Phase 3 (project-level
|
|
roll-up) to aggregate across every location in a project.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.models import UnitAssignment, RosterUnit, MonitoringLocation, Project
|
|
|
|
log: logging.Logger = logging.getLogger("backend.services.sfm_events")

# Root URL of the SFM (seismo-relay) service. Override with the SFM_BASE_URL
# environment variable; the default targets a locally running instance.
SFM_BASE_URL: str = os.getenv("SFM_BASE_URL", "http://localhost:8200")

# Timeout, in seconds, for each SFM /db/events request. SFM normally runs on
# the same docker network, so only raise this if timeouts show up in the logs.
_SFM_TIMEOUT_SECONDS: float = 10.0

# Hard cap on events requested per (serial, window) call. Keep in sync with
# SFM's own /db/events maximum (5000 at the time of writing). Callers fetch
# up to this cap so summary stats cover the whole window, and only then trim
# the user-facing list down to their own display limit.
_SFM_FETCH_CEILING: int = 5_000
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _iso_utc(dt: Optional[datetime]) -> Optional[str]:
    """Format ``dt`` as the "YYYY-MM-DD HH:MM:SS" string SFM /db/events expects.

    ``None`` passes straight through. Timezone-aware values are converted to
    UTC and stripped of tzinfo, because SFM reads naive ISO strings as UTC.
    Naive values are emitted unchanged, so they are assumed to already be in
    UTC. Sub-second precision is dropped.
    """
    if dt is not None:
        naive_utc = (
            dt
            if dt.tzinfo is None
            else dt.astimezone(timezone.utc).replace(tzinfo=None)
        )
        return naive_utc.isoformat(sep=" ", timespec="seconds")
    return None
|
|
|
|
|
|
def _intersect_window(
    assignment_start: datetime,
    assignment_end: Optional[datetime],
    filter_from: Optional[datetime],
    filter_to: Optional[datetime],
    now: datetime,
) -> Optional[tuple[datetime, datetime]]:
    """Clip an assignment's active period to the caller's filter range.

    An open-ended assignment (``assignment_end`` is None, i.e.
    assigned_until=NULL) is treated as running until ``now``. A ``None``
    filter bound leaves that side unbounded.

    Returns:
        ``(effective_start, effective_end)``, or ``None`` when the two ranges
        do not overlap or the overlap is empty (``end <= start``).
    """
    lo = assignment_start
    hi = assignment_end or now

    if filter_from:
        if hi <= filter_from:
            return None
        lo = max(lo, filter_from)

    if filter_to:
        if assignment_start >= filter_to:
            return None
        hi = min(hi, filter_to)

    return (lo, hi) if lo < hi else None
|
|
|
|
|
|
async def _fetch_events_for_serial(
    client: httpx.AsyncClient,
    serial: str,
    *,
    from_dt: datetime,
    to_dt: datetime,
    false_trigger: Optional[bool],
    limit: int,
) -> list[dict]:
    """Issue one SFM ``/db/events`` call for a single (serial, window) pair.

    The call is best-effort. An HTTP or transport failure, or a response body
    that is not a JSON object with an ``events`` list, is logged and yields
    ``[]``. Callers gather these calls with ``return_exceptions=False``, so
    letting one malformed response raise would fail the whole request.

    Args:
        client: Async HTTP client; the caller configures its timeout.
        serial: Seismograph serial (terra-view ``unit_id``) to query.
        from_dt: Window start, rendered via ``_iso_utc``.
        to_dt: Window end, rendered via ``_iso_utc``.
        false_trigger: Optional SFM-side filter; ``None`` omits the param.
        limit: Maximum number of events to request from SFM.

    Returns:
        Event dicts with the ``waveform_blob`` and ``a5_pickle_filename`` keys
        removed, since the list views do not render them.
    """
    params: dict[str, str] = {
        "serial": serial,
        "from_dt": _iso_utc(from_dt) or "",
        "to_dt": _iso_utc(to_dt) or "",
        "limit": str(limit),
    }
    if false_trigger is not None:
        params["false_trigger"] = "true" if false_trigger else "false"

    try:
        resp = await client.get(f"{SFM_BASE_URL}/db/events", params=params)
        resp.raise_for_status()
    except httpx.HTTPError as e:
        log.warning("SFM /db/events failed for serial=%s: %s", serial, e)
        return []

    # A body that is not valid JSON raises ValueError (JSONDecodeError and
    # UnicodeDecodeError both subclass it). Treat it like any other SFM
    # failure instead of letting it escape.
    try:
        payload = resp.json()
    except ValueError as e:
        log.warning("SFM /db/events returned invalid JSON for serial=%s: %s", serial, e)
        return []
    if not isinstance(payload, dict):
        log.warning("SFM /db/events returned a non-object body for serial=%s", serial)
        return []

    raw_events = payload.get("events") or []
    if not isinstance(raw_events, list):
        log.warning("SFM /db/events returned a non-list 'events' for serial=%s", serial)
        return []
    # Downstream code calls .get()/.pop() on every event, so drop non-dicts.
    events = [ev for ev in raw_events if isinstance(ev, dict)]

    # Hitting the cap means SFM may hold more events in this window than we
    # received. Any stats computed from this list are then incomplete, so
    # make that visible in the logs instead of failing silently.
    if limit > 0 and len(raw_events) >= limit:
        log.warning(
            "SFM /db/events hit limit=%d for serial=%s (%s .. %s); "
            "results and summary stats may be truncated",
            limit,
            serial,
            params["from_dt"],
            params["to_dt"],
        )

    # waveform_blob is the large per-event binary; SFM returns it by default,
    # but the list views never render it.
    for ev in events:
        ev.pop("waveform_blob", None)
        ev.pop("a5_pickle_filename", None)
    return events
|
|
|
|
|
|
# ── Public API ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _assignment_report(a: UnitAssignment, events_in_window: int) -> dict:
    """Build one ``assignments_used`` entry for an assignment row.

    Every return path of ``events_for_location`` goes through this helper, so
    the entries always carry the same keys.
    """
    return {
        "unit_id": a.unit_id,
        "assignment_id": a.id,
        "assigned_at": _iso_utc(a.assigned_at),
        "assigned_until": _iso_utc(a.assigned_until),
        "events_in_window": events_in_window,
        "status": a.status,
    }


async def events_for_location(
    db: Session,
    location_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Fan out UnitAssignment rows for ``location_id`` and union the SFM events.

    Args:
        db: SQLAlchemy session used to read the UnitAssignment rows.
        location_id: Monitoring location whose seismograph assignments are
            queried.
        from_dt: Optional start of the filter window. Each assignment window is
            intersected with ``[from_dt, to_dt]``, and assignments that do not
            overlap it are not queried.
        to_dt: Optional end of the filter window.
        false_trigger: Optional SFM-side false-trigger filter (None = both).
        limit: Maximum number of events returned in ``events``. Stats are
            always computed over everything fetched. A negative value is
            treated as 0.

    Returns:
        {
          "events": [merged event dicts, newest first, capped at limit],
          "count": total distinct events found across all windows (pre-cap),
          "stats": {event_count, peak_pvs, peak_pvs_at, peak_pvs_serial,
                    last_event, false_trigger_count},
          "assignments_used": [{unit_id, assignment_id, assigned_at,
                                assigned_until, events_in_window, status}, ...],
        }

    Events whose timestamp falls outside every assignment window (Phase 1
    design decision) are never fetched: SFM is only asked for the intersected
    windows. Those orphan events surface on the per-unit detail page
    (Phase 2).

    If two windows for the same unit overlap or share a boundary, SFM may
    return the same event for both. Events are de-duplicated on
    (serial, timestamp), SFM's event key, before counting and computing
    stats. ``events_in_window`` still reports each window's raw count.
    """
    # 1. Every assignment (active and closed) for the location, oldest first.
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.location_id == location_id)
        .filter(UnitAssignment.device_type == "seismograph")
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )

    if not assignments:
        return {
            "events": [],
            "count": 0,
            "stats": _empty_stats(),
            "assignments_used": [],
        }

    # Naive-UTC "now": the same value datetime.utcnow() produced, without the
    # Python 3.12 deprecation. Open-ended windows are bounded by it.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # 2. Intersect each assignment window with the filter range and keep only
    #    the overlapping ones.
    fetch_specs: list[tuple[UnitAssignment, datetime, datetime]] = []
    for a in assignments:
        window = _intersect_window(a.assigned_at, a.assigned_until, from_dt, to_dt, now)
        if window is not None:
            fetch_specs.append((a, window[0], window[1]))

    if not fetch_specs:
        return {
            "events": [],
            "count": 0,
            "stats": _empty_stats(),
            "assignments_used": [_assignment_report(a, 0) for a in assignments],
        }

    # 3. Concurrent SFM fetches. Each window is fetched up to
    #    _SFM_FETCH_CEILING so the summary stats reflect the true
    #    peak/last/count, not just what fits in the display limit.
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
        per_window_lists = await asyncio.gather(
            *(
                _fetch_events_for_serial(
                    client,
                    serial=a.unit_id,
                    from_dt=start,
                    to_dt=end,
                    false_trigger=false_trigger,
                    limit=_SFM_FETCH_CEILING,
                )
                for a, start, end in fetch_specs
            ),
            return_exceptions=False,
        )

    # 4. Record per-assignment counts (shown to the operator) and build the
    #    de-duplicated union keyed on (serial, timestamp).
    spec_event_counts: dict = {}
    merged: list[dict] = []
    seen: set = set()
    for (a, _start, _end), evs in zip(fetch_specs, per_window_lists):
        spec_event_counts[a.id] = len(evs)
        for ev in evs:
            ts = ev.get("timestamp")
            if ts:
                key = (a.unit_id, ts)
                if key in seen:
                    continue
                seen.add(key)
            merged.append(ev)

    # 5. Newest first, then cap the displayed list.
    merged.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
    total_count = len(merged)
    capped = merged[: max(limit, 0)]

    # 6. Summary stats over the full merged set, not the capped one.
    stats = _compute_stats(merged)

    # 7. Report every assignment in chronological order, including those
    #    outside the filter window (count 0), so the operator sees them all.
    assignments_used = [
        _assignment_report(a, spec_event_counts.get(a.id, 0)) for a in assignments
    ]

    return {
        "events": capped,
        "count": total_count,
        "stats": stats,
        "assignments_used": assignments_used,
    }
|
|
|
|
|
|
# ── Per-unit (cross-project) view ─────────────────────────────────────────────
|
|
|
|
|
|
def _parse_sfm_timestamp(ts_str: object) -> Optional[datetime]:
    """Parse an SFM event ``timestamp`` into a naive-UTC datetime, or None.

    Accepts either a "T" or a space as the date/time separator. Offset-aware
    values (e.g. ending in "+00:00") are converted to UTC and made naive.
    Assignment windows are compared against a naive-UTC ``now`` in this
    module, and comparing an aware datetime with a naive one raises
    TypeError. Missing, non-string or unparseable values yield None.
    """
    if not ts_str or not isinstance(ts_str, str):
        return None
    try:
        ts = datetime.fromisoformat(ts_str.replace(" ", "T"))
    except ValueError:
        return None
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
    return ts


def _nearest_assignment(
    ts: datetime,
    assignments: list[UnitAssignment],
    now: datetime,
) -> tuple[UnitAssignment, float]:
    """Return the assignment with a boundary closest to ``ts``, plus the gap.

    An assignment's boundaries are ``assigned_at`` and ``assigned_until``
    (``now`` for open-ended rows). The returned gap, in seconds, is measured
    from the nearer side of that window. It is negative when ``ts`` precedes
    the window, positive when ``ts`` follows it, and 0 when ``ts`` lies
    inside it. ``assignments`` must be non-empty.
    """

    def boundary_distance(a: UnitAssignment) -> float:
        end = a.assigned_until or now
        return min(
            abs((ts - a.assigned_at).total_seconds()),
            abs((ts - end).total_seconds()),
        )

    nearest = min(assignments, key=boundary_distance)
    nearest_end = nearest.assigned_until or now
    if ts < nearest.assigned_at:
        delta_seconds = (ts - nearest.assigned_at).total_seconds()
    elif ts > nearest_end:
        delta_seconds = (ts - nearest_end).total_seconds()
    else:
        delta_seconds = 0
    return nearest, delta_seconds


async def events_for_unit(
    db: Session,
    unit_id: str,
    *,
    bucket: str = "all",  # "all" | "attributed" | "unattributed"
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Return the events for a unit, annotated with assignment attribution.

    Unlike ``events_for_location``, which queries SFM once per assignment
    window, this makes a single SFM query for every event of the serial
    within the optional ``[from_dt, to_dt]`` filter. It then matches each
    event against the unit's UnitAssignment intervals.

    Bucket semantics (any other value behaves like "all"):
      - "all": every event, attributed or not.
      - "attributed": events inside at least one assignment window.
      - "unattributed": events with no overlapping assignment. This is the
        diagnostic bucket; the operator should fix assignment dates so these
        events get attributed.

    Each event gains an ``attribution`` field: either
    ``{assignment_id, location_id, location_name, project_id, project_name,
    assigned_at, assigned_until}`` or None. When an unattributed event has a
    parseable timestamp and the unit has at least one assignment, the event
    also gets ``nearest_assignment``. It has the same shape plus
    ``delta_days``, which is signed: negative means the event came before
    that assignment.

    Event timestamps carrying a UTC offset are normalized to naive UTC
    before comparison. Events with missing or unparseable timestamps are
    treated as unattributed.

    Returns:
        {"events": [bucketed events, newest first, capped at limit],
         "count": bucket size before the cap,
         "stats": _compute_stats over ALL events + "unattributed_count",
         "assignments_total": number of assignment rows for the unit}
    """
    # 1. Every assignment for this unit, of any device_type; the route is
    #    expected to have filtered to seismographs already. Earliest start
    #    comes first, so when assignments at different locations overlap
    #    (rare), attribution prefers the chronologically first one.
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.unit_id == unit_id)
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )

    # Resolve location and project names once, up front.
    loc_ids = {a.location_id for a in assignments}
    proj_ids = {a.project_id for a in assignments}
    loc_map = (
        {
            loc.id: loc
            for loc in db.query(MonitoringLocation)
            .filter(MonitoringLocation.id.in_(loc_ids))
            .all()
        }
        if loc_ids
        else {}
    )
    proj_map = (
        {
            proj.id: proj
            for proj in db.query(Project).filter(Project.id.in_(proj_ids)).all()
        }
        if proj_ids
        else {}
    )

    # Naive-UTC "now" (the value datetime.utcnow() gave, without the
    # deprecation). It bounds open-ended assignments.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    def _attr_dict(a: UnitAssignment) -> dict:
        loc = loc_map.get(a.location_id)
        proj = proj_map.get(a.project_id)
        return {
            "assignment_id": a.id,
            "location_id": a.location_id,
            "location_name": loc.name if loc else None,
            "project_id": a.project_id,
            "project_name": proj.name if proj else None,
            "assigned_at": _iso_utc(a.assigned_at),
            "assigned_until": _iso_utc(a.assigned_until),
        }

    # 2. All events for this serial in a single SFM call.
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
        events = await _fetch_events_for_serial(
            client,
            serial=unit_id,
            from_dt=from_dt or datetime(1970, 1, 1),
            to_dt=to_dt or now,
            false_trigger=false_trigger,
            limit=_SFM_FETCH_CEILING,
        )

    # 3. Attribute each event to the first overlapping assignment. This is
    #    O(N * M), but both the event count and the assignment count are
    #    small in practice.
    for ev in events:
        ts = _parse_sfm_timestamp(ev.get("timestamp"))
        if ts is None:
            ev["attribution"] = None
            continue

        matched = next(
            (a for a in assignments if a.assigned_at <= ts <= (a.assigned_until or now)),
            None,
        )
        if matched is not None:
            ev["attribution"] = _attr_dict(matched)
            continue

        ev["attribution"] = None
        if assignments:
            nearest, delta_seconds = _nearest_assignment(ts, assignments, now)
            ev["nearest_assignment"] = {
                **_attr_dict(nearest),
                "delta_days": round(delta_seconds / 86400, 1),
            }

    # 4. Sort once, newest first, so every bucket and the stats see the same
    #    order. Then apply the bucket filter; the comprehensions keep order.
    events.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
    if bucket == "attributed":
        filtered = [e for e in events if e.get("attribution") is not None]
    elif bucket == "unattributed":
        filtered = [e for e in events if e.get("attribution") is None]
    else:
        filtered = events

    total_count = len(filtered)
    capped = filtered[: max(limit, 0)]

    # 5. Stats cover the ENTIRE event set, not just the selected bucket, so the
    #    unattributed_count tile stays meaningful whichever bucket is chosen.
    base_stats = _compute_stats(events)
    base_stats["unattributed_count"] = sum(
        1 for e in events if e.get("attribution") is None
    )

    return {
        "events": capped,
        "count": total_count,
        "stats": base_stats,
        "assignments_total": len(assignments),
    }
|
|
|
|
|
|
# ── Project-level roll-up (aggregates across all vibration locations) ─────────
|
|
|
|
|
|
async def vibration_summary_for_project(
    db: Session,
    project_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
) -> dict:
    """Aggregate SFM events across every vibration location in a project.

    Locations are processed in a deterministic order: by name, then by id.
    A tie in peak PVS is credited to the first location in that order. A tie
    in event count keeps that order within ``per_location``.

    Returns:
        {
          "project_id": str,
          "total_events": int,
          "peak_pvs": float | None,
          "peak_pvs_at": ISO timestamp | None,
          "peak_pvs_location_id": str | None,
          "peak_pvs_location_name": str | None,
          "last_event": ISO timestamp | None,
          "false_trigger_count": int,
          "per_location": [
              {"location_id", "location_name", "event_count",
               "peak_pvs", "last_event"},
              ...  # sorted by event_count DESC
          ],
          "vibration_location_count": int,
        }
    """
    locations = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project_id)
        .filter(MonitoringLocation.location_type == "vibration")
        .all()
    )
    # The query has no ORDER BY, so impose a stable order here. Without it,
    # tie-breaking would depend on arbitrary DB row order.
    locations.sort(key=lambda loc: (loc.name or "", str(loc.id)))

    # Nested fan-out: one events_for_location call per location, each of
    # which gathers one SFM request per overlapping assignment window. Each
    # events_for_location call opens its own httpx.AsyncClient. limit=1
    # because only the stats are used here, and those are computed over
    # everything fetched, not the capped event list.
    results: list = []
    if locations:
        results = await asyncio.gather(
            *(
                events_for_location(
                    db,
                    loc.id,
                    from_dt=from_dt,
                    to_dt=to_dt,
                    false_trigger=None,
                    limit=1,
                )
                for loc in locations
            ),
            return_exceptions=False,
        )

    per_location: list[dict] = []
    total_events = 0
    false_trigger_count = 0
    peak_pvs = None
    peak_pvs_at = None
    peak_pvs_location_id = None
    peak_pvs_location_name = None
    last_event = None

    for loc, res in zip(locations, results):
        st = res.get("stats", {}) or {}
        ec = st.get("event_count", 0) or 0
        total_events += ec
        false_trigger_count += st.get("false_trigger_count", 0) or 0

        ev_last = st.get("last_event")
        if ev_last and (last_event is None or ev_last > last_event):
            last_event = ev_last

        ev_peak = st.get("peak_pvs")
        if ev_peak is not None and (peak_pvs is None or ev_peak > peak_pvs):
            peak_pvs = ev_peak
            peak_pvs_at = st.get("peak_pvs_at")
            peak_pvs_location_id = loc.id
            peak_pvs_location_name = loc.name

        per_location.append({
            "location_id": loc.id,
            "location_name": loc.name,
            "event_count": ec,
            "peak_pvs": ev_peak,
            "last_event": ev_last,
        })

    # Busiest first. list.sort is stable even with reverse=True, so ties keep
    # the name/id order established above.
    per_location.sort(key=lambda r: r["event_count"], reverse=True)

    return {
        "project_id": project_id,
        "total_events": total_events,
        "peak_pvs": peak_pvs,
        "peak_pvs_at": peak_pvs_at,
        "peak_pvs_location_id": peak_pvs_location_id,
        "peak_pvs_location_name": peak_pvs_location_name,
        "last_event": last_event,
        "false_trigger_count": false_trigger_count,
        "per_location": per_location,
        "vibration_location_count": len(locations),
    }
|
|
|
|
|
|
# ── Stats helpers ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _empty_stats() -> dict:
    """Return a new stats dict describing "no events".

    Counts are zero and every peak/timestamp/serial field is None. A fresh
    dict is built on every call, so callers may mutate the result (e.g. add
    extra keys) safely.
    """
    return dict(
        event_count=0,
        peak_pvs=None,
        peak_pvs_at=None,
        peak_pvs_serial=None,
        last_event=None,
        false_trigger_count=0,
    )
|
|
|
|
|
|
def _compute_stats(events: list[dict]) -> dict:
    """Summarise a merged list of SFM event dicts in linear time.

    The result has the same keys as ``_empty_stats()``, which is also what
    an empty list returns:
      * ``event_count``: ``len(events)``.
      * ``peak_pvs`` / ``peak_pvs_at`` / ``peak_pvs_serial``: the largest
        non-None ``peak_vector_sum``, with that event's ``timestamp`` and
        ``serial``. On a tie, the earliest event in list order wins.
      * ``last_event``: the greatest truthy ``timestamp``, compared as-is.
      * ``false_trigger_count``: the number of events whose
        ``false_trigger`` is truthy.
    """
    if not events:
        return _empty_stats()

    # max() returns the first maximal item, matching a strict ">" scan.
    peak_event = max(
        (ev for ev in events if ev.get("peak_vector_sum") is not None),
        key=lambda ev: ev.get("peak_vector_sum"),
        default=None,
    )
    newest = max(
        (ts for ts in (ev.get("timestamp") for ev in events) if ts),
        default=None,
    )

    if peak_event is None:
        pvs_value = pvs_time = pvs_serial = None
    else:
        pvs_value = peak_event.get("peak_vector_sum")
        pvs_time = peak_event.get("timestamp")
        pvs_serial = peak_event.get("serial")

    return {
        "event_count": len(events),
        "peak_pvs": pvs_value,
        "peak_pvs_at": pvs_time,
        "peak_pvs_serial": pvs_serial,
        "last_event": newest,
        "false_trigger_count": sum(1 for ev in events if ev.get("false_trigger")),
    }
|