"""SFM events service — bridge between terra-view's UnitAssignment time-windows
and the SFM (seismo-relay) events store.

Architecture:

1. Terra-view owns the *assignment graph*: which seismograph was at which
   monitoring location during which time window (UnitAssignment rows).
2. SFM owns the *events store*: triggered waveform events keyed by
   (serial, timestamp), forwarded from Blastware ACH by series3-watcher.
3. This module fans out the assignments for a given location, queries SFM
   for the events emitted by each (serial, window) pair concurrently, and
   unions, sorts, and caps the results.

SFM remains the single source of truth for events. Terra-view does not copy
events into its own DB; every query hits SFM live.

The events_for_location helper is also reused by Phase 3 (project-level
roll-up) to aggregate across every location in a project.
"""

from __future__ import annotations

import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Optional

import httpx
from sqlalchemy.orm import Session

from backend.models import UnitAssignment, RosterUnit

log = logging.getLogger("backend.services.sfm_events")

# Base URL of the SFM service. Any trailing slash is stripped because request
# URLs are built as f"{SFM_BASE_URL}/db/events"; "http://host:8200/" would
# otherwise yield a "//db/events" path that the router may not match.
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200").rstrip("/")

# Per-request timeout when calling SFM /db/events. SFM is local on the
# docker network so this should be fast; bump if you start seeing timeouts.
_SFM_TIMEOUT_SECONDS = 10.0

# Max events we ever fetch per (serial, window) call to SFM. Must match
# SFM's own /db/events max limit (currently 5000). The user-facing display
# limit is independent — we over-fetch up to this cap so summary stats are
# accurate, then trim the displayed list to the requested limit.
_SFM_FETCH_CEILING = 5000


# ── Helpers ───────────────────────────────────────────────────────────────────


def _to_naive_utc(dt: datetime) -> datetime:
    """Return *dt* as a naive datetime expressed in UTC.

    Aware datetimes are converted to UTC and stripped of tzinfo. Naive
    datetimes are returned unchanged: this module treats naive values as UTC
    (``now`` is naive UTC and SFM parses naive ISO strings as UTC).
    Normalizing prevents ``TypeError: can't compare offset-naive and
    offset-aware datetimes`` when an aware filter bound meets a naive
    assignment timestamp.
    """
    if dt.tzinfo is not None:
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt


def _iso_utc(dt: Optional[datetime]) -> Optional[str]:
    """Render a datetime in the ISO format SFM /db/events expects.

    Returns None for None. Aware datetimes are converted to UTC first; SFM
    parses naive ISO strings as UTC, so the tzinfo is dropped. Sub-second
    precision is truncated (``timespec="seconds"``).
    """
    if dt is None:
        return None
    return _to_naive_utc(dt).isoformat(sep=" ", timespec="seconds")


def _intersect_window(
    assignment_start: datetime,
    assignment_end: Optional[datetime],
    filter_from: Optional[datetime],
    filter_to: Optional[datetime],
    now: datetime,
) -> Optional[tuple[datetime, datetime]]:
    """Intersect an assignment window with the requested filter window.

    Returns (effective_start, effective_end) as naive-UTC datetimes, or None
    if the windows do not overlap (an empty or zero-length intersection).
    Open-ended assignments (assigned_until=NULL) are bounded by `now`.
    Naive and aware inputs may be mixed; all are normalized to naive UTC.
    """
    start = _to_naive_utc(assignment_start)
    end = _to_naive_utc(assignment_end if assignment_end is not None else now)
    if filter_from is not None:
        start = max(start, _to_naive_utc(filter_from))
    if filter_to is not None:
        end = min(end, _to_naive_utc(filter_to))
    # Clamping both ends covers every "no overlap" case: an assignment that
    # ends before filter_from or starts after filter_to collapses to end <= start.
    if end <= start:
        return None
    return (start, end)


