feat: add SLM project report generator (WIP)
@@ -10,7 +10,7 @@ Provides API endpoints for the Projects system:
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from datetime import datetime, timedelta
@@ -18,6 +18,7 @@ from typing import Optional
import uuid
import json
import logging
import io

from backend.database import get_db
from backend.models import (
@@ -1025,6 +1026,147 @@ async def download_project_file(
    )


@router.get("/{project_id}/sessions/{session_id}/download-all")
async def download_session_files(
    project_id: str,
    session_id: str,
    db: Session = Depends(get_db),
):
    """
    Download all files from a session as a single zip archive.
    """
    from backend.models import DataFile
    from pathlib import Path
    import zipfile

    # Verify session belongs to this project
    session = db.query(RecordingSession).filter_by(id=session_id).first()
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    if session.project_id != project_id:
        raise HTTPException(status_code=403, detail="Session does not belong to this project")

    # Get all files for this session
    files = db.query(DataFile).filter_by(session_id=session_id).all()
    if not files:
        raise HTTPException(status_code=404, detail="No files found in this session")

    # Create zip in memory
    zip_buffer = io.BytesIO()

    # Get session info for folder naming
    session_date = session.started_at.strftime('%Y-%m-%d_%H%M') if session.started_at else 'unknown'

    # Get unit and location for naming
    unit = db.query(RosterUnit).filter_by(id=session.unit_id).first() if session.unit_id else None
    location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None

    unit_name = unit.id if unit else "unknown_unit"
    location_name = location.name.replace(" ", "_") if location else ""

    # Build folder name for zip contents
    folder_name = f"{session_date}_{unit_name}"
    if location_name:
        folder_name += f"_{location_name}"

    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for file_record in files:
            file_path = Path("data") / file_record.file_path
            if file_path.exists():
                # Add file to zip with folder structure
                arcname = f"{folder_name}/{file_path.name}"
                zip_file.write(file_path, arcname)

    zip_buffer.seek(0)

    # Generate filename for the zip
    zip_filename = f"{folder_name}.zip"

    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": f'attachment; filename="{zip_filename}"'}
    )

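# Example usage (illustrative sketch): fetching a session archive from a
# client. Assumes the router is mounted under /projects and the server runs
# locally; the IDs are placeholders.
#
#     import requests
#
#     resp = requests.get(
#         "http://localhost:8000/projects/<project_id>/sessions/<session_id>/download-all",
#         timeout=60,
#     )
#     resp.raise_for_status()
#     with open("session_files.zip", "wb") as fh:
#         fh.write(resp.content)
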
@router.delete("/{project_id}/files/{file_id}")
async def delete_project_file(
    project_id: str,
    file_id: str,
    db: Session = Depends(get_db),
):
    """
    Delete a single data file from a project.
    Removes both the database record and the file on disk.
    """
    from backend.models import DataFile
    from pathlib import Path

    # Get the file record
    file_record = db.query(DataFile).filter_by(id=file_id).first()
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    # Verify file belongs to this project
    session = db.query(RecordingSession).filter_by(id=file_record.session_id).first()
    if not session or session.project_id != project_id:
        raise HTTPException(status_code=403, detail="File does not belong to this project")

    # Delete file from disk if it exists
    file_path = Path("data") / file_record.file_path
    if file_path.exists():
        file_path.unlink()

    # Delete database record
    db.delete(file_record)
    db.commit()

    return JSONResponse({"status": "success", "message": "File deleted"})

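# Example test (illustrative sketch): the ownership check above should reject
# a file that belongs to a different project. The app import path, mount
# prefix, and seeded IDs are all assumptions.
#
#     from fastapi.testclient import TestClient
#     from backend.main import app  # hypothetical import path
#
#     client = TestClient(app)
#
#     def test_delete_rejects_file_from_other_project():
#         resp = client.delete("/projects/<other_project_id>/files/<file_id>")
#         assert resp.status_code == 403
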
@router.delete("/{project_id}/sessions/{session_id}")
async def delete_session(
    project_id: str,
    session_id: str,
    db: Session = Depends(get_db),
):
    """
    Delete an entire session and all its files.
    Removes database records and files on disk; the returned count includes
    only files that were actually present on disk.
    """
    from backend.models import DataFile
    from pathlib import Path

    # Verify session belongs to this project
    session = db.query(RecordingSession).filter_by(id=session_id).first()
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    if session.project_id != project_id:
        raise HTTPException(status_code=403, detail="Session does not belong to this project")

    # Get all files for this session
    files = db.query(DataFile).filter_by(session_id=session_id).all()

    # Delete files from disk
    deleted_count = 0
    for file_record in files:
        file_path = Path("data") / file_record.file_path
        if file_path.exists():
            file_path.unlink()
            deleted_count += 1
        # Delete database record
        db.delete(file_record)

    # Delete the session record
    db.delete(session)
    db.commit()

    return JSONResponse({
        "status": "success",
        "message": f"Session and {deleted_count} file(s) deleted"
    })

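# Example usage (illustrative sketch, same assumptions as above): deleting a
# session and reading the summary message.
#
#     import requests
#
#     resp = requests.delete(
#         "http://localhost:8000/projects/<project_id>/sessions/<session_id>"
#     )
#     print(resp.json()["message"])  # e.g. "Session and 3 file(s) deleted"
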
@router.get("/{project_id}/files/{file_id}/view-rnd", response_class=HTMLResponse)
async def view_rnd_file(
    request: Request,
@@ -1195,6 +1337,586 @@ async def get_rnd_data(
        raise HTTPException(status_code=500, detail=f"Error parsing file: {str(e)}")


@router.get("/{project_id}/files/{file_id}/generate-report")
async def generate_excel_report(
    project_id: str,
    file_id: str,
    report_title: str = Query("Background Noise Study", description="Title for the report"),
    location_name: str = Query("", description="Location name (e.g., 'NRL 1 - West Side')"),
    db: Session = Depends(get_db),
):
    """
    Generate an Excel report from an RND file.

    Creates a formatted Excel workbook with:
    - Title and location headers
    - Data table (Test #, Date, Time, LAmax, LA01, LA10, Comments)
    - Line chart visualization
    - Time period summary statistics

    Column mapping from RND to report:
    - Lmax(Main) -> LAmax (dBA)
    - LN1(Main)  -> LA01 (dBA) [L1 percentile]
    - LN2(Main)  -> LA10 (dBA) [L10 percentile]
    """
    from backend.models import DataFile
    from pathlib import Path
    import csv

    try:
        import openpyxl
        from openpyxl.chart import LineChart, Reference
        from openpyxl.chart.label import DataLabelList
        from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
        from openpyxl.utils import get_column_letter
    except ImportError:
        raise HTTPException(
            status_code=500,
            detail="openpyxl is not installed. Run: pip install openpyxl"
        )

    # Get the file record
    file_record = db.query(DataFile).filter_by(id=file_id).first()
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    # Verify file belongs to this project
    session = db.query(RecordingSession).filter_by(id=file_record.session_id).first()
    if not session or session.project_id != project_id:
        raise HTTPException(status_code=403, detail="File does not belong to this project")

    # Get related data for report context
    project = db.query(Project).filter_by(id=project_id).first()
    location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None

    # Build full file path
    file_path = Path("data") / file_record.file_path
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    # Validate this is a Leq file (contains '_Leq_' in path).
    # Lp files (instantaneous 100 ms readings) don't have the LN percentile data needed for reports.
    if '_Leq_' not in file_record.file_path:
        raise HTTPException(
            status_code=400,
            detail="Reports can only be generated from Leq files (15-minute averaged data). This appears to be an Lp (instantaneous) file."
        )

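    # Illustrative only -- assumed shape of a Leq RND file (CSV with a header
    # row; the exact column order is an assumption):
    #
    #     Start Time,Lmax(Main),LN1(Main),LN2(Main),...
    #     2025/12/26 20:23:38,63.4,58.2,54.7,...
    #
    # Missing readings appear as '-.-' or '-' and are normalized to None below.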
    # Read and parse the Leq RND file
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()

        reader = csv.DictReader(io.StringIO(content))
        rnd_rows = []
        for row in reader:
            cleaned_row = {}
            for key, value in row.items():
                if key:
                    cleaned_key = key.strip()
                    cleaned_value = value.strip() if value else ''
                    if cleaned_value and cleaned_value not in ['-.-', '-', '']:
                        try:
                            cleaned_value = float(cleaned_value)
                        except ValueError:
                            pass
                    elif cleaned_value in ['-.-', '-']:
                        cleaned_value = None
                    cleaned_row[cleaned_key] = cleaned_value
            rnd_rows.append(cleaned_row)

        if not rnd_rows:
            raise HTTPException(status_code=400, detail="No data found in RND file")

    except HTTPException:
        # Don't convert the 400 above into a 500
        raise
    except Exception as e:
        logger.error(f"Error reading RND file: {e}")
        raise HTTPException(status_code=500, detail=f"Error reading file: {str(e)}")

    # Create Excel workbook
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Sound Level Data"

    # Define styles
    title_font = Font(bold=True, size=14)
    header_font = Font(bold=True, size=10)
    thin_border = Border(
        left=Side(style='thin'),
        right=Side(style='thin'),
        top=Side(style='thin'),
        bottom=Side(style='thin')
    )
    header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid")

    # Row 1: Report title
    final_title = report_title
    if project:
        final_title = f"{report_title} - {project.name}"
    ws['A1'] = final_title
    ws['A1'].font = title_font
    ws.merge_cells('A1:G1')

    # Row 3: Location name
    final_location = location_name
    if not final_location and location:
        final_location = location.name
    if final_location:
        ws['A3'] = final_location
        ws['A3'].font = Font(bold=True, size=11)

    # Row 7: Headers
    headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments']
    for col, header in enumerate(headers, 1):
        cell = ws.cell(row=7, column=col, value=header)
        cell.font = header_font
        cell.border = thin_border
        cell.fill = header_fill
        cell.alignment = Alignment(horizontal='center')

    # Set column widths
    column_widths = [16, 12, 10, 12, 12, 12, 40]
    for i, width in enumerate(column_widths, 1):
        ws.column_dimensions[get_column_letter(i)].width = width

    # Data rows starting at row 8
    data_start_row = 8
    for idx, row in enumerate(rnd_rows, 1):
        data_row = data_start_row + idx - 1

        # Test Increment #
        ws.cell(row=data_row, column=1, value=idx).border = thin_border

        # Parse the Start Time to get Date and Time
        start_time_str = row.get('Start Time', '')
        if start_time_str:
            try:
                # Format: "2025/12/26 20:23:38"
                dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
                ws.cell(row=data_row, column=2, value=dt.date())
                ws.cell(row=data_row, column=3, value=dt.time())
            except ValueError:
                ws.cell(row=data_row, column=2, value=start_time_str)
                ws.cell(row=data_row, column=3, value='')
        else:
            ws.cell(row=data_row, column=2, value='')
            ws.cell(row=data_row, column=3, value='')

        # LAmax - from Lmax(Main)
        lmax = row.get('Lmax(Main)')
        ws.cell(row=data_row, column=4, value=lmax if lmax else '').border = thin_border

        # LA01 - from LN1(Main)
        ln1 = row.get('LN1(Main)')
        ws.cell(row=data_row, column=5, value=ln1 if ln1 else '').border = thin_border

        # LA10 - from LN2(Main)
        ln2 = row.get('LN2(Main)')
        ws.cell(row=data_row, column=6, value=ln2 if ln2 else '').border = thin_border

        # Comments (empty for now, can be populated)
        ws.cell(row=data_row, column=7, value='').border = thin_border

        # Apply borders to date/time cells
        ws.cell(row=data_row, column=2).border = thin_border
        ws.cell(row=data_row, column=3).border = thin_border

    data_end_row = data_start_row + len(rnd_rows) - 1

    # Add line chart
    chart = LineChart()
    chart.title = f"{final_location or 'Sound Level Data'} - Background Noise Study"
    chart.style = 10
    chart.y_axis.title = "Sound Level (dBA)"
    chart.x_axis.title = "Test Increment"
    chart.height = 12
    chart.width = 20

    # Data references (LAmax, LA01, LA10 are columns D, E, F; row 7 holds the
    # headers so titles_from_data picks up the series names)
    data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row)
    categories = Reference(ws, min_col=1, min_row=data_start_row, max_row=data_end_row)

    chart.add_data(data_ref, titles_from_data=True)
    chart.set_categories(categories)

    # Style the series
    if len(chart.series) >= 3:
        chart.series[0].graphicalProperties.line.solidFill = "FF0000"  # LAmax - red
        chart.series[1].graphicalProperties.line.solidFill = "00B050"  # LA01 - green
        chart.series[2].graphicalProperties.line.solidFill = "0070C0"  # LA10 - blue

    # Position chart to the right of data
    ws.add_chart(chart, "I3")

    # Add summary statistics section below the data
    summary_row = data_end_row + 3
    ws.cell(row=summary_row, column=1, value="Summary Statistics").font = Font(bold=True, size=12)

    # Calculate time-period statistics
    time_periods = {
        'Evening (7PM-10PM)': [],
        'Nighttime (10PM-7AM)': [],
        'Morning (7AM-12PM)': [],
        'Daytime (12PM-7PM)': []
    }

    for row in rnd_rows:
        start_time_str = row.get('Start Time', '')
        if start_time_str:
            try:
                dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
                hour = dt.hour

                lmax = row.get('Lmax(Main)')
                ln1 = row.get('LN1(Main)')
                ln2 = row.get('LN2(Main)')

                if isinstance(lmax, (int, float)) and isinstance(ln1, (int, float)) and isinstance(ln2, (int, float)):
                    data_point = {'lmax': lmax, 'ln1': ln1, 'ln2': ln2}

                    if 19 <= hour < 22:
                        time_periods['Evening (7PM-10PM)'].append(data_point)
                    elif hour >= 22 or hour < 7:
                        time_periods['Nighttime (10PM-7AM)'].append(data_point)
                    elif 7 <= hour < 12:
                        time_periods['Morning (7AM-12PM)'].append(data_point)
                    else:  # 12-19
                        time_periods['Daytime (12PM-7PM)'].append(data_point)
            except ValueError:
                continue

    # Summary table headers
    summary_row += 2
    summary_headers = ['Time Period', 'Samples', 'LAmax Avg', 'LA01 Avg', 'LA10 Avg']
    for col, header in enumerate(summary_headers, 1):
        cell = ws.cell(row=summary_row, column=col, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.border = thin_border

    # Summary data
    summary_row += 1
    for period_name, samples in time_periods.items():
        ws.cell(row=summary_row, column=1, value=period_name).border = thin_border
        ws.cell(row=summary_row, column=2, value=len(samples)).border = thin_border

        if samples:
            avg_lmax = sum(s['lmax'] for s in samples) / len(samples)
            avg_ln1 = sum(s['ln1'] for s in samples) / len(samples)
            avg_ln2 = sum(s['ln2'] for s in samples) / len(samples)
            ws.cell(row=summary_row, column=3, value=round(avg_lmax, 1)).border = thin_border
            ws.cell(row=summary_row, column=4, value=round(avg_ln1, 1)).border = thin_border
            ws.cell(row=summary_row, column=5, value=round(avg_ln2, 1)).border = thin_border
        else:
            ws.cell(row=summary_row, column=3, value='-').border = thin_border
            ws.cell(row=summary_row, column=4, value='-').border = thin_border
            ws.cell(row=summary_row, column=5, value='-').border = thin_border

        summary_row += 1

    # Overall summary
    summary_row += 1
    ws.cell(row=summary_row, column=1, value='Overall').font = Font(bold=True)
    ws.cell(row=summary_row, column=1).border = thin_border
    ws.cell(row=summary_row, column=2, value=len(rnd_rows)).border = thin_border

    all_lmax = [r.get('Lmax(Main)') for r in rnd_rows if isinstance(r.get('Lmax(Main)'), (int, float))]
    all_ln1 = [r.get('LN1(Main)') for r in rnd_rows if isinstance(r.get('LN1(Main)'), (int, float))]
    all_ln2 = [r.get('LN2(Main)') for r in rnd_rows if isinstance(r.get('LN2(Main)'), (int, float))]

    if all_lmax:
        ws.cell(row=summary_row, column=3, value=round(sum(all_lmax) / len(all_lmax), 1)).border = thin_border
    if all_ln1:
        ws.cell(row=summary_row, column=4, value=round(sum(all_ln1) / len(all_ln1), 1)).border = thin_border
    if all_ln2:
        ws.cell(row=summary_row, column=5, value=round(sum(all_ln2) / len(all_ln2), 1)).border = thin_border

    # Save to buffer
    output = io.BytesIO()
    wb.save(output)
    output.seek(0)

    # Generate filename
    filename = file_record.file_path.split('/')[-1].replace('.rnd', '')
    if location:
        filename = f"{location.name}_{filename}"
    filename = f"{filename}_report.xlsx"
    # Clean filename
    filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.')).rstrip()

    return StreamingResponse(
        output,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'}
    )

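# Example usage (illustrative sketch): requesting a single-file report with
# the optional query parameters. Mount prefix, host, and IDs are assumptions.
#
#     import requests
#
#     resp = requests.get(
#         "http://localhost:8000/projects/<project_id>/files/<file_id>/generate-report",
#         params={
#             "report_title": "Background Noise Study",
#             "location_name": "NRL 1 - West Side",
#         },
#         timeout=120,
#     )
#     resp.raise_for_status()
#     with open("report.xlsx", "wb") as fh:
#         fh.write(resp.content)
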
@router.get("/{project_id}/generate-combined-report")
async def generate_combined_excel_report(
    project_id: str,
    report_title: str = Query("Background Noise Study", description="Title for the report"),
    db: Session = Depends(get_db),
):
    """
    Generate a combined Excel report from all RND files in a project.

    Creates a multi-sheet Excel workbook with:
    - One sheet per location/RND file
    - Data tables with LAmax, LA01, LA10
    - Line charts for each location
    - Summary sheet combining all locations

    Column mapping from RND to report:
    - Lmax(Main) -> LAmax (dBA)
    - LN1(Main)  -> LA01 (dBA) [L1 percentile]
    - LN2(Main)  -> LA10 (dBA) [L10 percentile]
    """
    from backend.models import DataFile
    from pathlib import Path
    import csv

    try:
        import openpyxl
        from openpyxl.chart import LineChart, Reference
        from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
        from openpyxl.utils import get_column_letter
    except ImportError:
        raise HTTPException(
            status_code=500,
            detail="openpyxl is not installed. Run: pip install openpyxl"
        )

    # Get project
    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # Get all sessions with measurement files
    sessions = db.query(RecordingSession).filter_by(project_id=project_id).all()

    # Collect all Leq RND files grouped by location.
    # Only include files with '_Leq_' in the path (15-minute averaged data);
    # exclude Lp files (instantaneous 100 ms readings).
    location_files = {}
    for session in sessions:
        files = db.query(DataFile).filter_by(session_id=session.id).all()
        for file in files:
            # Only include Leq files for reports (contain '_Leq_' in path)
            is_leq_file = file.file_path and '_Leq_' in file.file_path and file.file_path.endswith('.rnd')
            if is_leq_file:
                location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
                location_name = location.name if location else f"Session {session.id[:8]}"

                if location_name not in location_files:
                    location_files[location_name] = []
                location_files[location_name].append({
                    'file': file,
                    'session': session,
                    'location': location
                })

    if not location_files:
        raise HTTPException(status_code=404, detail="No Leq measurement files found in project. Reports require Leq data (files with '_Leq_' in the name).")

    # Define styles
    title_font = Font(bold=True, size=14)
    header_font = Font(bold=True, size=10)
    thin_border = Border(
        left=Side(style='thin'),
        right=Side(style='thin'),
        top=Side(style='thin'),
        bottom=Side(style='thin')
    )
    header_fill = PatternFill(start_color="DAEEF3", end_color="DAEEF3", fill_type="solid")

    # Create Excel workbook
    wb = openpyxl.Workbook()

    # Remove default sheet
    wb.remove(wb.active)

    # Track all data for summary
    all_location_summaries = []

    # Create a sheet for each location
    for location_name, file_list in location_files.items():
        # Sanitize sheet name (max 31 chars, no special chars)
        safe_sheet_name = "".join(c for c in location_name if c.isalnum() or c in (' ', '-', '_'))[:31]
        ws = wb.create_sheet(title=safe_sheet_name)

        # Row 1: Report title
        final_title = f"{report_title} - {project.name}"
        ws['A1'] = final_title
        ws['A1'].font = title_font
        ws.merge_cells('A1:G1')

        # Row 3: Location name
        ws['A3'] = location_name
        ws['A3'].font = Font(bold=True, size=11)

        # Row 7: Headers
        headers = ['Test Increment #', 'Date', 'Time', 'LAmax (dBA)', 'LA01 (dBA)', 'LA10 (dBA)', 'Comments']
        for col, header in enumerate(headers, 1):
            cell = ws.cell(row=7, column=col, value=header)
            cell.font = header_font
            cell.border = thin_border
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal='center')

        # Set column widths
        column_widths = [16, 12, 10, 12, 12, 12, 40]
        for i, width in enumerate(column_widths, 1):
            ws.column_dimensions[get_column_letter(i)].width = width

        # Combine data from all files for this location
        all_rnd_rows = []
        for file_info in file_list:
            file = file_info['file']
            file_path = Path("data") / file.file_path

            if not file_path.exists():
                continue

            try:
                with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                    content = f.read()

                reader = csv.DictReader(io.StringIO(content))
                for row in reader:
                    cleaned_row = {}
                    for key, value in row.items():
                        if key:
                            cleaned_key = key.strip()
                            cleaned_value = value.strip() if value else ''
                            if cleaned_value and cleaned_value not in ['-.-', '-', '']:
                                try:
                                    cleaned_value = float(cleaned_value)
                                except ValueError:
                                    pass
                            elif cleaned_value in ['-.-', '-']:
                                cleaned_value = None
                            cleaned_row[cleaned_key] = cleaned_value
                    all_rnd_rows.append(cleaned_row)
            except Exception as e:
                logger.warning(f"Error reading file {file.file_path}: {e}")
                continue

        if not all_rnd_rows:
            continue

        # Sort by start time (zero-padded 'YYYY/MM/DD HH:MM:SS' strings sort
        # chronologically); fall back to '' so rows with a missing Start Time
        # sort first instead of raising on None
        all_rnd_rows.sort(key=lambda r: r.get('Start Time') or '')

        # Data rows starting at row 8
        data_start_row = 8
        for idx, row in enumerate(all_rnd_rows, 1):
            data_row = data_start_row + idx - 1

            ws.cell(row=data_row, column=1, value=idx).border = thin_border

            start_time_str = row.get('Start Time', '')
            if start_time_str:
                try:
                    dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
                    ws.cell(row=data_row, column=2, value=dt.date())
                    ws.cell(row=data_row, column=3, value=dt.time())
                except ValueError:
                    ws.cell(row=data_row, column=2, value=start_time_str)
                    ws.cell(row=data_row, column=3, value='')
            else:
                ws.cell(row=data_row, column=2, value='')
                ws.cell(row=data_row, column=3, value='')

            lmax = row.get('Lmax(Main)')
            ws.cell(row=data_row, column=4, value=lmax if lmax else '').border = thin_border

            ln1 = row.get('LN1(Main)')
            ws.cell(row=data_row, column=5, value=ln1 if ln1 else '').border = thin_border

            ln2 = row.get('LN2(Main)')
            ws.cell(row=data_row, column=6, value=ln2 if ln2 else '').border = thin_border

            ws.cell(row=data_row, column=7, value='').border = thin_border
            ws.cell(row=data_row, column=2).border = thin_border
            ws.cell(row=data_row, column=3).border = thin_border

        data_end_row = data_start_row + len(all_rnd_rows) - 1

        # Add line chart
        chart = LineChart()
        chart.title = f"{location_name}"
        chart.style = 10
        chart.y_axis.title = "Sound Level (dBA)"
        chart.x_axis.title = "Test Increment"
        chart.height = 12
        chart.width = 20

        data_ref = Reference(ws, min_col=4, min_row=7, max_col=6, max_row=data_end_row)
        categories = Reference(ws, min_col=1, min_row=data_start_row, max_row=data_end_row)

        chart.add_data(data_ref, titles_from_data=True)
        chart.set_categories(categories)

        if len(chart.series) >= 3:
            chart.series[0].graphicalProperties.line.solidFill = "FF0000"  # LAmax - red
            chart.series[1].graphicalProperties.line.solidFill = "00B050"  # LA01 - green
            chart.series[2].graphicalProperties.line.solidFill = "0070C0"  # LA10 - blue

        ws.add_chart(chart, "I3")

        # Calculate summary for this location
        all_lmax = [r.get('Lmax(Main)') for r in all_rnd_rows if isinstance(r.get('Lmax(Main)'), (int, float))]
        all_ln1 = [r.get('LN1(Main)') for r in all_rnd_rows if isinstance(r.get('LN1(Main)'), (int, float))]
        all_ln2 = [r.get('LN2(Main)') for r in all_rnd_rows if isinstance(r.get('LN2(Main)'), (int, float))]

        all_location_summaries.append({
            'location': location_name,
            'samples': len(all_rnd_rows),
            'lmax_avg': round(sum(all_lmax) / len(all_lmax), 1) if all_lmax else None,
            'ln1_avg': round(sum(all_ln1) / len(all_ln1), 1) if all_ln1 else None,
            'ln2_avg': round(sum(all_ln2) / len(all_ln2), 1) if all_ln2 else None,
        })

    # Create Summary sheet at the beginning
    summary_ws = wb.create_sheet(title="Summary", index=0)

    summary_ws['A1'] = f"{report_title} - {project.name} - Summary"
    summary_ws['A1'].font = title_font
    summary_ws.merge_cells('A1:E1')

    summary_headers = ['Location', 'Samples', 'LAmax Avg', 'LA01 Avg', 'LA10 Avg']
    for col, header in enumerate(summary_headers, 1):
        cell = summary_ws.cell(row=3, column=col, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.border = thin_border

    for i, width in enumerate([30, 10, 12, 12, 12], 1):
        summary_ws.column_dimensions[get_column_letter(i)].width = width

    for idx, loc_summary in enumerate(all_location_summaries, 4):
        summary_ws.cell(row=idx, column=1, value=loc_summary['location']).border = thin_border
        summary_ws.cell(row=idx, column=2, value=loc_summary['samples']).border = thin_border
        summary_ws.cell(row=idx, column=3, value=loc_summary['lmax_avg'] or '-').border = thin_border
        summary_ws.cell(row=idx, column=4, value=loc_summary['ln1_avg'] or '-').border = thin_border
        summary_ws.cell(row=idx, column=5, value=loc_summary['ln2_avg'] or '-').border = thin_border

    # Save to buffer
    output = io.BytesIO()
    wb.save(output)
    output.seek(0)

    # Generate filename
    project_name_clean = "".join(c for c in project.name if c.isalnum() or c in ('_', '-', ' ')).strip()
    filename = f"{project_name_clean}_combined_report.xlsx"
    filename = filename.replace(' ', '_')

    return StreamingResponse(
        output,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'}
    )

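# The sheet-title rule used above, shown standalone (illustrative sketch;
# mirrors the sanitization loop in generate_combined_excel_report):
#
#     def safe_sheet_title(name: str) -> str:
#         # Excel caps sheet names at 31 chars; keep only alphanumerics,
#         # spaces, hyphens, and underscores.
#         return "".join(c for c in name if c.isalnum() or c in (' ', '-', '_'))[:31]
#
#     safe_sheet_title("NRL 1 - West Side (mic #2)")  # -> 'NRL 1 - West Side mic 2'
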
@router.get("/types/list", response_class=HTMLResponse)
async def get_project_types(request: Request, db: Session = Depends(get_db)):
    """