refactor(reports): funnel scheduler stop/download/cycle through one ingest

_execute_stop and _execute_download no longer hand-roll ZIP extraction; all three actions now call a shared _ingest_and_link helper (ingest via ingest_nrl_zip, link the unit, drop the empty placeholder session). Every capture path produces the same clean, .rnh-parsed, percentile-aware, deduped, Leq-only session. _execute_download previously created no session at all (TODO); it now ingests like the others.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-15 18:12:15 +00:00
parent 5c38f6ec1e
commit abcfba179f
+113 -96
View File
@@ -309,18 +309,11 @@ class SchedulerService:
2. Enable FTP 2. Enable FTP
3. Download measurement folder to SLMM local storage 3. Download measurement folder to SLMM local storage
After stop_cycle, if download succeeded, this method fetches the ZIP After stop_cycle, if download succeeded, this method ingests the folder
from SLMM and extracts it into Terra-View's project directory, creating into Terra-View through the shared NRL ingest (same path as cycle and the
DataFile records for each file. manual SD-card upload) so the resulting session is Leq-only, has its
`.rnh` parsed (percentile slot map + weightings), and is deduped.
""" """
import hashlib
import io
import os
import zipfile
import httpx
from pathlib import Path
from backend.models import DataFile
# Parse notes for download preference # Parse notes for download preference
include_download = True include_download = True
try: try:
@@ -365,79 +358,39 @@ class SchedulerService:
db.commit() db.commit()
# If SLMM downloaded the folder successfully, fetch the ZIP from SLMM # If SLMM downloaded the folder successfully, ingest it into Terra-View
# and extract it into Terra-View's project directory, creating DataFile records # through the shared NRL ingest (the same path cycle and the manual SD
files_created = 0 # upload use): keeps only the .rnh + Leq .rnd, parses the header
if include_download and cycle_response.get("download_success") and active_session: # (percentile slot map + weightings), dedups, and links the unit. The
# transient "recording" marker session is dropped in favour of the clean
# ingested row. (Replaces the old inline unzip that stored every file —
# incl. the 1-second _Lp_ data — without parsing the .rnh.)
ingest_result = None
ingested_session_id = None
if (include_download and cycle_response.get("download_success")
and active_session and action.device_type == "slm"):
folder_name = cycle_response.get("downloaded_folder") # e.g. "Auto_0058" folder_name = cycle_response.get("downloaded_folder") # e.g. "Auto_0058"
remote_path = f"/NL-43/{folder_name}" if folder_name:
try: try:
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") ingest_result = await self._ingest_and_link(
async with httpx.AsyncClient(timeout=600.0) as client: db,
zip_response = await client.post( location_id=action.location_id,
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/ftp/download-folder", unit_id=unit_id,
json={"remote_path": remote_path} folder_name=folder_name,
placeholder_session=active_session,
) )
ingested_session_id = ingest_result.get("session_id")
if zip_response.is_success and len(zip_response.content) > 22: logger.info(f"[STOP] Ingested {folder_name}: {ingest_result}")
base_dir = Path(f"data/Projects/{action.project_id}/{active_session.id}/{folder_name}")
base_dir.mkdir(parents=True, exist_ok=True)
file_type_map = {
'.wav': 'audio', '.mp3': 'audio',
'.csv': 'data', '.txt': 'data', '.json': 'data', '.dat': 'data',
'.rnd': 'data', '.rnh': 'data',
'.log': 'log',
'.zip': 'archive',
'.jpg': 'image', '.jpeg': 'image', '.png': 'image',
'.pdf': 'document',
}
with zipfile.ZipFile(io.BytesIO(zip_response.content)) as zf:
for zip_info in zf.filelist:
if zip_info.is_dir():
continue
file_data = zf.read(zip_info.filename)
file_path = base_dir / zip_info.filename
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'wb') as f:
f.write(file_data)
checksum = hashlib.sha256(file_data).hexdigest()
ext = os.path.splitext(zip_info.filename)[1].lower()
data_file = DataFile(
id=str(uuid.uuid4()),
session_id=active_session.id,
file_path=str(file_path.relative_to("data")),
file_type=file_type_map.get(ext, 'data'),
file_size_bytes=len(file_data),
downloaded_at=datetime.utcnow(),
checksum=checksum,
file_metadata=json.dumps({
"source": "stop_cycle",
"remote_path": remote_path,
"unit_id": unit_id,
"folder_name": folder_name,
"relative_path": zip_info.filename,
}),
)
db.add(data_file)
files_created += 1
db.commit()
logger.info(f"Created {files_created} DataFile records for session {active_session.id} from {folder_name}")
else:
logger.warning(f"ZIP from SLMM for {folder_name} was empty or failed, skipping DataFile creation")
except Exception as e: except Exception as e:
logger.error(f"Failed to extract ZIP and create DataFile records for {folder_name}: {e}") logger.error(f"Failed to ingest {folder_name} on stop: {e}", exc_info=True)
# Don't fail the stop action — the device was stopped successfully # Don't fail the stop action — the device was stopped successfully
ingest_result = {"success": False, "error": str(e)}
return { return {
"status": "stopped", "status": "stopped",
"session_id": active_session.id if active_session else None, "session_id": ingested_session_id or (active_session.id if active_session else None),
"cycle_response": cycle_response, "cycle_response": cycle_response,
"files_created": files_created, "ingest": ingest_result,
} }
async def _execute_download( async def _execute_download(
@@ -490,12 +443,38 @@ class SchedulerService:
files=None, # Download all files in current measurement folder files=None, # Download all files in current measurement folder
) )
# TODO: Create DataFile records for downloaded files # Ingest the downloaded folder into Terra-View via the shared NRL ingest
# (same path as stop/cycle): clean Leq-only session, .rnh parsed
# (percentiles + weightings), deduped, unit linked. No placeholder
# session here — a standalone download isn't tied to a "recording" marker.
ingest_result = None
if action.device_type == "slm":
folder_name = (response or {}).get("folder_name")
if not folder_name:
try:
folder_name = f"Auto_{int((response or {}).get('index_number')):04d}"
except (ValueError, TypeError):
folder_name = None
if not folder_name:
ingest_result = {"success": False, "error": "no folder_name/index_number from download"}
else:
try:
ingest_result = await self._ingest_and_link(
db,
location_id=action.location_id,
unit_id=unit_id,
folder_name=folder_name,
)
logger.info(f"[DOWNLOAD] Ingested {folder_name}: {ingest_result}")
except Exception as e:
logger.error(f"Failed to ingest {folder_name} on download: {e}", exc_info=True)
ingest_result = {"success": False, "error": str(e)}
return { return {
"status": "downloaded", "status": "downloaded",
"destination_path": destination_path, "destination_path": destination_path,
"device_response": response, "device_response": response,
"ingest": ingest_result,
} }
async def _execute_cycle( async def _execute_cycle(
@@ -650,27 +629,17 @@ class SchedulerService:
else: else:
folder_name = f"Auto_{idx:04d}" folder_name = f"Auto_{idx:04d}"
try: try:
ing = await self._ingest_cycle_folder(db, action.location_id, unit_id, folder_name) ing = await self._ingest_and_link(
db,
location_id=action.location_id,
unit_id=unit_id,
folder_name=folder_name,
placeholder_session=active_session,
)
result["steps"]["ingest"] = ing result["steps"]["ingest"] = ing
db.commit() # The marker session was dropped; repoint old_session_id at the real row.
if ing.get("success"): if ing.get("placeholder_dropped") and ing.get("session_id"):
from backend.models import DataFile result["old_session_id"] = ing["session_id"]
sid = ing.get("session_id")
# ingest_nrl_zip leaves unit_id None — tie the data session to the
# unit that recorded it so it stays linked after we drop the placeholder.
if sid:
s = db.query(MonitoringSession).filter_by(id=sid).first()
if s and not s.unit_id:
s.unit_id = unit_id
db.commit()
# The just-closed "recording" session was only a marker; its data now
# lives in the ingested (unit-linked) session. Drop the empty placeholder
# and repoint old_session_id at the real row.
if active_session and db.query(DataFile).filter_by(session_id=active_session.id).count() == 0:
if sid:
result["old_session_id"] = sid
db.delete(active_session)
db.commit()
logger.info(f"[CYCLE] Ingested {folder_name}: {ing}") logger.info(f"[CYCLE] Ingested {folder_name}: {ing}")
except Exception as e: except Exception as e:
logger.error(f"[CYCLE] Ingest failed for {folder_name}: {e}", exc_info=True) logger.error(f"[CYCLE] Ingest failed for {folder_name}: {e}", exc_info=True)
@@ -790,6 +759,54 @@ class SchedulerService:
except IngestError as e: except IngestError as e:
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
async def _ingest_and_link(
self,
db,
*,
location_id: str,
unit_id: str,
folder_name: str,
placeholder_session=None,
) -> dict:
"""Ingest a just-finished Auto_#### folder and tie it to the unit.
This is the ONE ingest path that stop / cycle / download all funnel
through, so every route produces the same clean session: Leq-only,
`.rnh` parsed (percentile slot map + weightings captured), deduped.
Steps:
1. Fetch + ingest the folder via the shared NRL ingest
(`_ingest_cycle_folder` → `ingest_nrl_zip`).
2. `ingest_nrl_zip` leaves `unit_id` None — link it to the unit that
recorded the data so the session stays attributed.
3. If a `placeholder_session` (the transient "recording" marker) was
passed and it never accumulated DataFiles of its own, drop it — its
data now lives in the ingested, unit-linked session.
Returns the ingest result dict (with `success`); adds `placeholder_dropped`
when step 3 removed the marker. On ingest failure the placeholder is
left untouched.
"""
ing = await self._ingest_cycle_folder(db, location_id, unit_id, folder_name)
if not ing.get("success"):
return ing
sid = ing.get("session_id")
if sid:
s = db.query(MonitoringSession).filter_by(id=sid).first()
if s and not s.unit_id:
s.unit_id = unit_id
db.commit()
if placeholder_session is not None and sid:
from backend.models import DataFile
if db.query(DataFile).filter_by(session_id=placeholder_session.id).count() == 0:
db.delete(placeholder_session)
db.commit()
ing["placeholder_dropped"] = True
return ing
# ======================================================================== # ========================================================================
# Recurring Schedule Generation # Recurring Schedule Generation
# ======================================================================== # ========================================================================