Files
terra-view/backend/routers/project_locations.py
serversdown e8e155556a refactor: unify active assignment checks and add project-type guards
- Replace all UnitAssignment "active" checks from `status == "active"` to
  `assigned_until == None` in both project_locations.py and projects.py.
  This aligns with the canonical definition: active = no end date set.
  (status field is still set in sync, but is no longer the query criterion)

- Add `_require_sound_project()` helper to both routers and call it at the
  top of every sound-monitoring-specific endpoint (FTP browser, FTP downloads,
  RND file viewer, all Excel report endpoints, combined report wizard,
  upload-all, NRL live status, NRL data upload). Vibration projects hitting
  these endpoints now receive a clear 400 instead of silently failing or
  returning empty results.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 21:12:38 +00:00

994 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Project Locations Router
Handles monitoring locations (NRLs for sound, monitoring points for vibration)
and unit assignments within projects.
"""
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from datetime import datetime
from typing import Optional
import uuid
import json
from fastapi import UploadFile, File
import zipfile
import hashlib
import io
from pathlib import Path
from backend.database import get_db
from backend.models import (
Project,
ProjectType,
MonitoringLocation,
UnitAssignment,
RosterUnit,
MonitoringSession,
DataFile,
)
from backend.templates_config import templates
# Every route registered on this router is prefixed with
# /api/projects/{project_id}, so each handler below receives `project_id`
# as a path parameter.
router = APIRouter(prefix="/api/projects/{project_id}", tags=["project-locations"])
# ============================================================================
# Shared helpers
# ============================================================================
def _require_sound_project(project) -> None:
"""Raise 400 if the project is not a sound_monitoring project."""
if not project or project.project_type_id != "sound_monitoring":
raise HTTPException(
status_code=400,
detail="This feature is only available for Sound Monitoring projects.",
)
# ============================================================================
# Session period helpers
# ============================================================================
def _derive_period_type(dt: datetime) -> str:
"""
Classify a session start time into one of four period types.
Night = 22:0007:00, Day = 07:0022:00.
Weekend = Saturday (5) or Sunday (6).
"""
is_weekend = dt.weekday() >= 5
is_night = dt.hour >= 22 or dt.hour < 7
if is_weekend:
return "weekend_night" if is_night else "weekend_day"
return "weekday_night" if is_night else "weekday_day"
def _build_session_label(dt: datetime, location_name: str, period_type: str) -> str:
"""Build a human-readable session label, e.g. 'NRL-1 — Sun 2/23 — Night'.
Uses started_at date as-is; user can correct period_type in the wizard.
"""
day_abbr = dt.strftime("%a")
date_str = f"{dt.month}/{dt.day}"
period_str = {
"weekday_day": "Day",
"weekday_night": "Night",
"weekend_day": "Day",
"weekend_night": "Night",
}.get(period_type, "")
parts = [p for p in [location_name, f"{day_abbr} {date_str}", period_str] if p]
return "".join(parts)
# ============================================================================
# Monitoring Locations CRUD
# ============================================================================
@router.get("/locations", response_class=HTMLResponse)
async def get_project_locations(
    project_id: str,
    request: Request,
    db: Session = Depends(get_db),
    location_type: Optional[str] = Query(None),
):
    """
    Render the monitoring-location list partial for a project.

    Locations are ordered by name and optionally restricted to
    ``location_type``. Each entry is enriched with its active assignment
    (the one whose ``assigned_until`` is NULL), the assigned unit, and the
    number of monitoring sessions recorded at that location.

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    criteria = {"project_id": project_id}
    if location_type:
        criteria["location_type"] = location_type
    locations = (
        db.query(MonitoringLocation)
        .filter_by(**criteria)
        .order_by(MonitoringLocation.name)
        .all()
    )

    def _describe(loc):
        """Collect assignment, unit and session count for one location."""
        active = (
            db.query(UnitAssignment)
            .filter(
                and_(
                    UnitAssignment.location_id == loc.id,
                    # `== None` is intentional: SQLAlchemy renders it as IS NULL.
                    UnitAssignment.assigned_until == None,  # noqa: E711
                )
            )
            .first()
        )
        unit = None
        if active:
            unit = db.query(RosterUnit).filter_by(id=active.unit_id).first()
        recorded = db.query(MonitoringSession).filter_by(location_id=loc.id).count()
        return {
            "location": loc,
            "assignment": active,
            "assigned_unit": unit,
            "session_count": recorded,
        }

    context = {
        "request": request,
        "project": project,
        "locations": [_describe(loc) for loc in locations],
    }
    return templates.TemplateResponse("partials/projects/location_list.html", context)
