c1b5efae56
Baseline can now come from fixed values typed per location, not just captured
data — for a spec limit ("L10 = 85") or a prior report's averages when the raw
data isn't available.
- SoundReportConfig.baseline_mode ("captured" | "reference").
- report_pipeline: _location_reference_baseline() reads per-location values from
location_metadata; build_*_night_report honor baseline_mode (reference cells
use the typed value; unset metrics compare against nothing).
- reports router: GET/PUT /reports/baseline (mode on config + per-location values
in location_metadata); config carries baseline_mode; manual view/run fall back
to the saved config's baseline when no explicit dates are given.
- orchestrator + scheduler tick thread baseline_mode through.
Verified end-to-end: PUT/GET /baseline, reference deltas (L10 66.6 vs 85 -> -18.4),
unset metrics compare against nothing, captured-mode regression intact.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
433 lines
16 KiB
Python
433 lines
16 KiB
Python
"""
|
||
Nightly Report Pipeline — computation core.
|
||
|
||
Builds the data model for the John-Myler-style "last night vs. baseline" sound
|
||
report. Source-agnostic: it reads the same on-disk Leq `.rnd` files the manual
|
||
upload + FTP-pull ingest produce (see `project_locations.ingest_nrl_zip`).
|
||
|
||
Design notes
|
||
------------
|
||
* **Ingest everything, report selectively.** Ingest preserves every column of
|
||
the Leq file; this layer chooses which *metrics* to surface via `metric_keys`
|
||
(a future report wizard is just a UI over that list).
|
||
* **House format match.** Defaults reproduce the existing Excel report:
|
||
LAmax (max of interval maxima), LA01 / LA10 (arithmetic average), split into
|
||
Evening (7–10PM) and Nighttime (10PM–7AM) windows. L90 (background) is added
|
||
for the baseline comparison.
|
||
* **Metric labelling from the device.** The LN→percentile assignment is
|
||
reconfigurable per job; we resolve which `LNx(Main)` column is L90/L10/etc.
|
||
from the percentile map captured in the session metadata at ingest, falling
|
||
back to the NL-43 default order.
|
||
* **Correct averaging.** Leq is energy-averaged (logarithmic); percentiles and
|
||
Lmax are arithmetic. Baseline references combine the per-night values into a
|
||
"typical night" (arithmetic mean of per-night values — so baseline Lmax is the
|
||
typical nightly peak, not the worst-of-week).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import math
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timedelta, date
|
||
from typing import Optional
|
||
|
||
from sqlalchemy.orm import Session
|
||
|
||
from backend.models import MonitoringSession, DataFile, MonitoringLocation, Project
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Metric registry
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass(frozen=True)
class Metric:
    """Immutable description of one metric the report can show.

    Fields:
      key         short identifier (e.g. "l10"); report tables are keyed by it.
      label       display label (e.g. "LA10").
      agg         how a window's intervals collapse into one value within a
                  single night:
                    "max"   → the loudest interval (LAmax)
                    "arith" → arithmetic mean (percentiles: L01/L10/L90…)
                    "log"   → energy/logarithmic mean (Leq only)
      column      a fixed .rnd column name, when the metric always lives in the
                  same column.
      percentile  alternative to `column`: the percentile whose LNx column is
                  looked up in the session's captured percentile map.
    """

    key: str
    label: str
    agg: str
    column: Optional[str] = None
    percentile: Optional[float] = None
|
||
|
||
|
||
# Every metric this layer can report, keyed by its own `key`.
METRIC_REGISTRY: dict[str, Metric] = {
    m.key: m
    for m in (
        Metric("lmax", "LAmax", "max", column="Lmax(Main)"),
        Metric("leq", "LAeq", "log", column="Leq(Main)"),
        Metric("lmin", "LAmin", "arith", column="Lmin(Main)"),
        Metric("l01", "LA01", "arith", percentile=1.0),
        Metric("l10", "LA10", "arith", percentile=10.0),
        Metric("l50", "LA50", "arith", percentile=50.0),
        Metric("l90", "LA90", "arith", percentile=90.0),
        Metric("l95", "LA95", "arith", percentile=95.0),
    )
}

# House report metrics + L90 (background) for the baseline comparison.
DEFAULT_METRICS: list[str] = ["lmax", "l01", "l10", "l90"]

# NL-43 default percentile→slot assignment, used when a session has no captured map.
_DEFAULT_SLOT_FOR_PCT: dict[float, int] = {
    pct: slot for slot, pct in enumerate((1.0, 10.0, 50.0, 90.0, 95.0), start=1)
}
|
||
|
||
|
||
def _resolve_column(metric: Metric, pct_map: dict) -> Optional[str]:
|
||
"""Resolve the .rnd column for a metric, using the session's percentile map."""
|
||
if metric.column:
|
||
return metric.column
|
||
if metric.percentile is None:
|
||
return None
|
||
# pct_map: {"1": "1.0", "2": "10.0", "4": "90.0", ...} → slot : percentile
|
||
if pct_map:
|
||
for slot, pval in pct_map.items():
|
||
try:
|
||
if float(pval) == metric.percentile:
|
||
return f"LN{int(slot)}(Main)"
|
||
except (ValueError, TypeError):
|
||
continue
|
||
slot = _DEFAULT_SLOT_FOR_PCT.get(metric.percentile)
|
||
return f"LN{slot}(Main)" if slot else None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Time windows
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass(frozen=True)
class Window:
    """A named hour-of-day range: `start_hour` inclusive, `end_hour` exclusive.

    The range may wrap past midnight (e.g. 22 → 7).
    """

    key: str
    label: str
    start_hour: int
    end_hour: int

    def contains(self, hour: int) -> bool:
        """Return True when `hour` falls inside this window."""
        begin, finish = self.start_hour, self.end_hour
        if begin >= finish:
            # Wrap-around window: everything except the gap [finish, begin).
            # When begin == finish that gap is empty, so every hour matches.
            return not (finish <= hour < begin)
        return begin <= hour < finish
