e15481884a
Two related operator-facing improvements after the nav reorg. 1) Events as a top-level sidebar entry. The /sfm page (fleet-wide event database) was demoted to Settings → Developer in the previous reorg. Bringing it back to main nav as "Events" — operators do reach for the cross-project, sortable event list, so it earns a top-level slot. Sidebar now (7 items): Dashboard · Devices · Projects · Events · Tools · Job Planner · Settings. The Settings → Developer card pointing at /sfm is removed. /sfm page title/subtitle updated from "SFM Event Data" to just "Events". URL unchanged. 2) "Peak PVS" KPI tile becomes "Overall Peak" and excludes false triggers from the calculation. When operators ask "what's the biggest event at this location/unit/project?" they mean the biggest REAL event, not the biggest sensor glitch. A single mis-flagged false trigger could otherwise dominate the tile (the 14.13 in/s spike at Loc 1 was a prime example). backend/services/sfm_events.py: - _compute_stats() skips false_trigger=True events when computing peak_pvs / peak_pvs_at / peak_pvs_serial. Continues counting them in false_trigger_count so the separate "False Triggers" tile still reflects what got filtered out. last_event unchanged (recency, not magnitude). - Same change automatically propagates to events_for_unit() and vibration_summary_for_project() — both rely on _compute_stats() (the project roll-up reaches it through events_for_location()). Templates: "Peak PVS" → "Overall Peak" in 3 KPI tile locations (vibration_location_detail.html, partials/projects/vibration_summary.html, unit_detail.html). The physical-quantity name "Peak Vector Sum" in the event-detail modal stays — that's the actual physics term, not a summary stat. Verified end-to-end: Overall Peak renders on real data; peak event false_trigger flag confirmed False. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
593 lines
21 KiB
Python
593 lines
21 KiB
Python
"""
|
|
SFM events service — bridge between terra-view's UnitAssignment time-windows
|
|
and the SFM (seismo-relay) events store.
|
|
|
|
Architecture:
|
|
1. Terra-view owns the *assignment graph*: which seismograph was at which
|
|
monitoring location during which time window (UnitAssignment rows).
|
|
2. SFM owns the *events store*: triggered waveform events keyed by
|
|
(serial, timestamp), forwarded from Blastware ACH by series3-watcher.
|
|
3. This module fans out the assignments for a given location, queries SFM
|
|
for the events emitted by each (serial, window) pair concurrently, and
|
|
unions/sorts/paginates the results.
|
|
|
|
SFM remains the single source of truth for events. Terra-view does not
|
|
copy events into its own DB; every query hits SFM live.
|
|
|
|
The events_for_location helper is also reused by Phase 3 (project-level
|
|
roll-up) to aggregate across every location in a project.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.models import UnitAssignment, RosterUnit, MonitoringLocation, Project
|
|
|
|
log = logging.getLogger("backend.services.sfm_events")

# Root URL of the SFM service; every request in this module is built on top
# of it.  Override with the SFM_BASE_URL environment variable.
SFM_BASE_URL = os.environ.get("SFM_BASE_URL", "http://localhost:8200")

# Timeout (seconds) given to the httpx client used for SFM /db/events calls.
# SFM is local on the docker network so this should be fast; raise it if
# timeouts start showing up.
_SFM_TIMEOUT_SECONDS = 10.0

# Upper bound on events requested per (serial, window) call to SFM.  Must
# match SFM's own /db/events max limit (currently 5000).  It is independent
# of the user-facing display limit: callers over-fetch up to this cap so the
# summary stats are accurate, then trim the displayed list afterwards.
_SFM_FETCH_CEILING = 5_000
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _iso_utc(dt: Optional[datetime]) -> Optional[str]:
    """Format ``dt`` as the ISO string sent to SFM's /db/events endpoint.

    Returns None when ``dt`` is None.  Timezone-aware values are converted to
    UTC and made naive first (SFM parses naive ISO strings as UTC); naive
    values are formatted as-is.  The output uses a space separator and
    second precision, e.g. ``"2024-01-02 03:04:05"``.
    """
    if dt is None:
        return None
    naive = dt if dt.tzinfo is None else dt.astimezone(timezone.utc).replace(tzinfo=None)
    return naive.isoformat(sep=" ", timespec="seconds")
|
|
|
|
|
|
def _intersect_window(
    assignment_start: datetime,
    assignment_end: Optional[datetime],
    filter_from: Optional[datetime],
    filter_to: Optional[datetime],
    now: datetime,
) -> Optional[tuple[datetime, datetime]]:
    """Clip an assignment window to the requested filter range.

    An assignment without an end (``assignment_end`` falsy) is treated as
    running until ``now``.  Either filter bound may be omitted, in which case
    that side is left unclipped.

    Returns:
        ``(effective_start, effective_end)``, or None when the two ranges do
        not overlap or the clipped window would be empty (end <= start).
    """
    window_start = assignment_start
    window_end = assignment_end or now

    # Reject windows lying entirely on one side of the filter range.
    if filter_from:
        if window_end <= filter_from:
            return None
    if filter_to:
        if assignment_start >= filter_to:
            return None

    # Clamp each side to the filter bound that was supplied.
    if filter_from:
        window_start = max(assignment_start, filter_from)
    if filter_to:
        window_end = min(window_end, filter_to)

    if window_end <= window_start:
        return None
    return (window_start, window_end)
|
|
|
|
|
|
async def _fetch_events_for_serial(
    client: httpx.AsyncClient,
    serial: str,
    *,
    from_dt: datetime,
    to_dt: datetime,
    false_trigger: Optional[bool],
    limit: int,
) -> list[dict]:
    """Issue one /db/events call to SFM for one (serial, window) pair.

    Args:
        client: Open httpx client used to perform the GET.
        serial: Value sent as the ``serial`` query parameter.
        from_dt: Window start, sent as ``from_dt`` after ``_iso_utc`` formatting.
        to_dt: Window end, sent as ``to_dt`` after ``_iso_utc`` formatting.
        false_trigger: When not None, sent as ``false_trigger=true|false``;
            when None the parameter is omitted.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        The payload's ``events`` list with the ``waveform_blob`` and
        ``a5_pickle_filename`` keys removed from every event.  This helper is
        best-effort: a transport/HTTP error, a body that is not valid JSON,
        or a payload of the wrong shape is logged as a warning and yields
        ``[]`` so that one bad response does not abort the callers' fan-out.
    """
    params: dict[str, str] = {
        "serial": serial,
        "from_dt": _iso_utc(from_dt) or "",
        "to_dt": _iso_utc(to_dt) or "",
        "limit": str(limit),
    }
    if false_trigger is not None:
        params["false_trigger"] = "true" if false_trigger else "false"

    try:
        resp = await client.get(f"{SFM_BASE_URL}/db/events", params=params)
        resp.raise_for_status()
    except httpx.HTTPError as e:
        log.warning("SFM /db/events failed for serial=%s: %s", serial, e)
        return []

    # A 2xx response can still carry an undecodable body; treat that like any
    # other SFM failure instead of letting the decode error escape.
    try:
        payload = resp.json()
    except ValueError as e:
        log.warning("SFM /db/events returned invalid JSON for serial=%s: %s", serial, e)
        return []

    events = payload.get("events") if isinstance(payload, dict) else None
    if events is None:
        events = []
    if not isinstance(events, list):
        log.warning("SFM /db/events returned an unexpected payload for serial=%s", serial)
        return []

    # Drop the fields the list views never render: waveform_blob is the big
    # per-event binary, and a5_pickle_filename is stripped alongside it.
    for ev in events:
        if isinstance(ev, dict):
            ev.pop("waveform_blob", None)
            ev.pop("a5_pickle_filename", None)
    return events
|
|
|
|
|
|
# ── Public API ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
async def events_for_location(
    db: Session,
    location_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Fan out UnitAssignment rows for `location_id` and union SFM events.

    Args:
        db: SQLAlchemy session used to load the location's assignments.
        location_id: Location whose seismograph assignments are queried.
        from_dt: Optional lower bound of the filter window.
        to_dt: Optional upper bound of the filter window.
        false_trigger: Passed through to SFM; None means "do not filter".
        limit: Maximum number of events returned in ``events``.

    Returns:
        {
          "events":  [merged event dicts, newest first, capped at limit],
          "count":   total events found across all windows (pre-cap),
          "stats":   {event_count, peak_pvs, peak_pvs_at, peak_pvs_serial,
                      last_event, false_trigger_count},
          "assignments_used": [{unit_id, assignment_id, assigned_at,
                                assigned_until, events_in_window, status},
                               ...],
        }

    ``assignments_used`` has the same row shape on every return path; it is
    empty only when the location has no seismograph assignments at all.

    The "events outside any assignment window" rule (Phase 1 design decision):
    events whose timestamp falls outside every assignment window are simply
    not fetched — we only ask SFM for events inside the intersected windows.
    Those orphan events surface under the per-unit detail page in Phase 2.
    """
    # 1. Fetch all assignments (active + closed) for the location.
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.location_id == location_id)
        .filter(UnitAssignment.device_type == "seismograph")
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )

    if not assignments:
        return {
            "events": [],
            "count": 0,
            "stats": _empty_stats(),
            "assignments_used": [],
        }

    def _assignments_report(counts: dict) -> list[dict]:
        # One row per assignment, in chronological order.  Assignments that
        # were not queried (outside the filter window) report a count of 0.
        return [
            {
                "unit_id": a.unit_id,
                "assignment_id": a.id,
                "assigned_at": _iso_utc(a.assigned_at),
                "assigned_until": _iso_utc(a.assigned_until),
                "events_in_window": counts.get(a.id, 0),
                "status": a.status,
            }
            for a in assignments
        ]

    # Naive UTC "now" (same value datetime.utcnow() produced, without the
    # deprecated call); used as the end of open-ended assignments.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # 2. For each assignment, compute the effective (start, end) window after
    #    intersecting with the requested filter range.  Drop assignments that
    #    don't overlap the filter window.
    fetch_specs: list[tuple[UnitAssignment, datetime, datetime]] = []
    for a in assignments:
        window = _intersect_window(a.assigned_at, a.assigned_until, from_dt, to_dt, now)
        if window is not None:
            fetch_specs.append((a, window[0], window[1]))

    if not fetch_specs:
        return {
            "events": [],
            "count": 0,
            "stats": _empty_stats(),
            "assignments_used": _assignments_report({}),
        }

    # 3. Concurrent SFM fetches.  We over-fetch (up to _SFM_FETCH_CEILING per
    #    window) so summary stats reflect the true peak/last/count across the
    #    full filter window, not just what fits in the user's display limit.
    #    The displayed event list is trimmed to `limit` after merge.
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
        per_window_lists = await asyncio.gather(
            *(
                _fetch_events_for_serial(
                    client,
                    serial=a.unit_id,
                    from_dt=start,
                    to_dt=end,
                    false_trigger=false_trigger,
                    limit=_SFM_FETCH_CEILING,
                )
                for a, start, end in fetch_specs
            ),
            return_exceptions=False,
        )

    # 4. Per-assignment event counts (transparency for the operator).
    spec_event_counts: dict[str, int] = {
        a.id: len(evs)
        for (a, _start, _end), evs in zip(fetch_specs, per_window_lists)
    }

    # 5. Union, sort newest-first, cap.
    merged: list[dict] = []
    for evs in per_window_lists:
        merged.extend(evs)
    merged.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
    total_count = len(merged)
    capped = merged[:limit]

    # 6. Summary stats are computed over the full merged set, not the capped one.
    stats = _compute_stats(merged)

    # 7. Report every assignment with its event count, including the ones
    #    that fell outside the filter window (count=0).
    return {
        "events": capped,
        "count": total_count,
        "stats": stats,
        "assignments_used": _assignments_report(spec_event_counts),
    }
