Files
terra-view/backend/routers/pending_deployments.py
T
serversdown 502bf5bbeb fix(roster): bench outgoing unit on swap / unassign / deploy-classify
The legacy RosterUnit.deployed flag drives heartbeat polling and
benched-vs-deployed roster filters.  Three workflows ended an
assignment without flipping it, so the outgoing unit kept being
polled and showed up as "deployed" forever:

  - swap endpoint            (POST /locations/{loc}/swap)
  - unassign endpoint        (POST /assignments/{aid}/unassign)
  - promote-pending endpoint (POST /deployments/pending/{id}/promote)

All three now: close the previous active assignment, break the
outgoing unit's modem pairing (both directions), and set
`deployed = False` on the outgoing unit.  Unassign and swap also
clear the modem's back-reference.

The promote-pending path additionally handles the case where the
target location already has an active assignment — that previously
silently created two active assignments at the same location.  Now
the old one is closed (assigned_until = pending capture time, status
= completed), the old unit is benched and unpaired, and an
"assignment_swapped" history row is written.  Incoming unit gets
`deployed = True` if it was on the bench.

Verified live: triggered a swap via the existing endpoint and saw
the outgoing unit flip True → False while the incoming flipped
False → True.  Test mutations rolled back.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-18 18:04:09 +00:00

489 lines
17 KiB
Python

