From abcfba179f09e1796a467b726e869098d38a8780 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 15 Jun 2026 18:12:15 +0000 Subject: [PATCH] refactor(reports): funnel scheduler stop/download/cycle through one ingest _execute_stop and _execute_download no longer hand-roll ZIP extraction; all three actions now call a shared _ingest_and_link helper (ingest via ingest_nrl_zip, link the unit, drop the empty placeholder session). Every capture path produces the same clean, .rnh-parsed, percentile-aware, deduped, Leq-only session. _execute_download previously created no session at all (TODO); it now ingests like the others. Co-Authored-By: Claude Opus 4.8 (1M context) --- backend/services/scheduler.py | 215 ++++++++++++++++++---------------- 1 file changed, 116 insertions(+), 99 deletions(-) diff --git a/backend/services/scheduler.py b/backend/services/scheduler.py index e1ce32e..fc16ee9 100644 --- a/backend/services/scheduler.py +++ b/backend/services/scheduler.py @@ -309,18 +309,11 @@ class SchedulerService: 2. Enable FTP 3. Download measurement folder to SLMM local storage - After stop_cycle, if download succeeded, this method fetches the ZIP - from SLMM and extracts it into Terra-View's project directory, creating - DataFile records for each file. + After stop_cycle, if download succeeded, this method ingests the folder + into Terra-View through the shared NRL ingest (same path as cycle and the + manual SD-card upload) so the resulting session is Leq-only, has its + `.rnh` parsed (percentile slot map + weightings), and is deduped. 
""" - import hashlib - import io - import os - import zipfile - import httpx - from pathlib import Path - from backend.models import DataFile - # Parse notes for download preference include_download = True try: @@ -365,79 +358,39 @@ class SchedulerService: db.commit() - # If SLMM downloaded the folder successfully, fetch the ZIP from SLMM - # and extract it into Terra-View's project directory, creating DataFile records - files_created = 0 - if include_download and cycle_response.get("download_success") and active_session: + # If SLMM downloaded the folder successfully, ingest it into Terra-View + # through the shared NRL ingest (the same path cycle and the manual SD + # upload use): keeps only the .rnh + Leq .rnd, parses the header + # (percentile slot map + weightings), dedups, and links the unit. The + # transient "recording" marker session is dropped in favour of the clean + # ingested row. (Replaces the old inline unzip that stored every file — + # incl. the 1-second _Lp_ data — without parsing the .rnh.) + ingest_result = None + ingested_session_id = None + if (include_download and cycle_response.get("download_success") + and active_session and action.device_type == "slm"): folder_name = cycle_response.get("downloaded_folder") # e.g. 
"Auto_0058" - remote_path = f"/NL-43/{folder_name}" - - try: - SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") - async with httpx.AsyncClient(timeout=600.0) as client: - zip_response = await client.post( - f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download-folder", - json={"remote_path": remote_path} + if folder_name: + try: + ingest_result = await self._ingest_and_link( + db, + location_id=action.location_id, + unit_id=unit_id, + folder_name=folder_name, + placeholder_session=active_session, ) - - if zip_response.is_success and len(zip_response.content) > 22: - base_dir = Path(f"data/Projects/{action.project_id}/{active_session.id}/{folder_name}") - base_dir.mkdir(parents=True, exist_ok=True) - - file_type_map = { - '.wav': 'audio', '.mp3': 'audio', - '.csv': 'data', '.txt': 'data', '.json': 'data', '.dat': 'data', - '.rnd': 'data', '.rnh': 'data', - '.log': 'log', - '.zip': 'archive', - '.jpg': 'image', '.jpeg': 'image', '.png': 'image', - '.pdf': 'document', - } - - with zipfile.ZipFile(io.BytesIO(zip_response.content)) as zf: - for zip_info in zf.filelist: - if zip_info.is_dir(): - continue - file_data = zf.read(zip_info.filename) - file_path = base_dir / zip_info.filename - file_path.parent.mkdir(parents=True, exist_ok=True) - with open(file_path, 'wb') as f: - f.write(file_data) - checksum = hashlib.sha256(file_data).hexdigest() - ext = os.path.splitext(zip_info.filename)[1].lower() - data_file = DataFile( - id=str(uuid.uuid4()), - session_id=active_session.id, - file_path=str(file_path.relative_to("data")), - file_type=file_type_map.get(ext, 'data'), - file_size_bytes=len(file_data), - downloaded_at=datetime.utcnow(), - checksum=checksum, - file_metadata=json.dumps({ - "source": "stop_cycle", - "remote_path": remote_path, - "unit_id": unit_id, - "folder_name": folder_name, - "relative_path": zip_info.filename, - }), - ) - db.add(data_file) - files_created += 1 - - db.commit() - logger.info(f"Created {files_created} DataFile records for 
session {active_session.id} from {folder_name}") - else: - logger.warning(f"ZIP from SLMM for {folder_name} was empty or failed, skipping DataFile creation") - - except Exception as e: - logger.error(f"Failed to extract ZIP and create DataFile records for {folder_name}: {e}") - # Don't fail the stop action — the device was stopped successfully + ingested_session_id = ingest_result.get("session_id") + logger.info(f"[STOP] Ingested {folder_name}: {ingest_result}") + except Exception as e: + logger.error(f"Failed to ingest {folder_name} on stop: {e}", exc_info=True) + # Don't fail the stop action — the device was stopped successfully + ingest_result = {"success": False, "error": str(e)} return { "status": "stopped", - "session_id": active_session.id if active_session else None, + "session_id": ingested_session_id or (active_session.id if active_session else None), "cycle_response": cycle_response, - "files_created": files_created, + "ingest": ingest_result, } async def _execute_download( @@ -490,12 +443,38 @@ class SchedulerService: files=None, # Download all files in current measurement folder ) - # TODO: Create DataFile records for downloaded files + # Ingest the downloaded folder into Terra-View via the shared NRL ingest + # (same path as stop/cycle): clean Leq-only session, .rnh parsed + # (percentiles + weightings), deduped, unit linked. No placeholder + # session here — a standalone download isn't tied to a "recording" marker. 
+ ingest_result = None + if action.device_type == "slm": + folder_name = (response or {}).get("folder_name") + if not folder_name: + try: + folder_name = f"Auto_{int((response or {}).get('index_number')):04d}" + except (ValueError, TypeError): + folder_name = None + if not folder_name: + ingest_result = {"success": False, "error": "no folder_name/index_number from download"} + else: + try: + ingest_result = await self._ingest_and_link( + db, + location_id=action.location_id, + unit_id=unit_id, + folder_name=folder_name, + ) + logger.info(f"[DOWNLOAD] Ingested {folder_name}: {ingest_result}") + except Exception as e: + logger.error(f"Failed to ingest {folder_name} on download: {e}", exc_info=True) + ingest_result = {"success": False, "error": str(e)} return { "status": "downloaded", "destination_path": destination_path, "device_response": response, + "ingest": ingest_result, } async def _execute_cycle( @@ -650,27 +629,17 @@ class SchedulerService: else: folder_name = f"Auto_{idx:04d}" try: - ing = await self._ingest_cycle_folder(db, action.location_id, unit_id, folder_name) + ing = await self._ingest_and_link( + db, + location_id=action.location_id, + unit_id=unit_id, + folder_name=folder_name, + placeholder_session=active_session, + ) result["steps"]["ingest"] = ing - db.commit() - if ing.get("success"): - from backend.models import DataFile - sid = ing.get("session_id") - # ingest_nrl_zip leaves unit_id None — tie the data session to the - # unit that recorded it so it stays linked after we drop the placeholder. - if sid: - s = db.query(MonitoringSession).filter_by(id=sid).first() - if s and not s.unit_id: - s.unit_id = unit_id - db.commit() - # The just-closed "recording" session was only a marker; its data now - # lives in the ingested (unit-linked) session. Drop the empty placeholder - # and repoint old_session_id at the real row. 
async def _ingest_and_link(
    self,
    db,
    *,
    location_id: str,
    unit_id: str,
    folder_name: str,
    placeholder_session=None,
) -> dict:
    """Ingest a just-finished Auto_#### folder and tie it to the unit.

    This is the single ingest path that stop / cycle / download funnel
    through, so every route produces the same kind of session.

    Steps:
      1. Fetch + ingest the folder via ``self._ingest_cycle_folder``.
      2. If the ingested session row has no ``unit_id`` yet, set it to
         ``unit_id`` so the session stays attributed to the recording unit.
      3. If a ``placeholder_session`` (the transient "recording" marker) was
         passed, is a different row than the ingested session, and has no
         DataFile rows of its own, delete it.

    Args:
        db: Database session (used via ``query`` / ``delete`` / ``commit``;
            presumably a SQLAlchemy ORM session — confirm against callers).
        location_id: Location the folder is ingested under.
        unit_id: Unit that recorded the data.
        folder_name: Folder to ingest, e.g. ``"Auto_0058"``.
        placeholder_session: Optional marker session to drop after a
            successful ingest.

    Returns:
        The dict returned by ``_ingest_cycle_folder``. ``placeholder_dropped``
        is set to ``True`` only when step 3 deleted the marker. When the
        ingest reports failure, or returns no ``session_id``, the result is
        returned as-is and the placeholder is left untouched.

    Raises:
        Exception: Any error from the link/cleanup queries is re-raised
            after the database session has been rolled back.
    """
    ing = await self._ingest_cycle_folder(db, location_id, unit_id, folder_name)

    sid = ing.get("session_id")
    if not ing.get("success") or not sid:
        # Nothing to link and nothing safe to clean up.
        return ing

    try:
        ingested = db.query(MonitoringSession).filter_by(id=sid).first()
        if ingested is not None and not ingested.unit_id:
            ingested.unit_id = unit_id
            db.commit()

        # Never delete the row we are about to report as the session: if the
        # ingest resolved to the placeholder itself, it is the real session.
        if placeholder_session is not None and placeholder_session.id != sid:
            from backend.models import DataFile

            placeholder_files = (
                db.query(DataFile)
                .filter_by(session_id=placeholder_session.id)
                .count()
            )
            if placeholder_files == 0:
                db.delete(placeholder_session)
                db.commit()
                ing["placeholder_dropped"] = True
    except Exception:
        # Callers catch this and keep using `db`; a failed flush/commit leaves
        # the session unusable until it is rolled back, so reset it first.
        db.rollback()
        raise

    return ing