"""
Nightly Report Pipeline — computation core.

Builds the data model for the John-Myler-style "last night vs. baseline" sound
report. Source-agnostic: it reads the same on-disk Leq `.rnd` files the manual
upload + FTP-pull ingest produce (see `project_locations.ingest_nrl_zip`).

Design notes
------------
* **Ingest everything, report selectively.** Ingest preserves every column of
  the Leq file; this layer chooses which *metrics* to surface via `metric_keys`
  (a future report wizard is just a UI over that list).
* **House format match.** Defaults reproduce the existing Excel report: LAmax
  (max of interval maxima), LA01 / LA10 (arithmetic average), split into
  Evening (7–10PM) and Nighttime (10PM–7AM) windows. L90 (background) is added
  for the baseline comparison.
* **Metric labelling from the device.** The LN→percentile assignment is
  reconfigurable per job; we resolve which `LNx(Main)` column is L90/L10/etc.
  from the percentile map captured in the session metadata at ingest, falling
  back to the NL-43 default order.
* **Correct averaging.** Leq is energy-averaged (logarithmic); percentiles and
  Lmax are arithmetic. Baseline references combine the per-night values into a
  "typical night" (arithmetic mean of per-night values — so baseline Lmax is
  the typical nightly peak, not the worst-of-week).
"""
from __future__ import annotations

import json
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta, date
from typing import Optional

from sqlalchemy.orm import Session

from backend.models import MonitoringSession, DataFile, MonitoringLocation, Project

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Metric registry
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class Metric:
    """Immutable description of one metric the report can surface.

    `agg` names the *within-night* aggregation that collapses a window's
    intervals into a single value:
      - "max"   → loudest interval (LAmax)
      - "arith" → arithmetic mean (percentiles: L01/L10/L90…)
      - "log"   → energy/logarithmic mean (Leq only)

    A metric is located in the .rnd row either by a fixed `column` name or,
    when `column` is unset, by `percentile`, which is resolved to an LNx
    column through the session's captured percentile map.
    """

    key: str    # registry key, e.g. "l90"
    label: str  # display label, e.g. "LA90"
    agg: str    # "max" | "arith" | "log"
    column: Optional[str] = None
    percentile: Optional[float] = None


# Keyed by each metric's own `key`, so the lookup key and the metric cannot
# drift apart. Insertion order is the declaration order below.
METRIC_REGISTRY: dict[str, Metric] = {
    metric.key: metric
    for metric in (
        Metric("lmax", "LAmax", "max", column="Lmax(Main)"),
        Metric("leq", "LAeq", "log", column="Leq(Main)"),
        Metric("lmin", "LAmin", "arith", column="Lmin(Main)"),
        Metric("l01", "LA01", "arith", percentile=1.0),
        Metric("l10", "LA10", "arith", percentile=10.0),
        Metric("l50", "LA50", "arith", percentile=50.0),
        Metric("l90", "LA90", "arith", percentile=90.0),
        Metric("l95", "LA95", "arith", percentile=95.0),
    )
}

# House report metrics + L90 (background) for the baseline comparison.
DEFAULT_METRICS: list[str] = ["lmax", "l01", "l10", "l90"]

# NL-43 default percentile→slot assignment, used when a session has no captured map.
_DEFAULT_SLOT_FOR_PCT: dict[float, int] = {1.0: 1, 10.0: 2, 50.0: 3, 90.0: 4, 95.0: 5}


def _resolve_column(metric: Metric, pct_map: dict) -> Optional[str]:
    """Resolve the .rnd column name that holds `metric` for one session.

    Args:
        metric: The metric to locate. A fixed `column` wins outright;
            otherwise `percentile` is looked up.
        pct_map: The session's captured slot→percentile map, e.g.
            ``{"1": "1.0", "2": "10.0", "4": "90.0"}``. May be empty/None.

    Returns:
        The column name (``"LN<slot>(Main)"`` for percentile metrics), or
        None when the metric cannot be located in this session.

    The NL-43 default slot order is used as a fallback only when the captured
    map does not assign that default slot to a *different* percentile.
    Otherwise the fallback would silently report another percentile's data
    under this metric's label.
    """
    if metric.column:
        return metric.column
    if metric.percentile is None:
        return None

    # Slots the captured map assigns to some other percentile.
    claimed_slots: set[int] = set()
    entries = pct_map.items() if isinstance(pct_map, dict) else ()
    for slot, pval in entries:
        try:
            slot_no = int(slot)
            pct = float(pval)
        except (ValueError, TypeError):
            continue  # unparseable entry: ignore it, as before
        if pct == metric.percentile:
            return f"LN{slot_no}(Main)"
        claimed_slots.add(slot_no)

    default_slot = _DEFAULT_SLOT_FOR_PCT.get(metric.percentile)
    if not default_slot or default_slot in claimed_slots:
        return None
    return f"LN{default_slot}(Main)"


