Files
terra-view/backend/routers/pending_deployments.py
T
serversdown e05f2189c4 feat(deployments): field-capture endpoint for pending deployments (commit 1)
Field-install workflow needs to be fast: arrive on site, snap a photo
of the seismograph in place, leave.  Project / location classification
happens later at a desk.  This adds the data model + capture endpoint
for that workflow.

Data model
- New PendingDeployment table.  Lifecycle: awaiting → assigned (when
  promoted to a real UnitAssignment) or → cancelled (operator's
  mistake).  Photos are filesystem files under data/photos/{unit_id}/
  with the filename stored on the row.
- Migration: backend/migrate_add_pending_deployments.py (idempotent).

Endpoints
- POST /api/deployments/capture  — multipart upload (unit_id, photo,
  optional note).  Refuses non-seismographs.  Extracts EXIF GPS
  (cribbing extract_exif_data from routers/photos.py) and stores
  the captured "lat,lon" on the row.  Saves the photo under
  data/photos/{unit_id}/install_YYYYMMDD_HHMMSS_<uuid8>.<ext>.
  Returns the new pending_deployment_id + extracted coords + photo
  URL for the client to render confirmation.
- GET  /api/deployments/pending           — list by status (default awaiting)
- GET  /api/deployments/pending/{id}      — single row detail
- POST /api/deployments/pending/{id}/promote — classify → create
  UnitAssignment.  Body accepts two shapes: assign-to-existing-location
  OR create-new-location (with new-or-existing project).  Sets
  status=assigned, resulting_assignment_id, promoted_at.
- POST /api/deployments/pending/{id}/cancel  — abandon with optional reason.

The three mutating routes (capture, promote, cancel) write UnitHistory
audit rows (pending_deployment_captured / _promoted / _cancelled).

Events from a unit with an unclassified pending deployment land in the
unit's "Unattributed" events bucket as usual.  Once promoted, the new
UnitAssignment's window retroactively attributes them — same mechanism
the metadata-backfill tool uses.

Seismograph-only for v1.  SLM deployments don't follow this pattern
and are tracked elsewhere.  Capture refuses non-seismograph unit_ids
with HTTP 400.

UI (commits 2 + 3) lands next.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 03:40:24 +00:00

409 lines
14 KiB
Python

"""
Pending deployments — field-captured "I just installed this seismograph"
records waiting to be classified into a project + location.
Routes:
POST /api/deployments/capture — capture a new pending deployment
GET /api/deployments/pending — list captures by status (default: awaiting)
GET /api/deployments/pending/{id} — single capture detail
POST /api/deployments/pending/{id}/promote — classify → create UnitAssignment
POST /api/deployments/pending/{id}/cancel — abandon
See backend/models.py PendingDeployment docstring for the full lifecycle.
Seismograph-only for v1; capture refuses if unit_id is anything else.
"""
from __future__ import annotations
import shutil
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import (
PendingDeployment,
RosterUnit,
Project,
MonitoringLocation,
UnitAssignment,
UnitHistory,
)
from backend.routers.photos import extract_exif_data
router = APIRouter(prefix="/api/deployments", tags=["pending-deployments"])
PHOTOS_BASE_DIR = Path("data/photos")
_ALLOWED_PHOTO_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic", ".heif"}
def _record_history(
    db: Session,
    unit_id: str,
    change_type: str,
    *,
    old_value: Optional[str] = None,
    new_value: Optional[str] = None,
    notes: Optional[str] = None,
    source: str = "manual",
) -> None:
    """Queue a UnitHistory audit row for a pending-deployment change.

    The row is only added to the session; committing is left to the caller.
    Every row written here uses field_name "pending_deployment" and is
    stamped with the current UTC time.

    Local counterpart of project_locations._record_assignment_history,
    kept in this module so the router does not have to import
    project_locations (which would create an import cycle).
    """
    row_fields = {
        "unit_id": unit_id,
        "change_type": change_type,
        "field_name": "pending_deployment",
        "old_value": old_value,
        "new_value": new_value,
        "changed_at": datetime.utcnow(),
        "source": source,
        "notes": notes,
    }
    audit_row = UnitHistory(**row_fields)
    db.add(audit_row)
