Files
terra-view/backend/services/sfm_events.py
T
serversdown d5a0163852 feat(locations): soft-remove monitoring locations without destroying history
When a client drops a location from scope mid-project (e.g. the office
half of a museum+office monitoring job), operators couldn't previously
mark it as no-longer-active without either deleting it (which would
orphan historical events) or leaving it in the active list looking
deployable.  Now there's a proper middle ground.

Data model
- MonitoringLocation gets two new nullable columns:
  - removed_at      — NULL means active; set means soft-removed
  - removal_reason  — optional operator note
  Migration: backend/migrate_add_location_removed.py (idempotent)

Endpoints
- POST /api/projects/{p}/locations/{l}/remove
    Body: { effective_date?: ISO-datetime, reason?: str }
    Side effects (cascade):
      1. Closes active UnitAssignment rows at this location
         (assigned_until = effective_date, status = "completed")
      2. Cancels pending ScheduledActions at this location
      3. Marks location.removed_at = effective_date
    Returns counts of assignments closed + actions cancelled.
- POST /api/projects/{p}/locations/{l}/restore
    Clears removed_at + removal_reason.  Does NOT auto-reopen
    assignments — operator creates new ones if resuming monitoring.

Active-surface filters
- locations-json defaults to active-only; pass include_removed=true
  for historical / reporting views.  Schedule modal dropdowns now
  exclude removed locations automatically.
- Metadata-backfill fuzzy matcher excludes removed locations from
  proposed targets (don't want backfill creating new assignments at
  decommissioned locations).
- Vibration-summary per_location rollup includes removed locations
  (so historical event totals stay accurate) but tags each with
  removed_at so the UI can show a badge.

UI
- Project detail page's Monitoring Locations section now splits into:
    Active locations (full card with Assign / Edit / Remove / Delete)
    Removed locations (collapsed <details>, greyed cards, Restore button,
                       shows removal date + reason)
- New per-card "Remove" button → opens confirmation modal explaining
  the cascade, with optional effective-date (defaults to now,
  backdateable) and reason fields.
- Unit detail's SFM Events attribution cell shows a small "removed"
  badge next to historical attributions whose location is no longer
  active.  Same pattern in vibration_summary's top-locations list.
- Soft-removal indicator surfaced through the events_for_unit
  attribution payload as location_removed_at.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 22:22:40 +00:00

602 lines
22 KiB
Python

