feat(reports): FTP night-report pipeline foundation #62
@@ -636,6 +636,35 @@ class SchedulerService:
|
||||
)
|
||||
result["old_session_id"] = active_session.id
|
||||
|
||||
# Step 4b: Ingest the just-finished Auto_#### folder into Terra-View
|
||||
# (clean session + DataFiles via ingest_nrl_zip — filters Lp, parses the
|
||||
# .rnh, dedups). This is what gives the nightly report its data.
|
||||
if action.device_type == "slm" and result["steps"].get("download", {}).get("success"):
|
||||
idx = None
|
||||
try:
|
||||
idx = int((result["steps"]["download"].get("response") or {}).get("index_number"))
|
||||
except (ValueError, TypeError):
|
||||
idx = None
|
||||
if idx is None:
|
||||
result["steps"]["ingest"] = {"success": False, "error": "no index_number from download"}
|
||||
else:
|
||||
folder_name = f"Auto_{idx:04d}"
|
||||
try:
|
||||
ing = await self._ingest_cycle_folder(db, action.location_id, unit_id, folder_name)
|
||||
result["steps"]["ingest"] = ing
|
||||
db.commit()
|
||||
# The just-closed "recording" session was only a marker — if the
|
||||
# ingest created the real data session, drop the empty placeholder.
|
||||
if ing.get("success") and active_session:
|
||||
from backend.models import DataFile
|
||||
if db.query(DataFile).filter_by(session_id=active_session.id).count() == 0:
|
||||
db.delete(active_session)
|
||||
db.commit()
|
||||
logger.info(f"[CYCLE] Ingested {folder_name}: {ing}")
|
||||
except Exception as e:
|
||||
logger.error(f"[CYCLE] Ingest failed for {folder_name}: {e}", exc_info=True)
|
||||
result["steps"]["ingest"] = {"success": False, "error": str(e)}
|
||||
|
||||
# Step 5: Wait for device to settle before starting new measurement
|
||||
logger.info(f"[CYCLE] Step 5/7: Waiting 30s for device to settle...")
|
||||
await asyncio.sleep(30)
|
||||
@@ -670,6 +699,33 @@ class SchedulerService:
|
||||
|
||||
logger.info(f"[CYCLE] New measurement started, session {new_session.id}")
|
||||
|
||||
# Step 6b: Verify the meter actually resumed measuring (fresh DOD).
|
||||
# Polling is still paused here, so query directly. Advisory: a
|
||||
# failure alerts loudly but doesn't fail the cycle (DOD reads can
|
||||
# be transiently flaky); the keepalive poll re-confirms within ~10s.
|
||||
if action.device_type == "slm":
|
||||
try:
|
||||
await asyncio.sleep(2)
|
||||
live = await self.device_controller.get_live_data(unit_id, action.device_type)
|
||||
state = ((live or {}).get("measurement_state")
|
||||
or ((live or {}).get("data") or {}).get("measurement_state") or "")
|
||||
measuring = str(state).strip().lower() in ("start", "measure", "measuring", "run", "running")
|
||||
result["steps"]["restart_verified"] = measuring
|
||||
if measuring:
|
||||
logger.info(f"[CYCLE] Restart verified — {unit_id} is measuring (state={state}).")
|
||||
else:
|
||||
logger.error(f"[CYCLE] Restart NOT verified for {unit_id} — state={state!r}")
|
||||
try:
|
||||
get_alert_service(db).create_schedule_failed_alert(
|
||||
schedule_id=action.id, action_type="cycle", unit_id=unit_id,
|
||||
error_message=f"Meter did not resume measuring after the cycle (state={state!r}).",
|
||||
project_id=action.project_id, location_id=action.location_id,
|
||||
)
|
||||
except Exception as ae:
|
||||
logger.warning(f"[CYCLE] restart-verify alert failed: {ae}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[CYCLE] Restart verification skipped (DOD read failed): {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[CYCLE] Start failed: {e}")
|
||||
result["steps"]["start"] = {"success": False, "error": str(e)}
|
||||
@@ -692,6 +748,37 @@ class SchedulerService:
|
||||
logger.info(f"[CYCLE] === Cycle complete for {unit_id} ===")
|
||||
return result
|
||||
|
||||
async def _ingest_cycle_folder(self, db, location_id: str, unit_id: str, folder_name: str) -> dict:
|
||||
"""Fetch a just-finished Auto_#### folder from SLMM (FTP proxy) and ingest
|
||||
it into Terra-View (clean MonitoringSession + DataFiles via ingest_nrl_zip).
|
||||
|
||||
Returns the ingest result dict, or {"success": False, "error": ...}.
|
||||
Used by _execute_cycle Step 4b.
|
||||
"""
|
||||
import os
|
||||
import httpx
|
||||
from backend.routers.project_locations import ingest_nrl_zip, IngestError
|
||||
|
||||
slmm_base = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
|
||||
remote_path = f"/NL-43/{folder_name}"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=600.0) as client:
|
||||
resp = await client.post(
|
||||
f"{slmm_base}/api/nl43/{unit_id}/ftp/download-folder",
|
||||
json={"remote_path": remote_path},
|
||||
)
|
||||
except Exception as e:
|
||||
return {"success": False, "error": f"download-folder request failed: {e}"}
|
||||
|
||||
if not resp.is_success or len(resp.content) <= 22: # 22 bytes = empty-zip
|
||||
return {"success": False, "error": f"empty/failed ZIP from SLMM (status {resp.status_code})"}
|
||||
|
||||
try:
|
||||
res = ingest_nrl_zip(location_id, resp.content, db, source="ftp_cycle", dedupe=True)
|
||||
return {"success": True, **res}
|
||||
except IngestError as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# ========================================================================
|
||||
# Recurring Schedule Generation
|
||||
# ========================================================================
|
||||
|
||||
Reference in New Issue
Block a user