From ed195ed96b954e2870e22b0e8a49a18d4a42174f Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 10 Jun 2026 20:41:05 +0000 Subject: [PATCH 01/15] feat(reports): FTP night-report pipeline foundation Terra-View side of the daily night-vs-baseline sound report for the John Myler 24/7 job. Engine is built and verified end-to-end against real meter data; SMTP send + scheduler/capture wiring still pending. - ingest: refactor upload_nrl_data into a callable ingest_nrl_zip(location_id, zip_bytes, db) sharing one core with the HTTP endpoint. Capture the .rnh percentile map + weightings into session metadata; dedup on store-name + start time. Ingest stays metric-agnostic (every Leq column preserved). - report_pipeline.py: metric registry, Evening/Nighttime windows, correct aggregation (Lmax=max, Ln=arithmetic, Leq=logarithmic), baseline = typical night, per-location + per-project builders. - report_renderers.py: HTML email-body renderer (Last/Base/delta layout). - report_email.py: config-driven SMTP via stdlib (env vars) with a dry-run fallback so the pipeline runs without credentials. - report_orchestrator.py: compute -> render -> always write report.html + report.json to disk -> best-effort email. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- REPORT_PIPELINE_BRIEF.md | 59 ++++ backend/routers/project_locations.py | 433 ++++++++++++++++-------- backend/services/report_email.py | 161 +++++++++ backend/services/report_orchestrator.py | 130 +++++++ backend/services/report_pipeline.py | 389 +++++++++++++++++++++ backend/services/report_renderers.py | 114 +++++++ 6 files changed, 1142 insertions(+), 144 deletions(-) create mode 100644 REPORT_PIPELINE_BRIEF.md create mode 100644 backend/services/report_email.py create mode 100644 backend/services/report_orchestrator.py create mode 100644 backend/services/report_pipeline.py create mode 100644 backend/services/report_renderers.py diff --git a/REPORT_PIPELINE_BRIEF.md b/REPORT_PIPELINE_BRIEF.md new file mode 100644 index 0000000..78674fc --- /dev/null +++ b/REPORT_PIPELINE_BRIEF.md @@ -0,0 +1,59 @@ +# FTP Report Pipeline — session brief + +**Branch:** `feat/ftp-report-pipeline` (off `dev`), worktree `/home/serversdown/terra-view-reports`. +**Scope:** Terra-View only. Do NOT touch SLMM — the SLMM alert/monitor work is live in a +parallel session on `slmm` branch `feat/drd-fix`. Pull device data through the **existing** +SLMM FTP proxy endpoints; add no SLMM code (for v1). + +See memory note `client_sound_monitoring_job_2026-07` for the client requirements + timeline. + +## Goal +Automated **daily morning report** for the John Myler 3-location sound job: each AM, last +night's noise levels vs the **baseline week**, per location. Data pulled from the meters via +FTP (the meter records 24/7 to SD regardless of TCP wedges). Alerts are a *separate* workstream +(SLMM, real-time DOD) — not in scope here. 
+ +## The big realization (why this is small) +The hard parts already exist: +- **SLMM (use as-is, via the `/api/slmm/...` proxy):** + - `GET /api/slmm/{unit}/ftp/files?path=/NL-43` → list files/folders + - `POST /api/slmm/{unit}/ftp/download-folder` → returns the `Auto_####` folder as a **ZIP** +- **Terra-View ingest (reuse):** `backend/routers/project_locations.py:1743` `upload_nrl_data` + already accepts a **ZIP**, extracts, keeps `.rnh` + `_Leq_ .rnd` (drops `_Lp_`/junk via + `_is_wanted`), runs `_parse_rnh` (line 1687) → creates `MonitoringSession` + `DataFile`. +- **Report generator (reuse, source-agnostic):** `backend/routers/projects.py`. The `.rnd` + file reads funnel through 3 helpers — `_peek_rnd_headers` (~135), `_is_leq_file` (~147), + `_read_rnd_file_rows` (~256). `.rnd` files live on disk under `data/{file_path}` (DataFile + holds the path, not a BLOB). The stats/Excel/formatting logic doesn't care where bytes come from. + +## Build (Terra-View) +1. **Refactor** `upload_nrl_data`'s core into a callable `ingest_nrl_zip(location_id, zip_bytes, db)` + so it can be invoked programmatically (not only via HTTP UploadFile). +2. **Scheduled pull job** (reuse the existing scheduler): per project location/unit → + `GET /ftp/files` to find new `Auto_####` folders → `POST /ftp/download-folder` (zip) → + `ingest_nrl_zip(...)`. **Dedup** so repeated pulls don't duplicate sessions/files + (track ingested folder names per location). +3. **Baseline aggregation:** aggregate the baseline-week `_Leq_` intervals per location → + reference values (nighttime Leq, L90 floor, typical Lmax). +4. **Nightly report + email:** compute last night's metrics per location, compare to baseline + (deltas), render (reuse the Excel/report machinery), email each morning. + +## Data-location decision (light version, agreed) +Keep `MonitoringSession`/`DataFile` **metadata in TV** for now; reuse the existing on-disk file +store. 
Optional refinement (later): have SLMM keep the pulled files and TV read them through a +SLMM file-serve endpoint (avoids the copy-into-TV step). Don't do that refinement under the +deadline unless trivial — the report logic is identical either way. + +## Open questions to resolve early +1. **What's actually in a `_Leq_ .rnd`** — Leq only, or Leq + Lmax + Ln per 15-min interval? + Decides whether the night-vs-baseline report can show L90/Lmax or just Leq. Inspect a real file. +2. **Session rollover / dedup** — does a 2-week run write one growing `Auto_####` folder or new + folders? Drives the "what's new" logic. +3. **`download-folder` over a multi-day run** — confirm it zips cleanly (size/time). + +## Client params (confirm with Dave before locking) +Threshold/metric + their "night" window; report recipients + format (email body vs PDF/Excel). + +## Timeline +Setup ~7/1–7/2 (baseline week), shutdown week through ~7/17. Reports needed by ~7/8 (before +shutdown). Today is ~3 weeks out — reliability > features. diff --git a/backend/routers/project_locations.py b/backend/routers/project_locations.py index 0b17003..b1c38f8 100644 --- a/backend/routers/project_locations.py +++ b/backend/routers/project_locations.py @@ -1712,6 +1712,19 @@ def _parse_rnh(content: bytes) -> dict: result["stop_time_str"] = value elif key == "Total Measurement Time": result["total_time_str"] = value + elif key == "Frequency Weighting (Main)": + result["frequency_weighting"] = value + elif key == "Time Weighting (Main)": + result["time_weighting"] = value + elif key == "Leq Calculation Interval": + result["leq_interval"] = value + elif key.startswith("Percentile "): + # e.g. "Percentile 4,90.0" → percentiles["4"] = "90.0". + # Lets the report label the LN slots (here LN4 = L90) from the + # device's own config instead of hardcoding which slot is which — + # the percentile assignment is reconfigurable per job. 
+ slot = key[len("Percentile "):].strip() + result.setdefault("percentiles", {})[slot] = value except Exception: pass return result @@ -1740,6 +1753,270 @@ def _classify_file(filename: str) -> str: return "data" +def _is_wanted_nrl_file(fname: str) -> bool: + """Keep only the files an NRL ingest cares about: .rnh metadata + the + averaged Leq .rnd. Drops the 1-second _Lp_ files and everything else. + + - NL-43 writes two .rnd types: _Leq_ (15-min averages, wanted) and + _Lp_ (1-second granular, skipped). + - AU2 (NL-23/older Rion) writes a single Au2_####.rnd — always keep. + + Note this is purely about which *files* to store, not which *metrics* to + report: the kept Leq file carries every column (Leq, Lmax, L1/L10/L50/ + L90/L95, Lpeak, …), so the report layer can select any metric later. + """ + n = fname.lower() + if n.endswith(".rnh"): + return True + if n.endswith(".rnd"): + if "_leq_" in n: # NL-43 Leq file + return True + if n.startswith("au2_"): # AU2 format (NL-23) — Leq equivalent + return True + if "_lp" not in n and "_leq_" not in n: + # Unknown .rnd format — include it so we don't silently drop data + return True + return False + + +class IngestError(Exception): + """Raised when an NRL upload/ZIP has no usable data or an invalid target. + + Kept HTTP-agnostic so the ingest core can be driven programmatically (the + scheduled FTP pull) as well as from the HTTP upload endpoint. Callers + translate it: the endpoint → HTTP 400, the scheduler → logged failure. + """ + pass + + +def _find_existing_session( + db: Session, + location_id: str, + store_name: str, + started_at, + start_time_str: str, +): + """Return an already-ingested session for this location that represents the + same measurement, or None. + + Used to make FTP re-pulls idempotent: a daily cycle closes one Auto_#### + folder per day, so a session is uniquely identified within a location by + (store_name + measurement start time). 
Store names recycle across jobs, so + we always match on start time too. + """ + if not store_name and not started_at: + return None + candidates = db.query(MonitoringSession).filter( + MonitoringSession.location_id == location_id, + MonitoringSession.session_type == "sound", + ).all() + for s in candidates: + try: + meta = json.loads(s.session_metadata or "{}") + except (json.JSONDecodeError, TypeError): + meta = {} + if store_name and meta.get("store_name") != store_name: + continue + # Same store_name — confirm it's the same measurement by start time. + if start_time_str and meta.get("start_time_str") == start_time_str: + return s + if not meta.get("start_time_str") and started_at and s.started_at == started_at: + return s + return None + + +def _ingest_file_entries( + location: MonitoringLocation, + file_entries: list[tuple[str, bytes]], + db: Session, + *, + source: str = "manual_upload", + dedupe: bool = False, +) -> dict: + """Core NRL ingest, shared by the HTTP upload and the programmatic FTP pull. + + Takes already-normalized (filename, bytes) entries, keeps the wanted files, + parses the .rnh, and creates a MonitoringSession + DataFile rows under the + location's project. Metric-agnostic: the full Leq file is written to disk + and every column preserved; metric selection happens in the report layer. + + Raises IngestError if no usable files are present. + """ + # --- Filter to the files we keep (.rnh + Leq .rnd) --- + file_entries = [(f, b) for f, b in file_entries if _is_wanted_nrl_file(f)] + if not file_entries: + raise IngestError( + "No usable .rnd or .rnh files found. Expected NL-43 _Leq_ files or AU2 format .rnd files." + ) + + # --- Parse .rnh metadata (first one wins) --- + rnh_meta = {} + for fname, fbytes in file_entries: + if fname.lower().endswith(".rnh"): + rnh_meta = _parse_rnh(fbytes) + break + + # RNH stores local time (no UTC offset). 
Use local for period/label, then + # convert to UTC for storage so the local_datetime filter displays correctly. + started_at_local = _parse_rnh_datetime(rnh_meta.get("start_time_str")) or datetime.utcnow() + stopped_at_local = _parse_rnh_datetime(rnh_meta.get("stop_time_str")) + started_at = local_to_utc(started_at_local) + stopped_at = local_to_utc(stopped_at_local) if stopped_at_local else None + duration_seconds = ( + int((stopped_at - started_at).total_seconds()) + if (started_at and stopped_at) else None + ) + + store_name = rnh_meta.get("store_name", "") + serial_number = rnh_meta.get("serial_number", "") + index_number = rnh_meta.get("index_number", "") + start_time_str = rnh_meta.get("start_time_str", "") + + # --- Dedupe: skip if this exact measurement is already ingested --- + if dedupe: + existing = _find_existing_session(db, location.id, store_name, started_at, start_time_str) + if existing: + return { + "success": True, + "deduped": True, + "session_id": existing.id, + "files_imported": 0, + "leq_files": 0, + "lp_files": 0, + "metadata_files": 0, + "store_name": store_name, + "started_at": started_at.isoformat() if started_at else None, + "stopped_at": stopped_at.isoformat() if stopped_at else None, + } + + # --- Create MonitoringSession (local times drive period/label) --- + period_type = _derive_period_type(started_at_local) if started_at_local else None + session_label = ( + _build_session_label(started_at_local, location.name, period_type) + if started_at_local else None + ) + + session_id = str(uuid.uuid4()) + monitoring_session = MonitoringSession( + id=session_id, + project_id=location.project_id, + location_id=location.id, + unit_id=None, + session_type="sound", + started_at=started_at, + stopped_at=stopped_at, + duration_seconds=duration_seconds, + status="completed", + session_label=session_label, + period_type=period_type, + session_metadata=json.dumps({ + "source": source, + "store_name": store_name, + "serial_number": serial_number, + 
"index_number": index_number, + "start_time_str": start_time_str, + # Captured from the .rnh so the report can label metrics from the + # device's own config (which LN slot is L90, the weightings, etc.). + "percentiles": rnh_meta.get("percentiles", {}), + "frequency_weighting": rnh_meta.get("frequency_weighting", ""), + "time_weighting": rnh_meta.get("time_weighting", ""), + "leq_interval": rnh_meta.get("leq_interval", ""), + }), + ) + db.add(monitoring_session) + db.commit() + db.refresh(monitoring_session) + + # --- Write files to disk + create DataFile records --- + output_dir = Path("data/Projects") / location.project_id / session_id + output_dir.mkdir(parents=True, exist_ok=True) + + leq_count = lp_count = metadata_count = files_imported = 0 + for fname, fbytes in file_entries: + fname_lower = fname.lower() + if fname_lower.endswith(".rnd"): + if "_leq_" in fname_lower: + leq_count += 1 + elif "_lp" in fname_lower: + lp_count += 1 + elif fname_lower.endswith(".rnh"): + metadata_count += 1 + + dest = output_dir / fname + dest.write_bytes(fbytes) + checksum = hashlib.sha256(fbytes).hexdigest() + rel_path = str(dest.relative_to("data")) + + db.add(DataFile( + id=str(uuid.uuid4()), + session_id=session_id, + file_path=rel_path, + file_type=_classify_file(fname), + file_size_bytes=len(fbytes), + downloaded_at=datetime.utcnow(), + checksum=checksum, + file_metadata=json.dumps({ + "source": source, + "original_filename": fname, + "store_name": store_name, + }), + )) + files_imported += 1 + + db.commit() + + return { + "success": True, + "deduped": False, + "session_id": session_id, + "files_imported": files_imported, + "leq_files": leq_count, + "lp_files": lp_count, + "metadata_files": metadata_count, + "store_name": store_name, + "started_at": started_at.isoformat() if started_at else None, + "stopped_at": stopped_at.isoformat() if stopped_at else None, + } + + +def ingest_nrl_zip( + location_id: str, + zip_bytes: bytes, + db: Session, + *, + source: str = 
"ftp_pull", + dedupe: bool = True, +) -> dict: + """Programmatically ingest an Auto_#### ZIP (e.g. a scheduled FTP pull). + + Extracts the ZIP (flattening any nested Auto_Leq/Auto_Lp_ folders), keeps + the .rnh + Leq .rnd, parses the header, and creates a MonitoringSession + + DataFile rows for `location_id`. Defaults to dedupe=True so repeated daily + pulls of the same closed folder don't create duplicate sessions. + + Returns the same dict shape as the HTTP upload, plus a `deduped` flag. + Raises IngestError on a bad ZIP, no usable files, or unknown location. + """ + location = db.query(MonitoringLocation).filter_by(id=location_id).first() + if not location: + raise IngestError(f"Location {location_id} not found") + + try: + with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: + file_entries: list[tuple[str, bytes]] = [] + for info in zf.infolist(): + if info.is_dir(): + continue + name = Path(info.filename).name # strip nested folder paths + if not name: + continue + file_entries.append((name, zf.read(info))) + except zipfile.BadZipFile: + raise IngestError("Downloaded data is not a valid ZIP archive.") + + return _ingest_file_entries(location, file_entries, db, source=source, dedupe=dedupe) + + @router.post("/nrl/{location_id}/upload-data") async def upload_nrl_data( project_id: str, @@ -1754,11 +2031,13 @@ async def upload_nrl_data( - A single .zip file (the Auto_#### folder zipped) — auto-extracted - Multiple .rnd / .rnh files selected directly from the SD card folder - Creates a MonitoringSession from .rnh metadata and DataFile records - for each measurement file. No unit assignment required. + Normalizes the upload to (filename, bytes) entries, then hands off to the + shared ingest core (`_ingest_file_entries`) — the same path the scheduled + FTP pull uses via `ingest_nrl_zip`. Creates a MonitoringSession from the + .rnh metadata and DataFile records for each measurement file. No unit + assignment required. 
dedupe=False here preserves the prior manual-upload + behaviour (re-uploading creates a fresh session). """ - from datetime import datetime - # Verify project and location exist project = db.query(Project).filter_by(id=project_id).first() _require_module(project, "sound_monitoring", db) @@ -1769,7 +2048,7 @@ async def upload_nrl_data( if not location: raise HTTPException(status_code=404, detail="Location not found") - # --- Step 1: Normalize to (filename, bytes) list --- + # --- Normalize upload to (filename, bytes) entries --- file_entries: list[tuple[str, bytes]] = [] if len(files) == 1 and files[0].filename.lower().endswith(".zip"): @@ -1793,145 +2072,11 @@ async def upload_nrl_data( if not file_entries: raise HTTPException(status_code=400, detail="No usable files found in upload.") - # --- Step 1b: Filter to only relevant files --- - # Keep: .rnh (metadata) and measurement .rnd files - # NL-43 generates two .rnd types: _Leq_ (15-min averages, wanted) and _Lp_ (1-sec granular, skip) - # AU2 (NL-23/older Rion) generates a single Au2_####.rnd per session — always keep those - # Drop: _Lp_ .rnd, .xlsx, .mp3, and anything else - def _is_wanted(fname: str) -> bool: - n = fname.lower() - if n.endswith(".rnh"): - return True - if n.endswith(".rnd"): - if "_leq_" in n: # NL-43 Leq file - return True - if n.startswith("au2_"): # AU2 format (NL-23) — always Leq equivalent - return True - if "_lp" not in n and "_leq_" not in n: - # Unknown .rnd format — include it so we don't silently drop data - return True - return False - - file_entries = [(fname, fbytes) for fname, fbytes in file_entries if _is_wanted(fname)] - - if not file_entries: - raise HTTPException(status_code=400, detail="No usable .rnd or .rnh files found. 
Expected NL-43 _Leq_ files or AU2 format .rnd files.") - - # --- Step 2: Find and parse .rnh metadata --- - rnh_meta = {} - for fname, fbytes in file_entries: - if fname.lower().endswith(".rnh"): - rnh_meta = _parse_rnh(fbytes) - break - - # RNH files store local time (no UTC offset). Use local values for period - # classification / label generation, then convert to UTC for DB storage so - # the local_datetime Jinja filter displays the correct time. - started_at_local = _parse_rnh_datetime(rnh_meta.get("start_time_str")) or datetime.utcnow() - stopped_at_local = _parse_rnh_datetime(rnh_meta.get("stop_time_str")) - - started_at = local_to_utc(started_at_local) - stopped_at = local_to_utc(stopped_at_local) if stopped_at_local else None - - duration_seconds = None - if started_at and stopped_at: - duration_seconds = int((stopped_at - started_at).total_seconds()) - - store_name = rnh_meta.get("store_name", "") - serial_number = rnh_meta.get("serial_number", "") - index_number = rnh_meta.get("index_number", "") - - # --- Step 3: Create MonitoringSession --- - # Use local times for period/label so classification reflects the clock at the site. 
- period_type = _derive_period_type(started_at_local) if started_at_local else None - session_label = _build_session_label(started_at_local, location.name, period_type) if started_at_local else None - - session_id = str(uuid.uuid4()) - monitoring_session = MonitoringSession( - id=session_id, - project_id=project_id, - location_id=location_id, - unit_id=None, - session_type="sound", - started_at=started_at, - stopped_at=stopped_at, - duration_seconds=duration_seconds, - status="completed", - session_label=session_label, - period_type=period_type, - session_metadata=json.dumps({ - "source": "manual_upload", - "store_name": store_name, - "serial_number": serial_number, - "index_number": index_number, - }), - ) - db.add(monitoring_session) - db.commit() - db.refresh(monitoring_session) - - # --- Step 4: Write files to disk and create DataFile records --- - output_dir = Path("data/Projects") / project_id / session_id - output_dir.mkdir(parents=True, exist_ok=True) - - leq_count = 0 - lp_count = 0 - metadata_count = 0 - files_imported = 0 - - for fname, fbytes in file_entries: - file_type = _classify_file(fname) - fname_lower = fname.lower() - - # Track counts for summary - if fname_lower.endswith(".rnd"): - if "_leq_" in fname_lower: - leq_count += 1 - elif "_lp" in fname_lower: - lp_count += 1 - elif fname_lower.endswith(".rnh"): - metadata_count += 1 - - # Write to disk - dest = output_dir / fname - dest.write_bytes(fbytes) - - # Compute checksum - checksum = hashlib.sha256(fbytes).hexdigest() - - # Store relative path from data/ dir - rel_path = str(dest.relative_to("data")) - - data_file = DataFile( - id=str(uuid.uuid4()), - session_id=session_id, - file_path=rel_path, - file_type=file_type, - file_size_bytes=len(fbytes), - downloaded_at=datetime.utcnow(), - checksum=checksum, - file_metadata=json.dumps({ - "source": "manual_upload", - "original_filename": fname, - "store_name": store_name, - }), - ) - db.add(data_file) - files_imported += 1 - - db.commit() - - 
return { - "success": True, - "session_id": session_id, - "files_imported": files_imported, - "leq_files": leq_count, - "lp_files": lp_count, - "metadata_files": metadata_count, - "store_name": store_name, - "started_at": started_at.isoformat() if started_at else None, - "stopped_at": stopped_at.isoformat() if stopped_at else None, - } + # --- Hand off to the shared ingest core --- + try: + return _ingest_file_entries(location, file_entries, db, source="manual_upload", dedupe=False) + except IngestError as e: + raise HTTPException(status_code=400, detail=str(e)) # ============================================================================ diff --git a/backend/services/report_email.py b/backend/services/report_email.py new file mode 100644 index 0000000..8041003 --- /dev/null +++ b/backend/services/report_email.py @@ -0,0 +1,161 @@ +""" +Report email sender — config-driven SMTP via the Python standard library. + +Connection settings come from environment variables so the mail backend +(internal relay / Microsoft 365 / Gmail / SendGrid) can be swapped without code +changes — see the build plan: terra-mechanics.com is on M365 and has a smarthost +relay that already sends the seismograph alerts as remote@terra-mechanics.com; +reuse that relay's settings here. + +DRY-RUN: if SMTP isn't configured (no host/from), the message is built and +logged but NOT sent, and the call still succeeds. This keeps report generation +working before the relay is wired up, and means a missing/incomplete mail config +can never crash the nightly pipeline. + +Env vars +-------- + REPORT_SMTP_HOST e.g. smtp.office365.com (unset → dry-run) + REPORT_SMTP_PORT default 587 + REPORT_SMTP_SECURITY starttls (default) | ssl | none + REPORT_SMTP_USER optional — omit for IP-authenticated relays + REPORT_SMTP_PASSWORD optional + REPORT_SMTP_FROM e.g. 
"TMI Monitoring " + REPORT_SMTP_RECIPIENTS comma-separated default recipient list + REPORT_SMTP_TIMEOUT seconds, default 30 +""" + +from __future__ import annotations + +import logging +import os +import smtplib +import ssl +from dataclasses import dataclass, field +from email.message import EmailMessage +from typing import Optional + +logger = logging.getLogger(__name__) + +# Convenient MIME type for the Excel attachment. +XLSX_MIME = ("application", "vnd.openxmlformats-officedocument.spreadsheetml.sheet") + + +@dataclass +class Attachment: + filename: str + content: bytes + maintype: str = "application" + subtype: str = "octet-stream" + + +@dataclass +class SMTPConfig: + host: str = "" + port: int = 587 + security: str = "starttls" # "starttls" | "ssl" | "none" + user: str = "" + password: str = "" + sender: str = "" + recipients: list[str] = field(default_factory=list) + timeout: float = 30.0 + + @classmethod + def from_env(cls) -> "SMTPConfig": + rec = os.getenv("REPORT_SMTP_RECIPIENTS", "") + return cls( + host=os.getenv("REPORT_SMTP_HOST", "").strip(), + port=int(os.getenv("REPORT_SMTP_PORT", "587") or 587), + security=os.getenv("REPORT_SMTP_SECURITY", "starttls").strip().lower(), + user=os.getenv("REPORT_SMTP_USER", "").strip(), + password=os.getenv("REPORT_SMTP_PASSWORD", ""), + sender=os.getenv("REPORT_SMTP_FROM", "").strip(), + recipients=[r.strip() for r in rec.split(",") if r.strip()], + timeout=float(os.getenv("REPORT_SMTP_TIMEOUT", "30") or 30), + ) + + @property + def configured(self) -> bool: + """True only when we have enough to actually send (host + from).""" + return bool(self.host and self.sender) + + +def build_message( + cfg: SMTPConfig, + subject: str, + html_body: str, + recipients: list[str], + attachments: Optional[list[Attachment]] = None, + text_body: Optional[str] = None, +) -> EmailMessage: + """Assemble a multipart message: plain-text fallback + HTML + attachments.""" + msg = EmailMessage() + msg["From"] = cfg.sender or 
"terra-view@localhost" + msg["To"] = ", ".join(recipients) + msg["Subject"] = subject + # Plain-text part first, then the HTML alternative (clients prefer the HTML). + msg.set_content(text_body or "This report is best viewed in an HTML email client.") + msg.add_alternative(html_body, subtype="html") + for att in (attachments or []): + msg.add_attachment( + att.content, maintype=att.maintype, subtype=att.subtype, filename=att.filename, + ) + return msg + + +def send_report_email( + subject: str, + html_body: str, + *, + attachments: Optional[list[Attachment]] = None, + recipients: Optional[list[str]] = None, + text_body: Optional[str] = None, + cfg: Optional[SMTPConfig] = None, +) -> dict: + """Send (or dry-run) the report email. + + Returns a result dict: {sent, dry_run, recipients, error}. Never raises on + a send failure — it logs and returns error, so the orchestrator can record + the failure without aborting the rest of the pipeline. + """ + cfg = cfg or SMTPConfig.from_env() + recipients = recipients if recipients is not None else cfg.recipients + result = {"sent": False, "dry_run": False, "recipients": recipients, "error": None} + + if not recipients: + result["error"] = "No recipients configured" + logger.warning("Report email: no recipients set; skipping send of %r", subject) + return result + + msg = build_message(cfg, subject, html_body, recipients, attachments, text_body) + + if not cfg.configured: + result["dry_run"] = True + logger.info( + "Report email DRY-RUN (SMTP not configured): would send %r to %s with %d attachment(s)", + subject, recipients, len(attachments or []), + ) + return result + + try: + if cfg.security == "ssl": + ctx = ssl.create_default_context() + with smtplib.SMTP_SSL(cfg.host, cfg.port, timeout=cfg.timeout, context=ctx) as s: + if cfg.user: + s.login(cfg.user, cfg.password) + s.send_message(msg) + else: + with smtplib.SMTP(cfg.host, cfg.port, timeout=cfg.timeout) as s: + s.ehlo() + if cfg.security == "starttls": + 
s.starttls(context=ssl.create_default_context()) + s.ehlo() + if cfg.user: + s.login(cfg.user, cfg.password) + s.send_message(msg) + result["sent"] = True + logger.info("Report email sent: %r to %s", subject, recipients) + except Exception as e: # noqa: BLE001 — surface as result, never abort the pipeline + result["error"] = str(e) + logger.error("Report email send failed: %s", e, exc_info=True) + + return result diff --git a/backend/services/report_orchestrator.py b/backend/services/report_orchestrator.py new file mode 100644 index 0000000..cb83eeb --- /dev/null +++ b/backend/services/report_orchestrator.py @@ -0,0 +1,130 @@ +""" +Nightly Report Orchestrator. + +Ties the pieces together: compute → render → write-to-disk → email. + +This is what the daily cycle (or a manual trigger) calls. It ALWAYS writes the +rendered report to disk — `data/reports/{project_id}/{night_date}/report.html` +(+ `report.json` with the raw numbers) — so there's a viewable artifact even +when email is in dry-run (SMTP not configured yet). The email step is +best-effort and never aborts the run. 
+""" + +from __future__ import annotations + +import json +import logging +from datetime import date +from pathlib import Path +from typing import Optional + +from sqlalchemy.orm import Session + +from backend.services.report_pipeline import ( + ProjectNightReport, build_project_night_report, Window, +) +from backend.services.report_renderers import render_html_summary +from backend.services.report_email import send_report_email, Attachment + +logger = logging.getLogger(__name__) + +DEFAULT_OUTPUT_ROOT = "data/reports" + + +def _report_to_dict(report: ProjectNightReport) -> dict: + """Serialise the report data model to plain JSON (for the on-disk record).""" + return { + "project_id": report.project_id, + "project_name": report.project_name, + "night_date": report.night_date.isoformat(), + "metrics": [m.key for m in report.metrics], + "locations": [ + { + "name": loc.location_name, + "night_interval_count": loc.night_interval_count, + "baseline_nights_used": loc.baseline_nights_used, + "notes": loc.notes, + "windows": { + w.key: { + "label": w.label, + "metrics": { + m.key: { + "label": m.label, + "last_night": loc.table[w.key][m.key].last_night, + "baseline": loc.table[w.key][m.key].baseline, + "delta": loc.table[w.key][m.key].delta, + } + for m in loc.metrics + }, + } + for w in loc.windows + }, + } + for loc in report.locations + ], + } + + +def run_nightly_report( + db: Session, + project_id: str, + night_date: date, + *, + metric_keys: Optional[list[str]] = None, + windows: Optional[list[Window]] = None, + baseline_start: Optional[date] = None, + baseline_end: Optional[date] = None, + recipients: Optional[list[str]] = None, + output_root: str = DEFAULT_OUTPUT_ROOT, + send: bool = True, +) -> dict: + """Build, persist, and (dry-run) email the night report for a project. + + Returns a result dict with the on-disk artifact paths and the email result. + Designed to be called from the daily cycle or a manual trigger. 
+ """ + report = build_project_night_report( + db, project_id, night_date, + metric_keys=metric_keys, windows=windows, + baseline_start=baseline_start, baseline_end=baseline_end, + ) + + html = render_html_summary(report) + subject = f"{report.project_name} — night report {night_date:%m/%d/%y}" + + # --- Always persist a viewable copy --- + out_dir = Path(output_root) / project_id / f"{night_date:%Y-%m-%d}" + out_dir.mkdir(parents=True, exist_ok=True) + html_path = out_dir / "report.html" + html_path.write_text(html, encoding="utf-8") + json_path = out_dir / "report.json" + json_path.write_text(json.dumps(_report_to_dict(report), indent=2), encoding="utf-8") + + # --- Excel attachment (renderer not built yet — held until metric set is final) --- + attachments: list[Attachment] = [] + + # --- Email (best-effort; dry-run until SMTP is configured) --- + email_result = {"sent": False, "dry_run": False, "skipped": True, "error": None} + if send: + email_result = send_report_email( + subject, html, attachments=attachments, recipients=recipients, + ) + + result = { + "project_id": project_id, + "project_name": report.project_name, + "night_date": night_date.isoformat(), + "subject": subject, + "location_count": len(report.locations), + "html_path": str(html_path), + "json_path": str(json_path), + "email": email_result, + } + logger.info( + "Nightly report for %s (%s): %d location(s) → %s; email=%s", + report.project_name, night_date, len(report.locations), html_path, + "sent" if email_result.get("sent") else + ("dry-run" if email_result.get("dry_run") else + ("skipped" if email_result.get("skipped") else f"error: {email_result.get('error')}")), + ) + return result diff --git a/backend/services/report_pipeline.py b/backend/services/report_pipeline.py new file mode 100644 index 0000000..2a736f0 --- /dev/null +++ b/backend/services/report_pipeline.py @@ -0,0 +1,389 @@ +""" +Nightly Report Pipeline — computation core. 
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Metric registry
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class Metric:
    """One metric that a report can surface.

    `agg` names the *within-night* aggregation that collapses a window's
    15-min intervals into a single value:
      - "max"   → loudest interval (LAmax)
      - "arith" → arithmetic mean (percentiles: L01/L10/L90…)
      - "log"   → energy/logarithmic mean (Leq only)
    A metric is located either by `column`, a fixed .rnd column name, or by
    `percentile`, in which case the LNx column is resolved from the session's
    captured percentile map.
    """
    key: str
    label: str
    agg: str
    column: Optional[str] = None
    percentile: Optional[float] = None