@router.post("/capture")
async def capture_deployment(
    unit_id: str = Form(...),
    operator_note: str = Form(""),
    captured_at_iso: str = Form(""),
    photo: UploadFile = File(...),
    db: Session = Depends(get_db),
):
    """Field-capture endpoint: record a just-installed seismograph.

    Multipart form:
        unit_id          — seismograph being deployed
        operator_note    — optional free-text site memo
        captured_at_iso  — optional override of the capture timestamp
                           (default: the timestamp extract_exif_data
                           reports for the photo, or now)
        photo            — install photo (EXIF GPS extracted if present)

    Refuses if unit_id isn't a seismograph (SLM deployments don't follow
    the same field-install pattern).

    Returns a JSONResponse with the serialized pending deployment, the
    photo URL, and the coordinates / timestamp extracted from the photo.

    Raises:
        HTTPException 404 — no RosterUnit with this unit_id.
        HTTPException 400 — unit is not a seismograph, the photo extension
                            is not allowed, or captured_at_iso cannot be
                            parsed.
        HTTPException 500 — the photo could not be written to disk.
    """
    unit = db.query(RosterUnit).filter_by(id=unit_id).first()
    if not unit:
        raise HTTPException(status_code=404, detail=f"Unit {unit_id!r} not found.")
    if unit.device_type != "seismograph":
        raise HTTPException(
            status_code=400,
            detail=f"Pending deployments are for seismographs only "
                   f"(this unit is {unit.device_type}).",
        )

    # Validate every request field BEFORE touching the filesystem, so a
    # rejected request never leaves an orphaned photo behind.
    file_ext = Path(photo.filename or "photo.jpg").suffix.lower()
    if file_ext not in _ALLOWED_PHOTO_EXTS:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid photo type {file_ext!r}. Allowed: {sorted(_ALLOWED_PHOTO_EXTS)}",
        )

    captured_override = None
    if captured_at_iso:
        try:
            captured_override = datetime.fromisoformat(captured_at_iso)
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid captured_at_iso: {captured_at_iso!r}",
            ) from exc

    # Save the photo under data/photos/{unit_id}/.
    capture_id = str(uuid.uuid4())
    ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"install_{ts_str}_{capture_id[:8]}{file_ext}"
    unit_photo_dir = PHOTOS_BASE_DIR / unit_id
    file_path = unit_photo_dir / filename
    try:
        unit_photo_dir.mkdir(parents=True, exist_ok=True)
        with open(file_path, "wb") as buf:
            shutil.copyfileobj(photo.file, buf)
    except Exception as e:
        # Don't leave a truncated file behind on a failed write.
        _discard_photo(file_path)
        raise HTTPException(status_code=500, detail=f"Failed to save photo: {e}") from e

    # The stripped note is what goes on both the row and the audit entry.
    note = (operator_note or "").strip() or None

    try:
        # Extract EXIF — best-effort.  No EXIF / no GPS is fine; operator
        # can fill coordinates manually later in the promote step.
        metadata = extract_exif_data(file_path)
        coords = metadata.get("coordinates")    # "lat,lon" or None
        photo_ts = metadata.get("timestamp")    # datetime or None

        # Precedence: explicit override, then photo timestamp, then now (UTC).
        captured_at = captured_override or photo_ts or datetime.utcnow()

        pd = PendingDeployment(
            id=capture_id,
            unit_id=unit_id,
            captured_at=captured_at,
            coordinates=coords,
            operator_note=note,
            photo_filename=filename,
            status="awaiting",
        )
        db.add(pd)
        _record_history(
            db, unit_id=unit_id,
            change_type="pending_deployment_captured",
            new_value=f"awaiting classification @ {captured_at:%Y-%m-%d %H:%M}"
                      + (f" ({coords})" if coords else ""),
            notes=note,
        )
        db.commit()
    except Exception:
        # No row was committed, so nothing references the saved photo.
        db.rollback()
        _discard_photo(file_path)
        raise

    db.refresh(pd)
    return JSONResponse({
        "success": True,
        "pending_deployment": _to_dict(pd, unit=unit),
        "photo_url": f"/api/unit/{unit_id}/photo/{filename}",
        "extracted_coords": coords,
        "extracted_timestamp": photo_ts.isoformat() if photo_ts else None,
    })


def _discard_photo(path: Path) -> None:
    """Best-effort removal of a photo file that no database row refers to."""
    try:
        path.unlink()
    except OSError:
        # Already gone or not removable; the caller is already reporting
        # the original failure, so there is nothing further to do here.
        pass
@router.get("/pending")
def list_pending(
    status: str = "awaiting",
    db: Session = Depends(get_db),
):
    """Return pending deployments with the given status, newest capture first.

    The status filter defaults to "awaiting" (not yet classified).  Each
    entry is serialized with _to_dict and carries its unit summary when
    the unit row can be found.
    """
    matching = db.query(PendingDeployment).filter_by(status=status)
    rows = matching.order_by(PendingDeployment.captured_at.desc()).all()

    # Fetch every referenced unit with a single IN query rather than one
    # lookup per row; skip the query entirely when there are no rows.
    units_by_id = {}
    wanted_ids = {row.unit_id for row in rows}
    if wanted_ids:
        for unit in db.query(RosterUnit).filter(RosterUnit.id.in_(wanted_ids)).all():
            units_by_id[unit.id] = unit

    serialized = [
        _to_dict(row, unit=units_by_id.get(row.unit_id))
        for row in rows
    ]
    return {
        "count": len(rows),
        "status": status,
        "pending_deployments": serialized,
    }
