diff --git a/backend/services/scheduler.py b/backend/services/scheduler.py index 0982358..e149da9 100644 --- a/backend/services/scheduler.py +++ b/backend/services/scheduler.py @@ -636,6 +636,35 @@ class SchedulerService: ) result["old_session_id"] = active_session.id + # Step 4b: Ingest the just-finished Auto_#### folder into Terra-View + # (clean session + DataFiles via ingest_nrl_zip — filters Lp, parses the + # .rnh, dedups). This is what gives the nightly report its data. + if action.device_type == "slm" and result["steps"].get("download", {}).get("success"): + idx = None + try: + idx = int((result["steps"]["download"].get("response") or {}).get("index_number")) + except (ValueError, TypeError): + idx = None + if idx is None: + result["steps"]["ingest"] = {"success": False, "error": "no index_number from download"} + else: + folder_name = f"Auto_{idx:04d}" + try: + ing = await self._ingest_cycle_folder(db, action.location_id, unit_id, folder_name) + result["steps"]["ingest"] = ing + db.commit() + # The just-closed "recording" session was only a marker — if the + # ingest created the real data session, drop the empty placeholder. + if ing.get("success") and active_session: + from backend.models import DataFile + if db.query(DataFile).filter_by(session_id=active_session.id).count() == 0: + db.delete(active_session) + db.commit() + logger.info(f"[CYCLE] Ingested {folder_name}: {ing}") + except Exception as e: + logger.error(f"[CYCLE] Ingest failed for {folder_name}: {e}", exc_info=True) + result["steps"]["ingest"] = {"success": False, "error": str(e)} + # Step 5: Wait for device to settle before starting new measurement logger.info(f"[CYCLE] Step 5/7: Waiting 30s for device to settle...") await asyncio.sleep(30) @@ -670,6 +699,33 @@ class SchedulerService: logger.info(f"[CYCLE] New measurement started, session {new_session.id}") + # Step 6b: Verify the meter actually resumed measuring (fresh DOD). + # Polling is still paused here, so query directly. Advisory: a + # failure alerts loudly but doesn't fail the cycle (DOD reads can + # be transiently flaky); the keepalive poll re-confirms within ~10s. + if action.device_type == "slm": + try: + await asyncio.sleep(2) + live = await self.device_controller.get_live_data(unit_id, action.device_type) + state = ((live or {}).get("measurement_state") + or ((live or {}).get("data") or {}).get("measurement_state") or "") + measuring = str(state).strip().lower() in ("start", "measure", "measuring", "run", "running") + result["steps"]["restart_verified"] = measuring + if measuring: + logger.info(f"[CYCLE] Restart verified — {unit_id} is measuring (state={state}).") + else: + logger.error(f"[CYCLE] Restart NOT verified for {unit_id} — state={state!r}") + try: + get_alert_service(db).create_schedule_failed_alert( + schedule_id=action.id, action_type="cycle", unit_id=unit_id, + error_message=f"Meter did not resume measuring after the cycle (state={state!r}).", + project_id=action.project_id, location_id=action.location_id, + ) + except Exception as ae: + logger.warning(f"[CYCLE] restart-verify alert failed: {ae}") + except Exception as e: + logger.warning(f"[CYCLE] Restart verification skipped (DOD read failed): {e}") + except Exception as e: logger.error(f"[CYCLE] Start failed: {e}") result["steps"]["start"] = {"success": False, "error": str(e)} @@ -692,6 +748,37 @@ class SchedulerService: logger.info(f"[CYCLE] === Cycle complete for {unit_id} ===") return result + async def _ingest_cycle_folder(self, db, location_id: str, unit_id: str, folder_name: str) -> dict: + """Fetch a just-finished Auto_#### folder from SLMM (FTP proxy) and ingest + it into Terra-View (clean MonitoringSession + DataFiles via ingest_nrl_zip). + + Returns the ingest result dict, or {"success": False, "error": ...}. + Used by _execute_cycle Step 4b. + """ + import os + import httpx + from backend.routers.project_locations import ingest_nrl_zip, IngestError + + slmm_base = os.getenv("SLMM_BASE_URL", "http://localhost:8100") + remote_path = f"/NL-43/{folder_name}" + try: + async with httpx.AsyncClient(timeout=600.0) as client: + resp = await client.post( + f"{slmm_base}/api/nl43/{unit_id}/ftp/download-folder", + json={"remote_path": remote_path}, + ) + except Exception as e: + return {"success": False, "error": f"download-folder request failed: {e}"} + + if not resp.is_success or len(resp.content) <= 22: # 22 bytes = empty-zip + return {"success": False, "error": f"empty/failed ZIP from SLMM (status {resp.status_code})"} + + try: + res = ingest_nrl_zip(location_id, resp.content, db, source="ftp_cycle", dedupe=True) + return {"success": True, **res} + except IngestError as e: + return {"success": False, "error": str(e)} + # ======================================================================== # Recurring Schedule Generation # ========================================================================