fix(reports): derive session recording window from the Leq rows

The NL-43 .rnh carries no measurement timestamps, so _ingest_file_entries was stamping every session with utcnow() and no duration. Derive started_at/stopped_at/duration from the Leq .rnd 'Start Time' column when the header lacks them (interval from the .rnh, else inferred from row spacing). Adds an optional unit_id so callers that know the recording unit attribute the session at creation, and returns duration_seconds.

Side effect: NL-43 dedupe now works (it keyed on a previously-empty start_time_str). Affects all ingest paths: manual upload, FTP cycle, stop, download, and manual FTP download.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-15 18:12:15 +00:00
parent abcfba179f
commit 2ecf1f54d5
+81 -4
View File
@@ -9,7 +9,7 @@ from fastapi import APIRouter, Request, Depends, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from datetime import datetime
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from typing import Optional
import uuid
@@ -1753,6 +1753,48 @@ def _classify_file(filename: str) -> str:
return "data"
def _rnd_interval_seconds(s: Optional[str]) -> Optional[int]:
"""Parse an NL-43 interval string ('15m' / '1s' / '1h') into seconds."""
import re
m = re.match(r"\s*(\d+)\s*([smh])", (s or "").strip().lower())
if not m:
return None
return int(m.group(1)) * {"s": 1, "m": 60, "h": 3600}[m.group(2)]
def _leq_window_local(leq_bytes: bytes):
"""Recording window from a Leq .rnd's 'Start Time' column (meter-local time).
Returns (first_start, last_start, row_count, inferred_interval_seconds).
This is the source of truth for the recording window on NL-43 units, whose
.rnh carries no measurement timestamps. Reuses the report's AU2 normaliser
so NL-43 and AU2 files parse identically.
"""
import csv as _csv
from backend.routers.projects import _normalize_rnd_rows # lazy: avoid import cycle
try:
text = leq_bytes.decode("utf-8", errors="replace")
rows = list(_csv.DictReader(io.StringIO(text)))
except Exception:
return None, None, 0, None
try:
rows, _ = _normalize_rnd_rows(rows)
except Exception:
pass
times = []
for r in rows:
v = (r.get("Start Time") or "").strip()
try:
times.append(datetime.strptime(v, "%Y/%m/%d %H:%M:%S"))
except (ValueError, TypeError):
continue
if not times:
return None, None, 0, None
times.sort()
inferred = int((times[1] - times[0]).total_seconds()) if len(times) >= 2 else None
return times[0], times[-1], len(times), inferred
def _is_wanted_nrl_file(fname: str) -> bool:
"""Keep only the files an NRL ingest cares about: .rnh metadata + the
averaged Leq .rnd. Drops the 1-second _Lp_ files and everything else.
@@ -1832,6 +1874,7 @@ def _ingest_file_entries(
*,
source: str = "manual_upload",
dedupe: bool = False,
unit_id: Optional[str] = None,
) -> dict:
"""Core NRL ingest, shared by the HTTP upload and the programmatic FTP pull.
@@ -1840,6 +1883,10 @@ def _ingest_file_entries(
location's project. Metric-agnostic: the full Leq file is written to disk
and every column preserved; metric selection happens in the report layer.
`unit_id` attributes the session to the recording unit when the caller knows
it (manual FTP download / SD upload from a known unit). Left None for paths
that link the unit afterwards (the scheduler's `_ingest_and_link`).
Raises IngestError if no usable files are present.
"""
# --- Filter to the files we keep (.rnh + Leq .rnd) ---
@@ -1872,6 +1919,32 @@ def _ingest_file_entries(
index_number = rnh_meta.get("index_number", "")
start_time_str = rnh_meta.get("start_time_str", "")
# The NL-43 .rnh has NO measurement timestamps — the real recording window
# lives in the Leq .rnd's "Start Time" column. Whenever the header didn't
# give us a start (and/or stop), derive it from the Leq rows so the session
# gets the true window + duration (and a stable start_time_str for dedupe).
if not start_time_str or stopped_at_local is None:
leq_entry = next(
((f, b) for f, b in file_entries
if f.lower().endswith(".rnd") and ("_leq_" in f.lower() or f.lower().startswith("au2_"))),
None,
)
if leq_entry is not None:
first_dt, last_dt, _n, inferred = _leq_window_local(leq_entry[1])
interval_s = _rnd_interval_seconds(rnh_meta.get("leq_interval")) or inferred or 0
if first_dt and not start_time_str:
started_at_local = first_dt
start_time_str = first_dt.strftime("%Y/%m/%d %H:%M:%S")
if last_dt and stopped_at_local is None:
stopped_at_local = last_dt + timedelta(seconds=interval_s)
# Recompute UTC + duration from the resolved window.
started_at = local_to_utc(started_at_local)
stopped_at = local_to_utc(stopped_at_local) if stopped_at_local else None
duration_seconds = (
int((stopped_at - started_at).total_seconds())
if (started_at and stopped_at) else duration_seconds
)
# --- Dedupe: skip if this exact measurement is already ingested ---
if dedupe:
existing = _find_existing_session(db, location.id, store_name, started_at, start_time_str)
@@ -1887,6 +1960,7 @@ def _ingest_file_entries(
"store_name": store_name,
"started_at": started_at.isoformat() if started_at else None,
"stopped_at": stopped_at.isoformat() if stopped_at else None,
"duration_seconds": duration_seconds,
}
# --- Create MonitoringSession (local times drive period/label) ---
@@ -1901,7 +1975,7 @@ def _ingest_file_entries(
id=session_id,
project_id=location.project_id,
location_id=location.id,
unit_id=None,
unit_id=unit_id,
session_type="sound",
started_at=started_at,
stopped_at=stopped_at,
@@ -1976,6 +2050,7 @@ def _ingest_file_entries(
"store_name": store_name,
"started_at": started_at.isoformat() if started_at else None,
"stopped_at": stopped_at.isoformat() if stopped_at else None,
"duration_seconds": duration_seconds,
}
@@ -1986,13 +2061,15 @@ def ingest_nrl_zip(
*,
source: str = "ftp_pull",
dedupe: bool = True,
unit_id: Optional[str] = None,
) -> dict:
"""Programmatically ingest an Auto_#### ZIP (e.g. a scheduled FTP pull).
Extracts the ZIP (flattening any nested Auto_Leq/Auto_Lp_ folders), keeps
the .rnh + Leq .rnd, parses the header, and creates a MonitoringSession +
DataFile rows for `location_id`. Defaults to dedupe=True so repeated daily
pulls of the same closed folder don't create duplicate sessions.
pulls of the same closed folder don't create duplicate sessions. Pass
`unit_id` to attribute the session to the recording unit at creation.
Returns the same dict shape as the HTTP upload, plus a `deduped` flag.
Raises IngestError on a bad ZIP, no usable files, or unknown location.
@@ -2014,7 +2091,7 @@ def ingest_nrl_zip(
except zipfile.BadZipFile:
raise IngestError("Downloaded data is not a valid ZIP archive.")
return _ingest_file_entries(location, file_entries, db, source=source, dedupe=dedupe)
return _ingest_file_entries(location, file_entries, db, source=source, dedupe=dedupe, unit_id=unit_id)
@router.post("/nrl/{location_id}/upload-data")