# Keyed by each metric's own `key`; declaration order is the registry order.
METRIC_REGISTRY: dict[str, Metric] = {
    metric.key: metric
    for metric in (
        Metric(key="lmax", label="LAmax", agg="max", column="Lmax(Main)"),
        Metric(key="leq", label="LAeq", agg="log", column="Leq(Main)"),
        Metric(key="lmin", label="LAmin", agg="arith", column="Lmin(Main)"),
        Metric(key="l01", label="LA01", agg="arith", percentile=1.0),
        Metric(key="l10", label="LA10", agg="arith", percentile=10.0),
        Metric(key="l50", label="LA50", agg="arith", percentile=50.0),
        Metric(key="l90", label="LA90", agg="arith", percentile=90.0),
        Metric(key="l95", label="LA95", agg="arith", percentile=95.0),
    )
}

# House report metrics + L90 (background) for the baseline comparison.
DEFAULT_METRICS: list[str] = ["lmax", "l01", "l10", "l90"]

# NL-43 default percentile→slot assignment, used when a session has no captured map.
_DEFAULT_SLOT_FOR_PCT: dict[float, int] = {1.0: 1, 10.0: 2, 50.0: 3, 90.0: 4, 95.0: 5}


def _resolve_column(metric: Metric, pct_map: dict) -> Optional[str]:
    """Resolve the .rnd column for a metric, using the session's percentile map.

    Resolution order:
      1. A metric with a fixed `column` returns it unchanged.
      2. A metric with neither `column` nor `percentile` resolves to None.
      3. If the captured map assigns some slot to the metric's percentile,
         that slot's ``LN<slot>(Main)`` column is returned.
      4. Otherwise the NL-43 default slot is used, but only when the captured
         map does not assign that slot to a *different* percentile. Using it
         anyway would report another percentile's data under this metric's
         label, so None is returned instead.

    `pct_map` maps slot → percentile, e.g.
    ``{"1": "1.0", "2": "10.0", "4": "90.0"}``; entries whose slot or value
    cannot be parsed as a number are ignored.
    """
    if metric.column:
        return metric.column
    if metric.percentile is None:
        return None

    # Parse the captured map once into {slot: percentile}.
    captured: dict[int, float] = {}
    if pct_map:
        for slot, pval in pct_map.items():
            try:
                captured[int(slot)] = float(pval)
            except (ValueError, TypeError):
                continue

    for slot, pct in captured.items():
        if pct == metric.percentile:
            return f"LN{slot}(Main)"

    default_slot = _DEFAULT_SLOT_FOR_PCT.get(metric.percentile)
    if not default_slot:
        return None
    if default_slot in captured:
        # The device reassigned this slot to a different percentile.
        return None
    return f"LN{default_slot}(Main)"