|
||
|
||
|
||
# Matches the existing Excel report's stats table.
DEFAULT_WINDOWS: list[Window] = [
    Window(key="evening", label="Evening (7PM–10PM)", start_hour=19, end_hour=22),
    Window(key="nighttime", label="Nighttime (10PM–7AM)", start_hour=22, end_hour=7),
]

# The full night used to select which intervals belong to "last night":
# it starts at NIGHT_START_HOUR on the evening date and lasts NIGHT_LENGTH_HOURS.
NIGHT_START_HOUR = 19
NIGHT_LENGTH_HOURS = 12
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Aggregation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _aggregate(values: list, method: str) -> Optional[float]:
|
||
"""Collapse a window's interval values into one number per `method`."""
|
||
vals = [v for v in values if isinstance(v, (int, float))]
|
||
if not vals:
|
||
return None
|
||
if method == "max":
|
||
return round(max(vals), 1)
|
||
if method == "log":
|
||
return round(10 * math.log10(sum(10 ** (v / 10.0) for v in vals) / len(vals)), 1)
|
||
return round(sum(vals) / len(vals), 1) # arithmetic
|
||
|
||
|
||
def _combine_across_nights(per_night: list, method: str) -> Optional[float]:
|
||
"""Combine per-night window values into a baseline 'typical night' value.
|
||
|
||
Arithmetic mean for max/arith metrics (so baseline Lmax = typical nightly
|
||
peak, the agreed default), logarithmic mean for Leq.
|
||
"""
|
||
vals = [v for v in per_night if v is not None]
|
||
if not vals:
|
||
return None
|
||
if method == "log":
|
||
return round(10 * math.log10(sum(10 ** (v / 10.0) for v in vals) / len(vals)), 1)
|
||
return round(sum(vals) / len(vals), 1)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Row gathering
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _parse_dt(s: str) -> Optional[datetime]:
|
||
try:
|
||
return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")
|
||
except (ValueError, TypeError):
|
||
return None
|
||
|
||
|
||
def _location_leq_rows(db: Session, location_id: str) -> list[tuple[datetime, dict, dict]]:
    """Collect every Leq interval at a location, oldest first.

    Each item is (interval_dt, row, percentile_map), where the percentile map
    comes from the "percentiles" entry of the owning session's metadata (empty
    dict when absent or unparseable). Rows whose "Start Time" does not parse
    are dropped.

    Parsing is delegated to the same .rnd readers the report endpoints use, so
    it stays identical. Times are the meter's local clock (as written in the
    file).

    NOTE(review): the header peek is given Path("data") / file_path while the
    row reader is given the bare file_path — presumably the reader resolves
    the data root itself; confirm against backend.routers.projects.
    """
    # Lazy import avoids a service→router import cycle at module load.
    from backend.routers.projects import (
        _read_rnd_file_rows, _normalize_rnd_rows, _is_leq_file, _peek_rnd_headers,
    )
    from pathlib import Path

    sound_sessions = (
        db.query(MonitoringSession)
        .filter_by(location_id=location_id, session_type="sound")
        .all()
    )

    intervals: list[tuple[datetime, dict, dict]] = []
    for session in sound_sessions:
        try:
            session_meta = json.loads(session.session_metadata or "{}")
        except (json.JSONDecodeError, TypeError):
            session_meta = {}
        percentile_map = session_meta.get("percentiles", {}) or {}

        data_files = db.query(DataFile).filter_by(session_id=session.id).all()
        for data_file in data_files:
            # Only .rnd files are candidates at all.
            if not (data_file.file_path and data_file.file_path.lower().endswith(".rnd")):
                continue
            # Among those, keep only the ones the shared reader classifies as Leq.
            headers = _peek_rnd_headers(Path("data") / data_file.file_path)
            if not _is_leq_file(data_file.file_path, headers):
                continue

            raw_rows = _read_rnd_file_rows(data_file.file_path)
            normalized_rows, _ = _normalize_rnd_rows(raw_rows)
            for row in normalized_rows:
                stamp = _parse_dt(row.get("Start Time", ""))
                if stamp:
                    intervals.append((stamp, row, percentile_map))

    return sorted(intervals, key=lambda item: item[0])
