Sound night-report pipeline (v1): automated FTP capture → ingest → morning report #66

Merged
serversdown merged 10 commits from feat/ftp-report-pipeline into dev 2026-06-16 20:17:10 -04:00
Showing only changes of commit 3c5e830f9c - Show all commits
+45 -24
View File
@@ -680,31 +680,52 @@ class SchedulerService:
logger.info(f"[CYCLE] New measurement started, session {new_session.id}")
# Step 6b: Verify the meter actually resumed measuring (fresh DOD).
# Polling is still paused here, so query directly. Advisory: a
# failure alerts loudly but doesn't fail the cycle (DOD reads can
# be transiently flaky); the keepalive poll re-confirms within ~10s.
# Polling is still paused here, so query directly. If it didn't
# resume, retry ONCE with a plain start (start_recording — does NOT
# re-index, unlike start_cycle) before alerting: a meter left
# stopped overnight is the costly failure, and a transient restart
# hiccup is common on the NL-43. We retry only on a *confident*
# not-measuring reading — never on a failed/inconclusive DOD read —
# so a flaky read can't disrupt an already-running measurement.
if action.device_type == "slm":
try:
await asyncio.sleep(2)
live = await self.device_controller.get_live_data(unit_id, action.device_type)
state = ((live or {}).get("measurement_state")
or ((live or {}).get("data") or {}).get("measurement_state") or "")
measuring = str(state).strip().lower() in ("start", "measure", "measuring", "run", "running")
result["steps"]["restart_verified"] = measuring
if measuring:
logger.info(f"[CYCLE] Restart verified — {unit_id} is measuring (state={state}).")
else:
logger.error(f"[CYCLE] Restart NOT verified for {unit_id} — state={state!r}")
try:
get_alert_service(db).create_schedule_failed_alert(
schedule_id=action.id, action_type="cycle", unit_id=unit_id,
error_message=f"Meter did not resume measuring after the cycle (state={state!r}).",
project_id=action.project_id, location_id=action.location_id,
)
except Exception as ae:
logger.warning(f"[CYCLE] restart-verify alert failed: {ae}")
except Exception as e:
logger.warning(f"[CYCLE] Restart verification skipped (DOD read failed): {e}")
async def _check_measuring():
    """Probe the meter once and classify whether it is measuring.

    Sleeps 2 seconds, then reads live data for ``unit_id`` through
    ``self.device_controller.get_live_data``. The measurement state is
    taken from the top-level ``measurement_state`` key, falling back to
    ``data.measurement_state``.

    Returns:
        A ``(measuring, state)`` tuple where ``measuring`` is:
          * ``True``  - the reported state is one of the running values;
          * ``False`` - a state was reported and it is not a running value;
          * ``None``  - inconclusive: the read raised, or the payload
            carried no measurement state at all.
    """
    try:
        await asyncio.sleep(2)
        live = await self.device_controller.get_live_data(unit_id, action.device_type)
        payload = live or {}
        state = (payload.get("measurement_state")
                 or (payload.get("data") or {}).get("measurement_state") or "")
        normalized = str(state).strip().lower()
        if not normalized:
            # An empty/missing state is not evidence the meter is stopped.
            # Report it as inconclusive so the caller does not issue a
            # start against a measurement that may already be running.
            logger.warning("[CYCLE] Restart-verify DOD read returned no measurement state.")
            return None, state
        ok = normalized in ("start", "measure", "measuring", "run", "running")
        return ok, state
    except Exception as e:
        # Kept broad on purpose: any failure here (transport error or an
        # unexpected payload shape) must read as "inconclusive", never
        # propagate and fail the surrounding cycle step.
        logger.warning(f"[CYCLE] Restart-verify DOD read failed: {e}")
        return None, None
measuring, state = await _check_measuring()
if measuring is False:
logger.warning(f"[CYCLE] {unit_id} not measuring after restart (state={state!r}) — retrying start once.")
result["steps"]["restart_retry"] = True
try:
await self.device_controller.start_recording(unit_id, action.device_type)
measuring, state = await _check_measuring()
except Exception as e:
logger.error(f"[CYCLE] Restart retry (start_recording) failed for {unit_id}: {e}")
result["steps"]["restart_verified"] = measuring
if measuring:
logger.info(f"[CYCLE] Restart verified — {unit_id} is measuring (state={state}).")
elif measuring is False:
logger.error(f"[CYCLE] Restart NOT verified for {unit_id} after retry — state={state!r}")
try:
get_alert_service(db).create_schedule_failed_alert(
schedule_id=action.id, action_type="cycle", unit_id=unit_id,
error_message=f"Meter did not resume measuring after the cycle + one retry (state={state!r}).",
project_id=action.project_id, location_id=action.location_id,
)
except Exception as ae:
logger.warning(f"[CYCLE] restart-verify alert failed: {ae}")
else:
logger.warning(f"[CYCLE] Restart verification inconclusive for {unit_id} (DOD read failed); keepalive poll will re-confirm.")
except Exception as e:
logger.error(f"[CYCLE] Start failed: {e}")