Merge pull request 'feat(reports): FTP night-report pipeline foundation' (#62) from feat/ftp-report-pipeline into dev

Reviewed-on: #62
This commit was merged in pull request #62.
This commit is contained in:
2026-06-11 23:27:34 -04:00
13 changed files with 2398 additions and 148 deletions
+4
View File
@@ -167,6 +167,10 @@ app.include_router(deployments.router)
from backend.routers import calibration
app.include_router(calibration.router)
# Nightly sound-report pipeline (manual triggers; scheduled tick reuses run_nightly_report)
from backend.routers import reports
app.include_router(reports.router)
# Start scheduler service and device status monitor on application startup
from backend.services.scheduler import start_scheduler, stop_scheduler
from backend.services.device_status_monitor import start_device_status_monitor, stop_device_status_monitor
+29
View File
@@ -219,6 +219,35 @@ class ProjectModule(Base):
__table_args__ = (UniqueConstraint("project_id", "module_type", name="uq_project_module"),)
class SoundReportConfig(Base):
    """
    Settings row for a project's automated nightly sound report (the FTP
    report pipeline).

    The `project_id` column is unique, so there is at most one row per
    project. According to the original notes, the row is read by the morning
    tick in SchedulerService and supplies defaults to the manual /reports
    endpoints.

    This is a new table: it is created by Base.metadata.create_all() on
    startup, so no migration is needed (only a rebuild/restart).
    """
    __tablename__ = "sound_report_configs"

    # Random UUID string; uuid is imported lazily inside the default callable.
    id = Column(String, primary_key=True, default=lambda: str(__import__("uuid").uuid4()))
    # Logical reference to projects.id (no ForeignKey constraint is declared here).
    project_id = Column(String, nullable=False, index=True, unique=True)

    # Whether the daily report should run at all.
    enabled = Column(Boolean, default=False, nullable=False)
    # Local HH:MM at which to run/send.
    report_time = Column(String, default="08:00", nullable=False)
    # Comma-separated metric keys.
    metric_keys = Column(String, default="lmax,l01,l10,l90", nullable=False)

    # Baseline source:
    #   "captured"  = compute from recorded nights in the date range below;
    #   "reference" = use fixed values typed per location (old-report
    #                 averages or a spec limit).
    baseline_mode = Column(String, default="captured", nullable=False)
    # Date range used in captured mode.
    baseline_start = Column(Date, nullable=True)
    baseline_end = Column(Date, nullable=True)

    # Comma-separated addresses; falls back to the REPORT_SMTP_RECIPIENTS env var.
    recipients = Column(Text, nullable=True)
    # Evening-date of the last reported night (used for dedup).
    last_run_date = Column(Date, nullable=True)

    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class MonitoringLocation(Base):
"""
Monitoring locations: generic location for monitoring activities.
+289 -144
View File
@@ -1712,6 +1712,19 @@ def _parse_rnh(content: bytes) -> dict:
result["stop_time_str"] = value
elif key == "Total Measurement Time":
result["total_time_str"] = value
elif key == "Frequency Weighting (Main)":
result["frequency_weighting"] = value
elif key == "Time Weighting (Main)":
result["time_weighting"] = value
elif key == "Leq Calculation Interval":
result["leq_interval"] = value
elif key.startswith("Percentile "):
# e.g. "Percentile 4,90.0" → percentiles["4"] = "90.0".
# Lets the report label the LN slots (here LN4 = L90) from the
# device's own config instead of hardcoding which slot is which —
# the percentile assignment is reconfigurable per job.
slot = key[len("Percentile "):].strip()
result.setdefault("percentiles", {})[slot] = value
except Exception:
pass
return result
@@ -1740,6 +1753,270 @@ def _classify_file(filename: str) -> str:
return "data"
def _is_wanted_nrl_file(fname: str) -> bool:
"""Keep only the files an NRL ingest cares about: .rnh metadata + the
averaged Leq .rnd. Drops the 1-second _Lp_ files and everything else.
- NL-43 writes two .rnd types: _Leq_ (15-min averages, wanted) and
_Lp_ (1-second granular, skipped).
- AU2 (NL-23/older Rion) writes a single Au2_####.rnd — always keep.
Note this is purely about which *files* to store, not which *metrics* to
report: the kept Leq file carries every column (Leq, Lmax, L1/L10/L50/
L90/L95, Lpeak, …), so the report layer can select any metric later.
"""
n = fname.lower()
if n.endswith(".rnh"):
return True
if n.endswith(".rnd"):
if "_leq_" in n: # NL-43 Leq file
return True
if n.startswith("au2_"): # AU2 format (NL-23) — Leq equivalent
return True
if "_lp" not in n and "_leq_" not in n:
# Unknown .rnd format — include it so we don't silently drop data
return True
return False
class IngestError(Exception):
    """Signals that an NRL upload/ZIP has no usable data or an invalid target.

    Deliberately HTTP-agnostic so the ingest core can be driven from code (the
    scheduled FTP pull) as well as from the HTTP upload endpoint. Each caller
    translates it: the endpoint into an HTTP 400, the scheduler into a logged
    failure.
    """
def _find_existing_session(
    db: Session,
    location_id: str,
    store_name: str,
    started_at,
    start_time_str: str,
):
    """Return an already-ingested sound session for this location that
    represents the same measurement, or None.

    Used to make FTP re-pulls idempotent: a daily cycle closes one Auto_####
    folder per day, so a session is identified within a location by
    (store_name + measurement start time). Store names recycle across jobs, so
    the start time is always checked too.

    Args:
        db: SQLAlchemy session used to query MonitoringSession rows.
        location_id: Location whose "sound" sessions are searched.
        store_name: Store name from the .rnh header; when empty, candidates
            are not filtered by store name.
        started_at: Measurement start as stored on the session; used only
            when a candidate has no "start_time_str" in its metadata.
        start_time_str: Raw start-time string from the .rnh header.

    Returns:
        The first matching MonitoringSession, or None if nothing matches or
        neither store_name nor started_at was supplied.
    """
    if not store_name and not started_at:
        return None

    candidates = db.query(MonitoringSession).filter(
        MonitoringSession.location_id == location_id,
        MonitoringSession.session_type == "sound",
    ).all()

    for s in candidates:
        try:
            meta = json.loads(s.session_metadata or "{}")
        except (json.JSONDecodeError, TypeError):
            meta = {}
        # Valid JSON is not necessarily an object ("null", a list, a number);
        # treat anything else as "no metadata" instead of crashing on .get().
        if not isinstance(meta, dict):
            meta = {}

        if store_name and meta.get("store_name") != store_name:
            continue
        # Same store_name — confirm it's the same measurement by start time.
        if start_time_str and meta.get("start_time_str") == start_time_str:
            return s
        # Sessions without a recorded start string fall back to comparing the
        # stored start timestamp.
        if not meta.get("start_time_str") and started_at and s.started_at == started_at:
            return s
    return None
def _ingest_file_entries(
    location: MonitoringLocation,
    file_entries: list[tuple[str, bytes]],
    db: Session,
    *,
    source: str = "manual_upload",
    dedupe: bool = False,
) -> dict:
    """Core NRL ingest, shared by the HTTP upload and the programmatic FTP pull.

    Takes (filename, bytes) entries, keeps the wanted files, parses the first
    .rnh, and creates a MonitoringSession + DataFile rows under the location's
    project. Metric-agnostic: each kept file is written to disk unchanged;
    metric selection happens in the report layer.

    Args:
        location: Target location; its id, project_id and name are used.
        file_entries: (filename, content) pairs. Any directory part of a
            filename is discarded before use.
        db: SQLAlchemy session; this function commits on it.
        source: Stored in the session and file metadata as "source".
        dedupe: When True, return the existing session (with "deduped": True
            and zero counts) instead of creating a duplicate.

    Returns:
        Dict with success, deduped, session_id, files_imported, leq_files,
        lp_files, metadata_files, store_name, started_at, stopped_at.

    Raises:
        IngestError: if no usable .rnh/.rnd files are present.
    """
    # --- Reduce every name to a bare basename ---
    # The name is later joined onto the output directory, so a caller-supplied
    # "../../x.rnh" (or a Windows-style "..\\x.rnh") must not escape it.
    file_entries = [
        (Path(f.replace("\\", "/")).name, b) for f, b in file_entries
    ]

    # --- Filter to the files we keep (.rnh + Leq .rnd) ---
    file_entries = [(f, b) for f, b in file_entries if _is_wanted_nrl_file(f)]
    if not file_entries:
        raise IngestError(
            "No usable .rnd or .rnh files found. Expected NL-43 _Leq_ files or AU2 format .rnd files."
        )

    # --- Parse .rnh metadata (first one wins) ---
    rnh_meta = {}
    for fname, fbytes in file_entries:
        if fname.lower().endswith(".rnh"):
            rnh_meta = _parse_rnh(fbytes)
            break

    # RNH stores local time (no UTC offset). Use local for period/label, then
    # convert to UTC for storage so the local_datetime filter displays correctly.
    header_start_local = _parse_rnh_datetime(rnh_meta.get("start_time_str"))
    stopped_at_local = _parse_rnh_datetime(rnh_meta.get("stop_time_str"))
    if header_start_local:
        started_at_local = header_start_local
        started_at = local_to_utc(header_start_local)
    else:
        # No usable start in the header: fall back to "now". utcnow() is
        # already UTC, so it must not be run through local_to_utc a second
        # time. The same value still drives the period/label, as before.
        started_at = datetime.utcnow()
        started_at_local = started_at
    stopped_at = local_to_utc(stopped_at_local) if stopped_at_local else None
    # A duration is only meaningful when both ends come from the header.
    duration_seconds = (
        int((stopped_at - started_at).total_seconds())
        if (header_start_local and stopped_at) else None
    )

    store_name = rnh_meta.get("store_name", "")
    serial_number = rnh_meta.get("serial_number", "")
    index_number = rnh_meta.get("index_number", "")
    start_time_str = rnh_meta.get("start_time_str", "")

    # Fields shared by both return shapes (kept last to preserve key order).
    summary_tail = {
        "store_name": store_name,
        "started_at": started_at.isoformat() if started_at else None,
        "stopped_at": stopped_at.isoformat() if stopped_at else None,
    }

    # --- Dedupe: skip if this exact measurement is already ingested ---
    if dedupe:
        existing = _find_existing_session(db, location.id, store_name, started_at, start_time_str)
        if existing:
            return {
                "success": True,
                "deduped": True,
                "session_id": existing.id,
                "files_imported": 0,
                "leq_files": 0,
                "lp_files": 0,
                "metadata_files": 0,
                **summary_tail,
            }

    # --- Create MonitoringSession (local times drive period/label) ---
    period_type = _derive_period_type(started_at_local)
    session_label = _build_session_label(started_at_local, location.name, period_type)

    session_id = str(uuid.uuid4())
    monitoring_session = MonitoringSession(
        id=session_id,
        project_id=location.project_id,
        location_id=location.id,
        unit_id=None,
        session_type="sound",
        started_at=started_at,
        stopped_at=stopped_at,
        duration_seconds=duration_seconds,
        status="completed",
        session_label=session_label,
        period_type=period_type,
        session_metadata=json.dumps({
            "source": source,
            "store_name": store_name,
            "serial_number": serial_number,
            "index_number": index_number,
            "start_time_str": start_time_str,
            # Captured from the .rnh so the report can label metrics from the
            # device's own config (which LN slot is L90, the weightings, etc.).
            "percentiles": rnh_meta.get("percentiles", {}),
            "frequency_weighting": rnh_meta.get("frequency_weighting", ""),
            "time_weighting": rnh_meta.get("time_weighting", ""),
            "leq_interval": rnh_meta.get("leq_interval", ""),
        }),
    )
    db.add(monitoring_session)
    db.commit()
    db.refresh(monitoring_session)

    # --- Write files to disk + create DataFile records ---
    output_dir = Path("data/Projects") / location.project_id / session_id
    output_dir.mkdir(parents=True, exist_ok=True)

    leq_count = lp_count = metadata_count = files_imported = 0
    for fname, fbytes in file_entries:
        fname_lower = fname.lower()
        if fname_lower.endswith(".rnd"):
            if "_leq_" in fname_lower:
                leq_count += 1
            elif "_lp" in fname_lower:
                lp_count += 1
        elif fname_lower.endswith(".rnh"):
            metadata_count += 1

        dest = output_dir / fname
        dest.write_bytes(fbytes)
        checksum = hashlib.sha256(fbytes).hexdigest()
        # Stored path is relative to the data/ directory.
        rel_path = str(dest.relative_to("data"))

        db.add(DataFile(
            id=str(uuid.uuid4()),
            session_id=session_id,
            file_path=rel_path,
            file_type=_classify_file(fname),
            file_size_bytes=len(fbytes),
            downloaded_at=datetime.utcnow(),
            checksum=checksum,
            file_metadata=json.dumps({
                "source": source,
                "original_filename": fname,
                "store_name": store_name,
            }),
        ))
        files_imported += 1

    db.commit()

    return {
        "success": True,
        "deduped": False,
        "session_id": session_id,
        "files_imported": files_imported,
        "leq_files": leq_count,
        "lp_files": lp_count,
        "metadata_files": metadata_count,
        **summary_tail,
    }
def ingest_nrl_zip(
    location_id: str,
    zip_bytes: bytes,
    db: Session,
    *,
    source: str = "ftp_pull",
    dedupe: bool = True,
) -> dict:
    """Programmatically ingest an Auto_#### ZIP (e.g. a scheduled FTP pull).

    Extracts the ZIP in memory, flattening any nested folders (such as
    Auto_Leq / Auto_Lp_) down to bare filenames, then hands the entries to
    `_ingest_file_entries`, which keeps the .rnh + Leq .rnd, parses the header
    and creates a MonitoringSession + DataFile rows for `location_id`.

    Args:
        location_id: Id of the MonitoringLocation to ingest into.
        zip_bytes: Raw bytes of the ZIP archive.
        db: SQLAlchemy session.
        source: Recorded as the ingest source in stored metadata.
        dedupe: Defaults to True so repeated daily pulls of the same closed
            folder don't create duplicate sessions.

    Returns:
        The same dict shape as the HTTP upload, including a `deduped` flag.

    Raises:
        IngestError: on a bad ZIP, no usable files, or an unknown location.
    """
    location = db.query(MonitoringLocation).filter_by(id=location_id).first()
    if not location:
        raise IngestError(f"Location {location_id} not found")

    try:
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
            file_entries: list[tuple[str, bytes]] = []
            for info in zf.infolist():
                if info.is_dir():
                    continue
                name = Path(info.filename).name  # strip nested folder paths
                if not name:
                    continue
                file_entries.append((name, zf.read(info)))
    except zipfile.BadZipFile as exc:
        # Chain the cause so logs still show what was wrong with the archive.
        raise IngestError("Downloaded data is not a valid ZIP archive.") from exc

    return _ingest_file_entries(location, file_entries, db, source=source, dedupe=dedupe)
@router.post("/nrl/{location_id}/upload-data")
async def upload_nrl_data(
project_id: str,
@@ -1754,11 +2031,13 @@ async def upload_nrl_data(
- A single .zip file (the Auto_#### folder zipped) — auto-extracted
- Multiple .rnd / .rnh files selected directly from the SD card folder
Creates a MonitoringSession from .rnh metadata and DataFile records
for each measurement file. No unit assignment required.
Normalizes the upload to (filename, bytes) entries, then hands off to the
shared ingest core (`_ingest_file_entries`) — the same path the scheduled
FTP pull uses via `ingest_nrl_zip`. Creates a MonitoringSession from the
.rnh metadata and DataFile records for each measurement file. No unit
assignment required. dedupe=False here preserves the prior manual-upload
behaviour (re-uploading creates a fresh session).
"""
from datetime import datetime
# Verify project and location exist
project = db.query(Project).filter_by(id=project_id).first()
_require_module(project, "sound_monitoring", db)
@@ -1769,7 +2048,7 @@ async def upload_nrl_data(
if not location:
raise HTTPException(status_code=404, detail="Location not found")
# --- Step 1: Normalize to (filename, bytes) list ---
# --- Normalize upload to (filename, bytes) entries ---
file_entries: list[tuple[str, bytes]] = []
if len(files) == 1 and files[0].filename.lower().endswith(".zip"):
@@ -1793,145 +2072,11 @@ async def upload_nrl_data(
if not file_entries:
raise HTTPException(status_code=400, detail="No usable files found in upload.")
# --- Step 1b: Filter to only relevant files ---
# Keep: .rnh (metadata) and measurement .rnd files
# NL-43 generates two .rnd types: _Leq_ (15-min averages, wanted) and _Lp_ (1-sec granular, skip)
# AU2 (NL-23/older Rion) generates a single Au2_####.rnd per session — always keep those
# Drop: _Lp_ .rnd, .xlsx, .mp3, and anything else
def _is_wanted(fname: str) -> bool:
n = fname.lower()
if n.endswith(".rnh"):
return True
if n.endswith(".rnd"):
if "_leq_" in n: # NL-43 Leq file
return True
if n.startswith("au2_"): # AU2 format (NL-23) — always Leq equivalent
return True
if "_lp" not in n and "_leq_" not in n:
# Unknown .rnd format — include it so we don't silently drop data
return True
return False
file_entries = [(fname, fbytes) for fname, fbytes in file_entries if _is_wanted(fname)]
if not file_entries:
raise HTTPException(status_code=400, detail="No usable .rnd or .rnh files found. Expected NL-43 _Leq_ files or AU2 format .rnd files.")
# --- Step 2: Find and parse .rnh metadata ---
rnh_meta = {}
for fname, fbytes in file_entries:
if fname.lower().endswith(".rnh"):
rnh_meta = _parse_rnh(fbytes)
break
# RNH files store local time (no UTC offset). Use local values for period
# classification / label generation, then convert to UTC for DB storage so
# the local_datetime Jinja filter displays the correct time.
started_at_local = _parse_rnh_datetime(rnh_meta.get("start_time_str")) or datetime.utcnow()
stopped_at_local = _parse_rnh_datetime(rnh_meta.get("stop_time_str"))
started_at = local_to_utc(started_at_local)
stopped_at = local_to_utc(stopped_at_local) if stopped_at_local else None
duration_seconds = None
if started_at and stopped_at:
duration_seconds = int((stopped_at - started_at).total_seconds())
store_name = rnh_meta.get("store_name", "")
serial_number = rnh_meta.get("serial_number", "")
index_number = rnh_meta.get("index_number", "")
# --- Step 3: Create MonitoringSession ---
# Use local times for period/label so classification reflects the clock at the site.
period_type = _derive_period_type(started_at_local) if started_at_local else None
session_label = _build_session_label(started_at_local, location.name, period_type) if started_at_local else None
session_id = str(uuid.uuid4())
monitoring_session = MonitoringSession(
id=session_id,
project_id=project_id,
location_id=location_id,
unit_id=None,
session_type="sound",
started_at=started_at,
stopped_at=stopped_at,
duration_seconds=duration_seconds,
status="completed",
session_label=session_label,
period_type=period_type,
session_metadata=json.dumps({
"source": "manual_upload",
"store_name": store_name,
"serial_number": serial_number,
"index_number": index_number,
}),
)
db.add(monitoring_session)
db.commit()
db.refresh(monitoring_session)
# --- Step 4: Write files to disk and create DataFile records ---
output_dir = Path("data/Projects") / project_id / session_id
output_dir.mkdir(parents=True, exist_ok=True)
leq_count = 0
lp_count = 0
metadata_count = 0
files_imported = 0
for fname, fbytes in file_entries:
file_type = _classify_file(fname)
fname_lower = fname.lower()
# Track counts for summary
if fname_lower.endswith(".rnd"):
if "_leq_" in fname_lower:
leq_count += 1
elif "_lp" in fname_lower:
lp_count += 1
elif fname_lower.endswith(".rnh"):
metadata_count += 1
# Write to disk
dest = output_dir / fname
dest.write_bytes(fbytes)
# Compute checksum
checksum = hashlib.sha256(fbytes).hexdigest()
# Store relative path from data/ dir
rel_path = str(dest.relative_to("data"))
data_file = DataFile(
id=str(uuid.uuid4()),
session_id=session_id,
file_path=rel_path,
file_type=file_type,
file_size_bytes=len(fbytes),
downloaded_at=datetime.utcnow(),
checksum=checksum,
file_metadata=json.dumps({
"source": "manual_upload",
"original_filename": fname,
"store_name": store_name,
}),
)
db.add(data_file)
files_imported += 1
db.commit()
return {
"success": True,
"session_id": session_id,
"files_imported": files_imported,
"leq_files": leq_count,
"lp_files": lp_count,
"metadata_files": metadata_count,
"store_name": store_name,
"started_at": started_at.isoformat() if started_at else None,
"stopped_at": stopped_at.isoformat() if stopped_at else None,
}
# --- Hand off to the shared ingest core ---
try:
return _ingest_file_entries(location, file_entries, db, source="manual_upload", dedupe=False)
except IngestError as e:
raise HTTPException(status_code=400, detail=str(e))
# ============================================================================
+434
View File
@@ -0,0 +1,434 @@
"""
Nightly Report Router.
Manual triggers for the night-vs-baseline sound report — the same entry point
the scheduled morning tick will reuse. Two endpoints:
GET …/reports/nightly/view → render and return the HTML inline (preview).
No write, no email. Browser-friendly.
POST …/reports/nightly/run → full run: build → write report.html/json to
disk → (dry-run) email. Returns JSON result.
Dates are the *evening* date of the night being reported (the 7/7 in "night of
7/7 → morning 7/8"). Defaults to last night. Baseline is optional; pass the
baseline-week range to populate the comparison.
"""
from __future__ import annotations
import json
import logging
import re
import uuid
from datetime import datetime, timedelta, date
from html import escape
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import Project, SoundReportConfig, MonitoringLocation
from backend.services.report_pipeline import (
METRIC_REGISTRY, DEFAULT_METRICS, DEFAULT_WINDOWS, _location_reference_baseline,
)
from backend.services.report_orchestrator import run_nightly_report
from backend.utils.timezone import utc_to_local
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/projects/{project_id}/reports", tags=["reports"])
def _default_night_date() -> date:
    """Return "last night": the current UTC time converted with
    `utc_to_local`, minus one day, as a date."""
    now_local = utc_to_local(datetime.utcnow())
    yesterday_local = now_local - timedelta(days=1)
    return yesterday_local.date()
def _parse_date(s: Optional[str], field: str) -> Optional[date]:
if not s:
return None
try:
return datetime.strptime(s, "%Y-%m-%d").date()
except ValueError:
raise HTTPException(status_code=400, detail=f"{field} must be YYYY-MM-DD (got {s!r})")
def _parse_metrics(s: Optional[str]) -> list[str]:
    """Turn a comma-separated metric list into validated, lower-cased keys.

    A falsy *s*, or one containing only separators/whitespace, yields a copy
    of DEFAULT_METRICS. Any key missing from METRIC_REGISTRY is rejected with
    an HTTP 400 that lists the known keys.
    """
    if not s:
        return list(DEFAULT_METRICS)

    keys = []
    for piece in s.split(","):
        trimmed = piece.strip()
        if trimmed:
            keys.append(trimmed.lower())

    unknown = [key for key in keys if key not in METRIC_REGISTRY]
    if unknown:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown metric(s): {unknown}. Known: {sorted(METRIC_REGISTRY)}",
        )

    if keys:
        return keys
    return list(DEFAULT_METRICS)
def _validate_hhmm(s) -> str:
"""Validate a local HH:MM (24h) time string."""
try:
hh, mm = str(s).split(":")
h, m = int(hh), int(mm)
if 0 <= h < 24 and 0 <= m < 60:
return f"{h:02d}:{m:02d}"
except (ValueError, AttributeError):
pass
raise HTTPException(status_code=400, detail=f"report_time must be HH:MM 24-hour (got {s!r})")
def _config_dict(cfg: Optional[SoundReportConfig], project_id: str) -> dict:
"""Serialise a config row (or defaults if none yet) to JSON."""
return {
"project_id": project_id,
"exists": cfg is not None,
"enabled": cfg.enabled if cfg else False,
"report_time": cfg.report_time if cfg else "08:00",
"metric_keys": cfg.metric_keys if cfg else ",".join(DEFAULT_METRICS),
"baseline_mode": cfg.baseline_mode if cfg else "captured",
"baseline_start": cfg.baseline_start.isoformat() if cfg and cfg.baseline_start else None,
"baseline_end": cfg.baseline_end.isoformat() if cfg and cfg.baseline_end else None,
"recipients": (cfg.recipients if cfg and cfg.recipients else ""),
"last_run_date": cfg.last_run_date.isoformat() if cfg and cfg.last_run_date else None,
}
@router.get("/config")
async def get_report_config(project_id: str, db: Session = Depends(get_db)):
    """Return the nightly-report config for a project, or the defaults when
    nothing has been saved yet. Responds 404 for an unknown project."""
    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    saved = db.query(SoundReportConfig).filter_by(project_id=project_id).first()
    return _config_dict(saved, project_id)
@router.put("/config")
async def put_report_config(project_id: str, request: Request, db: Session = Depends(get_db)):
    """Create or update the project's nightly-report config (JSON body).

    Only keys present in the body are applied: enabled, report_time,
    metric_keys (string or list), baseline_mode, baseline_start/baseline_end
    (both or neither), recipients (string or list).

    Returns:
        The serialised config plus "created": True when the row was new.

    Raises:
        HTTPException: 404 for an unknown project; 400 for a body that is not
            a JSON object or for any invalid field value.
    """
    if not db.query(Project).filter_by(id=project_id).first():
        raise HTTPException(status_code=404, detail="Project not found")

    # A malformed or non-object body is a client error, not a server fault.
    try:
        data = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    cfg = db.query(SoundReportConfig).filter_by(project_id=project_id).first()
    created = cfg is None
    if cfg is None:
        cfg = SoundReportConfig(id=str(uuid.uuid4()), project_id=project_id)
        db.add(cfg)

    if "enabled" in data:
        cfg.enabled = bool(data["enabled"])
    if "report_time" in data:
        cfg.report_time = _validate_hhmm(data["report_time"])
    if "metric_keys" in data:
        mk = data["metric_keys"]
        if isinstance(mk, (list, tuple)):
            mk = ",".join(str(k) for k in mk)
        elif mk is not None and not isinstance(mk, str):
            raise HTTPException(
                status_code=400, detail="metric_keys must be a string or a list of strings."
            )
        # An empty/None value resolves to the default metric set.
        cfg.metric_keys = ",".join(_parse_metrics(mk))
    if "baseline_mode" in data:
        bm = str(data["baseline_mode"]).lower()
        if bm not in ("captured", "reference"):
            raise HTTPException(status_code=400, detail="baseline_mode must be 'captured' or 'reference'")
        cfg.baseline_mode = bm
    if "baseline_start" in data or "baseline_end" in data:
        # Sending either key replaces both dates; empty values clear them.
        bs = _parse_date(data.get("baseline_start") or None, "baseline_start")
        be = _parse_date(data.get("baseline_end") or None, "baseline_end")
        if (bs and not be) or (be and not bs):
            raise HTTPException(status_code=400, detail="Provide both baseline dates, or neither.")
        if bs and be and bs > be:
            raise HTTPException(status_code=400, detail="baseline_start must be on or before baseline_end.")
        cfg.baseline_start, cfg.baseline_end = bs, be
    if "recipients" in data:
        recips = data["recipients"]
        if isinstance(recips, (list, tuple)):
            recips = ",".join(str(r) for r in recips)
        elif recips is not None and not isinstance(recips, str):
            raise HTTPException(
                status_code=400, detail="recipients must be a string or a list of strings."
            )
        # Blank input is stored as NULL.
        cfg.recipients = (recips or "").strip() or None

    db.commit()
    db.refresh(cfg)
    return {**_config_dict(cfg, project_id), "created": created}
def _resolve_params(project_id, db, night_date, baseline_start, baseline_end, metrics):
    """Validate the query inputs and work out where the baseline comes from.

    Explicit baseline dates in the query take precedence (captured mode over
    exactly those dates). Without them, the project's saved config provides
    the baseline mode and dates; with no config, captured mode is used with no
    dates. Metrics follow the same order: query, then saved config, then
    DEFAULT_METRICS — so the manual view/run match what the scheduled report
    does.

    Returns (night_date, baseline_mode, baseline_start, baseline_end, metric_keys).
    Raises HTTPException 404 for an unknown project and 400 for bad input.
    """
    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    nd = _parse_date(night_date, "night_date")
    if not nd:
        nd = _default_night_date()

    bs = _parse_date(baseline_start, "baseline_start")
    be = _parse_date(baseline_end, "baseline_end")
    # Exactly one of the two dates supplied is an error.
    if bool(bs) != bool(be):
        raise HTTPException(status_code=400, detail="Provide both baseline_start and baseline_end, or neither.")
    if bs and be and bs > be:
        raise HTTPException(status_code=400, detail="baseline_start must be on or before baseline_end.")

    cfg = db.query(SoundReportConfig).filter_by(project_id=project_id).first()

    if (bs and be) or not cfg:
        # Explicit dates win; with no saved config there is nothing else to use.
        baseline_mode = "captured"
    else:
        # Fall back to the saved config.
        baseline_mode = cfg.baseline_mode
        bs, be = cfg.baseline_start, cfg.baseline_end

    metric_source = metrics or (cfg.metric_keys if cfg else None)
    metric_keys = _parse_metrics(metric_source) if metric_source else list(DEFAULT_METRICS)

    return nd, baseline_mode, bs, be, metric_keys
@router.get("/nightly/view", response_class=HTMLResponse)
async def view_nightly_report(
    project_id: str,
    night_date: Optional[str] = Query(None, description="Evening date of the night (YYYY-MM-DD). Default: last night."),
    baseline_start: Optional[str] = Query(None, description="Baseline range start (YYYY-MM-DD)."),
    baseline_end: Optional[str] = Query(None, description="Baseline range end (YYYY-MM-DD)."),
    metrics: Optional[str] = Query(None, description="Comma list, e.g. lmax,l01,l10,l90. Default: house set."),
    db: Session = Depends(get_db),
):
    """Preview the night report: call run_nightly_report with send=False and
    return its HTML inline.

    HTTPExceptions pass through untouched; any other failure is logged with
    its traceback and reported as a 500.
    """
    nd, bmode, bs, be, metric_keys = _resolve_params(
        project_id, db, night_date, baseline_start, baseline_end, metrics,
    )
    report_options = {
        "metric_keys": metric_keys,
        "baseline_mode": bmode,
        "baseline_start": bs,
        "baseline_end": be,
        "send": False,  # preview: no email
    }
    try:
        result = run_nightly_report(db, project_id, nd, **report_options)
    except HTTPException:
        raise
    except Exception as e:  # noqa: BLE001
        logger.error("nightly/view failed for %s (%s): %s", project_id, nd, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Report generation failed: {e}")
    return HTMLResponse(result["html"])
@router.post("/nightly/run")
async def run_nightly_report_endpoint(
    project_id: str,
    night_date: Optional[str] = Query(None, description="Evening date of the night (YYYY-MM-DD). Default: last night."),
    baseline_start: Optional[str] = Query(None, description="Baseline range start (YYYY-MM-DD)."),
    baseline_end: Optional[str] = Query(None, description="Baseline range end (YYYY-MM-DD)."),
    metrics: Optional[str] = Query(None, description="Comma list, e.g. lmax,l01,l10,l90. Default: house set."),
    send: bool = Query(True, description="Attempt email (dry-run until SMTP is configured)."),
    db: Session = Depends(get_db),
):
    """Run the night report: build → write report.html/report.json to disk →
    email (best-effort).

    Intended to be the same path the scheduled morning tick calls. The JSON
    response drops the large `html` field and instead carries a `view_url`
    pointing at the /view endpoint with the resolved parameters.
    """
    nd, bmode, bs, be, metric_keys = _resolve_params(
        project_id, db, night_date, baseline_start, baseline_end, metrics,
    )
    report_options = {
        "metric_keys": metric_keys,
        "baseline_mode": bmode,
        "baseline_start": bs,
        "baseline_end": be,
        "send": send,
    }
    try:
        result = run_nightly_report(db, project_id, nd, **report_options)
    except HTTPException:
        raise
    except Exception as e:  # noqa: BLE001
        logger.error("nightly/run failed for %s (%s): %s", project_id, nd, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Report generation failed: {e}")

    # Keep the JSON response lean — view it via /view or the file.
    result.pop("html", None)

    query_parts = [f"night_date={nd:%Y-%m-%d}"]
    if bs and be:
        query_parts.append(f"baseline_start={bs:%Y-%m-%d}")
        query_parts.append(f"baseline_end={be:%Y-%m-%d}")
    query_parts.append(f"metrics={','.join(metric_keys)}")
    result["view_url"] = (
        f"/api/projects/{project_id}/reports/nightly/view" + "?" + "&".join(query_parts)
    )
    return result
# ============================================================================
# Test email + generated-report archive
# ============================================================================
_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
@router.post("/test-email")
async def send_test_email(project_id: str, request: Request, db: Session = Depends(get_db)):
    """Send a small test email to verify the SMTP relay (dry-run if unconfigured).

    Recipients: JSON body {"recipients": "..."} (string or list) overrides;
    else the project's configured recipients; else `recipients=None` is passed
    on, leaving the choice to send_report_email (per the original notes, the
    REPORT_SMTP_RECIPIENTS env default).

    Returns:
        Whatever send_report_email returns.

    Raises:
        HTTPException: 404 for an unknown project; 400 when "recipients" is
            neither a string nor a list.
    """
    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # The body is optional: a missing or unparseable body means "no override".
    try:
        data = await request.json()
    except Exception:
        data = {}
    # Valid JSON that isn't an object (a list, a string, …) carries no
    # "recipients" key either — treat it like an empty body.
    if not isinstance(data, dict):
        data = {}

    raw = data.get("recipients")
    if not raw:
        cfg = db.query(SoundReportConfig).filter_by(project_id=project_id).first()
        raw = cfg.recipients if cfg else None

    recipients = None
    if raw:
        if isinstance(raw, (list, tuple)):
            raw = ",".join(str(r) for r in raw)
        elif not isinstance(raw, str):
            raise HTTPException(
                status_code=400, detail="recipients must be a string or a list of strings."
            )
        recipients = [r.strip() for r in raw.split(",") if r.strip()]

    from backend.services.report_email import send_report_email
    body = (
        "<div style=\"font:14px Arial,sans-serif\">"
        f"Terra-View test email for <b>{escape(project.name)}</b>.<br>"
        "If you got this, the nightly sound-report email path is working.</div>"
    )
    return send_report_email("Terra-View — nightly report test email", body, recipients=recipients)
@router.get("/list")
async def list_reports(project_id: str, db: Session = Depends(get_db)):
    """List the report artifacts saved on disk for this project.

    Scans data/reports/<project_id>/ for sub-directories that contain a
    report.html, ordered by directory name descending (newest first when the
    names are dates). Each entry includes a view URL, an xlsx URL when
    report.xlsx exists, the HTML size, and the HTML file's mtime as a UTC ISO
    string.
    """
    if not db.query(Project).filter_by(id=project_id).first():
        raise HTTPException(status_code=404, detail="Project not found")

    reports_root = Path("data/reports") / project_id
    archive_prefix = f"/api/projects/{project_id}/reports/archive"
    out = []

    if reports_root.exists():
        night_dirs = [entry for entry in reports_root.iterdir() if entry.is_dir()]
        night_dirs.sort(key=lambda entry: entry.name, reverse=True)
        for night_dir in night_dirs:
            html_file = night_dir / "report.html"
            if not html_file.exists():
                continue
            info = html_file.stat()
            if (night_dir / "report.xlsx").exists():
                xlsx_url = f"{archive_prefix}/{night_dir.name}/xlsx"
            else:
                xlsx_url = None
            out.append({
                "night_date": night_dir.name,
                "view_url": f"{archive_prefix}/{night_dir.name}",
                "xlsx_url": xlsx_url,
                "size_bytes": info.st_size,
                "generated_at": datetime.utcfromtimestamp(info.st_mtime).isoformat(),
            })

    return {"reports": out, "count": len(out)}
@router.get("/archive/{night_date}", response_class=HTMLResponse)
async def view_archived_report(project_id: str, night_date: str, db: Session = Depends(get_db)):
    """Return a previously generated report.html straight from disk.

    Responds 404 for an unknown project or a missing file, and 400 when
    night_date is not a YYYY-MM-DD date.
    """
    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    if _DATE_RE.match(night_date) is None:
        raise HTTPException(status_code=400, detail="Invalid date (YYYY-MM-DD)")

    # The path segment is rebuilt from the parsed date, not the raw input,
    # which also guards against path traversal.
    safe = _parse_date(night_date, "night_date")
    report_dir = Path("data/reports") / project_id / f"{safe:%Y-%m-%d}"
    path = report_dir / "report.html"
    if not path.exists():
        raise HTTPException(status_code=404, detail="No saved report for that date")

    html = path.read_text(encoding="utf-8")
    return HTMLResponse(html)
@router.get("/archive/{night_date}/xlsx")
async def download_archived_xlsx(project_id: str, night_date: str, db: Session = Depends(get_db)):
    """Send a previously generated report.xlsx from disk as an attachment.

    Responds 404 for an unknown project or a missing file, and 400 when
    night_date is not a YYYY-MM-DD date.
    """
    from fastapi.responses import Response

    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    if _DATE_RE.match(night_date) is None:
        raise HTTPException(status_code=400, detail="Invalid date (YYYY-MM-DD)")

    # Both the directory and the download name use the parsed date, never the
    # raw path parameter.
    safe = _parse_date(night_date, "night_date")
    path = Path("data/reports") / project_id / f"{safe:%Y-%m-%d}" / "report.xlsx"
    if not path.exists():
        raise HTTPException(status_code=404, detail="No saved spreadsheet for that date")

    download_headers = {
        "Content-Disposition": f'attachment; filename="night_report_{safe:%Y-%m-%d}.xlsx"',
    }
    return Response(
        content=path.read_bytes(),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=download_headers,
    )
# ============================================================================
# Reference baseline (fixed values typed per location — limits / prior averages)
# ============================================================================
@router.get("/baseline")
async def get_baseline(project_id: str, db: Session = Depends(get_db)):
    """Return what the baseline editor needs: the baseline mode, the
    window/metric grid, and each sound location's reference values.

    Mode and metrics come from the saved config when present ("captured" and
    DEFAULT_METRICS otherwise). Locations with a non-None `removed_at` are
    left out.
    """
    if not db.query(Project).filter_by(id=project_id).first():
        raise HTTPException(status_code=404, detail="Project not found")

    cfg = db.query(SoundReportConfig).filter_by(project_id=project_id).first()
    mode = cfg.baseline_mode if cfg else "captured"
    if cfg and cfg.metric_keys:
        metric_keys = _parse_metrics(cfg.metric_keys)
    else:
        metric_keys = list(DEFAULT_METRICS)

    location_query = db.query(MonitoringLocation).filter_by(
        project_id=project_id, location_type="sound",
    ).order_by(MonitoringLocation.sort_order, MonitoringLocation.name)
    # getattr with a default: rows lacking the attribute count as active.
    active_locations = [
        loc for loc in location_query.all()
        if getattr(loc, "removed_at", None) is None
    ]

    window_grid = [{"key": w.key, "label": w.label} for w in DEFAULT_WINDOWS]
    metric_grid = [{"key": k, "label": METRIC_REGISTRY[k].label} for k in metric_keys]
    location_rows = [
        {"id": loc.id, "name": loc.name, "values": _location_reference_baseline(loc)}
        for loc in active_locations
    ]

    return {
        "mode": mode,
        "windows": window_grid,
        "metrics": metric_grid,
        "locations": location_rows,
    }
@router.put("/baseline")
async def put_baseline(project_id: str, request: Request, db: Session = Depends(get_db)):
    """Save the baseline mode (on config) and per-location reference values
    (on each location's metadata).

    Body:
        {"mode": "reference",
         "locations": {"<loc_id>": {"nighttime": {"l10": 85}, "evening": {...}}}}

    Responds 404 for an unknown project and 400 when the body is not a JSON
    object, ``locations`` is not an object, or ``mode`` is not one of
    "captured" / "reference". Location ids that do not belong to the project,
    and values that are blank, non-numeric or non-finite, are skipped.

    Returns ``{"ok": True, "locations_updated": <count>}``.
    """
    import math  # local import: only needed here, to reject NaN / infinity

    if not db.query(Project).filter_by(id=project_id).first():
        raise HTTPException(status_code=404, detail="Project not found")

    # Validate the body shape before touching the session, so a malformed
    # request is a clean 400 instead of an AttributeError (HTTP 500).
    try:
        data = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Body must be valid JSON") from None
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Body must be a JSON object")
    loc_values = data.get("locations") or {}
    if not isinstance(loc_values, dict):
        raise HTTPException(status_code=400, detail="locations must be an object keyed by location id")

    if "mode" in data:
        bm = str(data["mode"]).lower()
        if bm not in ("captured", "reference"):
            raise HTTPException(status_code=400, detail="mode must be 'captured' or 'reference'")
        cfg = db.query(SoundReportConfig).filter_by(project_id=project_id).first()
        if cfg is None:
            cfg = SoundReportConfig(id=str(uuid.uuid4()), project_id=project_id)
            db.add(cfg)
        cfg.baseline_mode = bm

    updated = 0
    for loc_id, windows in loc_values.items():
        loc = db.query(MonitoringLocation).filter_by(id=loc_id, project_id=project_id).first()
        if not loc or not isinstance(windows, dict):
            continue
        try:
            meta = json.loads(loc.location_metadata or "{}")
        except (json.JSONDecodeError, TypeError):
            meta = {}
        if not isinstance(meta, dict):
            # Stored metadata that is valid JSON but not an object cannot hold keys.
            meta = {}

        clean: dict = {}
        for wkey, mvals in windows.items():
            if not isinstance(mvals, dict):
                continue
            cm = {}
            for mkey, val in mvals.items():
                if val in (None, ""):
                    continue
                try:
                    number = float(val)
                except (ValueError, TypeError):
                    continue
                if not math.isfinite(number):
                    # NaN / inf would poison every delta computed against this baseline.
                    continue
                cm[mkey] = round(number, 1)
            if cm:
                clean[wkey] = cm

        # An empty grid clears the stored baseline rather than saving {}.
        if clean:
            meta["report_baseline"] = clean
        else:
            meta.pop("report_baseline", None)
        loc.location_metadata = json.dumps(meta)
        updated += 1

    db.commit()
    return {"ok": True, "locations_updated": updated}
+172
View File
@@ -0,0 +1,172 @@
"""
Report email sender — config-driven SMTP via the Python standard library.
Connection settings come from environment variables so the mail backend
(internal relay / Microsoft 365 / Gmail / SendGrid) can be swapped without code
changes — see the build plan: terra-mechanics.com is on M365 and has a smarthost
relay that already sends the seismograph alerts as remote@terra-mechanics.com;
reuse that relay's settings here.
DRY-RUN: if SMTP isn't configured (no host/from), the message is built and
logged but NOT sent, and the call still succeeds. This keeps report generation
working before the relay is wired up, and means a missing/incomplete mail config
can never crash the nightly pipeline.
Env vars
--------
REPORT_SMTP_HOST e.g. smtp.office365.com (unset → dry-run)
REPORT_SMTP_PORT default 587
REPORT_SMTP_SECURITY starttls (default) | ssl | none
REPORT_SMTP_USER optional — omit for IP-authenticated relays
REPORT_SMTP_PASSWORD optional
REPORT_SMTP_FROM e.g. "TMI Monitoring <monitoring@terra-mechanics.com>"
REPORT_SMTP_RECIPIENTS comma-separated default recipient list
REPORT_SMTP_TIMEOUT seconds, default 30
"""
from __future__ import annotations
import logging
import os
import smtplib
import ssl
from dataclasses import dataclass, field
from email.message import EmailMessage
from typing import Optional
logger = logging.getLogger(__name__)
# Convenient MIME type for the Excel attachment, as a (maintype, subtype) pair
# so it can be unpacked into Attachment's last two fields.
XLSX_MIME = ("application", "vnd.openxmlformats-officedocument.spreadsheetml.sheet")
@dataclass
class Attachment:
    """One file to attach to a report email."""

    filename: str  # attachment file name passed to the message
    content: bytes  # raw file bytes
    maintype: str = "application"  # MIME main type
    subtype: str = "octet-stream"  # MIME subtype
@dataclass
class SMTPConfig:
    """SMTP connection settings for the report mailer.

    Build it directly, or from the ``REPORT_SMTP_*`` environment variables via
    :meth:`from_env`.
    """

    host: str = ""
    port: int = 587
    security: str = "starttls"  # "starttls" | "ssl" | "none"
    user: str = ""
    password: str = ""
    sender: str = ""
    recipients: list[str] = field(default_factory=list)
    timeout: float = 30.0

    @classmethod
    def from_env(cls) -> "SMTPConfig":
        """Read the settings from the environment.

        Never raises on a malformed numeric value: a ``REPORT_SMTP_PORT`` or
        ``REPORT_SMTP_TIMEOUT`` that cannot be parsed is logged and replaced
        by its default (587 / 30), so a typo in the mail config cannot abort
        report generation.
        """

        def _number(name: str, default, cast):
            # Unset or blank means "use the default", as does a bad value.
            raw = (os.getenv(name) or "").strip()
            if not raw:
                return default
            try:
                return cast(raw)
            except ValueError:
                logging.getLogger(__name__).warning(
                    "Invalid %s=%r; falling back to %r", name, raw, default,
                )
                return default

        rec = os.getenv("REPORT_SMTP_RECIPIENTS", "")
        return cls(
            host=os.getenv("REPORT_SMTP_HOST", "").strip(),
            port=_number("REPORT_SMTP_PORT", 587, int),
            security=os.getenv("REPORT_SMTP_SECURITY", "starttls").strip().lower(),
            user=os.getenv("REPORT_SMTP_USER", "").strip(),
            # The password is deliberately not stripped: whitespace may be significant.
            password=os.getenv("REPORT_SMTP_PASSWORD", ""),
            sender=os.getenv("REPORT_SMTP_FROM", "").strip(),
            recipients=[r.strip() for r in rec.split(",") if r.strip()],
            timeout=_number("REPORT_SMTP_TIMEOUT", 30.0, float),
        )

    @property
    def configured(self) -> bool:
        """True only when we have enough to actually send (host + from)."""
        return bool(self.host and self.sender)
def build_message(
    cfg: SMTPConfig,
    subject: str,
    html_body: str,
    recipients: list[str],
    attachments: Optional[list[Attachment]] = None,
    text_body: Optional[str] = None,
) -> EmailMessage:
    """Build the report email: text fallback, HTML alternative, attachments.

    ``From`` falls back to a placeholder address when ``cfg.sender`` is empty,
    and the text part falls back to a stock sentence when ``text_body`` is
    empty or omitted.
    """
    message = EmailMessage()
    headers = {
        "From": cfg.sender or "terra-view@localhost",
        "To": ", ".join(recipients),
        "Subject": subject,
    }
    for header_name, header_value in headers.items():
        message[header_name] = header_value

    # The plain-text part goes in first; the HTML is added as its alternative.
    if text_body:
        fallback_text = text_body
    else:
        fallback_text = "This report is best viewed in an HTML email client."
    message.set_content(fallback_text)
    message.add_alternative(html_body, subtype="html")

    if attachments:
        for item in attachments:
            message.add_attachment(
                item.content,
                maintype=item.maintype,
                subtype=item.subtype,
                filename=item.filename,
            )
    return message
def send_report_email(
    subject: str,
    html_body: str,
    *,
    attachments: Optional[list[Attachment]] = None,
    recipients: Optional[list[str]] = None,
    text_body: Optional[str] = None,
    cfg: Optional[SMTPConfig] = None,
) -> dict:
    """Deliver the report email, or dry-run it when SMTP is not configured.

    The returned dict has the keys ``sent``, ``dry_run``, ``recipients`` and
    ``error``. A failure while talking to the server is logged and reported
    through ``error`` instead of being raised, so the caller can record it and
    carry on with the rest of the pipeline.
    """
    if not cfg:
        cfg = SMTPConfig.from_env()
    if recipients is None:
        recipients = cfg.recipients
    outcome = {"sent": False, "dry_run": False, "recipients": recipients, "error": None}

    if not recipients:
        outcome["error"] = "No recipients configured"
        logger.warning("Report email: no recipients set; skipping send of %r", subject)
        return outcome

    message = build_message(cfg, subject, html_body, recipients, attachments, text_body)

    if not cfg.configured:
        outcome["dry_run"] = True
        attachment_count = len(attachments) if attachments else 0
        logger.info(
            "Report email DRY-RUN (SMTP not configured): would send %r to %s with %d attachment(s)",
            subject, recipients, attachment_count,
        )
        return outcome

    # A mistyped security mode must not quietly degrade to a plaintext
    # connection that still sends credentials, so it is treated as STARTTLS.
    mode = cfg.security
    if mode not in ("ssl", "starttls", "none"):
        mode = "starttls"
        logger.warning("Unknown REPORT_SMTP_SECURITY=%r — falling back to 'starttls'", cfg.security)

    try:
        if mode == "ssl":
            tls_context = ssl.create_default_context()
            with smtplib.SMTP_SSL(
                cfg.host, cfg.port, timeout=cfg.timeout, context=tls_context,
            ) as conn:
                if cfg.user:
                    conn.login(cfg.user, cfg.password)
                conn.send_message(message)
        else:
            with smtplib.SMTP(cfg.host, cfg.port, timeout=cfg.timeout) as conn:
                conn.ehlo()
                if mode == "starttls":
                    conn.starttls(context=ssl.create_default_context())
                    conn.ehlo()
                if cfg.user and mode == "none":
                    logger.warning(
                        "Sending SMTP credentials over an UNENCRYPTED connection "
                        "(REPORT_SMTP_SECURITY=none) — set starttls/ssl if the relay supports it."
                    )
                if cfg.user:
                    conn.login(cfg.user, cfg.password)
                conn.send_message(message)
        outcome["sent"] = True
        logger.info("Report email sent: %r to %s", subject, recipients)
    except Exception as exc:  # noqa: BLE001 — surface as result, never abort the pipeline
        outcome["error"] = str(exc)
        logger.error("Report email send failed: %s", exc, exc_info=True)
    return outcome
+150
View File
@@ -0,0 +1,150 @@
"""
Nightly Report Orchestrator.
Ties the pieces together: compute → render → write-to-disk → email.
This is what the daily cycle (or a manual trigger) calls. It ALWAYS writes the
rendered report to disk — `data/reports/{project_id}/{night_date}/report.html`
(+ `report.json` with the raw numbers) — so there's a viewable artifact even
when email is in dry-run (SMTP not configured yet). The email step is
best-effort and never aborts the run.
"""
from __future__ import annotations
import json
import logging
from datetime import date
from pathlib import Path
from typing import Optional
from sqlalchemy.orm import Session
from backend.services.report_pipeline import (
ProjectNightReport, build_project_night_report, Window,
)
from backend.services.report_renderers import render_html_summary, render_excel
from backend.services.report_email import send_report_email, Attachment, XLSX_MIME
logger = logging.getLogger(__name__)
# Root directory for persisted reports; each run writes into
# {root}/{project_id}/{night_date}/.
DEFAULT_OUTPUT_ROOT = "data/reports"
def _report_to_dict(report: ProjectNightReport) -> dict:
    """Flatten the report model into JSON-friendly dicts for the on-disk record.

    Each location keeps its name, counts and notes, plus a window → metric
    grid of last-night / baseline / delta values.
    """
    payload = {
        "project_id": report.project_id,
        "project_name": report.project_name,
        "night_date": report.night_date.isoformat(),
        "metrics": [metric.key for metric in report.metrics],
        "locations": [],
    }

    for loc in report.locations:
        window_records = {}
        for window in loc.windows:
            metric_records = {}
            for metric in loc.metrics:
                metric_records[metric.key] = {
                    "label": metric.label,
                    "last_night": loc.table[window.key][metric.key].last_night,
                    "baseline": loc.table[window.key][metric.key].baseline,
                    "delta": loc.table[window.key][metric.key].delta,
                }
            window_records[window.key] = {
                "label": window.label,
                "metrics": metric_records,
            }
        payload["locations"].append({
            "name": loc.location_name,
            "night_interval_count": loc.night_interval_count,
            "baseline_nights_used": loc.baseline_nights_used,
            "notes": loc.notes,
            "windows": window_records,
        })
    return payload
def run_nightly_report(
    db: Session,
    project_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_mode: str = "captured",
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
    recipients: Optional[list[str]] = None,
    output_root: str = DEFAULT_OUTPUT_ROOT,
    send: bool = True,
) -> dict:
    """Compute, render, persist and optionally email one project's night report.

    ``report.html`` and ``report.json`` are always written under
    ``{output_root}/{project_id}/{night_date}/``. The spreadsheet and the email
    are best-effort: a failure in either is logged and reflected in the result
    rather than raised.

    Returns a dict with the artifact paths, the rendered HTML, the subject
    line and the email outcome.
    """
    report = build_project_night_report(
        db,
        project_id,
        night_date,
        metric_keys=metric_keys,
        windows=windows,
        baseline_mode=baseline_mode,
        baseline_start=baseline_start,
        baseline_end=baseline_end,
    )
    html = render_html_summary(report)
    subject = f"{report.project_name} — night report {night_date:%m/%d/%y}"

    # --- Viewable copies on disk, written regardless of the email outcome ---
    night_stamp = f"{night_date:%Y-%m-%d}"
    out_dir = Path(output_root) / project_id / night_stamp
    out_dir.mkdir(parents=True, exist_ok=True)
    html_path = out_dir / "report.html"
    html_path.write_text(html, encoding="utf-8")
    json_path = out_dir / "report.json"
    serialised = json.dumps(_report_to_dict(report), indent=2)
    json_path.write_text(serialised, encoding="utf-8")

    # --- Spreadsheet: archived on disk and attached to the email ---
    attachments: list[Attachment] = []
    xlsx_path = None
    try:
        workbook_bytes = render_excel(report)
        xlsx_path = out_dir / "report.xlsx"
        xlsx_path.write_bytes(workbook_bytes)
        # Keep only filename-friendly characters from the project name.
        kept_chars = [ch for ch in report.project_name if ch.isalnum() or ch in " -_"]
        name_stem = "".join(kept_chars).strip().replace(" ", "_") or "report"
        attachments.append(Attachment(
            f"{name_stem}_{night_stamp}_night_report.xlsx",
            workbook_bytes,
            *XLSX_MIME,
        ))
    except Exception as exc:  # noqa: BLE001 — never let the spreadsheet sink the report
        logger.error("Excel render failed for %s (%s): %s", project_id, night_date, exc, exc_info=True)

    # --- Email: skipped when send is False, otherwise best-effort ---
    if not send:
        email_result = {"sent": False, "dry_run": False, "skipped": True, "error": None}
    else:
        try:
            email_result = send_report_email(
                subject, html, attachments=attachments, recipients=recipients,
            )
        except Exception as exc:  # noqa: BLE001 — artifacts are already written; never abort on email
            logger.error("send_report_email raised for %s (%s): %s", project_id, night_date, exc, exc_info=True)
            email_result = {"sent": False, "dry_run": False, "skipped": False, "error": str(exc)}

    if email_result.get("sent"):
        email_status = "sent"
    elif email_result.get("dry_run"):
        email_status = "dry-run"
    elif email_result.get("skipped"):
        email_status = "skipped"
    else:
        email_status = f"error: {email_result.get('error')}"
    logger.info(
        "Nightly report for %s (%s): %d location(s) → %s; email=%s",
        report.project_name, night_date, len(report.locations), html_path,
        email_status,
    )

    return {
        "project_id": project_id,
        "project_name": report.project_name,
        "night_date": night_date.isoformat(),
        "subject": subject,
        "location_count": len(report.locations),
        "html_path": str(html_path),
        "json_path": str(json_path),
        "xlsx_path": str(xlsx_path) if xlsx_path else None,
        "html": html,  # for callers that want to display it inline
        "email": email_result,
    }
+432
View File
@@ -0,0 +1,432 @@
"""
Nightly Report Pipeline — computation core.
Builds the data model for the John-Myler-style "last night vs. baseline" sound
report. Source-agnostic: it reads the same on-disk Leq `.rnd` files the manual
upload + FTP-pull ingest produce (see `project_locations.ingest_nrl_zip`).
Design notes
------------
* **Ingest everything, report selectively.** Ingest preserves every column of
the Leq file; this layer chooses which *metrics* to surface via `metric_keys`
(a future report wizard is just a UI over that list).
* **House format match.** Defaults reproduce the existing Excel report:
LAmax (max of interval maxima), LA01 / LA10 (arithmetic average), split into
Evening (7–10PM) and Nighttime (10PM–7AM) windows. L90 (background) is added
for the baseline comparison.
* **Metric labelling from the device.** The LN→percentile assignment is
reconfigurable per job; we resolve which `LNx(Main)` column is L90/L10/etc.
from the percentile map captured in the session metadata at ingest, falling
back to the NL-43 default order.
* **Correct averaging.** Leq is energy-averaged (logarithmic); percentiles and
Lmax are arithmetic. Baseline references combine the per-night values into a
"typical night" (arithmetic mean of per-night values — so baseline Lmax is the
typical nightly peak, not the worst-of-week).
"""
from __future__ import annotations
import json
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta, date
from typing import Optional
from sqlalchemy.orm import Session
from backend.models import MonitoringSession, DataFile, MonitoringLocation, Project
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Metric registry
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class Metric:
    """A reportable metric.

    `agg` is the *within-night* aggregation used to collapse a window's 15-min
    intervals into one value:
      - "max"   → loudest interval (LAmax)
      - "arith" → arithmetic mean (percentiles: L01/L10/L90…)
      - "log"   → energy/logarithmic mean (Leq only)

    `column` pins a fixed .rnd column; `percentile` instead resolves the LNx
    column from the session's captured percentile map.
    """

    key: str  # registry key, e.g. "l10"
    label: str  # display label, e.g. "LA10"
    agg: str  # "max" | "arith" | "log"
    column: Optional[str] = None  # fixed .rnd column name, when pinned
    percentile: Optional[float] = None  # percentile used to look up the LNx column
# Metric key → definition. Fixed-column metrics name their .rnd column
# directly; percentile metrics leave `column` unset so the LNx column is
# resolved per session.
METRIC_REGISTRY: dict[str, Metric] = {
    "lmax": Metric("lmax", "LAmax", "max", column="Lmax(Main)"),
    "leq": Metric("leq", "LAeq", "log", column="Leq(Main)"),
    "lmin": Metric("lmin", "LAmin", "arith", column="Lmin(Main)"),
    "l01": Metric("l01", "LA01", "arith", percentile=1.0),
    "l10": Metric("l10", "LA10", "arith", percentile=10.0),
    "l50": Metric("l50", "LA50", "arith", percentile=50.0),
    "l90": Metric("l90", "LA90", "arith", percentile=90.0),
    "l95": Metric("l95", "LA95", "arith", percentile=95.0),
}
# House report metrics + L90 (background) for the baseline comparison.
DEFAULT_METRICS: list[str] = ["lmax", "l01", "l10", "l90"]
# NL-43 default percentile→slot assignment, used when a session has no captured map.
# Slot n corresponds to the `LNn(Main)` column.
_DEFAULT_SLOT_FOR_PCT: dict[float, int] = {1.0: 1, 10.0: 2, 50.0: 3, 90.0: 4, 95.0: 5}
def _resolve_column(metric: Metric, pct_map: dict) -> Optional[str]:
    """Resolve the .rnd column for a metric, using the session's percentile map.

    Resolution order:
      1. a metric with a pinned `column` always uses it;
      2. a percentile metric uses the slot the session map assigns to that
         percentile;
      3. otherwise the NL-43 default slot is used, unless the session map
         assigns that slot to a *different* percentile. In that case the
         column holds other data and None is returned, so the value is left
         blank instead of being reported under the wrong label.

    Returns None when no column can be resolved. Map entries that cannot be
    parsed are ignored, as is a map that is not a dict.
    """
    if metric.column:
        return metric.column
    if metric.percentile is None:
        return None

    # pct_map: {"1": "1.0", "2": "10.0", "4": "90.0", ...} → slot : percentile
    assigned: dict[int, float] = {}
    if isinstance(pct_map, dict):
        for slot, pval in pct_map.items():
            try:
                slot_no, pct = int(slot), float(pval)
            except (ValueError, TypeError):
                continue
            if pct == metric.percentile:
                return f"LN{slot_no}(Main)"
            assigned[slot_no] = pct

    slot = _DEFAULT_SLOT_FOR_PCT.get(metric.percentile)
    if not slot:
        return None
    if slot in assigned:
        # The loop above found no match, so this slot is known to carry
        # another percentile on this session.
        return None
    return f"LN{slot}(Main)"
# ---------------------------------------------------------------------------
# Time windows
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class Window:
    """A named hour-of-day range; it may wrap past midnight."""

    key: str
    label: str
    start_hour: int
    end_hour: int

    def contains(self, hour: int) -> bool:
        """Tell whether `hour` falls in the window (start inclusive, end exclusive).

        When `start_hour` is not below `end_hour` the window is treated as
        wrapping midnight, so either bound being satisfied is enough.
        """
        at_or_after_start = hour >= self.start_hour
        before_end = hour < self.end_hour
        if self.start_hour < self.end_hour:
            return at_or_after_start and before_end
        return at_or_after_start or before_end
# Matches the existing Excel report's stats table.
# Hours are start-inclusive / end-exclusive; the nighttime window (22 → 7)
# wraps past midnight.
# NOTE(review): the labels read as if a dash between the times was lost
# ("7PM10PM", "10PM7AM") — confirm against the intended display text.
DEFAULT_WINDOWS: list[Window] = [
    Window("evening", "Evening (7PM10PM)", 19, 22),
    Window("nighttime", "Nighttime (10PM7AM)", 22, 7),
]
# The full night used to select which intervals belong to "last night":
# it opens at NIGHT_START_HOUR on the evening date and lasts NIGHT_LENGTH_HOURS.
NIGHT_START_HOUR = 19
NIGHT_LENGTH_HOURS = 12
# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------
def _aggregate(values: list, method: str) -> Optional[float]:
    """Collapse a window's interval values into one number per `method`.

    `method` is "max" (largest value), "log" (energy mean, for Leq) or
    anything else for the arithmetic mean. Non-numeric entries and non-finite
    numbers (NaN, ±inf) are ignored: a NaN would otherwise make `max`
    order-dependent and turn every mean into NaN, and an all `-inf` input
    would make the log mean raise.

    Returns the value rounded to one decimal, or None when nothing usable
    remains.
    """
    vals = [v for v in values if isinstance(v, (int, float)) and math.isfinite(v)]
    if not vals:
        return None
    if method == "max":
        return round(max(vals), 1)
    if method == "log":
        mean_energy = sum(10 ** (v / 10.0) for v in vals) / len(vals)
        return round(10 * math.log10(mean_energy), 1)
    return round(sum(vals) / len(vals), 1)  # arithmetic
def _combine_across_nights(per_night: list, method: str) -> Optional[float]:
    """Fold per-night window values into one 'typical night' baseline value.

    Nights without a value (None) are left out. Leq ("log") is combined as an
    energy mean; every other method, "max" included, uses the arithmetic mean,
    so a baseline Lmax is the typical nightly peak. Returns None when no night
    has a value.
    """
    present = list(filter(lambda value: value is not None, per_night))
    if not present:
        return None
    night_count = len(present)
    if method == "log":
        mean_energy = sum(10 ** (v / 10.0) for v in present) / night_count
        return round(10 * math.log10(mean_energy), 1)
    return round(sum(present) / night_count, 1)
# ---------------------------------------------------------------------------
# Row gathering
# ---------------------------------------------------------------------------
def _parse_dt(s: str) -> Optional[datetime]:
    """Parse a ``YYYY/MM/DD HH:MM:SS`` timestamp; None when it does not parse."""
    try:
        parsed = datetime.strptime(s, "%Y/%m/%d %H:%M:%S")
    except (ValueError, TypeError):
        parsed = None
    return parsed
def _location_leq_rows(db: Session, location_id: str) -> list[tuple[datetime, dict, dict]]:
    """Collect every Leq interval recorded at a location, oldest first.

    Each item is ``(interval_dt, row, percentile_map)``. The map comes from the
    owning session's metadata (empty when absent or unreadable). Rows whose
    "Start Time" does not parse are dropped. Parsing reuses the .rnd readers
    from the projects router so it stays identical to the report endpoints.
    """
    # Lazy import avoids a service→router import cycle at module load.
    from backend.routers.projects import (
        _read_rnd_file_rows, _normalize_rnd_rows, _is_leq_file, _peek_rnd_headers,
    )
    from pathlib import Path

    collected: list[tuple[datetime, dict, dict]] = []
    sound_sessions = db.query(MonitoringSession).filter_by(
        location_id=location_id, session_type="sound",
    ).all()

    for session in sound_sessions:
        try:
            session_meta = json.loads(session.session_metadata or "{}")
        except (json.JSONDecodeError, TypeError):
            session_meta = {}
        percentile_map = session_meta.get("percentiles", {}) or {}

        session_files = db.query(DataFile).filter_by(session_id=session.id).all()
        for data_file in session_files:
            # Only .rnd files that identify as Leq files are read.
            if not (data_file.file_path and data_file.file_path.lower().endswith(".rnd")):
                continue
            headers = _peek_rnd_headers(Path("data") / data_file.file_path)
            if not _is_leq_file(data_file.file_path, headers):
                continue

            raw_rows = _read_rnd_file_rows(data_file.file_path)
            normalized_rows, _ = _normalize_rnd_rows(raw_rows)
            for row in normalized_rows:
                stamp = _parse_dt(row.get("Start Time", ""))
                if stamp:
                    collected.append((stamp, row, percentile_map))

    collected.sort(key=lambda item: item[0])
    return collected
def _rows_in_night(rows: list, night_date: date) -> list:
    """Pick the rows of the night that *starts* on `night_date`.

    The night opens at NIGHT_START_HOUR on that date and runs for
    NIGHT_LENGTH_HOURS; the opening instant is included, the closing one is not.
    """
    opens_at = datetime(
        night_date.year, night_date.month, night_date.day, NIGHT_START_HOUR, 0,
    )
    closes_at = opens_at + timedelta(hours=NIGHT_LENGTH_HOURS)

    selected = []
    for stamp, row, pct_map in rows:
        if opens_at <= stamp < closes_at:
            selected.append((stamp, row, pct_map))
    return selected
def _eligible_nights(rows: list, start_date: date, end_date: date) -> list[date]:
    """List the evening-dates in [start_date, end_date] that have night data.

    Dates come back in ascending order; an inverted range yields an empty list.
    """
    day_span = (end_date - start_date).days
    candidate_days = (start_date + timedelta(days=offset) for offset in range(day_span + 1))
    return [day for day in candidate_days if _rows_in_night(rows, day)]
def _window_value(rows: list, metric: Metric, window: Window) -> Optional[float]:
    """Aggregate one metric over the rows whose hour falls inside `window`.

    The column is resolved per row, since each row carries its own session's
    percentile map; rows with no resolvable column contribute nothing.
    """
    picked = []
    for stamp, row, pct_map in rows:
        if not window.contains(stamp.hour):
            continue
        column = _resolve_column(metric, pct_map)
        if not column:
            continue
        picked.append(row.get(column))
    return _aggregate(picked, metric.agg)
# ---------------------------------------------------------------------------
# Report data model
# ---------------------------------------------------------------------------
@dataclass
class CellPair:
    """One table cell: last night's value next to its baseline."""

    last_night: Optional[float]
    baseline: Optional[float]

    @property
    def delta(self) -> Optional[float]:
        """Last night minus baseline, to one decimal; None if either is missing."""
        current, reference = self.last_night, self.baseline
        if current is not None and reference is not None:
            return round(current - reference, 1)
        return None
@dataclass
class LocationNightReport:
    """Night-vs-baseline results for one monitoring location."""

    location_id: str
    location_name: str
    night_date: date  # evening date of the reported night
    metrics: list[Metric]
    windows: list[Window]
    # table[window_key][metric_key] = CellPair
    table: dict[str, dict[str, CellPair]]
    # One dict per interval of the reported night: "dt", "time", then one
    # entry per metric key (None where the value is missing or non-numeric).
    interval_series: list[dict]
    night_interval_count: int  # number of intervals found for the night
    baseline_nights_used: int  # captured nights that fed the baseline
    notes: list[str] = field(default_factory=list)  # human-readable warnings
def _location_reference_baseline(loc) -> dict:
    """A location's manually-entered reference baseline, from its metadata.

    Shape: {window_key: {metric_key: float}}, e.g. {"nighttime": {"l10": 85.0}}.
    Used when baseline_mode == "reference" — fixed targets/limits or
    prior-report averages typed in, rather than computed from captured nights.

    Returns {} when `loc` is missing, its metadata is not a JSON object, or it
    has no usable "report_baseline" entry. Values that are non-numeric or
    non-finite (NaN, ±inf) are dropped, and windows left with no values are
    omitted.
    """
    if not loc:
        return {}
    try:
        meta = json.loads(loc.location_metadata or "{}")
    except (json.JSONDecodeError, TypeError):
        return {}
    # Valid JSON need not be an object (e.g. "[]"); only a dict has .get().
    if not isinstance(meta, dict):
        return {}
    ref = meta.get("report_baseline")
    if not isinstance(ref, dict):
        return {}

    out: dict[str, dict[str, float]] = {}
    for wkey, mvals in ref.items():
        if not isinstance(mvals, dict):
            continue
        clean = {}
        for mkey, val in mvals.items():
            try:
                number = float(val)
            except (ValueError, TypeError):
                continue
            # json.loads accepts NaN/Infinity literals; they are not usable baselines.
            if math.isfinite(number):
                clean[mkey] = number
        if clean:
            out[wkey] = clean
    return out
def build_location_night_report(
    db: Session,
    location_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_mode: str = "captured",
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
) -> LocationNightReport:
    """Build the night-vs-baseline data model for one location.

    `night_date` is the *evening* date of the night being reported (e.g. the
    7/7 in "night of 7/7 → morning 7/8"). Baseline comes from one of:
      - "captured":  the typical-night value across eligible nights in
                     [baseline_start, baseline_end] (computed from recorded
                     data), excluding the reported night itself;
      - "reference": fixed values typed per location (a spec limit like
                     "L10 = 85", or a prior report's averages).

    `metric_keys` and `windows` default to DEFAULT_METRICS / DEFAULT_WINDOWS.
    An unknown metric key raises KeyError. Missing night data, a missing
    reference baseline or an empty baseline range is recorded in the report's
    `notes` rather than raised.
    """
    metric_keys = metric_keys or DEFAULT_METRICS
    metrics = [METRIC_REGISTRY[k] for k in metric_keys]
    windows = windows or DEFAULT_WINDOWS

    loc = db.query(MonitoringLocation).filter_by(id=location_id).first()
    loc_name = loc.name if loc else location_id

    all_rows = _location_leq_rows(db, location_id)
    night_rows = _rows_in_night(all_rows, night_date)

    reference = _location_reference_baseline(loc) if baseline_mode == "reference" else {}
    baseline_nights: list[date] = []
    if baseline_mode != "reference" and baseline_start and baseline_end:
        baseline_nights = _eligible_nights(all_rows, baseline_start, baseline_end)
        # Don't let the reported night double as its own baseline.
        baseline_nights = [n for n in baseline_nights if n != night_date]

    # Slice each baseline night out of the full row list once, instead of
    # repeating the scan for every window × metric cell.
    baseline_rows_by_night = [_rows_in_night(all_rows, nd) for nd in baseline_nights]

    table: dict[str, dict[str, CellPair]] = {}
    for w in windows:
        table[w.key] = {}
        for m in metrics:
            last_night_val = _window_value(night_rows, m, w)
            if baseline_mode == "reference":
                baseline_val = reference.get(w.key, {}).get(m.key)
            elif baseline_nights:
                per_night = [
                    _window_value(one_night_rows, m, w)
                    for one_night_rows in baseline_rows_by_night
                ]
                baseline_val = _combine_across_nights(per_night, m.agg)
            else:
                baseline_val = None
            table[w.key][m.key] = CellPair(last_night_val, baseline_val)

    # Per-interval values for the reported night; non-numeric values become None.
    interval_series = []
    for dt, r, pct_map in night_rows:
        entry = {"dt": dt, "time": dt.strftime("%H:%M")}
        for m in metrics:
            col = _resolve_column(m, pct_map)
            val = r.get(col) if col else None
            entry[m.key] = val if isinstance(val, (int, float)) else None
        interval_series.append(entry)

    notes: list[str] = []
    if not night_rows:
        notes.append(f"No data found for the night of {night_date:%m/%d/%y}.")
    if baseline_mode == "reference":
        if not any(reference.values()):
            notes.append("Reference-baseline mode is on but no reference values are set for this location.")
    elif (baseline_start or baseline_end) and not baseline_nights:
        notes.append("No baseline nights with data in the configured range.")

    return LocationNightReport(
        location_id=location_id,
        location_name=loc_name,
        night_date=night_date,
        metrics=metrics,
        windows=windows,
        table=table,
        interval_series=interval_series,
        night_interval_count=len(night_rows),
        baseline_nights_used=len(baseline_nights),
        notes=notes,
    )
@dataclass
class ProjectNightReport:
    """Night report for a whole project: one LocationNightReport per location."""

    project_id: str
    project_name: str
    night_date: date  # evening date of the reported night
    metrics: list[Metric]  # metrics selected for the report
    locations: list[LocationNightReport]
def build_project_night_report(
    db: Session,
    project_id: str,
    night_date: date,
    *,
    metric_keys: Optional[list[str]] = None,
    windows: Optional[list[Window]] = None,
    baseline_mode: str = "captured",
    baseline_start: Optional[date] = None,
    baseline_end: Optional[date] = None,
) -> ProjectNightReport:
    """Assemble the night report across a project's active sound locations.

    Locations are taken in (sort_order, name) order and those with a
    `removed_at` value are left out. When the project row is missing, its id
    stands in for the name.
    """
    if not metric_keys:
        metric_keys = DEFAULT_METRICS

    project = db.query(Project).filter_by(id=project_id).first()
    project_name = project.name if project else project_id

    sound_locations = (
        db.query(MonitoringLocation)
        .filter_by(project_id=project_id, location_type="sound")
        .order_by(MonitoringLocation.sort_order, MonitoringLocation.name)
        .all()
    )
    active_locations = [
        candidate for candidate in sound_locations
        if getattr(candidate, "removed_at", None) is None
    ]

    location_reports = []
    for loc in active_locations:
        location_reports.append(
            build_location_night_report(
                db,
                loc.id,
                night_date,
                metric_keys=metric_keys,
                windows=windows,
                baseline_mode=baseline_mode,
                baseline_start=baseline_start,
                baseline_end=baseline_end,
            )
        )

    selected_metrics = [METRIC_REGISTRY[key] for key in metric_keys]
    return ProjectNightReport(
        project_id=project_id,
        project_name=project_name,
        night_date=night_date,
        metrics=selected_metrics,
        locations=location_reports,
    )
+240
View File
@@ -0,0 +1,240 @@
"""
Nightly Report Renderers.
Pluggable renderers over the `report_pipeline` data model. v1 ships the HTML
email body + the Excel attachment; PDF and an inline chart image are v1.1
(each needs a new dependency). Keeping renderers separate from the compute
core means a future report wizard just toggles metrics/renderers — the data
model is unchanged.
Email-client constraints: the HTML uses a table layout with **inline styles
only** (no <style> blocks, no external CSS, no flex/grid), which is the reliable
common denominator across Outlook / Gmail / Apple Mail.
"""
from __future__ import annotations
from html import escape
from backend.services.report_pipeline import ProjectNightReport, LocationNightReport
# Colours: louder-than-baseline reads as a concern (red), quieter as fine (green).
# Grey is used for a zero or missing delta.
_RED = "#b00020"
_GREEN = "#1a7f37"
_GREY = "#888888"
def _fmt_value(v) -> str:
    """Format a number to one decimal place; anything non-numeric becomes ""."""
    if not isinstance(v, (int, float)):
        return ""
    return format(v, ".1f")
def _fmt_delta(v) -> str:
    """Render a delta as a coloured span.

    Positive (louder) is red with an explicit "+", negative (quieter) is green,
    zero is a grey "0.0", and a non-numeric value is a grey dash.
    """
    if not isinstance(v, (int, float)):
        colour, text = _GREY, "—"
    elif v > 0:
        colour, text = _RED, f"+{v:.1f}"
    elif v < 0:
        colour, text = _GREEN, f"{v:.1f}"
    else:
        colour, text = _GREY, "0.0"
    return f'<span style="color:{colour}">{text}</span>'
def _location_table(loc: LocationNightReport) -> str:
    """One location block: heading + Metric × (window: Last / Base / Δ) table.

    The caption under the heading states the interval count and where the
    baseline comes from: the number of captured nights, "reference baseline"
    when no nights were used but the table still carries baseline values
    (reference mode), or "no baseline yet" when there are none at all. Any
    notes on the location are listed in red under the table.
    """
    th = ('padding:5px 9px;border:1px solid #ccc;background:#f2f2f2;'
          'font:bold 12px Arial,sans-serif;text-align:center')
    sub = ('padding:4px 8px;border:1px solid #ccc;background:#fafafa;'
           'font:11px Arial,sans-serif;text-align:center;color:#555')
    td = 'padding:4px 9px;border:1px solid #ccc;font:12px Arial,sans-serif;text-align:center'
    td_l = 'padding:4px 9px;border:1px solid #ccc;font:bold 12px Arial,sans-serif;text-align:left'

    # Top header: label cell + each window spanning Last/Base/Δ
    top = f'<th rowspan="2" style="{th}">Metric (dBA)</th>' + ''.join(
        f'<th colspan="3" style="{th}">{escape(w.label)}</th>' for w in loc.windows
    )
    sub_row = ''.join(
        f'<th style="{sub}">Last</th><th style="{sub}">Base</th><th style="{sub}">&Delta;</th>'
        for _ in loc.windows
    )

    rows = []
    has_baseline_value = False
    for m in loc.metrics:
        cells = []
        for w in loc.windows:
            cp = loc.table[w.key][m.key]
            if cp.baseline is not None:
                has_baseline_value = True
            cells.append(
                f'<td style="{td}">{_fmt_value(cp.last_night)}</td>'
                f'<td style="{td}">{_fmt_value(cp.baseline)}</td>'
                f'<td style="{td}">{_fmt_delta(cp.delta)}</td>'
            )
        rows.append(f'<tr><td style="{td_l}">{escape(m.label)}</td>{"".join(cells)}</tr>')
    body = ''.join(rows)

    # baseline_nights_used stays 0 in reference mode, so the captured-night
    # count alone cannot tell "no baseline" apart from "typed-in baseline".
    if loc.baseline_nights_used:
        baseline_desc = f' · baseline = {loc.baseline_nights_used} night(s)'
    elif has_baseline_value:
        baseline_desc = ' · reference baseline'
    else:
        baseline_desc = ' · no baseline yet'
    meta = f'{loc.night_interval_count} intervals' + baseline_desc

    notes = ''
    if loc.notes:
        notes = ('<div style="font:11px Arial,sans-serif;color:#b00020;margin:2px 0 0">'
                 + '<br>'.join(escape(n) for n in loc.notes) + '</div>')

    return (
        f'<h3 style="font:bold 15px Arial,sans-serif;margin:18px 0 4px">{escape(loc.location_name)}</h3>'
        f'<div style="font:11px Arial,sans-serif;color:#666;margin:0 0 6px">{escape(meta)}</div>'
        f'<table style="border-collapse:collapse;border:1px solid #ccc">'
        f'<thead><tr>{top}</tr><tr>{sub_row}</tr></thead>'
        f'<tbody>{body}</tbody></table>{notes}'
    )
def render_html_summary(report: ProjectNightReport) -> str:
    """Produce the complete HTML email body for a project's night report.

    The page is a header (title, night date, legend), one table block per
    location (or a notice when the project has none), and a footer. The
    window list in the legend is taken from the first location.
    """
    if report.locations:
        legend_windows = report.locations[0].windows
    else:
        legend_windows = []
    windows_desc = ", ".join(w.label for w in legend_windows)

    title = (
        f'<h2 style="font:bold 18px Arial,sans-serif;margin:0 0 2px">'
        f'{escape(report.project_name)} — Night Report</h2>'
    )
    subtitle = (
        f'<div style="font:13px Arial,sans-serif;color:#444;margin:0 0 4px">'
        f'Night of {report.night_date:%a %m/%d/%y} &nbsp;·&nbsp; last night vs. baseline</div>'
    )
    legend = (
        f'<div style="font:11px Arial,sans-serif;color:#888;margin:0 0 10px">'
        f'Windows: {escape(windows_desc)}. '
        f'&Delta; = last night minus baseline (<span style="color:{_RED}">+ louder</span>, '
        f'<span style="color:{_GREEN}"> quieter</span>). '
        f'LAmax = loudest interval; L-values are arithmetic averages; '
        f'baseline = typical night.</div>'
    )
    header = title + subtitle + legend

    if report.locations:
        body = ''.join(_location_table(loc) for loc in report.locations)
    else:
        body = ('<div style="font:13px Arial,sans-serif;color:#b00020">'
                'No sound locations found for this project.</div>')

    footer = ('<div style="font:10px Arial,sans-serif;color:#aaa;margin-top:18px">'
              'Automated report — Terra-View. Full interval data in the attached spreadsheet.</div>')

    page_parts = [
        '<!DOCTYPE html><html><body style="margin:0;padding:16px;background:#fff">',
        header,
        body,
        footer,
        '</body></html>',
    ]
    return ''.join(page_parts)
# ---------------------------------------------------------------------------
# Excel renderer (the email attachment) — one sheet per location:
# interval table + line chart + a Last/Baseline/Δ summary per window.
# Metric-driven, so it adapts to whatever metric set is configured.
# ---------------------------------------------------------------------------
def _safe_sheet_name(name: str) -> str:
bad = set('[]:*?/\\')
cleaned = "".join(c for c in (name or "Location") if c not in bad).strip()
return (cleaned or "Location")[:31]
def render_excel(report: ProjectNightReport) -> bytes:
    """Render the night report as an .xlsx workbook and return its bytes.

    One worksheet is created per location. Top to bottom each sheet holds a
    title block, the per-interval table (one column per metric), a line chart
    of those metric columns, and a Last / Base / Δ summary per window. A
    report without locations yields a single "No data" sheet.

    Args:
        report: The assembled night report. Reads ``project_name`` and
            ``locations``; from each location ``location_name``,
            ``night_date``, ``metrics``, ``interval_series``, ``windows``,
            ``table`` and ``notes``.

    Returns:
        The serialized workbook.
    """
    import io as _io

    import openpyxl
    from openpyxl.chart import LineChart, Reference
    from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
    from openpyxl.utils import get_column_letter

    max_title_len = 31  # Excel's worksheet-title limit

    wb = openpyxl.Workbook()
    wb.remove(wb.active)  # drop the default sheet; all sheets are created explicitly below

    f_title = Font(name="Arial", bold=True, size=13)
    f_h = Font(name="Arial", bold=True, size=10)
    f_d = Font(name="Arial", size=10)
    f_note = Font(name="Arial", size=9, italic=True, color="888888")
    center = Alignment(horizontal="center", vertical="center")
    hdr_fill = PatternFill("solid", fgColor="F2F2F2")
    thin = Side(style="thin")
    box = Border(left=thin, right=thin, top=thin, bottom=thin)

    if not report.locations:
        ws = wb.create_sheet("No data")
        ws["A1"] = f"{report.project_name} — no sound locations"
        ws["A1"].font = f_title

    # Titles already taken, case-folded: Excel compares sheet titles
    # case-insensitively, so "Gate" and "gate" must not both be used.
    used_names: set = set()
    for loc in report.locations:
        base = _safe_sheet_name(loc.location_name)
        title = base
        counter = 2
        while title.casefold() in used_names:
            suffix = f"_{counter}"
            # Keep the historical 28-character stem, but shrink it further
            # when the suffix grows so the title never exceeds the limit.
            stem_len = min(28, max_title_len - len(suffix))
            title = base[:stem_len] + suffix
            counter += 1
        used_names.add(title.casefold())
        ws = wb.create_sheet(title)
        metrics = loc.metrics

        ws["A1"] = f"{report.project_name} — Night Report"
        ws["A1"].font = f_title
        ws["A2"] = loc.location_name
        ws["A2"].font = f_h
        ws["A3"] = f"Night of {loc.night_date:%m/%d/%y} · 7PM–7AM"
        ws["A3"].font = f_d

        # --- interval table ---
        hr = 5  # header row of the interval table
        cols = ["Interval #", "Date", "Time"] + [m.label for m in metrics] + ["Comments"]
        for ci, label in enumerate(cols, 1):
            c = ws.cell(row=hr, column=ci, value=label)
            c.font = f_h
            c.alignment = center
            c.fill = hdr_fill
            c.border = box

        r = hr + 1
        for idx, entry in enumerate(loc.interval_series, 1):
            ws.cell(row=r, column=1, value=idx).border = box
            dt = entry.get("dt")
            ws.cell(row=r, column=2, value=(dt.strftime("%m/%d/%y") if dt else "")).border = box
            ws.cell(row=r, column=3, value=entry.get("time", "")).border = box
            for mi, m in enumerate(metrics):
                v = entry.get(m.key)
                # Non-numeric values are written as empty cells.
                cc = ws.cell(row=r, column=4 + mi, value=(v if isinstance(v, (int, float)) else None))
                cc.border = box
                cc.alignment = center
            # Empty, bordered "Comments" cell.
            ws.cell(row=r, column=4 + len(metrics), value="").border = box
            r += 1
        # Last data row; never above the first data row, so the summary
        # below keeps its position even when there are no intervals.
        data_end = max(r - 1, hr + 1)

        ws.column_dimensions["A"].width = 9
        ws.column_dimensions["B"].width = 10
        ws.column_dimensions["C"].width = 8
        for mi in range(len(metrics)):
            ws.column_dimensions[get_column_letter(4 + mi)].width = 11
        ws.column_dimensions[get_column_letter(4 + len(metrics))].width = 22

        # --- chart ---
        if loc.interval_series and metrics:
            chart = LineChart()
            chart.title = f"{loc.location_name} — {loc.night_date:%m/%d/%y}"
            chart.y_axis.title = "dBA"
            chart.x_axis.title = "Time"
            chart.height = 9
            chart.width = 18
            # The data range starts on the header row so series names come
            # from the column labels (titles_from_data=True).
            data_ref = Reference(ws, min_col=4, max_col=3 + len(metrics), min_row=hr, max_row=data_end)
            cats = Reference(ws, min_col=3, min_row=hr + 1, max_row=data_end)
            chart.add_data(data_ref, titles_from_data=True)
            chart.set_categories(cats)
            # Anchor to the right of the table (one spare column after "Comments").
            ws.add_chart(chart, f"{get_column_letter(6 + len(metrics))}5")

        # --- summary: Metric × window (Last / Base / Δ) ---
        sr = data_end + 3
        ws.cell(row=sr, column=1, value="Summary — last night vs baseline").font = f_h
        sr += 1
        ws.cell(row=sr, column=1, value="Metric").font = f_h
        win_col = {}  # window key -> first of its three columns
        col = 2
        for w in loc.windows:
            c = ws.cell(row=sr, column=col, value=w.label)
            c.font = f_h
            c.alignment = center
            ws.merge_cells(start_row=sr, start_column=col, end_row=sr, end_column=col + 2)
            win_col[w.key] = col
            col += 3
        sr += 1
        for w in loc.windows:
            b = win_col[w.key]
            for j, lbl in enumerate(["Last", "Base", "Δ"]):
                cc = ws.cell(row=sr, column=b + j, value=lbl)
                cc.font = f_h
                cc.alignment = center
        sr += 1
        for m in metrics:
            ws.cell(row=sr, column=1, value=m.label).font = f_d
            for w in loc.windows:
                cp = loc.table[w.key][m.key]
                b = win_col[w.key]
                ws.cell(row=sr, column=b + 0, value=cp.last_night).alignment = center
                ws.cell(row=sr, column=b + 1, value=cp.baseline).alignment = center
                ws.cell(row=sr, column=b + 2, value=cp.delta).alignment = center
            sr += 1

        if loc.notes:
            ws.cell(row=sr + 1, column=1, value="; ".join(loc.notes)).font = f_note

    out = _io.BytesIO()
    wb.save(out)
    return out.getvalue()
+187
View File
@@ -78,6 +78,9 @@ class SchedulerService:
# Execute pending actions
await self.execute_pending_actions()
# Run any due nightly sound reports (FTP report pipeline)
await self.run_due_reports()
# Generate actions from recurring schedules (every hour)
now = datetime.utcnow()
if (now - last_generation_check).total_seconds() >= 3600:
@@ -633,6 +636,46 @@ class SchedulerService:
)
result["old_session_id"] = active_session.id
# Step 4b: Ingest the just-finished Auto_#### folder into Terra-View
# (clean session + DataFiles via ingest_nrl_zip — filters Lp, parses the
# .rnh, dedups). This is what gives the nightly report its data.
if action.device_type == "slm" and result["steps"].get("download", {}).get("success"):
idx = None
try:
idx = int((result["steps"]["download"].get("response") or {}).get("index_number"))
except (ValueError, TypeError):
idx = None
if idx is None:
result["steps"]["ingest"] = {"success": False, "error": "no index_number from download"}
else:
folder_name = f"Auto_{idx:04d}"
try:
ing = await self._ingest_cycle_folder(db, action.location_id, unit_id, folder_name)
result["steps"]["ingest"] = ing
db.commit()
if ing.get("success"):
from backend.models import DataFile
sid = ing.get("session_id")
# ingest_nrl_zip leaves unit_id None — tie the data session to the
# unit that recorded it so it stays linked after we drop the placeholder.
if sid:
s = db.query(MonitoringSession).filter_by(id=sid).first()
if s and not s.unit_id:
s.unit_id = unit_id
db.commit()
# The just-closed "recording" session was only a marker; its data now
# lives in the ingested (unit-linked) session. Drop the empty placeholder
# and repoint old_session_id at the real row.
if active_session and db.query(DataFile).filter_by(session_id=active_session.id).count() == 0:
if sid:
result["old_session_id"] = sid
db.delete(active_session)
db.commit()
logger.info(f"[CYCLE] Ingested {folder_name}: {ing}")
except Exception as e:
logger.error(f"[CYCLE] Ingest failed for {folder_name}: {e}", exc_info=True)
result["steps"]["ingest"] = {"success": False, "error": str(e)}
# Step 5: Wait for device to settle before starting new measurement
logger.info(f"[CYCLE] Step 5/7: Waiting 30s for device to settle...")
await asyncio.sleep(30)
@@ -667,6 +710,33 @@ class SchedulerService:
logger.info(f"[CYCLE] New measurement started, session {new_session.id}")
# Step 6b: Verify the meter actually resumed measuring (fresh DOD).
# Polling is still paused here, so query directly. Advisory: a
# failure alerts loudly but doesn't fail the cycle (DOD reads can
# be transiently flaky); the keepalive poll re-confirms within ~10s.
if action.device_type == "slm":
try:
await asyncio.sleep(2)
live = await self.device_controller.get_live_data(unit_id, action.device_type)
state = ((live or {}).get("measurement_state")
or ((live or {}).get("data") or {}).get("measurement_state") or "")
measuring = str(state).strip().lower() in ("start", "measure", "measuring", "run", "running")
result["steps"]["restart_verified"] = measuring
if measuring:
logger.info(f"[CYCLE] Restart verified — {unit_id} is measuring (state={state}).")
else:
logger.error(f"[CYCLE] Restart NOT verified for {unit_id} — state={state!r}")
try:
get_alert_service(db).create_schedule_failed_alert(
schedule_id=action.id, action_type="cycle", unit_id=unit_id,
error_message=f"Meter did not resume measuring after the cycle (state={state!r}).",
project_id=action.project_id, location_id=action.location_id,
)
except Exception as ae:
logger.warning(f"[CYCLE] restart-verify alert failed: {ae}")
except Exception as e:
logger.warning(f"[CYCLE] Restart verification skipped (DOD read failed): {e}")
except Exception as e:
logger.error(f"[CYCLE] Start failed: {e}")
result["steps"]["start"] = {"success": False, "error": str(e)}
@@ -689,6 +759,37 @@ class SchedulerService:
logger.info(f"[CYCLE] === Cycle complete for {unit_id} ===")
return result
async def _ingest_cycle_folder(self, db, location_id: str, unit_id: str, folder_name: str) -> dict:
    """Pull one finished Auto_#### folder through SLMM and ingest it.

    The folder is requested as a ZIP from SLMM's ``ftp/download-folder``
    endpoint for ``unit_id`` and handed to ``ingest_nrl_zip`` for
    ``location_id``. Called from ``_execute_cycle`` Step 4b.

    Returns:
        ``{"success": True, ...ingest result...}`` on success, otherwise
        ``{"success": False, "error": <message>}``. Only request failures,
        an empty/failed ZIP and ``IngestError`` are turned into a failure
        dict; any other exception from the ingest propagates to the caller.
    """
    import os
    import httpx
    from backend.routers.project_locations import ingest_nrl_zip, IngestError

    base_url = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
    endpoint = f"{base_url}/api/nl43/{unit_id}/ftp/download-folder"
    payload = {"remote_path": f"/NL-43/{folder_name}"}

    try:
        async with httpx.AsyncClient(timeout=600.0) as http:
            reply = await http.post(endpoint, json=payload)
    except Exception as exc:
        return {"success": False, "error": f"download-folder request failed: {exc}"}

    # 22 bytes = empty-zip, so anything that small carries no files.
    has_archive = reply.is_success and len(reply.content) > 22
    if not has_archive:
        return {"success": False, "error": f"empty/failed ZIP from SLMM (status {reply.status_code})"}

    try:
        ingested = ingest_nrl_zip(location_id, reply.content, db, source="ftp_cycle", dedupe=True)
        return {"success": True, **ingested}
    except IngestError as exc:
        return {"success": False, "error": str(exc)}
# ========================================================================
# Recurring Schedule Generation
# ========================================================================
@@ -782,6 +883,92 @@ class SchedulerService:
return cleaned
# ========================================================================
# Nightly Sound Report (FTP report pipeline)
# ========================================================================
async def run_due_reports(self):
    """Run any project nightly sound reports that are due.

    For each enabled SoundReportConfig: if local time is at or past
    report_time and last_run_date is not already last night, the report is
    built by ``_run_one_report`` in a worker thread. That worker stamps
    last_run_date, which is what keeps this idempotent across restarts.

    A report_time that is missing, malformed or out of range falls back to
    08:00. A failure in one project is logged and does not stop the others.
    """
    from backend.models import SoundReportConfig
    from backend.utils.timezone import utc_to_local

    default_hm = (8, 0)

    def _report_hm(raw):
        """Parse "HH:MM" into (hour, minute), or default_hm if unusable."""
        try:
            hh, mm = (int(x) for x in raw.split(":"))
        except (ValueError, AttributeError):
            return default_hm
        # An out-of-range time such as "24:00" could never be reached by the
        # comparison below, so the project would silently never run.
        if not (0 <= hh <= 23 and 0 <= mm <= 59):
            return default_hm
        return hh, mm

    def _csv_list(raw):
        """Split a comma-separated value into trimmed items; None if empty."""
        return [part.strip() for part in (raw or "").split(",") if part.strip()] or None

    # Decide what's due (cheap, on the loop); run each OFF the event loop.
    due_jobs = []
    db = SessionLocal()
    try:
        configs = db.query(SoundReportConfig).filter_by(enabled=True).all()
        if not configs:
            return
        local_now = utc_to_local(datetime.utcnow())
        night_date = local_now.date() - timedelta(days=1)  # last night's evening date
        now_hm = (local_now.hour, local_now.minute)
        for cfg in configs:
            if now_hm < _report_hm(cfg.report_time):
                continue
            if cfg.last_run_date == night_date:
                continue
            # Copy plain values out so nothing touches the ORM row after the
            # session is closed.
            due_jobs.append({
                "project_id": cfg.project_id,
                "metric_keys": _csv_list(cfg.metric_keys),
                "recipients": _csv_list(cfg.recipients),
                "baseline_mode": cfg.baseline_mode,
                "baseline_start": cfg.baseline_start,
                "baseline_end": cfg.baseline_end,
            })
    finally:
        db.close()

    # run_nightly_report is synchronous (blocking file I/O + smtplib up to the
    # SMTP timeout). Run it in a worker thread so it never stalls the scheduler
    # loop (which also drives time-sensitive device cycles).
    # NOTE(review): a project whose report keeps failing is never stamped, so
    # it is retried on every call of this method — confirm that is intended.
    for job in due_jobs:
        try:
            logger.info(f"[REPORT] Running nightly report for project {job['project_id']} (night {night_date})")
            result = await asyncio.to_thread(self._run_one_report, night_date, job)
            summary = result or {}
            # "email" may be absent or None; either way treat it as empty so
            # a successful run is not logged as a failure.
            email = summary.get("email") or {}
            if email.get("sent"):
                email_status = "sent"
            elif email.get("dry_run"):
                email_status = "dry-run"
            else:
                email_status = email.get("error") or "skipped"
            logger.info(
                f"[REPORT] project {job['project_id']}: {summary.get('location_count')} location(s); "
                f"email={email_status}"
            )
        except Exception as e:
            logger.error(f"[REPORT] Failed nightly report for project {job['project_id']}: {e}", exc_info=True)
def _run_one_report(self, night_date, job) -> Dict[str, Any]:
    """Build/send a single project's report, then record that it ran.

    Synchronous on purpose: it is executed in a worker thread, so it opens
    and closes its own DB session instead of sharing one with the event loop.

    Args:
        night_date: The night the report covers; also the value written to
            the config's ``last_run_date``.
        job: Plain-dict snapshot of the project's report settings
            (``project_id``, ``metric_keys``, ``baseline_mode``,
            ``baseline_start``, ``baseline_end``, ``recipients``).

    Returns:
        Whatever ``run_nightly_report`` returns.

    Raises:
        Any exception from the report run or the commit, after the session
        has been rolled back.
    """
    from backend.models import SoundReportConfig
    from backend.services.report_orchestrator import run_nightly_report

    option_names = ("metric_keys", "baseline_mode", "baseline_start", "baseline_end", "recipients")

    session = SessionLocal()
    try:
        project_id = job["project_id"]
        options = {key: job[key] for key in option_names}
        report_result = run_nightly_report(session, project_id, night_date, **options)

        config_row = session.query(SoundReportConfig).filter_by(project_id=project_id).first()
        if not config_row:
            return report_result
        # Stamp only after the report itself succeeded.
        config_row.last_run_date = night_date
        session.commit()
        return report_result
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# ========================================================================
# Manual Execution (for testing/debugging)
# ========================================================================