# ---------------------------------------------------------------------------
# Time windows
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class Window:
    """A named hour-of-day window, half-open as ``[start_hour, end_hour)``.

    When `start_hour` is not less than `end_hour` the window wraps past
    midnight (e.g. 22 → 7); with equal hours every hour is contained.
    """
    key: str
    label: str
    start_hour: int
    end_hour: int

    def contains(self, hour: int) -> bool:
        """Return True when `hour` falls inside the window."""
        if self.start_hour < self.end_hour:
            return self.start_hour <= hour < self.end_hour
        # Wrap-around window: late evening OR early morning.
        return hour >= self.start_hour or hour < self.end_hour


# Matches the existing Excel report's stats table.
DEFAULT_WINDOWS: list[Window] = [
    Window("evening", "Evening (7PM–10PM)", 19, 22),
    Window("nighttime", "Nighttime (10PM–7AM)", 22, 7),
]

# The full night used to select which intervals belong to "last night".
NIGHT_START_HOUR = 19
NIGHT_LENGTH_HOURS = 12


# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------

def _log_mean(levels: list) -> float:
    """Energy (logarithmic) mean of decibel levels; `levels` must be non-empty."""
    return 10 * math.log10(sum(10 ** (v / 10.0) for v in levels) / len(levels))


def _aggregate(values: list, method: str) -> Optional[float]:
    """Collapse a window's interval values into one number per `method`.

    Only real, finite numbers take part: None, strings, booleans and NaN/inf
    are dropped so one bad cell cannot poison the result. Returns None when
    nothing usable remains. `method` is "max", "log" (energy mean) or anything
    else for the arithmetic mean; the result is rounded to one decimal.
    """
    vals = [
        v for v in values
        if isinstance(v, (int, float))
        and not isinstance(v, bool)  # bool is an int subclass, never a level
        and math.isfinite(v)
    ]
    if not vals:
        return None
    if method == "max":
        return round(max(vals), 1)
    if method == "log":
        return round(_log_mean(vals), 1)
    return round(sum(vals) / len(vals), 1)  # arithmetic


