diff --git a/backend/routers/project_locations.py b/backend/routers/project_locations.py index 701ffcf..f0aa1ed 100644 --- a/backend/routers/project_locations.py +++ b/backend/routers/project_locations.py @@ -648,6 +648,68 @@ async def get_nrl_sessions( }) +@router.get("/locations/{location_id}/events", response_class=JSONResponse) +async def get_location_events( + project_id: str, + location_id: str, + from_dt: Optional[datetime] = Query(None), + to_dt: Optional[datetime] = Query(None), + false_trigger: Optional[bool] = Query(None), + limit: int = Query(500, ge=1, le=5000), + db: Session = Depends(get_db), +): + """ + Return SFM events recorded at this monitoring location. + + Fans out the location's UnitAssignment rows (every seismograph ever + assigned to this location, active + closed), queries SFM /db/events + for each (serial, time-window) pair concurrently, and unions the + results. + + Sound (SLM) locations return an empty payload — SFM events are + seismograph-only. + """ + location = db.query(MonitoringLocation).filter_by(id=location_id).first() + if not location: + raise HTTPException(status_code=404, detail="Location not found.") + if location.project_id != project_id: + raise HTTPException( + status_code=404, + detail="Location does not belong to this project.", + ) + + # SLM locations don't have SFM events — return an empty payload rather + # than 404 so the frontend can render an empty state gracefully. + if location.location_type != "vibration": + return { + "events": [], + "count": 0, + "stats": { + "event_count": 0, + "peak_pvs": None, + "peak_pvs_at": None, + "peak_pvs_serial": None, + "last_event": None, + "false_trigger_count": 0, + }, + "assignments_used": [], + "location_type": location.location_type, + } + + from backend.services.sfm_events import events_for_location + + result = await events_for_location( + db, + location_id, + from_dt=from_dt, + to_dt=to_dt, + false_trigger=false_trigger, + limit=limit, + ) + result["location_type"] = location.location_type + return result + + @router.get("/nrl/{location_id}/files", response_class=HTMLResponse) async def get_nrl_files( project_id: str, diff --git a/backend/services/sfm_events.py b/backend/services/sfm_events.py new file mode 100644 index 0000000..19312ec --- /dev/null +++ b/backend/services/sfm_events.py @@ -0,0 +1,301 @@ +""" +SFM events service — bridge between terra-view's UnitAssignment time-windows +and the SFM (seismo-relay) events store. + +Architecture: + 1. Terra-view owns the *assignment graph*: which seismograph was at which + monitoring location during which time window (UnitAssignment rows). + 2. SFM owns the *events store*: triggered waveform events keyed by + (serial, timestamp), forwarded from Blastware ACH by series3-watcher. + 3. This module fans out the assignments for a given location, queries SFM + for the events emitted by each (serial, window) pair concurrently, and + unions/sorts/paginates the results. + +SFM remains the single source of truth for events. Terra-view does not +copy events into its own DB; every query hits SFM live. + +The events_for_location helper is also reused by Phase 3 (project-level +roll-up) to aggregate across every location in a project. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +from datetime import datetime, timezone +from typing import Optional + +import httpx +from sqlalchemy.orm import Session + +from backend.models import UnitAssignment, RosterUnit + +log = logging.getLogger("backend.services.sfm_events") + +SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200") + +# Per-request timeout when calling SFM /db/events. SFM is local on the +# docker network so this should be fast; bump if you start seeing timeouts. +_SFM_TIMEOUT_SECONDS = 10.0 + +# Max events we ever fetch per (serial, window) call to SFM. Must match +# SFM's own /db/events max limit (currently 5000). The user-facing display +# limit is independent — we over-fetch up to this cap so summary stats are +# accurate, then trim the displayed list to the requested limit. +_SFM_FETCH_CEILING = 5000 + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def _iso_utc(dt: Optional[datetime]) -> Optional[str]: + """Render a datetime in the ISO format SFM /db/events expects.""" + if dt is None: + return None + # SFM parses naive ISO strings as UTC; strip tzinfo for consistency. + if dt.tzinfo is not None: + dt = dt.astimezone(timezone.utc).replace(tzinfo=None) + return dt.isoformat(sep=" ", timespec="seconds") + + +def _intersect_window( + assignment_start: datetime, + assignment_end: Optional[datetime], + filter_from: Optional[datetime], + filter_to: Optional[datetime], + now: datetime, +) -> Optional[tuple[datetime, datetime]]: + """Intersect an assignment window with the requested filter window. + + Returns (effective_start, effective_end) or None if there's no overlap. + Open-ended assignments (assigned_until=NULL) are bounded by `now`. + """ + a_end = assignment_end or now + if filter_from and a_end <= filter_from: + return None + if filter_to and assignment_start >= filter_to: + return None + start = max(assignment_start, filter_from) if filter_from else assignment_start + end = min(a_end, filter_to) if filter_to else a_end + if end <= start: + return None + return (start, end) + + +async def _fetch_events_for_serial( + client: httpx.AsyncClient, + serial: str, + *, + from_dt: datetime, + to_dt: datetime, + false_trigger: Optional[bool], + limit: int, +) -> list[dict]: + """Issue one /db/events call to SFM for one (serial, window) pair.""" + params: dict[str, str] = { + "serial": serial, + "from_dt": _iso_utc(from_dt) or "", + "to_dt": _iso_utc(to_dt) or "", + "limit": str(limit), + } + if false_trigger is not None: + params["false_trigger"] = "true" if false_trigger else "false" + + try: + resp = await client.get(f"{SFM_BASE_URL}/db/events", params=params) + resp.raise_for_status() + except httpx.HTTPError as e: + log.warning("SFM /db/events failed for serial=%s: %s", serial, e) + return [] + + payload = resp.json() + events = payload.get("events", []) or [] + # Strip waveform_blob if present — it's the big per-event binary and we + # don't render it in the list view. SFM returns it by default. + for ev in events: + ev.pop("waveform_blob", None) + ev.pop("a5_pickle_filename", None) + return events + + +# ── Public API ──────────────────────────────────────────────────────────────── + + +async def events_for_location( + db: Session, + location_id: str, + *, + from_dt: Optional[datetime] = None, + to_dt: Optional[datetime] = None, + false_trigger: Optional[bool] = None, + limit: int = 500, +) -> dict: + """Fan out UnitAssignment rows for `location_id` and union SFM events. + + Returns: + { + "events": [merged event dicts, newest first, capped at limit], + "count": total events found across all windows (pre-cap), + "stats": {event_count, peak_pvs, peak_pvs_at, + last_event, false_trigger_count}, + "assignments_used": [{unit_id, assigned_at, assigned_until, + events_in_window}, ...], + } + + The "events outside any assignment window" rule (Phase 1 design decision): + events whose timestamp falls outside every assignment window are simply + not fetched — we only ask SFM for events inside the intersected windows. + Those orphan events surface under the per-unit detail page in Phase 2. + """ + # 1. Fetch all assignments (active + closed) for the location. + assignments = ( + db.query(UnitAssignment) + .filter(UnitAssignment.location_id == location_id) + .filter(UnitAssignment.device_type == "seismograph") + .order_by(UnitAssignment.assigned_at.asc()) + .all() + ) + + if not assignments: + return { + "events": [], + "count": 0, + "stats": _empty_stats(), + "assignments_used": [], + } + + now = datetime.utcnow() + + # 2. For each assignment, compute the effective (start, end) window after + # intersecting with the requested filter range. Drop assignments that + # don't overlap the filter window. + fetch_specs: list[tuple[UnitAssignment, datetime, datetime]] = [] + for a in assignments: + window = _intersect_window(a.assigned_at, a.assigned_until, from_dt, to_dt, now) + if window is not None: + fetch_specs.append((a, window[0], window[1])) + + if not fetch_specs: + return { + "events": [], + "count": 0, + "stats": _empty_stats(), + "assignments_used": [ + { + "unit_id": a.unit_id, + "assigned_at": _iso_utc(a.assigned_at), + "assigned_until": _iso_utc(a.assigned_until), + "events_in_window": 0, + } + for a in assignments + ], + } + + # 3. Concurrent SFM fetches. We over-fetch (up to _SFM_FETCH_CEILING per + # window) so summary stats reflect the true peak/last/count across the + # full filter window, not just what fits in the user's display limit. + # The displayed event list is trimmed to `limit` after merge. + async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client: + per_window_lists = await asyncio.gather( + *( + _fetch_events_for_serial( + client, + serial=a.unit_id, + from_dt=start, + to_dt=end, + false_trigger=false_trigger, + limit=_SFM_FETCH_CEILING, + ) + for a, start, end in fetch_specs + ), + return_exceptions=False, + ) + + # 4. Build the per-assignment event counts (transparency for the operator). + spec_event_counts: dict[str, int] = {} + for (a, _start, _end), evs in zip(fetch_specs, per_window_lists): + spec_event_counts[a.id] = len(evs) + + # 5. Union, sort newest-first, cap. + merged: list[dict] = [] + for evs in per_window_lists: + merged.extend(evs) + merged.sort(key=lambda e: e.get("timestamp") or "", reverse=True) + total_count = len(merged) + capped = merged[:limit] + + # 6. Compute summary stats over the full merged set (not the capped one). + stats = _compute_stats(merged) + + # 7. Build the assignments_used report (every assignment, in chronological + # order, with its event count — even ones that fell outside the filter + # window so the operator sees them but with count=0). + assignments_used = [] + for a in assignments: + assignments_used.append( + { + "unit_id": a.unit_id, + "assignment_id": a.id, + "assigned_at": _iso_utc(a.assigned_at), + "assigned_until": _iso_utc(a.assigned_until), + "events_in_window": spec_event_counts.get(a.id, 0), + "status": a.status, + } + ) + + return { + "events": capped, + "count": total_count, + "stats": stats, + "assignments_used": assignments_used, + } + + +# ── Stats helpers ───────────────────────────────────────────────────────────── + + +def _empty_stats() -> dict: + return { + "event_count": 0, + "peak_pvs": None, + "peak_pvs_at": None, + "peak_pvs_serial": None, + "last_event": None, + "false_trigger_count": 0, + } + + +def _compute_stats(events: list[dict]) -> dict: + """Roll up summary stats from a merged event list. Cheap O(N) pass.""" + if not events: + return _empty_stats() + + peak_pvs = None + peak_pvs_at = None + peak_pvs_serial = None + last_event = None + false_trigger_count = 0 + + for ev in events: + pvs = ev.get("peak_vector_sum") + if pvs is not None and (peak_pvs is None or pvs > peak_pvs): + peak_pvs = pvs + peak_pvs_at = ev.get("timestamp") + peak_pvs_serial = ev.get("serial") + + ts = ev.get("timestamp") + if ts and (last_event is None or ts > last_event): + last_event = ts + + if ev.get("false_trigger"): + false_trigger_count += 1 + + return { + "event_count": len(events), + "peak_pvs": peak_pvs, + "peak_pvs_at": peak_pvs_at, + "peak_pvs_serial": peak_pvs_serial, + "last_event": last_event, + "false_trigger_count": false_trigger_count, + } diff --git a/templates/vibration_location_detail.html b/templates/vibration_location_detail.html index 6512948..e0ff8f6 100644 --- a/templates/vibration_location_detail.html +++ b/templates/vibration_location_detail.html @@ -65,6 +65,11 @@ class="tab-button px-4 py-3 border-b-2 font-medium text-sm transition-colors border-seismo-orange text-seismo-orange"> Overview + + + + + + +
+
+
+
+ Loading events… +
+
+
+ +