Files
terra-view/backend/routers/projects.py
serversdown ef8c046f31 feat: add slm model schemas, please run migration on prod db
Feat: add complete combined sound report creation tool (wizard), add new slm schema for each model

feat: update project header link for combined report wizard

feat: add migration script to backfill device_model in monitoring_sessions

feat: implement combined report preview template with spreadsheet functionality

feat: create combined report wizard template for report generation.
2026-03-05 20:43:22 +00:00

3719 lines
134 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Projects Router
Provides API endpoints for the Projects system:
- Project CRUD operations
- Project dashboards
- Project statistics
- Type-aware features
"""
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy import func, and_, or_
from datetime import datetime, timedelta
from typing import Optional
from collections import OrderedDict
import uuid
import json
import logging
import io
from backend.utils.timezone import utc_to_local, format_local_datetime
from backend.database import get_db
from fastapi import UploadFile, File
import zipfile
import hashlib
import pathlib as _pathlib
from backend.models import (
Project,
ProjectType,
MonitoringLocation,
UnitAssignment,
MonitoringSession,
DataFile,
ScheduledAction,
RecurringSchedule,
RosterUnit,
)
from backend.templates_config import templates
router = APIRouter(prefix="/api/projects", tags=["projects"])
logger = logging.getLogger(__name__)
# ============================================================================
# RND file normalization — maps AU2 (older Rion) column names to the NL-43
# equivalents so report generation and the web viewer work for both formats.
# AU2 files: LAeq, LAmax, LAmin, LA01, LA10, LA50, LA90, LA95, LCpeak
# NL-43 files: Leq(Main), Lmax(Main), Lmin(Main), LN1(Main) … Lpeak(Main)
# ============================================================================
_AU2_TO_NL43 = {
"LAeq": "Leq(Main)",
"LAmax": "Lmax(Main)",
"LAmin": "Lmin(Main)",
"LCpeak": "Lpeak(Main)",
"LA01": "LN1(Main)",
"LA10": "LN2(Main)",
"LA50": "LN3(Main)",
"LA90": "LN4(Main)",
"LA95": "LN5(Main)",
# Time column differs too
"Time": "Start Time",
}
def _normalize_rnd_rows(rows: list[dict]) -> tuple[list[dict], bool]:
"""
Detect AU2-format RND rows (by presence of 'LAeq' key) and remap column
names to NL-43 equivalents. Returns (normalized_rows, was_au2_format).
If already NL-43 format the rows are returned unchanged.
"""
if not rows:
return rows, False
if "LAeq" not in rows[0]:
return rows, False # already NL-43 format
normalized = []
for row in rows:
new_row = {}
for k, v in row.items():
new_row[_AU2_TO_NL43.get(k, k)] = v
normalized.append(new_row)
return normalized, True
def _peek_rnd_headers(file_path) -> list[dict]:
"""Read just the first data row of an RND file to check column names cheaply."""
import csv as _csv
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
reader = _csv.DictReader(f)
row = next(reader, None)
return [row] if row else []
except Exception:
return []
def _is_leq_file(file_path: str, rows: list[dict]) -> bool:
"""
Return True if this RND file contains Leq (15-min averaged) data.
Accepts NL-43 Leq files (_Leq_ in path) and AU2 files (LAeq column or
Leq(Main) column after normalisation).
"""
if "_Leq_" in file_path:
return True
if rows and ("LAeq" in rows[0] or "Leq(Main)" in rows[0]):
return True
return False
def _filter_rnd_rows(
rows: list[dict],
filter_start_time: str,
filter_end_time: str,
filter_start_date: str,
filter_end_date: str,
) -> list[dict]:
"""Filter RND data rows by time window and/or date range. Handles overnight ranges."""
if not filter_start_time and not filter_end_time and not filter_start_date and not filter_end_date:
return rows
filtered = []
start_hour = start_minute = end_hour = end_minute = None
if filter_start_time:
try:
parts = filter_start_time.split(':')
start_hour = int(parts[0])
start_minute = int(parts[1]) if len(parts) > 1 else 0
except (ValueError, IndexError):
pass
if filter_end_time:
try:
parts = filter_end_time.split(':')
end_hour = int(parts[0])
end_minute = int(parts[1]) if len(parts) > 1 else 0
except (ValueError, IndexError):
pass
start_dt = end_dt = None
if filter_start_date:
try:
start_dt = datetime.strptime(filter_start_date, '%Y-%m-%d').date()
except ValueError:
pass
if filter_end_date:
try:
end_dt = datetime.strptime(filter_end_date, '%Y-%m-%d').date()
except ValueError:
pass
for row in rows:
start_time_str = row.get('Start Time', '')
if not start_time_str:
continue
try:
dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
row_date = dt.date()
row_hour = dt.hour
row_minute = dt.minute
if start_dt and row_date < start_dt:
continue
if end_dt and row_date > end_dt:
continue
if start_hour is not None and end_hour is not None:
row_time_minutes = row_hour * 60 + row_minute
start_time_minutes = start_hour * 60 + start_minute
end_time_minutes = end_hour * 60 + end_minute
if start_time_minutes > end_time_minutes:
# Overnight range (e.g., 19:00-07:00)
if not (row_time_minutes >= start_time_minutes or row_time_minutes < end_time_minutes):
continue
else:
# Same-day range (e.g., 07:00-19:00)
if not (start_time_minutes <= row_time_minutes < end_time_minutes):
continue
filtered.append(row)
except ValueError:
filtered.append(row)
return filtered
def _read_rnd_file_rows(file_path_str: str) -> list[dict]:
"""Read and parse a single RND CSV file into a list of cleaned row dicts."""
import csv as _csv
from pathlib import Path as _Path
file_path = _Path("data") / file_path_str
if not file_path.exists():
return []
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
rows = []
reader = _csv.DictReader(io.StringIO(content))
for row in reader:
cleaned_row = {}
for key, value in row.items():
if key:
cleaned_key = key.strip()
cleaned_value = value.strip() if value else ''
if cleaned_value and cleaned_value not in ['-.-', '-', '']:
try:
cleaned_value = float(cleaned_value)
except ValueError:
pass
elif cleaned_value in ['-.-', '-']:
cleaned_value = None
cleaned_row[cleaned_key] = cleaned_value
rows.append(cleaned_row)
return rows
except Exception:
return []
def _build_combined_location_data(
project_id: str,
db,
start_time: str = "",
end_time: str = "",
start_date: str = "",
end_date: str = "",
enabled_locations: list = None,
) -> dict:
"""
Read all Leq RND files for a project, apply time/date filters, and return
per-location spreadsheet data ready for the wizard preview.
Returns:
{
"project": Project,
"location_data": [
{
"location_name": str,
"raw_count": int,
"filtered_count": int,
"spreadsheet_data": [[idx, date, time, lmax, ln1, ln2, ""], ...]
},
...
]
}
Raises HTTPException 404 if project not found or no Leq files exist.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
sessions = db.query(MonitoringSession).filter_by(project_id=project_id).all()
# Group Leq files by location
location_files: dict = {}
for session in sessions:
files = db.query(DataFile).filter_by(session_id=session.id).all()
for file in files:
if not file.file_path or not file.file_path.lower().endswith('.rnd'):
continue
from pathlib import Path as _Path
abs_path = _Path("data") / file.file_path
peek = _peek_rnd_headers(abs_path)
if not _is_leq_file(file.file_path, peek):
continue
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
loc_name = location.name if location else f"Session {session.id[:8]}"
if loc_name not in location_files:
location_files[loc_name] = []
location_files[loc_name].append(file)
if not location_files:
raise HTTPException(status_code=404, detail="No Leq measurement files found in project.")
# Filter by enabled_locations if specified
if enabled_locations:
location_files = {k: v for k, v in location_files.items() if k in enabled_locations}
if not location_files:
raise HTTPException(status_code=404, detail="None of the selected locations have Leq files.")
location_data = []
for loc_name, files in sorted(location_files.items()):
all_rows = []
for file in files:
rows = _read_rnd_file_rows(file.file_path)
rows, _ = _normalize_rnd_rows(rows)
all_rows.extend(rows)
if not all_rows:
continue
all_rows.sort(key=lambda r: r.get('Start Time', ''))
raw_count = len(all_rows)
filtered_rows = _filter_rnd_rows(all_rows, start_time, end_time, start_date, end_date)
spreadsheet_data = []
for idx, row in enumerate(filtered_rows, 1):
start_time_str = row.get('Start Time', '')
date_str = time_str = ''
if start_time_str:
try:
dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
date_str = dt.strftime('%Y-%m-%d')
time_str = dt.strftime('%H:%M:%S')
except ValueError:
date_str = start_time_str
lmax = row.get('Lmax(Main)', '')
ln1 = row.get('LN1(Main)', '')
ln2 = row.get('LN2(Main)', '')
spreadsheet_data.append([
idx,
date_str,
time_str,
lmax if lmax else '',
ln1 if ln1 else '',
ln2 if ln2 else '',
'',
])
location_data.append({
"location_name": loc_name,
"raw_count": raw_count,
"filtered_count": len(filtered_rows),
"spreadsheet_data": spreadsheet_data,
})
return {"project": project, "location_data": location_data}
# ============================================================================
# Project List & Overview
# ============================================================================
@router.get("/list", response_class=HTMLResponse)
async def get_projects_list(
request: Request,
db: Session = Depends(get_db),
status: Optional[str] = Query(None),
project_type_id: Optional[str] = Query(None),
view: Optional[str] = Query(None),
):
"""
Get list of all projects.
Returns HTML partial with project cards.
"""
query = db.query(Project)
# Filter by status if provided; otherwise exclude soft-deleted projects
if status:
query = query.filter(Project.status == status)
else:
query = query.filter(Project.status != "deleted")
# Filter by project type if provided
if project_type_id:
query = query.filter(Project.project_type_id == project_type_id)
projects = query.order_by(Project.created_at.desc()).all()
# Enrich each project with stats
projects_data = []
for project in projects:
# Get project type
project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first()
# Count locations
location_count = db.query(func.count(MonitoringLocation.id)).filter_by(
project_id=project.id
).scalar()
# Count assigned units
unit_count = db.query(func.count(UnitAssignment.id)).filter(
and_(
UnitAssignment.project_id == project.id,
UnitAssignment.status == "active",
)
).scalar()
# Count active sessions
active_session_count = db.query(func.count(MonitoringSession.id)).filter(
and_(
MonitoringSession.project_id == project.id,
MonitoringSession.status == "recording",
)
).scalar()
projects_data.append({
"project": project,
"project_type": project_type,
"location_count": location_count,
"unit_count": unit_count,
"active_session_count": active_session_count,
})
template_name = "partials/projects/project_list.html"
if view == "compact":
template_name = "partials/projects/project_list_compact.html"
return templates.TemplateResponse(template_name, {
"request": request,
"projects": projects_data,
})
@router.get("/stats", response_class=HTMLResponse)
async def get_projects_stats(request: Request, db: Session = Depends(get_db)):
"""
Get summary statistics for projects overview.
Returns HTML partial with stat cards.
"""
# Count projects by status (exclude deleted)
total_projects = db.query(func.count(Project.id)).filter(Project.status != "deleted").scalar()
active_projects = db.query(func.count(Project.id)).filter_by(status="active").scalar()
on_hold_projects = db.query(func.count(Project.id)).filter_by(status="on_hold").scalar()
completed_projects = db.query(func.count(Project.id)).filter_by(status="completed").scalar()
# Count total locations across all projects
total_locations = db.query(func.count(MonitoringLocation.id)).scalar()
# Count assigned units
assigned_units = db.query(func.count(UnitAssignment.id)).filter_by(
status="active"
).scalar()
# Count active recording sessions
active_sessions = db.query(func.count(MonitoringSession.id)).filter_by(
status="recording"
).scalar()
return templates.TemplateResponse("partials/projects/project_stats.html", {
"request": request,
"total_projects": total_projects,
"active_projects": active_projects,
"on_hold_projects": on_hold_projects,
"completed_projects": completed_projects,
"total_locations": total_locations,
"assigned_units": assigned_units,
"active_sessions": active_sessions,
})
# ============================================================================
# Project Search (Smart Autocomplete)
# ============================================================================
def _build_project_display(project: Project) -> str:
"""Build display string from project fields: 'xxxx-YY - Client - Name'"""
parts = []
if project.project_number:
parts.append(project.project_number)
if project.client_name:
parts.append(project.client_name)
if project.name:
parts.append(project.name)
return " - ".join(parts) if parts else project.id
@router.get("/search", response_class=HTMLResponse)
async def search_projects(
request: Request,
q: str = Query("", description="Search term"),
db: Session = Depends(get_db),
limit: int = Query(10, le=50),
):
"""
Fuzzy search across project fields for autocomplete.
Searches: project_number, client_name, name (project/site name)
Returns HTML partial for HTMX dropdown.
"""
if not q.strip():
# Return recent active projects when no search term
projects = db.query(Project).filter(
Project.status.notin_(["archived", "deleted"])
).order_by(Project.updated_at.desc()).limit(limit).all()
else:
search_term = f"%{q}%"
projects = db.query(Project).filter(
and_(
Project.status.notin_(["archived", "deleted"]),
or_(
Project.project_number.ilike(search_term),
Project.client_name.ilike(search_term),
Project.name.ilike(search_term),
)
)
).order_by(Project.updated_at.desc()).limit(limit).all()
# Build display data for each project
projects_data = [{
"id": p.id,
"project_number": p.project_number,
"client_name": p.client_name,
"name": p.name,
"display": _build_project_display(p),
"status": p.status,
} for p in projects]
return templates.TemplateResponse("partials/project_search_results.html", {
"request": request,
"projects": projects_data,
"query": q,
"show_create": len(projects) == 0 and q.strip(),
})
@router.get("/search-json")
async def search_projects_json(
q: str = Query("", description="Search term"),
db: Session = Depends(get_db),
limit: int = Query(10, le=50),
):
"""
Fuzzy search across project fields - JSON response.
For programmatic/API consumption.
"""
if not q.strip():
projects = db.query(Project).filter(
Project.status.notin_(["archived", "deleted"])
).order_by(Project.updated_at.desc()).limit(limit).all()
else:
search_term = f"%{q}%"
projects = db.query(Project).filter(
and_(
Project.status.notin_(["archived", "deleted"]),
or_(
Project.project_number.ilike(search_term),
Project.client_name.ilike(search_term),
Project.name.ilike(search_term),
)
)
).order_by(Project.updated_at.desc()).limit(limit).all()
return [{
"id": p.id,
"project_number": p.project_number,
"client_name": p.client_name,
"name": p.name,
"display": _build_project_display(p),
"status": p.status,
} for p in projects]
# ============================================================================
# Project CRUD
# ============================================================================
@router.post("/create")
async def create_project(request: Request, db: Session = Depends(get_db)):
"""
Create a new project.
Expects form data with project details.
"""
form_data = await request.form()
project = Project(
id=str(uuid.uuid4()),
project_number=form_data.get("project_number"), # TMI ID: xxxx-YY format
name=form_data.get("name"),
description=form_data.get("description"),
project_type_id=form_data.get("project_type_id"),
status="active",
client_name=form_data.get("client_name"),
site_address=form_data.get("site_address"),
site_coordinates=form_data.get("site_coordinates"),
start_date=datetime.fromisoformat(form_data.get("start_date")) if form_data.get("start_date") else None,
end_date=datetime.fromisoformat(form_data.get("end_date")) if form_data.get("end_date") else None,
)
db.add(project)
db.commit()
db.refresh(project)
return JSONResponse({
"success": True,
"project_id": project.id,
"message": f"Project '{project.name}' created successfully",
})
@router.get("/{project_id}")
async def get_project(project_id: str, db: Session = Depends(get_db)):
"""
Get project details by ID.
Returns JSON with full project data.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first()
return {
"id": project.id,
"project_number": project.project_number,
"name": project.name,
"description": project.description,
"project_type_id": project.project_type_id,
"project_type_name": project_type.name if project_type else None,
"status": project.status,
"client_name": project.client_name,
"site_address": project.site_address,
"site_coordinates": project.site_coordinates,
"start_date": project.start_date.isoformat() if project.start_date else None,
"end_date": project.end_date.isoformat() if project.end_date else None,
"created_at": project.created_at.isoformat(),
"updated_at": project.updated_at.isoformat(),
}
@router.put("/{project_id}")
async def update_project(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Update project details.
Expects JSON body with fields to update.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
data = await request.json()
# Update fields if provided
if "name" in data:
project.name = data["name"]
if "description" in data:
project.description = data["description"]
if "status" in data:
project.status = data["status"]
# Cancel pending scheduled actions when archiving
if data["status"] == "archived":
db.query(ScheduledAction).filter(
and_(
ScheduledAction.project_id == project_id,
ScheduledAction.execution_status == "pending",
)
).update({"execution_status": "cancelled"})
if "client_name" in data:
project.client_name = data["client_name"]
if "site_address" in data:
project.site_address = data["site_address"]
if "site_coordinates" in data:
project.site_coordinates = data["site_coordinates"]
if "start_date" in data:
project.start_date = datetime.fromisoformat(data["start_date"]) if data["start_date"] else None
if "end_date" in data:
project.end_date = datetime.fromisoformat(data["end_date"]) if data["end_date"] else None
project.updated_at = datetime.utcnow()
db.commit()
return {"success": True, "message": "Project updated successfully"}
@router.delete("/{project_id}")
async def delete_project(project_id: str, db: Session = Depends(get_db)):
"""
Soft-delete a project. Sets status='deleted' and records deleted_at timestamp.
Data will be permanently removed after 60 days (or via /permanent endpoint).
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project.status = "deleted"
project.deleted_at = datetime.utcnow()
project.updated_at = datetime.utcnow()
# Cancel all pending scheduled actions
db.query(ScheduledAction).filter(
and_(
ScheduledAction.project_id == project_id,
ScheduledAction.execution_status == "pending",
)
).update({"execution_status": "cancelled"})
db.commit()
return {"success": True, "message": "Project deleted. Data will be permanently removed after 60 days."}
@router.delete("/{project_id}/permanent")
async def permanently_delete_project(project_id: str, db: Session = Depends(get_db)):
"""
Hard-delete a project and all related data. Only allowed when status='deleted'.
Removes: locations, assignments, sessions, scheduled actions, recurring schedules.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
if project.status != "deleted":
raise HTTPException(status_code=400, detail="Project must be soft-deleted before permanent deletion.")
# Delete related data
db.query(RecurringSchedule).filter_by(project_id=project_id).delete()
db.query(ScheduledAction).filter_by(project_id=project_id).delete()
db.query(MonitoringSession).filter_by(project_id=project_id).delete()
db.query(UnitAssignment).filter_by(project_id=project_id).delete()
db.query(MonitoringLocation).filter_by(project_id=project_id).delete()
db.delete(project)
db.commit()
return {"success": True, "message": "Project permanently deleted."}
@router.post("/{project_id}/hold")
async def hold_project(project_id: str, db: Session = Depends(get_db)):
"""
Put a project on hold. Pauses without archiving; assignments and schedules remain.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project.status = "on_hold"
project.updated_at = datetime.utcnow()
# Cancel pending scheduled actions so they don't appear in dashboards or fire
db.query(ScheduledAction).filter(
and_(
ScheduledAction.project_id == project_id,
ScheduledAction.execution_status == "pending",
)
).update({"execution_status": "cancelled"})
db.commit()
return {"success": True, "message": "Project put on hold."}
@router.post("/{project_id}/unhold")
async def unhold_project(project_id: str, db: Session = Depends(get_db)):
"""
Resume a project that was on hold.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project.status = "active"
project.updated_at = datetime.utcnow()
db.commit()
return {"success": True, "message": "Project resumed."}
# ============================================================================
# Project Dashboard Data
# ============================================================================
@router.get("/{project_id}/dashboard", response_class=HTMLResponse)
async def get_project_dashboard(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Get project dashboard data.
Returns HTML partial with project summary.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first()
# Get locations
locations = db.query(MonitoringLocation).filter_by(project_id=project_id).all()
# Get assigned units with details
assignments = db.query(UnitAssignment).filter(
and_(
UnitAssignment.project_id == project_id,
UnitAssignment.status == "active",
)
).all()
assigned_units = []
for assignment in assignments:
unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()
if unit:
assigned_units.append({
"assignment": assignment,
"unit": unit,
})
# Get active recording sessions
active_sessions = db.query(MonitoringSession).filter(
and_(
MonitoringSession.project_id == project_id,
MonitoringSession.status == "recording",
)
).all()
# Get completed sessions count
completed_sessions_count = db.query(func.count(MonitoringSession.id)).filter(
and_(
MonitoringSession.project_id == project_id,
MonitoringSession.status == "completed",
)
).scalar()
# Get upcoming scheduled actions
upcoming_actions = db.query(ScheduledAction).filter(
and_(
ScheduledAction.project_id == project_id,
ScheduledAction.execution_status == "pending",
ScheduledAction.scheduled_time > datetime.utcnow(),
)
).order_by(ScheduledAction.scheduled_time).limit(5).all()
return templates.TemplateResponse("partials/projects/project_dashboard.html", {
"request": request,
"project": project,
"project_type": project_type,
"locations": locations,
"assigned_units": assigned_units,
"active_sessions": active_sessions,
"completed_sessions_count": completed_sessions_count,
"upcoming_actions": upcoming_actions,
})
# ============================================================================
# Project Types
# ============================================================================
@router.get("/{project_id}/header", response_class=HTMLResponse)
async def get_project_header(
project_id: str,
request: Request,
db: Session = Depends(get_db)
):
"""
Get project header information for dynamic display.
Returns HTML partial with project name, status, and type.
"""
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first()
return templates.TemplateResponse("partials/projects/project_header.html", {
"request": request,
"project": project,
"project_type": project_type,
})
@router.get("/{project_id}/units", response_class=HTMLResponse)
async def get_project_units(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Get all units assigned to this project's locations.
Returns HTML partial with unit list.
"""
from backend.models import DataFile
# Get all assignments for this project
assignments = db.query(UnitAssignment).filter(
and_(
UnitAssignment.project_id == project_id,
UnitAssignment.status == "active",
)
).all()
# Enrich with unit and location details
units_data = []
for assignment in assignments:
unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()
location = db.query(MonitoringLocation).filter_by(id=assignment.location_id).first()
# Count sessions for this assignment
session_count = db.query(func.count(MonitoringSession.id)).filter_by(
location_id=assignment.location_id,
unit_id=assignment.unit_id,
).scalar()
# Count files from sessions
file_count = db.query(func.count(DataFile.id)).join(
MonitoringSession,
DataFile.session_id == MonitoringSession.id
).filter(
MonitoringSession.location_id == assignment.location_id,
MonitoringSession.unit_id == assignment.unit_id,
).scalar()
# Check if currently recording
active_session = db.query(MonitoringSession).filter(
and_(
MonitoringSession.location_id == assignment.location_id,
MonitoringSession.unit_id == assignment.unit_id,
MonitoringSession.status == "recording",
)
).first()
units_data.append({
"assignment": assignment,
"unit": unit,
"location": location,
"session_count": session_count,
"file_count": file_count,
"active_session": active_session,
})
# Get project type for label context
project = db.query(Project).filter_by(id=project_id).first()
project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first() if project else None
return templates.TemplateResponse("partials/projects/unit_list.html", {
"request": request,
"project_id": project_id,
"units": units_data,
"project_type": project_type,
})
@router.get("/{project_id}/schedules", response_class=HTMLResponse)
async def get_project_schedules(
project_id: str,
request: Request,
db: Session = Depends(get_db),
status: Optional[str] = Query(None),
):
"""
Get scheduled actions for this project.
Returns HTML partial with schedule list.
Optional status filter: pending, completed, failed, cancelled
"""
query = db.query(ScheduledAction).filter_by(project_id=project_id)
# Filter by status if provided
if status:
query = query.filter(ScheduledAction.execution_status == status)
# For pending actions, show soonest first (ascending)
# For completed/failed, show most recent first (descending)
if status == "pending":
schedules = query.order_by(ScheduledAction.scheduled_time.asc()).all()
else:
schedules = query.order_by(ScheduledAction.scheduled_time.desc()).all()
# Enrich with location details and group by date
schedules_by_date = OrderedDict()
for schedule in schedules:
location = None
if schedule.location_id:
location = db.query(MonitoringLocation).filter_by(id=schedule.location_id).first()
# Get local date for grouping
if schedule.scheduled_time:
local_dt = utc_to_local(schedule.scheduled_time)
date_key = local_dt.strftime("%Y-%m-%d")
date_display = local_dt.strftime("%A, %B %d, %Y") # "Wednesday, January 22, 2026"
else:
date_key = "unknown"
date_display = "Unknown Date"
if date_key not in schedules_by_date:
schedules_by_date[date_key] = {
"date_display": date_display,
"date_key": date_key,
"actions": [],
}
# Parse module_response for display
result_data = None
if schedule.module_response:
try:
result_data = json.loads(schedule.module_response)
except json.JSONDecodeError:
pass
schedules_by_date[date_key]["actions"].append({
"schedule": schedule,
"location": location,
"result": result_data,
})
project = db.query(Project).filter_by(id=project_id).first()
project_status = project.status if project else "active"
return templates.TemplateResponse("partials/projects/schedule_list.html", {
"request": request,
"project_id": project_id,
"schedules_by_date": schedules_by_date,
"project_status": project_status,
})
@router.post("/{project_id}/schedules/{schedule_id}/execute")
async def execute_scheduled_action(
project_id: str,
schedule_id: str,
db: Session = Depends(get_db),
):
"""
Manually execute a scheduled action now.
"""
from backend.services.scheduler import get_scheduler
action = db.query(ScheduledAction).filter_by(
id=schedule_id,
project_id=project_id,
).first()
if not action:
raise HTTPException(status_code=404, detail="Action not found")
if action.execution_status != "pending":
raise HTTPException(
status_code=400,
detail=f"Action is not pending (status: {action.execution_status})",
)
# Execute via scheduler service
scheduler = get_scheduler()
result = await scheduler.execute_action_by_id(schedule_id)
# Refresh from DB to get updated status
db.refresh(action)
return JSONResponse({
"success": result.get("success", False),
"message": f"Action executed: {action.action_type}",
"result": result,
"action": {
"id": action.id,
"execution_status": action.execution_status,
"executed_at": action.executed_at.isoformat() if action.executed_at else None,
"error_message": action.error_message,
},
})
@router.post("/{project_id}/schedules/{schedule_id}/cancel")
async def cancel_scheduled_action(
project_id: str,
schedule_id: str,
db: Session = Depends(get_db),
):
"""
Cancel a pending scheduled action.
"""
action = db.query(ScheduledAction).filter_by(
id=schedule_id,
project_id=project_id,
).first()
if not action:
raise HTTPException(status_code=404, detail="Action not found")
if action.execution_status != "pending":
raise HTTPException(
status_code=400,
detail=f"Can only cancel pending actions (status: {action.execution_status})",
)
action.execution_status = "cancelled"
db.commit()
return JSONResponse({
"success": True,
"message": "Action cancelled successfully",
})
@router.get("/{project_id}/sessions", response_class=HTMLResponse)
async def get_project_sessions(
project_id: str,
request: Request,
db: Session = Depends(get_db),
status: Optional[str] = Query(None),
):
"""
Get all recording sessions for this project.
Returns HTML partial with session list.
Optional status filter: recording, completed, paused, failed
"""
query = db.query(MonitoringSession).filter_by(project_id=project_id)
# Filter by status if provided
if status:
query = query.filter(MonitoringSession.status == status)
sessions = query.order_by(MonitoringSession.started_at.desc()).all()
# Enrich with unit and location details
sessions_data = []
for session in sessions:
unit = None
location = None
if session.unit_id:
unit = db.query(RosterUnit).filter_by(id=session.unit_id).first()
if session.location_id:
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first()
sessions_data.append({
"session": session,
"unit": unit,
"location": location,
})
return templates.TemplateResponse("partials/projects/session_list.html", {
"request": request,
"project_id": project_id,
"sessions": sessions_data,
})
@router.get("/{project_id}/ftp-browser", response_class=HTMLResponse)
async def get_ftp_browser(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Get FTP browser interface for downloading files from assigned SLMs.
Returns HTML partial with FTP browser.
"""
from backend.models import DataFile
# Get all assignments for this project
assignments = db.query(UnitAssignment).filter(
and_(
UnitAssignment.project_id == project_id,
UnitAssignment.status == "active",
)
).all()
# Enrich with unit and location details
units_data = []
for assignment in assignments:
unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()
location = db.query(MonitoringLocation).filter_by(id=assignment.location_id).first()
# Only include SLM units
if unit and unit.device_type == "slm":
units_data.append({
"assignment": assignment,
"unit": unit,
"location": location,
})
return templates.TemplateResponse("partials/projects/ftp_browser.html", {
"request": request,
"project_id": project_id,
"units": units_data,
})
@router.post("/{project_id}/ftp-download-to-server")
async def ftp_download_to_server(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Download a file from an SLM to the server via FTP.
Creates a DataFile record and stores the file in data/Projects/{project_id}/
"""
import httpx
import os
import hashlib
from pathlib import Path
from backend.models import DataFile
data = await request.json()
unit_id = data.get("unit_id")
remote_path = data.get("remote_path")
location_id = data.get("location_id")
if not unit_id or not remote_path:
raise HTTPException(status_code=400, detail="Missing unit_id or remote_path")
# Get or create active session for this location/unit
session = db.query(MonitoringSession).filter(
and_(
MonitoringSession.project_id == project_id,
MonitoringSession.location_id == location_id,
MonitoringSession.unit_id == unit_id,
MonitoringSession.status.in_(["recording", "paused"])
)
).first()
# If no active session, create one
if not session:
_ftp_unit = db.query(RosterUnit).filter_by(id=unit_id).first()
session = MonitoringSession(
id=str(uuid.uuid4()),
project_id=project_id,
location_id=location_id,
unit_id=unit_id,
session_type="sound", # SLMs are sound monitoring devices
status="completed",
started_at=datetime.utcnow(),
stopped_at=datetime.utcnow(),
device_model=_ftp_unit.slm_model if _ftp_unit else None,
session_metadata='{"source": "ftp_download", "note": "Auto-created for FTP download"}'
)
db.add(session)
db.commit()
db.refresh(session)
# Download file from SLMM
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
try:
async with httpx.AsyncClient(timeout=300.0) as client:
response = await client.post(
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download",
json={"remote_path": remote_path}
)
if not response.is_success:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to download from SLMM: {response.text}"
)
# Extract filename from remote_path
filename = os.path.basename(remote_path)
# Determine file type from extension
ext = os.path.splitext(filename)[1].lower()
file_type_map = {
# Audio files
'.wav': 'audio',
'.mp3': 'audio',
'.flac': 'audio',
'.m4a': 'audio',
'.aac': 'audio',
# Sound level meter measurement files
'.rnd': 'measurement',
# Data files
'.csv': 'data',
'.txt': 'data',
'.json': 'data',
'.xml': 'data',
'.dat': 'data',
# Log files
'.log': 'log',
# Archives
'.zip': 'archive',
'.tar': 'archive',
'.gz': 'archive',
'.7z': 'archive',
'.rar': 'archive',
# Images
'.jpg': 'image',
'.jpeg': 'image',
'.png': 'image',
'.gif': 'image',
# Documents
'.pdf': 'document',
'.doc': 'document',
'.docx': 'document',
}
file_type = file_type_map.get(ext, 'data')
# Create directory structure: data/Projects/{project_id}/{session_id}/
project_dir = Path(f"data/Projects/{project_id}/{session.id}")
project_dir.mkdir(parents=True, exist_ok=True)
# Save file to disk
file_path = project_dir / filename
file_content = response.content
with open(file_path, 'wb') as f:
f.write(file_content)
# Calculate checksum
checksum = hashlib.sha256(file_content).hexdigest()
# Create DataFile record
data_file = DataFile(
id=str(uuid.uuid4()),
session_id=session.id,
file_path=str(file_path.relative_to("data")), # Store relative to data/
file_type=file_type,
file_size_bytes=len(file_content),
downloaded_at=datetime.utcnow(),
checksum=checksum,
file_metadata=json.dumps({
"source": "ftp",
"remote_path": remote_path,
"unit_id": unit_id,
"location_id": location_id,
})
)
db.add(data_file)
db.commit()
return {
"success": True,
"message": f"Downloaded {filename} to server",
"file_id": data_file.id,
"file_path": str(file_path),
"file_size": len(file_content),
}
except httpx.TimeoutException:
raise HTTPException(
status_code=504,
detail="Timeout downloading file from SLM"
)
except Exception as e:
logger.error(f"Error downloading file to server: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to download file to server: {str(e)}"
)
@router.post("/{project_id}/ftp-download-folder-to-server")
async def ftp_download_folder_to_server(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Download an entire folder from an SLM to the server via FTP.
Extracts all files from the ZIP and preserves folder structure.
Creates individual DataFile records for each file.
"""
import httpx
import os
import hashlib
import zipfile
import io
from pathlib import Path
from backend.models import DataFile
data = await request.json()
unit_id = data.get("unit_id")
remote_path = data.get("remote_path")
location_id = data.get("location_id")
if not unit_id or not remote_path:
raise HTTPException(status_code=400, detail="Missing unit_id or remote_path")
# Get or create active session for this location/unit
session = db.query(MonitoringSession).filter(
and_(
MonitoringSession.project_id == project_id,
MonitoringSession.location_id == location_id,
MonitoringSession.unit_id == unit_id,
MonitoringSession.status.in_(["recording", "paused"])
)
).first()
# If no active session, create one
if not session:
_ftp_unit = db.query(RosterUnit).filter_by(id=unit_id).first()
session = MonitoringSession(
id=str(uuid.uuid4()),
project_id=project_id,
location_id=location_id,
unit_id=unit_id,
session_type="sound", # SLMs are sound monitoring devices
status="completed",
started_at=datetime.utcnow(),
stopped_at=datetime.utcnow(),
device_model=_ftp_unit.slm_model if _ftp_unit else None,
session_metadata='{"source": "ftp_folder_download", "note": "Auto-created for FTP folder download"}'
)
db.add(session)
db.commit()
db.refresh(session)
# Download folder from SLMM (returns ZIP)
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
try:
async with httpx.AsyncClient(timeout=600.0) as client: # Longer timeout for folders
response = await client.post(
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download-folder",
json={"remote_path": remote_path}
)
if not response.is_success:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to download folder from SLMM: {response.text}"
)
# Extract folder name from remote_path
folder_name = os.path.basename(remote_path.rstrip('/'))
# Create base directory: data/Projects/{project_id}/{session_id}/{folder_name}/
base_dir = Path(f"data/Projects/{project_id}/{session.id}/{folder_name}")
base_dir.mkdir(parents=True, exist_ok=True)
# Extract ZIP and save individual files
zip_content = response.content
created_files = []
total_size = 0
# File type mapping for classification
file_type_map = {
# Audio files
'.wav': 'audio', '.mp3': 'audio', '.flac': 'audio', '.m4a': 'audio', '.aac': 'audio',
# Data files
'.csv': 'data', '.txt': 'data', '.json': 'data', '.xml': 'data', '.dat': 'data',
# Log files
'.log': 'log',
# Archives
'.zip': 'archive', '.tar': 'archive', '.gz': 'archive', '.7z': 'archive', '.rar': 'archive',
# Images
'.jpg': 'image', '.jpeg': 'image', '.png': 'image', '.gif': 'image',
# Documents
'.pdf': 'document', '.doc': 'document', '.docx': 'document',
}
with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
for zip_info in zf.filelist:
# Skip directories
if zip_info.is_dir():
continue
# Read file from ZIP
file_data = zf.read(zip_info.filename)
# Determine file path (preserve structure within folder)
# zip_info.filename might be like "Auto_0001/measurement.wav"
file_path = base_dir / zip_info.filename
file_path.parent.mkdir(parents=True, exist_ok=True)
# Write file to disk
with open(file_path, 'wb') as f:
f.write(file_data)
# Calculate checksum
checksum = hashlib.sha256(file_data).hexdigest()
# Determine file type
ext = os.path.splitext(zip_info.filename)[1].lower()
file_type = file_type_map.get(ext, 'data')
# Create DataFile record
data_file = DataFile(
id=str(uuid.uuid4()),
session_id=session.id,
file_path=str(file_path.relative_to("data")),
file_type=file_type,
file_size_bytes=len(file_data),
downloaded_at=datetime.utcnow(),
checksum=checksum,
file_metadata=json.dumps({
"source": "ftp_folder",
"remote_path": remote_path,
"unit_id": unit_id,
"location_id": location_id,
"folder_name": folder_name,
"relative_path": zip_info.filename,
})
)
db.add(data_file)
created_files.append({
"filename": zip_info.filename,
"size": len(file_data),
"type": file_type
})
total_size += len(file_data)
db.commit()
return {
"success": True,
"message": f"Downloaded folder {folder_name} with {len(created_files)} files",
"folder_name": folder_name,
"file_count": len(created_files),
"total_size": total_size,
"files": created_files,
}
except httpx.TimeoutException:
raise HTTPException(
status_code=504,
detail="Timeout downloading folder from SLM (large folders may take a while)"
)
except zipfile.BadZipFile:
raise HTTPException(
status_code=500,
detail="Downloaded file is not a valid ZIP archive"
)
except Exception as e:
logger.error(f"Error downloading folder to server: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to download folder to server: {str(e)}"
)
# ============================================================================
# Project Types
# ============================================================================
@router.get("/{project_id}/files-unified", response_class=HTMLResponse)
async def get_unified_files(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Get unified view of all files in this project.
Groups files by recording session with full metadata.
Returns HTML partial with hierarchical file listing.
"""
from backend.models import DataFile
from pathlib import Path
import json
# Get all sessions for this project
sessions = db.query(MonitoringSession).filter_by(
project_id=project_id
).order_by(MonitoringSession.started_at.desc()).all()
sessions_data = []
for session in sessions:
# Get files for this session
files = db.query(DataFile).filter_by(session_id=session.id).all()
# Skip sessions with no files
if not files:
continue
# Get session context
unit = None
location = None
if session.unit_id:
unit = db.query(RosterUnit).filter_by(id=session.unit_id).first()
if session.location_id:
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first()
files_data = []
for file in files:
# Check if file exists on disk
file_path = Path("data") / file.file_path
exists_on_disk = file_path.exists()
# Get actual file size if exists
actual_size = file_path.stat().st_size if exists_on_disk else None
# Parse metadata JSON
metadata = {}
try:
if file.file_metadata:
metadata = json.loads(file.file_metadata)
except Exception as e:
logger.warning(f"Failed to parse metadata for file {file.id}: {e}")
files_data.append({
"file": file,
"exists_on_disk": exists_on_disk,
"actual_size": actual_size,
"metadata": metadata,
})
sessions_data.append({
"session": session,
"unit": unit,
"location": location,
"files": files_data,
})
return templates.TemplateResponse("partials/projects/unified_files.html", {
"request": request,
"project_id": project_id,
"sessions": sessions_data,
})
@router.get("/{project_id}/files/{file_id}/download")
async def download_project_file(
project_id: str,
file_id: str,
db: Session = Depends(get_db),
):
"""
Download a data file from a project.
Returns the file for download.
"""
from backend.models import DataFile
from fastapi.responses import FileResponse
from pathlib import Path
# Get the file record
file_record = db.query(DataFile).filter_by(id=file_id).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
# Verify file belongs to this project
session = db.query(MonitoringSession).filter_by(id=file_record.session_id).first()
if not session or session.project_id != project_id:
raise HTTPException(status_code=403, detail="File does not belong to this project")
# Build full file path
file_path = Path("data") / file_record.file_path
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
# Extract filename for download
filename = file_path.name
return FileResponse(
path=str(file_path),
filename=filename,
media_type="application/octet-stream"
)
@router.get("/{project_id}/sessions/{session_id}/download-all")
async def download_session_files(
project_id: str,
session_id: str,
db: Session = Depends(get_db),
):
"""
Download all files from a session as a single zip archive.
"""
from backend.models import DataFile
from pathlib import Path
import zipfile
# Verify session belongs to this project
session = db.query(MonitoringSession).filter_by(id=session_id).first()
if not session:
raise HTTPException(status_code=404, detail="Session not found")
if session.project_id != project_id:
raise HTTPException(status_code=403, detail="Session does not belong to this project")
# Get all files for this session
files = db.query(DataFile).filter_by(session_id=session_id).all()
if not files:
raise HTTPException(status_code=404, detail="No files found in this session")
# Create zip in memory
zip_buffer = io.BytesIO()
# Get session info for folder naming
session_date = session.started_at.strftime('%Y-%m-%d_%H%M') if session.started_at else 'unknown'
# Get unit and location for naming
unit = db.query(RosterUnit).filter_by(id=session.unit_id).first() if session.unit_id else None
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
unit_name = unit.id if unit else "unknown_unit"
location_name = location.name.replace(" ", "_") if location else ""
# Build folder name for zip contents
folder_name = f"{session_date}_{unit_name}"
if location_name:
folder_name += f"_{location_name}"
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for file_record in files:
file_path = Path("data") / file_record.file_path
if file_path.exists():
# Add file to zip with folder structure
arcname = f"{folder_name}/{file_path.name}"
zip_file.write(file_path, arcname)
zip_buffer.seek(0)
# Generate filename for the zip
zip_filename = f"{folder_name}.zip"
return StreamingResponse(
zip_buffer,
media_type="application/zip",
headers={"Content-Disposition": f"attachment; filename={zip_filename}"}
)
@router.delete("/{project_id}/files/{file_id}")
async def delete_project_file(
project_id: str,
file_id: str,
db: Session = Depends(get_db),
):
"""
Delete a single data file from a project.
Removes both the database record and the file on disk.
"""
from backend.models import DataFile
from pathlib import Path
# Get the file record
file_record = db.query(DataFile).filter_by(id=file_id).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
# Verify file belongs to this project
session = db.query(MonitoringSession).filter_by(id=file_record.session_id).first()
if not session or session.project_id != project_id:
raise HTTPException(status_code=403, detail="File does not belong to this project")
# Delete file from disk if it exists
file_path = Path("data") / file_record.file_path
if file_path.exists():
file_path.unlink()
# Delete database record
db.delete(file_record)
db.commit()
return JSONResponse({"status": "success", "message": "File deleted"})
@router.delete("/{project_id}/sessions/{session_id}")
async def delete_session(
project_id: str,
session_id: str,
db: Session = Depends(get_db),
):
"""
Delete an entire session and all its files.
Removes database records and files on disk.
"""
from backend.models import DataFile
from pathlib import Path
# Verify session belongs to this project
session = db.query(MonitoringSession).filter_by(id=session_id).first()
if not session:
raise HTTPException(status_code=404, detail="Session not found")
if session.project_id != project_id:
raise HTTPException(status_code=403, detail="Session does not belong to this project")
# Get all files for this session
files = db.query(DataFile).filter_by(session_id=session_id).all()
# Delete files from disk
deleted_count = 0
for file_record in files:
file_path = Path("data") / file_record.file_path
if file_path.exists():
file_path.unlink()
deleted_count += 1
# Delete database record
db.delete(file_record)
# Delete the session record
db.delete(session)
db.commit()
return JSONResponse({
"status": "success",
"message": f"Session and {deleted_count} file(s) deleted"
})
@router.get("/{project_id}/files/{file_id}/view-rnd", response_class=HTMLResponse)
async def view_rnd_file(
request: Request,
project_id: str,
file_id: str,
db: Session = Depends(get_db),
):
"""
View an RND (sound level meter measurement) file.
Returns a dedicated page with data table and charts.
"""
from backend.models import DataFile
from pathlib import Path
# Get the file record
file_record = db.query(DataFile).filter_by(id=file_id).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
# Verify file belongs to this project
session = db.query(MonitoringSession).filter_by(id=file_record.session_id).first()
if not session or session.project_id != project_id:
raise HTTPException(status_code=403, detail="File does not belong to this project")
# Build full file path
file_path = Path("data") / file_record.file_path
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
# Get project info
project = db.query(Project).filter_by(id=project_id).first()
# Get location info if available
location = None
if session.location_id:
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first()
# Get unit info if available
unit = None
if session.unit_id:
unit = db.query(RosterUnit).filter_by(id=session.unit_id).first()
# Parse file metadata
metadata = {}
if file_record.file_metadata:
try:
metadata = json.loads(file_record.file_metadata)
except json.JSONDecodeError:
pass
return templates.TemplateResponse("rnd_viewer.html", {
"request": request,
"project": project,
"project_id": project_id,
"file": file_record,
"file_id": file_id,
"session": session,
"location": location,
"unit": unit,
"metadata": metadata,
"filename": file_path.name,
"is_leq": _is_leq_file(str(file_record.file_path), _peek_rnd_headers(file_path)),
})
@router.get("/{project_id}/files/{file_id}/rnd-data")
async def get_rnd_data(
project_id: str,
file_id: str,
db: Session = Depends(get_db),
):
"""
Get parsed RND file data as JSON.
Returns the measurement data for charts and tables.
"""
from backend.models import DataFile
from pathlib import Path
import csv
import io
# Get the file record
file_record = db.query(DataFile).filter_by(id=file_id).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
# Verify file belongs to this project
session = db.query(MonitoringSession).filter_by(id=file_record.session_id).first()
if not session or session.project_id != project_id:
raise HTTPException(status_code=403, detail="File does not belong to this project")
# Build full file path
file_path = Path("data") / file_record.file_path
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
# Read and parse the RND file
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
# Parse as CSV
reader = csv.DictReader(io.StringIO(content))
rows = []
headers = []
for row in reader:
if not headers:
headers = list(row.keys())
# Clean up values - strip whitespace and handle special values
cleaned_row = {}
for key, value in row.items():
if key: # Skip empty keys
cleaned_key = key.strip()
cleaned_value = value.strip() if value else ''
# Convert numeric values
if cleaned_value and cleaned_value not in ['-.-', '-', '']:
try:
cleaned_value = float(cleaned_value)
except ValueError:
pass
elif cleaned_value in ['-.-', '-']:
cleaned_value = None
cleaned_row[cleaned_key] = cleaned_value
rows.append(cleaned_row)
# Normalise AU2-format columns to NL-43 names
rows, _was_au2 = _normalize_rnd_rows(rows)
if _was_au2:
headers = list(rows[0].keys()) if rows else headers
# Detect file type (Leq vs Lp) based on columns
file_type = 'unknown'
if headers:
header_str = ','.join(headers).lower()
if 'leq(main)' in header_str or 'laeq' in header_str:
file_type = 'leq' # Time-averaged data
elif 'lp(main)' in header_str or 'lp (main)' in header_str:
file_type = 'lp' # Instantaneous data
# Get summary statistics
summary = {
"total_rows": len(rows),
"file_type": file_type,
"headers": [h.strip() for h in headers if h.strip()],
}
# Calculate min/max/avg for key metrics if available
metrics_to_summarize = ['Leq(Main)', 'Lmax(Main)', 'Lmin(Main)', 'Lpeak(Main)', 'Lp(Main)']
for metric in metrics_to_summarize:
values = [row.get(metric) for row in rows if isinstance(row.get(metric), (int, float))]
if values:
summary[f"{metric}_min"] = min(values)
summary[f"{metric}_max"] = max(values)
summary[f"{metric}_avg"] = sum(values) / len(values)
# Get time range
if rows:
first_time = rows[0].get('Start Time', '')
last_time = rows[-1].get('Start Time', '')
summary['time_start'] = first_time
summary['time_end'] = last_time
return {
"success": True,
"summary": summary,
"headers": summary["headers"],
"data": rows,
}
except Exception as e:
logger.error(f"Error parsing RND file: {e}")
raise HTTPException(status_code=500, detail=f"Error parsing file: {str(e)}")
@router.get("/{project_id}/files/{file_id}/generate-report")
async def generate_excel_report(
project_id: str,
file_id: str,
report_title: str = Query("Background Noise Study", description="Title for the report"),
location_name: str = Query("", description="Location name (e.g., 'NRL 1 - West Side')"),
project_name: str = Query("", description="Project name override"),
client_name: str = Query("", description="Client name for report header"),
start_time: str = Query("", description="Filter start time (HH:MM format, e.g., '19:00')"),
end_time: str = Query("", description="Filter end time (HH:MM format, e.g., '07:00')"),
start_date: str = Query("", description="Filter start date (YYYY-MM-DD format)"),
end_date: str = Query("", description="Filter end date (YYYY-MM-DD format)"),
db: Session = Depends(get_db),
):
"""
Generate an Excel report from an RND file.
Creates a formatted Excel workbook with:
- Title and location headers
- Data table (Test #, Date, Time, LAmax, LA01, LA10, Comments)
- Line chart visualization
- Time period summary statistics
Time filtering:
- start_time/end_time: Filter to time window (handles overnight like 19:00-07:00)
- start_date/end_date: Filter to date range
Column mapping from RND to Report:
- Lmax(Main) -> LAmax (dBA)
- LN1(Main) -> LA01 (dBA) [L1 percentile]
- LN2(Main) -> LA10 (dBA) [L10 percentile]
"""
from backend.models import DataFile
from pathlib import Path
import csv
try:
import openpyxl
from openpyxl.chart import LineChart, Reference
from openpyxl.chart.label import DataLabelList
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
from openpyxl.utils import get_column_letter
except ImportError:
raise HTTPException(
status_code=500,
detail="openpyxl is not installed. Run: pip install openpyxl"
)
# Get the file record
file_record = db.query(DataFile).filter_by(id=file_id).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
# Verify file belongs to this project
session = db.query(MonitoringSession).filter_by(id=file_record.session_id).first()
if not session or session.project_id != project_id:
raise HTTPException(status_code=403, detail="File does not belong to this project")
# Get related data for report context
project = db.query(Project).filter_by(id=project_id).first()
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
# Build full file path
file_path = Path("data") / file_record.file_path
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
# Read and parse the RND file
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
reader = csv.DictReader(io.StringIO(content))
rnd_rows = []
for row in reader:
cleaned_row = {}
for key, value in row.items():
if key:
cleaned_key = key.strip()
cleaned_value = value.strip() if value else ''
if cleaned_value and cleaned_value not in ['-.-', '-', '']:
try:
cleaned_value = float(cleaned_value)
except ValueError:
pass
elif cleaned_value in ['-.-', '-']:
cleaned_value = None
cleaned_row[cleaned_key] = cleaned_value
rnd_rows.append(cleaned_row)
if not rnd_rows:
raise HTTPException(status_code=400, detail="No data found in RND file")
# Normalise AU2-format columns to NL-43 names
rnd_rows, _ = _normalize_rnd_rows(rnd_rows)
# Validate this is a Leq file — Lp files lack the LN percentile data
if not _is_leq_file(file_record.file_path, rnd_rows):
raise HTTPException(
status_code=400,
detail="Reports can only be generated from Leq files (15-minute averaged data). This appears to be an Lp (instantaneous) file."
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error reading RND file: {e}")
raise HTTPException(status_code=500, detail=f"Error reading file: {str(e)}")
# Apply time and date filtering
def filter_rows_by_time(rows, filter_start_time, filter_end_time, filter_start_date, filter_end_date):
"""Filter rows by time window and date range."""
if not filter_start_time and not filter_end_time and not filter_start_date and not filter_end_date:
return rows
filtered = []
# Parse time filters
start_hour = start_minute = end_hour = end_minute = None
if filter_start_time:
try:
parts = filter_start_time.split(':')
start_hour = int(parts[0])
start_minute = int(parts[1]) if len(parts) > 1 else 0
except (ValueError, IndexError):
pass
if filter_end_time:
try:
parts = filter_end_time.split(':')
end_hour = int(parts[0])
end_minute = int(parts[1]) if len(parts) > 1 else 0
except (ValueError, IndexError):
pass
# Parse date filters
start_dt = end_dt = None
if filter_start_date:
try:
start_dt = datetime.strptime(filter_start_date, '%Y-%m-%d').date()
except ValueError:
pass
if filter_end_date:
try:
end_dt = datetime.strptime(filter_end_date, '%Y-%m-%d').date()
except ValueError:
pass
for row in rows:
start_time_str = row.get('Start Time', '')
if not start_time_str:
continue
try:
dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
row_date = dt.date()
row_hour = dt.hour
row_minute = dt.minute
# Date filtering
if start_dt and row_date < start_dt:
continue
if end_dt and row_date > end_dt:
continue
# Time filtering (handle overnight ranges like 19:00-07:00)
if start_hour is not None and end_hour is not None:
row_time_minutes = row_hour * 60 + row_minute
start_time_minutes = start_hour * 60 + start_minute
end_time_minutes = end_hour * 60 + end_minute
if start_time_minutes > end_time_minutes:
# Overnight range (e.g., 19:00-07:00)
if not (row_time_minutes >= start_time_minutes or row_time_minutes < end_time_minutes):
continue
else:
# Same day range (e.g., 07:00-19:00)
if not (start_time_minutes <= row_time_minutes < end_time_minutes):
continue
filtered.append(row)
except ValueError:
# If we can't parse the time, include the row anyway
filtered.append(row)
return filtered
# Apply filters
original_count = len(rnd_rows)
rnd_rows = filter_rows_by_time(rnd_rows, start_time, end_time, start_date, end_date)
if not rnd_rows:
time_filter_desc = ""
if start_time and end_time:
time_filter_desc = f" between {start_time} and {end_time}"
if start_date or end_date:
time_filter_desc += f" from {start_date or 'start'} to {end_date or 'end'}"
raise HTTPException(
status_code=400,
detail=f"No data found after applying filters{time_filter_desc}. Original file had {original_count} rows."
)
# Create Excel workbook
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Sound Level Data"
# Define styles
title_font = Font(name='Arial', bold=True, size=12)
subtitle_font = Font(name='Arial', bold=True, size=12)
client_font = Font(name='Arial', italic=True, size=10)
filter_font = Font(name='Arial', italic=True, size=10, color="666666")
header_font = Font(name='Arial', bold=True, size=10)
data_font = Font(name='Arial', size=10)
summary_title_font = Font(name='Arial', bold=True, size=12)
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid")
center_align = Alignment(horizontal='center', vertical='center')
left_align = Alignment(horizontal='left', vertical='center')
right_align = Alignment(horizontal='right', vertical='center')
# Row 1: Report title
final_project_name = project_name if project_name else (project.name if project else "")
final_title = report_title
if final_project_name:
final_title = f"{report_title} - {final_project_name}"
ws['A1'] = final_title
ws['A1'].font = title_font
ws['A1'].alignment = center_align
ws.merge_cells('A1:G1')
ws.row_dimensions[1].height = 20
# Row 2: Client name (if provided)
if client_name:
ws['A2'] = f"Client: {client_name}"
ws['A2'].font = client_font
ws['A2'].alignment = center_align
ws.merge_cells('A2:G2')
ws.row_dimensions[2].height = 16
# Row 3: Location name
final_location = location_name
if not final_location and location:
final_location = location.name
if final_location:
ws['A3'] = final_location
ws['A3'].font = subtitle_font
ws['A3'].alignment = center_align
ws.merge_cells('A3:G3')
ws.row_dimensions[3].height = 20
# Row 4: Time filter info (if applied)
if start_time and end_time:
filter_info = f"Time Filter: {start_time} - {end_time}"
if start_date or end_date:
filter_info += f" | Date Range: {start_date or 'start'} to {end_date or 'end'}"
filter_info += f" | {len(rnd_rows)} of {original_count} rows"
ws['A4'] = filter_info
ws['A4'].font = filter_font
ws['A4'].alignment = center_align
ws.merge_cells('A4:G4')
ws.row_dimensions[4].height = 14
# Row 7: Headers
headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments']
for col, header in enumerate(headers, 1):
cell = ws.cell(row=7, column=col, value=header)
cell.font = header_font
cell.border = thin_border
cell.fill = header_fill
cell.alignment = center_align
ws.row_dimensions[7].height = 16
# Set column widths — A-G sized to fit one printed page width
column_widths = [12, 11, 9, 11, 11, 11, 20]
for i, width in enumerate(column_widths, 1):
ws.column_dimensions[get_column_letter(i)].width = width
# Data rows starting at row 8
data_start_row = 8
for idx, row in enumerate(rnd_rows, 1):
data_row = data_start_row + idx - 1
# Test Increment #
c = ws.cell(row=data_row, column=1, value=idx)
c.border = thin_border; c.font = data_font; c.alignment = center_align
# Parse the Start Time to get Date and Time
start_time_str = row.get('Start Time', '')
if start_time_str:
try:
dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
c2 = ws.cell(row=data_row, column=2, value=dt.strftime('%m/%d/%y'))
c3 = ws.cell(row=data_row, column=3, value=dt.strftime('%H:%M'))
except ValueError:
c2 = ws.cell(row=data_row, column=2, value=start_time_str)
c3 = ws.cell(row=data_row, column=3, value='')
else:
c2 = ws.cell(row=data_row, column=2, value='')
c3 = ws.cell(row=data_row, column=3, value='')
for c in (c2, c3):
c.border = thin_border; c.font = data_font; c.alignment = center_align
# LAmax, LA01, LA10
for col_idx, key in [(4, 'Lmax(Main)'), (5, 'LN1(Main)'), (6, 'LN2(Main)')]:
val = row.get(key)
c = ws.cell(row=data_row, column=col_idx, value=val if val else '')
c.border = thin_border; c.font = data_font; c.alignment = right_align
# Comments
c = ws.cell(row=data_row, column=7, value='')
c.border = thin_border; c.font = data_font; c.alignment = left_align
data_end_row = data_start_row + len(rnd_rows) - 1
# Add Line Chart
chart = LineChart()
chart.title = f"{final_location or 'Sound Level Data'} - Background Noise Study"
chart.style = 10
chart.y_axis.title = "Sound Level (dBA)"
chart.x_axis.title = "Time"
chart.height = 18
chart.width = 22
# Data references (LAmax, LA01, LA10 are columns D, E, F)
data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row)
categories = Reference(ws, min_col=3, min_row=data_start_row, max_row=data_end_row)
chart.add_data(data_ref, titles_from_data=True)
chart.set_categories(categories)
# Style the series
if len(chart.series) >= 3:
chart.series[0].graphicalProperties.line.solidFill = "FF0000" # LAmax - Red
chart.series[1].graphicalProperties.line.solidFill = "00B050" # LA01 - Green
chart.series[2].graphicalProperties.line.solidFill = "0070C0" # LA10 - Blue
# Position chart starting at column H
ws.add_chart(chart, "H3")
# Print layout: A-G fits one page width, landscape
from openpyxl.worksheet.properties import PageSetupProperties
ws.sheet_properties.pageSetUpPr = PageSetupProperties(fitToPage=True)
ws.page_setup.orientation = 'landscape'
ws.page_setup.fitToWidth = 1
ws.page_setup.fitToHeight = 0
# Add summary statistics section below the data
summary_row = data_end_row + 3
c = ws.cell(row=summary_row, column=1, value="Summary Statistics")
c.font = summary_title_font
# Calculate time-period statistics
time_periods = {
'Evening (7PM-10PM)': [],
'Nighttime (10PM-7AM)': [],
'Morning (7AM-12PM)': [],
'Daytime (12PM-7PM)': []
}
for row in rnd_rows:
start_time_str = row.get('Start Time', '')
if start_time_str:
try:
dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
hour = dt.hour
lmax = row.get('Lmax(Main)')
ln1 = row.get('LN1(Main)')
ln2 = row.get('LN2(Main)')
if isinstance(lmax, (int, float)) and isinstance(ln1, (int, float)) and isinstance(ln2, (int, float)):
data_point = {'lmax': lmax, 'ln1': ln1, 'ln2': ln2}
if 19 <= hour < 22:
time_periods['Evening (7PM-10PM)'].append(data_point)
elif hour >= 22 or hour < 7:
time_periods['Nighttime (10PM-7AM)'].append(data_point)
elif 7 <= hour < 12:
time_periods['Morning (7AM-12PM)'].append(data_point)
else: # 12-19
time_periods['Daytime (12PM-7PM)'].append(data_point)
except ValueError:
continue
# Summary table headers
summary_row += 2
summary_headers = ['Time Period', 'Samples', 'LAmax Avg', 'LA01 Avg', 'LA10 Avg']
for col, header in enumerate(summary_headers, 1):
cell = ws.cell(row=summary_row, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.border = thin_border
cell.alignment = center_align
# Summary data
summary_row += 1
for period_name, samples in time_periods.items():
c = ws.cell(row=summary_row, column=1, value=period_name)
c.border = thin_border; c.font = data_font; c.alignment = left_align
c = ws.cell(row=summary_row, column=2, value=len(samples))
c.border = thin_border; c.font = data_font; c.alignment = center_align
if samples:
avg_lmax = sum(s['lmax'] for s in samples) / len(samples)
avg_ln1 = sum(s['ln1'] for s in samples) / len(samples)
avg_ln2 = sum(s['ln2'] for s in samples) / len(samples)
for col_idx, val in [(3, avg_lmax), (4, avg_ln1), (5, avg_ln2)]:
c = ws.cell(row=summary_row, column=col_idx, value=round(val, 1))
c.border = thin_border; c.font = data_font; c.alignment = right_align
else:
for col_idx in [3, 4, 5]:
c = ws.cell(row=summary_row, column=col_idx, value='-')
c.border = thin_border; c.font = data_font; c.alignment = center_align
summary_row += 1
# Overall summary
summary_row += 1
c = ws.cell(row=summary_row, column=1, value='Overall')
c.font = Font(name='Arial', bold=True, size=10); c.border = thin_border; c.alignment = left_align
c = ws.cell(row=summary_row, column=2, value=len(rnd_rows))
c.border = thin_border; c.font = data_font; c.alignment = center_align
all_lmax = [r.get('Lmax(Main)') for r in rnd_rows if isinstance(r.get('Lmax(Main)'), (int, float))]
all_ln1 = [r.get('LN1(Main)') for r in rnd_rows if isinstance(r.get('LN1(Main)'), (int, float))]
all_ln2 = [r.get('LN2(Main)') for r in rnd_rows if isinstance(r.get('LN2(Main)'), (int, float))]
for col_idx, vals in [(3, all_lmax), (4, all_ln1), (5, all_ln2)]:
if vals:
c = ws.cell(row=summary_row, column=col_idx, value=round(sum(vals) / len(vals), 1))
c.border = thin_border; c.font = data_font; c.alignment = right_align
# Save to buffer
output = io.BytesIO()
wb.save(output)
output.seek(0)
# Generate filename
filename = file_record.file_path.split('/')[-1].replace('.rnd', '')
if location:
filename = f"{location.name}_{filename}"
filename = f"{filename}_report.xlsx"
# Clean filename
filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.')).rstrip()
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'}
)
@router.get("/{project_id}/files/{file_id}/preview-report")
async def preview_report_data(
request: Request,
project_id: str,
file_id: str,
report_title: str = Query("Background Noise Study", description="Title for the report"),
location_name: str = Query("", description="Location name"),
project_name: str = Query("", description="Project name override"),
client_name: str = Query("", description="Client name"),
start_time: str = Query("", description="Filter start time (HH:MM format)"),
end_time: str = Query("", description="Filter end time (HH:MM format)"),
start_date: str = Query("", description="Filter start date (YYYY-MM-DD format)"),
end_date: str = Query("", description="Filter end date (YYYY-MM-DD format)"),
db: Session = Depends(get_db),
):
"""
Preview report data for editing in jspreadsheet.
Returns an HTML page with the spreadsheet editor.
"""
from backend.models import DataFile, ReportTemplate
from pathlib import Path
import csv
# Get the file record
file_record = db.query(DataFile).filter_by(id=file_id).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
# Verify file belongs to this project
session = db.query(MonitoringSession).filter_by(id=file_record.session_id).first()
if not session or session.project_id != project_id:
raise HTTPException(status_code=403, detail="File does not belong to this project")
# Get related data for report context
project = db.query(Project).filter_by(id=project_id).first()
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
# Build full file path
file_path = Path("data") / file_record.file_path
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
# Read and parse the RND file
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
reader = csv.DictReader(io.StringIO(content))
rnd_rows = []
for row in reader:
cleaned_row = {}
for key, value in row.items():
if key:
cleaned_key = key.strip()
cleaned_value = value.strip() if value else ''
if cleaned_value and cleaned_value not in ['-.-', '-', '']:
try:
cleaned_value = float(cleaned_value)
except ValueError:
pass
elif cleaned_value in ['-.-', '-']:
cleaned_value = None
cleaned_row[cleaned_key] = cleaned_value
rnd_rows.append(cleaned_row)
if not rnd_rows:
raise HTTPException(status_code=400, detail="No data found in RND file")
rnd_rows, _ = _normalize_rnd_rows(rnd_rows)
if not _is_leq_file(file_record.file_path, rnd_rows):
raise HTTPException(
status_code=400,
detail="Reports can only be generated from Leq files (15-minute averaged data)."
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error reading RND file: {e}")
raise HTTPException(status_code=500, detail=f"Error reading file: {str(e)}")
# Apply time and date filtering (same logic as generate-report)
def filter_rows(rows, filter_start_time, filter_end_time, filter_start_date, filter_end_date):
if not filter_start_time and not filter_end_time and not filter_start_date and not filter_end_date:
return rows
filtered = []
start_hour = start_minute = end_hour = end_minute = None
if filter_start_time:
try:
parts = filter_start_time.split(':')
start_hour = int(parts[0])
start_minute = int(parts[1]) if len(parts) > 1 else 0
except (ValueError, IndexError):
pass
if filter_end_time:
try:
parts = filter_end_time.split(':')
end_hour = int(parts[0])
end_minute = int(parts[1]) if len(parts) > 1 else 0
except (ValueError, IndexError):
pass
start_dt = end_dt = None
if filter_start_date:
try:
start_dt = datetime.strptime(filter_start_date, '%Y-%m-%d').date()
except ValueError:
pass
if filter_end_date:
try:
end_dt = datetime.strptime(filter_end_date, '%Y-%m-%d').date()
except ValueError:
pass
for row in rows:
start_time_str = row.get('Start Time', '')
if not start_time_str:
continue
try:
dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
row_date = dt.date()
row_hour = dt.hour
row_minute = dt.minute
if start_dt and row_date < start_dt:
continue
if end_dt and row_date > end_dt:
continue
if start_hour is not None and end_hour is not None:
row_time_minutes = row_hour * 60 + row_minute
start_time_minutes = start_hour * 60 + start_minute
end_time_minutes = end_hour * 60 + end_minute
if start_time_minutes > end_time_minutes:
if not (row_time_minutes >= start_time_minutes or row_time_minutes < end_time_minutes):
continue
else:
if not (start_time_minutes <= row_time_minutes < end_time_minutes):
continue
filtered.append(row)
except ValueError:
filtered.append(row)
return filtered
original_count = len(rnd_rows)
rnd_rows = filter_rows(rnd_rows, start_time, end_time, start_date, end_date)
# Convert to spreadsheet data format (array of arrays)
spreadsheet_data = []
for idx, row in enumerate(rnd_rows, 1):
start_time_str = row.get('Start Time', '')
date_str = ''
time_str = ''
if start_time_str:
try:
dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
date_str = dt.strftime('%Y-%m-%d')
time_str = dt.strftime('%H:%M:%S')
except ValueError:
date_str = start_time_str
time_str = ''
lmax = row.get('Lmax(Main)', '')
ln1 = row.get('LN1(Main)', '')
ln2 = row.get('LN2(Main)', '')
spreadsheet_data.append([
idx, # Test #
date_str,
time_str,
lmax if lmax else '',
ln1 if ln1 else '',
ln2 if ln2 else '',
'' # Comments
])
# Prepare context data
final_project_name = project_name if project_name else (project.name if project else "")
final_location = location_name if location_name else (location.name if location else "")
# Get templates for the dropdown
report_templates = db.query(ReportTemplate).all()
return templates.TemplateResponse("report_preview.html", {
"request": request,
"project_id": project_id,
"file_id": file_id,
"project": project,
"location": location,
"file": file_record,
"spreadsheet_data": spreadsheet_data,
"report_title": report_title,
"project_name": final_project_name,
"client_name": client_name,
"location_name": final_location,
"start_time": start_time,
"end_time": end_time,
"start_date": start_date,
"end_date": end_date,
"original_count": original_count,
"filtered_count": len(rnd_rows),
"templates": report_templates,
})
@router.post("/{project_id}/files/{file_id}/generate-from-preview")
async def generate_report_from_preview(
project_id: str,
file_id: str,
data: dict,
db: Session = Depends(get_db),
):
"""
Generate an Excel report from edited spreadsheet data.
Accepts the edited data from jspreadsheet and creates the final Excel file.
"""
from backend.models import DataFile
from pathlib import Path
try:
import openpyxl
from openpyxl.chart import LineChart, Reference
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
from openpyxl.utils import get_column_letter
except ImportError:
raise HTTPException(status_code=500, detail="openpyxl is not installed")
# Get the file record for filename generation
file_record = db.query(DataFile).filter_by(id=file_id).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
session = db.query(MonitoringSession).filter_by(id=file_record.session_id).first()
if not session or session.project_id != project_id:
raise HTTPException(status_code=403, detail="File does not belong to this project")
project = db.query(Project).filter_by(id=project_id).first()
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
# Extract data from request
spreadsheet_data = data.get('data', [])
report_title = data.get('report_title', 'Background Noise Study')
project_name = data.get('project_name', project.name if project else '')
client_name = data.get('client_name', '')
location_name = data.get('location_name', location.name if location else '')
time_filter = data.get('time_filter', '')
if not spreadsheet_data:
raise HTTPException(status_code=400, detail="No data provided")
# Create Excel workbook
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Sound Level Data"
# Styles
title_font = Font(name='Arial', bold=True, size=12)
subtitle_font = Font(name='Arial', bold=True, size=12)
client_font = Font(name='Arial', italic=True, size=10)
filter_font = Font(name='Arial', italic=True, size=10, color="666666")
header_font = Font(name='Arial', bold=True, size=10)
data_font = Font(name='Arial', size=10)
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid")
center_align = Alignment(horizontal='center', vertical='center')
left_align = Alignment(horizontal='left', vertical='center')
right_align = Alignment(horizontal='right', vertical='center')
# Row 1: Title
final_title = f"{report_title} - {project_name}" if project_name else report_title
ws['A1'] = final_title
ws['A1'].font = title_font
ws['A1'].alignment = center_align
ws.merge_cells('A1:G1')
ws.row_dimensions[1].height = 20
# Row 2: Client
if client_name:
ws['A2'] = f"Client: {client_name}"
ws['A2'].font = client_font
ws['A2'].alignment = center_align
ws.merge_cells('A2:G2')
ws.row_dimensions[2].height = 16
# Row 3: Location
if location_name:
ws['A3'] = location_name
ws['A3'].font = subtitle_font
ws['A3'].alignment = center_align
ws.merge_cells('A3:G3')
ws.row_dimensions[3].height = 20
# Row 4: Time filter info
if time_filter:
ws['A4'] = time_filter
ws['A4'].font = filter_font
ws['A4'].alignment = center_align
ws.merge_cells('A4:G4')
ws.row_dimensions[4].height = 14
# Row 7: Headers
headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments']
for col, header in enumerate(headers, 1):
cell = ws.cell(row=7, column=col, value=header)
cell.font = header_font
cell.border = thin_border
cell.fill = header_fill
cell.alignment = center_align
ws.row_dimensions[7].height = 16
# Column widths — A-G sized to fit one printed page width
column_widths = [12, 11, 9, 11, 11, 11, 20]
for i, width in enumerate(column_widths, 1):
ws.column_dimensions[get_column_letter(i)].width = width
# Data rows
data_start_row = 8
col_aligns = [center_align, center_align, center_align, right_align, right_align, right_align, left_align]
for idx, row_data in enumerate(spreadsheet_data):
data_row = data_start_row + idx
for col, value in enumerate(row_data, 1):
cell = ws.cell(row=data_row, column=col, value=value if value != '' else None)
cell.border = thin_border
cell.font = data_font
cell.alignment = col_aligns[col - 1] if col <= len(col_aligns) else left_align
data_end_row = data_start_row + len(spreadsheet_data) - 1
# Add chart if we have data
if len(spreadsheet_data) > 0:
chart = LineChart()
chart.title = f"{location_name or 'Sound Level Data'} - Background Noise Study"
chart.style = 10
chart.y_axis.title = "Sound Level (dBA)"
chart.x_axis.title = "Time"
chart.height = 18
chart.width = 22
data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row)
categories = Reference(ws, min_col=3, min_row=data_start_row, max_row=data_end_row)
chart.add_data(data_ref, titles_from_data=True)
chart.set_categories(categories)
if len(chart.series) >= 3:
chart.series[0].graphicalProperties.line.solidFill = "FF0000"
chart.series[1].graphicalProperties.line.solidFill = "00B050"
chart.series[2].graphicalProperties.line.solidFill = "0070C0"
ws.add_chart(chart, "H3")
# Print layout: A-G fits one page width, landscape
from openpyxl.worksheet.properties import PageSetupProperties
ws.sheet_properties.pageSetUpPr = PageSetupProperties(fitToPage=True)
ws.page_setup.orientation = 'landscape'
ws.page_setup.fitToWidth = 1
ws.page_setup.fitToHeight = 0
# Save to buffer
output = io.BytesIO()
wb.save(output)
output.seek(0)
# Generate filename
filename = file_record.file_path.split('/')[-1].replace('.rnd', '')
if location:
filename = f"{location.name}_{filename}"
filename = f"{filename}_report.xlsx"
filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.')).rstrip()
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'}
)
@router.get("/{project_id}/generate-combined-report")
async def generate_combined_excel_report(
project_id: str,
report_title: str = Query("Background Noise Study", description="Title for the report"),
db: Session = Depends(get_db),
):
"""
Generate a combined Excel report from all RND files in a project.
Creates a multi-sheet Excel workbook with:
- One sheet per location/RND file
- Data tables with LAmax, LA01, LA10
- Line charts for each location
- Summary sheet combining all locations
Column mapping from RND to Report:
- Lmax(Main) -> LAmax (dBA)
- LN1(Main) -> LA01 (dBA) [L1 percentile]
- LN2(Main) -> LA10 (dBA) [L10 percentile]
"""
from backend.models import DataFile
from pathlib import Path
import csv
try:
import openpyxl
from openpyxl.chart import LineChart, Reference
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
from openpyxl.utils import get_column_letter
except ImportError:
raise HTTPException(
status_code=500,
detail="openpyxl is not installed. Run: pip install openpyxl"
)
# Get project
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Get all sessions with measurement files
sessions = db.query(MonitoringSession).filter_by(project_id=project_id).all()
# Collect all Leq RND files grouped by location
# Only include files with '_Leq_' in the path (15-minute averaged data)
# Exclude Lp files (instantaneous 100ms readings)
location_files = {}
for session in sessions:
files = db.query(DataFile).filter_by(session_id=session.id).all()
for file in files:
if not file.file_path or not file.file_path.lower().endswith('.rnd'):
continue
from pathlib import Path as _Path
abs_path = _Path("data") / file.file_path
peek = _peek_rnd_headers(abs_path)
if _is_leq_file(file.file_path, peek):
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
location_name = location.name if location else f"Session {session.id[:8]}"
if location_name not in location_files:
location_files[location_name] = []
location_files[location_name].append({
'file': file,
'session': session,
'location': location
})
if not location_files:
raise HTTPException(status_code=404, detail="No Leq measurement files found in project. Reports require Leq data (files with '_Leq_' in the name).")
# Define styles
title_font = Font(name='Arial', bold=True, size=12)
header_font = Font(name='Arial', bold=True, size=10)
data_font = Font(name='Arial', size=10)
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid")
center_align = Alignment(horizontal='center', vertical='center')
left_align = Alignment(horizontal='left', vertical='center')
right_align = Alignment(horizontal='right', vertical='center')
# Create Excel workbook
wb = openpyxl.Workbook()
# Remove default sheet
wb.remove(wb.active)
# Track all data for summary
all_location_summaries = []
# Create a sheet for each location
for location_name, file_list in location_files.items():
# Sanitize sheet name (max 31 chars, no special chars)
safe_sheet_name = "".join(c for c in location_name if c.isalnum() or c in (' ', '-', '_'))[:31]
ws = wb.create_sheet(title=safe_sheet_name)
# Row 1: Report title
final_title = f"{report_title} - {project.name}"
ws['A1'] = final_title
ws['A1'].font = title_font
ws['A1'].alignment = center_align
ws.merge_cells('A1:G1')
ws.row_dimensions[1].height = 20
# Row 3: Location name
ws['A3'] = location_name
ws['A3'].font = Font(name='Arial', bold=True, size=12)
ws['A3'].alignment = center_align
ws.merge_cells('A3:G3')
ws.row_dimensions[3].height = 20
# Row 7: Headers
headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments']
for col, header in enumerate(headers, 1):
cell = ws.cell(row=7, column=col, value=header)
cell.font = header_font
cell.border = thin_border
cell.fill = header_fill
cell.alignment = center_align
ws.row_dimensions[7].height = 16
# Set column widths — A-G sized to fit one printed page width
column_widths = [12, 11, 9, 11, 11, 11, 20]
for i, width in enumerate(column_widths, 1):
ws.column_dimensions[get_column_letter(i)].width = width
# Combine data from all files for this location
all_rnd_rows = []
for file_info in file_list:
file = file_info['file']
file_path = Path("data") / file.file_path
if not file_path.exists():
continue
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
reader = csv.DictReader(io.StringIO(content))
for row in reader:
cleaned_row = {}
for key, value in row.items():
if key:
cleaned_key = key.strip()
cleaned_value = value.strip() if value else ''
if cleaned_value and cleaned_value not in ['-.-', '-', '']:
try:
cleaned_value = float(cleaned_value)
except ValueError:
pass
elif cleaned_value in ['-.-', '-']:
cleaned_value = None
cleaned_row[cleaned_key] = cleaned_value
all_rnd_rows.append(cleaned_row)
except Exception as e:
logger.warning(f"Error reading file {file.file_path}: {e}")
continue
if not all_rnd_rows:
continue
# Sort by start time
all_rnd_rows.sort(key=lambda r: r.get('Start Time', ''))
# Data rows starting at row 8
data_start_row = 8
for idx, row in enumerate(all_rnd_rows, 1):
data_row = data_start_row + idx - 1
ws.cell(row=data_row, column=1, value=idx).border = thin_border
start_time_str = row.get('Start Time', '')
if start_time_str:
try:
dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
ws.cell(row=data_row, column=2, value=dt.date())
ws.cell(row=data_row, column=3, value=dt.time())
except ValueError:
ws.cell(row=data_row, column=2, value=start_time_str)
ws.cell(row=data_row, column=3, value='')
else:
ws.cell(row=data_row, column=2, value='')
ws.cell(row=data_row, column=3, value='')
lmax = row.get('Lmax(Main)')
ws.cell(row=data_row, column=4, value=lmax if lmax else '').border = thin_border
ln1 = row.get('LN1(Main)')
ws.cell(row=data_row, column=5, value=ln1 if ln1 else '').border = thin_border
ln2 = row.get('LN2(Main)')
ws.cell(row=data_row, column=6, value=ln2 if ln2 else '').border = thin_border
ws.cell(row=data_row, column=7, value='').border = thin_border
ws.cell(row=data_row, column=2).border = thin_border
ws.cell(row=data_row, column=3).border = thin_border
data_end_row = data_start_row + len(all_rnd_rows) - 1
# Add Line Chart
chart = LineChart()
chart.title = f"{location_name}"
chart.style = 10
chart.y_axis.title = "Sound Level (dBA)"
chart.x_axis.title = "Time"
chart.height = 18
chart.width = 22
data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row)
categories = Reference(ws, min_col=3, min_row=data_start_row, max_row=data_end_row)
chart.add_data(data_ref, titles_from_data=True)
chart.set_categories(categories)
if len(chart.series) >= 3:
chart.series[0].graphicalProperties.line.solidFill = "FF0000"
chart.series[1].graphicalProperties.line.solidFill = "00B050"
chart.series[2].graphicalProperties.line.solidFill = "0070C0"
ws.add_chart(chart, "H3")
# Print layout: A-G fits one page width, landscape
from openpyxl.worksheet.properties import PageSetupProperties
ws.sheet_properties.pageSetUpPr = PageSetupProperties(fitToPage=True)
ws.page_setup.orientation = 'landscape'
ws.page_setup.fitToWidth = 1
ws.page_setup.fitToHeight = 0
# Calculate summary for this location
all_lmax = [r.get('Lmax(Main)') for r in all_rnd_rows if isinstance(r.get('Lmax(Main)'), (int, float))]
all_ln1 = [r.get('LN1(Main)') for r in all_rnd_rows if isinstance(r.get('LN1(Main)'), (int, float))]
all_ln2 = [r.get('LN2(Main)') for r in all_rnd_rows if isinstance(r.get('LN2(Main)'), (int, float))]
all_location_summaries.append({
'location': location_name,
'samples': len(all_rnd_rows),
'lmax_avg': round(sum(all_lmax) / len(all_lmax), 1) if all_lmax else None,
'ln1_avg': round(sum(all_ln1) / len(all_ln1), 1) if all_ln1 else None,
'ln2_avg': round(sum(all_ln2) / len(all_ln2), 1) if all_ln2 else None,
})
# Create Summary sheet at the beginning
summary_ws = wb.create_sheet(title="Summary", index=0)
summary_ws['A1'] = f"{report_title} - {project.name} - Summary"
summary_ws['A1'].font = title_font
summary_ws.merge_cells('A1:E1')
summary_headers = ['Location', 'Samples', 'LAmax Avg', 'LA01 Avg', 'LA10 Avg']
for col, header in enumerate(summary_headers, 1):
cell = summary_ws.cell(row=3, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.border = thin_border
for i, width in enumerate([30, 10, 12, 12, 12], 1):
summary_ws.column_dimensions[get_column_letter(i)].width = width
for idx, loc_summary in enumerate(all_location_summaries, 4):
summary_ws.cell(row=idx, column=1, value=loc_summary['location']).border = thin_border
summary_ws.cell(row=idx, column=2, value=loc_summary['samples']).border = thin_border
summary_ws.cell(row=idx, column=3, value=loc_summary['lmax_avg'] or '-').border = thin_border
summary_ws.cell(row=idx, column=4, value=loc_summary['ln1_avg'] or '-').border = thin_border
summary_ws.cell(row=idx, column=5, value=loc_summary['ln2_avg'] or '-').border = thin_border
# Save to buffer
output = io.BytesIO()
wb.save(output)
output.seek(0)
# Generate filename
project_name_clean = "".join(c for c in project.name if c.isalnum() or c in ('_', '-', ' ')).strip()
filename = f"{project_name_clean}_combined_report.xlsx"
filename = filename.replace(' ', '_')
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'}
)
# ============================================================================
# Combined Report Wizard — config page, preview page, and generate endpoint
# ============================================================================
@router.get("/{project_id}/combined-report-wizard", response_class=HTMLResponse)
async def combined_report_wizard(
request: Request,
project_id: str,
db: Session = Depends(get_db),
):
"""Configuration page for the combined multi-location report wizard."""
from backend.models import ReportTemplate
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
sessions = db.query(MonitoringSession).filter_by(project_id=project_id).all()
# Build location list with Leq file counts (no filtering)
location_file_counts: dict = {}
for session in sessions:
files = db.query(DataFile).filter_by(session_id=session.id).all()
for file in files:
if not file.file_path or not file.file_path.lower().endswith('.rnd'):
continue
from pathlib import Path as _Path
abs_path = _Path("data") / file.file_path
peek = _peek_rnd_headers(abs_path)
if not _is_leq_file(file.file_path, peek):
continue
location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
loc_name = location.name if location else f"Session {session.id[:8]}"
location_file_counts[loc_name] = location_file_counts.get(loc_name, 0) + 1
locations = [
{"name": name, "file_count": count}
for name, count in sorted(location_file_counts.items())
]
report_templates = db.query(ReportTemplate).all()
return templates.TemplateResponse("combined_report_wizard.html", {
"request": request,
"project": project,
"project_id": project_id,
"locations": locations,
"report_templates": report_templates,
})
@router.get("/{project_id}/combined-report-preview", response_class=HTMLResponse)
async def combined_report_preview(
request: Request,
project_id: str,
report_title: str = Query("Background Noise Study"),
project_name: str = Query(""),
client_name: str = Query(""),
start_time: str = Query(""),
end_time: str = Query(""),
start_date: str = Query(""),
end_date: str = Query(""),
enabled_locations: str = Query(""),
db: Session = Depends(get_db),
):
"""Preview and edit combined report data before generating the Excel file."""
enabled_list = [loc.strip() for loc in enabled_locations.split(',') if loc.strip()] if enabled_locations else None
result = _build_combined_location_data(
project_id, db,
start_time=start_time,
end_time=end_time,
start_date=start_date,
end_date=end_date,
enabled_locations=enabled_list,
)
project = result["project"]
location_data = result["location_data"]
total_rows = sum(loc["filtered_count"] for loc in location_data)
final_project_name = project_name if project_name else project.name
# Build time filter display string
time_filter_desc = ""
if start_time and end_time:
time_filter_desc = f"{start_time} {end_time}"
elif start_time or end_time:
time_filter_desc = f"{start_time or ''} {end_time or ''}"
return templates.TemplateResponse("combined_report_preview.html", {
"request": request,
"project": project,
"project_id": project_id,
"report_title": report_title,
"project_name": final_project_name,
"client_name": client_name,
"start_time": start_time,
"end_time": end_time,
"start_date": start_date,
"end_date": end_date,
"time_filter_desc": time_filter_desc,
"location_data": location_data,
"locations_json": json.dumps(location_data),
"total_rows": total_rows,
})
@router.post("/{project_id}/generate-combined-from-preview")
async def generate_combined_from_preview(
project_id: str,
data: dict,
db: Session = Depends(get_db),
):
"""Generate combined Excel report from wizard-edited spreadsheet data."""
try:
import openpyxl
from openpyxl.chart import LineChart, Reference
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
from openpyxl.utils import get_column_letter
except ImportError:
raise HTTPException(status_code=500, detail="openpyxl is not installed. Run: pip install openpyxl")
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
report_title = data.get("report_title", "Background Noise Study")
project_name = data.get("project_name", project.name)
client_name = data.get("client_name", "")
locations = data.get("locations", [])
if not locations:
raise HTTPException(status_code=400, detail="No location data provided")
# Styles
title_font = Font(name='Arial', bold=True, size=12)
header_font = Font(name='Arial', bold=True, size=10)
data_font = Font(name='Arial', size=10)
thin_border = Border(
left=Side(style='thin'), right=Side(style='thin'),
top=Side(style='thin'), bottom=Side(style='thin')
)
header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid")
center_align = Alignment(horizontal='center', vertical='center')
wb = openpyxl.Workbook()
wb.remove(wb.active)
all_location_summaries = []
for loc_info in locations:
loc_name = loc_info.get("location_name", "Unknown")
rows = loc_info.get("spreadsheet_data", [])
if not rows:
continue
safe_sheet_name = "".join(c for c in loc_name if c.isalnum() or c in (' ', '-', '_'))[:31]
ws = wb.create_sheet(title=safe_sheet_name)
# Title row
final_title = f"{report_title} - {project_name}"
ws['A1'] = final_title
ws['A1'].font = title_font
ws['A1'].alignment = center_align
ws.merge_cells('A1:G1')
ws.row_dimensions[1].height = 20
# Client row (row 2) if provided
if client_name:
ws['A2'] = client_name
ws['A2'].font = Font(name='Arial', italic=True, size=10)
ws['A2'].alignment = center_align
ws.merge_cells('A2:G2')
# Location row
ws['A3'] = loc_name
ws['A3'].font = Font(name='Arial', bold=True, size=12)
ws['A3'].alignment = center_align
ws.merge_cells('A3:G3')
ws.row_dimensions[3].height = 20
# Column headers at row 7
headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments']
for col, header in enumerate(headers, 1):
cell = ws.cell(row=7, column=col, value=header)
cell.font = header_font
cell.border = thin_border
cell.fill = header_fill
cell.alignment = center_align
ws.row_dimensions[7].height = 16
column_widths = [12, 11, 9, 11, 11, 11, 20]
for i, width in enumerate(column_widths, 1):
ws.column_dimensions[get_column_letter(i)].width = width
# Data rows starting at row 8
data_start_row = 8
lmax_vals = []
ln1_vals = []
ln2_vals = []
for row_idx, row in enumerate(rows):
data_row = data_start_row + row_idx
# row is [test#, date, time, lmax, ln1, ln2, comment]
test_num = row[0] if len(row) > 0 else row_idx + 1
date_val = row[1] if len(row) > 1 else ''
time_val = row[2] if len(row) > 2 else ''
lmax = row[3] if len(row) > 3 else ''
ln1 = row[4] if len(row) > 4 else ''
ln2 = row[5] if len(row) > 5 else ''
comment = row[6] if len(row) > 6 else ''
ws.cell(row=data_row, column=1, value=test_num).border = thin_border
ws.cell(row=data_row, column=2, value=date_val).border = thin_border
ws.cell(row=data_row, column=3, value=time_val).border = thin_border
ws.cell(row=data_row, column=4, value=lmax if lmax != '' else None).border = thin_border
ws.cell(row=data_row, column=5, value=ln1 if ln1 != '' else None).border = thin_border
ws.cell(row=data_row, column=6, value=ln2 if ln2 != '' else None).border = thin_border
ws.cell(row=data_row, column=7, value=comment).border = thin_border
if isinstance(lmax, (int, float)):
lmax_vals.append(lmax)
if isinstance(ln1, (int, float)):
ln1_vals.append(ln1)
if isinstance(ln2, (int, float)):
ln2_vals.append(ln2)
data_end_row = data_start_row + len(rows) - 1
# Line chart
chart = LineChart()
chart.title = loc_name
chart.style = 10
chart.y_axis.title = "Sound Level (dBA)"
chart.x_axis.title = "Time"
chart.height = 18
chart.width = 22
data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row)
categories = Reference(ws, min_col=3, min_row=data_start_row, max_row=data_end_row)
chart.add_data(data_ref, titles_from_data=True)
chart.set_categories(categories)
if len(chart.series) >= 3:
chart.series[0].graphicalProperties.line.solidFill = "FF0000"
chart.series[1].graphicalProperties.line.solidFill = "00B050"
chart.series[2].graphicalProperties.line.solidFill = "0070C0"
ws.add_chart(chart, "H3")
from openpyxl.worksheet.properties import PageSetupProperties
ws.sheet_properties.pageSetUpPr = PageSetupProperties(fitToPage=True)
ws.page_setup.orientation = 'landscape'
ws.page_setup.fitToWidth = 1
ws.page_setup.fitToHeight = 0
all_location_summaries.append({
'location': loc_name,
'samples': len(rows),
'lmax_avg': round(sum(lmax_vals) / len(lmax_vals), 1) if lmax_vals else None,
'ln1_avg': round(sum(ln1_vals) / len(ln1_vals), 1) if ln1_vals else None,
'ln2_avg': round(sum(ln2_vals) / len(ln2_vals), 1) if ln2_vals else None,
})
# Summary sheet
summary_ws = wb.create_sheet(title="Summary", index=0)
summary_ws['A1'] = f"{report_title} - {project_name} - Summary"
summary_ws['A1'].font = title_font
summary_ws.merge_cells('A1:E1')
summary_headers = ['Location', 'Samples', 'LAmax Avg', 'LA01 Avg', 'LA10 Avg']
for col, header in enumerate(summary_headers, 1):
cell = summary_ws.cell(row=3, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.border = thin_border
for i, width in enumerate([30, 10, 12, 12, 12], 1):
summary_ws.column_dimensions[get_column_letter(i)].width = width
for idx, loc_summary in enumerate(all_location_summaries, 4):
summary_ws.cell(row=idx, column=1, value=loc_summary['location']).border = thin_border
summary_ws.cell(row=idx, column=2, value=loc_summary['samples']).border = thin_border
summary_ws.cell(row=idx, column=3, value=loc_summary['lmax_avg'] or '-').border = thin_border
summary_ws.cell(row=idx, column=4, value=loc_summary['ln1_avg'] or '-').border = thin_border
summary_ws.cell(row=idx, column=5, value=loc_summary['ln2_avg'] or '-').border = thin_border
output = io.BytesIO()
wb.save(output)
output.seek(0)
project_name_clean = "".join(c for c in project_name if c.isalnum() or c in ('_', '-', ' ')).strip()
filename = f"{project_name_clean}_combined_report.xlsx".replace(' ', '_')
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'}
)
# ============================================================================
# Project-level bulk upload (entire date-folder structure)
# ============================================================================
def _bulk_parse_rnh(content: bytes) -> dict:
"""Parse a Rion .rnh metadata file for session start/stop times and device info."""
result = {}
try:
text = content.decode("utf-8", errors="replace")
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("["):
continue
if "," in line:
key, _, value = line.partition(",")
key = key.strip()
value = value.strip()
mapping = {
"Serial Number": "serial_number",
"Store Name": "store_name",
"Index Number": "index_number",
"Measurement Start Time": "start_time_str",
"Measurement Stop Time": "stop_time_str",
"Total Measurement Time": "total_time_str",
}
if key in mapping:
result[mapping[key]] = value
except Exception:
pass
return result
def _bulk_parse_datetime(s: str):
if not s:
return None
try:
return datetime.strptime(s.strip(), "%Y/%m/%d %H:%M:%S")
except Exception:
return None
def _bulk_classify_file(filename: str) -> str:
name = filename.lower()
if name.endswith(".rnh"):
return "log"
if name.endswith(".rnd"):
return "measurement"
if name.endswith(".mp3") or name.endswith(".wav") or name.endswith(".m4a"):
return "audio"
if name.endswith(".xlsx") or name.endswith(".xls") or name.endswith(".csv"):
return "data"
return "data"
# Files we skip entirely — already-converted outputs that don't need re-importing
_BULK_SKIP_EXTENSIONS = {".xlsx", ".xls"}
@router.post("/{project_id}/upload-all")
async def upload_all_project_data(
project_id: str,
request: Request,
db: Session = Depends(get_db),
):
"""
Bulk-import an entire structured data folder selected via webkitdirectory.
Expected folder structure (flexible depth):
[date_folder]/[NRL_name]/[Auto_####]/ ← files here
-- OR --
[NRL_name]/[Auto_####]/ ← files here (no date wrapper)
-- OR --
[date_folder]/[NRL_name]/ ← files directly in NRL folder
Each leaf folder group of .rnd/.rnh files becomes one MonitoringSession.
NRL folder names are matched case-insensitively to MonitoringLocation.name.
.mp3 files are stored as audio. .xlsx/.xls are skipped (already-converted).
Unmatched folders are reported but don't cause failure.
"""
form = await request.form()
# Collect (relative_path, filename, bytes) for every uploaded file.
# The JS sends each file as "files" and its webkitRelativePath as "paths".
from collections import defaultdict
uploaded_files = form.getlist("files")
uploaded_paths = form.getlist("paths")
if not uploaded_files:
raise HTTPException(status_code=400, detail="No files received.")
if len(uploaded_paths) != len(uploaded_files):
# Fallback: use bare filename if paths weren't sent
uploaded_paths = [f.filename for f in uploaded_files]
project = db.query(Project).filter_by(id=project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Load all sound monitoring locations for this project
locations = db.query(MonitoringLocation).filter_by(
project_id=project_id,
location_type="sound",
).all()
# Build a case-insensitive name → location map
loc_by_name: dict[str, MonitoringLocation] = {
loc.name.strip().lower(): loc for loc in locations
}
def _normalize(s: str) -> str:
"""Lowercase, strip spaces/hyphens/underscores for fuzzy comparison."""
return s.lower().replace(" ", "").replace("-", "").replace("_", "")
# Pre-build normalized keys for fuzzy matching
loc_by_normalized: dict[str, MonitoringLocation] = {
_normalize(loc.name): loc for loc in locations
}
def _find_location_for_path(path: str):
"""
Walk path components from right and return first matching location.
Tries exact match first, then normalized (strips spaces/hyphens/underscores),
then checks if the location name *starts with* the normalized folder name.
e.g. folder "NRL 1" matches location "NRL1 - Test Location"
"""
components = path.replace("\\", "/").split("/")
for comp in reversed(components):
# Exact match
key = comp.strip().lower()
if key in loc_by_name:
return loc_by_name[key]
# Normalized match ("NRL 1" == "NRL1")
norm = _normalize(comp)
if norm in loc_by_normalized:
return loc_by_normalized[norm]
# Prefix match: location name starts with the folder component
# e.g. "NRL1" matches "NRL1 - Test Location"
for loc_norm, loc in loc_by_normalized.items():
if loc_norm.startswith(norm) or norm.startswith(loc_norm):
return loc
return None
def _session_group_key(parts: tuple) -> str:
"""
Determine the grouping key for a file path.
Files inside Auto_####/Auto_Leq/ or Auto_####/Auto_Lp_01/ are collapsed
up to their Auto_#### parent so they all land in the same session.
Only folder components are examined (not the filename, which is parts[-1]).
"""
# Only look at folder components — exclude the filename (last part)
folder_parts = parts[:-1]
auto_idx = None
for i, p in enumerate(folder_parts):
p_lower = p.lower()
if p_lower.startswith("auto_") and not p_lower.startswith("auto_leq") and not p_lower.startswith("auto_lp"):
auto_idx = i
if auto_idx is not None:
# Group key = everything up to and including Auto_####
return "/".join(folder_parts[:auto_idx + 1])
# Fallback: use the immediate parent folder
return "/".join(folder_parts) if folder_parts else ""
# --- Group files by session key ---
groups: dict[str, list[tuple[str, bytes]]] = defaultdict(list)
for rel_path, uf in zip(uploaded_paths, uploaded_files):
rel_path = rel_path.replace("\\", "/").strip("/")
parts = _pathlib.PurePosixPath(rel_path).parts
if not parts:
continue
fname = parts[-1]
# Skip already-converted Excel exports
if _pathlib.PurePosixPath(fname).suffix.lower() in _BULK_SKIP_EXTENSIONS:
continue
group_key = _session_group_key(parts)
data = await uf.read()
groups[group_key].append((fname, data))
# Aggregate by (location_id, date_label) so each Auto_#### group is one session
# key: (location_id or None, group_path)
session_results = []
unmatched_paths = []
total_files = 0
total_sessions = 0
for group_path, file_list in sorted(groups.items()):
matched_loc = _find_location_for_path(group_path)
if matched_loc is None:
unmatched_paths.append(group_path)
continue
# Parse .rnh if present in this group
rnh_meta = {}
for fname, fbytes in file_list:
if fname.lower().endswith(".rnh"):
rnh_meta = _bulk_parse_rnh(fbytes)
break
started_at = _bulk_parse_datetime(rnh_meta.get("start_time_str")) or datetime.utcnow()
stopped_at = _bulk_parse_datetime(rnh_meta.get("stop_time_str"))
duration_seconds = None
if started_at and stopped_at:
duration_seconds = int((stopped_at - started_at).total_seconds())
store_name = rnh_meta.get("store_name", "")
serial_number = rnh_meta.get("serial_number", "")
index_number = rnh_meta.get("index_number", "")
# Detect device model from first RND file in this group (in-memory)
_bulk_device_model = None
for _fname, _fbytes in file_list:
if _fname.lower().endswith(".rnd"):
try:
import csv as _csv_dm, io as _io_dm
_text = _fbytes.decode("utf-8", errors="replace")
_reader = _csv_dm.DictReader(_io_dm.StringIO(_text))
_first = next(_reader, None)
if _first and "LAeq" in _first:
_bulk_device_model = "NL-32"
# NL-43/NL-53 have no distinguishing marker vs each other
# at the format level; leave None for those.
except Exception:
pass
break
session_id = str(uuid.uuid4())
monitoring_session = MonitoringSession(
id=session_id,
project_id=project_id,
location_id=matched_loc.id,
unit_id=None,
session_type="sound",
started_at=started_at,
stopped_at=stopped_at,
duration_seconds=duration_seconds,
status="completed",
device_model=_bulk_device_model,
session_metadata=json.dumps({
"source": "bulk_upload",
"group_path": group_path,
"store_name": store_name,
"serial_number": serial_number,
"index_number": index_number,
}),
)
db.add(monitoring_session)
db.commit()
db.refresh(monitoring_session)
# Write files
output_dir = _pathlib.Path("data/Projects") / project_id / session_id
output_dir.mkdir(parents=True, exist_ok=True)
leq_count = 0
lp_count = 0
group_file_count = 0
for fname, fbytes in file_list:
file_type = _bulk_classify_file(fname)
fname_lower = fname.lower()
if fname_lower.endswith(".rnd"):
if "_leq_" in fname_lower:
leq_count += 1
elif "_lp" in fname_lower:
lp_count += 1
dest = output_dir / fname
dest.write_bytes(fbytes)
checksum = hashlib.sha256(fbytes).hexdigest()
rel_path = str(dest.relative_to("data"))
data_file = DataFile(
id=str(uuid.uuid4()),
session_id=session_id,
file_path=rel_path,
file_type=file_type,
file_size_bytes=len(fbytes),
downloaded_at=datetime.utcnow(),
checksum=checksum,
file_metadata=json.dumps({
"source": "bulk_upload",
"original_filename": fname,
"group_path": group_path,
"store_name": store_name,
}),
)
db.add(data_file)
group_file_count += 1
db.commit()
total_files += group_file_count
total_sessions += 1
session_results.append({
"location_name": matched_loc.name,
"location_id": matched_loc.id,
"session_id": session_id,
"group_path": group_path,
"files": group_file_count,
"leq_files": leq_count,
"lp_files": lp_count,
"store_name": store_name,
"started_at": started_at.isoformat() if started_at else None,
})
return {
"success": True,
"sessions_created": total_sessions,
"files_imported": total_files,
"unmatched_folders": unmatched_paths,
"sessions": session_results,
}
@router.get("/types/list", response_class=HTMLResponse)
async def get_project_types(request: Request, db: Session = Depends(get_db)):
"""
Get all available project types.
Returns HTML partial with project type cards.
"""
project_types = db.query(ProjectType).all()
return templates.TemplateResponse("partials/projects/project_type_cards.html", {
"request": request,
"project_types": project_types,
})