def _combine_across_nights(per_night: list, method: str) -> Optional[float]:
    """Combine per-night window values into a baseline 'typical night' value.

    Arithmetic mean for max/arith metrics (so baseline Lmax = typical nightly
    peak, the agreed default), logarithmic mean for Leq. Nights whose value is
    None are skipped; returns None when no night has a value.
    """
    vals = [v for v in per_night if v is not None]
    if not vals:
        return None
    if method == "log":
        return round(_log_mean(vals), 1)
    return round(sum(vals) / len(vals), 1)


# ---------------------------------------------------------------------------
# Row gathering
# ---------------------------------------------------------------------------

def _parse_dt(s: str) -> Optional[datetime]:
    """Parse a ``YYYY/MM/DD HH:MM:SS`` timestamp; None if it does not parse."""
    try:
        return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")
    except (ValueError, TypeError):
        return None


def _location_leq_rows(db: Session, location_id: str) -> list[tuple[datetime, dict, dict]]:
    """All Leq intervals at a location as (interval_dt, row, percentile_map).

    Walks every "sound" session of the location, reads each `.rnd` data file
    that the shared helpers identify as a Leq file, and keeps the rows whose
    "Start Time" parses. Each row is paired with the percentile map stored
    under "percentiles" in its session's metadata ({} when absent or
    unreadable). The result is sorted by interval time.

    Reuses the same .rnd readers as the report endpoints so parsing stays
    identical. Times are the meter's local clock (as written in the file).
    """
    # Lazy import avoids a service→router import cycle at module load.
    from backend.routers.projects import (
        _read_rnd_file_rows, _normalize_rnd_rows, _is_leq_file, _peek_rnd_headers,
    )
    from pathlib import Path

    out: list[tuple[datetime, dict, dict]] = []
    sessions = db.query(MonitoringSession).filter_by(
        location_id=location_id, session_type="sound",
    ).all()
    for s in sessions:
        try:
            meta = json.loads(s.session_metadata or "{}")
        except (json.JSONDecodeError, TypeError):
            meta = {}
        if not isinstance(meta, dict):
            # Valid JSON that is not an object (e.g. a list) carries no map.
            meta = {}
        pct_map = meta.get("percentiles", {}) or {}
        for f in db.query(DataFile).filter_by(session_id=s.id).all():
            if not f.file_path or not f.file_path.lower().endswith(".rnd"):
                continue
            peek = _peek_rnd_headers(Path("data") / f.file_path)
            if not _is_leq_file(f.file_path, peek):
                continue
            rows = _read_rnd_file_rows(f.file_path)
            rows, _ = _normalize_rnd_rows(rows)
            for r in rows:
                dt = _parse_dt(r.get("Start Time", ""))
                if dt:
                    out.append((dt, r, pct_map))
    out.sort(key=lambda t: t[0])
    return out