@router.get("/locations-json")
async def get_project_locations_json(
    project_id: str,
    db: Session = Depends(get_db),
    location_type: Optional[str] = Query(None),
):
    """
    Return a project's monitoring locations as a JSON list.

    Used by the schedule modal to populate its location dropdown. Results
    are ordered by name and optionally restricted to ``location_type``.

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    if not db.query(Project).filter_by(id=project_id).first():
        raise HTTPException(status_code=404, detail="Project not found")

    criteria = {"project_id": project_id}
    if location_type:
        criteria["location_type"] = location_type
    rows = (
        db.query(MonitoringLocation)
        .filter_by(**criteria)
        .order_by(MonitoringLocation.name)
        .all()
    )

    exported_fields = (
        "id",
        "name",
        "location_type",
        "description",
        "address",
        "coordinates",
    )
    return [{field: getattr(row, field) for field in exported_fields} for row in rows]
@router.post("/locations/create")
async def create_location(
    project_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """
    Create a monitoring location inside a project from submitted form data.

    Reads ``location_type``, ``name``, ``description``, ``coordinates``,
    ``address`` and ``location_metadata`` (a JSON string) from the form and
    stores them under a freshly generated UUID.

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    if not db.query(Project).filter_by(id=project_id).first():
        raise HTTPException(status_code=404, detail="Project not found")

    form_data = await request.form()
    form_fields = (
        "location_type",
        "name",
        "description",
        "coordinates",
        "address",
        "location_metadata",  # JSON string
    )
    submitted = {field: form_data.get(field) for field in form_fields}

    location = MonitoringLocation(
        id=str(uuid.uuid4()),
        project_id=project_id,
        **submitted,
    )
    db.add(location)
    db.commit()
    db.refresh(location)

    payload = {
        "success": True,
        "location_id": location.id,
        "message": f"Location '{location.name}' created successfully",
    }
    return JSONResponse(payload)
@router.put("/locations/{location_id}")
async def update_location(
    project_id: str,
    location_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """
    Apply a partial update to a monitoring location from a JSON body.

    Only the keys present in the body are written; ``updated_at`` is always
    refreshed.

    Raises:
        HTTPException: 404 when the location is not part of the project.
    """
    location = (
        db.query(MonitoringLocation)
        .filter_by(id=location_id, project_id=project_id)
        .first()
    )
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")

    payload = await request.json()
    editable_fields = (
        "name",
        "description",
        "location_type",
        "coordinates",
        "address",
        "location_metadata",
    )
    for field in editable_fields:
        if field in payload:
            setattr(location, field, payload[field])

    location.updated_at = datetime.utcnow()
    db.commit()
    return {"success": True, "message": "Location updated successfully"}
@router.delete("/locations/{location_id}")
async def delete_location(
    project_id: str,
    location_id: str,
    db: Session = Depends(get_db),
):
    """
    Delete a monitoring location that has no active unit assignment.

    Raises:
        HTTPException: 404 when the location is not part of the project;
            400 when an assignment with ``assigned_until`` NULL still
            points at it.
    """
    location = (
        db.query(MonitoringLocation)
        .filter_by(id=location_id, project_id=project_id)
        .first()
    )
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")

    # Active = assigned_until IS NULL (`== None` is how SQLAlchemy spells it).
    still_assigned = and_(
        UnitAssignment.location_id == location_id,
        UnitAssignment.assigned_until == None,  # noqa: E711
    )
    if db.query(UnitAssignment).filter(still_assigned).count() > 0:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete location with active unit assignments. Unassign units first.",
        )

    db.delete(location)
    db.commit()
    return {"success": True, "message": "Location deleted successfully"}