@router.get("/pending/{pending_id}")
def get_pending(pending_id: str, db: Session = Depends(get_db)):
    """Return one pending deployment, serialized with its unit summary.

    Raises HTTPException 404 when no row has this id.
    """
    record = db.query(PendingDeployment).filter_by(id=pending_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="Pending deployment not found.")

    owning_unit = db.query(RosterUnit).filter_by(id=record.unit_id).first()
    return _to_dict(record, unit=owning_unit, detail=True)
@router.post("/pending/{pending_id}/promote")
async def promote_pending(
    pending_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """Classify a pending deployment → create a UnitAssignment.

    Body JSON — one of two shapes:

    1. Assign to existing location:
        {
            "location_id": "<uuid>",
            "notes": "<optional>"
        }

    2. Create a new location under (existing or new) project:
        {
            "project_id": "<existing>" | null,   # null means create new
            "project_name": "<required if project_id is null>",
            "project_type_id": "<existing project_type id, e.g. 'vibration_monitoring'>",
                                                 # required if creating new project
            "location_name": "<required>",
            "use_captured_coords": true | false, # default true — write the
                                                 # pending's coordinates onto
                                                 # the new location
            "notes": "<optional>"
        }

    Status flips to "assigned"; resulting_assignment_id is populated.

    Returns a dict with the new assignment id, the location id, the
    project id and the promotion timestamp.

    Raises:
        HTTPException 404 — pending deployment, location_id or project_id
                            does not exist.
        HTTPException 400 — pending deployment is not "awaiting", the body
                            is not a JSON object, or a required field for
                            the chosen shape is missing.
    """
    pd = db.query(PendingDeployment).filter_by(id=pending_id).first()
    if not pd:
        raise HTTPException(status_code=404, detail="Pending deployment not found.")
    if pd.status != "awaiting":
        raise HTTPException(
            status_code=400,
            detail=f"Pending deployment is {pd.status!r}, not awaiting — already classified?",
        )

    try:
        payload = await request.json()
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON body.") from exc
    # Valid JSON that is not an object (list, string, null, ...) has no
    # fields to read; reject it instead of failing on payload.get below.
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object.")

    notes = _payload_text(payload, "notes") or None

    # Resolve / create the location.
    location_id = payload.get("location_id")
    if location_id:
        location = db.query(MonitoringLocation).filter_by(id=location_id).first()
        if not location:
            raise HTTPException(status_code=404, detail=f"Location {location_id!r} not found.")
        project_id = location.project_id
    else:
        # Create-new path.  Validate the whole request first so nothing is
        # added to the session for a request that will be rejected.
        project_id = payload.get("project_id")
        project_name = _payload_text(payload, "project_name")
        project_type_id = _payload_text(payload, "project_type_id")
        if project_id:
            # Never hang a new location off a project that does not exist.
            if db.query(Project).filter_by(id=project_id).first() is None:
                raise HTTPException(status_code=404, detail=f"Project {project_id!r} not found.")
        elif not project_name:
            raise HTTPException(
                status_code=400,
                detail="Either project_id, or project_name + project_type_id, required.",
            )
        elif not project_type_id:
            # NOTE(review): project_type_id is not checked against the
            # project-type table here — that model is not imported by this
            # module; confirm whether a lookup is wanted.
            raise HTTPException(
                status_code=400,
                detail="project_type_id required when creating a new project.",
            )

        loc_name = _payload_text(payload, "location_name")
        if not loc_name:
            raise HTTPException(status_code=400, detail="location_name required.")

        if not project_id:
            new_project = Project(
                id=str(uuid.uuid4()),
                name=project_name,
                project_type_id=project_type_id,
                status="active",
            )
            db.add(new_project)
            db.flush()
            project_id = new_project.id

        use_coords = payload.get("use_captured_coords", True)
        location = MonitoringLocation(
            id=str(uuid.uuid4()),
            project_id=project_id,
            location_type="vibration",  # seismographs only
            name=loc_name,
            coordinates=(pd.coordinates if use_coords else None),
        )
        db.add(location)
        db.flush()

    # Create the assignment.  assigned_at = pending capture time (so
    # events emitted after the install are correctly attributed back
    # to this location).
    assignment = UnitAssignment(
        id=str(uuid.uuid4()),
        unit_id=pd.unit_id,
        location_id=location.id,
        project_id=project_id,
        device_type="seismograph",
        assigned_at=pd.captured_at,
        assigned_until=None,
        status="active",
        notes=notes,
        source="manual",
    )
    db.add(assignment)
    db.flush()

    # Promote the pending row; one clock read keeps both stamps identical.
    now = datetime.utcnow()
    pd.status = "assigned"
    pd.promoted_at = now
    pd.resulting_assignment_id = assignment.id
    pd.updated_at = now

    _record_history(
        db, unit_id=pd.unit_id,
        change_type="pending_deployment_promoted",
        old_value="awaiting",
        new_value=f"{location.name} (assignment {assignment.id[:8]})",
        notes=notes,
    )
    db.commit()
    db.refresh(pd)
    db.refresh(assignment)

    return {
        "success": True,
        "assignment_id": assignment.id,
        "location_id": location.id,
        "project_id": project_id,
        "promoted_at": pd.promoted_at.isoformat(),
    }


def _payload_text(payload: dict, key: str) -> str:
    """Return payload[key] stripped, or "" when it is absent or not a string."""
    value = payload.get(key)
    return value.strip() if isinstance(value, str) else ""
@router.post("/pending/{pending_id}/cancel")
async def cancel_pending(
    pending_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """Mark a pending deployment as cancelled (operator captured by mistake).

    The JSON body is optional.  When it is an object with a string
    "reason", the stripped reason is stored on the row and in the audit
    entry; a missing, unparseable, or non-object body means "no reason".

    Returns a dict with "success" and the ISO "cancelled_at" timestamp.

    Raises:
        HTTPException 404 — no pending deployment with this id.
        HTTPException 400 — the pending deployment is not "awaiting".
    """
    pd = db.query(PendingDeployment).filter_by(id=pending_id).first()
    if not pd:
        raise HTTPException(status_code=404, detail="Pending deployment not found.")
    if pd.status != "awaiting":
        raise HTTPException(
            status_code=400,
            detail=f"Pending deployment is {pd.status!r}, not awaiting.",
        )

    try:
        payload = await request.json()
    except Exception:
        # Deliberate best-effort: the body is optional, so an empty or
        # malformed one is treated as "no reason given".
        payload = {}
    # Valid JSON that is not an object (null, list, string, ...) carries
    # no reason either; normalise it instead of failing on .get below.
    if not isinstance(payload, dict):
        payload = {}

    raw_reason = payload.get("reason")
    reason = (raw_reason.strip() or None) if isinstance(raw_reason, str) else None

    # One clock read keeps cancelled_at and updated_at identical.
    now = datetime.utcnow()
    pd.status = "cancelled"
    pd.cancelled_at = now
    pd.cancelled_reason = reason
    pd.updated_at = now

    _record_history(
        db, unit_id=pd.unit_id,
        change_type="pending_deployment_cancelled",
        old_value="awaiting",
        new_value="cancelled",
        notes=reason,
    )
    db.commit()
    return {"success": True, "cancelled_at": pd.cancelled_at.isoformat()}
# ── Helpers ───────────────────────────────────────────────────────────────────
def _to_dict(pd: PendingDeployment, unit: Optional[RosterUnit] = None, detail: bool = False) -> dict:
    """Serialize a pending deployment (and optionally its unit) to a dict.

    Always included: id, unit_id, captured_at, coordinates, operator_note,
    photo_filename, photo_url, status, created_at.  Promotion fields are
    added for status "assigned", cancellation fields for "cancelled", and
    a "unit" summary when a unit is supplied.  Datetimes are rendered as
    ISO strings, or None when unset.

    `detail` is accepted for callers that pass it but is not currently
    used by the serialization.
    """
    def _iso(moment):
        return moment.isoformat() if moment else None

    photo_url = None
    if pd.photo_filename:
        photo_url = f"/api/unit/{pd.unit_id}/photo/{pd.photo_filename}"

    payload = {
        "id": pd.id,
        "unit_id": pd.unit_id,
        "captured_at": _iso(pd.captured_at),
        "coordinates": pd.coordinates,
        "operator_note": pd.operator_note,
        "photo_filename": pd.photo_filename,
        "photo_url": photo_url,
        "status": pd.status,
        "created_at": _iso(pd.created_at),
    }

    state = pd.status
    if state == "assigned":
        payload.update(
            promoted_at=_iso(pd.promoted_at),
            resulting_assignment_id=pd.resulting_assignment_id,
        )
    elif state == "cancelled":
        payload.update(
            cancelled_at=_iso(pd.cancelled_at),
            cancelled_reason=pd.cancelled_reason,
        )

    if unit:
        payload["unit"] = {
            "id": unit.id,
            "device_type": unit.device_type,
            "note": unit.note,
            "deployed": unit.deployed,
        }
    return payload