|
|
|
|
|
|
# ── Per-unit (cross-project) view ─────────────────────────────────────────────
|
|
|
|
|
|
async def events_for_unit(
    db: Session,
    unit_id: str,
    *,
    bucket: str = "all",  # "all" | "attributed" | "unattributed"
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Return events for a unit annotated with their assignment attribution.

    Unlike events_for_location (which queries SFM per assignment window), this
    helper queries SFM for ALL events for the serial within the optional
    [from_dt, to_dt] filter, then walks each event against the unit's
    UnitAssignment intervals to compute attribution.

    Bucket semantics:
      - "all":          every event, attributed or not
      - "attributed":   events that fall inside at least one assignment window
      - "unattributed": events with no overlapping assignment (the diagnostic
                        bucket — operator should fix assignment dates to
                        attribute these)
    Any other ``bucket`` value behaves like "all".

    Each event gets an extra `attribution` field:
      {assignment_id, location_id, location_name, project_id, project_name,
       assigned_at, assigned_until}  or None

    Unattributed events with a parseable timestamp also get a
    `nearest_assignment` field (when the unit has any assignment) with the
    same shape plus `delta_days` (signed; negative = event before assignment).
    Events whose timestamp is missing or unparseable get `attribution=None`
    and no `nearest_assignment`.

    Returns:
        {
          "events": [annotated events of the selected bucket, newest first,
                     capped at limit],
          "count":  number of events in the selected bucket (pre-cap),
          "stats":  _compute_stats() over ALL fetched events, plus
                    `unattributed_count`,
          "assignments_total": number of assignments found for the unit,
        }
    """
    # 1. Pull all assignments for this unit.  No device_type filter is applied
    #    here (the original note says the route has already restricted the
    #    unit to seismographs — not verifiable from this module).  Order
    #    matters: earliest-start first, so attribution prefers the
    #    chronologically-first overlap when assignments overlap in time.
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.unit_id == unit_id)
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )

    # Resolve location + project names once.
    loc_ids = {a.location_id for a in assignments}
    proj_ids = {a.project_id for a in assignments}
    loc_map = {
        loc.id: loc
        for loc in db.query(MonitoringLocation).filter(
            MonitoringLocation.id.in_(loc_ids)
        ).all()
    } if loc_ids else {}
    proj_map = {
        p.id: p
        for p in db.query(Project).filter(
            Project.id.in_(proj_ids)
        ).all()
    } if proj_ids else {}

    # Naive UTC "now" (same value datetime.utcnow() produced, without the
    # deprecated call); used as the end of open-ended assignments.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    def _attr_dict(a: UnitAssignment) -> dict:
        loc = loc_map.get(a.location_id)
        proj = proj_map.get(a.project_id)
        return {
            "assignment_id": a.id,
            "location_id": a.location_id,
            "location_name": loc.name if loc else None,
            "project_id": a.project_id,
            "project_name": proj.name if proj else None,
            "assigned_at": _iso_utc(a.assigned_at),
            "assigned_until": _iso_utc(a.assigned_until),
        }

    # 2. Fetch all events for this serial in one shot.
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
        events = await _fetch_events_for_serial(
            client,
            serial=unit_id,
            from_dt=from_dt or datetime(1970, 1, 1),
            to_dt=to_dt or now,
            false_trigger=false_trigger,
            limit=_SFM_FETCH_CEILING,
        )

    # 3. For each event, walk the assignment list and find the first
    #    overlapping window.  O(N * M) but both are small in practice.
    for ev in events:
        ts_str = ev.get("timestamp")
        if not ts_str:
            ev["attribution"] = None
            continue
        try:
            # SFM returns ISO with "T" separator; tolerate both.
            ts = datetime.fromisoformat(ts_str.replace(" ", "T"))
        except ValueError:
            ev["attribution"] = None
            continue
        # Assignment bounds and `now` are compared as naive datetimes; an
        # offset-bearing timestamp would raise TypeError on comparison, so
        # bring it to naive UTC first (same convention as _iso_utc).
        if ts.tzinfo is not None:
            ts = ts.astimezone(timezone.utc).replace(tzinfo=None)

        matched: Optional[UnitAssignment] = None
        for a in assignments:
            a_end = a.assigned_until or now
            if a.assigned_at <= ts <= a_end:
                matched = a
                break

        if matched is not None:
            ev["attribution"] = _attr_dict(matched)
            continue

        ev["attribution"] = None
        if not assignments:
            continue

        # Diagnostic: the assignment whose start or end boundary is closest
        # in time to the event.
        nearest = min(
            assignments,
            key=lambda a: min(
                abs((ts - a.assigned_at).total_seconds()),
                abs((ts - (a.assigned_until or now)).total_seconds()),
            ),
        )
        nearest_end = nearest.assigned_until or now
        # Signed delta in days from the nearest boundary
        # (negative = event BEFORE that boundary).
        if ts < nearest.assigned_at:
            delta_seconds = (ts - nearest.assigned_at).total_seconds()
        elif ts > nearest_end:
            delta_seconds = (ts - nearest_end).total_seconds()
        else:
            delta_seconds = 0
        ev["nearest_assignment"] = {
            **_attr_dict(nearest),
            "delta_days": round(delta_seconds / 86400, 1),
        }

    # 4. Apply bucket filter.
    if bucket == "attributed":
        filtered = [e for e in events if e.get("attribution") is not None]
    elif bucket == "unattributed":
        filtered = [e for e in events if e.get("attribution") is None]
    else:
        filtered = events

    filtered.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
    total_count = len(filtered)
    capped = filtered[:limit]

    # 5. Stats: compute over the ENTIRE event set (not the filtered bucket)
    #    so the unattributed_count tile is always meaningful regardless of
    #    which bucket the operator has selected.
    base_stats = _compute_stats(events)
    base_stats["unattributed_count"] = sum(
        1 for e in events if e.get("attribution") is None
    )

    return {
        "events": capped,
        "count": total_count,
        "stats": base_stats,
        "assignments_total": len(assignments),
    }
|
|
|
|
|
|
# ── Project-level roll-up (aggregates across all vibration locations) ─────────
|
|
|
|
|
|
async def vibration_summary_for_project(
    db: Session,
    project_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
) -> dict:
    """Roll up SFM event stats over all "vibration" locations of a project.

    Each location is queried through events_for_location (with no
    false-trigger filter) and only the returned ``stats`` are used.

    Returns:
        {
          "project_id":               str,
          "total_events":             int,
          "peak_pvs":                 float | None,
          "peak_pvs_at":              ISO timestamp | None,
          "peak_pvs_location_id":     str | None,
          "peak_pvs_location_name":   str | None,
          "last_event":               ISO timestamp | None,
          "false_trigger_count":      int,
          "per_location": [
            {"location_id", "location_name", "event_count",
             "peak_pvs", "last_event"},
            ...   # sorted by event_count DESC
          ],
          "vibration_location_count": int,
        }
    """
    locations = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project_id)
        .filter(MonitoringLocation.location_type == "vibration")
        .all()
    )

    # Start from the "nothing found" shape; it is returned untouched when the
    # project has no vibration locations and filled in otherwise.
    summary: dict = {
        "project_id": project_id,
        "total_events": 0,
        "peak_pvs": None,
        "peak_pvs_at": None,
        "peak_pvs_location_id": None,
        "peak_pvs_location_name": None,
        "last_event": None,
        "false_trigger_count": 0,
        "per_location": [],
        "vibration_location_count": len(locations),
    }
    if not locations:
        return summary

    # Nested fan-out: one events_for_location call per location, each of
    # which fans out again over that location's assignments.  limit=1 because
    # only the stats are consumed; the event list itself is ignored.
    pending = [
        events_for_location(
            db,
            loc.id,
            from_dt=from_dt,
            to_dt=to_dt,
            false_trigger=None,
            limit=1,
        )
        for loc in locations
    ]
    results = await asyncio.gather(*pending, return_exceptions=False)

    rows: list[dict] = []
    for loc, res in zip(locations, results):
        loc_stats = res.get("stats", {}) or {}
        loc_count = loc_stats.get("event_count", 0) or 0
        loc_last = loc_stats.get("last_event")
        loc_peak = loc_stats.get("peak_pvs")

        summary["total_events"] += loc_count
        summary["false_trigger_count"] += loc_stats.get("false_trigger_count", 0) or 0

        # Keep the most recent timestamp seen across locations.
        if loc_last and (summary["last_event"] is None or loc_last > summary["last_event"]):
            summary["last_event"] = loc_last

        # Keep the highest peak, remembering where and when it occurred.
        if loc_peak is not None and (summary["peak_pvs"] is None or loc_peak > summary["peak_pvs"]):
            summary["peak_pvs"] = loc_peak
            summary["peak_pvs_at"] = loc_stats.get("peak_pvs_at")
            summary["peak_pvs_location_id"] = loc.id
            summary["peak_pvs_location_name"] = loc.name

        rows.append(
            {
                "location_id": loc.id,
                "location_name": loc.name,
                "event_count": loc_count,
                "peak_pvs": loc_peak,
                "last_event": loc_last,
            }
        )

    summary["per_location"] = sorted(
        rows, key=lambda row: row["event_count"], reverse=True
    )
    return summary
|
|
|
|
|
|
# ── Stats helpers ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _empty_stats() -> dict:
    """Return a fresh stats dict describing "no events".

    Counters are 0 and every peak/last-event field is None.  A new dict is
    built on each call, so callers may mutate the result freely.
    """
    stats: dict = dict.fromkeys(
        (
            "event_count",
            "peak_pvs",
            "peak_pvs_at",
            "peak_pvs_serial",
            "last_event",
            "false_trigger_count",
        )
    )
    stats["event_count"] = 0
    stats["false_trigger_count"] = 0
    return stats
|
|
|
|
|
|
def _compute_stats(events: list[dict]) -> dict:
    """Summarise a merged event list into the KPI stats dict.

    Returned keys: event_count, peak_pvs, peak_pvs_at, peak_pvs_serial,
    last_event, false_trigger_count.  An empty list yields _empty_stats().

    The "Overall Peak" (peak_pvs and its timestamp/serial) IGNORES events
    flagged as false triggers — operators want the highest REAL event, not
    the biggest sensor glitch.  Those events are still counted in both
    event_count and false_trigger_count, and last_event considers every
    event because it tracks activity recency rather than magnitude.
    """
    if not events:
        return _empty_stats()

    # Only non-false-trigger events that actually carry a reading compete
    # for the peak.
    contenders = [
        ev
        for ev in events
        if not ev.get("false_trigger") and ev.get("peak_vector_sum") is not None
    ]
    # max() returns the first of several equal maxima, so the earliest-listed
    # event wins ties.
    top = max(contenders, key=lambda ev: ev["peak_vector_sum"], default=None)

    stamps = [ev.get("timestamp") for ev in events if ev.get("timestamp")]

    return {
        "event_count": len(events),
        "peak_pvs": top["peak_vector_sum"] if top is not None else None,
        "peak_pvs_at": top.get("timestamp") if top is not None else None,
        "peak_pvs_serial": top.get("serial") if top is not None else None,
        "last_event": max(stamps) if stamps else None,
        "false_trigger_count": sum(1 for ev in events if ev.get("false_trigger")),
    }
|