""" Deployment timeline service — replaces the legacy `deployment_records`-driven timeline on the seismograph unit detail page. Architecture: - `unit_assignments` is the authoritative source for "where was this unit" (one row per location/time-window). Auto-written by the project location swap/assign/unassign/update workflows. - `unit_history` is the audit log for non-location state changes (calibration toggles, retirement, allocation, etc.). - SFM events are overlaid per assignment window to show "what was the unit actually doing during this deployment" (count + peak PVS + last-event). Gaps between assignments are emitted as synthetic "gap" entries so operators can see when the unit was idle vs out-of-service. `deployment_records` is being deprecated; this module does not read it. """ from __future__ import annotations import asyncio import logging from datetime import datetime, timedelta from typing import Optional import httpx from sqlalchemy.orm import Session from backend.models import ( UnitAssignment, UnitHistory, MonitoringLocation, Project, RosterUnit, ) from backend.services.sfm_events import ( SFM_BASE_URL, _fetch_events_for_serial, _iso_utc, ) log = logging.getLogger("backend.services.deployment_timeline") # Don't emit synthetic gap entries shorter than this (seconds). Avoids visual # clutter from a sub-second handoff during a swap workflow. _MIN_GAP_SECONDS = 24 * 3600 # 1 day # When detecting "mergeable" groups of consecutive same-location assignments, # treat assignments separated by no more than this many seconds as adjacent. # Generous enough to catch overnight handoffs and weekend gaps where the # operator forgot to log, but tight enough that genuinely separate # deployments months apart don't get suggested for merging. _MERGE_GAP_TOLERANCE_SECONDS = 7 * 24 * 3600 # 7 days # Per-call timeout when querying SFM for the event overlay. 
_SFM_TIMEOUT = 10.0
# Passed as `limit` on every per-assignment SFM event fetch.
_SFM_FETCH_CEILING = 5000


# ── Public API ────────────────────────────────────────────────────────────────