# ---------------------------------------------------------------------------
# Time windows
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class Window:
    """A named hour-of-day window, half-open: [start_hour, end_hour).

    When `start_hour` is not less than `end_hour` the window wraps past
    midnight (e.g. 22 → 7). Equal start and end therefore cover all 24 hours.
    """

    key: str
    label: str
    start_hour: int
    end_hour: int

    def contains(self, hour: int) -> bool:
        """Return True when `hour` (0–23) falls inside the window."""
        if self.start_hour < self.end_hour:
            return self.start_hour <= hour < self.end_hour
        # Wrapping window: late-evening part OR early-morning part.
        return hour >= self.start_hour or hour < self.end_hour


# Matches the existing Excel report's stats table.
DEFAULT_WINDOWS: list[Window] = [
    Window("evening", "Evening (7PM–10PM)", 19, 22),
    Window("nighttime", "Nighttime (10PM–7AM)", 22, 7),
]

# The full night used to select which intervals belong to "last night".
NIGHT_START_HOUR = 19
NIGHT_LENGTH_HOURS = 12


# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------

def _energy_mean(levels: list) -> float:
    """Logarithmic (energy) mean of dB levels; `levels` must be non-empty."""
    return 10 * math.log10(sum(10 ** (v / 10.0) for v in levels) / len(levels))


def _aggregate(values: list, method: str) -> Optional[float]:
    """Collapse a window's interval values into one number per `method`.

    Args:
        values: Raw cell values for one column; anything that is not a finite
            real number (None, strings, bools, NaN, ±inf) is ignored.
        method: "max", "log" (energy mean) or anything else for arithmetic mean.

    Returns:
        The aggregate rounded to 0.1, or None when no usable value remains.
    """
    # NaN would poison the means and make max() order-dependent; an all -inf
    # window would hit log10(0). Drop non-finite values up front.
    vals = [
        v for v in values
        if isinstance(v, (int, float)) and not isinstance(v, bool) and math.isfinite(v)
    ]
    if not vals:
        return None
    if method == "max":
        return round(max(vals), 1)
    if method == "log":
        return round(_energy_mean(vals), 1)
    return round(sum(vals) / len(vals), 1)  # arithmetic


def _combine_across_nights(per_night: list, method: str) -> Optional[float]:
    """Combine per-night window values into a baseline 'typical night' value.

    Arithmetic mean for max/arith metrics (so baseline Lmax = typical nightly
    peak, the agreed default), logarithmic mean for Leq. Nights whose value is
    None are skipped; returns None when no night contributed.
    """
    vals = [v for v in per_night if v is not None]
    if not vals:
        return None
    if method == "log":
        return round(_energy_mean(vals), 1)
    return round(sum(vals) / len(vals), 1)


# ---------------------------------------------------------------------------
# Row gathering
# ---------------------------------------------------------------------------

def _parse_dt(s: str) -> Optional[datetime]:
    """Parse a ``YYYY/MM/DD HH:MM:SS`` timestamp; None if it does not parse."""
    try:
        return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")
    except (ValueError, TypeError):
        return None