|
||
|
||
|
||
def _rows_in_night(rows: list, night_date: date) -> list:
    """Select the rows belonging to the night that *starts* on `night_date`.

    The night runs from NIGHT_START_HOUR:00 on `night_date` (inclusive) for
    NIGHT_LENGTH_HOURS hours (end exclusive). `rows` are (dt, row, pct_map)
    tuples; input order is preserved.
    """
    night_start = datetime(
        night_date.year, night_date.month, night_date.day, NIGHT_START_HOUR, 0,
    )
    night_end = night_start + timedelta(hours=NIGHT_LENGTH_HOURS)

    selected = []
    for stamp, row, pct_map in rows:
        if night_start <= stamp < night_end:
            selected.append((stamp, row, pct_map))
    return selected
|
||
|
||
|
||
def _eligible_nights(rows: list, start_date: date, end_date: date) -> list[date]:
    """List the evening-dates in [start_date, end_date] that have night data.

    Both bounds are inclusive; a reversed range yields an empty list. A date
    qualifies when at least one row falls inside its night.
    """
    span_days = (end_date - start_date).days
    candidates = (start_date + timedelta(days=offset) for offset in range(span_days + 1))
    return [night for night in candidates if _rows_in_night(rows, night)]
|
||
|
||
|
||
def _window_value(rows: list, metric: Metric, window: Window) -> Optional[float]:
    """Aggregate one metric over the rows whose hour falls inside `window`.

    The column is resolved per row because each row carries the percentile map
    of its own session. Rows for which no column can be resolved contribute
    nothing. Returns None when there is nothing to aggregate.
    """
    in_window = (
        (row, pct_map) for stamp, row, pct_map in rows if window.contains(stamp.hour)
    )
    samples = []
    for row, pct_map in in_window:
        column = _resolve_column(metric, pct_map)
        if column:
            samples.append(row.get(column))
    return _aggregate(samples, metric.agg)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Report data model
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass
class CellPair:
    """One report cell: last night's value next to its baseline value."""

    last_night: Optional[float]
    baseline: Optional[float]

    @property
    def delta(self) -> Optional[float]:
        """Last night minus baseline, rounded to 0.1; None if either is missing."""
        observed, reference = self.last_night, self.baseline
        have_both = observed is not None and reference is not None
        return round(observed - reference, 1) if have_both else None