async def deployment_timeline_for_unit(
    db: Session,
    unit_id: str,
    *,
    include_event_overlay: bool = True,
) -> dict:
    """Build a chronological timeline for a unit.

    Args:
        db: SQLAlchemy session used for all lookups (read-only here).
        unit_id: Roster unit id. It is also passed to SFM as the serial.
        include_event_overlay: When True, and the unit's device_type is
            "seismograph", and it has at least one assignment, fetch SFM
            events for every assignment window and summarise them.

    Returns:
        {
          "unit_id": str,
          "device_type": str | None,   # None when the unit does not exist
          "entries": [
            {
              "kind": "assignment" | "gap" | "state_change",
              "starts_at": ISO timestamp,
              "ends_at":   ISO timestamp | None,
              "duration_days": float | None,
              # — assignment-only fields —
              "assignment_id": str,
              "location_id": str,
              "location_name": str | None,
              "project_id": str,
              "project_name": str | None,
              "is_active": bool,
              "event_overlay": {event_count, peak_pvs, peak_pvs_at, last_event}
                               or None when no overlay was fetched,
              "notes": str | None,
              # — gap-only fields —
              "context": "between assignments",
              # — state_change-only fields —
              "change_type": str,
              "field_name": str | None,
              "old_value": str | None,
              "new_value": str | None,
              "source": str,
              "history_notes": str | None,
            },
            ...  # newest first
          ],
          # List of assignment_id lists; each inner list (length >= 2) is a
          # run of consecutive same-location assignments the UI may offer to
          # merge. Empty if nothing is mergeable.
          "merge_groups": [[str, ...], ...],
        }

        An unknown ``unit_id`` yields the same shape with empty ``entries``
        and ``merge_groups``.

    Raises:
        Any exception raised by an SFM fetch propagates to the caller: the
        fan-out uses ``asyncio.gather(..., return_exceptions=False)``.
    """
    unit = db.query(RosterUnit).filter_by(id=unit_id).first()
    if not unit:
        # Keep the response shape identical to the populated case so callers
        # can read every top-level key unconditionally.
        return {
            "unit_id": unit_id,
            "device_type": None,
            "entries": [],
            "merge_groups": [],
        }

    # 1. Load assignments + their location/project lookups in bulk.
    assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.unit_id == unit_id)
        .order_by(UnitAssignment.assigned_at.asc())
        .all()
    )
    loc_ids = {a.location_id for a in assignments}
    proj_ids = {a.project_id for a in assignments}
    loc_map = {
        l.id: l
        for l in db.query(MonitoringLocation).filter(
            MonitoringLocation.id.in_(loc_ids)
        ).all()
    } if loc_ids else {}
    proj_map = {
        p.id: p
        for p in db.query(Project).filter(
            Project.id.in_(proj_ids)
        ).all()
    } if proj_ids else {}

    # 2. Load relevant unit_history rows. We surface state changes that
    #    operators care about on a deployment timeline: calibration status,
    #    retirement, deployed flag, allocation and calibration dates. The
    #    assignment_* events are redundant with the assignment rows
    #    themselves, so they are left out to avoid double-rendering.
    interesting_change_types = (
        "calibration_status_change",
        "retired_change",
        "deployed_change",
        "allocation_change",
        "last_calibrated_change",
        "next_calibration_due_change",
    )
    history = (
        db.query(UnitHistory)
        .filter(UnitHistory.unit_id == unit_id)
        .filter(UnitHistory.change_type.in_(interesting_change_types))
        .order_by(UnitHistory.changed_at.asc())
        .all()
    )

    # Naive UTC "now", used as the end of any still-open assignment.
    # NOTE(review): presumably the stored timestamps are naive UTC as well —
    # confirm before switching to timezone-aware datetimes.
    now = datetime.utcnow()

    # 3. Optionally fetch SFM event overlay for each assignment window.
    overlays: dict[str, dict] = {}
    if include_event_overlay and assignments and unit.device_type == "seismograph":
        overlays = await _fetch_event_overlays(assignments, unit_id, now)

    # 4. Assignment rows, each followed by the gap (if any) before the next
    #    assignment. Insertion order matters: the final sort is stable, so
    #    entries with identical starts_at keep this relative order.
    entries: list[dict] = []
    for idx, a in enumerate(assignments):
        entries.append(
            _assignment_entry(
                a,
                loc_map.get(a.location_id),
                proj_map.get(a.project_id),
                overlays.get(a.id),
                now,
            )
        )
        if idx + 1 < len(assignments):
            gap = _gap_entry(a, assignments[idx + 1], now)
            if gap is not None:
                entries.append(gap)

    # 5. State changes — interleaved by timestamp. Skip no-op rows where
    #    old_value == new_value (an artifact of the legacy record_history()
    #    being called on every save regardless of whether the field changed).
    entries.extend(
        _state_change_entry(h) for h in history if h.old_value != h.new_value
    )

    # 6. Detect mergeable groups of consecutive same-location assignments.
    merge_groups = _find_merge_groups(assignments, now)

    # 7. Sort newest first. Active assignments (no end) sort by start time,
    #    same as everything else.
    entries.sort(key=lambda e: e.get("starts_at") or "", reverse=True)

    return {
        "unit_id": unit.id,
        "device_type": unit.device_type,
        "entries": entries,
        # List of assignment_id lists; each inner list is a mergeable group.
        # Empty if nothing is mergeable. UI shows a "Merge" button on any
        # row whose assignment_id appears in a group.
        "merge_groups": merge_groups,
    }


# ── Internal helpers ──────────────────────────────────────────────────────────

async def _fetch_event_overlays(
    assignments: list[UnitAssignment],
    serial: str,
    now: datetime,
) -> dict[str, dict]:
    """Fetch SFM events for every assignment window concurrently and summarise them."""
    async with httpx.AsyncClient(timeout=_SFM_TIMEOUT) as client:
        # One request per assignment; the first failure propagates to the
        # caller because return_exceptions is False.
        results = await asyncio.gather(
            *(
                _fetch_events_for_serial(
                    client,
                    serial=serial,
                    from_dt=a.assigned_at,
                    to_dt=a.assigned_until or now,
                    false_trigger=None,
                    limit=_SFM_FETCH_CEILING,
                )
                for a in assignments
            ),
            return_exceptions=False,
        )
    return {
        a.id: _summarize_events(events)
        for a, events in zip(assignments, results)
    }


