diff --git a/backend/routers/project_locations.py b/backend/routers/project_locations.py index b1c38f8..1074a5a 100644 --- a/backend/routers/project_locations.py +++ b/backend/routers/project_locations.py @@ -9,7 +9,7 @@ from fastapi import APIRouter, Request, Depends, HTTPException, Query from fastapi.responses import HTMLResponse, JSONResponse from sqlalchemy.orm import Session from sqlalchemy import and_, or_ -from datetime import datetime +from datetime import datetime, timedelta from zoneinfo import ZoneInfo from typing import Optional import uuid @@ -1753,6 +1753,48 @@ def _classify_file(filename: str) -> str: return "data" +def _rnd_interval_seconds(s: Optional[str]) -> Optional[int]: + """Parse an NL-43 interval string ('15m' / '1s' / '1h') into seconds.""" + import re + m = re.match(r"\s*(\d+)\s*([smh])", (s or "").strip().lower()) + if not m: + return None + return int(m.group(1)) * {"s": 1, "m": 60, "h": 3600}[m.group(2)] + + +def _leq_window_local(leq_bytes: bytes): + """Recording window from a Leq .rnd's 'Start Time' column (meter-local time). + + Returns (first_start, last_start, row_count, inferred_interval_seconds). + This is the source of truth for the recording window on NL-43 units, whose + .rnh carries no measurement timestamps. Reuses the report's AU2 normaliser + so NL-43 and AU2 files parse identically. + """ + import csv as _csv + from backend.routers.projects import _normalize_rnd_rows # lazy: avoid import cycle + try: + text = leq_bytes.decode("utf-8", errors="replace") + rows = list(_csv.DictReader(io.StringIO(text))) + except Exception: + return None, None, 0, None + try: + rows, _ = _normalize_rnd_rows(rows) + except Exception: + pass + times = [] + for r in rows: + v = (r.get("Start Time") or "").strip() + try: + times.append(datetime.strptime(v, "%Y/%m/%d %H:%M:%S")) + except (ValueError, TypeError): + continue + if not times: + return None, None, 0, None + times.sort() + inferred = int((times[1] - times[0]).total_seconds()) if len(times) >= 2 else None + return times[0], times[-1], len(times), inferred + + def _is_wanted_nrl_file(fname: str) -> bool: """Keep only the files an NRL ingest cares about: .rnh metadata + the averaged Leq .rnd. Drops the 1-second _Lp_ files and everything else. @@ -1832,6 +1874,7 @@ def _ingest_file_entries( *, source: str = "manual_upload", dedupe: bool = False, + unit_id: Optional[str] = None, ) -> dict: """Core NRL ingest, shared by the HTTP upload and the programmatic FTP pull. @@ -1840,6 +1883,10 @@ def _ingest_file_entries( location's project. Metric-agnostic: the full Leq file is written to disk and every column preserved; metric selection happens in the report layer. + `unit_id` attributes the session to the recording unit when the caller knows + it (manual FTP download / SD upload from a known unit). Left None for paths + that link the unit afterwards (the scheduler's `_ingest_and_link`). + Raises IngestError if no usable files are present. """ # --- Filter to the files we keep (.rnh + Leq .rnd) --- @@ -1872,6 +1919,32 @@ def _ingest_file_entries( index_number = rnh_meta.get("index_number", "") start_time_str = rnh_meta.get("start_time_str", "") + # The NL-43 .rnh has NO measurement timestamps — the real recording window + # lives in the Leq .rnd's "Start Time" column. Whenever the header didn't + # give us a start (and/or stop), derive it from the Leq rows so the session + # gets the true window + duration (and a stable start_time_str for dedupe). + if not start_time_str or stopped_at_local is None: + leq_entry = next( + ((f, b) for f, b in file_entries + if f.lower().endswith(".rnd") and ("_leq_" in f.lower() or f.lower().startswith("au2_"))), + None, + ) + if leq_entry is not None: + first_dt, last_dt, _n, inferred = _leq_window_local(leq_entry[1]) + interval_s = _rnd_interval_seconds(rnh_meta.get("leq_interval")) or inferred or 0 + if first_dt and not start_time_str: + started_at_local = first_dt + start_time_str = first_dt.strftime("%Y/%m/%d %H:%M:%S") + if last_dt and stopped_at_local is None: + stopped_at_local = last_dt + timedelta(seconds=interval_s) + # Recompute UTC + duration from the resolved window. + started_at = local_to_utc(started_at_local) + stopped_at = local_to_utc(stopped_at_local) if stopped_at_local else None + duration_seconds = ( + int((stopped_at - started_at).total_seconds()) + if (started_at and stopped_at) else duration_seconds + ) + # --- Dedupe: skip if this exact measurement is already ingested --- if dedupe: existing = _find_existing_session(db, location.id, store_name, started_at, start_time_str) @@ -1887,6 +1960,7 @@ def _ingest_file_entries( "store_name": store_name, "started_at": started_at.isoformat() if started_at else None, "stopped_at": stopped_at.isoformat() if stopped_at else None, + "duration_seconds": duration_seconds, } # --- Create MonitoringSession (local times drive period/label) --- @@ -1901,7 +1975,7 @@ def _ingest_file_entries( id=session_id, project_id=location.project_id, location_id=location.id, - unit_id=None, + unit_id=unit_id, session_type="sound", started_at=started_at, stopped_at=stopped_at, @@ -1976,6 +2050,7 @@ def _ingest_file_entries( "store_name": store_name, "started_at": started_at.isoformat() if started_at else None, "stopped_at": stopped_at.isoformat() if stopped_at else None, + "duration_seconds": duration_seconds, } @@ -1986,13 +2061,15 @@ def ingest_nrl_zip( *, source: str = "ftp_pull", dedupe: bool = True, + unit_id: Optional[str] = None, ) -> dict: """Programmatically ingest an Auto_#### ZIP (e.g. a scheduled FTP pull). Extracts the ZIP (flattening any nested Auto_Leq/Auto_Lp_ folders), keeps the .rnh + Leq .rnd, parses the header, and creates a MonitoringSession + DataFile rows for `location_id`. Defaults to dedupe=True so repeated daily - pulls of the same closed folder don't create duplicate sessions. + pulls of the same closed folder don't create duplicate sessions. Pass + `unit_id` to attribute the session to the recording unit at creation. Returns the same dict shape as the HTTP upload, plus a `deduped` flag. Raises IngestError on a bad ZIP, no usable files, or unknown location. @@ -2014,7 +2091,7 @@ def ingest_nrl_zip( except zipfile.BadZipFile: raise IngestError("Downloaded data is not a valid ZIP archive.") - return _ingest_file_entries(location, file_entries, db, source=source, dedupe=dedupe) + return _ingest_file_entries(location, file_entries, db, source=source, dedupe=dedupe, unit_id=unit_id) @router.post("/nrl/{location_id}/upload-data")