|
||
|
||
|
||
@dataclass
class LocationNightReport:
    """Night-vs-baseline result for a single location."""

    # --- what is being reported ---
    location_id: str
    location_name: str
    night_date: date  # evening date of the reported night
    metrics: list[Metric]
    windows: list[Window]

    # --- the comparison itself ---
    # table[window_key][metric_key] = CellPair
    table: dict[str, dict[str, CellPair]]

    # --- supporting detail ---
    interval_series: list[dict]  # one dict per interval of the reported night
    night_interval_count: int
    baseline_nights_used: int
    notes: list[str] = field(default_factory=list)  # human-readable caveats
|
||
|
||
|
||
def _location_reference_baseline(loc) -> dict:
|
||
"""A location's manually-entered reference baseline, from its metadata.
|
||
|
||
Shape: {window_key: {metric_key: float}} e.g. {"nighttime": {"l10": 85.0}}.
|
||
Used when baseline_mode == "reference" — fixed targets/limits or prior-report
|
||
averages typed in, rather than computed from captured nights.
|
||
"""
|
||
if not loc:
|
||
return {}
|
||
try:
|
||
meta = json.loads(loc.location_metadata or "{}")
|
||
except (json.JSONDecodeError, TypeError):
|
||
return {}
|
||
ref = meta.get("report_baseline") or {}
|
||
out: dict[str, dict[str, float]] = {}
|
||
if isinstance(ref, dict):
|
||
for wkey, mvals in ref.items():
|
||
if not isinstance(mvals, dict):
|
||
continue
|
||
clean = {}
|
||
for mkey, val in mvals.items():
|
||
try:
|
||
clean[mkey] = float(val)
|
||
except (ValueError, TypeError):
|
||
continue
|
||
if clean:
|
||
out[wkey] = clean
|
||
return out
|
||
|
||
|
||
def build_location_night_report(
    db: Session,
    location_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_mode: str = "captured",
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
) -> LocationNightReport:
    """Build the night-vs-baseline data model for one location.

    `night_date` is the *evening* date of the night being reported (e.g. the
    7/7 in "night of 7/7 → morning 7/8"). Baseline comes from one of:
      - "captured": the typical-night value across eligible nights in
        [baseline_start, baseline_end] (computed from recorded data); both
        bounds must be given, and the reported night itself is excluded;
      - "reference": fixed values typed per location (a spec limit like
        "L10 = 85", or a prior report's averages). Metrics with no typed value
        compare against nothing.

    Any `baseline_mode` other than "reference" is treated as "captured".
    `metric_keys` defaults to DEFAULT_METRICS and `windows` to DEFAULT_WINDOWS.

    Raises KeyError if a metric key is not in METRIC_REGISTRY.
    """
    metric_keys = metric_keys or DEFAULT_METRICS
    metrics = [METRIC_REGISTRY[k] for k in metric_keys]
    windows = windows or DEFAULT_WINDOWS

    loc = db.query(MonitoringLocation).filter_by(id=location_id).first()
    loc_name = loc.name if loc else location_id

    all_rows = _location_leq_rows(db, location_id)
    night_rows = _rows_in_night(all_rows, night_date)

    use_reference = baseline_mode == "reference"
    reference = _location_reference_baseline(loc) if use_reference else {}

    baseline_nights: list[date] = []
    if not use_reference and baseline_start and baseline_end:
        baseline_nights = _eligible_nights(all_rows, baseline_start, baseline_end)
        # Don't let the reported night double as its own baseline.
        baseline_nights = [n for n in baseline_nights if n != night_date]

    # Slice each baseline night out of the full row list exactly once. The
    # slices do not depend on the window or metric, so recomputing them inside
    # the loops below would rescan all rows windows × metrics times per night.
    baseline_rows_by_night = [_rows_in_night(all_rows, nd) for nd in baseline_nights]

    table: dict[str, dict[str, CellPair]] = {}
    for w in windows:
        table[w.key] = {}
        for m in metrics:
            last_night_val = _window_value(night_rows, m, w)
            if use_reference:
                baseline_val = reference.get(w.key, {}).get(m.key)
            elif baseline_rows_by_night:
                per_night = [
                    _window_value(rows_for_night, m, w)
                    for rows_for_night in baseline_rows_by_night
                ]
                baseline_val = _combine_across_nights(per_night, m.agg)
            else:
                baseline_val = None
            table[w.key][m.key] = CellPair(last_night_val, baseline_val)

    # Per-interval values of the reported night, one entry per row.
    interval_series = []
    for dt, r, pct_map in night_rows:
        entry = {"dt": dt, "time": dt.strftime("%H:%M")}
        for m in metrics:
            col = _resolve_column(m, pct_map)
            val = r.get(col) if col else None
            entry[m.key] = val if isinstance(val, (int, float)) else None
        interval_series.append(entry)

    notes: list[str] = []
    if not night_rows:
        notes.append(f"No data found for the night of {night_date:%m/%d/%y}.")
    if use_reference:
        if not any(reference.values()):
            notes.append("Reference-baseline mode is on but no reference values are set for this location.")
    elif (baseline_start or baseline_end) and not baseline_nights:
        notes.append("No baseline nights with data in the configured range.")

    return LocationNightReport(
        location_id=location_id,
        location_name=loc_name,
        night_date=night_date,
        metrics=metrics,
        windows=windows,
        table=table,
        interval_series=interval_series,
        night_interval_count=len(night_rows),
        baseline_nights_used=len(baseline_nights),
        notes=notes,
    )