def _location_leq_rows(db: Session, location_id: str) -> list[tuple[datetime, dict, dict]]:
    """All Leq intervals at a location as (interval_dt, row, percentile_map).

    Reuses the same .rnd readers as the report endpoints so parsing stays
    identical. Times are the meter's local clock (as written in the file).
    Rows whose "Start Time" does not parse are dropped; the result is sorted
    by interval time.
    """
    # Lazy import avoids a service→router import cycle at module load.
    from backend.routers.projects import (
        _read_rnd_file_rows,
        _normalize_rnd_rows,
        _is_leq_file,
        _peek_rnd_headers,
    )
    from pathlib import Path

    out: list[tuple[datetime, dict, dict]] = []
    sessions = db.query(MonitoringSession).filter_by(
        location_id=location_id,
        session_type="sound",
    ).all()
    for s in sessions:
        # Missing or malformed metadata degrades to "no captured map".
        try:
            meta = json.loads(s.session_metadata or "{}")
        except (json.JSONDecodeError, TypeError):
            meta = {}
        pct_map = meta.get("percentiles", {}) or {}

        for f in db.query(DataFile).filter_by(session_id=s.id).all():
            if not f.file_path or not f.file_path.lower().endswith(".rnd"):
                continue
            # NOTE(review): the peek helper gets a path under data/ while the
            # reader gets the raw file_path — presumably the reader resolves
            # the data root itself; confirm.
            peek = _peek_rnd_headers(Path("data") / f.file_path)
            if not _is_leq_file(f.file_path, peek):
                continue
            rows = _read_rnd_file_rows(f.file_path)
            rows, _ = _normalize_rnd_rows(rows)
            for r in rows:
                dt = _parse_dt(r.get("Start Time", ""))
                if dt:
                    out.append((dt, r, pct_map))

    out.sort(key=lambda t: t[0])
    return out


def _night_bounds(night_date: date) -> tuple[datetime, datetime]:
    """Half-open [start, end) of the night that starts on `night_date`."""
    start = datetime(night_date.year, night_date.month, night_date.day, NIGHT_START_HOUR, 0)
    return start, start + timedelta(hours=NIGHT_LENGTH_HOURS)


def _rows_in_night(rows: list, night_date: date) -> list:
    """Rows falling in the night that *starts* on night_date (19:00 → +12h)."""
    start, end = _night_bounds(night_date)
    return [(dt, r, p) for (dt, r, p) in rows if start <= dt < end]


def _eligible_nights(rows: list, start_date: date, end_date: date) -> list[date]:
    """Evening-dates in [start_date, end_date] that actually have night data.

    Returned in ascending order; empty when start_date is after end_date.
    """
    nights = []
    cur = start_date
    while cur <= end_date:
        start, end = _night_bounds(cur)
        # Only existence matters here, so stop at the first matching row.
        if any(start <= dt < end for dt, _, _ in rows):
            nights.append(cur)
        cur += timedelta(days=1)
    return nights


def _window_value(rows: list, metric: Metric, window: Window) -> Optional[float]:
    """Single aggregated value for one metric over one window of `rows`.

    The column is resolved per row because each row carries the percentile
    map of the session it came from.
    """
    vals = []
    for dt, r, pct_map in rows:
        if not window.contains(dt.hour):
            continue
        col = _resolve_column(metric, pct_map)
        if col:
            vals.append(r.get(col))
    return _aggregate(vals, metric.agg)


# ---------------------------------------------------------------------------
# Report data model
# ---------------------------------------------------------------------------

@dataclass
class CellPair:
    """One table cell: the reported night's value next to its baseline."""

    last_night: Optional[float]
    baseline: Optional[float]

    @property
    def delta(self) -> Optional[float]:
        """last_night − baseline rounded to 0.1, or None if either is missing."""
        if self.last_night is None or self.baseline is None:
            return None
        return round(self.last_night - self.baseline, 1)


@dataclass
class LocationNightReport:
    """Night-vs-baseline data model for a single monitoring location."""

    location_id: str
    location_name: str
    night_date: date
    metrics: list[Metric]
    windows: list[Window]
    # table[window_key][metric_key] = CellPair
    table: dict[str, dict[str, CellPair]]
    # One dict per interval of the reported night: "dt", "time", then one
    # entry per metric key (None where the value is missing/unusable).
    interval_series: list[dict]
    night_interval_count: int
    baseline_nights_used: int
    notes: list[str] = field(default_factory=list)


