Files
terra-view/backend/routers/project_locations.py
T
serversdown f1f3da8e61 feat(sfm): unified deployment timeline (deprecate deployment_records)
Phase 4.  Rebuilds the seismograph "Deployment History" + "Timeline"
sections on the unit detail page as a single derived view computed from
three sources: unit_assignments (authoritative project/location windows),
unit_history (calibration/retirement/deployed state changes), and SFM
events overlaid per assignment window (count + peak PVS + last event).

Fixes the wonky-timeline symptoms: missing entries, duplicate/contradictory
rows, and no visibility into what the unit was actually doing during each
deployment window.

Backend:
- backend/services/deployment_timeline.py: new deployment_timeline_for_unit()
  helper.  Merges UnitAssignment rows (with SFM event overlay fetched
  concurrently via httpx), UnitHistory state-change rows (filtered to
  meaningful change_types and de-noised by dropping rows where
  old_value == new_value — there's noise in legacy audit log from
  record_history() being called on every save), and synthetic "gap"
  entries between assignments >= 1 day apart.  Sorts newest first.

- backend/routers/units.py: new GET /api/units/{unit_id}/deployment_timeline
  endpoint with optional include_events=false flag.

- backend/routers/project_locations.py: assign / unassign / swap /
  update endpoints now write UnitHistory rows on every assignment
  lifecycle event.  New change_types: assignment_created,
  assignment_ended, assignment_swapped, assignment_updated.  These
  surface in the unified timeline (where the assignment row itself
  shows the structural data; the audit row is filtered out to avoid
  double-rendering).  Closes a real gap — assignment changes were
  previously invisible to any audit consumer.

- backend/migrate_deprecate_deployment_records.py: non-destructive
  migration.  Adds deployment_records.deprecated_at column.  For each
  legacy row without a matching UnitAssignment, best-effort
  synthesizes one (with the free-text location_name preserved in
  notes).  Marks every processed row.  Idempotent.  DROP TABLE
  deferred to a follow-up release.

Frontend (templates/unit_detail.html):
- Removed legacy "Deployment History" card (with Log Deployment button)
  and the separate "Timeline" card.  Replaced with a single
  "Deployment Timeline" section.
- Three entry visual styles: assignment rows (orange dot, location +
  project link, event-overlay summary), gap rows (dashed outline, idle
  day count), and state_change rows (navy dot, friendly label, old →
  new value).  Active assignments get a green dot + "active" badge.
- Existing loadUnitHistory() and loadDeploymentHistory() functions kept
  as shims that delegate to loadDeploymentTimeline(), so modal-save
  callbacks that referenced them still trigger a refresh of the visible
  section.  Legacy function bodies preserved under _legacy_*_unused
  names for archeology; not called by anything.

Verified end-to-end:
- BE11529 timeline now shows 2 entries (active assignment with 24-event
  overlay + the deployed→benched state change), compared to the previous
  noisy mix that included 6 no-op state-change rows.
- Migration ran against real DB: 1 legacy row processed (had no
  project_id, marked deprecated without backfill).
- Assign / unassign / swap / edit now leave a paper trail in
  unit_history.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-12 00:15:07 +00:00

