Feat/Fix: Scheduler actions more strictly defined. Commands now working.

This commit is contained in:
serversdwn
2026-01-22 20:25:19 +00:00
parent 65ea0920db
commit c771a86675
7 changed files with 360 additions and 38 deletions

View File

@@ -363,13 +363,14 @@ class Alert(Base):
- device_offline: Device became unreachable
- device_online: Device came back online
- schedule_failed: Scheduled action failed to execute
- schedule_completed: Scheduled action completed successfully
"""
__tablename__ = "alerts"
id = Column(String, primary_key=True, index=True) # UUID
# Alert classification
alert_type = Column(String, nullable=False) # "device_offline" | "device_online" | "schedule_failed"
alert_type = Column(String, nullable=False) # "device_offline" | "device_online" | "schedule_failed" | "schedule_completed"
severity = Column(String, default="warning") # "info" | "warning" | "critical"
# Related entities (nullable - may not all apply)

View File

@@ -482,6 +482,83 @@ async def get_project_schedules(
})
@router.post("/{project_id}/schedules/{schedule_id}/execute")
async def execute_scheduled_action(
project_id: str,
schedule_id: str,
db: Session = Depends(get_db),
):
"""
Manually execute a scheduled action now.
"""
from backend.services.scheduler import get_scheduler
action = db.query(ScheduledAction).filter_by(
id=schedule_id,
project_id=project_id,
).first()
if not action:
raise HTTPException(status_code=404, detail="Action not found")
if action.execution_status != "pending":
raise HTTPException(
status_code=400,
detail=f"Action is not pending (status: {action.execution_status})",
)
# Execute via scheduler service
scheduler = get_scheduler()
result = await scheduler.execute_action_by_id(schedule_id)
# Refresh from DB to get updated status
db.refresh(action)
return JSONResponse({
"success": result.get("success", False),
"message": f"Action executed: {action.action_type}",
"result": result,
"action": {
"id": action.id,
"execution_status": action.execution_status,
"executed_at": action.executed_at.isoformat() if action.executed_at else None,
"error_message": action.error_message,
},
})
@router.post("/{project_id}/schedules/{schedule_id}/cancel")
async def cancel_scheduled_action(
project_id: str,
schedule_id: str,
db: Session = Depends(get_db),
):
"""
Cancel a pending scheduled action.
"""
action = db.query(ScheduledAction).filter_by(
id=schedule_id,
project_id=project_id,
).first()
if not action:
raise HTTPException(status_code=404, detail="Action not found")
if action.execution_status != "pending":
raise HTTPException(
status_code=400,
detail=f"Can only cancel pending actions (status: {action.execution_status})",
)
action.execution_status = "cancelled"
db.commit()
return JSONResponse({
"success": True,
"message": "Action cancelled successfully",
})
@router.get("/{project_id}/sessions", response_class=HTMLResponse)
async def get_project_sessions(
project_id: str,

View File

@@ -221,6 +221,61 @@ class AlertService:
expires_hours=24,
)
def create_schedule_completed_alert(
self,
schedule_id: str,
action_type: str,
unit_id: str = None,
project_id: str = None,
location_id: str = None,
metadata: dict = None,
) -> Alert:
"""
Create alert when a scheduled action completes successfully.
Args:
schedule_id: The ScheduledAction ID
action_type: start, stop, download
unit_id: Related unit
project_id: Related project
location_id: Related location
metadata: Additional info (e.g., downloaded folder, index numbers)
Returns:
Created Alert
"""
# Build descriptive message based on action type and metadata
if action_type == "stop" and metadata:
download_folder = metadata.get("downloaded_folder")
download_success = metadata.get("download_success", False)
if download_success and download_folder:
message = f"Measurement stopped and data downloaded ({download_folder})"
elif download_success is False and metadata.get("download_attempted"):
message = "Measurement stopped but download failed"
else:
message = "Measurement stopped successfully"
elif action_type == "start" and metadata:
new_index = metadata.get("new_index")
if new_index is not None:
message = f"Measurement started (index {new_index:04d})"
else:
message = "Measurement started successfully"
else:
message = f"Scheduled {action_type} completed successfully"
return self.create_alert(
alert_type="schedule_completed",
title=f"Scheduled {action_type} completed",
message=message,
severity="info",
unit_id=unit_id,
project_id=project_id,
location_id=location_id,
schedule_id=schedule_id,
metadata={"action_type": action_type, **(metadata or {})},
expires_hours=12, # Info alerts expire quickly
)
def get_active_alerts(
self,
project_id: str = None,

View File

@@ -403,6 +403,87 @@ class DeviceController:
else:
raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}")
# ========================================================================
# Cycle Commands (for scheduled automation)
# ========================================================================
async def start_cycle(
self,
unit_id: str,
device_type: str,
sync_clock: bool = True,
) -> Dict[str, Any]:
"""
Execute complete start cycle for scheduled automation.
This handles the full pre-recording workflow:
1. Sync device clock to server time
2. Find next safe index (with overwrite protection)
3. Start measurement
Args:
unit_id: Unit identifier
device_type: "slm" | "seismograph"
sync_clock: Whether to sync device clock to server time
Returns:
Response dict from device module
"""
if device_type == "slm":
try:
return await self.slmm_client.start_cycle(unit_id, sync_clock)
except SLMMClientError as e:
raise DeviceControllerError(f"SLMM error: {str(e)}")
elif device_type == "seismograph":
return {
"status": "not_implemented",
"message": "Seismograph start cycle not yet implemented",
"unit_id": unit_id,
}
else:
raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}")
async def stop_cycle(
self,
unit_id: str,
device_type: str,
download: bool = True,
) -> Dict[str, Any]:
"""
Execute complete stop cycle for scheduled automation.
This handles the full post-recording workflow:
1. Stop measurement
2. Enable FTP
3. Download measurement folder
4. Verify download
Args:
unit_id: Unit identifier
device_type: "slm" | "seismograph"
download: Whether to download measurement data
Returns:
Response dict from device module
"""
if device_type == "slm":
try:
return await self.slmm_client.stop_cycle(unit_id, download)
except SLMMClientError as e:
raise DeviceControllerError(f"SLMM error: {str(e)}")
elif device_type == "seismograph":
return {
"status": "not_implemented",
"message": "Seismograph stop cycle not yet implemented",
"unit_id": unit_id,
}
else:
raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}")
# ========================================================================
# Health Check
# ========================================================================

View File

@@ -23,6 +23,7 @@ from sqlalchemy import and_
from backend.database import SessionLocal
from backend.models import ScheduledAction, RecordingSession, MonitoringLocation, Project, RecurringSchedule
from backend.services.device_controller import get_device_controller, DeviceControllerError
from backend.services.alert_service import get_alert_service
import uuid
logger = logging.getLogger(__name__)
@@ -197,6 +198,21 @@ class SchedulerService:
print(f"✓ Action {action.id} completed successfully")
# Create success alert
try:
alert_service = get_alert_service(db)
alert_metadata = response.get("cycle_response", {}) if isinstance(response, dict) else {}
alert_service.create_schedule_completed_alert(
schedule_id=action.id,
action_type=action.action_type,
unit_id=unit_id,
project_id=action.project_id,
location_id=action.location_id,
metadata=alert_metadata,
)
except Exception as alert_err:
logger.warning(f"Failed to create success alert: {alert_err}")
except Exception as e:
# Mark action as failed
action.execution_status = "failed"
@@ -207,6 +223,20 @@ class SchedulerService:
print(f"✗ Action {action.id} failed: {e}")
# Create failure alert
try:
alert_service = get_alert_service(db)
alert_service.create_schedule_failed_alert(
schedule_id=action.id,
action_type=action.action_type,
unit_id=unit_id if 'unit_id' in dir() else action.unit_id,
error_message=str(e),
project_id=action.project_id,
location_id=action.location_id,
)
except Exception as alert_err:
logger.warning(f"Failed to create failure alert: {alert_err}")
return result
async def _execute_start(
@@ -215,35 +245,19 @@ class SchedulerService:
unit_id: str,
db: Session,
) -> Dict[str, Any]:
"""Execute a 'start' action."""
# Parse action notes for automation settings
auto_increment_index = False
try:
if action.notes:
notes_data = json.loads(action.notes)
auto_increment_index = notes_data.get("auto_increment_index", False)
except json.JSONDecodeError:
pass # Notes is plain text, not JSON
"""Execute a 'start' action using the start_cycle command.
# If auto_increment_index is enabled, increment the store index before starting
increment_response = None
if auto_increment_index and action.device_type == "slm":
try:
logger.info(f"Auto-incrementing store index for unit {unit_id}")
increment_response = await self.device_controller.increment_index(
unit_id,
action.device_type,
)
logger.info(f"Index incremented: {increment_response}")
except Exception as e:
logger.warning(f"Failed to increment index for {unit_id}: {e}")
# Continue with start anyway - don't fail the whole action
# Start recording via device controller
response = await self.device_controller.start_recording(
start_cycle handles:
1. Sync device clock to server time
2. Find next safe index (with overwrite protection)
3. Start measurement
"""
# Execute the full start cycle via device controller
# SLMM handles clock sync, index increment, and start
cycle_response = await self.device_controller.start_cycle(
unit_id,
action.device_type,
config={},
sync_clock=True,
)
# Create recording session
@@ -257,8 +271,7 @@ class SchedulerService:
status="recording",
session_metadata=json.dumps({
"scheduled_action_id": action.id,
"auto_increment_index": auto_increment_index,
"increment_response": increment_response,
"cycle_response": cycle_response,
}),
)
db.add(session)
@@ -266,9 +279,7 @@ class SchedulerService:
return {
"status": "started",
"session_id": session.id,
"device_response": response,
"index_incremented": auto_increment_index,
"increment_response": increment_response,
"cycle_response": cycle_response,
}
async def _execute_stop(
@@ -277,11 +288,29 @@ class SchedulerService:
unit_id: str,
db: Session,
) -> Dict[str, Any]:
"""Execute a 'stop' action."""
# Stop recording via device controller
response = await self.device_controller.stop_recording(
"""Execute a 'stop' action using the stop_cycle command.
stop_cycle handles:
1. Stop measurement
2. Enable FTP
3. Download measurement folder
4. Verify download
"""
# Parse notes for download preference
include_download = True
try:
if action.notes:
notes_data = json.loads(action.notes)
include_download = notes_data.get("include_download", True)
except json.JSONDecodeError:
pass # Notes is plain text, not JSON
# Execute the full stop cycle via device controller
# SLMM handles stop, FTP enable, and download
cycle_response = await self.device_controller.stop_cycle(
unit_id,
action.device_type,
download=include_download,
)
# Find and update the active recording session
@@ -299,11 +328,20 @@ class SchedulerService:
active_session.duration_seconds = int(
(active_session.stopped_at - active_session.started_at).total_seconds()
)
# Store download info in session metadata
if cycle_response.get("download_success"):
try:
metadata = json.loads(active_session.session_metadata or "{}")
metadata["downloaded_folder"] = cycle_response.get("downloaded_folder")
metadata["local_path"] = cycle_response.get("local_path")
active_session.session_metadata = json.dumps(metadata)
except json.JSONDecodeError:
pass
return {
"status": "stopped",
"session_id": active_session.id if active_session else None,
"device_response": response,
"cycle_response": cycle_response,
}
async def _execute_download(

View File

@@ -9,13 +9,14 @@ that handles TCP/FTP communication with Rion NL-43/NL-53 devices.
"""
import httpx
import os
from typing import Optional, Dict, Any, List
from datetime import datetime
import json
# SLMM backend base URLs
SLMM_BASE_URL = "http://localhost:8100"
# SLMM backend base URLs - use environment variable if set (for Docker)
SLMM_BASE_URL = os.environ.get("SLMM_BASE_URL", "http://localhost:8100")
SLMM_API_BASE = f"{SLMM_BASE_URL}/api/nl43"
@@ -505,6 +506,68 @@ class SLMMClient:
}
return await self._request("POST", f"/{unit_id}/ftp/download", data=data)
# ========================================================================
# Cycle Commands (for scheduled automation)
# ========================================================================
async def start_cycle(
self,
unit_id: str,
sync_clock: bool = True,
) -> Dict[str, Any]:
"""
Execute complete start cycle on device via SLMM.
This handles the full pre-recording workflow:
1. Sync device clock to server time
2. Find next safe index (with overwrite protection)
3. Start measurement
Args:
unit_id: Unit identifier
sync_clock: Whether to sync device clock to server time
Returns:
Dict with clock_synced, old_index, new_index, started, etc.
"""
return await self._request(
"POST",
f"/{unit_id}/start-cycle",
data={"sync_clock": sync_clock},
)
async def stop_cycle(
self,
unit_id: str,
download: bool = True,
download_path: Optional[str] = None,
) -> Dict[str, Any]:
"""
Execute complete stop cycle on device via SLMM.
This handles the full post-recording workflow:
1. Stop measurement
2. Enable FTP
3. Download measurement folder (if download=True)
4. Verify download
Args:
unit_id: Unit identifier
download: Whether to download measurement data
download_path: Custom path for downloaded ZIP (optional)
Returns:
Dict with stopped, ftp_enabled, download_success, local_path, etc.
"""
data = {"download": download}
if download_path:
data["download_path"] = download_path
return await self._request(
"POST",
f"/{unit_id}/stop-cycle",
data=data,
)
# ========================================================================
# Polling Status (for device monitoring/alerts)
# ========================================================================