def build_location_night_report(
    db: Session,
    location_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
) -> LocationNightReport:
    """Build the night-vs-baseline data model for one location.

    `night_date` is the *evening* date of the night being reported (e.g. the
    7/7 in "night of 7/7 → morning 7/8"). Baseline is the typical-night value
    across the eligible nights in [baseline_start, baseline_end]; pass neither
    to skip the comparison (baseline cells become None).

    Args:
        db: Database session used to look up the location and its data files.
        location_id: Id of the monitoring location.
        night_date: Evening date of the reported night.
        metric_keys: Keys into METRIC_REGISTRY; defaults to DEFAULT_METRICS.
        windows: Time windows for the table; defaults to DEFAULT_WINDOWS.
        baseline_start: First evening date of the baseline range.
        baseline_end: Last evening date of the baseline range (inclusive).

    Returns:
        The populated LocationNightReport. Missing data produces None cells
        and explanatory `notes` rather than an error.

    Raises:
        KeyError: If a metric key is not in METRIC_REGISTRY.
    """
    metric_keys = metric_keys or DEFAULT_METRICS
    metrics = [METRIC_REGISTRY[k] for k in metric_keys]
    windows = windows or DEFAULT_WINDOWS

    loc = db.query(MonitoringLocation).filter_by(id=location_id).first()
    loc_name = loc.name if loc else location_id

    all_rows = _location_leq_rows(db, location_id)
    night_rows = _rows_in_night(all_rows, night_date)

    baseline_nights: list[date] = []
    if baseline_start and baseline_end:
        baseline_nights = _eligible_nights(all_rows, baseline_start, baseline_end)
        # Don't let the reported night double as its own baseline.
        baseline_nights = [n for n in baseline_nights if n != night_date]

    # Slice each baseline night once; the slices are identical for every
    # window/metric combination, so there is no need to rescan all_rows
    # inside the nested loops.
    baseline_rows_by_night = [_rows_in_night(all_rows, nd) for nd in baseline_nights]

    table: dict[str, dict[str, CellPair]] = {}
    for w in windows:
        table[w.key] = {}
        for m in metrics:
            last_night_val = _window_value(night_rows, m, w)
            baseline_val = None
            if baseline_rows_by_night:
                per_night = [
                    _window_value(rows, m, w) for rows in baseline_rows_by_night
                ]
                baseline_val = _combine_across_nights(per_night, m.agg)
            table[w.key][m.key] = CellPair(last_night_val, baseline_val)

    interval_series = []
    for dt, r, pct_map in night_rows:
        entry = {"dt": dt, "time": dt.strftime("%H:%M")}
        for m in metrics:
            col = _resolve_column(m, pct_map)
            val = r.get(col) if col else None
            # Only finite real numbers are surfaced; NaN/inf would not
            # survive JSON serialisation downstream.
            usable = (
                isinstance(val, (int, float))
                and not isinstance(val, bool)
                and math.isfinite(val)
            )
            entry[m.key] = val if usable else None
        interval_series.append(entry)

    notes: list[str] = []
    if not night_rows:
        notes.append(f"No data found for the night of {night_date:%m/%d/%y}.")
    if (baseline_start or baseline_end) and not baseline_nights:
        notes.append("No baseline nights with data in the configured range.")

    return LocationNightReport(
        location_id=location_id,
        location_name=loc_name,
        night_date=night_date,
        metrics=metrics,
        windows=windows,
        table=table,
        interval_series=interval_series,
        night_interval_count=len(night_rows),
        baseline_nights_used=len(baseline_nights),
        notes=notes,
    )


@dataclass
class ProjectNightReport:
    """Night report for a whole project: one LocationNightReport per location."""

    project_id: str
    project_name: str
    night_date: date
    metrics: list[Metric]
    locations: list[LocationNightReport]


def build_project_night_report(
    db: Session,
    project_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
) -> ProjectNightReport:
    """Build the night report for every active sound location in a project.

    Locations are those with location_type "sound" and no `removed_at`,
    ordered by sort_order then name. All keyword options are forwarded
    unchanged to `build_location_night_report`.

    Raises:
        KeyError: If a metric key is not in METRIC_REGISTRY.
    """
    metric_keys = metric_keys or DEFAULT_METRICS
    # Resolve up front so an unknown key fails before any per-location file I/O.
    metrics = [METRIC_REGISTRY[k] for k in metric_keys]

    project = db.query(Project).filter_by(id=project_id).first()
    project_name = project.name if project else project_id

    candidates = db.query(MonitoringLocation).filter_by(
        project_id=project_id,
        location_type="sound",
    ).order_by(MonitoringLocation.sort_order, MonitoringLocation.name).all()
    active_locations = [
        loc for loc in candidates if getattr(loc, "removed_at", None) is None
    ]

    reports = [
        build_location_night_report(
            db,
            loc.id,
            night_date,
            metric_keys=metric_keys,
            windows=windows,
            baseline_start=baseline_start,
            baseline_end=baseline_end,
        )
        for loc in active_locations
    ]

    return ProjectNightReport(
        project_id=project_id,
        project_name=project_name,
        night_date=night_date,
        metrics=metrics,
        locations=reports,
    )