def _rows_in_night(rows: list, night_date: date) -> list:
    """Rows falling in the night that *starts* on night_date (19:00 → +12h).

    The interval is half-open: the start instant is included, the end is not.
    """
    start = datetime(night_date.year, night_date.month, night_date.day, NIGHT_START_HOUR, 0)
    end = start + timedelta(hours=NIGHT_LENGTH_HOURS)
    return [(dt, r, p) for (dt, r, p) in rows if start <= dt < end]


def _eligible_nights(rows: list, start_date: date, end_date: date) -> list[date]:
    """Evening-dates in [start_date, end_date] that actually have night data."""
    nights = []
    cur = start_date
    while cur <= end_date:
        if _rows_in_night(rows, cur):
            nights.append(cur)
        cur += timedelta(days=1)
    return nights


def _window_value(rows: list, metric: Metric, window: Window) -> Optional[float]:
    """Single aggregated value for one metric over one window of `rows`.

    The column is resolved per row, because each row carries the percentile
    map of the session it came from.
    """
    vals = []
    for dt, r, pct_map in rows:
        if window.contains(dt.hour):
            col = _resolve_column(metric, pct_map)
            if col:
                vals.append(r.get(col))
    return _aggregate(vals, metric.agg)
# ---------------------------------------------------------------------------
# Report data model
# ---------------------------------------------------------------------------