"""
Pending deployments — field-captured "I just installed this seismograph"
records waiting to be classified into a project + location.
Routes:
POST /api/deployments/capture — capture a new pending deployment
GET /api/deployments/pending — list awaiting captures
GET /api/deployments/pending/{id} — single capture detail
POST /api/deployments/pending/{id}/promote — classify → create UnitAssignment
POST /api/deployments/pending/{id}/cancel — abandon
See backend/models.py PendingDeployment docstring for the full lifecycle.
Seismograph-only for v1; capture refuses if unit_id is anything else.
"""
from __future__ import annotations
import shutil
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import (
PendingDeployment,
RosterUnit,
Project,
MonitoringLocation,
UnitAssignment,
UnitHistory,
)
from backend.routers.photos import extract_exif_data
router = APIRouter(prefix="/api/deployments", tags=["pending-deployments"])
PHOTOS_BASE_DIR = Path("data/photos")
_ALLOWED_PHOTO_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic", ".heif"}
def _record_history(
db: Session,
unit_id: str,
change_type: str,
*,
old_value: Optional[str] = None,
new_value: Optional[str] = None,
notes: Optional[str] = None,
source: str = "manual",
) -> None:
"""Mirror of project_locations._record_assignment_history — kept local
so this router doesn't depend on a project_locations import cycle."""
db.add(UnitHistory(
unit_id=unit_id,
change_type=change_type,
field_name="pending_deployment",
old_value=old_value,
new_value=new_value,
changed_at=datetime.utcnow(),
source=source,
notes=notes,
))
@router.get("/seismograph-picker")
def seismograph_picker(
q: str = "",
limit: int = 20,
db: Session = Depends(get_db),
):
"""JSON list of seismograph units for the /deploy mobile picker.
Filters out retired units. Sorts by recency of pending captures
first, then alphabetically — so units the operator is actively
deploying with surface at the top.
"""
q_clean = (q or "").strip()
qb = db.query(RosterUnit).filter(
RosterUnit.device_type == "seismograph",
RosterUnit.retired == False, # noqa: E712
)
if q_clean:
qb = qb.filter(
(RosterUnit.id.ilike(f"%{q_clean}%"))
| (RosterUnit.note.ilike(f"%{q_clean}%"))
)
units = qb.order_by(RosterUnit.id).limit(limit).all()
# Annotate with "has an awaiting pending deployment" so the picker
# can de-emphasize / warn on units that are already mid-deploy.
pending_unit_ids = {
r[0] for r in db.query(PendingDeployment.unit_id)
.filter_by(status="awaiting").distinct().all()
}
return {
"units": [
{
"id": u.id,
"note": u.note,
"deployed": u.deployed,
"has_pending": u.id in pending_unit_ids,
}
for u in units
],
}
@router.post("/capture")
async def capture_deployment(
unit_id: str = Form(...),
operator_note: str = Form(""),
captured_at_iso: str = Form(""),
photo: UploadFile = File(...),
db: Session = Depends(get_db),
):
"""Field-capture endpoint.
Multipart form:
unit_id — seismograph being deployed
operator_note — optional free-text site memo
captured_at_iso — optional override of the capture timestamp
(default: photo's EXIF DateTimeOriginal, or now)
photo — install photo (EXIF GPS extracted if present)
Refuses if unit_id isn't a seismograph (SLM deployments don't follow
the same field-install pattern).
"""
unit = db.query(RosterUnit).filter_by(id=unit_id).first()
if not unit:
raise HTTPException(status_code=404, detail=f"Unit {unit_id!r} not found.")
if unit.device_type != "seismograph":
raise HTTPException(
status_code=400,
detail=f"Pending deployments are for seismographs only "
f"(this unit is {unit.device_type}).",
)
# Validate + save the photo.
file_ext = Path(photo.filename or "photo.jpg").suffix.lower()
if file_ext not in _ALLOWED_PHOTO_EXTS:
raise HTTPException(
status_code=400,
detail=f"Invalid photo type {file_ext!r}. Allowed: {sorted(_ALLOWED_PHOTO_EXTS)}",
)
unit_photo_dir = PHOTOS_BASE_DIR / unit_id
unit_photo_dir.mkdir(parents=True, exist_ok=True)
capture_id = str(uuid.uuid4())
ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"install_{ts_str}_{capture_id[:8]}{file_ext}"
file_path = unit_photo_dir / filename
try:
with open(file_path, "wb") as buf:
shutil.copyfileobj(photo.file, buf)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to save photo: {e}")
# Extract EXIF — best-effort. No EXIF / no GPS is fine; operator
# can fill coordinates manually later in the promote step.
metadata = extract_exif_data(file_path)
coords = metadata.get("coordinates") # "lat,lon" or None
photo_ts = metadata.get("timestamp") # datetime or None
if captured_at_iso:
try:
captured_at = datetime.fromisoformat(captured_at_iso)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid captured_at_iso: {captured_at_iso!r}")
elif photo_ts:
captured_at = photo_ts
else:
captured_at = datetime.utcnow()
pd = PendingDeployment(
id = capture_id,
unit_id = unit_id,
captured_at = captured_at,
coordinates = coords,
operator_note = (operator_note or "").strip() or None,
photo_filename = filename,
status = "awaiting",
)
db.add(pd)
_record_history(
db, unit_id=unit_id,
change_type="pending_deployment_captured",
new_value=f"awaiting classification @ {captured_at:%Y-%m-%d %H:%M}"
+ (f"{coords}" if coords else ""),
notes=(operator_note or None),
)
db.commit()
db.refresh(pd)
return JSONResponse({
"success": True,
"pending_deployment": _to_dict(pd, unit=unit),
"photo_url": f"/api/unit/{unit_id}/photo/{filename}",
"extracted_coords": coords,
"extracted_timestamp": photo_ts.isoformat() if photo_ts else None,
})
@router.get("/pending")
def list_pending(
status: str = "awaiting",
db: Session = Depends(get_db),
):
"""List pending deployments by status (default: awaiting classification)."""
rows = (
db.query(PendingDeployment)
.filter_by(status=status)
.order_by(PendingDeployment.captured_at.desc())
.all()
)
# Bulk-resolve unit references in one query (avoid N+1).
unit_ids = {r.unit_id for r in rows}
units = {u.id: u for u in db.query(RosterUnit).filter(RosterUnit.id.in_(unit_ids)).all()} \
if unit_ids else {}
return {
"count": len(rows),
"status": status,
"pending_deployments": [_to_dict(r, unit=units.get(r.unit_id)) for r in rows],
}
@router.get("/pending/{pending_id}")
def get_pending(pending_id: str, db: Session = Depends(get_db)):
pd = db.query(PendingDeployment).filter_by(id=pending_id).first()
if not pd:
raise HTTPException(status_code=404, detail="Pending deployment not found.")
unit = db.query(RosterUnit).filter_by(id=pd.unit_id).first()
return _to_dict(pd, unit=unit, detail=True)
@router.post("/pending/{pending_id}/promote")
async def promote_pending(
pending_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""Classify a pending deployment → create a UnitAssignment.
Body JSON — one of two shapes:
1. Assign to existing location:
{
"location_id": "<uuid>",
"notes": "<optional>"
}
2. Create a new location under (existing or new) project:
{
"project_id": "<existing>" | null, # null means create new
"project_name": "<required if project_id is null>",
"project_type_id": "<existing project_type id, e.g. 'vibration_monitoring'>",
# required if creating new project
"location_name": "<required>",
"use_captured_coords": true | false, # default true — write the
# pending's coordinates onto
# the new location
"notes": "<optional>"
}
Status flips to "assigned"; resulting_assignment_id is populated.
"""
pd = db.query(PendingDeployment).filter_by(id=pending_id).first()
if not pd:
raise HTTPException(status_code=404, detail="Pending deployment not found.")
if pd.status != "awaiting":
raise HTTPException(
status_code=400,
detail=f"Pending deployment is {pd.status!r}, not awaiting — already classified?",
)
try:
payload = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body.")
notes = (payload.get("notes") or "").strip() or None
# Resolve / create the location.
location_id = payload.get("location_id")
if location_id:
location = db.query(MonitoringLocation).filter_by(id=location_id).first()
if not location:
raise HTTPException(status_code=404, detail=f"Location {location_id!r} not found.")
project_id = location.project_id
else:
# Create-new path. Need a project (existing or new).
project_id = payload.get("project_id")
if not project_id:
project_name = (payload.get("project_name") or "").strip()
project_type_id = (payload.get("project_type_id") or "").strip()
if not project_name:
raise HTTPException(
status_code=400,
detail="Either project_id, or project_name + project_type_id, required.",
)
if not project_type_id:
raise HTTPException(
status_code=400,
detail="project_type_id required when creating a new project.",
)
new_project = Project(
id=str(uuid.uuid4()),
name=project_name,
project_type_id=project_type_id,
status="active",
)
db.add(new_project)
db.flush()
project_id = new_project.id
loc_name = (payload.get("location_name") or "").strip()
if not loc_name:
raise HTTPException(status_code=400, detail="location_name required.")
use_coords = payload.get("use_captured_coords", True)
location = MonitoringLocation(
id=str(uuid.uuid4()),
project_id=project_id,
location_type="vibration", # seismographs only
name=loc_name,
coordinates=(pd.coordinates if use_coords else None),
)
db.add(location)
db.flush()
# If this location already has an active assignment, the /deploy
# capture means someone replaced that unit in the field — close the
# old assignment, break the outgoing unit's modem pairing, and bench
# it so the heartbeat / polling subsystem stops chasing it.
existing_active = db.query(UnitAssignment).filter(
UnitAssignment.location_id == location.id,
UnitAssignment.assigned_until == None, # noqa: E711
).first()
if existing_active and existing_active.unit_id != pd.unit_id:
existing_active.assigned_until = pd.captured_at
existing_active.status = "completed"
old_unit = db.query(RosterUnit).filter_by(id=existing_active.unit_id).first()
if old_unit:
if old_unit.deployed_with_modem_id:
old_modem = db.query(RosterUnit).filter_by(
id=old_unit.deployed_with_modem_id, device_type="modem"
).first()
if old_modem and old_modem.deployed_with_unit_id == old_unit.id:
old_modem.deployed_with_unit_id = None
old_unit.deployed_with_modem_id = None
if old_unit.deployed:
old_unit.deployed = False
_record_history(
db, unit_id=existing_active.unit_id,
change_type="assignment_swapped",
old_value=location.name,
new_value=f"superseded by /deploy capture → {pd.unit_id}",
notes=notes,
)
# Create the assignment. assigned_at = pending capture time (so
# events emitted after the install are correctly attributed back
# to this location).
assignment = UnitAssignment(
id=str(uuid.uuid4()),
unit_id=pd.unit_id,
location_id=location.id,
project_id=project_id,
device_type="seismograph",
assigned_at=pd.captured_at,
assigned_until=None,
status="active",
notes=notes,
source="manual",
)
db.add(assignment)
db.flush()
# Incoming unit is in the field again — flip it back to deployed
# if it was on the bench (mirrors the swap endpoint).
incoming_unit = db.query(RosterUnit).filter_by(id=pd.unit_id).first()
if incoming_unit and not incoming_unit.deployed:
incoming_unit.deployed = True
# Promote the pending row.
pd.status = "assigned"
pd.promoted_at = datetime.utcnow()
pd.resulting_assignment_id = assignment.id
pd.updated_at = datetime.utcnow()
_record_history(
db, unit_id=pd.unit_id,
change_type="pending_deployment_promoted",
old_value="awaiting",
new_value=f"{location.name} (assignment {assignment.id[:8]})",
notes=notes,
)
db.commit()
db.refresh(pd)
db.refresh(assignment)
return {
"success": True,
"assignment_id": assignment.id,
"location_id": location.id,
"project_id": project_id,
"promoted_at": pd.promoted_at.isoformat(),
}
@router.post("/pending/{pending_id}/cancel")
async def cancel_pending(
pending_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""Mark a pending deployment as cancelled (operator captured by mistake)."""
pd = db.query(PendingDeployment).filter_by(id=pending_id).first()
if not pd:
raise HTTPException(status_code=404, detail="Pending deployment not found.")
if pd.status != "awaiting":
raise HTTPException(
status_code=400,
detail=f"Pending deployment is {pd.status!r}, not awaiting.",
)
try:
payload = await request.json()
except Exception:
payload = {}
reason = (payload.get("reason") or "").strip() or None
pd.status = "cancelled"
pd.cancelled_at = datetime.utcnow()
pd.cancelled_reason = reason
pd.updated_at = datetime.utcnow()
_record_history(
db, unit_id=pd.unit_id,
change_type="pending_deployment_cancelled",
old_value="awaiting",
new_value="cancelled",
notes=reason,
)
db.commit()
return {"success": True, "cancelled_at": pd.cancelled_at.isoformat()}
# ── Helpers ───────────────────────────────────────────────────────────────────
def _to_dict(pd: PendingDeployment, unit: Optional[RosterUnit] = None, detail: bool = False) -> dict:
out = {
"id": pd.id,
"unit_id": pd.unit_id,
"captured_at": pd.captured_at.isoformat() if pd.captured_at else None,
"coordinates": pd.coordinates,
"operator_note": pd.operator_note,
"photo_filename": pd.photo_filename,
"photo_url": f"/api/unit/{pd.unit_id}/photo/{pd.photo_filename}"
if pd.photo_filename else None,
"status": pd.status,
"created_at": pd.created_at.isoformat() if pd.created_at else None,
}
if pd.status == "assigned":
out["promoted_at"] = pd.promoted_at.isoformat() if pd.promoted_at else None
out["resulting_assignment_id"] = pd.resulting_assignment_id
if pd.status == "cancelled":
out["cancelled_at"] = pd.cancelled_at.isoformat() if pd.cancelled_at else None
out["cancelled_reason"] = pd.cancelled_reason
if unit:
out["unit"] = {
"id": unit.id,
"device_type": unit.device_type,
"note": unit.note,
"deployed": unit.deployed,
}
return out