# ============================================================================
# Unit Assignments
# ============================================================================
@router.get("/assignments", response_class=HTMLResponse)
async def get_project_assignments(
    project_id: str,
    request: Request,
    db: Session = Depends(get_db),
    status: Optional[str] = Query("active"),
):
    """
    Render the unit-assignment list partial for a project.

    Assignments are ordered newest first (``assigned_at`` descending) and,
    when ``status`` is non-empty, restricted to rows whose ``status`` column
    equals it. Each row is paired with its unit and location records.

    NOTE(review): this filter uses the ``status`` column, while the other
    endpoints in this file treat "active" as ``assigned_until IS NULL`` —
    confirm whether the two are meant to agree here.
    """
    criteria = {"project_id": project_id}
    if status:
        criteria["status"] = status
    assignments = (
        db.query(UnitAssignment)
        .filter_by(**criteria)
        .order_by(UnitAssignment.assigned_at.desc())
        .all()
    )

    def _expand(assignment):
        """Attach the unit and location rows referenced by an assignment."""
        unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()
        location = (
            db.query(MonitoringLocation).filter_by(id=assignment.location_id).first()
        )
        return {"assignment": assignment, "unit": unit, "location": location}

    context = {
        "request": request,
        "project_id": project_id,
        "assignments": [_expand(item) for item in assignments],
    }
    return templates.TemplateResponse("partials/projects/assignment_list.html", context)