1325 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Project Locations Router
Handles monitoring locations (NRLs for sound, monitoring points for vibration)
and unit assignments within projects.
"""
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from datetime import datetime
from typing import Optional
import uuid
import json
from fastapi import UploadFile, File
import zipfile
import hashlib
import io
from pathlib import Path
from backend.database import get_db
from backend.models import (
Project,
ProjectType,
ProjectModule,
MonitoringLocation,
UnitAssignment,
RosterUnit,
MonitoringSession,
DataFile,
UnitHistory,
)
from backend.templates_config import templates
from backend.utils.timezone import local_to_utc
router = APIRouter(prefix="/api/projects/{project_id}", tags=["project-locations"])
# ── Audit log helper ──────────────────────────────────────────────────────────
# Mirrors record_history() in roster_edit.py. Kept local to avoid cross-router
# imports. The four assignment endpoints below use this to write UnitHistory
# rows that the unit-detail deployment timeline (Phase 4) renders.
def _record_assignment_history(
db: Session,
unit_id: str,
change_type: str,
*,
old_value: Optional[str] = None,
new_value: Optional[str] = None,
notes: Optional[str] = None,
) -> None:
"""Append a UnitHistory row for an assignment-lifecycle event.
change_type values used:
- assignment_created — unit assigned to a location (new assignment)
- assignment_ended — unit unassigned / removed (assigned_until set)
- assignment_swapped — unit replaced by another at the same location
- assignment_updated — assignment dates / notes edited via PATCH
Caller is responsible for db.commit().
"""
db.add(UnitHistory(
unit_id=unit_id,
change_type=change_type,
field_name="unit_assignment",
old_value=old_value,
new_value=new_value,
changed_at=datetime.utcnow(),
source="manual",
notes=notes,
))
# ============================================================================
# Shared helpers
# ============================================================================
def _require_module(project, module_type: str, db: Session) -> None:
"""Raise 400 if the project does not have the given module enabled."""
if not project:
raise HTTPException(status_code=404, detail="Project not found.")
exists = db.query(ProjectModule).filter_by(
project_id=project.id, module_type=module_type, enabled=True
).first()
if not exists:
raise HTTPException(
status_code=400,
detail=f"This project does not have the {module_type.replace('_', ' ').title()} module enabled.",
)
# ============================================================================
# Session period helpers
# ============================================================================
def _derive_period_type(dt: datetime) -> str:
"""
Classify a session start time into one of four period types.
Night = 22:0007:00, Day = 07:0022:00.
Weekend = Saturday (5) or Sunday (6).
"""
is_weekend = dt.weekday() >= 5
is_night = dt.hour >= 22 or dt.hour < 7
if is_weekend:
return "weekend_night" if is_night else "weekend_day"
return "weekday_night" if is_night else "weekday_day"
def _build_session_label(dt: datetime, location_name: str, period_type: str) -> str:
"""Build a human-readable session label, e.g. 'NRL-1 — Sun 2/23 — Night'.
Uses started_at date as-is; user can correct period_type in the wizard.
"""
day_abbr = dt.strftime("%a")
date_str = f"{dt.month}/{dt.day}"
period_str = {
"weekday_day": "Day",
"weekday_night": "Night",
"weekend_day": "Day",
"weekend_night": "Night",
}.get(period_type, "")
parts = [p for p in [location_name, f"{day_abbr} {date_str}", period_str] if p]
return "".join(parts)
# ============================================================================
# Monitoring Locations CRUD
# ============================================================================
@router.get("/locations", response_class=HTMLResponse)
async def get_project_locations(
project_id: str,
request: Request,
db: Session = Depends(get_db),
location_type: Optional[str] = Query(None),
):
"""
Get all monitoring locations for a project.
Returns HTML partial with location list.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
query = db.query(MonitoringLocation).filter_by(project_id=project_id)
# Filter by type if provided
if location_type:
query = query.filter_by(location_type=location_type)
locations = query.order_by(MonitoringLocation.name).all()
# Enrich with assignment info
locations_data = []
for location in locations:
# Get active assignment (active = assigned_until IS NULL)
assignment = db.query(UnitAssignment).filter(
and_(
UnitAssignment.location_id == location.id,
UnitAssignment.assigned_until == None,
)
).first()
assigned_unit = None
if assignment:
assigned_unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()
# Count monitoring sessions
session_count = db.query(MonitoringSession).filter_by(
location_id=location.id
).count()
locations_data.append({
"location": location,
"assignment": assignment,
"assigned_unit": assigned_unit,
"session_count": session_count,
})
return templates.TemplateResponse("partials/projects/location_list.html", {
"request": request,
"project": project,
"locations": locations_data,
})
@router.get("/locations-json")
async def get_project_locations_json(
project_id: str,
db: Session = Depends(get_db),
location_type: Optional[str] = Query(None),
):
"""
Get all monitoring locations for a project as JSON.
Used by the schedule modal to populate location dropdown.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
query = db.query(MonitoringLocation).filter_by(project_id=project_id)
if location_type:
query = query.filter_by(location_type=location_type)
locations = query.order_by(MonitoringLocation.name).all()
return [
{
"id": loc.id,
"name": loc.name,
"location_type": loc.location_type,
"description": loc.description,
"address": loc.address,
"coordinates": loc.coordinates,
}
for loc in locations
]
@router.post("/locations/create")
async def create_location(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Create a new monitoring location within a project.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
form_data = await request.form()
location = MonitoringLocation(
id=str(uuid.uuid4()),
project_id=project_id,
location_type=form_data.get("location_type"),
name=form_data.get("name"),
description=form_data.get("description"),
coordinates=form_data.get("coordinates"),
address=form_data.get("address"),
location_metadata=form_data.get("location_metadata"), # JSON string
)
db.add(location)
db.commit()
db.refresh(location)
return JSONResponse({
"success": True,
"location_id": location.id,
"message": f"Location '{location.name}' created successfully",
})
@router.put("/locations/{location_id}")
async def update_location(
project_id: str,
location_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Update a monitoring location.
"""
location = db.query(MonitoringLocation).filter_by(
id=location_id,
project_id=project_id,
).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
data = await request.json()
# Update fields if provided
if "name" in data:
location.name = data["name"]
if "description" in data:
location.description = data["description"]
if "location_type" in data:
location.location_type = data["location_type"]
if "coordinates" in data:
location.coordinates = data["coordinates"]
if "address" in data:
location.address = data["address"]
if "location_metadata" in data:
location.location_metadata = data["location_metadata"]
location.updated_at = datetime.utcnow()
db.commit()
return {"success": True, "message": "Location updated successfully"}
@router.delete("/locations/{location_id}")
async def delete_location(
project_id: str,
location_id: str,
db: Session = Depends(get_db),
):
"""
Delete a monitoring location.
"""
location = db.query(MonitoringLocation).filter_by(
id=location_id,
project_id=project_id,
).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
# Check if location has active assignments (active = assigned_until IS NULL)
active_assignments = db.query(UnitAssignment).filter(
and_(
UnitAssignment.location_id == location_id,
UnitAssignment.assigned_until == None,
)
).count()
if active_assignments > 0:
raise HTTPException(
status_code=400,
detail="Cannot delete location with active unit assignments. Unassign units first.",
)
db.delete(location)
db.commit()
return {"success": True, "message": "Location deleted successfully"}
# ============================================================================
# Unit Assignments
# ============================================================================
@router.get("/assignments", response_class=HTMLResponse)
async def get_project_assignments(
project_id: str,
request: Request,
db: Session = Depends(get_db),
status: Optional[str] = Query("active"),
):
"""
Get all unit assignments for a project.
Returns HTML partial with assignment list.
"""
query = db.query(UnitAssignment).filter_by(project_id=project_id)
if status:
query = query.filter_by(status=status)
assignments = query.order_by(UnitAssignment.assigned_at.desc()).all()
# Enrich with unit and location details
assignments_data = []
for assignment in assignments:
unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()
location = db.query(MonitoringLocation).filter_by(id=assignment.location_id).first()
assignments_data.append({
"assignment": assignment,
"unit": unit,
"location": location,
})
return templates.TemplateResponse("partials/projects/assignment_list.html", {
"request": request,
"project_id": project_id,
"assignments": assignments_data,
})
@router.post("/locations/{location_id}/assign")
async def assign_unit_to_location(
project_id: str,
location_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Assign a unit to a monitoring location.
"""
location = db.query(MonitoringLocation).filter_by(
id=location_id,
project_id=project_id,
).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
form_data = await request.form()
unit_id = form_data.get("unit_id")
# Verify unit exists and matches location type
unit = db.query(RosterUnit).filter_by(id=unit_id).first()
if not unit:
raise HTTPException(status_code=404, detail="Unit not found")
# Check device type matches location type
expected_device_type = "slm" if location.location_type == "sound" else "seismograph"
if unit.device_type != expected_device_type:
raise HTTPException(
status_code=400,
detail=f"Unit type '{unit.device_type}' does not match location type '{location.location_type}'",
)
# Check if location already has an active assignment (active = assigned_until IS NULL)
existing_assignment = db.query(UnitAssignment).filter(
and_(
UnitAssignment.location_id == location_id,
UnitAssignment.assigned_until == None,
)
).first()
if existing_assignment:
raise HTTPException(
status_code=400,
detail=f"Location already has an active unit assignment ({existing_assignment.unit_id}). Use swap to replace it.",
)
# Create new assignment
assigned_until_str = form_data.get("assigned_until")
assigned_until = datetime.fromisoformat(assigned_until_str) if assigned_until_str else None
assignment = UnitAssignment(
id=str(uuid.uuid4()),
unit_id=unit_id,
location_id=location_id,
project_id=project_id,
device_type=unit.device_type,
assigned_until=assigned_until,
status="active",
notes=form_data.get("notes"),
)
db.add(assignment)
_record_assignment_history(
db,
unit_id=unit_id,
change_type="assignment_created",
new_value=f"{location.name} (project: {location.project_id})",
notes=form_data.get("notes"),
)
db.commit()
db.refresh(assignment)
return JSONResponse({
"success": True,
"assignment_id": assignment.id,
"message": f"Unit '{unit_id}' assigned to '{location.name}'",
})
@router.post("/assignments/{assignment_id}/unassign")
async def unassign_unit(
project_id: str,
assignment_id: str,
db: Session = Depends(get_db),
):
"""
Unassign a unit from a location.
"""
assignment = db.query(UnitAssignment).filter_by(
id=assignment_id,
project_id=project_id,
).first()
if not assignment:
raise HTTPException(status_code=404, detail="Assignment not found")
# Check if there are active monitoring sessions
active_sessions = db.query(MonitoringSession).filter(
and_(
MonitoringSession.location_id == assignment.location_id,
MonitoringSession.unit_id == assignment.unit_id,
MonitoringSession.status == "recording",
)
).count()
if active_sessions > 0:
raise HTTPException(
status_code=400,
detail="Cannot unassign unit with active monitoring sessions. Stop monitoring first.",
)
assignment.status = "completed"
assignment.assigned_until = datetime.utcnow()
location = db.query(MonitoringLocation).filter_by(id=assignment.location_id).first()
_record_assignment_history(
db,
unit_id=assignment.unit_id,
change_type="assignment_ended",
old_value=location.name if location else assignment.location_id,
new_value="unassigned",
)
db.commit()
return {"success": True, "message": "Unit unassigned successfully"}
@router.patch("/assignments/{assignment_id}")
async def update_assignment(
project_id: str,
assignment_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Update an assignment's date window and/or notes.
Common use case: backdate a deployment so events emitted before the
operator created the assignment in terra-view (e.g. a unit that was
physically deployed in December but only recorded in the system today)
get correctly attributed to the location.
Accepts JSON body with optional fields:
- assigned_at: ISO datetime (or empty string to leave unchanged)
- assigned_until: ISO datetime, or null/"" to mark indefinite (active)
- notes: string
Sets `status` to "active" when assigned_until is cleared, "completed"
when it's set in the past.
"""
assignment = db.query(UnitAssignment).filter_by(
id=assignment_id,
project_id=project_id,
).first()
if not assignment:
raise HTTPException(status_code=404, detail="Assignment not found")
try:
payload = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
# Parse new values (None = unchanged, explicit None/"" for assigned_until = clear)
new_assigned_at = assignment.assigned_at
new_assigned_until = assignment.assigned_until
new_notes = assignment.notes
if "assigned_at" in payload:
raw = payload["assigned_at"]
if raw is None or raw == "":
raise HTTPException(
status_code=400,
detail="assigned_at is required; cannot be cleared.",
)
try:
# Accept "YYYY-MM-DDTHH:MM" from datetime-local inputs or full ISO.
new_assigned_at = datetime.fromisoformat(raw)
except (TypeError, ValueError):
raise HTTPException(
status_code=400,
detail=f"Invalid assigned_at datetime: {raw!r}",
)
if "assigned_until" in payload:
raw = payload["assigned_until"]
if raw is None or raw == "":
new_assigned_until = None
else:
try:
new_assigned_until = datetime.fromisoformat(raw)
except (TypeError, ValueError):
raise HTTPException(
status_code=400,
detail=f"Invalid assigned_until datetime: {raw!r}",
)
if "notes" in payload:
raw = payload["notes"]
new_notes = (raw or "").strip() or None
# Validation: end must be after start if both set.
if new_assigned_until is not None and new_assigned_until <= new_assigned_at:
raise HTTPException(
status_code=400,
detail="assigned_until must be after assigned_at.",
)
# Sanity: reject creating an overlap with another assignment of the SAME
# unit at the SAME location. Different units at the same location can
# legitimately overlap during a swap window (rare but valid).
new_end_for_overlap = new_assigned_until or datetime.utcnow()
overlapping = (
db.query(UnitAssignment)
.filter(UnitAssignment.location_id == assignment.location_id)
.filter(UnitAssignment.unit_id == assignment.unit_id)
.filter(UnitAssignment.id != assignment.id)
.all()
)
for other in overlapping:
other_start = other.assigned_at
other_end = other.assigned_until or datetime.utcnow()
if new_assigned_at < other_end and new_end_for_overlap > other_start:
raise HTTPException(
status_code=400,
detail=(
f"This window overlaps with another assignment for the "
f"same unit ({other.assigned_at:%Y-%m-%d}"
f"{other.assigned_until and other.assigned_until.strftime('%Y-%m-%d') or 'present'})."
),
)
# Capture change description for audit log BEFORE mutating.
old_start = assignment.assigned_at.isoformat() if assignment.assigned_at else None
old_end = assignment.assigned_until.isoformat() if assignment.assigned_until else "active"
new_start = new_assigned_at.isoformat() if new_assigned_at else None
new_end = new_assigned_until.isoformat() if new_assigned_until else "active"
# Apply.
assignment.assigned_at = new_assigned_at
assignment.assigned_until = new_assigned_until
assignment.notes = new_notes
assignment.status = "completed" if new_assigned_until is not None else "active"
if old_start != new_start or old_end != new_end:
_record_assignment_history(
db,
unit_id=assignment.unit_id,
change_type="assignment_updated",
old_value=f"{old_start}{old_end}",
new_value=f"{new_start}{new_end}",
notes=new_notes,
)
db.commit()
db.refresh(assignment)
return {
"success": True,
"assignment": {
"id": assignment.id,
"unit_id": assignment.unit_id,
"location_id": assignment.location_id,
"assigned_at": assignment.assigned_at.isoformat() if assignment.assigned_at else None,
"assigned_until": assignment.assigned_until.isoformat() if assignment.assigned_until else None,
"status": assignment.status,
"notes": assignment.notes,
},
}
@router.post("/locations/{location_id}/swap")
async def swap_unit_on_location(
project_id: str,
location_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Swap the unit assigned to a vibration monitoring location.
Ends the current active assignment (if any), creates a new one,
and optionally updates modem pairing on the seismograph.
Works for first-time assignments too (no current assignment = just create).
"""
location = db.query(MonitoringLocation).filter_by(
id=location_id,
project_id=project_id,
).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
form_data = await request.form()
unit_id = form_data.get("unit_id")
modem_id = form_data.get("modem_id") or None
notes = form_data.get("notes") or None
if not unit_id:
raise HTTPException(status_code=400, detail="unit_id is required")
# Validate new unit
unit = db.query(RosterUnit).filter_by(id=unit_id).first()
if not unit:
raise HTTPException(status_code=404, detail="Unit not found")
expected_device_type = "slm" if location.location_type == "sound" else "seismograph"
if unit.device_type != expected_device_type:
raise HTTPException(
status_code=400,
detail=f"Unit type '{unit.device_type}' does not match location type '{location.location_type}'",
)
# End current active assignment if one exists (active = assigned_until IS NULL)
current = db.query(UnitAssignment).filter(
and_(
UnitAssignment.location_id == location_id,
UnitAssignment.assigned_until == None,
)
).first()
if current:
current.assigned_until = datetime.utcnow()
current.status = "completed"
# If the swap is replacing a different unit, that unit's deployment ended.
if current.unit_id != unit_id:
_record_assignment_history(
db,
unit_id=current.unit_id,
change_type="assignment_swapped",
old_value=location.name,
new_value=f"swapped out → {unit_id}",
notes=notes,
)
# Create new assignment
new_assignment = UnitAssignment(
id=str(uuid.uuid4()),
unit_id=unit_id,
location_id=location_id,
project_id=project_id,
device_type=unit.device_type,
assigned_until=None,
status="active",
notes=notes,
)
db.add(new_assignment)
_record_assignment_history(
db,
unit_id=unit_id,
change_type="assignment_swapped" if (current and current.unit_id != unit_id) else "assignment_created",
new_value=f"{location.name} (project: {location.project_id})",
notes=notes,
)
# Update modem pairing on the seismograph if modem provided
if modem_id:
modem = db.query(RosterUnit).filter_by(id=modem_id, device_type="modem").first()
if not modem:
raise HTTPException(status_code=404, detail=f"Modem '{modem_id}' not found")
unit.deployed_with_modem_id = modem_id
modem.deployed_with_unit_id = unit_id
else:
# Clear modem pairing if not provided
unit.deployed_with_modem_id = None
db.commit()
return JSONResponse({
"success": True,
"assignment_id": new_assignment.id,
"message": f"Unit '{unit_id}' assigned to '{location.name}'" + (f" with modem '{modem_id}'" if modem_id else ""),
})
# ============================================================================
# Available Units for Assignment
# ============================================================================
@router.get("/available-modems", response_class=JSONResponse)
async def get_available_modems(
project_id: str,
db: Session = Depends(get_db),
):
"""
Get all deployed, non-retired modems for the modem assignment dropdown.
"""
modems = db.query(RosterUnit).filter(
and_(
RosterUnit.device_type == "modem",
RosterUnit.deployed == True,
RosterUnit.retired == False,
)
).order_by(RosterUnit.id).all()
return [
{
"id": m.id,
"hardware_model": m.hardware_model,
"ip_address": m.ip_address,
}
for m in modems
]
@router.get("/available-units", response_class=JSONResponse)
async def get_available_units(
project_id: str,
location_type: str = Query(...),
db: Session = Depends(get_db),
):
"""
Get list of available units for assignment to a location.
Filters by device type matching the location type.
"""
# Determine required device type
required_device_type = "slm" if location_type == "sound" else "seismograph"
# Get all units of the required type that are deployed and not retired
all_units = db.query(RosterUnit).filter(
and_(
RosterUnit.device_type == required_device_type,
RosterUnit.deployed == True,
RosterUnit.retired == False,
)
).all()
# Filter out units that already have active assignments (active = assigned_until IS NULL)
assigned_unit_ids = db.query(UnitAssignment.unit_id).filter(
UnitAssignment.assigned_until == None
).distinct().all()
assigned_unit_ids = [uid[0] for uid in assigned_unit_ids]
available_units = [
{
"id": unit.id,
"device_type": unit.device_type,
"location": unit.address or unit.location,
"model": unit.slm_model if unit.device_type == "slm" else unit.unit_type,
}
for unit in all_units
if unit.id not in assigned_unit_ids
]
return available_units
# ============================================================================
# NRL-specific endpoints for detail page
# ============================================================================
@router.get("/nrl/{location_id}/sessions", response_class=HTMLResponse)
async def get_nrl_sessions(
project_id: str,
location_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Get monitoring sessions for a specific NRL.
Returns HTML partial with session list.
"""
sessions = db.query(MonitoringSession).filter_by(
location_id=location_id
).order_by(MonitoringSession.started_at.desc()).all()
# Enrich with unit details
sessions_data = []
for session in sessions:
unit = None
if session.unit_id:
unit = db.query(RosterUnit).filter_by(id=session.unit_id).first()
sessions_data.append({
"session": session,
"unit": unit,
})
return templates.TemplateResponse("partials/projects/session_list.html", {
"request": request,
"project_id": project_id,
"location_id": location_id,
"sessions": sessions_data,
})
@router.get("/vibration_summary", response_class=HTMLResponse)
async def get_project_vibration_summary(
project_id: str,
request: Request,
from_dt: Optional[datetime] = Query(None),
to_dt: Optional[datetime] = Query(None),
db: Session = Depends(get_db),
):
"""
Render a small HTML partial summarising vibration-event activity
across every vibration MonitoringLocation in the project.
Returned to the Vibration tab of the project detail page via HTMX.
Fans out concurrently across all locations (which in turn fan out
across each location's UnitAssignment windows). Total queries to
SFM = sum of assignments across the project.
404 if the project doesn't exist. Empty-state partial if the
project has no vibration locations.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found.")
from backend.services.sfm_events import vibration_summary_for_project
summary = await vibration_summary_for_project(
db, project_id, from_dt=from_dt, to_dt=to_dt
)
return templates.TemplateResponse(
"partials/projects/vibration_summary.html",
{
"request": request,
"project_id": project_id,
"summary": summary,
},
)
@router.get("/locations/{location_id}/events", response_class=JSONResponse)
async def get_location_events(
project_id: str,
location_id: str,
from_dt: Optional[datetime] = Query(None),
to_dt: Optional[datetime] = Query(None),
false_trigger: Optional[bool] = Query(None),
limit: int = Query(500, ge=1, le=5000),
db: Session = Depends(get_db),
):
"""
Return SFM events recorded at this monitoring location.
Fans out the location's UnitAssignment rows (every seismograph ever
assigned to this location, active + closed), queries SFM /db/events
for each (serial, time-window) pair concurrently, and unions the
results.
Sound (SLM) locations return an empty payload — SFM events are
seismograph-only.
"""
location = db.query(MonitoringLocation).filter_by(id=location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found.")
if location.project_id != project_id:
raise HTTPException(
status_code=404,
detail="Location does not belong to this project.",
)
# SLM locations don't have SFM events — return an empty payload rather
# than 404 so the frontend can render an empty state gracefully.
if location.location_type != "vibration":
return {
"events": [],
"count": 0,
"stats": {
"event_count": 0,
"peak_pvs": None,
"peak_pvs_at": None,
"peak_pvs_serial": None,
"last_event": None,
"false_trigger_count": 0,
},
"assignments_used": [],
"location_type": location.location_type,
}
from backend.services.sfm_events import events_for_location
result = await events_for_location(
db,
location_id,
from_dt=from_dt,
to_dt=to_dt,
false_trigger=false_trigger,
limit=limit,
)
result["location_type"] = location.location_type
return result
@router.get("/nrl/{location_id}/files", response_class=HTMLResponse)
async def get_nrl_files(
project_id: str,
location_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Get data files for a specific NRL.
Returns HTML partial with file list.
"""
# Join DataFile with MonitoringSession to filter by location_id
files = db.query(DataFile).join(
MonitoringSession,
DataFile.session_id == MonitoringSession.id
).filter(
MonitoringSession.location_id == location_id
).order_by(DataFile.created_at.desc()).all()
# Enrich with session details
files_data = []
for file in files:
session = None
if file.session_id:
session = db.query(MonitoringSession).filter_by(id=file.session_id).first()
files_data.append({
"file": file,
"session": session,
})
return templates.TemplateResponse("partials/projects/file_list.html", {
"request": request,
"project_id": project_id,
"location_id": location_id,
"files": files_data,
})
# ============================================================================
# Manual SD Card Data Upload
# ============================================================================
def _parse_rnh(content: bytes) -> dict:
"""
Parse a Rion .rnh metadata file (INI-style with [Section] headers).
Returns a dict of key metadata fields.
"""
result = {}
try:
text = content.decode("utf-8", errors="replace")
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("["):
continue
if "," in line:
key, _, value = line.partition(",")
key = key.strip()
value = value.strip()
if key == "Serial Number":
result["serial_number"] = value
elif key == "Store Name":
result["store_name"] = value
elif key == "Index Number":
result["index_number"] = value
elif key == "Measurement Start Time":
result["start_time_str"] = value
elif key == "Measurement Stop Time":
result["stop_time_str"] = value
elif key == "Total Measurement Time":
result["total_time_str"] = value
except Exception:
pass
return result
def _parse_rnh_datetime(s: str):
"""Parse RNH datetime string: '2026/02/17 19:00:19' -> datetime"""
from datetime import datetime
if not s:
return None
try:
return datetime.strptime(s.strip(), "%Y/%m/%d %H:%M:%S")
except Exception:
return None
def _classify_file(filename: str) -> str:
"""Classify a file by name into a DataFile file_type."""
name = filename.lower()
if name.endswith(".rnh"):
return "log"
if name.endswith(".rnd"):
return "measurement"
if name.endswith(".zip"):
return "archive"
return "data"
@router.post("/nrl/{location_id}/upload-data")
async def upload_nrl_data(
project_id: str,
location_id: str,
db: Session = Depends(get_db),
files: list[UploadFile] = File(...),
):
"""
Manually upload SD card data for an offline NRL.
Accepts either:
- A single .zip file (the Auto_#### folder zipped) — auto-extracted
- Multiple .rnd / .rnh files selected directly from the SD card folder
Creates a MonitoringSession from .rnh metadata and DataFile records
for each measurement file. No unit assignment required.
"""
from datetime import datetime
# Verify project and location exist
project = db.query(Project).filter_by(id=project_id).first()
_require_module(project, "sound_monitoring", db)
location = db.query(MonitoringLocation).filter_by(
id=location_id, project_id=project_id
).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
# --- Step 1: Normalize to (filename, bytes) list ---
file_entries: list[tuple[str, bytes]] = []
if len(files) == 1 and files[0].filename.lower().endswith(".zip"):
raw = await files[0].read()
try:
with zipfile.ZipFile(io.BytesIO(raw)) as zf:
for info in zf.infolist():
if info.is_dir():
continue
name = Path(info.filename).name # strip folder path
if not name:
continue
file_entries.append((name, zf.read(info)))
except zipfile.BadZipFile:
raise HTTPException(status_code=400, detail="Uploaded file is not a valid ZIP archive.")
else:
for uf in files:
data = await uf.read()
file_entries.append((uf.filename, data))
if not file_entries:
raise HTTPException(status_code=400, detail="No usable files found in upload.")
# --- Step 1b: Filter to only relevant files ---
# Keep: .rnh (metadata) and measurement .rnd files
# NL-43 generates two .rnd types: _Leq_ (15-min averages, wanted) and _Lp_ (1-sec granular, skip)
# AU2 (NL-23/older Rion) generates a single Au2_####.rnd per session — always keep those
# Drop: _Lp_ .rnd, .xlsx, .mp3, and anything else
def _is_wanted(fname: str) -> bool:
n = fname.lower()
if n.endswith(".rnh"):
return True
if n.endswith(".rnd"):
if "_leq_" in n: # NL-43 Leq file
return True
if n.startswith("au2_"): # AU2 format (NL-23) — always Leq equivalent
return True
if "_lp" not in n and "_leq_" not in n:
# Unknown .rnd format — include it so we don't silently drop data
return True
return False
file_entries = [(fname, fbytes) for fname, fbytes in file_entries if _is_wanted(fname)]
if not file_entries:
raise HTTPException(status_code=400, detail="No usable .rnd or .rnh files found. Expected NL-43 _Leq_ files or AU2 format .rnd files.")
# --- Step 2: Find and parse .rnh metadata ---
rnh_meta = {}
for fname, fbytes in file_entries:
if fname.lower().endswith(".rnh"):
rnh_meta = _parse_rnh(fbytes)
break
# RNH files store local time (no UTC offset). Use local values for period
# classification / label generation, then convert to UTC for DB storage so
# the local_datetime Jinja filter displays the correct time.
started_at_local = _parse_rnh_datetime(rnh_meta.get("start_time_str")) or datetime.utcnow()
stopped_at_local = _parse_rnh_datetime(rnh_meta.get("stop_time_str"))
started_at = local_to_utc(started_at_local)
stopped_at = local_to_utc(stopped_at_local) if stopped_at_local else None
duration_seconds = None
if started_at and stopped_at:
duration_seconds = int((stopped_at - started_at).total_seconds())
store_name = rnh_meta.get("store_name", "")
serial_number = rnh_meta.get("serial_number", "")
index_number = rnh_meta.get("index_number", "")
# --- Step 3: Create MonitoringSession ---
# Use local times for period/label so classification reflects the clock at the site.
period_type = _derive_period_type(started_at_local) if started_at_local else None
session_label = _build_session_label(started_at_local, location.name, period_type) if started_at_local else None
session_id = str(uuid.uuid4())
monitoring_session = MonitoringSession(
id=session_id,
project_id=project_id,
location_id=location_id,
unit_id=None,
session_type="sound",
started_at=started_at,
stopped_at=stopped_at,
duration_seconds=duration_seconds,
status="completed",
session_label=session_label,
period_type=period_type,
session_metadata=json.dumps({
"source": "manual_upload",
"store_name": store_name,
"serial_number": serial_number,
"index_number": index_number,
}),
)
db.add(monitoring_session)
db.commit()
db.refresh(monitoring_session)
# --- Step 4: Write files to disk and create DataFile records ---
output_dir = Path("data/Projects") / project_id / session_id
output_dir.mkdir(parents=True, exist_ok=True)
leq_count = 0
lp_count = 0
metadata_count = 0
files_imported = 0
for fname, fbytes in file_entries:
file_type = _classify_file(fname)
fname_lower = fname.lower()
# Track counts for summary
if fname_lower.endswith(".rnd"):
if "_leq_" in fname_lower:
leq_count += 1
elif "_lp" in fname_lower:
lp_count += 1
elif fname_lower.endswith(".rnh"):
metadata_count += 1
# Write to disk
dest = output_dir / fname
dest.write_bytes(fbytes)
# Compute checksum
checksum = hashlib.sha256(fbytes).hexdigest()
# Store relative path from data/ dir
rel_path = str(dest.relative_to("data"))
data_file = DataFile(
id=str(uuid.uuid4()),
session_id=session_id,
file_path=rel_path,
file_type=file_type,
file_size_bytes=len(fbytes),
downloaded_at=datetime.utcnow(),
checksum=checksum,
file_metadata=json.dumps({
"source": "manual_upload",
"original_filename": fname,
"store_name": store_name,
}),
)
db.add(data_file)
files_imported += 1
db.commit()
return {
"success": True,
"session_id": session_id,
"files_imported": files_imported,
"leq_files": leq_count,
"lp_files": lp_count,
"metadata_files": metadata_count,
"store_name": store_name,
"started_at": started_at.isoformat() if started_at else None,
"stopped_at": stopped_at.isoformat() if stopped_at else None,
}
# ============================================================================
# NRL Live Status (connected NRLs only)
# ============================================================================
@router.get("/nrl/{location_id}/live-status", response_class=HTMLResponse)
async def get_nrl_live_status(
project_id: str,
location_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Fetch cached status from SLMM for the unit assigned to this NRL and
return a compact HTML status card. Used in the NRL overview tab for
connected NRLs. Gracefully shows an offline message if SLMM is unreachable.
Sound Monitoring projects only.
"""
import os
import httpx
_require_module(db.query(Project).filter_by(id=project_id).first(), "sound_monitoring", db)
# Find the assigned unit (active = assigned_until IS NULL)
assignment = db.query(UnitAssignment).filter(
and_(
UnitAssignment.location_id == location_id,
UnitAssignment.assigned_until == None,
)
).first()
if not assignment:
return templates.TemplateResponse("partials/projects/nrl_live_status.html", {
"request": request,
"status": None,
"error": "No unit assigned",
})
unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()
if not unit:
return templates.TemplateResponse("partials/projects/nrl_live_status.html", {
"request": request,
"status": None,
"error": "Assigned unit not found",
})
slmm_base = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
status_data = None
error_msg = None
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(f"{slmm_base}/api/nl43/{unit.id}/status")
if resp.status_code == 200:
status_data = resp.json()
else:
error_msg = f"SLMM returned {resp.status_code}"
except Exception as e:
error_msg = "SLMM unreachable"
return templates.TemplateResponse("partials/projects/nrl_live_status.html", {
"request": request,
"unit": unit,
"status": status_data,
"error": error_msg,
})