diff --git a/backend/routers/project_locations.py b/backend/routers/project_locations.py index b1c38f8..1074a5a 100644 --- a/backend/routers/project_locations.py +++ b/backend/routers/project_locations.py @@ -9,7 +9,7 @@ from fastapi import APIRouter, Request, Depends, HTTPException, Query from fastapi.responses import HTMLResponse, JSONResponse from sqlalchemy.orm import Session from sqlalchemy import and_, or_ -from datetime import datetime +from datetime import datetime, timedelta from zoneinfo import ZoneInfo from typing import Optional import uuid @@ -1753,6 +1753,48 @@ def _classify_file(filename: str) -> str: return "data" +def _rnd_interval_seconds(s: Optional[str]) -> Optional[int]: + """Parse an NL-43 interval string ('15m' / '1s' / '1h') into seconds.""" + import re + m = re.match(r"\s*(\d+)\s*([smh])", (s or "").strip().lower()) + if not m: + return None + return int(m.group(1)) * {"s": 1, "m": 60, "h": 3600}[m.group(2)] + + +def _leq_window_local(leq_bytes: bytes): + """Recording window from a Leq .rnd's 'Start Time' column (meter-local time). + + Returns (first_start, last_start, row_count, inferred_interval_seconds). + This is the source of truth for the recording window on NL-43 units, whose + .rnh carries no measurement timestamps. Reuses the report's AU2 normaliser + so NL-43 and AU2 files parse identically. + """ + import csv as _csv + from backend.routers.projects import _normalize_rnd_rows # lazy: avoid import cycle + try: + text = leq_bytes.decode("utf-8", errors="replace") + rows = list(_csv.DictReader(io.StringIO(text))) + except Exception: + return None, None, 0, None + try: + rows, _ = _normalize_rnd_rows(rows) + except Exception: + pass + times = [] + for r in rows: + v = (r.get("Start Time") or "").strip() + try: + times.append(datetime.strptime(v, "%Y/%m/%d %H:%M:%S")) + except (ValueError, TypeError): + continue + if not times: + return None, None, 0, None + times.sort() + inferred = int((times[1] - times[0]).total_seconds()) if len(times) >= 2 else None + return times[0], times[-1], len(times), inferred + + def _is_wanted_nrl_file(fname: str) -> bool: """Keep only the files an NRL ingest cares about: .rnh metadata + the averaged Leq .rnd. Drops the 1-second _Lp_ files and everything else. @@ -1832,6 +1874,7 @@ def _ingest_file_entries( *, source: str = "manual_upload", dedupe: bool = False, + unit_id: Optional[str] = None, ) -> dict: """Core NRL ingest, shared by the HTTP upload and the programmatic FTP pull. @@ -1840,6 +1883,10 @@ def _ingest_file_entries( location's project. Metric-agnostic: the full Leq file is written to disk and every column preserved; metric selection happens in the report layer. + `unit_id` attributes the session to the recording unit when the caller knows + it (manual FTP download / SD upload from a known unit). Left None for paths + that link the unit afterwards (the scheduler's `_ingest_and_link`). + Raises IngestError if no usable files are present. """ # --- Filter to the files we keep (.rnh + Leq .rnd) --- @@ -1872,6 +1919,32 @@ def _ingest_file_entries( index_number = rnh_meta.get("index_number", "") start_time_str = rnh_meta.get("start_time_str", "") + # The NL-43 .rnh has NO measurement timestamps — the real recording window + # lives in the Leq .rnd's "Start Time" column. Whenever the header didn't + # give us a start (and/or stop), derive it from the Leq rows so the session + # gets the true window + duration (and a stable start_time_str for dedupe). + if not start_time_str or stopped_at_local is None: + leq_entry = next( + ((f, b) for f, b in file_entries + if f.lower().endswith(".rnd") and ("_leq_" in f.lower() or f.lower().startswith("au2_"))), + None, + ) + if leq_entry is not None: + first_dt, last_dt, _n, inferred = _leq_window_local(leq_entry[1]) + interval_s = _rnd_interval_seconds(rnh_meta.get("leq_interval")) or inferred or 0 + if first_dt and not start_time_str: + started_at_local = first_dt + start_time_str = first_dt.strftime("%Y/%m/%d %H:%M:%S") + if last_dt and stopped_at_local is None: + stopped_at_local = last_dt + timedelta(seconds=interval_s) + # Recompute UTC + duration from the resolved window. + started_at = local_to_utc(started_at_local) + stopped_at = local_to_utc(stopped_at_local) if stopped_at_local else None + duration_seconds = ( + int((stopped_at - started_at).total_seconds()) + if (started_at and stopped_at) else duration_seconds + ) + # --- Dedupe: skip if this exact measurement is already ingested --- if dedupe: existing = _find_existing_session(db, location.id, store_name, started_at, start_time_str) @@ -1887,6 +1960,7 @@ def _ingest_file_entries( "store_name": store_name, "started_at": started_at.isoformat() if started_at else None, "stopped_at": stopped_at.isoformat() if stopped_at else None, + "duration_seconds": duration_seconds, } # --- Create MonitoringSession (local times drive period/label) --- @@ -1901,7 +1975,7 @@ def _ingest_file_entries( id=session_id, project_id=location.project_id, location_id=location.id, - unit_id=None, + unit_id=unit_id, session_type="sound", started_at=started_at, stopped_at=stopped_at, @@ -1976,6 +2050,7 @@ def _ingest_file_entries( "store_name": store_name, "started_at": started_at.isoformat() if started_at else None, "stopped_at": stopped_at.isoformat() if stopped_at else None, + "duration_seconds": duration_seconds, } @@ -1986,13 +2061,15 @@ def ingest_nrl_zip( *, source: str = "ftp_pull", dedupe: bool = True, + unit_id: Optional[str] = None, ) -> dict: """Programmatically ingest an Auto_#### ZIP (e.g. a scheduled FTP pull). Extracts the ZIP (flattening any nested Auto_Leq/Auto_Lp_ folders), keeps the .rnh + Leq .rnd, parses the header, and creates a MonitoringSession + DataFile rows for `location_id`. Defaults to dedupe=True so repeated daily - pulls of the same closed folder don't create duplicate sessions. + pulls of the same closed folder don't create duplicate sessions. Pass + `unit_id` to attribute the session to the recording unit at creation. Returns the same dict shape as the HTTP upload, plus a `deduped` flag. Raises IngestError on a bad ZIP, no usable files, or unknown location. @@ -2014,7 +2091,7 @@ def ingest_nrl_zip( except zipfile.BadZipFile: raise IngestError("Downloaded data is not a valid ZIP archive.") - return _ingest_file_entries(location, file_entries, db, source=source, dedupe=dedupe) + return _ingest_file_entries(location, file_entries, db, source=source, dedupe=dedupe, unit_id=unit_id) @router.post("/nrl/{location_id}/upload-data") diff --git a/backend/routers/projects.py b/backend/routers/projects.py index 3c61236..b1407de 100644 --- a/backend/routers/projects.py +++ b/backend/routers/projects.py @@ -1591,24 +1591,32 @@ async def get_sessions_calendar( async def get_ftp_browser( project_id: str, request: Request, + location_id: Optional[str] = None, db: Session = Depends(get_db), ): """ Get FTP browser interface for downloading files from assigned SLMs. Returns HTML partial with FTP browser. Sound Monitoring projects only. + + When `location_id` is given, scope to just the unit(s) assigned to that NRL + (used by the per-NRL Data Files tab, which mirrors the project-wide tab). """ from backend.models import DataFile project = db.query(Project).filter_by(id=project_id).first() _require_module(project, "sound_monitoring", db) - # Get all assignments for this project (active = assigned_until IS NULL) - assignments = db.query(UnitAssignment).filter( + # Active assignments for this project (active = assigned_until IS NULL), + # optionally scoped to a single NRL/location. + q = db.query(UnitAssignment).filter( and_( UnitAssignment.project_id == project_id, UnitAssignment.assigned_until == None, ) - ).all() + ) + if location_id: + q = q.filter(UnitAssignment.location_id == location_id) + assignments = q.all() # Enrich with unit and location details units_data = [] @@ -1638,9 +1646,13 @@ async def ftp_download_to_server( db: Session = Depends(get_db), ): """ - Download a file from an SLM to the server via FTP. - Creates a DataFile record and stores the file in data/Projects/{project_id}/ - Sound Monitoring projects only. + Download a single file from an SLM to the server via FTP. + + NRL measurement files (.rnh / _Leq_ .rnd) are routed through the shared NRL + ingest so the session is parsed and attributed to the unit (a lone .rnh still + yields the real recording window + duration). Any other file type — or a + unit with no location — falls back to a generic stored DataFile, preserving + the original behaviour. Sound Monitoring projects only. """ import httpx import os @@ -1658,7 +1670,55 @@ async def ftp_download_to_server( if not unit_id or not remote_path: raise HTTPException(status_code=400, detail="Missing unit_id or remote_path") - # Get or create active session for this location/unit + filename = os.path.basename(remote_path) + + # Download the file from SLMM + SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") + try: + async with httpx.AsyncClient(timeout=300.0) as client: + response = await client.post( + f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download", + json={"remote_path": remote_path} + ) + except httpx.TimeoutException: + raise HTTPException(status_code=504, detail="Timeout downloading file from SLM") + except Exception as e: + logger.error(f"Error reaching SLMM for file download: {e}") + raise HTTPException(status_code=502, detail=f"Failed to reach SLMM: {str(e)}") + + if not response.is_success: + raise HTTPException( + status_code=response.status_code, + detail=f"Failed to download from SLMM: {response.text}", + ) + file_content = response.content + + # NRL measurement file + known location → shared ingest (parsed + attributed). + from backend.routers.project_locations import ( + _ingest_file_entries, IngestError, _is_wanted_nrl_file, + ) + if location_id and _is_wanted_nrl_file(filename): + location = db.query(MonitoringLocation).filter_by(id=location_id).first() + if location: + try: + result = _ingest_file_entries( + location, [(filename, file_content)], db, + source="ftp_manual", dedupe=False, unit_id=unit_id, + ) + except IngestError as e: + raise HTTPException(status_code=400, detail=str(e)) + return { + "success": True, + "message": f"Imported {filename} as NRL measurement data", + "ingested": True, + "session_id": result["session_id"], + "file_size": len(file_content), + "started_at": result["started_at"], + "stopped_at": result["stopped_at"], + "duration_seconds": result["duration_seconds"], + } + + # --- Generic path: any other file type (or no location) — store as-is --- session = db.query(MonitoringSession).filter( and_( MonitoringSession.project_id == project_id, @@ -1668,7 +1728,6 @@ async def ftp_download_to_server( ) ).first() - # If no active session, create one if not session: _ftp_unit = db.query(RosterUnit).filter_by(id=unit_id).first() session = MonitoringSession( @@ -1687,115 +1746,50 @@ async def ftp_download_to_server( db.commit() db.refresh(session) - # Download file from SLMM - SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") + ext = os.path.splitext(filename)[1].lower() + file_type_map = { + '.wav': 'audio', '.mp3': 'audio', '.flac': 'audio', '.m4a': 'audio', '.aac': 'audio', + '.rnd': 'measurement', + '.csv': 'data', '.txt': 'data', '.json': 'data', '.xml': 'data', '.dat': 'data', + '.log': 'log', + '.zip': 'archive', '.tar': 'archive', '.gz': 'archive', '.7z': 'archive', '.rar': 'archive', + '.jpg': 'image', '.jpeg': 'image', '.png': 'image', '.gif': 'image', + '.pdf': 'document', '.doc': 'document', '.docx': 'document', + } + file_type = file_type_map.get(ext, 'data') - try: - async with httpx.AsyncClient(timeout=300.0) as client: - response = await client.post( - f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download", - json={"remote_path": remote_path} - ) + project_dir = Path(f"data/Projects/{project_id}/{session.id}") + project_dir.mkdir(parents=True, exist_ok=True) + file_path = project_dir / filename + with open(file_path, 'wb') as f: + f.write(file_content) + checksum = hashlib.sha256(file_content).hexdigest() - if not response.is_success: - raise HTTPException( - status_code=response.status_code, - detail=f"Failed to download from SLMM: {response.text}" - ) + data_file = DataFile( + id=str(uuid.uuid4()), + session_id=session.id, + file_path=str(file_path.relative_to("data")), # Store relative to data/ + file_type=file_type, + file_size_bytes=len(file_content), + downloaded_at=datetime.utcnow(), + checksum=checksum, + file_metadata=json.dumps({ + "source": "ftp", + "remote_path": remote_path, + "unit_id": unit_id, + "location_id": location_id, + }) + ) + db.add(data_file) + db.commit() - # Extract filename from remote_path - filename = os.path.basename(remote_path) - - # Determine file type from extension - ext = os.path.splitext(filename)[1].lower() - file_type_map = { - # Audio files - '.wav': 'audio', - '.mp3': 'audio', - '.flac': 'audio', - '.m4a': 'audio', - '.aac': 'audio', - # Sound level meter measurement files - '.rnd': 'measurement', - # Data files - '.csv': 'data', - '.txt': 'data', - '.json': 'data', - '.xml': 'data', - '.dat': 'data', - # Log files - '.log': 'log', - # Archives - '.zip': 'archive', - '.tar': 'archive', - '.gz': 'archive', - '.7z': 'archive', - '.rar': 'archive', - # Images - '.jpg': 'image', - '.jpeg': 'image', - '.png': 'image', - '.gif': 'image', - # Documents - '.pdf': 'document', - '.doc': 'document', - '.docx': 'document', - } - file_type = file_type_map.get(ext, 'data') - - # Create directory structure: data/Projects/{project_id}/{session_id}/ - project_dir = Path(f"data/Projects/{project_id}/{session.id}") - project_dir.mkdir(parents=True, exist_ok=True) - - # Save file to disk - file_path = project_dir / filename - file_content = response.content - - with open(file_path, 'wb') as f: - f.write(file_content) - - # Calculate checksum - checksum = hashlib.sha256(file_content).hexdigest() - - # Create DataFile record - data_file = DataFile( - id=str(uuid.uuid4()), - session_id=session.id, - file_path=str(file_path.relative_to("data")), # Store relative to data/ - file_type=file_type, - file_size_bytes=len(file_content), - downloaded_at=datetime.utcnow(), - checksum=checksum, - file_metadata=json.dumps({ - "source": "ftp", - "remote_path": remote_path, - "unit_id": unit_id, - "location_id": location_id, - }) - ) - - db.add(data_file) - db.commit() - - return { - "success": True, - "message": f"Downloaded {filename} to server", - "file_id": data_file.id, - "file_path": str(file_path), - "file_size": len(file_content), - } - - except httpx.TimeoutException: - raise HTTPException( - status_code=504, - detail="Timeout downloading file from SLM" - ) - except Exception as e: - logger.error(f"Error downloading file to server: {e}") - raise HTTPException( - status_code=500, - detail=f"Failed to download file to server: {str(e)}" - ) + return { + "success": True, + "message": f"Downloaded {filename} to server", + "file_id": data_file.id, + "file_path": str(file_path), + "file_size": len(file_content), + } @router.post("/{project_id}/ftp-download-folder-to-server") @@ -1805,20 +1799,20 @@ async def ftp_download_folder_to_server( db: Session = Depends(get_db), ): """ - Download an entire folder from an SLM to the server via FTP. - Extracts all files from the ZIP and preserves folder structure. - Creates individual DataFile records for each file. - Sound Monitoring projects only. + Download an entire Auto_#### measurement folder from an SLM to the server. + + Routes the downloaded ZIP through the shared NRL ingest — the same path the + scheduled FTP pull, the daily cycle, and the manual SD-card upload use. That + means: keep the .rnh + Leq .rnd, parse the header (real recording start/stop + + duration, percentile slot map, weightings), drop the 1-second _Lp_ files, + and create one clean MonitoringSession attributed to the unit. Sound + Monitoring projects only. """ import httpx import os - import hashlib - import zipfile - import io _require_module(db.query(Project).filter_by(id=project_id).first(), "sound_monitoring", db) - from pathlib import Path - from backend.models import DataFile + from backend.routers.project_locations import ingest_nrl_zip, IngestError data = await request.json() unit_id = data.get("unit_id") @@ -1827,160 +1821,66 @@ async def ftp_download_folder_to_server( if not unit_id or not remote_path: raise HTTPException(status_code=400, detail="Missing unit_id or remote_path") - - # Get or create active session for this location/unit - session = db.query(MonitoringSession).filter( - and_( - MonitoringSession.project_id == project_id, - MonitoringSession.location_id == location_id, - MonitoringSession.unit_id == unit_id, - MonitoringSession.status.in_(["recording", "paused"]) + if not location_id: + raise HTTPException( + status_code=400, + detail=("This unit isn't assigned to a monitoring location. Assign it to an " + "NRL first so the downloaded measurement attaches to the right location."), ) - ).first() - # If no active session, create one - if not session: - _ftp_unit = db.query(RosterUnit).filter_by(id=unit_id).first() - session = MonitoringSession( - id=str(uuid.uuid4()), - project_id=project_id, - location_id=location_id, - unit_id=unit_id, - session_type="sound", # SLMs are sound monitoring devices - status="completed", - started_at=datetime.utcnow(), - stopped_at=datetime.utcnow(), - device_model=_ftp_unit.slm_model if _ftp_unit else None, - session_metadata='{"source": "ftp_folder_download", "note": "Auto-created for FTP folder download"}' - ) - db.add(session) - db.commit() - db.refresh(session) - - # Download folder from SLMM (returns ZIP) + # Download the folder from SLMM (returns a ZIP of the Auto_#### folder) SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") - try: - async with httpx.AsyncClient(timeout=600.0) as client: # Longer timeout for folders + async with httpx.AsyncClient(timeout=600.0) as client: # longer timeout for folders response = await client.post( f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download-folder", json={"remote_path": remote_path} ) - - if not response.is_success: - raise HTTPException( - status_code=response.status_code, - detail=f"Failed to download folder from SLMM: {response.text}" - ) - - # Extract folder name from remote_path - folder_name = os.path.basename(remote_path.rstrip('/')) - - # Create base directory: data/Projects/{project_id}/{session_id}/{folder_name}/ - base_dir = Path(f"data/Projects/{project_id}/{session.id}/{folder_name}") - base_dir.mkdir(parents=True, exist_ok=True) - - # Extract ZIP and save individual files - zip_content = response.content - created_files = [] - total_size = 0 - - # File type mapping for classification - file_type_map = { - # Audio files - '.wav': 'audio', '.mp3': 'audio', '.flac': 'audio', '.m4a': 'audio', '.aac': 'audio', - # Data files - '.csv': 'data', '.txt': 'data', '.json': 'data', '.xml': 'data', '.dat': 'data', - # Log files - '.log': 'log', - # Archives - '.zip': 'archive', '.tar': 'archive', '.gz': 'archive', '.7z': 'archive', '.rar': 'archive', - # Images - '.jpg': 'image', '.jpeg': 'image', '.png': 'image', '.gif': 'image', - # Documents - '.pdf': 'document', '.doc': 'document', '.docx': 'document', - } - - with zipfile.ZipFile(io.BytesIO(zip_content)) as zf: - for zip_info in zf.filelist: - # Skip directories - if zip_info.is_dir(): - continue - - # Read file from ZIP - file_data = zf.read(zip_info.filename) - - # Determine file path (preserve structure within folder) - # zip_info.filename might be like "Auto_0001/measurement.wav" - file_path = base_dir / zip_info.filename - file_path.parent.mkdir(parents=True, exist_ok=True) - - # Write file to disk - with open(file_path, 'wb') as f: - f.write(file_data) - - # Calculate checksum - checksum = hashlib.sha256(file_data).hexdigest() - - # Determine file type - ext = os.path.splitext(zip_info.filename)[1].lower() - file_type = file_type_map.get(ext, 'data') - - # Create DataFile record - data_file = DataFile( - id=str(uuid.uuid4()), - session_id=session.id, - file_path=str(file_path.relative_to("data")), - file_type=file_type, - file_size_bytes=len(file_data), - downloaded_at=datetime.utcnow(), - checksum=checksum, - file_metadata=json.dumps({ - "source": "ftp_folder", - "remote_path": remote_path, - "unit_id": unit_id, - "location_id": location_id, - "folder_name": folder_name, - "relative_path": zip_info.filename, - }) - ) - - db.add(data_file) - created_files.append({ - "filename": zip_info.filename, - "size": len(file_data), - "type": file_type - }) - total_size += len(file_data) - - db.commit() - - return { - "success": True, - "message": f"Downloaded folder {folder_name} with {len(created_files)} files", - "folder_name": folder_name, - "file_count": len(created_files), - "total_size": total_size, - "files": created_files, - } - except httpx.TimeoutException: raise HTTPException( status_code=504, - detail="Timeout downloading folder from SLM (large folders may take a while)" - ) - except zipfile.BadZipFile: - raise HTTPException( - status_code=500, - detail="Downloaded file is not a valid ZIP archive" + detail="Timeout downloading folder from SLM (large folders may take a while)", ) except Exception as e: - logger.error(f"Error downloading folder to server: {e}") + logger.error(f"Error reaching SLMM for folder download: {e}") + raise HTTPException(status_code=502, detail=f"Failed to reach SLMM: {str(e)}") + + if not response.is_success: raise HTTPException( - status_code=500, - detail=f"Failed to download folder to server: {str(e)}" + status_code=response.status_code, + detail=f"Failed to download folder from SLMM: {response.text}", ) + # Ingest through the shared NRL core. dedupe=False so a re-download of a + # still-growing folder captures the latest intervals (matches manual upload). + try: + result = ingest_nrl_zip( + location_id, response.content, db, + source="ftp_manual", dedupe=False, unit_id=unit_id, + ) + except IngestError as e: + # No usable .rnd/.rnh in the folder, or unknown location. + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error ingesting downloaded folder: {e}") + raise HTTPException(status_code=500, detail=f"Failed to ingest downloaded folder: {str(e)}") + + folder_name = os.path.basename(remote_path.rstrip('/')) + return { + "success": True, + "message": ( + f"Imported {result['leq_files']} Leq file(s) from {folder_name} " + f"({result['files_imported']} stored; 1-second _Lp_ data skipped)" + ), + "folder_name": folder_name, + "session_id": result["session_id"], + "file_count": result["files_imported"], + "leq_files": result["leq_files"], + "started_at": result["started_at"], + "stopped_at": result["stopped_at"], + "duration_seconds": result["duration_seconds"], + } + # ============================================================================ # Project Types @@ -1990,21 +1890,26 @@ async def ftp_download_folder_to_server( async def get_unified_files( project_id: str, request: Request, + location_id: Optional[str] = None, db: Session = Depends(get_db), ): """ Get unified view of all files in this project. Groups files by recording session with full metadata. Returns HTML partial with hierarchical file listing. + + When `location_id` is given, scope to a single NRL/location (used by the + per-NRL Data Files tab so it mirrors the project-wide tab). """ from backend.models import DataFile from pathlib import Path import json - # Get all sessions for this project - sessions = db.query(MonitoringSession).filter_by( - project_id=project_id - ).order_by(MonitoringSession.started_at.desc()).all() + # Sessions for this project (optionally scoped to one NRL/location) + q = db.query(MonitoringSession).filter_by(project_id=project_id) + if location_id: + q = q.filter(MonitoringSession.location_id == location_id) + sessions = q.order_by(MonitoringSession.started_at.desc()).all() sessions_data = [] for session in sessions: diff --git a/backend/services/scheduler.py b/backend/services/scheduler.py index e1ce32e..ec03cd1 100644 --- a/backend/services/scheduler.py +++ b/backend/services/scheduler.py @@ -309,18 +309,11 @@ class SchedulerService: 2. Enable FTP 3. Download measurement folder to SLMM local storage - After stop_cycle, if download succeeded, this method fetches the ZIP - from SLMM and extracts it into Terra-View's project directory, creating - DataFile records for each file. + After stop_cycle, if download succeeded, this method ingests the folder + into Terra-View through the shared NRL ingest (same path as cycle and the + manual SD-card upload) so the resulting session is Leq-only, has its + `.rnh` parsed (percentile slot map + weightings), and is deduped. """ - import hashlib - import io - import os - import zipfile - import httpx - from pathlib import Path - from backend.models import DataFile - # Parse notes for download preference include_download = True try: @@ -365,79 +358,39 @@ class SchedulerService: db.commit() - # If SLMM downloaded the folder successfully, fetch the ZIP from SLMM - # and extract it into Terra-View's project directory, creating DataFile records - files_created = 0 - if include_download and cycle_response.get("download_success") and active_session: + # If SLMM downloaded the folder successfully, ingest it into Terra-View + # through the shared NRL ingest (the same path cycle and the manual SD + # upload use): keeps only the .rnh + Leq .rnd, parses the header + # (percentile slot map + weightings), dedups, and links the unit. The + # transient "recording" marker session is dropped in favour of the clean + # ingested row. (Replaces the old inline unzip that stored every file — + # incl. the 1-second _Lp_ data — without parsing the .rnh.) + ingest_result = None + ingested_session_id = None + if (include_download and cycle_response.get("download_success") + and active_session and action.device_type == "slm"): folder_name = cycle_response.get("downloaded_folder") # e.g. "Auto_0058" - remote_path = f"/NL-43/{folder_name}" - - try: - SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") - async with httpx.AsyncClient(timeout=600.0) as client: - zip_response = await client.post( - f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download-folder", - json={"remote_path": remote_path} + if folder_name: + try: + ingest_result = await self._ingest_and_link( + db, + location_id=action.location_id, + unit_id=unit_id, + folder_name=folder_name, + placeholder_session=active_session, ) - - if zip_response.is_success and len(zip_response.content) > 22: - base_dir = Path(f"data/Projects/{action.project_id}/{active_session.id}/{folder_name}") - base_dir.mkdir(parents=True, exist_ok=True) - - file_type_map = { - '.wav': 'audio', '.mp3': 'audio', - '.csv': 'data', '.txt': 'data', '.json': 'data', '.dat': 'data', - '.rnd': 'data', '.rnh': 'data', - '.log': 'log', - '.zip': 'archive', - '.jpg': 'image', '.jpeg': 'image', '.png': 'image', - '.pdf': 'document', - } - - with zipfile.ZipFile(io.BytesIO(zip_response.content)) as zf: - for zip_info in zf.filelist: - if zip_info.is_dir(): - continue - file_data = zf.read(zip_info.filename) - file_path = base_dir / zip_info.filename - file_path.parent.mkdir(parents=True, exist_ok=True) - with open(file_path, 'wb') as f: - f.write(file_data) - checksum = hashlib.sha256(file_data).hexdigest() - ext = os.path.splitext(zip_info.filename)[1].lower() - data_file = DataFile( - id=str(uuid.uuid4()), - session_id=active_session.id, - file_path=str(file_path.relative_to("data")), - file_type=file_type_map.get(ext, 'data'), - file_size_bytes=len(file_data), - downloaded_at=datetime.utcnow(), - checksum=checksum, - file_metadata=json.dumps({ - "source": "stop_cycle", - "remote_path": remote_path, - "unit_id": unit_id, - "folder_name": folder_name, - "relative_path": zip_info.filename, - }), - ) - db.add(data_file) - files_created += 1 - - db.commit() - logger.info(f"Created {files_created} DataFile records for session {active_session.id} from {folder_name}") - else: - logger.warning(f"ZIP from SLMM for {folder_name} was empty or failed, skipping DataFile creation") - - except Exception as e: - logger.error(f"Failed to extract ZIP and create DataFile records for {folder_name}: {e}") - # Don't fail the stop action — the device was stopped successfully + ingested_session_id = ingest_result.get("session_id") + logger.info(f"[STOP] Ingested {folder_name}: {ingest_result}") + except Exception as e: + logger.error(f"Failed to ingest {folder_name} on stop: {e}", exc_info=True) + # Don't fail the stop action — the device was stopped successfully + ingest_result = {"success": False, "error": str(e)} return { "status": "stopped", - "session_id": active_session.id if active_session else None, + "session_id": ingested_session_id or (active_session.id if active_session else None), "cycle_response": cycle_response, - "files_created": files_created, + "ingest": ingest_result, } async def _execute_download( @@ -490,12 +443,38 @@ class SchedulerService: files=None, # Download all files in current measurement folder ) - # TODO: Create DataFile records for downloaded files + # Ingest the downloaded folder into Terra-View via the shared NRL ingest + # (same path as stop/cycle): clean Leq-only session, .rnh parsed + # (percentiles + weightings), deduped, unit linked. No placeholder + # session here — a standalone download isn't tied to a "recording" marker. + ingest_result = None + if action.device_type == "slm": + folder_name = (response or {}).get("folder_name") + if not folder_name: + try: + folder_name = f"Auto_{int((response or {}).get('index_number')):04d}" + except (ValueError, TypeError): + folder_name = None + if not folder_name: + ingest_result = {"success": False, "error": "no folder_name/index_number from download"} + else: + try: + ingest_result = await self._ingest_and_link( + db, + location_id=action.location_id, + unit_id=unit_id, + folder_name=folder_name, + ) + logger.info(f"[DOWNLOAD] Ingested {folder_name}: {ingest_result}") + except Exception as e: + logger.error(f"Failed to ingest {folder_name} on download: {e}", exc_info=True) + ingest_result = {"success": False, "error": str(e)} return { "status": "downloaded", "destination_path": destination_path, "device_response": response, + "ingest": ingest_result, } async def _execute_cycle( @@ -650,27 +629,17 @@ class SchedulerService: else: folder_name = f"Auto_{idx:04d}" try: - ing = await self._ingest_cycle_folder(db, action.location_id, unit_id, folder_name) + ing = await self._ingest_and_link( + db, + location_id=action.location_id, + unit_id=unit_id, + folder_name=folder_name, + placeholder_session=active_session, + ) result["steps"]["ingest"] = ing - db.commit() - if ing.get("success"): - from backend.models import DataFile - sid = ing.get("session_id") - # ingest_nrl_zip leaves unit_id None — tie the data session to the - # unit that recorded it so it stays linked after we drop the placeholder. - if sid: - s = db.query(MonitoringSession).filter_by(id=sid).first() - if s and not s.unit_id: - s.unit_id = unit_id - db.commit() - # The just-closed "recording" session was only a marker; its data now - # lives in the ingested (unit-linked) session. Drop the empty placeholder - # and repoint old_session_id at the real row. - if active_session and db.query(DataFile).filter_by(session_id=active_session.id).count() == 0: - if sid: - result["old_session_id"] = sid - db.delete(active_session) - db.commit() + # The marker session was dropped; repoint old_session_id at the real row. + if ing.get("placeholder_dropped") and ing.get("session_id"): + result["old_session_id"] = ing["session_id"] logger.info(f"[CYCLE] Ingested {folder_name}: {ing}") except Exception as e: logger.error(f"[CYCLE] Ingest failed for {folder_name}: {e}", exc_info=True) @@ -711,31 +680,52 @@ class SchedulerService: logger.info(f"[CYCLE] New measurement started, session {new_session.id}") # Step 6b: Verify the meter actually resumed measuring (fresh DOD). - # Polling is still paused here, so query directly. Advisory: a - # failure alerts loudly but doesn't fail the cycle (DOD reads can - # be transiently flaky); the keepalive poll re-confirms within ~10s. + # Polling is still paused here, so query directly. If it didn't + # resume, retry ONCE with a plain start (start_recording — does NOT + # re-index, unlike start_cycle) before alerting: a meter left + # stopped overnight is the costly failure, and a transient restart + # hiccup is common on the NL-43. We retry only on a *confident* + # not-measuring reading — never on a failed/inconclusive DOD read — + # so a flaky read can't disrupt an already-running measurement. if action.device_type == "slm": - try: - await asyncio.sleep(2) - live = await self.device_controller.get_live_data(unit_id, action.device_type) - state = ((live or {}).get("measurement_state") - or ((live or {}).get("data") or {}).get("measurement_state") or "") - measuring = str(state).strip().lower() in ("start", "measure", "measuring", "run", "running") - result["steps"]["restart_verified"] = measuring - if measuring: - logger.info(f"[CYCLE] Restart verified — {unit_id} is measuring (state={state}).") - else: - logger.error(f"[CYCLE] Restart NOT verified for {unit_id} — state={state!r}") - try: - get_alert_service(db).create_schedule_failed_alert( - schedule_id=action.id, action_type="cycle", unit_id=unit_id, - error_message=f"Meter did not resume measuring after the cycle (state={state!r}).", - project_id=action.project_id, location_id=action.location_id, - ) - except Exception as ae: - logger.warning(f"[CYCLE] restart-verify alert failed: {ae}") - except Exception as e: - logger.warning(f"[CYCLE] Restart verification skipped (DOD read failed): {e}") + async def _check_measuring(): + """Return (measuring, state); measuring is None if the DOD read failed.""" + try: + await asyncio.sleep(2) + live = await self.device_controller.get_live_data(unit_id, action.device_type) + state = ((live or {}).get("measurement_state") + or ((live or {}).get("data") or {}).get("measurement_state") or "") + ok = str(state).strip().lower() in ("start", "measure", "measuring", "run", "running") + return ok, state + except Exception as e: + logger.warning(f"[CYCLE] Restart-verify DOD read failed: {e}") + return None, None + + measuring, state = await _check_measuring() + if measuring is False: + logger.warning(f"[CYCLE] {unit_id} not measuring after restart (state={state!r}) — retrying start once.") + result["steps"]["restart_retry"] = True + try: + await self.device_controller.start_recording(unit_id, action.device_type) + measuring, state = await _check_measuring() + except Exception as e: + logger.error(f"[CYCLE] Restart retry (start_recording) failed for {unit_id}: {e}") + + result["steps"]["restart_verified"] = measuring + if measuring: + logger.info(f"[CYCLE] Restart verified — {unit_id} is measuring (state={state}).") + elif measuring is False: + logger.error(f"[CYCLE] Restart NOT verified for {unit_id} after retry — state={state!r}") + try: + get_alert_service(db).create_schedule_failed_alert( + schedule_id=action.id, action_type="cycle", unit_id=unit_id, + error_message=f"Meter did not resume measuring after the cycle + one retry (state={state!r}).", + project_id=action.project_id, location_id=action.location_id, + ) + except Exception as ae: + logger.warning(f"[CYCLE] restart-verify alert failed: {ae}") + else: + logger.warning(f"[CYCLE] Restart verification inconclusive for {unit_id} (DOD read failed); keepalive poll will re-confirm.") except Exception as e: logger.error(f"[CYCLE] Start failed: {e}") @@ -790,6 +780,54 @@ class SchedulerService: except IngestError as e: return {"success": False, "error": str(e)} + async def _ingest_and_link( + self, + db, + *, + location_id: str, + unit_id: str, + folder_name: str, + placeholder_session=None, + ) -> dict: + """Ingest a just-finished Auto_#### folder and tie it to the unit. + + This is the ONE ingest path that stop / cycle / download all funnel + through, so every route produces the same clean session: Leq-only, + `.rnh` parsed (percentile slot map + weightings captured), deduped. + + Steps: + 1. Fetch + ingest the folder via the shared NRL ingest + (`_ingest_cycle_folder` → `ingest_nrl_zip`). + 2. `ingest_nrl_zip` leaves `unit_id` None — link it to the unit that + recorded the data so the session stays attributed. + 3. If a `placeholder_session` (the transient "recording" marker) was + passed and it never accumulated DataFiles of its own, drop it — its + data now lives in the ingested, unit-linked session. + + Returns the ingest result dict (with `success`); adds `placeholder_dropped` + when step 3 removed the marker. On ingest failure the placeholder is + left untouched. + """ + ing = await self._ingest_cycle_folder(db, location_id, unit_id, folder_name) + if not ing.get("success"): + return ing + + sid = ing.get("session_id") + if sid: + s = db.query(MonitoringSession).filter_by(id=sid).first() + if s and not s.unit_id: + s.unit_id = unit_id + db.commit() + + if placeholder_session is not None and sid: + from backend.models import DataFile + if db.query(DataFile).filter_by(session_id=placeholder_session.id).count() == 0: + db.delete(placeholder_session) + db.commit() + ing["placeholder_dropped"] = True + + return ing + # ======================================================================== # Recurring Schedule Generation # ======================================================================== diff --git a/docs/adr/0001-device-data-ownership.md b/docs/adr/0001-device-data-ownership.md new file mode 100644 index 0000000..72daa37 --- /dev/null +++ b/docs/adr/0001-device-data-ownership.md @@ -0,0 +1,57 @@ +# ADR 0001 — Device data ownership: modules own raw data, Terra-View owns fleet context + +- **Status:** Accepted (SLMM grandfathered as a known exception — see Consequences) +- **Date:** 2026-06-16 +- **Deciders:** Brian +- **Applies to:** Terra-View core and all device modules (SFM, SLMM, and future modules) + +## Context + +Terra-View is a fleet-management / UI layer that talks to specialized **device modules**, each of which speaks one device's protocol (see the architecture note in `CLAUDE.md`). Two modules exist today, and they store their data **differently**: + +- **SFM (seismograph / seismo-relay).** Owns its own database **and** waveform store. Terra-View holds **no** seismic event or waveform data — it reads through live, e.g. `GET {SFM_BASE_URL}/db/events` (`backend/routers/activity.py`, `backend/routers/admin_modules.py`). Terra-View renders; SFM persists. +- **SLMM (sound level meters).** A thin device-control shim. The sound **measurement data is stored in Terra-View** — `MonitoringSession` + `DataFile` rows in `data/seismo_fleet.db`, and the `.rnh` + Leq `.rnd` files under `data/Projects/{project_id}/{session_id}/` (`backend/routers/project_locations.py:_ingest_file_entries`). SLMM only keeps device config + a live-status cache (`slmm.db`) and a transient download staging area (`data/downloads/{unit_id}/`). + +This inconsistency is real, not cosmetic. It raises an obvious question every time we add a feature or a module: *where does this device's data live?* Without a stated rule, the answer drifts per-module, which is exactly how conceptual integrity erodes. + +### Why the asymmetry exists (history, not sloppiness) + +1. **Path dependence.** seismo-relay pre-existed as a complete data system; Terra-View integrated *with* it. SLMM was built fresh as a control shim, so persistence drifted up into Terra-View. +2. **Coupling.** A seismic event is largely self-contained — Terra-View just tags it to a unit. A Leq interval is only meaningful against an NRL location + baseline + report config, which are **Terra-View concepts**. Sound data has stronger natural gravity toward Terra-View ownership than seismic events do. + +## Decision + +Adopt one explicit ownership rule for all device data: + +> **The device module owns the raw device data (waveforms, events, Leq files, raw telemetry). Terra-View owns the fleet/project/location/session/report context that gives that data meaning.** + +Note this is **not** "Terra-View stores nothing" — Terra-View remains the system of record for roster, projects, locations, deployments, history, schedules, and the associations between fleet entities and module-owned data. What it should **not** own is a second copy of raw device telemetry. + +**Litmus test for any "where does this live?" call:** *whose question does this data answer?* +- "What did the sensor record?" (raw waveform / Leq rows) → **the module**. +- "Which NRL, which night, versus which baseline?" (context) → **Terra-View**. + +### Application + +- **New device modules MUST follow the SFM pattern**: the module owns its data and exposes a read API (`/db/*` or equivalent); Terra-View references it and reads through, rather than ingesting a copy. +- **SFM** already conforms. No change. +- **SLMM does not conform** and is explicitly **grandfathered** (see Consequences). + +## Consequences + +**Positive** +- Consistent module boundaries → lower cognitive load, fewer "which copy is authoritative?" bugs. +- Terra-View stays thin; "add a device type = add a module" stays true (the CLAUDE.md north star). +- Single source of truth for raw data; no silent duplication. + +**Negative / costs** +- Realigning **SLMM** to this rule is a non-trivial refactor: move ingest + file storage into SLMM, build a SLMM read API, repoint the report engine and the Data Files UI to read through it, handle the session↔location association across the module boundary, and migrate existing `MonitoringSession`/`DataFile` data. The FTP night-report pipeline currently **assumes Terra-View ownership**. + +**SLMM grandfather clause** +- SLMM stays as-is for now. Realignment is a **deliberate future project**, not a background cleanup, and should be triggered by a real signal — e.g. a 3rd device type arriving, or the duplication/coupling actually causing pain. Until then, Terra-View remains the system of record for sound data, and that is an accepted, documented exception rather than an aspiration. +- The current sound data flow (for reference): `NL-43 SD card → (FTP) → SLMM data/downloads/ → (proxy ZIP) → Terra-View ingest → data/Projects/ + seismo_fleet.db`. The 1-second `_Lp_` files are dropped at ingest and never land in Terra-View. + +## Related + +- `CLAUDE.md` — module architecture ("Terra-View does NOT communicate directly with physical devices"). +- FTP night-report pipeline (`feat/ftp-report-pipeline`) — built on the current SLMM/Terra-View-ownership model; a future SLMM realignment would need to repoint it. diff --git a/templates/nrl_detail.html b/templates/nrl_detail.html index cafc12a..656435d 100644 --- a/templates/nrl_detail.html +++ b/templates/nrl_detail.html @@ -357,6 +357,16 @@
-