@router.post("/locations/{location_id}/assign")
async def assign_unit_to_location(
    project_id: str,
    location_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """
    Assign a unit to a monitoring location.

    Form fields:
        unit_id: ID of the roster unit to assign (required).
        assigned_until: Optional ISO 8601 date/time for the assignment end.
        notes: Optional free-text notes.

    The unit's device type must match the location type ("slm" for sound
    locations, "seismograph" otherwise), and the location must not already
    have an active assignment (active = ``assigned_until`` IS NULL).

    Raises:
        HTTPException: 404 when the location or unit is not found; 400 when
            ``unit_id`` is missing, the device type does not match, the
            location already has an active assignment, or ``assigned_until``
            is not a valid ISO 8601 value.
    """
    location = db.query(MonitoringLocation).filter_by(
        id=location_id,
        project_id=project_id,
    ).first()
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")

    form_data = await request.form()
    unit_id = form_data.get("unit_id")
    # Same explicit check as the swap endpoint, instead of a misleading
    # "Unit not found" for a request that never named a unit.
    if not unit_id:
        raise HTTPException(status_code=400, detail="unit_id is required")

    # Verify unit exists and matches location type
    unit = db.query(RosterUnit).filter_by(id=unit_id).first()
    if not unit:
        raise HTTPException(status_code=404, detail="Unit not found")

    expected_device_type = "slm" if location.location_type == "sound" else "seismograph"
    if unit.device_type != expected_device_type:
        raise HTTPException(
            status_code=400,
            detail=f"Unit type '{unit.device_type}' does not match location type '{location.location_type}'",
        )

    # Check if location already has an active assignment (active = assigned_until IS NULL)
    existing_assignment = db.query(UnitAssignment).filter(
        and_(
            UnitAssignment.location_id == location_id,
            UnitAssignment.assigned_until == None,  # noqa: E711 - renders IS NULL
        )
    ).first()
    if existing_assignment:
        raise HTTPException(
            status_code=400,
            detail=f"Location already has an active unit assignment ({existing_assignment.unit_id}). Use swap to replace it.",
        )

    # A malformed end date is a client error, not a server error: report it
    # as 400 rather than letting ValueError bubble up as a 500.
    assigned_until = None
    assigned_until_str = form_data.get("assigned_until")
    if assigned_until_str:
        try:
            assigned_until = datetime.fromisoformat(assigned_until_str)
        except (TypeError, ValueError):
            raise HTTPException(
                status_code=400,
                detail="assigned_until must be a valid ISO 8601 date/time",
            ) from None

    assignment = UnitAssignment(
        id=str(uuid.uuid4()),
        unit_id=unit_id,
        location_id=location_id,
        project_id=project_id,
        device_type=unit.device_type,
        assigned_until=assigned_until,
        status="active",
        notes=form_data.get("notes"),
    )
    db.add(assignment)
    db.commit()
    db.refresh(assignment)

    return JSONResponse({
        "success": True,
        "assignment_id": assignment.id,
        "message": f"Unit '{unit_id}' assigned to '{location.name}'",
    })
@router.post("/assignments/{assignment_id}/unassign")
async def unassign_unit(
    project_id: str,
    assignment_id: str,
    db: Session = Depends(get_db),
):
    """
    End a unit assignment.

    Marks the assignment "completed" and stamps ``assigned_until`` with the
    current UTC time.

    Raises:
        HTTPException: 404 when the assignment is not part of the project;
            400 when a monitoring session for the same unit and location is
            still in the "recording" state.
    """
    assignment = (
        db.query(UnitAssignment)
        .filter_by(id=assignment_id, project_id=project_id)
        .first()
    )
    if not assignment:
        raise HTTPException(status_code=404, detail="Assignment not found")

    # Refuse to pull a unit that is still recording at this location.
    still_recording = and_(
        MonitoringSession.location_id == assignment.location_id,
        MonitoringSession.unit_id == assignment.unit_id,
        MonitoringSession.status == "recording",
    )
    if db.query(MonitoringSession).filter(still_recording).count() > 0:
        raise HTTPException(
            status_code=400,
            detail="Cannot unassign unit with active monitoring sessions. Stop monitoring first.",
        )

    assignment.assigned_until = datetime.utcnow()
    assignment.status = "completed"
    db.commit()
    return {"success": True, "message": "Unit unassigned successfully"}
@router.post("/locations/{location_id}/swap")
async def swap_unit_on_location(
    project_id: str,
    location_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """
    Swap the unit assigned to a monitoring location.

    Ends the current active assignment (if any), creates a new one, and
    updates the modem pairing on the new unit. Works for first-time
    assignments too (no current assignment = just create).

    Form fields:
        unit_id: ID of the unit to place on the location (required).
        modem_id: Optional modem to pair with the unit. When omitted, the
            unit's ``deployed_with_modem_id`` is cleared.
        notes: Optional notes stored on the new assignment.

    All inputs are validated before anything is changed, so a rejected
    request leaves the database session untouched.

    Raises:
        HTTPException: 404 when the location, unit or modem is not found;
            400 when ``unit_id`` is missing or the unit's device type does
            not match the location type.
    """
    location = db.query(MonitoringLocation).filter_by(
        id=location_id,
        project_id=project_id,
    ).first()
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")

    form_data = await request.form()
    unit_id = form_data.get("unit_id")
    modem_id = form_data.get("modem_id") or None
    notes = form_data.get("notes") or None
    if not unit_id:
        raise HTTPException(status_code=400, detail="unit_id is required")

    # Validate new unit
    unit = db.query(RosterUnit).filter_by(id=unit_id).first()
    if not unit:
        raise HTTPException(status_code=404, detail="Unit not found")

    expected_device_type = "slm" if location.location_type == "sound" else "seismograph"
    if unit.device_type != expected_device_type:
        raise HTTPException(
            status_code=400,
            detail=f"Unit type '{unit.device_type}' does not match location type '{location.location_type}'",
        )

    # Validate the modem up front: a bad modem_id must be rejected before the
    # current assignment is ended or the new one is added to the session.
    modem = None
    if modem_id:
        modem = db.query(RosterUnit).filter_by(id=modem_id, device_type="modem").first()
        if not modem:
            raise HTTPException(status_code=404, detail=f"Modem '{modem_id}' not found")

    # End current active assignment if one exists (active = assigned_until IS NULL)
    current = db.query(UnitAssignment).filter(
        and_(
            UnitAssignment.location_id == location_id,
            UnitAssignment.assigned_until == None,  # noqa: E711 - renders IS NULL
        )
    ).first()
    if current:
        current.assigned_until = datetime.utcnow()
        current.status = "completed"

    # Create new assignment
    new_assignment = UnitAssignment(
        id=str(uuid.uuid4()),
        unit_id=unit_id,
        location_id=location_id,
        project_id=project_id,
        device_type=unit.device_type,
        assigned_until=None,
        status="active",
        notes=notes,
    )
    db.add(new_assignment)

    if modem:
        # Pair in both directions.
        unit.deployed_with_modem_id = modem_id
        modem.deployed_with_unit_id = unit_id
    else:
        # Clear modem pairing if not provided
        unit.deployed_with_modem_id = None

    db.commit()

    return JSONResponse({
        "success": True,
        "assignment_id": new_assignment.id,
        "message": f"Unit '{unit_id}' assigned to '{location.name}'" + (f" with modem '{modem_id}'" if modem_id else ""),
    })
# ============================================================================
# Available Units for Assignment
# ============================================================================
@router.get("/available-modems", response_class=JSONResponse)
async def get_available_modems(
    project_id: str,
    db: Session = Depends(get_db),
):
    """
    List every deployed, non-retired modem for the modem assignment dropdown.

    Results are ordered by unit ID; ``project_id`` comes from the router
    prefix and is not used to filter.
    """
    selectable = and_(
        RosterUnit.device_type == "modem",
        RosterUnit.deployed == True,  # noqa: E712 - SQL boolean comparison
        RosterUnit.retired == False,  # noqa: E712 - SQL boolean comparison
    )
    modems = db.query(RosterUnit).filter(selectable).order_by(RosterUnit.id).all()

    options = []
    for modem in modems:
        options.append({
            "id": modem.id,
            "hardware_model": modem.hardware_model,
            "ip_address": modem.ip_address,
        })
    return options
@router.get("/available-units", response_class=JSONResponse)
async def get_available_units(
    project_id: str,
    location_type: str = Query(...),
    db: Session = Depends(get_db),
):
    """
    Get list of available units for assignment to a location.

    A unit is available when it has the device type matching
    ``location_type`` ("slm" for "sound", "seismograph" otherwise), is
    deployed, is not retired, and has no active assignment anywhere
    (active = ``assigned_until`` IS NULL).

    Returns:
        A list of dicts with ``id``, ``device_type``, ``location`` and
        ``model`` for each available unit.
    """
    # Determine required device type
    required_device_type = "slm" if location_type == "sound" else "seismograph"

    # Get all units of the required type that are deployed and not retired
    candidates = db.query(RosterUnit).filter(
        and_(
            RosterUnit.device_type == required_device_type,
            RosterUnit.deployed == True,  # noqa: E712 - SQL boolean comparison
            RosterUnit.retired == False,  # noqa: E712 - SQL boolean comparison
        )
    ).all()

    # IDs of units that already have an active assignment. Kept as a set so
    # the membership test below is O(1) per unit instead of a list scan.
    assigned_unit_ids = {
        row[0]
        for row in db.query(UnitAssignment.unit_id).filter(
            UnitAssignment.assigned_until == None  # noqa: E711 - renders IS NULL
        ).distinct().all()
    }

    return [
        {
            "id": unit.id,
            "device_type": unit.device_type,
            "location": unit.address or unit.location,
            "model": unit.slm_model if unit.device_type == "slm" else unit.unit_type,
        }
        for unit in candidates
        if unit.id not in assigned_unit_ids
    ]
# ============================================================================
# NRL-specific endpoints for detail page
# ============================================================================
@router.get("/nrl/{location_id}/sessions", response_class=HTMLResponse)
async def get_nrl_sessions(
    project_id: str,
    location_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """
    Render the session list partial for one NRL.

    Sessions at ``location_id`` are listed newest first (``started_at``
    descending), each paired with its unit record when the session has a
    ``unit_id``.
    """
    sessions = (
        db.query(MonitoringSession)
        .filter_by(location_id=location_id)
        .order_by(MonitoringSession.started_at.desc())
        .all()
    )

    sessions_data = [
        {
            "session": item,
            # Manually uploaded sessions carry no unit, so skip the lookup.
            "unit": (
                db.query(RosterUnit).filter_by(id=item.unit_id).first()
                if item.unit_id
                else None
            ),
        }
        for item in sessions
    ]

    context = {
        "request": request,
        "project_id": project_id,
        "location_id": location_id,
        "sessions": sessions_data,
    }
    return templates.TemplateResponse("partials/projects/session_list.html", context)
@router.get("/nrl/{location_id}/files", response_class=HTMLResponse)
async def get_nrl_files(
    project_id: str,
    location_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """
    Render the data-file list partial for one NRL.

    Files are found by joining ``DataFile`` to ``MonitoringSession`` on the
    session ID and keeping sessions at ``location_id``; they are listed
    newest first (``created_at`` descending), each paired with its session.
    """
    files = (
        db.query(DataFile)
        .join(MonitoringSession, DataFile.session_id == MonitoringSession.id)
        .filter(MonitoringSession.location_id == location_id)
        .order_by(DataFile.created_at.desc())
        .all()
    )

    def _with_session(data_file):
        """Pair a data file with the session row it belongs to."""
        owner = None
        if data_file.session_id:
            owner = (
                db.query(MonitoringSession).filter_by(id=data_file.session_id).first()
            )
        return {"file": data_file, "session": owner}

    context = {
        "request": request,
        "project_id": project_id,
        "location_id": location_id,
        "files": [_with_session(entry) for entry in files],
    }
    return templates.TemplateResponse("partials/projects/file_list.html", context)
# ============================================================================
# Manual SD Card Data Upload
# ============================================================================
def _parse_rnh(content: bytes) -> dict:
"""
Parse a Rion .rnh metadata file (INI-style with [Section] headers).
Returns a dict of key metadata fields.
"""
result = {}
try:
text = content.decode("utf-8", errors="replace")
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("["):
continue
if "," in line:
key, _, value = line.partition(",")
key = key.strip()
value = value.strip()
if key == "Serial Number":
result["serial_number"] = value
elif key == "Store Name":
result["store_name"] = value
elif key == "Index Number":
result["index_number"] = value
elif key == "Measurement Start Time":
result["start_time_str"] = value
elif key == "Measurement Stop Time":
result["stop_time_str"] = value
elif key == "Total Measurement Time":
result["total_time_str"] = value
except Exception:
pass
return result
def _parse_rnh_datetime(s: str):
"""Parse RNH datetime string: '2026/02/17 19:00:19' -> datetime"""
from datetime import datetime
if not s:
return None
try:
return datetime.strptime(s.strip(), "%Y/%m/%d %H:%M:%S")
except Exception:
return None
def _classify_file(filename: str) -> str:
"""Classify a file by name into a DataFile file_type."""
name = filename.lower()
if name.endswith(".rnh"):
return "log"
if name.endswith(".rnd"):
return "measurement"
if name.endswith(".zip"):
return "archive"
return "data"
@router.post("/nrl/{location_id}/upload-data")
async def upload_nrl_data(
    project_id: str,
    location_id: str,
    db: Session = Depends(get_db),
    files: list[UploadFile] = File(...),
):
    """
    Manually upload SD card data for an offline NRL.

    Accepts either:
    - A single .zip file (the Auto_#### folder zipped) — auto-extracted
    - Multiple .rnd / .rnh files selected directly from the SD card folder

    Creates a MonitoringSession from .rnh metadata and DataFile records
    for each kept file. No unit assignment required. Sound Monitoring
    projects only.

    Every incoming name (archive member or uploaded file) is reduced to its
    bare file name before use, so a crafted name cannot write outside the
    session's output directory.

    Returns:
        A summary dict: session ID, number of files imported, per-kind
        counts, the store name, and the session start/stop times.

    Raises:
        HTTPException: 400 when the project is missing or not a sound
            monitoring project, the ZIP is invalid, or no usable files
            remain after filtering; 404 when the location is not part of
            the project.
    """
    def _safe_name(raw_name) -> str:
        """Return the bare file name of ``raw_name``, or "" if unusable."""
        if not raw_name:
            return ""
        # Treat both separator styles as path separators, then keep only
        # the last component.
        base = Path(str(raw_name).replace("\\", "/")).name
        return "" if base in ("", ".", "..") else base

    def _is_wanted(fname: str) -> bool:
        """Keep .rnh metadata and every .rnd except NL-43 _Lp_ files."""
        n = fname.lower()
        if n.endswith(".rnh"):
            return True
        if n.endswith(".rnd"):
            # NL-43 writes _Leq_ (15-min averages, wanted) and _Lp_ (1-sec
            # granular, skipped). AU2 (NL-23/older Rion) writes a single
            # Au2_####.rnd per session, which is always kept. Unknown .rnd
            # names are kept so data is never silently dropped.
            return "_leq_" in n or n.startswith("au2_") or "_lp" not in n
        return False

    # Verify project type and that the location belongs to the project
    project = db.query(Project).filter_by(id=project_id).first()
    _require_sound_project(project)
    location = db.query(MonitoringLocation).filter_by(
        id=location_id, project_id=project_id
    ).first()
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")

    # --- Step 1: Normalize to (filename, bytes) list ---
    file_entries: list[tuple[str, bytes]] = []
    first_name = (files[0].filename or "") if files else ""
    if len(files) == 1 and first_name.lower().endswith(".zip"):
        raw = await files[0].read()
        try:
            with zipfile.ZipFile(io.BytesIO(raw)) as zf:
                for info in zf.infolist():
                    if info.is_dir():
                        continue
                    name = _safe_name(info.filename)  # strip folder path
                    if not name:
                        continue
                    file_entries.append((name, zf.read(info)))
        except zipfile.BadZipFile:
            raise HTTPException(status_code=400, detail="Uploaded file is not a valid ZIP archive.")
    else:
        for uf in files:
            name = _safe_name(uf.filename)
            if not name:
                continue
            file_entries.append((name, await uf.read()))

    if not file_entries:
        raise HTTPException(status_code=400, detail="No usable files found in upload.")

    # --- Step 1b: Filter to only relevant files ---
    file_entries = [(fname, fbytes) for fname, fbytes in file_entries if _is_wanted(fname)]
    if not file_entries:
        raise HTTPException(status_code=400, detail="No usable .rnd or .rnh files found. Expected NL-43 _Leq_ files or AU2 format .rnd files.")

    # --- Step 2: Find and parse .rnh metadata (first .rnh wins) ---
    rnh_meta = {}
    for fname, fbytes in file_entries:
        if fname.lower().endswith(".rnh"):
            rnh_meta = _parse_rnh(fbytes)
            break

    parsed_start = _parse_rnh_datetime(rnh_meta.get("start_time_str"))
    # Fall back to "now" so the session always has a start time.
    started_at = parsed_start or datetime.utcnow()
    stopped_at = _parse_rnh_datetime(rnh_meta.get("stop_time_str"))
    # Only derive a duration from two real timestamps; measuring against
    # the "now" fallback would produce a meaningless value.
    duration_seconds = None
    if parsed_start and stopped_at:
        duration_seconds = int((stopped_at - parsed_start).total_seconds())

    store_name = rnh_meta.get("store_name", "")
    serial_number = rnh_meta.get("serial_number", "")
    index_number = rnh_meta.get("index_number", "")

    # --- Step 3: Create MonitoringSession ---
    period_type = _derive_period_type(started_at)
    session_label = _build_session_label(started_at, location.name, period_type)
    session_id = str(uuid.uuid4())
    monitoring_session = MonitoringSession(
        id=session_id,
        project_id=project_id,
        location_id=location_id,
        unit_id=None,
        session_type="sound",
        started_at=started_at,
        stopped_at=stopped_at,
        duration_seconds=duration_seconds,
        status="completed",
        session_label=session_label,
        period_type=period_type,
        session_metadata=json.dumps({
            "source": "manual_upload",
            "store_name": store_name,
            "serial_number": serial_number,
            "index_number": index_number,
        }),
    )
    db.add(monitoring_session)
    db.commit()
    db.refresh(monitoring_session)

    # --- Step 4: Write files to disk and create DataFile records ---
    output_dir = Path("data/Projects") / project_id / session_id
    output_dir.mkdir(parents=True, exist_ok=True)

    leq_count = 0
    lp_count = 0
    metadata_count = 0
    files_imported = 0
    for fname, fbytes in file_entries:
        file_type = _classify_file(fname)
        fname_lower = fname.lower()

        # Track counts for summary
        if fname_lower.endswith(".rnd"):
            if "_leq_" in fname_lower:
                leq_count += 1
            elif "_lp" in fname_lower:
                lp_count += 1
        elif fname_lower.endswith(".rnh"):
            metadata_count += 1

        # Write to disk
        dest = output_dir / fname
        dest.write_bytes(fbytes)

        # Compute checksum
        checksum = hashlib.sha256(fbytes).hexdigest()

        # Store relative path from data/ dir
        rel_path = str(dest.relative_to("data"))

        data_file = DataFile(
            id=str(uuid.uuid4()),
            session_id=session_id,
            file_path=rel_path,
            file_type=file_type,
            file_size_bytes=len(fbytes),
            downloaded_at=datetime.utcnow(),
            checksum=checksum,
            file_metadata=json.dumps({
                "source": "manual_upload",
                "original_filename": fname,
                "store_name": store_name,
            }),
        )
        db.add(data_file)
        files_imported += 1

    db.commit()

    return {
        "success": True,
        "session_id": session_id,
        "files_imported": files_imported,
        "leq_files": leq_count,
        "lp_files": lp_count,
        "metadata_files": metadata_count,
        "store_name": store_name,
        "started_at": started_at.isoformat(),
        "stopped_at": stopped_at.isoformat() if stopped_at else None,
    }
# ============================================================================
# NRL Live Status (connected NRLs only)
# ============================================================================
@router.get("/nrl/{location_id}/live-status", response_class=HTMLResponse)
async def get_nrl_live_status(
    project_id: str,
    location_id: str,
    request: Request,
    db: Session = Depends(get_db),
):
    """
    Render a compact live-status card for the unit assigned to an NRL.

    Looks up the location's active assignment (``assigned_until`` IS NULL),
    then asks SLMM (base URL from ``SLMM_BASE_URL``, default
    ``http://localhost:8100``) for that unit's status with a 5-second
    timeout. Any failure is rendered as an error message on the card rather
    than raised. Sound Monitoring projects only.

    Raises:
        HTTPException: 400 when the project is missing or not a sound
            monitoring project.
    """
    import os
    import httpx

    def _card(**context):
        """Render the status partial with the request plus ``context``."""
        return templates.TemplateResponse(
            "partials/projects/nrl_live_status.html",
            {"request": request, **context},
        )

    _require_sound_project(db.query(Project).filter_by(id=project_id).first())

    # Find the assigned unit (active = assigned_until IS NULL)
    active_filter = and_(
        UnitAssignment.location_id == location_id,
        UnitAssignment.assigned_until == None,  # noqa: E711 - renders IS NULL
    )
    assignment = db.query(UnitAssignment).filter(active_filter).first()
    if not assignment:
        return _card(status=None, error="No unit assigned")

    unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()
    if not unit:
        return _card(status=None, error="Assigned unit not found")

    slmm_base = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
    status_data = None
    error_msg = None
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(f"{slmm_base}/api/nl43/{unit.id}/status")
            if resp.status_code != 200:
                error_msg = f"SLMM returned {resp.status_code}"
            else:
                status_data = resp.json()
    except Exception:
        # Best-effort: any transport or decoding failure shows as offline.
        error_msg = "SLMM unreachable"

    return _card(unit=unit, status=status_data, error=error_msg)