Files
terra-view/backend/routers/units.py
T
serversdown 56bd3041cf feat(dashboard): clarify the fleet status card and swap map locations to project monitoring location coords.
feat: Location no longer assigned directly to unit, locations and coords are assigned to location only, unit only is deployed or benched.
2026-06-01 22:01:38 +00:00

169 lines
5.7 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from datetime import datetime
from typing import Dict, Any, Optional
from backend.database import get_db
from backend.services.snapshot import emit_status_snapshot
from backend.services.unit_location import get_active_location
from backend.models import RosterUnit
router = APIRouter(prefix="/api", tags=["units"])
@router.get("/unit/{unit_id}")
def get_unit_detail(unit_id: str, db: Session = Depends(get_db)):
"""
Returns detailed data for a single unit, including its active deployment
location (or None if benched / unassigned).
"""
snapshot = emit_status_snapshot()
if unit_id not in snapshot["units"]:
raise HTTPException(status_code=404, detail=f"Unit {unit_id} not found")
unit_data = snapshot["units"][unit_id]
active_loc = get_active_location(db, unit_id)
return {
"id": unit_id,
"status": unit_data["status"],
"age": unit_data["age"],
"last_seen": unit_data["last"],
"last_file": unit_data.get("fname", ""),
"deployed": unit_data["deployed"],
"note": unit_data.get("note", ""),
"active_location": active_loc,
}
@router.get("/units/{unit_id}")
def get_unit_by_id(unit_id: str, db: Session = Depends(get_db)):
"""
Get unit data directly from the roster (for settings/configuration).
Address/coordinates come from the active MonitoringLocation, not the
roster row.
"""
unit = db.query(RosterUnit).filter_by(id=unit_id).first()
if not unit:
raise HTTPException(status_code=404, detail=f"Unit {unit_id} not found")
active_loc = get_active_location(db, unit_id)
return {
"id": unit.id,
"unit_type": unit.unit_type,
"device_type": unit.device_type,
"deployed": unit.deployed,
"retired": unit.retired,
"note": unit.note,
"active_location": active_loc,
"address": (active_loc or {}).get("address") or "",
"coordinates": (active_loc or {}).get("coordinates") or "",
"slm_host": unit.slm_host,
"slm_tcp_port": unit.slm_tcp_port,
"slm_ftp_port": unit.slm_ftp_port,
"slm_model": unit.slm_model,
"slm_serial_number": unit.slm_serial_number,
"deployed_with_modem_id": unit.deployed_with_modem_id
}
@router.get("/units/{unit_id}/events")
async def get_unit_events(
unit_id: str,
bucket: str = Query("all", regex="^(all|attributed|unattributed)$"),
from_dt: Optional[datetime] = Query(None),
to_dt: Optional[datetime] = Query(None),
false_trigger: Optional[bool] = Query(None),
limit: int = Query(500, ge=1, le=5000),
db: Session = Depends(get_db),
):
"""
Return SFM events for a single unit, annotated with assignment attribution.
Each event includes an `attribution` object pointing at the project/location
it falls into (or null if outside every assignment window). Unattributed
events also carry a `nearest_assignment` field with `delta_days` so the
operator can see how far off the nearest assignment is — useful for
deciding whether to backdate the assignment to absorb the event.
Bucket filter:
- all (default): every event
- attributed: only events inside an assignment window
- unattributed: only orphan events (the diagnostic bucket)
Non-seismograph units return an empty events list. The route does not
404 for SLMs/modems so the unit detail page can render the section
conditionally without depending on the response shape.
"""
unit = db.query(RosterUnit).filter_by(id=unit_id).first()
if not unit:
raise HTTPException(status_code=404, detail=f"Unit {unit_id} not found")
if unit.device_type != "seismograph":
return {
"events": [],
"count": 0,
"stats": {
"event_count": 0,
"unattributed_count": 0,
"peak_pvs": None,
"peak_pvs_at": None,
"peak_pvs_serial": None,
"last_event": None,
"false_trigger_count": 0,
},
"assignments_total": 0,
"device_type": unit.device_type,
}
from backend.services.sfm_events import events_for_unit
result = await events_for_unit(
db,
unit_id,
bucket=bucket,
from_dt=from_dt,
to_dt=to_dt,
false_trigger=false_trigger,
limit=limit,
)
result["device_type"] = unit.device_type
return result
@router.get("/units/{unit_id}/deployment_timeline")
async def get_unit_deployment_timeline(
unit_id: str,
include_events: bool = Query(True),
db: Session = Depends(get_db),
):
"""
Return a chronological deployment timeline for a unit.
Merges three sources:
1. unit_assignments — authoritative project/location deployments
2. unit_history — state changes (calibration, retirement, etc.)
3. SFM events — per-assignment overlay (count + peak PVS + last event)
Replaces the legacy /api/deployments/{unit_id} (which read the
deprecated `deployment_records` table) and the
/api/roster/history/{unit_id} timeline endpoint, unifying them into
a single derived view.
Gaps >= 1 day between consecutive assignments are surfaced as
synthetic "gap" entries.
Pass include_events=false to skip the SFM event overlay (saves N
HTTP calls; useful for fast text-only history dumps).
"""
from backend.services.deployment_timeline import deployment_timeline_for_unit
return await deployment_timeline_for_unit(
db,
unit_id,
include_event_overlay=include_events,
)