@dataclass
class CellPair:
    """One table cell: last night's value next to the baseline value."""
    last_night: Optional[float]
    baseline: Optional[float]

    @property
    def delta(self) -> Optional[float]:
        """Last night minus baseline, rounded to 0.1; None if either is missing."""
        if self.last_night is None or self.baseline is None:
            return None
        return round(self.last_night - self.baseline, 1)


@dataclass
class LocationNightReport:
    """Night-vs-baseline results for a single monitoring location."""
    location_id: str
    location_name: str
    night_date: date
    metrics: list[Metric]
    windows: list[Window]
    # table[window_key][metric_key] = CellPair
    table: dict[str, dict[str, CellPair]]
    # One entry per interval of the reported night: "dt", "time", then one
    # value (or None) per metric key.
    interval_series: list[dict]
    night_interval_count: int
    baseline_nights_used: int
    notes: list[str] = field(default_factory=list)


def build_location_night_report(
    db: Session,
    location_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
) -> LocationNightReport:
    """Build the night-vs-baseline data model for one location.

    `night_date` is the *evening* date of the night being reported (e.g. the
    7/7 in "night of 7/7 → morning 7/8"). Baseline is the typical-night value
    across the eligible nights in [baseline_start, baseline_end]; pass neither
    to skip the comparison (baseline cells become None). The reported night is
    never used as one of its own baseline nights.

    `metric_keys` defaults to DEFAULT_METRICS and `windows` to DEFAULT_WINDOWS.
    Raises KeyError if a metric key is not in METRIC_REGISTRY.
    """
    metric_keys = metric_keys or DEFAULT_METRICS
    metrics = [METRIC_REGISTRY[k] for k in metric_keys]
    windows = windows or DEFAULT_WINDOWS

    loc = db.query(MonitoringLocation).filter_by(id=location_id).first()
    loc_name = loc.name if loc else location_id

    all_rows = _location_leq_rows(db, location_id)
    night_rows = _rows_in_night(all_rows, night_date)

    baseline_nights: list[date] = []
    if baseline_start and baseline_end:
        baseline_nights = _eligible_nights(all_rows, baseline_start, baseline_end)
        # Don't let the reported night double as its own baseline.
        baseline_nights = [n for n in baseline_nights if n != night_date]

    # Slice each baseline night out of the full row list once, not once per
    # (window, metric) cell.
    baseline_rows_by_night = [_rows_in_night(all_rows, nd) for nd in baseline_nights]

    table: dict[str, dict[str, CellPair]] = {}
    for w in windows:
        table[w.key] = {}
        for m in metrics:
            last_night_val = _window_value(night_rows, m, w)
            baseline_val = None
            if baseline_rows_by_night:
                per_night = [
                    _window_value(rows, m, w) for rows in baseline_rows_by_night
                ]
                baseline_val = _combine_across_nights(per_night, m.agg)
            table[w.key][m.key] = CellPair(last_night_val, baseline_val)

    interval_series = []
    for dt, r, pct_map in night_rows:
        entry = {"dt": dt, "time": dt.strftime("%H:%M")}
        for m in metrics:
            col = _resolve_column(m, pct_map)
            val = r.get(col) if col else None
            entry[m.key] = val if isinstance(val, (int, float)) else None
        interval_series.append(entry)

    notes: list[str] = []
    if not night_rows:
        notes.append(f"No data found for the night of {night_date:%m/%d/%y}.")
    if (baseline_start or baseline_end) and not baseline_nights:
        notes.append("No baseline nights with data in the configured range.")

    return LocationNightReport(
        location_id=location_id,
        location_name=loc_name,
        night_date=night_date,
        metrics=metrics,
        windows=windows,
        table=table,
        interval_series=interval_series,
        night_interval_count=len(night_rows),
        baseline_nights_used=len(baseline_nights),
        notes=notes,
    )