|
||
|
||
|
||
@dataclass
class ProjectNightReport:
    """Night report for a whole project: one LocationNightReport per location."""

    project_id: str
    project_name: str
    night_date: date  # evening date of the reported night
    metrics: list[Metric]  # metrics shared by every location report
    locations: list[LocationNightReport]
|
||
|
||
|
||
def build_project_night_report(
    db: Session,
    project_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_mode: str = "captured",
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
) -> ProjectNightReport:
    """Build the night report for every active sound location in a project.

    Locations are taken in (sort_order, name) order; any location with a
    non-None `removed_at` is skipped. All baseline options are passed through
    unchanged to `build_location_night_report`. The project id doubles as the
    project name when the project row is not found.
    """
    metric_keys = metric_keys or DEFAULT_METRICS

    project = db.query(Project).filter_by(id=project_id).first()
    project_name = project.name if project else project_id

    sound_locations = (
        db.query(MonitoringLocation)
        .filter_by(project_id=project_id, location_type="sound")
        .order_by(MonitoringLocation.sort_order, MonitoringLocation.name)
        .all()
    )
    active_locations = [
        location
        for location in sound_locations
        if getattr(location, "removed_at", None) is None
    ]

    location_reports = []
    for location in active_locations:
        location_reports.append(
            build_location_night_report(
                db,
                location.id,
                night_date,
                metric_keys=metric_keys,
                windows=windows,
                baseline_mode=baseline_mode,
                baseline_start=baseline_start,
                baseline_end=baseline_end,
            )
        )

    return ProjectNightReport(
        project_id=project_id,
        project_name=project_name,
        night_date=night_date,
        metrics=[METRIC_REGISTRY[k] for k in metric_keys],
        locations=location_reports,
    )
|