feat: support daytime monitoring data; combined report generation is now compatible with mixed day and night session types.

2026-03-07 00:16:58 +00:00
parent 67a2faa2d3
commit f89f04cd6f
6 changed files with 941 additions and 399 deletions

View File

@@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Migration: Add session_label and period_type columns to monitoring_sessions.

session_label - user-editable display name, e.g. "NRL-1 — Sun 2/23 — Night"
period_type   - one of: weekday_day | weekday_night | weekend_day | weekend_night
                Auto-derived from started_at when NULL.

Period definitions (used in report stats table):
    weekday_day     Mon-Fri 07:00-22:00  -> Daytime (7AM-10PM)
    weekday_night   Mon-Fri 22:00-07:00  -> Nighttime (10PM-7AM)
    weekend_day     Sat-Sun 07:00-22:00  -> Daytime (7AM-10PM)
    weekend_night   Sat-Sun 22:00-07:00  -> Nighttime (10PM-7AM)

Run once inside the Docker container:
    docker exec terra-view python3 backend/migrate_add_session_period_type.py
"""
from pathlib import Path
from datetime import datetime

DB_PATH = Path("data/seismo_fleet.db")


def _derive_period_type(started_at_str: str) -> str | None:
    """Derive period_type from a started_at ISO datetime string."""
    if not started_at_str:
        return None
    try:
        dt = datetime.fromisoformat(started_at_str)
    except ValueError:
        return None
    is_weekend = dt.weekday() >= 5  # 5=Sat, 6=Sun
    is_night = dt.hour >= 22 or dt.hour < 7
    if is_weekend:
        return "weekend_night" if is_night else "weekend_day"
    else:
        return "weekday_night" if is_night else "weekday_day"


def _build_label(started_at_str: str, location_name: str | None, period_type: str | None) -> str | None:
    """Build a human-readable session label."""
    if not started_at_str:
        return None
    try:
        dt = datetime.fromisoformat(started_at_str)
    except ValueError:
        return None
    day_abbr = dt.strftime("%a")       # Mon, Tue, Sun, etc.
    date_str = dt.strftime("%-m/%-d")  # 2/23
    period_labels = {
        "weekday_day": "Day",
        "weekday_night": "Night",
        "weekend_day": "Day",
        "weekend_night": "Night",
    }
    period_str = period_labels.get(period_type or "", "")
    parts = []
    if location_name:
        parts.append(location_name)
    parts.append(f"{day_abbr} {date_str}")
    if period_str:
        parts.append(period_str)
    return " — ".join(parts)


def migrate():
    import sqlite3

    if not DB_PATH.exists():
        print(f"Database not found at {DB_PATH}. Are you running from /home/serversdown/terra-view?")
        return

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    # 1. Add columns (idempotent)
    cur.execute("PRAGMA table_info(monitoring_sessions)")
    existing_cols = {row["name"] for row in cur.fetchall()}
    for col, typedef in [("session_label", "TEXT"), ("period_type", "TEXT")]:
        if col not in existing_cols:
            cur.execute(f"ALTER TABLE monitoring_sessions ADD COLUMN {col} {typedef}")
            conn.commit()
            print(f"✓ Added column {col} to monitoring_sessions")
        else:
            print(f"○ Column {col} already exists — skipping ALTER TABLE")

    # 2. Backfill existing rows
    cur.execute(
        """SELECT ms.id, ms.started_at, ms.location_id
           FROM monitoring_sessions ms
           WHERE ms.period_type IS NULL OR ms.session_label IS NULL"""
    )
    sessions = cur.fetchall()
    print(f"Backfilling {len(sessions)} session(s)...")
    updated = 0
    for row in sessions:
        session_id = row["id"]
        started_at = row["started_at"]
        location_id = row["location_id"]
        # Look up location name
        location_name = None
        if location_id:
            cur.execute("SELECT name FROM monitoring_locations WHERE id = ?", (location_id,))
            loc_row = cur.fetchone()
            if loc_row:
                location_name = loc_row["name"]
        period_type = _derive_period_type(started_at)
        label = _build_label(started_at, location_name, period_type)
        cur.execute(
            "UPDATE monitoring_sessions SET period_type = ?, session_label = ? WHERE id = ?",
            (period_type, label, session_id),
        )
        updated += 1
    conn.commit()
    conn.close()
    print(f"✓ Backfilled {updated} session(s).")
    print("Migration complete.")


if __name__ == "__main__":
    migrate()
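After running the migration, the backfill can be spot-checked straight from the same database. A throwaway sketch (not part of this commit):

import sqlite3

conn = sqlite3.connect("data/seismo_fleet.db")
conn.row_factory = sqlite3.Row
for row in conn.execute(
    "SELECT session_label, period_type FROM monitoring_sessions LIMIT 5"
):
    print(row["session_label"], "->", row["period_type"])
conn.close()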

View File

@@ -272,6 +272,14 @@ class MonitoringSession(Base):
    duration_seconds = Column(Integer, nullable=True)
    status = Column(String, default="recording")  # recording, completed, failed

    # Human-readable label auto-derived from date/location, editable by user.
    # e.g. "NRL-1 — Sun 2/23 — Night"
    session_label = Column(String, nullable=True)

    # Period classification for report stats columns.
    # weekday_day | weekday_night | weekend_day | weekend_night
    period_type = Column(String, nullable=True)

    # Snapshot of device configuration at recording time
    session_metadata = Column(Text, nullable=True)  # JSON
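For downstream queries, the new column makes day/night filtering a one-liner. A minimal sketch (hypothetical usage, not part of this commit, assuming a SQLAlchemy session named db and a project_id in scope):

# Hypothetical query: all night sessions in a project via the new column.
night_sessions = (
    db.query(MonitoringSession)
    .filter(
        MonitoringSession.project_id == project_id,
        MonitoringSession.period_type.in_(["weekday_night", "weekend_night"]),
    )
    .all()
)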

View File

@@ -35,6 +35,37 @@ from backend.templates_config import templates
router = APIRouter(prefix="/api/projects/{project_id}", tags=["project-locations"])
# ============================================================================
# Session period helpers
# ============================================================================
def _derive_period_type(dt: datetime) -> str:
    """
    Classify a session start time into one of four period types.
    Night = 22:00–07:00, Day = 07:00–22:00.
    Weekend = Saturday (5) or Sunday (6).
    """
    is_weekend = dt.weekday() >= 5
    is_night = dt.hour >= 22 or dt.hour < 7
    if is_weekend:
        return "weekend_night" if is_night else "weekend_day"
    return "weekday_night" if is_night else "weekday_day"


def _build_session_label(dt: datetime, location_name: str, period_type: str) -> str:
    """Build a human-readable session label, e.g. 'NRL-1 — Sun 2/23 — Night'."""
    day_abbr = dt.strftime("%a")
    date_str = f"{dt.month}/{dt.day}"
    period_str = {
        "weekday_day": "Day",
        "weekday_night": "Night",
        "weekend_day": "Day",
        "weekend_night": "Night",
    }.get(period_type, "")
    parts = [p for p in [location_name, f"{day_abbr} {date_str}", period_str] if p]
    return " — ".join(parts)
# ============================================================================
# Monitoring Locations CRUD
# ============================================================================
@@ -676,6 +707,9 @@ async def upload_nrl_data(
    index_number = rnh_meta.get("index_number", "")

    # --- Step 3: Create MonitoringSession ---
    period_type = _derive_period_type(started_at) if started_at else None
    session_label = _build_session_label(started_at, location.name, period_type) if started_at else None

    session_id = str(uuid.uuid4())
    monitoring_session = MonitoringSession(
        id=session_id,
@@ -687,6 +721,8 @@ async def upload_nrl_data(
        stopped_at=stopped_at,
        duration_seconds=duration_seconds,
        status="completed",
        session_label=session_label,
        period_type=period_type,
        session_metadata=json.dumps({
            "source": "manual_upload",
            "store_name": store_name,

View File

@@ -1794,6 +1794,34 @@ async def delete_session(
    })


VALID_PERIOD_TYPES = {"weekday_day", "weekday_night", "weekend_day", "weekend_night"}


@router.patch("/{project_id}/sessions/{session_id}")
async def patch_session(
    project_id: str,
    session_id: str,
    data: dict,
    db: Session = Depends(get_db),
):
    """Update session_label and/or period_type on a monitoring session."""
    session = db.query(MonitoringSession).filter_by(id=session_id).first()
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    if session.project_id != project_id:
        raise HTTPException(status_code=403, detail="Session does not belong to this project")
    if "session_label" in data:
        session.session_label = str(data["session_label"]).strip() or None
    if "period_type" in data:
        pt = data["period_type"]
        if pt and pt not in VALID_PERIOD_TYPES:
            raise HTTPException(status_code=400, detail=f"Invalid period_type. Must be one of: {', '.join(sorted(VALID_PERIOD_TYPES))}")
        session.period_type = pt or None
    db.commit()
    return JSONResponse({"status": "success", "session_label": session.session_label, "period_type": session.period_type})
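From a client, updating a label or period is a single PATCH. A sketch (the base URL and router prefix are assumptions; adjust to wherever this router is mounted):

import requests

# Hypothetical URL prefix; the actual mount point of this router may differ.
resp = requests.patch(
    f"http://localhost:8000/api/projects/{project_id}/sessions/{session_id}",
    json={"session_label": "NRL-1 — Sun 2/23 — Night", "period_type": "weekend_night"},
)
print(resp.json())  # {"status": "success", ...}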
@router.get("/{project_id}/files/{file_id}/view-rnd", response_class=HTMLResponse)
async def view_rnd_file(
    request: Request,
@@ -3277,32 +3305,59 @@ async def combined_report_wizard(
):
    """Configuration page for the combined multi-location report wizard."""
    from backend.models import ReportTemplate
    from pathlib import Path as _Path

    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    sessions = db.query(MonitoringSession).filter_by(project_id=project_id).all()
    sessions = db.query(MonitoringSession).filter_by(project_id=project_id).order_by(MonitoringSession.started_at).all()

    # Build location list with Leq file counts (no filtering)
    location_file_counts: dict = {}
    # Build location -> sessions list, only including sessions that have Leq files
    location_sessions: dict = {}  # loc_name -> list of session dicts
    for session in sessions:
        files = db.query(DataFile).filter_by(session_id=session.id).all()
        has_leq = False
        for file in files:
            if not file.file_path or not file.file_path.lower().endswith('.rnd'):
                continue
            from pathlib import Path as _Path
            abs_path = _Path("data") / file.file_path
            peek = _peek_rnd_headers(abs_path)
            if not _is_leq_file(file.file_path, peek):
                continue
            location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
            loc_name = location.name if location else f"Session {session.id[:8]}"
            location_file_counts[loc_name] = location_file_counts.get(loc_name, 0) + 1
            if _is_leq_file(file.file_path, peek):
                has_leq = True
                break
        if not has_leq:
            continue
        location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
        loc_name = location.name if location else f"Session {session.id[:8]}"
        if loc_name not in location_sessions:
            location_sessions[loc_name] = []
        # Build a display date and day-of-week from started_at
        date_display = ""
        day_of_week = ""
        if session.started_at:
            date_display = session.started_at.strftime("%-m/%-d/%Y")
            day_of_week = session.started_at.strftime("%A")  # Monday, Sunday, etc.
        location_sessions[loc_name].append({
            "session_id": session.id,
            "session_label": session.session_label or "",
            "date_display": date_display,
            "day_of_week": day_of_week,
            "started_at": session.started_at.isoformat() if session.started_at else "",
            "stopped_at": session.stopped_at.isoformat() if session.stopped_at else "",
            "duration_h": (session.duration_seconds // 3600) if session.duration_seconds else 0,
            "duration_m": ((session.duration_seconds % 3600) // 60) if session.duration_seconds else 0,
            "period_type": session.period_type or "",
            "status": session.status,
        })

    locations = [
        {"name": name, "file_count": count}
        for name, count in sorted(location_file_counts.items())
        {"name": name, "sessions": sess_list}
        for name, sess_list in sorted(location_sessions.items())
    ]
    report_templates = db.query(ReportTemplate).all()
@@ -3312,10 +3367,111 @@ async def combined_report_wizard(
"project": project,
"project_id": project_id,
"locations": locations,
"locations_json": json.dumps(locations),
"report_templates": report_templates,
})
def _build_location_data_from_sessions(project_id: str, db, selected_session_ids: list) -> dict:
    """
    Build per-location spreadsheet data using an explicit list of session IDs.
    Only rows from those sessions are included. Per-session period_type is
    stored on each row so the report can filter stats correctly.
    """
    from pathlib import Path as _Path

    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    if not selected_session_ids:
        raise HTTPException(status_code=400, detail="No sessions selected.")

    # Load every requested session — one entry per (session_id, loc_name) pair.
    # Keyed by session_id so overnight sessions are never split by calendar date.
    session_entries: dict = {}  # session_id -> {loc_name, session_label, period_type, rows[]}
    for session_id in selected_session_ids:
        session = db.query(MonitoringSession).filter_by(id=session_id, project_id=project_id).first()
        if not session:
            continue
        location = db.query(MonitoringLocation).filter_by(id=session.location_id).first() if session.location_id else None
        loc_name = location.name if location else f"Session {session_id[:8]}"
        session_entries[session_id] = {
            "loc_name": loc_name,
            "session_label": session.session_label or "",
            "period_type": session.period_type or "",
            "started_at": session.started_at,
            "rows": [],
        }
        files = db.query(DataFile).filter_by(session_id=session_id).all()
        for file in files:
            if not file.file_path or not file.file_path.lower().endswith('.rnd'):
                continue
            abs_path = _Path("data") / file.file_path
            peek = _peek_rnd_headers(abs_path)
            if not _is_leq_file(file.file_path, peek):
                continue
            rows = _read_rnd_file_rows(file.file_path)
            rows, _ = _normalize_rnd_rows(rows)
            session_entries[session_id]["rows"].extend(rows)

    if not any(e["rows"] for e in session_entries.values()):
        raise HTTPException(status_code=404, detail="No Leq data found in the selected sessions.")

    location_data = []
    for session_id in selected_session_ids:
        entry = session_entries.get(session_id)
        if not entry or not entry["rows"]:
            continue
        loc_name = entry["loc_name"]
        period_type = entry["period_type"]
        raw_rows = sorted(entry["rows"], key=lambda r: r.get('Start Time', ''))
        spreadsheet_data = []
        for idx, row in enumerate(raw_rows, 1):
            start_time_str = row.get('Start Time', '')
            date_str = time_str = ''
            if start_time_str:
                try:
                    dt = datetime.strptime(start_time_str, '%Y/%m/%d %H:%M:%S')
                    date_str = dt.strftime('%Y-%m-%d')
                    time_str = dt.strftime('%H:%M')
                except ValueError:
                    date_str = start_time_str
            lmax = row.get('Lmax(Main)', '')
            ln1 = row.get('LN1(Main)', '')
            ln2 = row.get('LN2(Main)', '')
            spreadsheet_data.append([
                idx,
                date_str,
                time_str,
                lmax if lmax else '',
                ln1 if ln1 else '',
                ln2 if ln2 else '',
                '',
                period_type,  # col index 7 — hidden, used by report gen for day/night bucketing
            ])
        location_data.append({
            "session_id": session_id,
            "location_name": loc_name,
            "session_label": entry["session_label"],
            "period_type": period_type,
            "started_at": entry["started_at"].isoformat() if entry["started_at"] else "",
            "raw_count": len(raw_rows),
            "filtered_count": len(raw_rows),
            "spreadsheet_data": spreadsheet_data,
        })

    return {"project": project, "location_data": location_data}
@router.get("/{project_id}/combined-report-preview", response_class=HTMLResponse)
async def combined_report_preview(
    request: Request,
@@ -3323,38 +3479,19 @@ async def combined_report_preview(
    report_title: str = Query("Background Noise Study"),
    project_name: str = Query(""),
    client_name: str = Query(""),
    start_time: str = Query(""),
    end_time: str = Query(""),
    start_date: str = Query(""),
    end_date: str = Query(""),
    enabled_locations: str = Query(""),
    selected_sessions: str = Query(""),  # comma-separated session IDs
    db: Session = Depends(get_db),
):
    """Preview and edit combined report data before generating the Excel file."""
    enabled_list = [loc.strip() for loc in enabled_locations.split(',') if loc.strip()] if enabled_locations else None
    session_ids = [s.strip() for s in selected_sessions.split(',') if s.strip()] if selected_sessions else []
    result = _build_combined_location_data(
        project_id, db,
        start_time=start_time,
        end_time=end_time,
        start_date=start_date,
        end_date=end_date,
        enabled_locations=enabled_list,
    )
    result = _build_location_data_from_sessions(project_id, db, session_ids)
    project = result["project"]
    location_data = result["location_data"]
    total_rows = sum(loc["filtered_count"] for loc in location_data)
    final_project_name = project_name if project_name else project.name
    # Build time filter display string
    time_filter_desc = ""
    if start_time and end_time:
        time_filter_desc = f"{start_time} – {end_time}"
    elif start_time or end_time:
        time_filter_desc = f"{start_time or ''} – {end_time or ''}"
    return templates.TemplateResponse("combined_report_preview.html", {
        "request": request,
        "project": project,
@@ -3362,11 +3499,7 @@ async def combined_report_preview(
"report_title": report_title,
"project_name": final_project_name,
"client_name": client_name,
"start_time": start_time,
"end_time": end_time,
"start_date": start_date,
"end_date": end_date,
"time_filter_desc": time_filter_desc,
"time_filter_desc": f"{len(session_ids)} session{'s' if len(session_ids) != 1 else ''} selected",
"location_data": location_data,
"locations_json": json.dumps(location_data),
"total_rows": total_rows,
@@ -3474,13 +3607,14 @@ async def generate_combined_from_preview(
b_inner = last_inner if is_last else data_inner
b_right = last_right if is_last else data_right
test_num = row[0] if len(row) > 0 else row_idx + 1
date_val = row[1] if len(row) > 1 else ''
time_val = row[2] if len(row) > 2 else ''
lmax = row[3] if len(row) > 3 else ''
ln1 = row[4] if len(row) > 4 else ''
ln2 = row[5] if len(row) > 5 else ''
comment = row[6] if len(row) > 6 else ''
test_num = row[0] if len(row) > 0 else row_idx + 1
date_val = row[1] if len(row) > 1 else ''
time_val = row[2] if len(row) > 2 else ''
lmax = row[3] if len(row) > 3 else ''
ln1 = row[4] if len(row) > 4 else ''
ln2 = row[5] if len(row) > 5 else ''
comment = row[6] if len(row) > 6 else ''
row_period = row[7] if len(row) > 7 else '' # hidden period_type from session
c = ws.cell(row=dr, column=1, value=test_num)
c.font = f_data; c.alignment = center_a; c.border = b_left
@@ -3505,15 +3639,8 @@ async def generate_combined_from_preview(
if isinstance(ln2, (int, float)):
ln2_vals.append(ln2)
if time_val and isinstance(lmax, (int, float)) and isinstance(ln1, (int, float)) and isinstance(ln2, (int, float)):
try:
try:
row_dt = datetime.strptime(str(time_val), '%H:%M')
except ValueError:
row_dt = datetime.strptime(str(time_val), '%H:%M:%S')
parsed_rows.append((row_dt, float(lmax), float(ln1), float(ln2)))
except (ValueError, TypeError):
pass
if isinstance(lmax, (int, float)) and isinstance(ln1, (int, float)) and isinstance(ln2, (int, float)):
parsed_rows.append((row_period, float(lmax), float(ln1), float(ln2)))
data_end_row = data_start_row + len(day_rows) - 1
@@ -3548,44 +3675,109 @@ async def generate_combined_from_preview(
ws.merge_cells(start_row=29, start_column=9, end_row=29, end_column=14)
hdr_fill_tbl = PatternFill(start_color="F2F2F2", end_color="F2F2F2", fill_type="solid")
c = ws.cell(row=31, column=9, value=""); c.border = tbl_top_left; c.font = f_bold
c = ws.cell(row=31, column=10, value="Evening (7PM to 10PM)")
c.font = f_bold; c.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
c.border = tbl_top_mid; c.fill = hdr_fill_tbl
ws.merge_cells(start_row=31, start_column=10, end_row=31, end_column=11)
c = ws.cell(row=31, column=12, value="Nighttime (10PM to 7AM)")
c.font = f_bold; c.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
c.border = tbl_top_right; c.fill = hdr_fill_tbl
ws.merge_cells(start_row=31, start_column=12, end_row=31, end_column=13)
ws.row_dimensions[31].height = 15
evening = [(lmx, l1, l2) for dt, lmx, l1, l2 in parsed_rows if 19 <= dt.hour < 22]
nighttime = [(lmx, l1, l2) for dt, lmx, l1, l2 in parsed_rows if dt.hour >= 22 or dt.hour < 7]
def _avg(vals): return round(sum(vals) / len(vals), 1) if vals else None
def _max(vals): return round(max(vals), 1) if vals else None
def write_stat(row_num, label, eve_val, night_val, is_last=False):
# --- Dynamic period detection ----------------------------------------
# Use the period_type stored on each row (from the session record).
# Rows without a period_type fall back to time-of-day detection.
# The four canonical types map to two display columns:
# Day -> "Daytime (7AM to 10PM)"
# Night -> "Nighttime (10PM to 7AM)"
PERIOD_TYPE_IS_DAY = {"weekday_day", "weekend_day"}
PERIOD_TYPE_IS_NIGHT = {"weekday_night", "weekend_night"}
day_rows_data = []
night_rows_data = []
for pt, lmx, l1, l2 in parsed_rows:
if pt in PERIOD_TYPE_IS_DAY:
day_rows_data.append((lmx, l1, l2))
elif pt in PERIOD_TYPE_IS_NIGHT:
night_rows_data.append((lmx, l1, l2))
else:
# No period_type — fall back to time-of-day (shouldn't happen for
# new uploads, but handles legacy data gracefully)
# We can't derive from time here since parsed_rows no longer stores dt.
# Put in day as a safe default.
day_rows_data.append((lmx, l1, l2))
all_candidate_periods = [
("Daytime (7AM to 10PM)", day_rows_data),
("Nighttime (10PM to 7AM)", night_rows_data),
]
active_periods = [(label, rows) for label, rows in all_candidate_periods if rows]
# If nothing at all, show both columns empty
if not active_periods:
active_periods = [("Daytime (7AM to 10PM)", []), ("Nighttime (10PM to 7AM)", [])]
# Build header row (row 31) with one merged pair of columns per active period
# Layout: col 9 = row label, then pairs: (10,11), (12,13), (14,15)
num_periods = len(active_periods)
period_start_cols = [10 + i * 2 for i in range(num_periods)]
# Left/right border helpers for the header row
def _hdr_border(i, n):
is_first = (i == 0)
is_last = (i == n - 1)
return Border(
left=med if is_first else thin,
right=med if is_last else thin,
top=med,
bottom=thin,
)
def _mid_border(i, n, is_data_last=False):
is_first = (i == 0)
is_last = (i == n - 1)
b = tbl_bot_mid if is_data_last else tbl_mid_mid
return Border(
left=med if is_first else thin,
right=med if is_last else thin,
top=b.top,
bottom=b.bottom,
)
c = ws.cell(row=31, column=9, value=""); c.border = tbl_top_left; c.font = f_bold
ws.row_dimensions[31].height = 30
for i, (period_label, _) in enumerate(active_periods):
sc = period_start_cols[i]
is_last_col = (i == num_periods - 1)
c = ws.cell(row=31, column=sc, value=period_label.replace('\n', ' '))
c.font = f_bold
c.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
c.border = _hdr_border(i, num_periods)
c.fill = hdr_fill_tbl
ws.merge_cells(start_row=31, start_column=sc, end_row=31, end_column=sc + 1)
def write_stat_dynamic(row_num, row_label, period_vals_list, is_last=False):
bl = tbl_bot_left if is_last else tbl_mid_left
bm = tbl_bot_mid if is_last else tbl_mid_mid
br = tbl_bot_right if is_last else tbl_mid_right
lbl = ws.cell(row=row_num, column=9, value=label)
lbl = ws.cell(row=row_num, column=9, value=row_label)
lbl.font = f_data; lbl.border = bl
lbl.alignment = Alignment(horizontal='left', vertical='center')
ev_str = f"{eve_val} dBA" if eve_val is not None else ""
ev = ws.cell(row=row_num, column=10, value=ev_str)
ev.font = f_bold; ev.border = bm
ev.alignment = Alignment(horizontal='center', vertical='center')
ws.merge_cells(start_row=row_num, start_column=10, end_row=row_num, end_column=11)
ni_str = f"{night_val} dBA" if night_val is not None else ""
ni = ws.cell(row=row_num, column=12, value=ni_str)
ni.font = f_bold; ni.border = br
ni.alignment = Alignment(horizontal='center', vertical='center')
ws.merge_cells(start_row=row_num, start_column=12, end_row=row_num, end_column=13)
n = len(period_vals_list)
for i, val in enumerate(period_vals_list):
sc = period_start_cols[i]
is_last_col = (i == n - 1)
val_str = f"{val} dBA" if val is not None else ""
c = ws.cell(row=row_num, column=sc, value=val_str)
c.font = f_bold
c.alignment = Alignment(horizontal='center', vertical='center')
c.border = Border(
left=med if i == 0 else thin,
right=med if is_last_col else thin,
top=tbl_bot_mid.top if is_last else tbl_mid_mid.top,
bottom=tbl_bot_mid.bottom if is_last else tbl_mid_mid.bottom,
)
ws.merge_cells(start_row=row_num, start_column=sc, end_row=row_num, end_column=sc + 1)
write_stat(32, "LAmax", _max([v[0] for v in evening]), _max([v[0] for v in nighttime]))
write_stat(33, "LA01 Average", _avg([v[1] for v in evening]), _avg([v[1] for v in nighttime]))
write_stat(34, "LA10 Average", _avg([v[2] for v in evening]), _avg([v[2] for v in nighttime]), is_last=True)
write_stat_dynamic(32, "LAmax",
[_max([v[0] for v in rows]) for _, rows in active_periods])
write_stat_dynamic(33, "LA01 Average",
[_avg([v[1] for v in rows]) for _, rows in active_periods])
write_stat_dynamic(34, "LA10 Average",
[_avg([v[2] for v in rows]) for _, rows in active_periods], is_last=True)
ws.sheet_properties.pageSetUpPr = PageSetupProperties(fitToPage=False)
ws.page_setup.orientation = 'portrait'
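The effect of the period_type bucketing is easiest to see in isolation. A standalone sketch mirroring the logic above, with invented sample rows:

# Standalone sketch of the day/night bucketing and stats (sample values invented).
parsed_rows = [
    ("weekend_night", 62.1, 58.4, 55.0),
    ("weekend_night", 59.8, 57.2, 54.1),
    ("weekday_day",   71.3, 66.0, 63.5),
]
day = [(l, a, b) for pt, l, a, b in parsed_rows if pt in {"weekday_day", "weekend_day"}]
night = [(l, a, b) for pt, l, a, b in parsed_rows if pt in {"weekday_night", "weekend_night"}]

def _avg(vals):
    return round(sum(vals) / len(vals), 1) if vals else None

print(round(max(v[0] for v in night), 1))  # LAmax (Night column) -> 62.1
print(_avg([v[1] for v in night]))         # LA01 Average (Night) -> 57.8
print(_avg([v[2] for v in day]))           # LA10 Average (Day)   -> 63.5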
@@ -3624,58 +3816,58 @@ async def generate_combined_from_preview(
summary_ws.cell(row=idx, column=5, value=s['ln2_avg'] or '-').border = thin_border
# ----------------------------------------------------------------
# Split each location's rows by date, collect all unique dates
# Build one workbook per session (each location entry is one session)
# ----------------------------------------------------------------
# Structure: dates_map[date_str][loc_name] = [row, ...]
dates_map: dict = {}
for loc_info in locations:
loc_name = loc_info.get("location_name", "Unknown")
rows = loc_info.get("spreadsheet_data", [])
for row in rows:
date_val = str(row[1]).strip() if len(row) > 1 else ''
if not date_val:
date_val = "Unknown Date"
dates_map.setdefault(date_val, {}).setdefault(loc_name, []).append(row)
if not locations:
raise HTTPException(status_code=400, detail="No location data provided")
if not dates_map:
raise HTTPException(status_code=400, detail="No data rows found in provided location data")
sorted_dates = sorted(dates_map.keys())
project_name_clean = "".join(c for c in project_name if c.isalnum() or c in ('_', '-', ' ')).strip().replace(' ', '_')
final_title = f"{report_title} - {project_name}"
# ----------------------------------------------------------------
# Build one workbook per day, zip them
# ----------------------------------------------------------------
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for date_str in sorted_dates:
loc_data_for_day = dates_map[date_str]
final_title = f"{report_title} - {project_name}"
for loc_info in locations:
loc_name = loc_info.get("location_name", "Unknown")
session_label = loc_info.get("session_label", "")
period_type = loc_info.get("period_type", "")
started_at_str = loc_info.get("started_at", "")
rows = loc_info.get("spreadsheet_data", [])
if not rows:
continue
# Re-number interval # sequentially
for i, row in enumerate(rows):
if len(row) > 0:
row[0] = i + 1
wb = openpyxl.Workbook()
wb.remove(wb.active)
loc_summaries = []
for loc_name in sorted(loc_data_for_day.keys()):
day_rows = loc_data_for_day[loc_name]
# Re-number interval # sequentially for this day
for i, row in enumerate(day_rows):
if len(row) > 0:
row[0] = i + 1
safe_sheet = "".join(c for c in loc_name if c.isalnum() or c in (' ', '-', '_'))[:31]
ws = wb.create_sheet(title=safe_sheet)
summary = _build_location_sheet(ws, loc_name, rows, final_title)
safe_name = "".join(c for c in loc_name if c.isalnum() or c in (' ', '-', '_'))[:31]
ws = wb.create_sheet(title=safe_name)
summary = _build_location_sheet(ws, loc_name, day_rows, final_title)
loc_summaries.append(summary)
# Derive a date label for the summary sheet from started_at or first row
day_label = session_label or loc_name
if started_at_str:
try:
_dt = datetime.fromisoformat(started_at_str)
day_label = _dt.strftime('%-m/%-d/%Y')
if session_label:
day_label = session_label
except Exception:
pass
_build_summary_sheet(wb, date_str, project_name, loc_summaries)
_build_summary_sheet(wb, day_label, project_name, [summary])
xlsx_buf = io.BytesIO()
wb.save(xlsx_buf)
xlsx_buf.seek(0)
date_clean = date_str.replace('/', '-').replace(' ', '_')
xlsx_name = f"{project_name_clean}_{date_clean}_report.xlsx"
# Build a clean filename from label or location+date
label_clean = session_label or loc_name
label_clean = "".join(c for c in label_clean if c.isalnum() or c in (' ', '-', '_', '/')).strip().replace(' ', '_').replace('/', '-')
xlsx_name = f"{project_name_clean}_{label_clean}_report.xlsx"
zf.writestr(xlsx_name, xlsx_buf.read())
zip_buffer.seek(0)