""" Projects Router Provides API endpoints for the Projects system: - Project CRUD operations - Project dashboards - Project statistics - Type-aware features """ from fastapi import APIRouter, Request, Depends, HTTPException, Query from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from sqlalchemy.orm import Session from sqlalchemy import func, and_ from datetime import datetime, timedelta from typing import Optional from collections import OrderedDict import uuid import json import logging import io from backend.utils.timezone import utc_to_local, format_local_datetime from backend.database import get_db from backend.models import ( Project, ProjectType, MonitoringLocation, UnitAssignment, RecordingSession, ScheduledAction, RecurringSchedule, RosterUnit, ) from backend.templates_config import templates router = APIRouter(prefix="/api/projects", tags=["projects"]) logger = logging.getLogger(__name__) # ============================================================================ # Project List & Overview # ============================================================================ @router.get("/list", response_class=HTMLResponse) async def get_projects_list( request: Request, db: Session = Depends(get_db), status: Optional[str] = Query(None), project_type_id: Optional[str] = Query(None), view: Optional[str] = Query(None), ): """ Get list of all projects. Returns HTML partial with project cards. 
""" query = db.query(Project) # Filter by status if provided if status: query = query.filter(Project.status == status) # Filter by project type if provided if project_type_id: query = query.filter(Project.project_type_id == project_type_id) projects = query.order_by(Project.created_at.desc()).all() # Enrich each project with stats projects_data = [] for project in projects: # Get project type project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first() # Count locations location_count = db.query(func.count(MonitoringLocation.id)).filter_by( project_id=project.id ).scalar() # Count assigned units unit_count = db.query(func.count(UnitAssignment.id)).filter( and_( UnitAssignment.project_id == project.id, UnitAssignment.status == "active", ) ).scalar() # Count active sessions active_session_count = db.query(func.count(RecordingSession.id)).filter( and_( RecordingSession.project_id == project.id, RecordingSession.status == "recording", ) ).scalar() projects_data.append({ "project": project, "project_type": project_type, "location_count": location_count, "unit_count": unit_count, "active_session_count": active_session_count, }) template_name = "partials/projects/project_list.html" if view == "compact": template_name = "partials/projects/project_list_compact.html" return templates.TemplateResponse(template_name, { "request": request, "projects": projects_data, }) @router.get("/stats", response_class=HTMLResponse) async def get_projects_stats(request: Request, db: Session = Depends(get_db)): """ Get summary statistics for projects overview. Returns HTML partial with stat cards. 
""" # Count projects by status total_projects = db.query(func.count(Project.id)).scalar() active_projects = db.query(func.count(Project.id)).filter_by(status="active").scalar() completed_projects = db.query(func.count(Project.id)).filter_by(status="completed").scalar() # Count total locations across all projects total_locations = db.query(func.count(MonitoringLocation.id)).scalar() # Count assigned units assigned_units = db.query(func.count(UnitAssignment.id)).filter_by( status="active" ).scalar() # Count active recording sessions active_sessions = db.query(func.count(RecordingSession.id)).filter_by( status="recording" ).scalar() return templates.TemplateResponse("partials/projects/project_stats.html", { "request": request, "total_projects": total_projects, "active_projects": active_projects, "completed_projects": completed_projects, "total_locations": total_locations, "assigned_units": assigned_units, "active_sessions": active_sessions, }) # ============================================================================ # Project CRUD # ============================================================================ @router.post("/create") async def create_project(request: Request, db: Session = Depends(get_db)): """ Create a new project. Expects form data with project details. 
""" form_data = await request.form() project = Project( id=str(uuid.uuid4()), name=form_data.get("name"), description=form_data.get("description"), project_type_id=form_data.get("project_type_id"), status="active", client_name=form_data.get("client_name"), site_address=form_data.get("site_address"), site_coordinates=form_data.get("site_coordinates"), start_date=datetime.fromisoformat(form_data.get("start_date")) if form_data.get("start_date") else None, end_date=datetime.fromisoformat(form_data.get("end_date")) if form_data.get("end_date") else None, ) db.add(project) db.commit() db.refresh(project) return JSONResponse({ "success": True, "project_id": project.id, "message": f"Project '{project.name}' created successfully", }) @router.get("/{project_id}") async def get_project(project_id: str, db: Session = Depends(get_db)): """ Get project details by ID. Returns JSON with full project data. """ project = db.query(Project).filter_by(id=project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first() return { "id": project.id, "name": project.name, "description": project.description, "project_type_id": project.project_type_id, "project_type_name": project_type.name if project_type else None, "status": project.status, "client_name": project.client_name, "site_address": project.site_address, "site_coordinates": project.site_coordinates, "start_date": project.start_date.isoformat() if project.start_date else None, "end_date": project.end_date.isoformat() if project.end_date else None, "created_at": project.created_at.isoformat(), "updated_at": project.updated_at.isoformat(), } @router.put("/{project_id}") async def update_project( project_id: str, request: Request, db: Session = Depends(get_db), ): """ Update project details. Expects JSON body with fields to update. 
""" project = db.query(Project).filter_by(id=project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") data = await request.json() # Update fields if provided if "name" in data: project.name = data["name"] if "description" in data: project.description = data["description"] if "status" in data: project.status = data["status"] if "client_name" in data: project.client_name = data["client_name"] if "site_address" in data: project.site_address = data["site_address"] if "site_coordinates" in data: project.site_coordinates = data["site_coordinates"] if "start_date" in data: project.start_date = datetime.fromisoformat(data["start_date"]) if data["start_date"] else None if "end_date" in data: project.end_date = datetime.fromisoformat(data["end_date"]) if data["end_date"] else None project.updated_at = datetime.utcnow() db.commit() return {"success": True, "message": "Project updated successfully"} @router.delete("/{project_id}") async def delete_project(project_id: str, db: Session = Depends(get_db)): """ Delete a project (soft delete by archiving). """ project = db.query(Project).filter_by(id=project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") project.status = "archived" project.updated_at = datetime.utcnow() db.commit() return {"success": True, "message": "Project archived successfully"} # ============================================================================ # Project Dashboard Data # ============================================================================ @router.get("/{project_id}/dashboard", response_class=HTMLResponse) async def get_project_dashboard( project_id: str, request: Request, db: Session = Depends(get_db), ): """ Get project dashboard data. Returns HTML partial with project summary. 
""" project = db.query(Project).filter_by(id=project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first() # Get locations locations = db.query(MonitoringLocation).filter_by(project_id=project_id).all() # Get assigned units with details assignments = db.query(UnitAssignment).filter( and_( UnitAssignment.project_id == project_id, UnitAssignment.status == "active", ) ).all() assigned_units = [] for assignment in assignments: unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first() if unit: assigned_units.append({ "assignment": assignment, "unit": unit, }) # Get active recording sessions active_sessions = db.query(RecordingSession).filter( and_( RecordingSession.project_id == project_id, RecordingSession.status == "recording", ) ).all() # Get completed sessions count completed_sessions_count = db.query(func.count(RecordingSession.id)).filter( and_( RecordingSession.project_id == project_id, RecordingSession.status == "completed", ) ).scalar() # Get upcoming scheduled actions upcoming_actions = db.query(ScheduledAction).filter( and_( ScheduledAction.project_id == project_id, ScheduledAction.execution_status == "pending", ScheduledAction.scheduled_time > datetime.utcnow(), ) ).order_by(ScheduledAction.scheduled_time).limit(5).all() return templates.TemplateResponse("partials/projects/project_dashboard.html", { "request": request, "project": project, "project_type": project_type, "locations": locations, "assigned_units": assigned_units, "active_sessions": active_sessions, "completed_sessions_count": completed_sessions_count, "upcoming_actions": upcoming_actions, }) # ============================================================================ # Project Types # ============================================================================ @router.get("/{project_id}/header", response_class=HTMLResponse) async def get_project_header( 
project_id: str, request: Request, db: Session = Depends(get_db) ): """ Get project header information for dynamic display. Returns HTML partial with project name, status, and type. """ project = db.query(Project).filter_by(id=project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first() return templates.TemplateResponse("partials/projects/project_header.html", { "request": request, "project": project, "project_type": project_type, }) @router.get("/{project_id}/units", response_class=HTMLResponse) async def get_project_units( project_id: str, request: Request, db: Session = Depends(get_db), ): """ Get all units assigned to this project's locations. Returns HTML partial with unit list. """ from backend.models import DataFile # Get all assignments for this project assignments = db.query(UnitAssignment).filter( and_( UnitAssignment.project_id == project_id, UnitAssignment.status == "active", ) ).all() # Enrich with unit and location details units_data = [] for assignment in assignments: unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first() location = db.query(MonitoringLocation).filter_by(id=assignment.location_id).first() # Count sessions for this assignment session_count = db.query(func.count(RecordingSession.id)).filter_by( location_id=assignment.location_id, unit_id=assignment.unit_id, ).scalar() # Count files from sessions file_count = db.query(func.count(DataFile.id)).join( RecordingSession, DataFile.session_id == RecordingSession.id ).filter( RecordingSession.location_id == assignment.location_id, RecordingSession.unit_id == assignment.unit_id, ).scalar() # Check if currently recording active_session = db.query(RecordingSession).filter( and_( RecordingSession.location_id == assignment.location_id, RecordingSession.unit_id == assignment.unit_id, RecordingSession.status == "recording", ) ).first() units_data.append({ 
"assignment": assignment, "unit": unit, "location": location, "session_count": session_count, "file_count": file_count, "active_session": active_session, }) # Get project type for label context project = db.query(Project).filter_by(id=project_id).first() project_type = db.query(ProjectType).filter_by(id=project.project_type_id).first() if project else None return templates.TemplateResponse("partials/projects/unit_list.html", { "request": request, "project_id": project_id, "units": units_data, "project_type": project_type, }) @router.get("/{project_id}/schedules", response_class=HTMLResponse) async def get_project_schedules( project_id: str, request: Request, db: Session = Depends(get_db), status: Optional[str] = Query(None), ): """ Get scheduled actions for this project. Returns HTML partial with schedule list. Optional status filter: pending, completed, failed, cancelled """ query = db.query(ScheduledAction).filter_by(project_id=project_id) # Filter by status if provided if status: query = query.filter(ScheduledAction.execution_status == status) # For pending actions, show soonest first (ascending) # For completed/failed, show most recent first (descending) if status == "pending": schedules = query.order_by(ScheduledAction.scheduled_time.asc()).all() else: schedules = query.order_by(ScheduledAction.scheduled_time.desc()).all() # Enrich with location details and group by date schedules_by_date = OrderedDict() for schedule in schedules: location = None if schedule.location_id: location = db.query(MonitoringLocation).filter_by(id=schedule.location_id).first() # Get local date for grouping if schedule.scheduled_time: local_dt = utc_to_local(schedule.scheduled_time) date_key = local_dt.strftime("%Y-%m-%d") date_display = local_dt.strftime("%A, %B %d, %Y") # "Wednesday, January 22, 2026" else: date_key = "unknown" date_display = "Unknown Date" if date_key not in schedules_by_date: schedules_by_date[date_key] = { "date_display": date_display, "date_key": date_key, 
"actions": [], } schedules_by_date[date_key]["actions"].append({ "schedule": schedule, "location": location, }) return templates.TemplateResponse("partials/projects/schedule_list.html", { "request": request, "project_id": project_id, "schedules_by_date": schedules_by_date, }) @router.post("/{project_id}/schedules/{schedule_id}/execute") async def execute_scheduled_action( project_id: str, schedule_id: str, db: Session = Depends(get_db), ): """ Manually execute a scheduled action now. """ from backend.services.scheduler import get_scheduler action = db.query(ScheduledAction).filter_by( id=schedule_id, project_id=project_id, ).first() if not action: raise HTTPException(status_code=404, detail="Action not found") if action.execution_status != "pending": raise HTTPException( status_code=400, detail=f"Action is not pending (status: {action.execution_status})", ) # Execute via scheduler service scheduler = get_scheduler() result = await scheduler.execute_action_by_id(schedule_id) # Refresh from DB to get updated status db.refresh(action) return JSONResponse({ "success": result.get("success", False), "message": f"Action executed: {action.action_type}", "result": result, "action": { "id": action.id, "execution_status": action.execution_status, "executed_at": action.executed_at.isoformat() if action.executed_at else None, "error_message": action.error_message, }, }) @router.post("/{project_id}/schedules/{schedule_id}/cancel") async def cancel_scheduled_action( project_id: str, schedule_id: str, db: Session = Depends(get_db), ): """ Cancel a pending scheduled action. 
""" action = db.query(ScheduledAction).filter_by( id=schedule_id, project_id=project_id, ).first() if not action: raise HTTPException(status_code=404, detail="Action not found") if action.execution_status != "pending": raise HTTPException( status_code=400, detail=f"Can only cancel pending actions (status: {action.execution_status})", ) action.execution_status = "cancelled" db.commit() return JSONResponse({ "success": True, "message": "Action cancelled successfully", }) @router.get("/{project_id}/sessions", response_class=HTMLResponse) async def get_project_sessions( project_id: str, request: Request, db: Session = Depends(get_db), status: Optional[str] = Query(None), ): """ Get all recording sessions for this project. Returns HTML partial with session list. Optional status filter: recording, completed, paused, failed """ query = db.query(RecordingSession).filter_by(project_id=project_id) # Filter by status if provided if status: query = query.filter(RecordingSession.status == status) sessions = query.order_by(RecordingSession.started_at.desc()).all() # Enrich with unit and location details sessions_data = [] for session in sessions: unit = None location = None if session.unit_id: unit = db.query(RosterUnit).filter_by(id=session.unit_id).first() if session.location_id: location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() sessions_data.append({ "session": session, "unit": unit, "location": location, }) return templates.TemplateResponse("partials/projects/session_list.html", { "request": request, "project_id": project_id, "sessions": sessions_data, }) @router.get("/{project_id}/ftp-browser", response_class=HTMLResponse) async def get_ftp_browser( project_id: str, request: Request, db: Session = Depends(get_db), ): """ Get FTP browser interface for downloading files from assigned SLMs. Returns HTML partial with FTP browser. 
""" from backend.models import DataFile # Get all assignments for this project assignments = db.query(UnitAssignment).filter( and_( UnitAssignment.project_id == project_id, UnitAssignment.status == "active", ) ).all() # Enrich with unit and location details units_data = [] for assignment in assignments: unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first() location = db.query(MonitoringLocation).filter_by(id=assignment.location_id).first() # Only include SLM units if unit and unit.device_type == "slm": units_data.append({ "assignment": assignment, "unit": unit, "location": location, }) return templates.TemplateResponse("partials/projects/ftp_browser.html", { "request": request, "project_id": project_id, "units": units_data, }) @router.post("/{project_id}/ftp-download-to-server") async def ftp_download_to_server( project_id: str, request: Request, db: Session = Depends(get_db), ): """ Download a file from an SLM to the server via FTP. Creates a DataFile record and stores the file in data/Projects/{project_id}/ """ import httpx import os import hashlib from pathlib import Path from backend.models import DataFile data = await request.json() unit_id = data.get("unit_id") remote_path = data.get("remote_path") location_id = data.get("location_id") if not unit_id or not remote_path: raise HTTPException(status_code=400, detail="Missing unit_id or remote_path") # Get or create active session for this location/unit session = db.query(RecordingSession).filter( and_( RecordingSession.project_id == project_id, RecordingSession.location_id == location_id, RecordingSession.unit_id == unit_id, RecordingSession.status.in_(["recording", "paused"]) ) ).first() # If no active session, create one if not session: session = RecordingSession( id=str(uuid.uuid4()), project_id=project_id, location_id=location_id, unit_id=unit_id, session_type="sound", # SLMs are sound monitoring devices status="completed", started_at=datetime.utcnow(), stopped_at=datetime.utcnow(), 
session_metadata='{"source": "ftp_download", "note": "Auto-created for FTP download"}' ) db.add(session) db.commit() db.refresh(session) # Download file from SLMM SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") try: async with httpx.AsyncClient(timeout=300.0) as client: response = await client.post( f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download", json={"remote_path": remote_path} ) if not response.is_success: raise HTTPException( status_code=response.status_code, detail=f"Failed to download from SLMM: {response.text}" ) # Extract filename from remote_path filename = os.path.basename(remote_path) # Determine file type from extension ext = os.path.splitext(filename)[1].lower() file_type_map = { # Audio files '.wav': 'audio', '.mp3': 'audio', '.flac': 'audio', '.m4a': 'audio', '.aac': 'audio', # Sound level meter measurement files '.rnd': 'measurement', # Data files '.csv': 'data', '.txt': 'data', '.json': 'data', '.xml': 'data', '.dat': 'data', # Log files '.log': 'log', # Archives '.zip': 'archive', '.tar': 'archive', '.gz': 'archive', '.7z': 'archive', '.rar': 'archive', # Images '.jpg': 'image', '.jpeg': 'image', '.png': 'image', '.gif': 'image', # Documents '.pdf': 'document', '.doc': 'document', '.docx': 'document', } file_type = file_type_map.get(ext, 'data') # Create directory structure: data/Projects/{project_id}/{session_id}/ project_dir = Path(f"data/Projects/{project_id}/{session.id}") project_dir.mkdir(parents=True, exist_ok=True) # Save file to disk file_path = project_dir / filename file_content = response.content with open(file_path, 'wb') as f: f.write(file_content) # Calculate checksum checksum = hashlib.sha256(file_content).hexdigest() # Create DataFile record data_file = DataFile( id=str(uuid.uuid4()), session_id=session.id, file_path=str(file_path.relative_to("data")), # Store relative to data/ file_type=file_type, file_size_bytes=len(file_content), downloaded_at=datetime.utcnow(), checksum=checksum, 
file_metadata=json.dumps({ "source": "ftp", "remote_path": remote_path, "unit_id": unit_id, "location_id": location_id, }) ) db.add(data_file) db.commit() return { "success": True, "message": f"Downloaded {filename} to server", "file_id": data_file.id, "file_path": str(file_path), "file_size": len(file_content), } except httpx.TimeoutException: raise HTTPException( status_code=504, detail="Timeout downloading file from SLM" ) except Exception as e: logger.error(f"Error downloading file to server: {e}") raise HTTPException( status_code=500, detail=f"Failed to download file to server: {str(e)}" ) @router.post("/{project_id}/ftp-download-folder-to-server") async def ftp_download_folder_to_server( project_id: str, request: Request, db: Session = Depends(get_db), ): """ Download an entire folder from an SLM to the server via FTP. Extracts all files from the ZIP and preserves folder structure. Creates individual DataFile records for each file. """ import httpx import os import hashlib import zipfile import io from pathlib import Path from backend.models import DataFile data = await request.json() unit_id = data.get("unit_id") remote_path = data.get("remote_path") location_id = data.get("location_id") if not unit_id or not remote_path: raise HTTPException(status_code=400, detail="Missing unit_id or remote_path") # Get or create active session for this location/unit session = db.query(RecordingSession).filter( and_( RecordingSession.project_id == project_id, RecordingSession.location_id == location_id, RecordingSession.unit_id == unit_id, RecordingSession.status.in_(["recording", "paused"]) ) ).first() # If no active session, create one if not session: session = RecordingSession( id=str(uuid.uuid4()), project_id=project_id, location_id=location_id, unit_id=unit_id, session_type="sound", # SLMs are sound monitoring devices status="completed", started_at=datetime.utcnow(), stopped_at=datetime.utcnow(), session_metadata='{"source": "ftp_folder_download", "note": "Auto-created 
for FTP folder download"}' ) db.add(session) db.commit() db.refresh(session) # Download folder from SLMM (returns ZIP) SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") try: async with httpx.AsyncClient(timeout=600.0) as client: # Longer timeout for folders response = await client.post( f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download-folder", json={"remote_path": remote_path} ) if not response.is_success: raise HTTPException( status_code=response.status_code, detail=f"Failed to download folder from SLMM: {response.text}" ) # Extract folder name from remote_path folder_name = os.path.basename(remote_path.rstrip('/')) # Create base directory: data/Projects/{project_id}/{session_id}/{folder_name}/ base_dir = Path(f"data/Projects/{project_id}/{session.id}/{folder_name}") base_dir.mkdir(parents=True, exist_ok=True) # Extract ZIP and save individual files zip_content = response.content created_files = [] total_size = 0 # File type mapping for classification file_type_map = { # Audio files '.wav': 'audio', '.mp3': 'audio', '.flac': 'audio', '.m4a': 'audio', '.aac': 'audio', # Data files '.csv': 'data', '.txt': 'data', '.json': 'data', '.xml': 'data', '.dat': 'data', # Log files '.log': 'log', # Archives '.zip': 'archive', '.tar': 'archive', '.gz': 'archive', '.7z': 'archive', '.rar': 'archive', # Images '.jpg': 'image', '.jpeg': 'image', '.png': 'image', '.gif': 'image', # Documents '.pdf': 'document', '.doc': 'document', '.docx': 'document', } with zipfile.ZipFile(io.BytesIO(zip_content)) as zf: for zip_info in zf.filelist: # Skip directories if zip_info.is_dir(): continue # Read file from ZIP file_data = zf.read(zip_info.filename) # Determine file path (preserve structure within folder) # zip_info.filename might be like "Auto_0001/measurement.wav" file_path = base_dir / zip_info.filename file_path.parent.mkdir(parents=True, exist_ok=True) # Write file to disk with open(file_path, 'wb') as f: f.write(file_data) # Calculate checksum checksum = 
hashlib.sha256(file_data).hexdigest() # Determine file type ext = os.path.splitext(zip_info.filename)[1].lower() file_type = file_type_map.get(ext, 'data') # Create DataFile record data_file = DataFile( id=str(uuid.uuid4()), session_id=session.id, file_path=str(file_path.relative_to("data")), file_type=file_type, file_size_bytes=len(file_data), downloaded_at=datetime.utcnow(), checksum=checksum, file_metadata=json.dumps({ "source": "ftp_folder", "remote_path": remote_path, "unit_id": unit_id, "location_id": location_id, "folder_name": folder_name, "relative_path": zip_info.filename, }) ) db.add(data_file) created_files.append({ "filename": zip_info.filename, "size": len(file_data), "type": file_type }) total_size += len(file_data) db.commit() return { "success": True, "message": f"Downloaded folder {folder_name} with {len(created_files)} files", "folder_name": folder_name, "file_count": len(created_files), "total_size": total_size, "files": created_files, } except httpx.TimeoutException: raise HTTPException( status_code=504, detail="Timeout downloading folder from SLM (large folders may take a while)" ) except zipfile.BadZipFile: raise HTTPException( status_code=500, detail="Downloaded file is not a valid ZIP archive" ) except Exception as e: logger.error(f"Error downloading folder to server: {e}") raise HTTPException( status_code=500, detail=f"Failed to download folder to server: {str(e)}" ) # ============================================================================ # Project Types # ============================================================================ @router.get("/{project_id}/files-unified", response_class=HTMLResponse) async def get_unified_files( project_id: str, request: Request, db: Session = Depends(get_db), ): """ Get unified view of all files in this project. Groups files by recording session with full metadata. Returns HTML partial with hierarchical file listing. 
""" from backend.models import DataFile from pathlib import Path import json # Get all sessions for this project sessions = db.query(RecordingSession).filter_by( project_id=project_id ).order_by(RecordingSession.started_at.desc()).all() sessions_data = [] for session in sessions: # Get files for this session files = db.query(DataFile).filter_by(session_id=session.id).all() # Skip sessions with no files if not files: continue # Get session context unit = None location = None if session.unit_id: unit = db.query(RosterUnit).filter_by(id=session.unit_id).first() if session.location_id: location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() files_data = [] for file in files: # Check if file exists on disk file_path = Path("data") / file.file_path exists_on_disk = file_path.exists() # Get actual file size if exists actual_size = file_path.stat().st_size if exists_on_disk else None # Parse metadata JSON metadata = {} try: if file.file_metadata: metadata = json.loads(file.file_metadata) except Exception as e: logger.warning(f"Failed to parse metadata for file {file.id}: {e}") files_data.append({ "file": file, "exists_on_disk": exists_on_disk, "actual_size": actual_size, "metadata": metadata, }) sessions_data.append({ "session": session, "unit": unit, "location": location, "files": files_data, }) return templates.TemplateResponse("partials/projects/unified_files.html", { "request": request, "project_id": project_id, "sessions": sessions_data, }) @router.get("/{project_id}/files/{file_id}/download") async def download_project_file( project_id: str, file_id: str, db: Session = Depends(get_db), ): """ Download a data file from a project. Returns the file for download. 
""" from backend.models import DataFile from fastapi.responses import FileResponse from pathlib import Path # Get the file record file_record = db.query(DataFile).filter_by(id=file_id).first() if not file_record: raise HTTPException(status_code=404, detail="File not found") # Verify file belongs to this project session = db.query(RecordingSession).filter_by(id=file_record.session_id).first() if not session or session.project_id != project_id: raise HTTPException(status_code=403, detail="File does not belong to this project") # Build full file path file_path = Path("data") / file_record.file_path if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found on disk") # Extract filename for download filename = file_path.name return FileResponse( path=str(file_path), filename=filename, media_type="application/octet-stream" ) @router.get("/{project_id}/sessions/{session_id}/download-all") async def download_session_files( project_id: str, session_id: str, db: Session = Depends(get_db), ): """ Download all files from a session as a single zip archive. 
""" from backend.models import DataFile from pathlib import Path import zipfile # Verify session belongs to this project session = db.query(RecordingSession).filter_by(id=session_id).first() if not session: raise HTTPException(status_code=404, detail="Session not found") if session.project_id != project_id: raise HTTPException(status_code=403, detail="Session does not belong to this project") # Get all files for this session files = db.query(DataFile).filter_by(session_id=session_id).all() if not files: raise HTTPException(status_code=404, detail="No files found in this session") # Create zip in memory zip_buffer = io.BytesIO() # Get session info for folder naming session_date = session.started_at.strftime('%Y-%m-%d_%H%M') if session.started_at else 'unknown' # Get unit and location for naming unit = db.query(RosterUnit).filter_by(id=session.unit_id).first() if session.unit_id else None location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None unit_name = unit.id if unit else "unknown_unit" location_name = location.name.replace(" ", "_") if location else "" # Build folder name for zip contents folder_name = f"{session_date}_{unit_name}" if location_name: folder_name += f"_{location_name}" with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for file_record in files: file_path = Path("data") / file_record.file_path if file_path.exists(): # Add file to zip with folder structure arcname = f"{folder_name}/{file_path.name}" zip_file.write(file_path, arcname) zip_buffer.seek(0) # Generate filename for the zip zip_filename = f"{folder_name}.zip" return StreamingResponse( zip_buffer, media_type="application/zip", headers={"Content-Disposition": f"attachment; filename={zip_filename}"} ) @router.delete("/{project_id}/files/{file_id}") async def delete_project_file( project_id: str, file_id: str, db: Session = Depends(get_db), ): """ Delete a single data file from a project. 
Removes both the database record and the file on disk. """ from backend.models import DataFile from pathlib import Path # Get the file record file_record = db.query(DataFile).filter_by(id=file_id).first() if not file_record: raise HTTPException(status_code=404, detail="File not found") # Verify file belongs to this project session = db.query(RecordingSession).filter_by(id=file_record.session_id).first() if not session or session.project_id != project_id: raise HTTPException(status_code=403, detail="File does not belong to this project") # Delete file from disk if it exists file_path = Path("data") / file_record.file_path if file_path.exists(): file_path.unlink() # Delete database record db.delete(file_record) db.commit() return JSONResponse({"status": "success", "message": "File deleted"}) @router.delete("/{project_id}/sessions/{session_id}") async def delete_session( project_id: str, session_id: str, db: Session = Depends(get_db), ): """ Delete an entire session and all its files. Removes database records and files on disk. 
""" from backend.models import DataFile from pathlib import Path # Verify session belongs to this project session = db.query(RecordingSession).filter_by(id=session_id).first() if not session: raise HTTPException(status_code=404, detail="Session not found") if session.project_id != project_id: raise HTTPException(status_code=403, detail="Session does not belong to this project") # Get all files for this session files = db.query(DataFile).filter_by(session_id=session_id).all() # Delete files from disk deleted_count = 0 for file_record in files: file_path = Path("data") / file_record.file_path if file_path.exists(): file_path.unlink() deleted_count += 1 # Delete database record db.delete(file_record) # Delete the session record db.delete(session) db.commit() return JSONResponse({ "status": "success", "message": f"Session and {deleted_count} file(s) deleted" }) @router.get("/{project_id}/files/{file_id}/view-rnd", response_class=HTMLResponse) async def view_rnd_file( request: Request, project_id: str, file_id: str, db: Session = Depends(get_db), ): """ View an RND (sound level meter measurement) file. Returns a dedicated page with data table and charts. 
""" from backend.models import DataFile from pathlib import Path # Get the file record file_record = db.query(DataFile).filter_by(id=file_id).first() if not file_record: raise HTTPException(status_code=404, detail="File not found") # Verify file belongs to this project session = db.query(RecordingSession).filter_by(id=file_record.session_id).first() if not session or session.project_id != project_id: raise HTTPException(status_code=403, detail="File does not belong to this project") # Build full file path file_path = Path("data") / file_record.file_path if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found on disk") # Get project info project = db.query(Project).filter_by(id=project_id).first() # Get location info if available location = None if session.location_id: location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() # Get unit info if available unit = None if session.unit_id: unit = db.query(RosterUnit).filter_by(id=session.unit_id).first() # Parse file metadata metadata = {} if file_record.file_metadata: try: metadata = json.loads(file_record.file_metadata) except json.JSONDecodeError: pass return templates.TemplateResponse("rnd_viewer.html", { "request": request, "project": project, "project_id": project_id, "file": file_record, "file_id": file_id, "session": session, "location": location, "unit": unit, "metadata": metadata, "filename": file_path.name, }) @router.get("/{project_id}/files/{file_id}/rnd-data") async def get_rnd_data( project_id: str, file_id: str, db: Session = Depends(get_db), ): """ Get parsed RND file data as JSON. Returns the measurement data for charts and tables. 
""" from backend.models import DataFile from pathlib import Path import csv import io # Get the file record file_record = db.query(DataFile).filter_by(id=file_id).first() if not file_record: raise HTTPException(status_code=404, detail="File not found") # Verify file belongs to this project session = db.query(RecordingSession).filter_by(id=file_record.session_id).first() if not session or session.project_id != project_id: raise HTTPException(status_code=403, detail="File does not belong to this project") # Build full file path file_path = Path("data") / file_record.file_path if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found on disk") # Read and parse the RND file try: with open(file_path, 'r', encoding='utf-8', errors='replace') as f: content = f.read() # Parse as CSV reader = csv.DictReader(io.StringIO(content)) rows = [] headers = [] for row in reader: if not headers: headers = list(row.keys()) # Clean up values - strip whitespace and handle special values cleaned_row = {} for key, value in row.items(): if key: # Skip empty keys cleaned_key = key.strip() cleaned_value = value.strip() if value else '' # Convert numeric values if cleaned_value and cleaned_value not in ['-.-', '-', '']: try: cleaned_value = float(cleaned_value) except ValueError: pass elif cleaned_value in ['-.-', '-']: cleaned_value = None cleaned_row[cleaned_key] = cleaned_value rows.append(cleaned_row) # Detect file type (Leq vs Lp) based on columns file_type = 'unknown' if headers: header_str = ','.join(headers).lower() if 'leq' in header_str: file_type = 'leq' # Time-averaged data elif 'lp(main)' in header_str or 'lp (main)' in header_str: file_type = 'lp' # Instantaneous data # Get summary statistics summary = { "total_rows": len(rows), "file_type": file_type, "headers": [h.strip() for h in headers if h.strip()], } # Calculate min/max/avg for key metrics if available metrics_to_summarize = ['Leq(Main)', 'Lmax(Main)', 'Lmin(Main)', 'Lpeak(Main)', 'Lp(Main)'] 
for metric in metrics_to_summarize: values = [row.get(metric) for row in rows if isinstance(row.get(metric), (int, float))] if values: summary[f"{metric}_min"] = min(values) summary[f"{metric}_max"] = max(values) summary[f"{metric}_avg"] = sum(values) / len(values) # Get time range if rows: first_time = rows[0].get('Start Time', '') last_time = rows[-1].get('Start Time', '') summary['time_start'] = first_time summary['time_end'] = last_time return { "success": True, "summary": summary, "headers": summary["headers"], "data": rows, } except Exception as e: logger.error(f"Error parsing RND file: {e}") raise HTTPException(status_code=500, detail=f"Error parsing file: {str(e)}") @router.get("/{project_id}/files/{file_id}/generate-report") async def generate_excel_report( project_id: str, file_id: str, report_title: str = Query("Background Noise Study", description="Title for the report"), location_name: str = Query("", description="Location name (e.g., 'NRL 1 - West Side')"), project_name: str = Query("", description="Project name override"), client_name: str = Query("", description="Client name for report header"), start_time: str = Query("", description="Filter start time (HH:MM format, e.g., '19:00')"), end_time: str = Query("", description="Filter end time (HH:MM format, e.g., '07:00')"), start_date: str = Query("", description="Filter start date (YYYY-MM-DD format)"), end_date: str = Query("", description="Filter end date (YYYY-MM-DD format)"), db: Session = Depends(get_db), ): """ Generate an Excel report from an RND file. 
Creates a formatted Excel workbook with: - Title and location headers - Data table (Test #, Date, Time, LAmax, LA01, LA10, Comments) - Line chart visualization - Time period summary statistics Time filtering: - start_time/end_time: Filter to time window (handles overnight like 19:00-07:00) - start_date/end_date: Filter to date range Column mapping from RND to Report: - Lmax(Main) -> LAmax (dBA) - LN1(Main) -> LA01 (dBA) [L1 percentile] - LN2(Main) -> LA10 (dBA) [L10 percentile] """ from backend.models import DataFile from pathlib import Path import csv try: import openpyxl from openpyxl.chart import LineChart, Reference from openpyxl.chart.label import DataLabelList from openpyxl.styles import Font, Alignment, Border, Side, PatternFill from openpyxl.utils import get_column_letter except ImportError: raise HTTPException( status_code=500, detail="openpyxl is not installed. Run: pip install openpyxl" ) # Get the file record file_record = db.query(DataFile).filter_by(id=file_id).first() if not file_record: raise HTTPException(status_code=404, detail="File not found") # Verify file belongs to this project session = db.query(RecordingSession).filter_by(id=file_record.session_id).first() if not session or session.project_id != project_id: raise HTTPException(status_code=403, detail="File does not belong to this project") # Get related data for report context project = db.query(Project).filter_by(id=project_id).first() location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None # Build full file path file_path = Path("data") / file_record.file_path if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found on disk") # Validate this is a Leq file (contains '_Leq_' in path) # Lp files (instantaneous 100ms readings) don't have the LN percentile data needed for reports if '_Leq_' not in file_record.file_path: raise HTTPException( status_code=400, detail="Reports can only be generated from Leq 
files (15-minute averaged data). This appears to be an Lp (instantaneous) file." ) # Read and parse the Leq RND file try: with open(file_path, 'r', encoding='utf-8', errors='replace') as f: content = f.read() reader = csv.DictReader(io.StringIO(content)) rnd_rows = [] for row in reader: cleaned_row = {} for key, value in row.items(): if key: cleaned_key = key.strip() cleaned_value = value.strip() if value else '' if cleaned_value and cleaned_value not in ['-.-', '-', '']: try: cleaned_value = float(cleaned_value) except ValueError: pass elif cleaned_value in ['-.-', '-']: cleaned_value = None cleaned_row[cleaned_key] = cleaned_value rnd_rows.append(cleaned_row) if not rnd_rows: raise HTTPException(status_code=400, detail="No data found in RND file") except Exception as e: logger.error(f"Error reading RND file: {e}") raise HTTPException(status_code=500, detail=f"Error reading file: {str(e)}") # Apply time and date filtering def filter_rows_by_time(rows, filter_start_time, filter_end_time, filter_start_date, filter_end_date): """Filter rows by time window and date range.""" if not filter_start_time and not filter_end_time and not filter_start_date and not filter_end_date: return rows filtered = [] # Parse time filters start_hour = start_minute = end_hour = end_minute = None if filter_start_time: try: parts = filter_start_time.split(':') start_hour = int(parts[0]) start_minute = int(parts[1]) if len(parts) > 1 else 0 except (ValueError, IndexError): pass if filter_end_time: try: parts = filter_end_time.split(':') end_hour = int(parts[0]) end_minute = int(parts[1]) if len(parts) > 1 else 0 except (ValueError, IndexError): pass # Parse date filters start_dt = end_dt = None if filter_start_date: try: start_dt = datetime.strptime(filter_start_date, '%Y-%m-%d').date() except ValueError: pass if filter_end_date: try: end_dt = datetime.strptime(filter_end_date, '%Y-%m-%d').date() except ValueError: pass for row in rows: start_time_str = row.get('Start Time', '') if not 
start_time_str: continue try: dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S') row_date = dt.date() row_hour = dt.hour row_minute = dt.minute # Date filtering if start_dt and row_date < start_dt: continue if end_dt and row_date > end_dt: continue # Time filtering (handle overnight ranges like 19:00-07:00) if start_hour is not None and end_hour is not None: row_time_minutes = row_hour * 60 + row_minute start_time_minutes = start_hour * 60 + start_minute end_time_minutes = end_hour * 60 + end_minute if start_time_minutes > end_time_minutes: # Overnight range (e.g., 19:00-07:00) if not (row_time_minutes >= start_time_minutes or row_time_minutes < end_time_minutes): continue else: # Same day range (e.g., 07:00-19:00) if not (start_time_minutes <= row_time_minutes < end_time_minutes): continue filtered.append(row) except ValueError: # If we can't parse the time, include the row anyway filtered.append(row) return filtered # Apply filters original_count = len(rnd_rows) rnd_rows = filter_rows_by_time(rnd_rows, start_time, end_time, start_date, end_date) if not rnd_rows: time_filter_desc = "" if start_time and end_time: time_filter_desc = f" between {start_time} and {end_time}" if start_date or end_date: time_filter_desc += f" from {start_date or 'start'} to {end_date or 'end'}" raise HTTPException( status_code=400, detail=f"No data found after applying filters{time_filter_desc}. Original file had {original_count} rows." 
) # Create Excel workbook wb = openpyxl.Workbook() ws = wb.active ws.title = "Sound Level Data" # Define styles title_font = Font(bold=True, size=14) header_font = Font(bold=True, size=10) thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid") # Row 1: Report title final_project_name = project_name if project_name else (project.name if project else "") final_title = report_title if final_project_name: final_title = f"{report_title} - {final_project_name}" ws['A1'] = final_title ws['A1'].font = title_font ws.merge_cells('A1:G1') # Row 2: Client name (if provided) if client_name: ws['A2'] = f"Client: {client_name}" ws['A2'].font = Font(italic=True, size=10) # Row 3: Location name final_location = location_name if not final_location and location: final_location = location.name if final_location: ws['A3'] = final_location ws['A3'].font = Font(bold=True, size=11) # Row 4: Time filter info (if applied) if start_time and end_time: filter_info = f"Time Filter: {start_time} - {end_time}" if start_date or end_date: filter_info += f" | Date Range: {start_date or 'start'} to {end_date or 'end'}" filter_info += f" | {len(rnd_rows)} of {original_count} rows" ws['A4'] = filter_info ws['A4'].font = Font(italic=True, size=9, color="666666") # Row 7: Headers headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments'] for col, header in enumerate(headers, 1): cell = ws.cell(row=7, column=col, value=header) cell.font = header_font cell.border = thin_border cell.fill = header_fill cell.alignment = Alignment(horizontal='center') # Set column widths column_widths = [16, 12, 10, 12, 12, 12, 40] for i, width in enumerate(column_widths, 1): ws.column_dimensions[get_column_letter(i)].width = width # Data rows starting at row 8 data_start_row = 8 for idx, row in enumerate(rnd_rows, 1): data_row = 
data_start_row + idx - 1 # Test Increment # ws.cell(row=data_row, column=1, value=idx).border = thin_border # Parse the Start Time to get Date and Time start_time_str = row.get('Start Time', '') if start_time_str: try: # Format: "2025/12/26 20:23:38" dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S') ws.cell(row=data_row, column=2, value=dt.date()) ws.cell(row=data_row, column=3, value=dt.time()) except ValueError: ws.cell(row=data_row, column=2, value=start_time_str) ws.cell(row=data_row, column=3, value='') else: ws.cell(row=data_row, column=2, value='') ws.cell(row=data_row, column=3, value='') # LAmax - from Lmax(Main) lmax = row.get('Lmax(Main)') ws.cell(row=data_row, column=4, value=lmax if lmax else '').border = thin_border # LA01 - from LN1(Main) ln1 = row.get('LN1(Main)') ws.cell(row=data_row, column=5, value=ln1 if ln1 else '').border = thin_border # LA10 - from LN2(Main) ln2 = row.get('LN2(Main)') ws.cell(row=data_row, column=6, value=ln2 if ln2 else '').border = thin_border # Comments (empty for now, can be populated) ws.cell(row=data_row, column=7, value='').border = thin_border # Apply borders to date/time cells ws.cell(row=data_row, column=2).border = thin_border ws.cell(row=data_row, column=3).border = thin_border data_end_row = data_start_row + len(rnd_rows) - 1 # Add Line Chart chart = LineChart() chart.title = f"{final_location or 'Sound Level Data'} - Background Noise Study" chart.style = 10 chart.y_axis.title = "Sound Level (dBA)" chart.x_axis.title = "Test Increment" chart.height = 12 chart.width = 20 # Data references (LAmax, LA01, LA10 are columns D, E, F) data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row) categories = Reference(ws, min_col=1, min_row=data_start_row, max_row=data_end_row) chart.add_data(data_ref, titles_from_data=True) chart.set_categories(categories) # Style the series if len(chart.series) >= 3: chart.series[0].graphicalProperties.line.solidFill = "FF0000" # LAmax - Red 
chart.series[1].graphicalProperties.line.solidFill = "00B050" # LA01 - Green chart.series[2].graphicalProperties.line.solidFill = "0070C0" # LA10 - Blue # Position chart to the right of data ws.add_chart(chart, "I3") # Add summary statistics section below the data summary_row = data_end_row + 3 ws.cell(row=summary_row, column=1, value="Summary Statistics").font = Font(bold=True, size=12) # Calculate time-period statistics time_periods = { 'Evening (7PM-10PM)': [], 'Nighttime (10PM-7AM)': [], 'Morning (7AM-12PM)': [], 'Daytime (12PM-7PM)': [] } for row in rnd_rows: start_time_str = row.get('Start Time', '') if start_time_str: try: dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S') hour = dt.hour lmax = row.get('Lmax(Main)') ln1 = row.get('LN1(Main)') ln2 = row.get('LN2(Main)') if isinstance(lmax, (int, float)) and isinstance(ln1, (int, float)) and isinstance(ln2, (int, float)): data_point = {'lmax': lmax, 'ln1': ln1, 'ln2': ln2} if 19 <= hour < 22: time_periods['Evening (7PM-10PM)'].append(data_point) elif hour >= 22 or hour < 7: time_periods['Nighttime (10PM-7AM)'].append(data_point) elif 7 <= hour < 12: time_periods['Morning (7AM-12PM)'].append(data_point) else: # 12-19 time_periods['Daytime (12PM-7PM)'].append(data_point) except ValueError: continue # Summary table headers summary_row += 2 summary_headers = ['Time Period', 'Samples', 'LAmax Avg', 'LA01 Avg', 'LA10 Avg'] for col, header in enumerate(summary_headers, 1): cell = ws.cell(row=summary_row, column=col, value=header) cell.font = header_font cell.fill = header_fill cell.border = thin_border # Summary data summary_row += 1 for period_name, samples in time_periods.items(): ws.cell(row=summary_row, column=1, value=period_name).border = thin_border ws.cell(row=summary_row, column=2, value=len(samples)).border = thin_border if samples: avg_lmax = sum(s['lmax'] for s in samples) / len(samples) avg_ln1 = sum(s['ln1'] for s in samples) / len(samples) avg_ln2 = sum(s['ln2'] for s in samples) / 
len(samples) ws.cell(row=summary_row, column=3, value=round(avg_lmax, 1)).border = thin_border ws.cell(row=summary_row, column=4, value=round(avg_ln1, 1)).border = thin_border ws.cell(row=summary_row, column=5, value=round(avg_ln2, 1)).border = thin_border else: ws.cell(row=summary_row, column=3, value='-').border = thin_border ws.cell(row=summary_row, column=4, value='-').border = thin_border ws.cell(row=summary_row, column=5, value='-').border = thin_border summary_row += 1 # Overall summary summary_row += 1 ws.cell(row=summary_row, column=1, value='Overall').font = Font(bold=True) ws.cell(row=summary_row, column=1).border = thin_border ws.cell(row=summary_row, column=2, value=len(rnd_rows)).border = thin_border all_lmax = [r.get('Lmax(Main)') for r in rnd_rows if isinstance(r.get('Lmax(Main)'), (int, float))] all_ln1 = [r.get('LN1(Main)') for r in rnd_rows if isinstance(r.get('LN1(Main)'), (int, float))] all_ln2 = [r.get('LN2(Main)') for r in rnd_rows if isinstance(r.get('LN2(Main)'), (int, float))] if all_lmax: ws.cell(row=summary_row, column=3, value=round(sum(all_lmax) / len(all_lmax), 1)).border = thin_border if all_ln1: ws.cell(row=summary_row, column=4, value=round(sum(all_ln1) / len(all_ln1), 1)).border = thin_border if all_ln2: ws.cell(row=summary_row, column=5, value=round(sum(all_ln2) / len(all_ln2), 1)).border = thin_border # Save to buffer output = io.BytesIO() wb.save(output) output.seek(0) # Generate filename filename = file_record.file_path.split('/')[-1].replace('.rnd', '') if location: filename = f"{location.name}_{filename}" filename = f"{filename}_report.xlsx" # Clean filename filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.')).rstrip() return StreamingResponse( output, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": f'attachment; filename="{filename}"'} ) @router.get("/{project_id}/files/{file_id}/preview-report") async def preview_report_data( request: 
Request, project_id: str, file_id: str, report_title: str = Query("Background Noise Study", description="Title for the report"), location_name: str = Query("", description="Location name"), project_name: str = Query("", description="Project name override"), client_name: str = Query("", description="Client name"), start_time: str = Query("", description="Filter start time (HH:MM format)"), end_time: str = Query("", description="Filter end time (HH:MM format)"), start_date: str = Query("", description="Filter start date (YYYY-MM-DD format)"), end_date: str = Query("", description="Filter end date (YYYY-MM-DD format)"), db: Session = Depends(get_db), ): """ Preview report data for editing in jspreadsheet. Returns an HTML page with the spreadsheet editor. """ from backend.models import DataFile, ReportTemplate from pathlib import Path import csv # Get the file record file_record = db.query(DataFile).filter_by(id=file_id).first() if not file_record: raise HTTPException(status_code=404, detail="File not found") # Verify file belongs to this project session = db.query(RecordingSession).filter_by(id=file_record.session_id).first() if not session or session.project_id != project_id: raise HTTPException(status_code=403, detail="File does not belong to this project") # Get related data for report context project = db.query(Project).filter_by(id=project_id).first() location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None # Build full file path file_path = Path("data") / file_record.file_path if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found on disk") # Validate this is a Leq file if '_Leq_' not in file_record.file_path: raise HTTPException( status_code=400, detail="Reports can only be generated from Leq files (15-minute averaged data)." 
) # Read and parse the Leq RND file try: with open(file_path, 'r', encoding='utf-8', errors='replace') as f: content = f.read() reader = csv.DictReader(io.StringIO(content)) rnd_rows = [] for row in reader: cleaned_row = {} for key, value in row.items(): if key: cleaned_key = key.strip() cleaned_value = value.strip() if value else '' if cleaned_value and cleaned_value not in ['-.-', '-', '']: try: cleaned_value = float(cleaned_value) except ValueError: pass elif cleaned_value in ['-.-', '-']: cleaned_value = None cleaned_row[cleaned_key] = cleaned_value rnd_rows.append(cleaned_row) if not rnd_rows: raise HTTPException(status_code=400, detail="No data found in RND file") except Exception as e: logger.error(f"Error reading RND file: {e}") raise HTTPException(status_code=500, detail=f"Error reading file: {str(e)}") # Apply time and date filtering (same logic as generate-report) def filter_rows(rows, filter_start_time, filter_end_time, filter_start_date, filter_end_date): if not filter_start_time and not filter_end_time and not filter_start_date and not filter_end_date: return rows filtered = [] start_hour = start_minute = end_hour = end_minute = None if filter_start_time: try: parts = filter_start_time.split(':') start_hour = int(parts[0]) start_minute = int(parts[1]) if len(parts) > 1 else 0 except (ValueError, IndexError): pass if filter_end_time: try: parts = filter_end_time.split(':') end_hour = int(parts[0]) end_minute = int(parts[1]) if len(parts) > 1 else 0 except (ValueError, IndexError): pass start_dt = end_dt = None if filter_start_date: try: start_dt = datetime.strptime(filter_start_date, '%Y-%m-%d').date() except ValueError: pass if filter_end_date: try: end_dt = datetime.strptime(filter_end_date, '%Y-%m-%d').date() except ValueError: pass for row in rows: start_time_str = row.get('Start Time', '') if not start_time_str: continue try: dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S') row_date = dt.date() row_hour = dt.hour row_minute = dt.minute 
if start_dt and row_date < start_dt: continue if end_dt and row_date > end_dt: continue if start_hour is not None and end_hour is not None: row_time_minutes = row_hour * 60 + row_minute start_time_minutes = start_hour * 60 + start_minute end_time_minutes = end_hour * 60 + end_minute if start_time_minutes > end_time_minutes: if not (row_time_minutes >= start_time_minutes or row_time_minutes < end_time_minutes): continue else: if not (start_time_minutes <= row_time_minutes < end_time_minutes): continue filtered.append(row) except ValueError: filtered.append(row) return filtered original_count = len(rnd_rows) rnd_rows = filter_rows(rnd_rows, start_time, end_time, start_date, end_date) # Convert to spreadsheet data format (array of arrays) spreadsheet_data = [] for idx, row in enumerate(rnd_rows, 1): start_time_str = row.get('Start Time', '') date_str = '' time_str = '' if start_time_str: try: dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S') date_str = dt.strftime('%Y-%m-%d') time_str = dt.strftime('%H:%M:%S') except ValueError: date_str = start_time_str time_str = '' lmax = row.get('Lmax(Main)', '') ln1 = row.get('LN1(Main)', '') ln2 = row.get('LN2(Main)', '') spreadsheet_data.append([ idx, # Test # date_str, time_str, lmax if lmax else '', ln1 if ln1 else '', ln2 if ln2 else '', '' # Comments ]) # Prepare context data final_project_name = project_name if project_name else (project.name if project else "") final_location = location_name if location_name else (location.name if location else "") # Get templates for the dropdown templates = db.query(ReportTemplate).all() return templates.TemplateResponse("report_preview.html", { "request": request, "project_id": project_id, "file_id": file_id, "project": project, "location": location, "file": file_record, "spreadsheet_data": spreadsheet_data, "report_title": report_title, "project_name": final_project_name, "client_name": client_name, "location_name": final_location, "start_time": start_time, "end_time": 
end_time, "start_date": start_date, "end_date": end_date, "original_count": original_count, "filtered_count": len(rnd_rows), "templates": templates, }) @router.post("/{project_id}/files/{file_id}/generate-from-preview") async def generate_report_from_preview( project_id: str, file_id: str, data: dict, db: Session = Depends(get_db), ): """ Generate an Excel report from edited spreadsheet data. Accepts the edited data from jspreadsheet and creates the final Excel file. """ from backend.models import DataFile from pathlib import Path try: import openpyxl from openpyxl.chart import LineChart, Reference from openpyxl.styles import Font, Alignment, Border, Side, PatternFill from openpyxl.utils import get_column_letter except ImportError: raise HTTPException(status_code=500, detail="openpyxl is not installed") # Get the file record for filename generation file_record = db.query(DataFile).filter_by(id=file_id).first() if not file_record: raise HTTPException(status_code=404, detail="File not found") session = db.query(RecordingSession).filter_by(id=file_record.session_id).first() if not session or session.project_id != project_id: raise HTTPException(status_code=403, detail="File does not belong to this project") project = db.query(Project).filter_by(id=project_id).first() location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None # Extract data from request spreadsheet_data = data.get('data', []) report_title = data.get('report_title', 'Background Noise Study') project_name = data.get('project_name', project.name if project else '') client_name = data.get('client_name', '') location_name = data.get('location_name', location.name if location else '') time_filter = data.get('time_filter', '') if not spreadsheet_data: raise HTTPException(status_code=400, detail="No data provided") # Create Excel workbook wb = openpyxl.Workbook() ws = wb.active ws.title = "Sound Level Data" # Styles title_font = Font(bold=True, size=14) 
header_font = Font(bold=True, size=10) thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid") # Row 1: Title final_title = f"{report_title} - {project_name}" if project_name else report_title ws['A1'] = final_title ws['A1'].font = title_font ws.merge_cells('A1:G1') # Row 2: Client if client_name: ws['A2'] = f"Client: {client_name}" ws['A2'].font = Font(italic=True, size=10) # Row 3: Location if location_name: ws['A3'] = location_name ws['A3'].font = Font(bold=True, size=11) # Row 4: Time filter info if time_filter: ws['A4'] = time_filter ws['A4'].font = Font(italic=True, size=9, color="666666") # Row 7: Headers headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments'] for col, header in enumerate(headers, 1): cell = ws.cell(row=7, column=col, value=header) cell.font = header_font cell.border = thin_border cell.fill = header_fill cell.alignment = Alignment(horizontal='center') # Column widths column_widths = [16, 12, 10, 12, 12, 12, 40] for i, width in enumerate(column_widths, 1): ws.column_dimensions[get_column_letter(i)].width = width # Data rows data_start_row = 8 for idx, row_data in enumerate(spreadsheet_data): data_row = data_start_row + idx for col, value in enumerate(row_data, 1): cell = ws.cell(row=data_row, column=col, value=value if value != '' else None) cell.border = thin_border data_end_row = data_start_row + len(spreadsheet_data) - 1 # Add chart if we have data if len(spreadsheet_data) > 0: chart = LineChart() chart.title = f"{location_name or 'Sound Level Data'} - Background Noise Study" chart.style = 10 chart.y_axis.title = "Sound Level (dBA)" chart.x_axis.title = "Test Increment" chart.height = 12 chart.width = 20 data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row) categories = Reference(ws, min_col=1, 
def _combined_coerce_value(raw):
    """Normalize one raw RND cell: float when numeric, None for '-'/'-.-', else text."""
    text = raw.strip() if raw else ''
    if text in ('-.-', '-'):
        # Placeholders written in the RND file when no reading is available.
        return None
    if not text:
        return ''
    try:
        return float(text)
    except ValueError:
        # Non-numeric text (for example the 'Start Time' stamp) is kept as-is.
        return text


def _combined_read_rows(file_path):
    """Yield the rows of one RND (CSV) file as dicts with stripped keys and coerced values."""
    import csv

    content = file_path.read_text(encoding='utf-8', errors='replace')
    for raw_row in csv.DictReader(io.StringIO(content)):
        # Falsy keys (empty header, or DictReader's None key for surplus
        # fields) are dropped.
        yield {
            key.strip(): _combined_coerce_value(value)
            for key, value in raw_row.items()
            if key
        }


def _combined_average(rows, column):
    """Return the mean of the numeric values in *column* rounded to 1 decimal, or None."""
    values = [row[column] for row in rows if isinstance(row.get(column), (int, float))]
    return round(sum(values) / len(values), 1) if values else None


@router.get("/{project_id}/generate-combined-report")
async def generate_combined_excel_report(
    project_id: str,
    report_title: str = Query("Background Noise Study", description="Title for the report"),
    db: Session = Depends(get_db),
):
    """
    Generate a combined Excel report from all Leq RND files in a project.

    Creates a multi-sheet Excel workbook with:
    - A "Summary" sheet (first) with per-location sample counts and averages
    - One sheet per location, combining every Leq file of that location
    - Data tables with LAmax, LA01, LA10 and a line chart per location

    Column mapping from RND to Report:
    - Lmax(Main) -> LAmax (dBA)
    - LN1(Main) -> LA01 (dBA)
    - LN2(Main) -> LA10 (dBA)

    Args:
        project_id: ID of the project whose sessions are scanned.
        report_title: Title written to the first row of every sheet.
        db: Database session.

    Returns:
        StreamingResponse carrying the .xlsx workbook as an attachment.

    Raises:
        HTTPException: 500 when openpyxl is not installed; 404 when the
            project does not exist or has no '_Leq_' .rnd files.
    """
    from backend.models import DataFile
    from pathlib import Path
    from urllib.parse import quote

    try:
        import openpyxl
        from openpyxl.chart import LineChart, Reference
        from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
        from openpyxl.utils import get_column_letter
    except ImportError as exc:
        raise HTTPException(
            status_code=500,
            detail="openpyxl is not installed. Run: pip install openpyxl"
        ) from exc

    # Get project
    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    project_name = project.name or ""

    # Get all sessions with measurement files
    sessions = db.query(RecordingSession).filter_by(project_id=project_id).all()

    # Collect Leq RND files grouped by location name.
    # Only paths containing '_Leq_' and ending in '.rnd' are included.
    location_files = {}
    for session in sessions:
        location_name = None  # resolved lazily, once per session
        for file in db.query(DataFile).filter_by(session_id=session.id).all():
            path = file.file_path
            if not (path and '_Leq_' in path and path.endswith('.rnd')):
                continue
            if location_name is None:
                location = (
                    db.query(MonitoringLocation).filter_by(id=session.location_id).first()
                    if session.location_id else None
                )
                location_name = location.name if location else f"Session {session.id[:8]}"
            location_files.setdefault(location_name, []).append(file)

    if not location_files:
        raise HTTPException(
            status_code=404,
            detail="No Leq measurement files found in project. Reports require Leq data (files with '_Leq_' in the name)."
        )

    # Define styles
    title_font = Font(bold=True, size=14)
    header_font = Font(bold=True, size=10)
    thin_border = Border(
        left=Side(style='thin'),
        right=Side(style='thin'),
        top=Side(style='thin'),
        bottom=Side(style='thin')
    )
    header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid")

    # Create Excel workbook and drop the default sheet
    wb = openpyxl.Workbook()
    wb.remove(wb.active)

    headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments']
    column_widths = [16, 12, 10, 12, 12, 12, 40]
    measurement_columns = ('Lmax(Main)', 'LN1(Main)', 'LN2(Main)')
    data_start_row = 8

    # Track all data for summary
    all_location_summaries = []

    # Create a sheet for each location
    for location_name, file_list in location_files.items():
        # Sanitize sheet name (max 31 chars, no special chars)
        safe_sheet_name = "".join(
            c for c in location_name if c.isalnum() or c in (' ', '-', '_')
        )[:31]
        ws = wb.create_sheet(title=safe_sheet_name)

        # Row 1: Report title
        ws['A1'] = f"{report_title} - {project_name}"
        ws['A1'].font = title_font
        ws.merge_cells('A1:G1')

        # Row 3: Location name
        ws['A3'] = location_name
        ws['A3'].font = Font(bold=True, size=11)

        # Row 7: Headers
        for col, header in enumerate(headers, 1):
            cell = ws.cell(row=7, column=col, value=header)
            cell.font = header_font
            cell.border = thin_border
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal='center')

        # Set column widths
        for i, width in enumerate(column_widths, 1):
            ws.column_dimensions[get_column_letter(i)].width = width

        # Combine data from all files for this location
        all_rnd_rows = []
        for file in file_list:
            file_path = Path("data") / file.file_path
            if not file_path.exists():
                continue
            try:
                # Append row by row so rows read before a failure are kept.
                for cleaned_row in _combined_read_rows(file_path):
                    all_rnd_rows.append(cleaned_row)
            except Exception as e:
                # Deliberate best-effort: one unreadable file must not
                # abort the whole report.
                logger.warning("Error reading file %s: %s", file.file_path, e)
                continue

        if not all_rnd_rows:
            # The sheet keeps its headers but gets no data, chart or summary row.
            continue

        # Sort by start time. The value may be None (placeholder) or a float
        # (numeric-looking text), so compare on a string form to avoid a
        # TypeError from mixed-type comparison.
        all_rnd_rows.sort(key=lambda r: str(r.get('Start Time') or ''))

        # Data rows starting at row 8
        for idx, row in enumerate(all_rnd_rows, 1):
            data_row = data_start_row + idx - 1

            date_value = None
            time_value = None
            start_time = row.get('Start Time')
            if start_time:
                try:
                    dt = datetime.strptime(str(start_time), '%Y/%m/%d %H:%M:%S')
                except ValueError:
                    # Keep the unparsed value visible in the Date column.
                    date_value = start_time
                else:
                    date_value = dt.date()
                    time_value = dt.time()

            # Missing readings become None (empty cells); a genuine 0.0
            # reading is preserved rather than treated as missing.
            readings = [
                None if row.get(column) in (None, '') else row.get(column)
                for column in measurement_columns
            ]

            row_values = [idx, date_value, time_value, *readings, None]
            for col, value in enumerate(row_values, 1):
                ws.cell(row=data_row, column=col, value=value).border = thin_border

        data_end_row = data_start_row + len(all_rnd_rows) - 1

        # Add Line Chart
        chart = LineChart()
        chart.title = f"{location_name}"
        chart.style = 10
        chart.y_axis.title = "Sound Level (dBA)"
        chart.x_axis.title = "Test Increment"
        chart.height = 12
        chart.width = 20

        # min_row=7 includes the header row so series take their titles from it.
        data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row)
        categories = Reference(ws, min_col=1, min_row=data_start_row, max_row=data_end_row)

        chart.add_data(data_ref, titles_from_data=True)
        chart.set_categories(categories)

        if len(chart.series) >= 3:
            chart.series[0].graphicalProperties.line.solidFill = "FF0000"
            chart.series[1].graphicalProperties.line.solidFill = "00B050"
            chart.series[2].graphicalProperties.line.solidFill = "0070C0"

        ws.add_chart(chart, "I3")

        # Calculate summary for this location
        all_location_summaries.append({
            'location': location_name,
            'samples': len(all_rnd_rows),
            'lmax_avg': _combined_average(all_rnd_rows, 'Lmax(Main)'),
            'ln1_avg': _combined_average(all_rnd_rows, 'LN1(Main)'),
            'ln2_avg': _combined_average(all_rnd_rows, 'LN2(Main)'),
        })

    # Create Summary sheet at the beginning
    summary_ws = wb.create_sheet(title="Summary", index=0)
    summary_ws['A1'] = f"{report_title} - {project_name} - Summary"
    summary_ws['A1'].font = title_font
    summary_ws.merge_cells('A1:E1')

    summary_headers = ['Location', 'Samples', 'LAmax Avg', 'LA01 Avg', 'LA10 Avg']
    for col, header in enumerate(summary_headers, 1):
        cell = summary_ws.cell(row=3, column=col, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.border = thin_border

    for i, width in enumerate([30, 10, 12, 12, 12], 1):
        summary_ws.column_dimensions[get_column_letter(i)].width = width

    for idx, loc_summary in enumerate(all_location_summaries, 4):
        summary_values = [loc_summary['location'], loc_summary['samples']]
        for avg_key in ('lmax_avg', 'ln1_avg', 'ln2_avg'):
            average = loc_summary[avg_key]
            # Only a missing average shows '-'; an average of 0.0 is a real value.
            summary_values.append(average if average is not None else '-')
        for col, value in enumerate(summary_values, 1):
            summary_ws.cell(row=idx, column=col, value=value).border = thin_border

    # Save to buffer
    output = io.BytesIO()
    wb.save(output)
    output.seek(0)

    # Generate filename
    project_name_clean = "".join(
        c for c in project_name if c.isalnum() or c in ('_', '-', ' ')
    ).strip()
    filename = f"{project_name_clean}_combined_report.xlsx".replace(' ', '_')

    # str.isalnum() accepts non-ASCII letters, which may not be encodable in
    # an HTTP header. Send an ASCII fallback and, only when it differs, the
    # full name through the RFC 5987 `filename*` parameter. For ASCII names
    # the header is unchanged.
    ascii_filename = filename.encode('ascii', 'ignore').decode('ascii')
    content_disposition = f'attachment; filename="{ascii_filename}"'
    if ascii_filename != filename:
        content_disposition += f"; filename*=UTF-8''{quote(filename, safe='')}"

    return StreamingResponse(
        output,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": content_disposition}
    )


@router.get("/types/list", response_class=HTMLResponse)
async def get_project_types(request: Request, db: Session = Depends(get_db)):
    """
    Get all available project types.
    Returns HTML partial with project type cards.

    Renders "partials/projects/project_type_cards.html" with every
    ProjectType row returned by the database under `project_types`.
    """
    project_types = db.query(ProjectType).all()

    return templates.TemplateResponse("partials/projects/project_type_cards.html", {
        "request": request,
        "project_types": project_types,
    })