diff --git a/backend/models.py b/backend/models.py
index 50f552d..407d99c 100644
--- a/backend/models.py
+++ b/backend/models.py
@@ -363,13 +363,14 @@ class Alert(Base):
     - device_offline: Device became unreachable
     - device_online: Device came back online
     - schedule_failed: Scheduled action failed to execute
+    - schedule_completed: Scheduled action completed successfully
     """
     __tablename__ = "alerts"

     id = Column(String, primary_key=True, index=True)  # UUID

     # Alert classification
-    alert_type = Column(String, nullable=False)  # "device_offline" | "device_online" | "schedule_failed"
+    alert_type = Column(String, nullable=False)  # "device_offline" | "device_online" | "schedule_failed" | "schedule_completed"
     severity = Column(String, default="warning")  # "info" | "warning" | "critical"

     # Related entities (nullable - may not all apply)
diff --git a/backend/routers/projects.py b/backend/routers/projects.py
index 2ef5f7e..162283f 100644
--- a/backend/routers/projects.py
+++ b/backend/routers/projects.py
@@ -482,6 +482,83 @@ async def get_project_schedules(
     })


+@router.post("/{project_id}/schedules/{schedule_id}/execute")
+async def execute_scheduled_action(
+    project_id: str,
+    schedule_id: str,
+    db: Session = Depends(get_db),
+):
+    """
+    Manually execute a scheduled action now.
+    """
+    from backend.services.scheduler import get_scheduler
+
+    action = db.query(ScheduledAction).filter_by(
+        id=schedule_id,
+        project_id=project_id,
+    ).first()
+
+    if not action:
+        raise HTTPException(status_code=404, detail="Action not found")
+
+    if action.execution_status != "pending":
+        raise HTTPException(
+            status_code=400,
+            detail=f"Action is not pending (status: {action.execution_status})",
+        )
+
+    # Execute via scheduler service
+    scheduler = get_scheduler()
+    result = await scheduler.execute_action_by_id(schedule_id)
+
+    # Refresh from DB to get updated status
+    db.refresh(action)
+
+    return JSONResponse({
+        "success": result.get("success", False),
+        "message": f"Action executed: {action.action_type}",
+        "result": result,
+        "action": {
+            "id": action.id,
+            "execution_status": action.execution_status,
+            "executed_at": action.executed_at.isoformat() if action.executed_at else None,
+            "error_message": action.error_message,
+        },
+    })
+
+
+@router.post("/{project_id}/schedules/{schedule_id}/cancel")
+async def cancel_scheduled_action(
+    project_id: str,
+    schedule_id: str,
+    db: Session = Depends(get_db),
+):
+    """
+    Cancel a pending scheduled action.
+    """
+    action = db.query(ScheduledAction).filter_by(
+        id=schedule_id,
+        project_id=project_id,
+    ).first()
+
+    if not action:
+        raise HTTPException(status_code=404, detail="Action not found")
+
+    if action.execution_status != "pending":
+        raise HTTPException(
+            status_code=400,
+            detail=f"Can only cancel pending actions (status: {action.execution_status})",
+        )
+
+    action.execution_status = "cancelled"
+    db.commit()
+
+    return JSONResponse({
+        "success": True,
+        "message": "Action cancelled successfully",
+    })
+
+
 @router.get("/{project_id}/sessions", response_class=HTMLResponse)
 async def get_project_sessions(
     project_id: str,
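For context, a minimal client-side sketch of the two new routes. The base URL, any router prefix, and all IDs are placeholders; the response fields follow the `JSONResponse` payloads above.

```python
# Sketch: manually trigger and cancel scheduled actions over HTTP.
# Assumes the backend at localhost:8000 with no router prefix; IDs are hypothetical.
import httpx

BASE = "http://localhost:8000"
project_id = "proj-123"          # hypothetical
schedule_id = "sched-456"        # hypothetical
other_schedule_id = "sched-789"  # hypothetical, still pending

with httpx.Client(base_url=BASE) as client:
    # Execute a pending action now; a non-pending action returns 400
    resp = client.post(f"/{project_id}/schedules/{schedule_id}/execute")
    if resp.status_code == 200:
        body = resp.json()
        print(body["message"], body["action"]["execution_status"])

    # Cancel another action that is still pending
    resp = client.post(f"/{project_id}/schedules/{other_schedule_id}/cancel")
    print(resp.status_code, resp.json().get("message"))
```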
+ """ + action = db.query(ScheduledAction).filter_by( + id=schedule_id, + project_id=project_id, + ).first() + + if not action: + raise HTTPException(status_code=404, detail="Action not found") + + if action.execution_status != "pending": + raise HTTPException( + status_code=400, + detail=f"Can only cancel pending actions (status: {action.execution_status})", + ) + + action.execution_status = "cancelled" + db.commit() + + return JSONResponse({ + "success": True, + "message": "Action cancelled successfully", + }) + + @router.get("/{project_id}/sessions", response_class=HTMLResponse) async def get_project_sessions( project_id: str, diff --git a/backend/services/alert_service.py b/backend/services/alert_service.py index e460799..f10ffd1 100644 --- a/backend/services/alert_service.py +++ b/backend/services/alert_service.py @@ -221,6 +221,61 @@ class AlertService: expires_hours=24, ) + def create_schedule_completed_alert( + self, + schedule_id: str, + action_type: str, + unit_id: str = None, + project_id: str = None, + location_id: str = None, + metadata: dict = None, + ) -> Alert: + """ + Create alert when a scheduled action completes successfully. + + Args: + schedule_id: The ScheduledAction ID + action_type: start, stop, download + unit_id: Related unit + project_id: Related project + location_id: Related location + metadata: Additional info (e.g., downloaded folder, index numbers) + + Returns: + Created Alert + """ + # Build descriptive message based on action type and metadata + if action_type == "stop" and metadata: + download_folder = metadata.get("downloaded_folder") + download_success = metadata.get("download_success", False) + if download_success and download_folder: + message = f"Measurement stopped and data downloaded ({download_folder})" + elif download_success is False and metadata.get("download_attempted"): + message = "Measurement stopped but download failed" + else: + message = "Measurement stopped successfully" + elif action_type == "start" and metadata: + new_index = metadata.get("new_index") + if new_index is not None: + message = f"Measurement started (index {new_index:04d})" + else: + message = "Measurement started successfully" + else: + message = f"Scheduled {action_type} completed successfully" + + return self.create_alert( + alert_type="schedule_completed", + title=f"Scheduled {action_type} completed", + message=message, + severity="info", + unit_id=unit_id, + project_id=project_id, + location_id=location_id, + schedule_id=schedule_id, + metadata={"action_type": action_type, **(metadata or {})}, + expires_hours=12, # Info alerts expire quickly + ) + def get_active_alerts( self, project_id: str = None, diff --git a/backend/services/device_controller.py b/backend/services/device_controller.py index 2024ba6..82ae6fb 100644 --- a/backend/services/device_controller.py +++ b/backend/services/device_controller.py @@ -403,6 +403,87 @@ class DeviceController: else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") + # ======================================================================== + # Cycle Commands (for scheduled automation) + # ======================================================================== + + async def start_cycle( + self, + unit_id: str, + device_type: str, + sync_clock: bool = True, + ) -> Dict[str, Any]: + """ + Execute complete start cycle for scheduled automation. + + This handles the full pre-recording workflow: + 1. Sync device clock to server time + 2. Find next safe index (with overwrite protection) + 3. 
diff --git a/backend/services/device_controller.py b/backend/services/device_controller.py
index 2024ba6..82ae6fb 100644
--- a/backend/services/device_controller.py
+++ b/backend/services/device_controller.py
@@ -403,6 +403,87 @@ class DeviceController:
         else:
             raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}")

+    # ========================================================================
+    # Cycle Commands (for scheduled automation)
+    # ========================================================================
+
+    async def start_cycle(
+        self,
+        unit_id: str,
+        device_type: str,
+        sync_clock: bool = True,
+    ) -> Dict[str, Any]:
+        """
+        Execute complete start cycle for scheduled automation.
+
+        This handles the full pre-recording workflow:
+        1. Sync device clock to server time
+        2. Find next safe index (with overwrite protection)
+        3. Start measurement
+
+        Args:
+            unit_id: Unit identifier
+            device_type: "slm" | "seismograph"
+            sync_clock: Whether to sync device clock to server time
+
+        Returns:
+            Response dict from device module
+        """
+        if device_type == "slm":
+            try:
+                return await self.slmm_client.start_cycle(unit_id, sync_clock)
+            except SLMMClientError as e:
+                raise DeviceControllerError(f"SLMM error: {str(e)}")
+
+        elif device_type == "seismograph":
+            return {
+                "status": "not_implemented",
+                "message": "Seismograph start cycle not yet implemented",
+                "unit_id": unit_id,
+            }
+
+        else:
+            raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}")
+
+    async def stop_cycle(
+        self,
+        unit_id: str,
+        device_type: str,
+        download: bool = True,
+    ) -> Dict[str, Any]:
+        """
+        Execute complete stop cycle for scheduled automation.
+
+        This handles the full post-recording workflow:
+        1. Stop measurement
+        2. Enable FTP
+        3. Download measurement folder
+        4. Verify download
+
+        Args:
+            unit_id: Unit identifier
+            device_type: "slm" | "seismograph"
+            download: Whether to download measurement data
+
+        Returns:
+            Response dict from device module
+        """
+        if device_type == "slm":
+            try:
+                return await self.slmm_client.stop_cycle(unit_id, download)
+            except SLMMClientError as e:
+                raise DeviceControllerError(f"SLMM error: {str(e)}")
+
+        elif device_type == "seismograph":
+            return {
+                "status": "not_implemented",
+                "message": "Seismograph stop cycle not yet implemented",
+                "unit_id": unit_id,
+            }
+
+        else:
+            raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}")
+
     # ========================================================================
     # Health Check
     # ========================================================================
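A minimal sketch of driving a full cycle through the controller facade, assuming a running SLMM backend; the unit ID is a placeholder:

```python
# Sketch: one scheduled recording cycle via the DeviceController facade.
import asyncio

from backend.services.device_controller import (
    DeviceControllerError,
    get_device_controller,
)

async def run_cycle(unit_id: str = "slm-01") -> None:  # unit ID is hypothetical
    controller = get_device_controller()
    try:
        start = await controller.start_cycle(unit_id, "slm", sync_clock=True)
        print("start cycle:", start)
        # ... the recording window elapses here ...
        stop = await controller.stop_cycle(unit_id, "slm", download=True)
        print("download ok:", stop.get("download_success"))
    except DeviceControllerError as exc:
        print(f"cycle failed: {exc}")

asyncio.run(run_cycle())
```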
diff --git a/backend/services/scheduler.py b/backend/services/scheduler.py
index d0852bd..866ec64 100644
--- a/backend/services/scheduler.py
+++ b/backend/services/scheduler.py
@@ -23,6 +23,7 @@ from sqlalchemy import and_
 from backend.database import SessionLocal
 from backend.models import ScheduledAction, RecordingSession, MonitoringLocation, Project, RecurringSchedule
 from backend.services.device_controller import get_device_controller, DeviceControllerError
+from backend.services.alert_service import get_alert_service
 import uuid

 logger = logging.getLogger(__name__)
@@ -197,6 +198,21 @@ class SchedulerService:

             print(f"✓ Action {action.id} completed successfully")

+            # Create success alert
+            try:
+                alert_service = get_alert_service(db)
+                alert_metadata = response.get("cycle_response", {}) if isinstance(response, dict) else {}
+                alert_service.create_schedule_completed_alert(
+                    schedule_id=action.id,
+                    action_type=action.action_type,
+                    unit_id=unit_id,
+                    project_id=action.project_id,
+                    location_id=action.location_id,
+                    metadata=alert_metadata,
+                )
+            except Exception as alert_err:
+                logger.warning(f"Failed to create success alert: {alert_err}")
+
         except Exception as e:
             # Mark action as failed
             action.execution_status = "failed"
@@ -207,6 +223,20 @@ class SchedulerService:

             print(f"✗ Action {action.id} failed: {e}")

+            # Create failure alert
+            try:
+                alert_service = get_alert_service(db)
+                alert_service.create_schedule_failed_alert(
+                    schedule_id=action.id,
+                    action_type=action.action_type,
+                    unit_id=unit_id if "unit_id" in locals() else action.unit_id,
+                    error_message=str(e),
+                    project_id=action.project_id,
+                    location_id=action.location_id,
+                )
+            except Exception as alert_err:
+                logger.warning(f"Failed to create failure alert: {alert_err}")
+
         return result

     async def _execute_start(
@@ -215,35 +245,19 @@ class SchedulerService:
         unit_id: str,
         db: Session,
     ) -> Dict[str, Any]:
-        """Execute a 'start' action."""
-        # Parse action notes for automation settings
-        auto_increment_index = False
-        try:
-            if action.notes:
-                notes_data = json.loads(action.notes)
-                auto_increment_index = notes_data.get("auto_increment_index", False)
-        except json.JSONDecodeError:
-            pass  # Notes is plain text, not JSON
+        """Execute a 'start' action using the start_cycle command.

-        # If auto_increment_index is enabled, increment the store index before starting
-        increment_response = None
-        if auto_increment_index and action.device_type == "slm":
-            try:
-                logger.info(f"Auto-incrementing store index for unit {unit_id}")
-                increment_response = await self.device_controller.increment_index(
-                    unit_id,
-                    action.device_type,
-                )
-                logger.info(f"Index incremented: {increment_response}")
-            except Exception as e:
-                logger.warning(f"Failed to increment index for {unit_id}: {e}")
-                # Continue with start anyway - don't fail the whole action
-
-        # Start recording via device controller
-        response = await self.device_controller.start_recording(
+        start_cycle handles:
+        1. Sync device clock to server time
+        2. Find next safe index (with overwrite protection)
+        3. Start measurement
+        """
+        # Execute the full start cycle via device controller
+        # SLMM handles clock sync, index increment, and start
+        cycle_response = await self.device_controller.start_cycle(
             unit_id,
             action.device_type,
-            config={},
+            sync_clock=True,
         )

         # Create recording session
@@ -257,8 +271,7 @@ class SchedulerService:
             status="recording",
             session_metadata=json.dumps({
                 "scheduled_action_id": action.id,
-                "auto_increment_index": auto_increment_index,
-                "increment_response": increment_response,
+                "cycle_response": cycle_response,
             }),
         )
         db.add(session)
@@ -266,9 +279,7 @@ class SchedulerService:
         return {
             "status": "started",
             "session_id": session.id,
-            "device_response": response,
-            "index_incremented": auto_increment_index,
-            "increment_response": increment_response,
+            "cycle_response": cycle_response,
         }

     async def _execute_stop(
@@ -277,11 +288,29 @@ class SchedulerService:
         unit_id: str,
         db: Session,
     ) -> Dict[str, Any]:
-        """Execute a 'stop' action."""
-        # Stop recording via device controller
-        response = await self.device_controller.stop_recording(
+        """Execute a 'stop' action using the stop_cycle command.
+
+        stop_cycle handles:
+        1. Stop measurement
+        2. Enable FTP
+        3. Download measurement folder
+        4. Verify download
+        """
+        # Parse notes for download preference
+        include_download = True
+        try:
+            if action.notes:
+                notes_data = json.loads(action.notes)
+                include_download = notes_data.get("include_download", True)
+        except json.JSONDecodeError:
+            pass  # Notes is plain text, not JSON
+
+        # Execute the full stop cycle via device controller
+        # SLMM handles stop, FTP enable, and download
+        cycle_response = await self.device_controller.stop_cycle(
             unit_id,
             action.device_type,
+            download=include_download,
         )

         # Find and update the active recording session
@@ -299,11 +328,20 @@ class SchedulerService:
             active_session.duration_seconds = int(
                 (active_session.stopped_at - active_session.started_at).total_seconds()
             )
+            # Store download info in session metadata
+            if cycle_response.get("download_success"):
+                try:
+                    metadata = json.loads(active_session.session_metadata or "{}")
+                    metadata["downloaded_folder"] = cycle_response.get("downloaded_folder")
+                    metadata["local_path"] = cycle_response.get("local_path")
+                    active_session.session_metadata = json.dumps(metadata)
+                except json.JSONDecodeError:
+                    pass

         return {
             "status": "stopped",
             "session_id": active_session.id if active_session else None,
-            "device_response": response,
+            "cycle_response": cycle_response,
         }

     async def _execute_download(
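Since `_execute_stop` now reads the download preference from the action's notes, a short sketch of that round trip; the notes field stays backward compatible with plain text:

```python
# Sketch: opting out of the post-stop download via the notes JSON.
import json

notes = json.dumps({"include_download": False})  # e.g. action.notes = notes

# The same parsing _execute_stop performs:
include_download = True
try:
    include_download = json.loads(notes).get("include_download", True)
except json.JSONDecodeError:
    pass  # plain-text notes keep the default (download enabled)

assert include_download is False
```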
diff --git a/backend/services/slmm_client.py b/backend/services/slmm_client.py
index ce3c5d5..a242c12 100644
--- a/backend/services/slmm_client.py
+++ b/backend/services/slmm_client.py
@@ -9,13 +9,14 @@ that handles TCP/FTP communication with Rion NL-43/NL-53 devices.
 """

 import httpx
+import os
 from typing import Optional, Dict, Any, List
 from datetime import datetime
 import json


-# SLMM backend base URLs
-SLMM_BASE_URL = "http://localhost:8100"
+# SLMM backend base URLs - use environment variable if set (for Docker)
+SLMM_BASE_URL = os.environ.get("SLMM_BASE_URL", "http://localhost:8100")
 SLMM_API_BASE = f"{SLMM_BASE_URL}/api/nl43"


@@ -505,6 +506,68 @@ class SLMMClient:
         }
         return await self._request("POST", f"/{unit_id}/ftp/download", data=data)

+    # ========================================================================
+    # Cycle Commands (for scheduled automation)
+    # ========================================================================
+
+    async def start_cycle(
+        self,
+        unit_id: str,
+        sync_clock: bool = True,
+    ) -> Dict[str, Any]:
+        """
+        Execute complete start cycle on device via SLMM.
+
+        This handles the full pre-recording workflow:
+        1. Sync device clock to server time
+        2. Find next safe index (with overwrite protection)
+        3. Start measurement
+
+        Args:
+            unit_id: Unit identifier
+            sync_clock: Whether to sync device clock to server time
+
+        Returns:
+            Dict with clock_synced, old_index, new_index, started, etc.
+        """
+        return await self._request(
+            "POST",
+            f"/{unit_id}/start-cycle",
+            data={"sync_clock": sync_clock},
+        )
+
+    async def stop_cycle(
+        self,
+        unit_id: str,
+        download: bool = True,
+        download_path: Optional[str] = None,
+    ) -> Dict[str, Any]:
+        """
+        Execute complete stop cycle on device via SLMM.
+
+        This handles the full post-recording workflow:
+        1. Stop measurement
+        2. Enable FTP
+        3. Download measurement folder (if download=True)
+        4. Verify download
+
+        Args:
+            unit_id: Unit identifier
+            download: Whether to download measurement data
+            download_path: Custom path for downloaded ZIP (optional)
+
+        Returns:
+            Dict with stopped, ftp_enabled, download_success, local_path, etc.
+        """
+        data = {"download": download}
+        if download_path:
+            data["download_path"] = download_path
+        return await self._request(
+            "POST",
+            f"/{unit_id}/stop-cycle",
+            data=data,
+        )
+
     # ========================================================================
     # Polling Status (for device monitoring/alerts)
     # ========================================================================
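Because `SLMM_BASE_URL` is resolved at module import time, the override has to be in the environment before `backend.services.slmm_client` is first imported. A quick check, using a hypothetical Docker service name:

```python
# Sketch: pointing the client at a containerized SLMM backend.
import os

os.environ["SLMM_BASE_URL"] = "http://slmm:8100"  # hypothetical service name

# Import only after the variable is set, since the module reads it at import time:
from backend.services.slmm_client import SLMM_BASE_URL

assert SLMM_BASE_URL == "http://slmm:8100"
```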
+ """ + data = {"download": download} + if download_path: + data["download_path"] = download_path + return await self._request( + "POST", + f"/{unit_id}/stop-cycle", + data=data, + ) + # ======================================================================== # Polling Status (for device monitoring/alerts) # ======================================================================== diff --git a/templates/partials/projects/schedule_list.html b/templates/partials/projects/schedule_list.html index 5054d6e..3ea9f67 100644 --- a/templates/partials/projects/schedule_list.html +++ b/templates/partials/projects/schedule_list.html @@ -71,6 +71,13 @@ {{ item.schedule.result_message }} {% endif %} + + {% if item.schedule.error_message %} +
+ Error: + {{ item.schedule.error_message }} +
+ {% endif %}