async def _fetch_events_for_serial(
    client: httpx.AsyncClient,
    serial: str,
    *,
    from_dt: datetime,
    to_dt: datetime,
    false_trigger: Optional[bool],
    limit: int,
) -> list[dict]:
    """Issue one /db/events call to SFM for one (serial, window) pair.

    Best-effort: HTTP/transport failures and malformed (non-JSON or
    non-object) responses are logged and yield an empty list, so a single
    bad window does not fail the whole location query.
    """
    params: dict[str, str] = {
        "serial": serial,
        "from_dt": _iso_utc(from_dt) or "",
        "to_dt": _iso_utc(to_dt) or "",
        "limit": str(limit),
    }
    if false_trigger is not None:
        params["false_trigger"] = "true" if false_trigger else "false"

    try:
        resp = await client.get(f"{SFM_BASE_URL}/db/events", params=params)
        resp.raise_for_status()
    except httpx.HTTPError as e:
        log.warning("SFM /db/events failed for serial=%s: %s", serial, e)
        return []

    try:
        payload = resp.json()
    except ValueError as e:  # JSONDecodeError / UnicodeDecodeError
        log.warning("SFM /db/events returned invalid JSON for serial=%s: %s", serial, e)
        return []
    if not isinstance(payload, dict):
        log.warning(
            "SFM /db/events returned unexpected payload type %s for serial=%s",
            type(payload).__name__,
            serial,
        )
        return []

    events = payload.get("events") or []
    if len(events) >= limit:
        # SFM will not return more than `limit` rows, so hitting it means the
        # window may hold more events than we saw and stats may be incomplete.
        log.warning(
            "SFM /db/events hit the fetch limit (%d) for serial=%s; "
            "summary stats may be incomplete",
            limit,
            serial,
        )

    # Strip waveform_blob if present — it's the big per-event binary and we
    # don't render it in the list view. SFM returns it by default.
    for ev in events:
        ev.pop("waveform_blob", None)
        ev.pop("a5_pickle_filename", None)
    return events


def _assignment_report(a: UnitAssignment, events_in_window: int) -> dict:
    """Render one UnitAssignment as an ``assignments_used`` entry."""
    return {
        "unit_id": a.unit_id,
        "assignment_id": a.id,
        "assigned_at": _iso_utc(a.assigned_at),
        "assigned_until": _iso_utc(a.assigned_until),
        "events_in_window": events_in_window,
        "status": a.status,
    }


def _merge_window_events(
    fetch_specs: list[tuple[UnitAssignment, datetime, datetime]],
    per_window_lists: list[list[dict]],
) -> tuple[list[dict], dict[str, int]]:
    """Union per-window event lists into one newest-first list.

    SFM keys events by (serial, timestamp). If two windows of the same unit
    overlap or touch, the same event can come back twice, so later copies are
    dropped. Events without a timestamp are always kept.

    Returns (merged_events, {assignment_id: events fetched for that window}).
    The per-assignment counts are raw (pre-dedup) for operator transparency.
    """
    counts: dict[str, int] = {}
    merged: list[dict] = []
    seen: set[tuple[str, str]] = set()
    for (a, _start, _end), evs in zip(fetch_specs, per_window_lists):
        counts[a.id] = len(evs)
        for ev in evs:
            ts = ev.get("timestamp")
            if ts:
                key = (a.unit_id, ts)
                if key in seen:
                    continue
                seen.add(key)
            merged.append(ev)
    merged.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
    return merged, counts


# ── Public API ────────────────────────────────────────────────────────────────