"""
SFM events service — bridge between terra-view's UnitAssignment time-windows
and the SFM (seismo-relay) events store.
Architecture:
1. Terra-view owns the *assignment graph*: which seismograph was at which
monitoring location during which time window (UnitAssignment rows).
2. SFM owns the *events store*: triggered waveform events keyed by
(serial, timestamp), forwarded from Blastware ACH by series3-watcher.
3. This module fans out the assignments for a given location, queries SFM
for the events emitted by each (serial, window) pair concurrently, and
unions/sorts/paginates the results.
SFM remains the single source of truth for events. Terra-view does not
copy events into its own DB; every query hits SFM live.
The events_for_location helper is also reused by Phase 3 (project-level
roll-up) to aggregate across every location in a project.
"""
from __future__ import annotations
import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Optional
import httpx
from sqlalchemy.orm import Session
from backend.models import UnitAssignment, RosterUnit, MonitoringLocation, Project
log = logging.getLogger("backend.services.sfm_events")
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")
# Per-request timeout when calling SFM /db/events. SFM is local on the
# docker network so this should be fast; bump if you start seeing timeouts.
_SFM_TIMEOUT_SECONDS = 10.0
# Max events we ever fetch per (serial, window) call to SFM. Must match
# SFM's own /db/events max limit (currently 5000). The user-facing display
# limit is independent — we over-fetch up to this cap so summary stats are
# accurate, then trim the displayed list to the requested limit.
_SFM_FETCH_CEILING = 5000
# ── Helpers ───────────────────────────────────────────────────────────────────
def _iso_utc(dt: Optional[datetime]) -> Optional[str]:
"""Render a datetime in the ISO format SFM /db/events expects."""
if dt is None:
return None
# SFM parses naive ISO strings as UTC; strip tzinfo for consistency.
if dt.tzinfo is not None:
dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
return dt.isoformat(sep=" ", timespec="seconds")
def _intersect_window(
assignment_start: datetime,
assignment_end: Optional[datetime],
filter_from: Optional[datetime],
filter_to: Optional[datetime],
now: datetime,
) -> Optional[tuple[datetime, datetime]]:
"""Intersect an assignment window with the requested filter window.
Returns (effective_start, effective_end) or None if there's no overlap.
Open-ended assignments (assigned_until=NULL) are bounded by `now`.
"""
a_end = assignment_end or now
if filter_from and a_end <= filter_from:
return None
if filter_to and assignment_start >= filter_to:
return None
start = max(assignment_start, filter_from) if filter_from else assignment_start
end = min(a_end, filter_to) if filter_to else a_end
if end <= start:
return None
return (start, end)
async def _fetch_events_for_serial(
    client: httpx.AsyncClient,
    serial: str,
    *,
    from_dt: datetime,
    to_dt: datetime,
    false_trigger: Optional[bool],
    limit: int,
) -> list[dict]:
    """Issue one SFM /db/events call for a single (serial, window) pair.

    This is best-effort. These failures are logged and reported as an empty
    list, so one bad window cannot abort a concurrent fan-out over many
    windows:
      - transport or HTTP-status failures;
      - a body that is not valid JSON;
      - a JSON payload that does not have the expected shape.

    Args:
        client: shared async HTTP client.
        serial: seismograph serial to query.
        from_dt / to_dt: window bounds, sent via ``_iso_utc``.
        false_trigger: when not None, forwarded as a "true"/"false" filter.
        limit: maximum events requested from SFM for this window.

    Returns:
        The event dicts from the payload's ``events`` list, with the bulky
        ``waveform_blob`` and ``a5_pickle_filename`` fields removed.
    """
    params: dict[str, str] = {
        "serial": serial,
        "from_dt": _iso_utc(from_dt) or "",
        "to_dt": _iso_utc(to_dt) or "",
        "limit": str(limit),
    }
    if false_trigger is not None:
        params["false_trigger"] = "true" if false_trigger else "false"

    try:
        resp = await client.get(f"{SFM_BASE_URL}/db/events", params=params)
        resp.raise_for_status()
        payload = resp.json()
    except httpx.HTTPError as e:
        log.warning("SFM /db/events failed for serial=%s: %s", serial, e)
        return []
    except ValueError as e:
        # Non-JSON body (e.g. a proxy error page). Treat it like a failed call
        # instead of letting the decode error escape into the caller's gather.
        log.warning("SFM /db/events returned invalid JSON for serial=%s: %s", serial, e)
        return []

    if not isinstance(payload, dict):
        log.warning(
            "SFM /db/events returned unexpected payload type %s for serial=%s",
            type(payload).__name__, serial,
        )
        return []

    raw_events = payload.get("events") or []
    if not isinstance(raw_events, list):
        log.warning("SFM /db/events 'events' is not a list for serial=%s", serial)
        return []
    # Drop malformed (non-object) entries so the .pop() calls below are safe.
    events = [ev for ev in raw_events if isinstance(ev, dict)]

    # Strip waveform_blob if present. It is the big per-event binary, SFM
    # returns it by default, and the list view does not render it.
    for ev in events:
        ev.pop("waveform_blob", None)
        ev.pop("a5_pickle_filename", None)

    if len(events) >= limit:
        # SFM caps results at `limit`, so counts/peaks derived from this
        # window may be understated. Surface that instead of failing silently.
        log.warning(
            "SFM /db/events hit fetch limit (%d) for serial=%s window=%s..%s; "
            "results may be truncated",
            limit, serial, params["from_dt"], params["to_dt"],
        )
    return events