@dataclass
class ProjectNightReport:
    """Night report for a whole project: one LocationNightReport per location."""
    project_id: str
    project_name: str
    night_date: date
    metrics: list[Metric]
    locations: list[LocationNightReport]


def build_project_night_report(
    db: Session,
    project_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
) -> ProjectNightReport:
    """Build the night report for every active sound location in a project.

    Locations are the project's "sound" locations ordered by sort order then
    name; any whose `removed_at` is set are left out. The project name falls
    back to the project id when no project row is found.
    """
    metric_keys = metric_keys or DEFAULT_METRICS
    project = db.query(Project).filter_by(id=project_id).first()
    project_name = project.name if project else project_id

    candidates = db.query(MonitoringLocation).filter_by(
        project_id=project_id, location_type="sound",
    ).order_by(MonitoringLocation.sort_order, MonitoringLocation.name).all()
    active = [loc for loc in candidates if getattr(loc, "removed_at", None) is None]

    reports = [
        build_location_night_report(
            db, loc.id, night_date,
            metric_keys=metric_keys, windows=windows,
            baseline_start=baseline_start, baseline_end=baseline_end,
        )
        for loc in active
    ]

    return ProjectNightReport(
        project_id=project_id,
        project_name=project_name,
        night_date=night_date,
        metrics=[METRIC_REGISTRY[k] for k in metric_keys],
        locations=reports,
    )