""" SFM events service — bridge between terra-view's UnitAssignment time-windows and the SFM (seismo-relay) events store. Architecture: 1. Terra-view owns the *assignment graph*: which seismograph was at which monitoring location during which time window (UnitAssignment rows). 2. SFM owns the *events store*: triggered waveform events keyed by (serial, timestamp), forwarded from Blastware ACH by series3-watcher. 3. This module fans out the assignments for a given location, queries SFM for the events emitted by each (serial, window) pair concurrently, and unions/sorts/paginates the results. SFM remains the single source of truth for events. Terra-view does not copy events into its own DB; every query hits SFM live. The events_for_location helper is also reused by Phase 3 (project-level roll-up) to aggregate across every location in a project. """ from __future__ import annotations import asyncio import logging import os from datetime import datetime, timezone from typing import Optional import httpx from sqlalchemy.orm import Session from backend.models import UnitAssignment, RosterUnit, MonitoringLocation, Project log = logging.getLogger("backend.services.sfm_events") SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200") # Per-request timeout when calling SFM /db/events. SFM is local on the # docker network so this should be fast; bump if you start seeing timeouts. _SFM_TIMEOUT_SECONDS = 10.0 # Max events we ever fetch per (serial, window) call to SFM. Must match # SFM's own /db/events max limit (currently 5000). The user-facing display # limit is independent — we over-fetch up to this cap so summary stats are # accurate, then trim the displayed list to the requested limit. _SFM_FETCH_CEILING = 5000 # ── Helpers ─────────────────────────────────────────────────────────────────── def _iso_utc(dt: Optional[datetime]) -> Optional[str]: """Render a datetime in the ISO format SFM /db/events expects.""" if dt is None: return None # SFM parses naive ISO strings as UTC; strip tzinfo for consistency. if dt.tzinfo is not None: dt = dt.astimezone(timezone.utc).replace(tzinfo=None) return dt.isoformat(sep=" ", timespec="seconds") def _intersect_window( assignment_start: datetime, assignment_end: Optional[datetime], filter_from: Optional[datetime], filter_to: Optional[datetime], now: datetime, ) -> Optional[tuple[datetime, datetime]]: """Intersect an assignment window with the requested filter window. Returns (effective_start, effective_end) or None if there's no overlap. Open-ended assignments (assigned_until=NULL) are bounded by `now`. """ a_end = assignment_end or now if filter_from and a_end <= filter_from: return None if filter_to and assignment_start >= filter_to: return None start = max(assignment_start, filter_from) if filter_from else assignment_start end = min(a_end, filter_to) if filter_to else a_end if end <= start: return None return (start, end) async def _fetch_events_for_serial( client: httpx.AsyncClient, serial: str, *, from_dt: datetime, to_dt: datetime, false_trigger: Optional[bool], limit: int, ) -> list[dict]: """Issue one /db/events call to SFM for one (serial, window) pair.""" params: dict[str, str] = { "serial": serial, "from_dt": _iso_utc(from_dt) or "", "to_dt": _iso_utc(to_dt) or "", "limit": str(limit), } if false_trigger is not None: params["false_trigger"] = "true" if false_trigger else "false" try: resp = await client.get(f"{SFM_BASE_URL}/db/events", params=params) resp.raise_for_status() except httpx.HTTPError as e: log.warning("SFM /db/events failed for serial=%s: %s", serial, e) return [] payload = resp.json() events = payload.get("events", []) or [] # Strip waveform_blob if present — it's the big per-event binary and we # don't render it in the list view. SFM returns it by default. for ev in events: ev.pop("waveform_blob", None) ev.pop("a5_pickle_filename", None) return events # ── Public API ──────────────────────────────────────────────────────────────── async def events_for_location( db: Session, location_id: str, *, from_dt: Optional[datetime] = None, to_dt: Optional[datetime] = None, false_trigger: Optional[bool] = None, limit: int = 500, ) -> dict: """Fan out UnitAssignment rows for `location_id` and union SFM events. Returns: { "events": [merged event dicts, newest first, capped at limit], "count": total events found across all windows (pre-cap), "stats": {event_count, peak_pvs, peak_pvs_at, last_event, false_trigger_count}, "assignments_used": [{unit_id, assigned_at, assigned_until, events_in_window}, ...], } The "events outside any assignment window" rule (Phase 1 design decision): events whose timestamp falls outside every assignment window are simply not fetched — we only ask SFM for events inside the intersected windows. Those orphan events surface under the per-unit detail page in Phase 2. """ # 1. Fetch all assignments (active + closed) for the location. assignments = ( db.query(UnitAssignment) .filter(UnitAssignment.location_id == location_id) .filter(UnitAssignment.device_type == "seismograph") .order_by(UnitAssignment.assigned_at.asc()) .all() ) if not assignments: return { "events": [], "count": 0, "stats": _empty_stats(), "assignments_used": [], } now = datetime.utcnow() # 2. For each assignment, compute the effective (start, end) window after # intersecting with the requested filter range. Drop assignments that # don't overlap the filter window. fetch_specs: list[tuple[UnitAssignment, datetime, datetime]] = [] for a in assignments: window = _intersect_window(a.assigned_at, a.assigned_until, from_dt, to_dt, now) if window is not None: fetch_specs.append((a, window[0], window[1])) if not fetch_specs: return { "events": [], "count": 0, "stats": _empty_stats(), "assignments_used": [ { "unit_id": a.unit_id, "assigned_at": _iso_utc(a.assigned_at), "assigned_until": _iso_utc(a.assigned_until), "events_in_window": 0, } for a in assignments ], } # 3. Concurrent SFM fetches. We over-fetch (up to _SFM_FETCH_CEILING per # window) so summary stats reflect the true peak/last/count across the # full filter window, not just what fits in the user's display limit. # The displayed event list is trimmed to `limit` after merge. async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client: per_window_lists = await asyncio.gather( *( _fetch_events_for_serial( client, serial=a.unit_id, from_dt=start, to_dt=end, false_trigger=false_trigger, limit=_SFM_FETCH_CEILING, ) for a, start, end in fetch_specs ), return_exceptions=False, ) # 4. Build the per-assignment event counts (transparency for the operator). spec_event_counts: dict[str, int] = {} for (a, _start, _end), evs in zip(fetch_specs, per_window_lists): spec_event_counts[a.id] = len(evs) # 5. Union, sort newest-first, cap. merged: list[dict] = [] for evs in per_window_lists: merged.extend(evs) merged.sort(key=lambda e: e.get("timestamp") or "", reverse=True) total_count = len(merged) capped = merged[:limit] # 6. Compute summary stats over the full merged set (not the capped one). stats = _compute_stats(merged) # 7. Build the assignments_used report (every assignment, in chronological # order, with its event count — even ones that fell outside the filter # window so the operator sees them but with count=0). assignments_used = [] for a in assignments: assignments_used.append( { "unit_id": a.unit_id, "assignment_id": a.id, "assigned_at": _iso_utc(a.assigned_at), "assigned_until": _iso_utc(a.assigned_until), "events_in_window": spec_event_counts.get(a.id, 0), "status": a.status, } ) return { "events": capped, "count": total_count, "stats": stats, "assignments_used": assignments_used, } # ── Per-unit (cross-project) view ───────────────────────────────────────────── async def events_for_unit( db: Session, unit_id: str, *, bucket: str = "all", # "all" | "attributed" | "unattributed" from_dt: Optional[datetime] = None, to_dt: Optional[datetime] = None, false_trigger: Optional[bool] = None, limit: int = 500, ) -> dict: """Return events for a unit annotated with their assignment attribution. Unlike events_for_location (which queries SFM per assignment window), this helper queries SFM for ALL events for the serial within the optional [from_dt, to_dt] filter, then walks each event against the unit's UnitAssignment intervals to compute attribution. Bucket semantics: - "all": every event, attributed or not - "attributed": events that fall inside at least one assignment window - "unattributed": events with no overlapping assignment (the diagnostic bucket — operator should fix assignment dates to attribute these) Each event gets an extra `attribution` field: {assignment_id, location_id, location_name, project_id, project_name, assigned_at, assigned_until} or None Unattributed events also get a `nearest_assignment` field with the same shape plus `delta_days` (signed; negative = event before assignment). """ # 1. Pull all assignments for this unit (any device_type — caller has # already filtered by seismograph in the route). Order matters: we # want the earliest-start assignment first so attribution prefers the # chronologically-first overlap when there are simultaneous active # assignments at different locations (rare but possible). assignments = ( db.query(UnitAssignment) .filter(UnitAssignment.unit_id == unit_id) .order_by(UnitAssignment.assigned_at.asc()) .all() ) # Resolve location + project names once. loc_ids = {a.location_id for a in assignments} proj_ids = {a.project_id for a in assignments} loc_map = { l.id: l for l in db.query(MonitoringLocation).filter( MonitoringLocation.id.in_(loc_ids) ).all() } if loc_ids else {} proj_map = { p.id: p for p in db.query(Project).filter( Project.id.in_(proj_ids) ).all() } if proj_ids else {} now = datetime.utcnow() def _attr_dict(a: UnitAssignment) -> dict: loc = loc_map.get(a.location_id) proj = proj_map.get(a.project_id) return { "assignment_id": a.id, "location_id": a.location_id, "location_name": loc.name if loc else None, "project_id": a.project_id, "project_name": proj.name if proj else None, "assigned_at": _iso_utc(a.assigned_at), "assigned_until": _iso_utc(a.assigned_until), } # 2. Fetch all events for this serial in one shot. async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client: events = await _fetch_events_for_serial( client, serial=unit_id, from_dt=from_dt or datetime(1970, 1, 1), to_dt=to_dt or now, false_trigger=false_trigger, limit=_SFM_FETCH_CEILING, ) # 3. For each event, walk the assignment list and find the first # overlapping window. O(N * M) but both are small in practice. for ev in events: ts_str = ev.get("timestamp") if not ts_str: ev["attribution"] = None continue try: # SFM returns ISO with "T" separator; tolerate both. ts = datetime.fromisoformat(ts_str.replace(" ", "T")) except ValueError: ev["attribution"] = None continue matched: Optional[UnitAssignment] = None for a in assignments: a_end = a.assigned_until or now if a.assigned_at <= ts <= a_end: matched = a break if matched is not None: ev["attribution"] = _attr_dict(matched) else: ev["attribution"] = None # Find the nearest assignment (chronologically) for diagnostic. if assignments: nearest = min( assignments, key=lambda a: min( abs((ts - a.assigned_at).total_seconds()), abs((ts - (a.assigned_until or now)).total_seconds()), ), ) # Signed delta in days from the nearest boundary # (negative = event BEFORE that boundary). if ts < nearest.assigned_at: delta_seconds = (ts - nearest.assigned_at).total_seconds() elif ts > (nearest.assigned_until or now): delta_seconds = (ts - (nearest.assigned_until or now)).total_seconds() else: delta_seconds = 0 ev["nearest_assignment"] = { **_attr_dict(nearest), "delta_days": round(delta_seconds / 86400, 1), } # 4. Apply bucket filter. if bucket == "attributed": filtered = [e for e in events if e.get("attribution") is not None] elif bucket == "unattributed": filtered = [e for e in events if e.get("attribution") is None] else: filtered = events filtered.sort(key=lambda e: e.get("timestamp") or "", reverse=True) total_count = len(filtered) capped = filtered[:limit] # 5. Stats: compute over the ENTIRE event set (not the filtered bucket) # so the unattributed_count tile is always meaningful regardless of # which bucket the operator has selected. base_stats = _compute_stats(events) unattributed_count = sum( 1 for e in events if e.get("attribution") is None ) base_stats["unattributed_count"] = unattributed_count return { "events": capped, "count": total_count, "stats": base_stats, "assignments_total": len(assignments), } # ── Stats helpers ───────────────────────────────────────────────────────────── def _empty_stats() -> dict: return { "event_count": 0, "peak_pvs": None, "peak_pvs_at": None, "peak_pvs_serial": None, "last_event": None, "false_trigger_count": 0, } def _compute_stats(events: list[dict]) -> dict: """Roll up summary stats from a merged event list. Cheap O(N) pass.""" if not events: return _empty_stats() peak_pvs = None peak_pvs_at = None peak_pvs_serial = None last_event = None false_trigger_count = 0 for ev in events: pvs = ev.get("peak_vector_sum") if pvs is not None and (peak_pvs is None or pvs > peak_pvs): peak_pvs = pvs peak_pvs_at = ev.get("timestamp") peak_pvs_serial = ev.get("serial") ts = ev.get("timestamp") if ts and (last_event is None or ts > last_event): last_event = ts if ev.get("false_trigger"): false_trigger_count += 1 return { "event_count": len(events), "peak_pvs": peak_pvs, "peak_pvs_at": peak_pvs_at, "peak_pvs_serial": peak_pvs_serial, "last_event": last_event, "false_trigger_count": false_trigger_count, }