def _summarize_events(events: list[dict]) -> dict:
    """Reduce one window's events to count, peak PVS (+ its timestamp) and last event."""
    peak = None
    peak_at = None
    last_ev = None
    for ev in events:
        pvs = ev.get("peak_vector_sum")
        # Strict ">" keeps the first event that reached the peak value.
        if pvs is not None and (peak is None or pvs > peak):
            peak = pvs
            peak_at = ev.get("timestamp")
        ts = ev.get("timestamp")
        if ts and (last_ev is None or ts > last_ev):
            last_ev = ts
    return {
        # len() of what was fetched; presumably capped at _SFM_FETCH_CEILING —
        # verify against _fetch_events_for_serial.
        "event_count": len(events),
        "peak_pvs": peak,
        "peak_pvs_at": peak_at,
        "last_event": last_ev,
    }


def _assignment_entry(a, loc, proj, overlay, now: datetime) -> dict:
    """Build the "assignment" timeline entry for one unit_assignments row."""
    # An open-ended assignment is measured up to "now" for its duration, but
    # still reports ends_at from the raw (None) assigned_until.
    ends_at = a.assigned_until or now
    duration_days = (
        (ends_at - a.assigned_at).total_seconds() / 86400
        if a.assigned_at else None
    )
    return {
        "kind": "assignment",
        "starts_at": _iso_utc(a.assigned_at),
        "ends_at": _iso_utc(a.assigned_until),
        "duration_days": round(duration_days, 1) if duration_days is not None else None,
        "assignment_id": a.id,
        "location_id": a.location_id,
        "location_name": loc.name if loc else None,
        "project_id": a.project_id,
        "project_name": proj.name if proj else None,
        "is_active": a.assigned_until is None,
        "notes": a.notes,
        "event_overlay": overlay,
    }


def _gap_entry(current, following, now: datetime) -> Optional[dict]:
    """Return a "gap" entry between two consecutive assignments, or None if too short."""
    gap_start = current.assigned_until or now
    gap_end = following.assigned_at
    gap_seconds = (gap_end - gap_start).total_seconds() if gap_end and gap_start else 0
    # Overlaps give a negative gap and are dropped along with short gaps.
    if gap_seconds < _MIN_GAP_SECONDS:
        return None
    return {
        "kind": "gap",
        "starts_at": _iso_utc(gap_start),
        "ends_at": _iso_utc(gap_end),
        "duration_days": round(gap_seconds / 86400, 1),
        "context": "between assignments",
    }


def _state_change_entry(h) -> dict:
    """Build the point-in-time "state_change" entry for one unit_history row."""
    return {
        "kind": "state_change",
        "starts_at": _iso_utc(h.changed_at),
        "ends_at": None,
        "duration_days": None,
        "change_type": h.change_type,
        "field_name": h.field_name,
        "old_value": h.old_value,
        "new_value": h.new_value,
        "source": h.source,
        "history_notes": h.notes,
    }


def _find_merge_groups(assignments, now: datetime) -> list[list[str]]:
    """Group consecutive same-location assignments separated by small gaps.

    ``assignments`` must already be ordered by ``assigned_at`` ascending (the
    caller's query guarantees this), so no re-sort is done here. Returns a
    list of assignment-id lists, each of length >= 2.
    """
    groups: list[list[str]] = []
    if len(assignments) < 2:
        return groups

    cur_group = [assignments[0]]
    for a in assignments[1:]:
        prev = cur_group[-1]
        prev_end = prev.assigned_until or now
        gap_seconds = (a.assigned_at - prev_end).total_seconds() if a.assigned_at else 0
        # Within tolerance and same location → extend the current group.
        # Negative gaps (overlap) also count as adjacent.
        if a.location_id == prev.location_id and gap_seconds <= _MERGE_GAP_TOLERANCE_SECONDS:
            cur_group.append(a)
            continue
        if len(cur_group) >= 2:
            groups.append([x.id for x in cur_group])
        cur_group = [a]
    if len(cur_group) >= 2:
        groups.append([x.id for x in cur_group])
    return groups