356 lines
11 KiB
Python
356 lines
11 KiB
Python
"""
|
|
Scheduler Service
|
|
|
|
Executes scheduled actions for Projects system.
|
|
Monitors pending scheduled actions and executes them by calling device modules (SLMM/SFM).
|
|
|
|
This service runs as a background task in FastAPI, checking for pending actions
|
|
every minute and executing them when their scheduled time arrives.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List, Dict, Any
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_
|
|
|
|
from backend.database import SessionLocal
|
|
from backend.models import ScheduledAction, RecordingSession, MonitoringLocation, Project
|
|
from backend.services.device_controller import get_device_controller, DeviceControllerError
|
|
import uuid
|
|
|
|
|
|
class SchedulerService:
|
|
"""
|
|
Service for executing scheduled actions.
|
|
|
|
Usage:
|
|
scheduler = SchedulerService()
|
|
await scheduler.start() # Start background loop
|
|
scheduler.stop() # Stop background loop
|
|
"""
|
|
|
|
def __init__(self, check_interval: int = 60):
|
|
"""
|
|
Initialize scheduler.
|
|
|
|
Args:
|
|
check_interval: Seconds between checks for pending actions (default: 60)
|
|
"""
|
|
self.check_interval = check_interval
|
|
self.running = False
|
|
self.task: Optional[asyncio.Task] = None
|
|
self.device_controller = get_device_controller()
|
|
|
|
async def start(self):
|
|
"""Start the scheduler background task."""
|
|
if self.running:
|
|
print("Scheduler is already running")
|
|
return
|
|
|
|
self.running = True
|
|
self.task = asyncio.create_task(self._run_loop())
|
|
print(f"Scheduler started (checking every {self.check_interval}s)")
|
|
|
|
def stop(self):
|
|
"""Stop the scheduler background task."""
|
|
self.running = False
|
|
if self.task:
|
|
self.task.cancel()
|
|
print("Scheduler stopped")
|
|
|
|
async def _run_loop(self):
|
|
"""Main scheduler loop."""
|
|
while self.running:
|
|
try:
|
|
await self.execute_pending_actions()
|
|
except Exception as e:
|
|
print(f"Scheduler error: {e}")
|
|
# Continue running even if there's an error
|
|
|
|
await asyncio.sleep(self.check_interval)
|
|
|
|
async def execute_pending_actions(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Find and execute all pending scheduled actions that are due.
|
|
|
|
Returns:
|
|
List of execution results
|
|
"""
|
|
db = SessionLocal()
|
|
results = []
|
|
|
|
try:
|
|
# Find pending actions that are due
|
|
now = datetime.utcnow()
|
|
pending_actions = db.query(ScheduledAction).filter(
|
|
and_(
|
|
ScheduledAction.execution_status == "pending",
|
|
ScheduledAction.scheduled_time <= now,
|
|
)
|
|
).order_by(ScheduledAction.scheduled_time).all()
|
|
|
|
if not pending_actions:
|
|
return []
|
|
|
|
print(f"Found {len(pending_actions)} pending action(s) to execute")
|
|
|
|
for action in pending_actions:
|
|
result = await self._execute_action(action, db)
|
|
results.append(result)
|
|
|
|
db.commit()
|
|
|
|
except Exception as e:
|
|
print(f"Error executing pending actions: {e}")
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
return results
|
|
|
|
async def _execute_action(
|
|
self,
|
|
action: ScheduledAction,
|
|
db: Session,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Execute a single scheduled action.
|
|
|
|
Args:
|
|
action: ScheduledAction to execute
|
|
db: Database session
|
|
|
|
Returns:
|
|
Execution result dict
|
|
"""
|
|
print(f"Executing action {action.id}: {action.action_type} for unit {action.unit_id}")
|
|
|
|
result = {
|
|
"action_id": action.id,
|
|
"action_type": action.action_type,
|
|
"unit_id": action.unit_id,
|
|
"scheduled_time": action.scheduled_time.isoformat(),
|
|
"success": False,
|
|
"error": None,
|
|
}
|
|
|
|
try:
|
|
# Determine which unit to use
|
|
# If unit_id is specified, use it; otherwise get from location assignment
|
|
unit_id = action.unit_id
|
|
if not unit_id:
|
|
# Get assigned unit from location
|
|
from backend.models import UnitAssignment
|
|
assignment = db.query(UnitAssignment).filter(
|
|
and_(
|
|
UnitAssignment.location_id == action.location_id,
|
|
UnitAssignment.status == "active",
|
|
)
|
|
).first()
|
|
|
|
if not assignment:
|
|
raise Exception(f"No active unit assigned to location {action.location_id}")
|
|
|
|
unit_id = assignment.unit_id
|
|
|
|
# Execute the action based on type
|
|
if action.action_type == "start":
|
|
response = await self._execute_start(action, unit_id, db)
|
|
elif action.action_type == "stop":
|
|
response = await self._execute_stop(action, unit_id, db)
|
|
elif action.action_type == "download":
|
|
response = await self._execute_download(action, unit_id, db)
|
|
else:
|
|
raise Exception(f"Unknown action type: {action.action_type}")
|
|
|
|
# Mark action as completed
|
|
action.execution_status = "completed"
|
|
action.executed_at = datetime.utcnow()
|
|
action.module_response = json.dumps(response)
|
|
|
|
result["success"] = True
|
|
result["response"] = response
|
|
|
|
print(f"✓ Action {action.id} completed successfully")
|
|
|
|
except Exception as e:
|
|
# Mark action as failed
|
|
action.execution_status = "failed"
|
|
action.executed_at = datetime.utcnow()
|
|
action.error_message = str(e)
|
|
|
|
result["error"] = str(e)
|
|
|
|
print(f"✗ Action {action.id} failed: {e}")
|
|
|
|
return result
|
|
|
|
async def _execute_start(
|
|
self,
|
|
action: ScheduledAction,
|
|
unit_id: str,
|
|
db: Session,
|
|
) -> Dict[str, Any]:
|
|
"""Execute a 'start' action."""
|
|
# Start recording via device controller
|
|
response = await self.device_controller.start_recording(
|
|
unit_id,
|
|
action.device_type,
|
|
config={}, # TODO: Load config from action.notes or metadata
|
|
)
|
|
|
|
# Create recording session
|
|
session = RecordingSession(
|
|
id=str(uuid.uuid4()),
|
|
project_id=action.project_id,
|
|
location_id=action.location_id,
|
|
unit_id=unit_id,
|
|
session_type="sound" if action.device_type == "sound_level_meter" else "vibration",
|
|
started_at=datetime.utcnow(),
|
|
status="recording",
|
|
session_metadata=json.dumps({"scheduled_action_id": action.id}),
|
|
)
|
|
db.add(session)
|
|
|
|
return {
|
|
"status": "started",
|
|
"session_id": session.id,
|
|
"device_response": response,
|
|
}
|
|
|
|
async def _execute_stop(
|
|
self,
|
|
action: ScheduledAction,
|
|
unit_id: str,
|
|
db: Session,
|
|
) -> Dict[str, Any]:
|
|
"""Execute a 'stop' action."""
|
|
# Stop recording via device controller
|
|
response = await self.device_controller.stop_recording(
|
|
unit_id,
|
|
action.device_type,
|
|
)
|
|
|
|
# Find and update the active recording session
|
|
active_session = db.query(RecordingSession).filter(
|
|
and_(
|
|
RecordingSession.location_id == action.location_id,
|
|
RecordingSession.unit_id == unit_id,
|
|
RecordingSession.status == "recording",
|
|
)
|
|
).first()
|
|
|
|
if active_session:
|
|
active_session.stopped_at = datetime.utcnow()
|
|
active_session.status = "completed"
|
|
active_session.duration_seconds = int(
|
|
(active_session.stopped_at - active_session.started_at).total_seconds()
|
|
)
|
|
|
|
return {
|
|
"status": "stopped",
|
|
"session_id": active_session.id if active_session else None,
|
|
"device_response": response,
|
|
}
|
|
|
|
async def _execute_download(
|
|
self,
|
|
action: ScheduledAction,
|
|
unit_id: str,
|
|
db: Session,
|
|
) -> Dict[str, Any]:
|
|
"""Execute a 'download' action."""
|
|
# Get project and location info for file path
|
|
location = db.query(MonitoringLocation).filter_by(id=action.location_id).first()
|
|
project = db.query(Project).filter_by(id=action.project_id).first()
|
|
|
|
if not location or not project:
|
|
raise Exception("Project or location not found")
|
|
|
|
# Build destination path
|
|
# Example: data/Projects/{project-id}/sound/{location-name}/session-{timestamp}/
|
|
session_timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H%M")
|
|
location_type_dir = "sound" if action.device_type == "sound_level_meter" else "vibration"
|
|
|
|
destination_path = (
|
|
f"data/Projects/{project.id}/{location_type_dir}/"
|
|
f"{location.name}/session-{session_timestamp}/"
|
|
)
|
|
|
|
# Download files via device controller
|
|
response = await self.device_controller.download_files(
|
|
unit_id,
|
|
action.device_type,
|
|
destination_path,
|
|
files=None, # Download all files
|
|
)
|
|
|
|
# TODO: Create DataFile records for downloaded files
|
|
|
|
return {
|
|
"status": "downloaded",
|
|
"destination_path": destination_path,
|
|
"device_response": response,
|
|
}
|
|
|
|
# ========================================================================
|
|
# Manual Execution (for testing/debugging)
|
|
# ========================================================================
|
|
|
|
async def execute_action_by_id(self, action_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Manually execute a specific action by ID.
|
|
|
|
Args:
|
|
action_id: ScheduledAction ID
|
|
|
|
Returns:
|
|
Execution result
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
action = db.query(ScheduledAction).filter_by(id=action_id).first()
|
|
if not action:
|
|
return {"success": False, "error": "Action not found"}
|
|
|
|
result = await self._execute_action(action, db)
|
|
db.commit()
|
|
return result
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"success": False, "error": str(e)}
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# Singleton instance
|
|
_scheduler_instance: Optional[SchedulerService] = None
|
|
|
|
|
|
def get_scheduler() -> SchedulerService:
|
|
"""
|
|
Get the scheduler singleton instance.
|
|
|
|
Returns:
|
|
SchedulerService instance
|
|
"""
|
|
global _scheduler_instance
|
|
if _scheduler_instance is None:
|
|
_scheduler_instance = SchedulerService()
|
|
return _scheduler_instance
|
|
|
|
|
|
async def start_scheduler():
|
|
"""Start the global scheduler instance."""
|
|
scheduler = get_scheduler()
|
|
await scheduler.start()
|
|
|
|
|
|
def stop_scheduler():
|
|
"""Stop the global scheduler instance."""
|
|
scheduler = get_scheduler()
|
|
scheduler.stop()
|