# ── Public API ────────────────────────────────────────────────────────────────
async def events_for_location(
    db: Session,
    location_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Fan out UnitAssignment rows for `location_id` and union SFM events.

    Args:
        db: SQLAlchemy session used to load the location's assignments.
        location_id: MonitoringLocation id whose seismograph assignments are
            queried.
        from_dt / to_dt: optional filter window. Each assignment window is
            intersected with it before SFM is queried.
        false_trigger: forwarded to SFM as a filter when not None.
        limit: maximum number of events returned in ``events``. ``count`` and
            ``stats`` are computed over everything fetched, not just the
            displayed slice. Each window is itself capped at
            ``_SFM_FETCH_CEILING``.

    Returns:
        {
          "events": [merged event dicts, newest first, capped at limit],
          "count": total events found across all windows (pre-cap),
          "stats": {event_count, peak_pvs, peak_pvs_at, peak_pvs_serial,
                    last_event, false_trigger_count},
          "assignments_used": [{unit_id, assignment_id, assigned_at,
                                assigned_until, events_in_window, status},
                               ...],
        }
        ``assignments_used`` always lists every assignment at the location,
        in chronological order, with the same keys. Assignments outside the
        filter window report ``events_in_window == 0``.

    The "events outside any assignment window" rule (Phase 1 design decision):
    events whose timestamp falls outside every assignment window are simply
    not fetched. SFM is only asked for events inside the intersected windows.
    Those orphan events surface under the per-unit detail page in Phase 2.
    """
    # 1. Fetch all assignments (active + closed) for the location.
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.location_id == location_id)
        .filter(UnitAssignment.device_type == "seismograph")
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )

    # Naive UTC "now" (same value as the deprecated datetime.utcnow()). It
    # bounds open-ended assignments, whose datetimes this module compares as
    # naive UTC.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # 2. Intersect each assignment window with the requested filter range.
    #    Assignments that do not overlap it are not queried at all.
    fetch_specs: list[tuple[UnitAssignment, datetime, datetime]] = []
    for a in assignments:
        window = _intersect_window(a.assigned_at, a.assigned_until, from_dt, to_dt, now)
        if window is not None:
            fetch_specs.append((a, window[0], window[1]))

    # 3. Concurrent SFM fetches, skipped entirely when nothing overlaps.
    #    Each window is over-fetched (up to _SFM_FETCH_CEILING) so the summary
    #    stats reflect the true peak/last/count across the full filter window,
    #    not just what fits in the display limit. The displayed list is
    #    trimmed to `limit` after the merge.
    per_window_lists: list[list[dict]] = []
    if fetch_specs:
        async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
            per_window_lists = list(
                await asyncio.gather(
                    *(
                        _fetch_events_for_serial(
                            client,
                            serial=a.unit_id,
                            from_dt=start,
                            to_dt=end,
                            false_trigger=false_trigger,
                            limit=_SFM_FETCH_CEILING,
                        )
                        for a, start, end in fetch_specs
                    ),
                    return_exceptions=False,
                )
            )

    # 4. Per-assignment event counts (transparency for the operator).
    spec_event_counts: dict[str, int] = {
        a.id: len(evs)
        for (a, _start, _end), evs in zip(fetch_specs, per_window_lists)
    }

    # 5. Union, sort newest-first, cap.
    merged: list[dict] = [ev for evs in per_window_lists for ev in evs]
    merged.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
    total_count = len(merged)
    capped = merged[:limit]

    # 6. Summary stats over the full merged set (not the capped one).
    stats = _compute_stats(merged)

    # 7. Report on every assignment (chronological), with one consistent
    #    shape whether or not it overlapped the filter window.
    assignments_used = [
        {
            "unit_id": a.unit_id,
            "assignment_id": a.id,
            "assigned_at": _iso_utc(a.assigned_at),
            "assigned_until": _iso_utc(a.assigned_until),
            "events_in_window": spec_event_counts.get(a.id, 0),
            "status": a.status,
        }
        for a in assignments
    ]

    return {
        "events": capped,
        "count": total_count,
        "stats": stats,
        "assignments_used": assignments_used,
    }
# ── Per-unit (cross-project) view ─────────────────────────────────────────────
async def events_for_unit(
    db: Session,
    unit_id: str,
    *,
    bucket: str = "all",  # "all" | "attributed" | "unattributed"
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
    false_trigger: Optional[bool] = None,
    limit: int = 500,
) -> dict:
    """Return events for a unit annotated with their assignment attribution.

    Unlike events_for_location (which queries SFM per assignment window), this
    helper queries SFM for ALL events for the serial within the optional
    [from_dt, to_dt] filter. It then walks each event against the unit's
    UnitAssignment intervals to compute attribution.

    Bucket semantics (any other value behaves like "all"):
      - "all":          every event, attributed or not
      - "attributed":   events that fall inside at least one assignment window
      - "unattributed": events with no overlapping assignment. This is the
                        diagnostic bucket: the operator should fix assignment
                        dates so these get attributed.

    Each event gets an extra `attribution` field, which is either None or:
        {assignment_id, location_id, location_name, location_removed_at,
         project_id, project_name, assigned_at, assigned_until}
    Unattributed events with a parseable timestamp also get a
    `nearest_assignment` field with the same shape plus `delta_days`. It is
    signed: negative means the event is before the assignment.

    Event timestamps are compared as naive UTC. Timestamps carrying an offset
    (or a trailing "Z") are converted to UTC first, and unparsable ones are
    left unattributed.

    Returns:
        {"events": [newest first, capped at limit],
         "count": size of the selected bucket (pre-cap),
         "stats": stats over ALL fetched events plus unattributed_count,
         "assignments_total": number of assignments for the unit}
    """
    # 1. Pull all assignments for this unit. No device_type filter is applied
    #    here. Order matters: the earliest-start assignment comes first, so
    #    attribution prefers the chronologically-first overlap when
    #    simultaneous assignments exist at different locations (rare but
    #    possible).
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.unit_id == unit_id)
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )

    # Resolve location + project rows once, rather than per event.
    loc_ids = {a.location_id for a in assignments}
    proj_ids = {a.project_id for a in assignments}
    loc_map = {
        loc.id: loc
        for loc in db.query(MonitoringLocation).filter(
            MonitoringLocation.id.in_(loc_ids)
        ).all()
    } if loc_ids else {}
    proj_map = {
        proj.id: proj
        for proj in db.query(Project).filter(
            Project.id.in_(proj_ids)
        ).all()
    } if proj_ids else {}

    # Naive UTC "now" (same value as the deprecated datetime.utcnow()).
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    def _window_end(a: UnitAssignment) -> datetime:
        # Open-ended assignments are treated as running until `now`.
        return a.assigned_until or now

    def _attr_dict(a: UnitAssignment) -> dict:
        loc = loc_map.get(a.location_id)
        proj = proj_map.get(a.project_id)
        return {
            "assignment_id": a.id,
            "location_id": a.location_id,
            "location_name": loc.name if loc else None,
            # Soft-removal indicator so the UI can render a "(removed)"
            # badge next to historical attributions whose location is no
            # longer actively monitored.
            "location_removed_at": (loc.removed_at.isoformat()
                                    if loc and loc.removed_at else None),
            "project_id": a.project_id,
            "project_name": proj.name if proj else None,
            "assigned_at": _iso_utc(a.assigned_at),
            "assigned_until": _iso_utc(a.assigned_until),
        }

    def _parse_event_ts(raw: object) -> Optional[datetime]:
        """Parse an SFM event timestamp to naive UTC, or None if unusable."""
        if not raw or not isinstance(raw, str):
            return None
        # SFM returns ISO with a "T" separator; tolerate a space as well.
        text = raw.replace(" ", "T")
        # datetime.fromisoformat() only accepts a trailing "Z" from 3.11 on.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        # An aware timestamp would raise TypeError when compared with the
        # naive assignment datetimes below, so normalise it to naive UTC.
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    # 2. Fetch all events for this serial in one shot.
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT_SECONDS) as client:
        events = await _fetch_events_for_serial(
            client,
            serial=unit_id,
            from_dt=from_dt or datetime(1970, 1, 1),
            to_dt=to_dt or now,
            false_trigger=false_trigger,
            limit=_SFM_FETCH_CEILING,
        )

    # 3. For each event, find the first overlapping assignment window.
    #    O(N * M), but both are small in practice.
    for ev in events:
        ts = _parse_event_ts(ev.get("timestamp"))
        if ts is None:
            ev["attribution"] = None
            continue

        matched: Optional[UnitAssignment] = next(
            (a for a in assignments if a.assigned_at <= ts <= _window_end(a)),
            None,
        )
        if matched is not None:
            ev["attribution"] = _attr_dict(matched)
            continue

        ev["attribution"] = None
        if not assignments:
            continue

        # Nearest assignment (by closest boundary), for diagnostics.
        nearest = min(
            assignments,
            key=lambda a: min(
                abs((ts - a.assigned_at).total_seconds()),
                abs((ts - _window_end(a)).total_seconds()),
            ),
        )
        # Signed delta in days from the nearest boundary
        # (negative = event BEFORE that boundary).
        nearest_end = _window_end(nearest)
        if ts < nearest.assigned_at:
            delta_seconds = (ts - nearest.assigned_at).total_seconds()
        elif ts > nearest_end:
            delta_seconds = (ts - nearest_end).total_seconds()
        else:
            delta_seconds = 0
        ev["nearest_assignment"] = {
            **_attr_dict(nearest),
            "delta_days": round(delta_seconds / 86400, 1),
        }

    # 4. Apply the bucket filter, then sort newest-first and cap.
    if bucket == "attributed":
        filtered = [e for e in events if e.get("attribution") is not None]
    elif bucket == "unattributed":
        filtered = [e for e in events if e.get("attribution") is None]
    else:
        filtered = events
    filtered.sort(key=lambda e: e.get("timestamp") or "", reverse=True)
    total_count = len(filtered)
    capped = filtered[:limit]

    # 5. Stats are computed over the ENTIRE event set (not the filtered
    #    bucket), so the unattributed_count tile is meaningful regardless of
    #    which bucket the operator has selected.
    base_stats = _compute_stats(events)
    base_stats["unattributed_count"] = sum(
        1 for e in events if e.get("attribution") is None
    )

    return {
        "events": capped,
        "count": total_count,
        "stats": base_stats,
        "assignments_total": len(assignments),
    }
# ── Project-level roll-up (aggregates across all vibration locations) ─────────
async def vibration_summary_for_project(
    db: Session,
    project_id: str,
    *,
    from_dt: Optional[datetime] = None,
    to_dt: Optional[datetime] = None,
) -> dict:
    """Roll up SFM event stats across every vibration location of a project.

    Locations are selected by ``project_id`` and
    ``location_type == "vibration"``. Soft-removed locations are included, so
    historical totals stay complete. Each location's stats come from
    ``events_for_location`` over the optional [from_dt, to_dt] window.

    Returns:
        {
          "project_id": str,
          "total_events": int,
          "peak_pvs": float | None,
          "peak_pvs_at": ISO timestamp | None,
          "peak_pvs_location_id": str | None,
          "peak_pvs_location_name": str | None,
          "last_event": ISO timestamp | None,
          "false_trigger_count": int,
          "per_location": [
              {"location_id", "location_name", "event_count",
               "peak_pvs", "last_event", "removed_at"},
              ...  # sorted by event_count DESC (ties keep query order)
          ],
          "vibration_location_count": int,
        }
    When several locations share the highest peak, the first one in query
    order is reported.
    """
    vib_locations = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project_id)
        .filter(MonitoringLocation.location_type == "vibration")
        .all()
    )

    summary: dict = {
        "project_id": project_id,
        "total_events": 0,
        "peak_pvs": None,
        "peak_pvs_at": None,
        "peak_pvs_location_id": None,
        "peak_pvs_location_name": None,
        "last_event": None,
        "false_trigger_count": 0,
        "per_location": [],
        "vibration_location_count": len(vib_locations),
    }
    if not vib_locations:
        return summary

    # Nested fan-out: one events_for_location call per location, each of
    # which fans out over that location's assignment windows. limit=1
    # because only the stats are consumed here.
    location_results = await asyncio.gather(
        *[
            events_for_location(
                db,
                loc.id,
                from_dt=from_dt,
                to_dt=to_dt,
                false_trigger=None,
                limit=1,
            )
            for loc in vib_locations
        ],
        return_exceptions=False,
    )

    rows: list[dict] = []
    for loc, result in zip(vib_locations, location_results):
        stats = result.get("stats", {}) or {}
        count = stats.get("event_count", 0) or 0
        loc_last = stats.get("last_event")
        loc_peak = stats.get("peak_pvs")

        summary["total_events"] += count
        summary["false_trigger_count"] += stats.get("false_trigger_count", 0) or 0

        newest_so_far = summary["last_event"]
        if loc_last and (newest_so_far is None or loc_last > newest_so_far):
            summary["last_event"] = loc_last

        best_so_far = summary["peak_pvs"]
        if loc_peak is not None and (best_so_far is None or loc_peak > best_so_far):
            summary.update(
                peak_pvs=loc_peak,
                peak_pvs_at=stats.get("peak_pvs_at"),
                peak_pvs_location_id=loc.id,
                peak_pvs_location_name=loc.name,
            )

        rows.append({
            "location_id": loc.id,
            "location_name": loc.name,
            "event_count": count,
            "peak_pvs": loc_peak,
            "last_event": loc_last,
            # Soft-removal state. The UI uses it to badge rows whose numbers
            # are historical-only.
            "removed_at": loc.removed_at.isoformat() if loc.removed_at else None,
        })

    summary["per_location"] = sorted(rows, key=lambda row: row["event_count"], reverse=True)
    return summary
# ── Stats helpers ─────────────────────────────────────────────────────────────
def _empty_stats() -> dict:
return {
"event_count": 0,
"peak_pvs": None,
"peak_pvs_at": None,
"peak_pvs_serial": None,
"last_event": None,
"false_trigger_count": 0,
}
def _compute_stats(events: list[dict]) -> dict:
"""Roll up summary stats from a merged event list. Cheap O(N) pass.
The "Overall Peak" stat (peak_pvs) EXCLUDES events flagged as false
triggers — operators care about the highest REAL event, not the
biggest sensor glitch. false_trigger_count still includes them so
operators can see how many were filtered out. last_event uses
every event regardless (it's about activity recency, not magnitude).
"""
if not events:
return _empty_stats()
peak_pvs = None
peak_pvs_at = None
peak_pvs_serial = None
last_event = None
false_trigger_count = 0
for ev in events:
is_false_trigger = bool(ev.get("false_trigger"))
if is_false_trigger:
false_trigger_count += 1
# Peak calculation: skip flagged false triggers.
if not is_false_trigger:
pvs = ev.get("peak_vector_sum")
if pvs is not None and (peak_pvs is None or pvs > peak_pvs):
peak_pvs = pvs
peak_pvs_at = ev.get("timestamp")
peak_pvs_serial = ev.get("serial")
ts = ev.get("timestamp")
if ts and (last_event is None or ts > last_event):
last_event = ts
return {
"event_count": len(events),
"peak_pvs": peak_pvs,
"peak_pvs_at": peak_pvs_at,
"peak_pvs_serial": peak_pvs_serial,
"last_event": last_event,
"false_trigger_count": false_trigger_count,
}