async def events_for_location(
    db: Session,
    location_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Fan out UnitAssignment rows for `location_id` and union SFM events.

    Args:
        db: SQLAlchemy session used to load the location's assignments.
        location_id: Location whose seismograph assignments are queried.
        from_dt, to_dt: Optional filter window. Naive values are treated as
            UTC; aware values are converted to UTC.
        false_trigger: When not None, forwarded to SFM as a filter.
        limit: Max events returned in ``"events"`` (negative is treated as 0).
            ``"count"`` and ``"stats"`` cover every fetched event, up to
            _SFM_FETCH_CEILING per window.

    Returns:
        {
          "events": [merged event dicts, newest first, capped at limit],
          "count": unique events found across all windows (pre-cap),
          "stats": {event_count, peak_pvs, peak_pvs_at, peak_pvs_serial,
                    last_event, false_trigger_count},
          "assignments_used": [{unit_id, assignment_id, assigned_at,
                                assigned_until, events_in_window, status}, ...],
        }

    The "events outside any assignment window" rule (Phase 1 design decision):
    events whose timestamp falls outside every assignment window are simply
    not fetched — we only ask SFM for events inside the intersected windows.
    Those orphan events surface under the per-unit detail page in Phase 2.
    """
    # 1. Fetch all assignments (active + closed) for the location.
    # NOTE(review): this is a synchronous DB call inside a coroutine, so it
    # blocks the event loop while it runs — consider a threadpool if slow.
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.location_id == location_id)
        .filter(UnitAssignment.device_type == "seismograph")
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )
    if not assignments:
        return {
            "events": [],
            "count": 0,
            "stats": _empty_stats(),
            "assignments_used": [],
        }

    # Naive UTC, matching the previous datetime.utcnow() value without the
    # deprecated call.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # 2. For each assignment, compute the effective (start, end) window after
    #    intersecting with the requested filter range. Drop assignments that
    #    don't overlap the filter window.
    fetch_specs: list[tuple[UnitAssignment, datetime, datetime]] = []
    for a in assignments:
        window = _intersect_window(a.assigned_at, a.assigned_until, from_dt, to_dt, now)
        if window is not None:
            fetch_specs.append((a, window[0], window[1]))

    if not fetch_specs:
        # Same entry shape as the normal path so consumers see consistent keys.
        return {
            "events": [],
            "count": 0,
            "stats": _empty_stats(),
            "assignments_used": [_assignment_report(a, 0) for a in assignments],
        }

    # 3. Concurrent SFM fetches. We over-fetch (up to _SFM_FETCH_CEILING per
    #    window) so summary stats reflect the true peak/last/count across the
    #    full filter window, not just what fits in the user's display limit.
    #    _fetch_events_for_serial absorbs HTTP/decode failures itself, so any
    #    exception escaping gather() is unexpected and is allowed to propagate.
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
        per_window_lists = await asyncio.gather(
            *(
                _fetch_events_for_serial(
                    client,
                    serial=a.unit_id,
                    from_dt=start,
                    to_dt=end,
                    false_trigger=false_trigger,
                    limit=_SFM_FETCH_CEILING,
                )
                for a, start, end in fetch_specs
            ),
            return_exceptions=False,
        )

    # 4-5. Per-assignment counts + deduplicated union, newest first.
    merged, spec_event_counts = _merge_window_events(fetch_specs, per_window_lists)
    total_count = len(merged)
    capped = merged[: max(limit, 0)]

    # 6. Compute summary stats over the full merged set (not the capped one).
    stats = _compute_stats(merged)

    # 7. Report every assignment in chronological order with its event count —
    #    including ones outside the filter window (count=0) so the operator
    #    still sees them.
    assignments_used = [
        _assignment_report(a, spec_event_counts.get(a.id, 0)) for a in assignments
    ]

    return {
        "events": capped,
        "count": total_count,
        "stats": stats,
        "assignments_used": assignments_used,
    }


# ── Stats helpers ─────────────────────────────────────────────────────────────


def _empty_stats() -> dict:
    """Return the stats dict used when no events were found."""
    return {
        "event_count": 0,
        "peak_pvs": None,
        "peak_pvs_at": None,
        "peak_pvs_serial": None,
        "last_event": None,
        "false_trigger_count": 0,
    }


def _compute_stats(events: list[dict]) -> dict:
    """Roll up summary stats from a merged event list in one O(N) pass.

    - peak_pvs / peak_pvs_at / peak_pvs_serial: the event with the largest
      non-None ``peak_vector_sum``; the first one seen wins on ties.
    - last_event: the greatest ``timestamp`` value, compared as-is
      (assumes SFM timestamps sort chronologically — TODO confirm format).
    - false_trigger_count: events whose ``false_trigger`` is truthy.
    """
    if not events:
        return _empty_stats()

    peak_pvs = None
    peak_pvs_at = None
    peak_pvs_serial = None
    last_event = None
    false_trigger_count = 0

    for ev in events:
        ts = ev.get("timestamp")
        pvs = ev.get("peak_vector_sum")
        if pvs is not None and (peak_pvs is None or pvs > peak_pvs):
            peak_pvs = pvs
            peak_pvs_at = ts
            peak_pvs_serial = ev.get("serial")
        if ts and (last_event is None or ts > last_event):
            last_event = ts
        if ev.get("false_trigger"):
            false_trigger_count += 1

    return {
        "event_count": len(events),
        "peak_pvs": peak_pvs,
        "peak_pvs_at": peak_pvs_at,
        "peak_pvs_serial": peak_pvs_serial,
        "last_event": last_event,
        "false_trigger_count": false_trigger_count,
    }