Move SLM control center groundwork onto dev
This commit is contained in:
355
backend/services/scheduler.py
Normal file
355
backend/services/scheduler.py
Normal file
@@ -0,0 +1,355 @@
|
||||
"""
|
||||
Scheduler Service
|
||||
|
||||
Executes scheduled actions for Projects system.
|
||||
Monitors pending scheduled actions and executes them by calling device modules (SLMM/SFM).
|
||||
|
||||
This service runs as a background task in FastAPI, checking for pending actions
|
||||
every minute and executing them when their scheduled time arrives.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, List, Dict, Any
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import and_
|
||||
|
||||
from backend.database import SessionLocal
|
||||
from backend.models import ScheduledAction, RecordingSession, MonitoringLocation, Project
|
||||
from backend.services.device_controller import get_device_controller, DeviceControllerError
|
||||
import uuid
|
||||
|
||||
|
||||
class SchedulerService:
|
||||
"""
|
||||
Service for executing scheduled actions.
|
||||
|
||||
Usage:
|
||||
scheduler = SchedulerService()
|
||||
await scheduler.start() # Start background loop
|
||||
scheduler.stop() # Stop background loop
|
||||
"""
|
||||
|
||||
def __init__(self, check_interval: int = 60):
|
||||
"""
|
||||
Initialize scheduler.
|
||||
|
||||
Args:
|
||||
check_interval: Seconds between checks for pending actions (default: 60)
|
||||
"""
|
||||
self.check_interval = check_interval
|
||||
self.running = False
|
||||
self.task: Optional[asyncio.Task] = None
|
||||
self.device_controller = get_device_controller()
|
||||
|
||||
async def start(self):
|
||||
"""Start the scheduler background task."""
|
||||
if self.running:
|
||||
print("Scheduler is already running")
|
||||
return
|
||||
|
||||
self.running = True
|
||||
self.task = asyncio.create_task(self._run_loop())
|
||||
print(f"Scheduler started (checking every {self.check_interval}s)")
|
||||
|
||||
def stop(self):
|
||||
"""Stop the scheduler background task."""
|
||||
self.running = False
|
||||
if self.task:
|
||||
self.task.cancel()
|
||||
print("Scheduler stopped")
|
||||
|
||||
async def _run_loop(self):
|
||||
"""Main scheduler loop."""
|
||||
while self.running:
|
||||
try:
|
||||
await self.execute_pending_actions()
|
||||
except Exception as e:
|
||||
print(f"Scheduler error: {e}")
|
||||
# Continue running even if there's an error
|
||||
|
||||
await asyncio.sleep(self.check_interval)
|
||||
|
||||
async def execute_pending_actions(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Find and execute all pending scheduled actions that are due.
|
||||
|
||||
Returns:
|
||||
List of execution results
|
||||
"""
|
||||
db = SessionLocal()
|
||||
results = []
|
||||
|
||||
try:
|
||||
# Find pending actions that are due
|
||||
now = datetime.utcnow()
|
||||
pending_actions = db.query(ScheduledAction).filter(
|
||||
and_(
|
||||
ScheduledAction.execution_status == "pending",
|
||||
ScheduledAction.scheduled_time <= now,
|
||||
)
|
||||
).order_by(ScheduledAction.scheduled_time).all()
|
||||
|
||||
if not pending_actions:
|
||||
return []
|
||||
|
||||
print(f"Found {len(pending_actions)} pending action(s) to execute")
|
||||
|
||||
for action in pending_actions:
|
||||
result = await self._execute_action(action, db)
|
||||
results.append(result)
|
||||
|
||||
db.commit()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error executing pending actions: {e}")
|
||||
db.rollback()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
return results
|
||||
|
||||
async def _execute_action(
|
||||
self,
|
||||
action: ScheduledAction,
|
||||
db: Session,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute a single scheduled action.
|
||||
|
||||
Args:
|
||||
action: ScheduledAction to execute
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
Execution result dict
|
||||
"""
|
||||
print(f"Executing action {action.id}: {action.action_type} for unit {action.unit_id}")
|
||||
|
||||
result = {
|
||||
"action_id": action.id,
|
||||
"action_type": action.action_type,
|
||||
"unit_id": action.unit_id,
|
||||
"scheduled_time": action.scheduled_time.isoformat(),
|
||||
"success": False,
|
||||
"error": None,
|
||||
}
|
||||
|
||||
try:
|
||||
# Determine which unit to use
|
||||
# If unit_id is specified, use it; otherwise get from location assignment
|
||||
unit_id = action.unit_id
|
||||
if not unit_id:
|
||||
# Get assigned unit from location
|
||||
from backend.models import UnitAssignment
|
||||
assignment = db.query(UnitAssignment).filter(
|
||||
and_(
|
||||
UnitAssignment.location_id == action.location_id,
|
||||
UnitAssignment.status == "active",
|
||||
)
|
||||
).first()
|
||||
|
||||
if not assignment:
|
||||
raise Exception(f"No active unit assigned to location {action.location_id}")
|
||||
|
||||
unit_id = assignment.unit_id
|
||||
|
||||
# Execute the action based on type
|
||||
if action.action_type == "start":
|
||||
response = await self._execute_start(action, unit_id, db)
|
||||
elif action.action_type == "stop":
|
||||
response = await self._execute_stop(action, unit_id, db)
|
||||
elif action.action_type == "download":
|
||||
response = await self._execute_download(action, unit_id, db)
|
||||
else:
|
||||
raise Exception(f"Unknown action type: {action.action_type}")
|
||||
|
||||
# Mark action as completed
|
||||
action.execution_status = "completed"
|
||||
action.executed_at = datetime.utcnow()
|
||||
action.module_response = json.dumps(response)
|
||||
|
||||
result["success"] = True
|
||||
result["response"] = response
|
||||
|
||||
print(f"✓ Action {action.id} completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
# Mark action as failed
|
||||
action.execution_status = "failed"
|
||||
action.executed_at = datetime.utcnow()
|
||||
action.error_message = str(e)
|
||||
|
||||
result["error"] = str(e)
|
||||
|
||||
print(f"✗ Action {action.id} failed: {e}")
|
||||
|
||||
return result
|
||||
|
||||
async def _execute_start(
|
||||
self,
|
||||
action: ScheduledAction,
|
||||
unit_id: str,
|
||||
db: Session,
|
||||
) -> Dict[str, Any]:
|
||||
"""Execute a 'start' action."""
|
||||
# Start recording via device controller
|
||||
response = await self.device_controller.start_recording(
|
||||
unit_id,
|
||||
action.device_type,
|
||||
config={}, # TODO: Load config from action.notes or metadata
|
||||
)
|
||||
|
||||
# Create recording session
|
||||
session = RecordingSession(
|
||||
id=str(uuid.uuid4()),
|
||||
project_id=action.project_id,
|
||||
location_id=action.location_id,
|
||||
unit_id=unit_id,
|
||||
session_type="sound" if action.device_type == "sound_level_meter" else "vibration",
|
||||
started_at=datetime.utcnow(),
|
||||
status="recording",
|
||||
session_metadata=json.dumps({"scheduled_action_id": action.id}),
|
||||
)
|
||||
db.add(session)
|
||||
|
||||
return {
|
||||
"status": "started",
|
||||
"session_id": session.id,
|
||||
"device_response": response,
|
||||
}
|
||||
|
||||
async def _execute_stop(
|
||||
self,
|
||||
action: ScheduledAction,
|
||||
unit_id: str,
|
||||
db: Session,
|
||||
) -> Dict[str, Any]:
|
||||
"""Execute a 'stop' action."""
|
||||
# Stop recording via device controller
|
||||
response = await self.device_controller.stop_recording(
|
||||
unit_id,
|
||||
action.device_type,
|
||||
)
|
||||
|
||||
# Find and update the active recording session
|
||||
active_session = db.query(RecordingSession).filter(
|
||||
and_(
|
||||
RecordingSession.location_id == action.location_id,
|
||||
RecordingSession.unit_id == unit_id,
|
||||
RecordingSession.status == "recording",
|
||||
)
|
||||
).first()
|
||||
|
||||
if active_session:
|
||||
active_session.stopped_at = datetime.utcnow()
|
||||
active_session.status = "completed"
|
||||
active_session.duration_seconds = int(
|
||||
(active_session.stopped_at - active_session.started_at).total_seconds()
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "stopped",
|
||||
"session_id": active_session.id if active_session else None,
|
||||
"device_response": response,
|
||||
}
|
||||
|
||||
async def _execute_download(
|
||||
self,
|
||||
action: ScheduledAction,
|
||||
unit_id: str,
|
||||
db: Session,
|
||||
) -> Dict[str, Any]:
|
||||
"""Execute a 'download' action."""
|
||||
# Get project and location info for file path
|
||||
location = db.query(MonitoringLocation).filter_by(id=action.location_id).first()
|
||||
project = db.query(Project).filter_by(id=action.project_id).first()
|
||||
|
||||
if not location or not project:
|
||||
raise Exception("Project or location not found")
|
||||
|
||||
# Build destination path
|
||||
# Example: data/Projects/{project-id}/sound/{location-name}/session-{timestamp}/
|
||||
session_timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H%M")
|
||||
location_type_dir = "sound" if action.device_type == "sound_level_meter" else "vibration"
|
||||
|
||||
destination_path = (
|
||||
f"data/Projects/{project.id}/{location_type_dir}/"
|
||||
f"{location.name}/session-{session_timestamp}/"
|
||||
)
|
||||
|
||||
# Download files via device controller
|
||||
response = await self.device_controller.download_files(
|
||||
unit_id,
|
||||
action.device_type,
|
||||
destination_path,
|
||||
files=None, # Download all files
|
||||
)
|
||||
|
||||
# TODO: Create DataFile records for downloaded files
|
||||
|
||||
return {
|
||||
"status": "downloaded",
|
||||
"destination_path": destination_path,
|
||||
"device_response": response,
|
||||
}
|
||||
|
||||
# ========================================================================
|
||||
# Manual Execution (for testing/debugging)
|
||||
# ========================================================================
|
||||
|
||||
async def execute_action_by_id(self, action_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Manually execute a specific action by ID.
|
||||
|
||||
Args:
|
||||
action_id: ScheduledAction ID
|
||||
|
||||
Returns:
|
||||
Execution result
|
||||
"""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
action = db.query(ScheduledAction).filter_by(id=action_id).first()
|
||||
if not action:
|
||||
return {"success": False, "error": "Action not found"}
|
||||
|
||||
result = await self._execute_action(action, db)
|
||||
db.commit()
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
return {"success": False, "error": str(e)}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# Singleton instance
|
||||
_scheduler_instance: Optional[SchedulerService] = None
|
||||
|
||||
|
||||
def get_scheduler() -> SchedulerService:
|
||||
"""
|
||||
Get the scheduler singleton instance.
|
||||
|
||||
Returns:
|
||||
SchedulerService instance
|
||||
"""
|
||||
global _scheduler_instance
|
||||
if _scheduler_instance is None:
|
||||
_scheduler_instance = SchedulerService()
|
||||
return _scheduler_instance
|
||||
|
||||
|
||||
async def start_scheduler():
|
||||
"""Start the global scheduler instance."""
|
||||
scheduler = get_scheduler()
|
||||
await scheduler.start()
|
||||
|
||||
|
||||
def stop_scheduler():
|
||||
"""Stop the global scheduler instance."""
|
||||
scheduler = get_scheduler()
|
||||
scheduler.stop()
|
||||
Reference in New Issue
Block a user