fix: improve the 24-hour cycle by running it as a single scheduler action. Should help prevent issues with downloads (DLs).
This commit is contained in:
@@ -199,7 +199,7 @@ class AlertService:
|
||||
|
||||
Args:
|
||||
schedule_id: The ScheduledAction or RecurringSchedule ID
|
||||
action_type: start, stop, download
|
||||
action_type: start, stop, download, cycle
|
||||
unit_id: Related unit
|
||||
error_message: Error from execution
|
||||
project_id: Related project
|
||||
@@ -235,7 +235,7 @@ class AlertService:
|
||||
|
||||
Args:
|
||||
schedule_id: The ScheduledAction ID
|
||||
action_type: start, stop, download
|
||||
action_type: start, stop, download, cycle
|
||||
unit_id: Related unit
|
||||
project_id: Related project
|
||||
location_id: Related location
|
||||
|
||||
@@ -384,73 +384,33 @@ class RecurringScheduleService:
|
||||
if cycle_utc <= now_utc:
|
||||
continue
|
||||
|
||||
# Check if action already exists
|
||||
if self._action_exists(schedule.project_id, schedule.location_id, "stop", cycle_utc):
|
||||
# Check if cycle action already exists
|
||||
if self._action_exists(schedule.project_id, schedule.location_id, "cycle", cycle_utc):
|
||||
continue
|
||||
|
||||
# Build notes with metadata
|
||||
stop_notes = json.dumps({
|
||||
# Build notes with metadata for cycle action
|
||||
cycle_notes = json.dumps({
|
||||
"schedule_name": schedule.name,
|
||||
"schedule_id": schedule.id,
|
||||
"cycle_type": "daily",
|
||||
"include_download": schedule.include_download,
|
||||
"auto_increment_index": schedule.auto_increment_index,
|
||||
})
|
||||
|
||||
# Create STOP action
|
||||
stop_action = ScheduledAction(
|
||||
# Create single CYCLE action that handles stop -> download -> start
|
||||
# The scheduler's _execute_cycle method handles the full workflow with delays
|
||||
cycle_action = ScheduledAction(
|
||||
id=str(uuid.uuid4()),
|
||||
project_id=schedule.project_id,
|
||||
location_id=schedule.location_id,
|
||||
unit_id=unit_id,
|
||||
action_type="stop",
|
||||
action_type="cycle",
|
||||
device_type=schedule.device_type,
|
||||
scheduled_time=cycle_utc,
|
||||
execution_status="pending",
|
||||
notes=stop_notes,
|
||||
notes=cycle_notes,
|
||||
)
|
||||
actions.append(stop_action)
|
||||
|
||||
# Create DOWNLOAD action if enabled (1 minute after stop)
|
||||
if schedule.include_download:
|
||||
download_time = cycle_utc + timedelta(minutes=1)
|
||||
download_notes = json.dumps({
|
||||
"schedule_name": schedule.name,
|
||||
"schedule_id": schedule.id,
|
||||
"cycle_type": "daily",
|
||||
})
|
||||
download_action = ScheduledAction(
|
||||
id=str(uuid.uuid4()),
|
||||
project_id=schedule.project_id,
|
||||
location_id=schedule.location_id,
|
||||
unit_id=unit_id,
|
||||
action_type="download",
|
||||
device_type=schedule.device_type,
|
||||
scheduled_time=download_time,
|
||||
execution_status="pending",
|
||||
notes=download_notes,
|
||||
)
|
||||
actions.append(download_action)
|
||||
|
||||
# Create START action (2 minutes after stop, or 1 minute after download)
|
||||
start_offset = 2 if schedule.include_download else 1
|
||||
start_time = cycle_utc + timedelta(minutes=start_offset)
|
||||
start_notes = json.dumps({
|
||||
"schedule_name": schedule.name,
|
||||
"schedule_id": schedule.id,
|
||||
"cycle_type": "daily",
|
||||
"auto_increment_index": schedule.auto_increment_index,
|
||||
})
|
||||
start_action = ScheduledAction(
|
||||
id=str(uuid.uuid4()),
|
||||
project_id=schedule.project_id,
|
||||
location_id=schedule.location_id,
|
||||
unit_id=unit_id,
|
||||
action_type="start",
|
||||
device_type=schedule.device_type,
|
||||
scheduled_time=start_time,
|
||||
execution_status="pending",
|
||||
notes=start_notes,
|
||||
)
|
||||
actions.append(start_action)
|
||||
actions.append(cycle_action)
|
||||
|
||||
return actions
|
||||
|
||||
|
||||
@@ -185,6 +185,8 @@ class SchedulerService:
|
||||
response = await self._execute_stop(action, unit_id, db)
|
||||
elif action.action_type == "download":
|
||||
response = await self._execute_download(action, unit_id, db)
|
||||
elif action.action_type == "cycle":
|
||||
response = await self._execute_cycle(action, unit_id, db)
|
||||
else:
|
||||
raise Exception(f"Unknown action type: {action.action_type}")
|
||||
|
||||
@@ -375,8 +377,13 @@ class SchedulerService:
|
||||
f"{location.name}/session-{session_timestamp}/"
|
||||
)
|
||||
|
||||
# Step 1: Enable FTP on device
|
||||
logger.info(f"Enabling FTP on {unit_id} for download")
|
||||
# Step 1: Disable FTP first to reset any stale connection state
|
||||
# Then enable FTP on device
|
||||
logger.info(f"Resetting FTP on {unit_id} for download (disable then enable)")
|
||||
try:
|
||||
await self.device_controller.disable_ftp(unit_id, action.device_type)
|
||||
except Exception as e:
|
||||
logger.warning(f"FTP disable failed (may already be off): {e}")
|
||||
await self.device_controller.enable_ftp(unit_id, action.device_type)
|
||||
|
||||
# Step 2: Download current measurement folder
|
||||
@@ -397,6 +404,200 @@ class SchedulerService:
|
||||
"device_response": response,
|
||||
}
|
||||
|
||||
async def _execute_cycle(
    self,
    action: ScheduledAction,
    unit_id: str,
    db: Session,
) -> Dict[str, Any]:
    """Run one combined stop -> download -> start cycle on a unit.

    The individual device commands are issued from a single action, with
    fixed pauses between them (10s + 10s + 10s + 30s = 60s of sleeping,
    plus however long the commands and the download themselves take).

    Sequence:
        0. For "slm" devices, remember the current polling setting and
           switch background polling off.
        1. Stop the measurement, then sleep 10s.
        2. Disable FTP, then sleep 10s.
        3. Enable FTP, then sleep 10s.
        4. Download the measurement data and mark the currently
           "recording" session for this location/unit as completed.
        5. Sleep 30s.
        6. Start a new measurement and add a new RecordingSession to `db`.
        7. Switch polling back on if it was paused here and was (or was
           assumed to be) enabled before.

    Failures in steps 0-4 are logged and recorded in the result, and the
    cycle carries on. A failure in step 6 is recorded and re-raised.

    Args:
        action: The scheduled action being executed; its device_type,
            project_id, location_id and id are read.
        unit_id: Identifier of the unit to cycle.
        db: Database session used for lookups and for adding the new
            recording session. No commit is issued in this method.

    Returns:
        A dict with keys "status", "steps" (per-step success/error
        entries), "old_session_id", "new_session_id" and "polling_paused".

    Raises:
        Exception: Whatever step 6 raised; polling is still restored
            first by the ``finally`` clause.
    """
    logger.info(f"[CYCLE] === Starting full cycle for {unit_id} ===")

    summary = {
        "status": "cycle_complete",
        "steps": {},
        "old_session_id": None,
        "new_session_id": None,
        "polling_paused": False,
    }
    # Alias for the per-step outcome map; mutations land in summary["steps"].
    steps = summary["steps"]

    # Step 0: keep the background poller away from the device while we talk
    # to it. NL-43 devices only support one TCP connection at a time.
    logger.info(f"[CYCLE] Step 0: Pausing background polling for {unit_id}")
    restore_polling = False
    try:
        if action.device_type == "slm":
            from backend.services.slmm_client import get_slmm_client

            slmm_client = get_slmm_client()
            try:
                # Remember the current setting so it can be restored later.
                current_config = await slmm_client.get_device_polling_config(unit_id)
                restore_polling = current_config.get("poll_enabled", False)
            except Exception:
                # Could not read the setting: assume polling was on.
                restore_polling = True

            await slmm_client.update_device_polling_config(unit_id, poll_enabled=False)
            summary["polling_paused"] = True
            logger.info(f"[CYCLE] Background polling paused for {unit_id}")
    except Exception as exc:
        logger.warning(f"[CYCLE] Failed to pause polling (continuing anyway): {exc}")

    try:
        # Step 1: stop the running measurement (a failure is tolerated).
        logger.info(f"[CYCLE] Step 1/7: Stopping measurement on {unit_id}")
        try:
            stop_reply = await self.device_controller.stop_recording(unit_id, action.device_type)
            steps["stop"] = {"success": True, "response": stop_reply}
            logger.info(f"[CYCLE] Measurement stopped, waiting 10s...")
        except Exception as exc:
            logger.warning(f"[CYCLE] Stop failed (may already be stopped): {exc}")
            steps["stop"] = {"success": False, "error": str(exc)}

        await asyncio.sleep(10)

        # Step 2: switch FTP off first to clear any stale state.
        logger.info(f"[CYCLE] Step 2/7: Disabling FTP on {unit_id}")
        try:
            await self.device_controller.disable_ftp(unit_id, action.device_type)
            steps["ftp_disable"] = {"success": True}
            logger.info(f"[CYCLE] FTP disabled, waiting 10s...")
        except Exception as exc:
            logger.warning(f"[CYCLE] FTP disable failed (may already be off): {exc}")
            steps["ftp_disable"] = {"success": False, "error": str(exc)}

        await asyncio.sleep(10)

        # Step 3: switch FTP back on. If this fails the download below will
        # fail too, but the restart in step 6 is still attempted.
        logger.info(f"[CYCLE] Step 3/7: Enabling FTP on {unit_id}")
        try:
            await self.device_controller.enable_ftp(unit_id, action.device_type)
            steps["ftp_enable"] = {"success": True}
            logger.info(f"[CYCLE] FTP enabled, waiting 10s...")
        except Exception as exc:
            logger.error(f"[CYCLE] FTP enable failed: {exc}")
            steps["ftp_enable"] = {"success": False, "error": str(exc)}

        await asyncio.sleep(10)

        # Step 4: pull the measurement data off the device.
        logger.info(f"[CYCLE] Step 4/7: Downloading measurement data from {unit_id}")
        location = db.query(MonitoringLocation).filter_by(id=action.location_id).first()
        project = db.query(Project).filter_by(id=action.project_id).first()

        if not location or not project:
            steps["download"] = {"success": False, "error": "Project or location not found"}
        else:
            session_timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H%M")
            if action.device_type == "slm":
                location_type_dir = "sound"
            else:
                location_type_dir = "vibration"
            destination_path = (
                f"data/Projects/{project.id}/{location_type_dir}/"
                f"{location.name}/session-{session_timestamp}/"
            )

            try:
                download_reply = await self.device_controller.download_files(
                    unit_id,
                    action.device_type,
                    destination_path,
                    files=None,
                )
                steps["download"] = {"success": True, "response": download_reply}
                logger.info(f"[CYCLE] Download complete")
            except Exception as exc:
                logger.error(f"[CYCLE] Download failed: {exc}")
                steps["download"] = {"success": False, "error": str(exc)}

        # Mark the session that was recording on this location/unit as done.
        open_session = db.query(RecordingSession).filter(
            and_(
                RecordingSession.location_id == action.location_id,
                RecordingSession.unit_id == unit_id,
                RecordingSession.status == "recording",
            )
        ).first()

        if open_session:
            ended_at = datetime.utcnow()
            open_session.stopped_at = ended_at
            open_session.status = "completed"
            elapsed = ended_at - open_session.started_at
            open_session.duration_seconds = int(elapsed.total_seconds())
            summary["old_session_id"] = open_session.id

        # Step 5: give the device time to settle before restarting.
        logger.info(f"[CYCLE] Step 5/7: Waiting 30s for device to settle...")
        await asyncio.sleep(30)

        # Step 6: start the next measurement and record a new session.
        logger.info(f"[CYCLE] Step 6/7: Starting new measurement on {unit_id}")
        try:
            start_reply = await self.device_controller.start_cycle(
                unit_id,
                action.device_type,
                sync_clock=True,
            )
            steps["start"] = {"success": True, "response": start_reply}

            if action.device_type == "slm":
                session_kind = "sound"
            else:
                session_kind = "vibration"
            metadata_json = json.dumps({
                "scheduled_action_id": action.id,
                "cycle_response": start_reply,
                "action_type": "cycle",
            })
            fresh_session = RecordingSession(
                id=str(uuid.uuid4()),
                project_id=action.project_id,
                location_id=action.location_id,
                unit_id=unit_id,
                session_type=session_kind,
                started_at=datetime.utcnow(),
                status="recording",
                session_metadata=metadata_json,
            )
            db.add(fresh_session)
            summary["new_session_id"] = fresh_session.id

            logger.info(f"[CYCLE] New measurement started, session {fresh_session.id}")

        except Exception as exc:
            logger.error(f"[CYCLE] Start failed: {exc}")
            steps["start"] = {"success": False, "error": str(exc)}
            raise  # Propagate so the action is marked as failed

    finally:
        # Step 7: restore polling; runs on both the success and failure paths.
        if summary["polling_paused"] and restore_polling:
            logger.info(f"[CYCLE] Step 7/7: Re-enabling background polling for {unit_id}")
            try:
                if action.device_type == "slm":
                    from backend.services.slmm_client import get_slmm_client

                    slmm_client = get_slmm_client()
                    await slmm_client.update_device_polling_config(unit_id, poll_enabled=True)
                    logger.info(f"[CYCLE] Background polling re-enabled for {unit_id}")
            except Exception as exc:
                # Log only: a polling hiccup must not fail the cycle itself.
                logger.error(f"[CYCLE] Failed to re-enable polling: {exc}")

    logger.info(f"[CYCLE] === Cycle complete for {unit_id} ===")
    return summary
|
||||
|
||||
# ========================================================================
|
||||
# Recurring Schedule Generation
|
||||
# ========================================================================
|
||||
|
||||
@@ -109,7 +109,8 @@ class SLMMClient:
|
||||
f"SLMM operation failed: {error_detail}"
|
||||
)
|
||||
except Exception as e:
|
||||
raise SLMMClientError(f"Unexpected error: {str(e)}")
|
||||
error_msg = str(e) if str(e) else type(e).__name__
|
||||
raise SLMMClientError(f"Unexpected error: {error_msg}")
|
||||
|
||||
# ========================================================================
|
||||
# Unit Management
|
||||
@@ -579,7 +580,13 @@ class SLMMClient:
|
||||
"""
|
||||
# Get current index number from device
|
||||
index_info = await self.get_index_number(unit_id)
|
||||
index_number = index_info.get("index_number", 0)
|
||||
index_number_raw = index_info.get("index_number", 0)
|
||||
|
||||
# Convert to int - device returns string like "0000" or "0001"
|
||||
try:
|
||||
index_number = int(index_number_raw)
|
||||
except (ValueError, TypeError):
|
||||
index_number = 0
|
||||
|
||||
# Format as Auto_XXXX folder name
|
||||
folder_name = f"Auto_{index_number:04d}"
|
||||
|
||||
Reference in New Issue
Block a user