terra-view/backend/routers/units.py
serversdown f1f3da8e61 feat(sfm): unified deployment timeline (deprecate deployment_records)
Phase 4.  Rebuilds the seismograph "Deployment History" + "Timeline"
sections on the unit detail page as a single derived view computed from
three sources: unit_assignments (authoritative project/location windows),
unit_history (calibration/retirement/deployed state changes), and SFM
events overlaid per assignment window (count + peak PVS + last event).

Fixes the wonky-timeline symptoms: missing entries, duplicate/contradictory
rows, and no visibility into what the unit was actually doing during each
deployment window.

Backend:
- backend/services/deployment_timeline.py: new deployment_timeline_for_unit()
  helper.  Merges UnitAssignment rows (with SFM event overlay fetched
  concurrently via httpx), UnitHistory state-change rows (filtered to
  meaningful change_types and de-noised by dropping rows where
  old_value == new_value; the legacy audit log is noisy because
  record_history() is called on every save), and synthetic "gap"
  entries between assignments >= 1 day apart.  Sorts newest first.

- backend/routers/units.py: new GET /api/units/{unit_id}/deployment_timeline
  endpoint with optional include_events=false flag.

- backend/routers/project_locations.py: assign / unassign / swap /
  update endpoints now write UnitHistory rows on every assignment
  lifecycle event.  New change_types: assignment_created,
  assignment_ended, assignment_swapped, assignment_updated.  These
  surface in the unified timeline (where the assignment row itself
  shows the structural data; the audit row is filtered out to avoid
  double-rendering).  Closes a real gap — assignment changes were
  previously invisible to any audit consumer.

- backend/migrate_deprecate_deployment_records.py: non-destructive
  migration.  Adds deployment_records.deprecated_at column.  For each
  legacy row without a matching UnitAssignment, best-effort
  synthesizes one (with the free-text location_name preserved in
  notes).  Marks every processed row.  Idempotent.  DROP TABLE
  deferred to a follow-up release.
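
A minimal sketch of the merge described for deployment_timeline_for_unit(), assuming simplified in-memory rows rather than the real ORM models. The Assignment and HistoryRow classes, the MEANINGFUL_TYPES allow-list, and the helper names are hypothetical stand-ins; the commit does not show the actual implementation, and the SFM event overlay is left out here.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class Assignment:
    """Hypothetical stand-in for a UnitAssignment row."""
    location: str
    assigned_at: datetime
    ended_at: Optional[datetime]  # None means the assignment is still active


@dataclass
class HistoryRow:
    """Hypothetical stand-in for a UnitHistory row."""
    change_type: str
    old_value: Optional[str]
    new_value: Optional[str]
    changed_at: datetime


# Assumed allow-list; the real set of "meaningful" change_types is not shown.
MEANINGFUL_TYPES = {"deployed_change", "retired_change", "calibration_change"}


def denoise_history(rows: List[HistoryRow]) -> List[HistoryRow]:
    """Keep meaningful change types and drop no-op rows (old == new)."""
    return [
        r for r in rows
        if r.change_type in MEANINGFUL_TYPES and r.old_value != r.new_value
    ]


def synthesize_gaps(assignments: List[Assignment],
                    min_gap: timedelta = timedelta(days=1)) -> List[dict]:
    """Emit a gap entry wherever consecutive assignments are >= min_gap apart."""
    ordered = sorted(assignments, key=lambda a: a.assigned_at)
    gaps = []
    for prev, nxt in zip(ordered, ordered[1:]):
        if prev.ended_at is None:
            continue  # an open-ended assignment has no idle period after it
        idle = nxt.assigned_at - prev.ended_at
        if idle >= min_gap:
            gaps.append({"type": "gap", "at": prev.ended_at,
                         "idle_days": idle.days})
    return gaps


def build_timeline(assignments: List[Assignment],
                   history: List[HistoryRow]) -> List[dict]:
    """Merge assignments, gaps, and de-noised state changes, newest first."""
    entries = [
        {"type": "assignment", "at": a.assigned_at, "location": a.location,
         "active": a.ended_at is None}
        for a in assignments
    ]
    entries.extend(synthesize_gaps(assignments))
    entries.extend(
        {"type": "state_change", "at": r.changed_at,
         "change_type": r.change_type, "old": r.old_value, "new": r.new_value}
        for r in denoise_history(history)
    )
    return sorted(entries, key=lambda e: e["at"], reverse=True)
```

Keeping the gap and de-noise steps as separate pure functions makes each rule testable without a database session.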

Frontend (templates/unit_detail.html):
- Removed legacy "Deployment History" card (with Log Deployment button)
  and the separate "Timeline" card.  Replaced with a single
  "Deployment Timeline" section.
- Three entry visual styles: assignment rows (orange dot, location +
  project link, event-overlay summary), gap rows (dashed outline, idle
  day count), and state_change rows (navy dot, friendly label, old →
  new value).  Active assignments get a green dot + "active" badge.
- Existing loadUnitHistory() and loadDeploymentHistory() functions kept
  as shims that delegate to loadDeploymentTimeline(), so modal-save
  callbacks that referenced them still trigger a refresh of the visible
  section.  Legacy function bodies preserved under _legacy_*_unused
  names for archeology; not called by anything.

Verified end-to-end:
- BE11529 timeline now shows 2 entries (active assignment with 24-event
  overlay + the deployed→benched state change), compared to the previous
  noisy mix that included 6 no-op state-change rows.
- Migration ran against real DB: 1 legacy row processed (had no
  project_id, marked deprecated without backfill).
- Assign / unassign / swap / edit now leave a paper trail in
  unit_history.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-12 00:15:07 +00:00


from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from datetime import datetime
from typing import Dict, Any, Optional

from backend.database import get_db
from backend.services.snapshot import emit_status_snapshot
from backend.models import RosterUnit

router = APIRouter(prefix="/api", tags=["units"])


@router.get("/unit/{unit_id}")
def get_unit_detail(unit_id: str, db: Session = Depends(get_db)):
    """
    Returns detailed data for a single unit.
    """
    snapshot = emit_status_snapshot()

    if unit_id not in snapshot["units"]:
        raise HTTPException(status_code=404, detail=f"Unit {unit_id} not found")

    unit_data = snapshot["units"][unit_id]

    # Mock coordinates for now (will be replaced with real data)
    mock_coords = {
        "BE1234": {"lat": 37.7749, "lon": -122.4194, "location": "San Francisco, CA"},
        "BE5678": {"lat": 34.0522, "lon": -118.2437, "location": "Los Angeles, CA"},
        "BE9012": {"lat": 40.7128, "lon": -74.0060, "location": "New York, NY"},
        "BE3456": {"lat": 41.8781, "lon": -87.6298, "location": "Chicago, IL"},
        "BE7890": {"lat": 29.7604, "lon": -95.3698, "location": "Houston, TX"},
    }
    coords = mock_coords.get(unit_id, {"lat": 39.8283, "lon": -98.5795, "location": "Unknown"})

    return {
        "id": unit_id,
        "status": unit_data["status"],
        "age": unit_data["age"],
        "last_seen": unit_data["last"],
        "last_file": unit_data.get("fname", ""),
        "deployed": unit_data["deployed"],
        "note": unit_data.get("note", ""),
        "coordinates": coords
    }


@router.get("/units/{unit_id}")
def get_unit_by_id(unit_id: str, db: Session = Depends(get_db)):
    """
    Get unit data directly from the roster (for settings/configuration).
    """
    unit = db.query(RosterUnit).filter_by(id=unit_id).first()

    if not unit:
        raise HTTPException(status_code=404, detail=f"Unit {unit_id} not found")

    return {
        "id": unit.id,
        "unit_type": unit.unit_type,
        "device_type": unit.device_type,
        "deployed": unit.deployed,
        "retired": unit.retired,
        "note": unit.note,
        "location": unit.location,
        "address": unit.address,
        "coordinates": unit.coordinates,
        "slm_host": unit.slm_host,
        "slm_tcp_port": unit.slm_tcp_port,
        "slm_ftp_port": unit.slm_ftp_port,
        "slm_model": unit.slm_model,
        "slm_serial_number": unit.slm_serial_number,
        "deployed_with_modem_id": unit.deployed_with_modem_id
    }


@router.get("/units/{unit_id}/events")
async def get_unit_events(
    unit_id: str,
    bucket: str = Query("all", regex="^(all|attributed|unattributed)$"),
    from_dt: Optional[datetime] = Query(None),
    to_dt: Optional[datetime] = Query(None),
    false_trigger: Optional[bool] = Query(None),
    limit: int = Query(500, ge=1, le=5000),
    db: Session = Depends(get_db),
):
    """
    Return SFM events for a single unit, annotated with assignment attribution.

    Each event includes an `attribution` object pointing at the project/location
    it falls into (or null if outside every assignment window). Unattributed
    events also carry a `nearest_assignment` field with `delta_days` so the
    operator can see how far off the nearest assignment is, which is useful
    for deciding whether to backdate the assignment to absorb the event.

    Bucket filter:
      - all (default): every event
      - attributed: only events inside an assignment window
      - unattributed: only orphan events (the diagnostic bucket)

    Non-seismograph units return an empty events list. The route does not
    404 for SLMs/modems so the unit detail page can render the section
    conditionally without depending on the response shape.
    """
    unit = db.query(RosterUnit).filter_by(id=unit_id).first()
    if not unit:
        raise HTTPException(status_code=404, detail=f"Unit {unit_id} not found")

    if unit.device_type != "seismograph":
        return {
            "events": [],
            "count": 0,
            "stats": {
                "event_count": 0,
                "unattributed_count": 0,
                "peak_pvs": None,
                "peak_pvs_at": None,
                "peak_pvs_serial": None,
                "last_event": None,
                "false_trigger_count": 0,
            },
            "assignments_total": 0,
            "device_type": unit.device_type,
        }

    from backend.services.sfm_events import events_for_unit

    result = await events_for_unit(
        db,
        unit_id,
        bucket=bucket,
        from_dt=from_dt,
        to_dt=to_dt,
        false_trigger=false_trigger,
        limit=limit,
    )
    result["device_type"] = unit.device_type
    return result


@router.get("/units/{unit_id}/deployment_timeline")
async def get_unit_deployment_timeline(
    unit_id: str,
    include_events: bool = Query(True),
    db: Session = Depends(get_db),
):
    """
    Return a chronological deployment timeline for a unit.

    Merges three sources:
      1. unit_assignments: authoritative project/location deployments
      2. unit_history: state changes (calibration, retirement, etc.)
      3. SFM events: per-assignment overlay (count + peak PVS + last event)

    Replaces the legacy /api/deployments/{unit_id} (which read the
    deprecated `deployment_records` table) and the
    /api/roster/history/{unit_id} timeline endpoint, unifying them into
    a single derived view.

    Gaps >= 1 day between consecutive assignments are surfaced as
    synthetic "gap" entries.

    Pass include_events=false to skip the SFM event overlay (saves N
    HTTP calls; useful for fast text-only history dumps).
    """
    from backend.services.deployment_timeline import deployment_timeline_for_unit

    return await deployment_timeline_for_unit(
        db,
        unit_id,
        include_event_overlay=include_events,
    )
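
The docstring above notes that include_events=false saves N HTTP calls, and the commit message says the overlay is fetched concurrently. A rough sketch of that pattern follows, assuming one overlay request per assignment window. The names overlays_for_windows and fake_fetch_overlay are hypothetical, and the fetcher is a network-free stand-in for whatever httpx call the real service makes.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

# Hypothetical window shape: (start, end) as ISO date strings.
Window = Tuple[str, str]


async def fake_fetch_overlay(window: Window) -> dict:
    """Stand-in for one HTTP call to the SFM service (no network here)."""
    await asyncio.sleep(0)  # yield to the event loop, as a real request would
    return {"window": window, "event_count": 0,
            "peak_pvs": None, "last_event": None}


async def overlays_for_windows(
    windows: List[Window],
    fetch: Callable[[Window], Awaitable[dict]],
    include_event_overlay: bool = True,
) -> List[Optional[dict]]:
    """Fetch one overlay per window concurrently, or skip them all."""
    if not include_event_overlay:
        # No fetches at all: this is the "saves N HTTP calls" path.
        return [None] * len(windows)
    # gather() runs the fetches concurrently and preserves input order.
    return list(await asyncio.gather(*(fetch(w) for w in windows)))
```

Passing the fetcher in as a parameter keeps the concurrency logic testable without a live SFM service.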