""" Scheduler Service Executes scheduled actions for Projects system. Monitors pending scheduled actions and executes them by calling device modules (SLMM/SFM). This service runs as a background task in FastAPI, checking for pending actions every minute and executing them when their scheduled time arrives. """ import asyncio import json from datetime import datetime, timedelta from typing import Optional, List, Dict, Any from sqlalchemy.orm import Session from sqlalchemy import and_ from backend.database import SessionLocal from backend.models import ScheduledAction, RecordingSession, MonitoringLocation, Project from backend.services.device_controller import get_device_controller, DeviceControllerError import uuid class SchedulerService: """ Service for executing scheduled actions. Usage: scheduler = SchedulerService() await scheduler.start() # Start background loop scheduler.stop() # Stop background loop """ def __init__(self, check_interval: int = 60): """ Initialize scheduler. Args: check_interval: Seconds between checks for pending actions (default: 60) """ self.check_interval = check_interval self.running = False self.task: Optional[asyncio.Task] = None self.device_controller = get_device_controller() async def start(self): """Start the scheduler background task.""" if self.running: print("Scheduler is already running") return self.running = True self.task = asyncio.create_task(self._run_loop()) print(f"Scheduler started (checking every {self.check_interval}s)") def stop(self): """Stop the scheduler background task.""" self.running = False if self.task: self.task.cancel() print("Scheduler stopped") async def _run_loop(self): """Main scheduler loop.""" while self.running: try: await self.execute_pending_actions() except Exception as e: print(f"Scheduler error: {e}") # Continue running even if there's an error await asyncio.sleep(self.check_interval) async def execute_pending_actions(self) -> List[Dict[str, Any]]: """ Find and execute all pending scheduled actions that are due. Returns: List of execution results """ db = SessionLocal() results = [] try: # Find pending actions that are due now = datetime.utcnow() pending_actions = db.query(ScheduledAction).filter( and_( ScheduledAction.execution_status == "pending", ScheduledAction.scheduled_time <= now, ) ).order_by(ScheduledAction.scheduled_time).all() if not pending_actions: return [] print(f"Found {len(pending_actions)} pending action(s) to execute") for action in pending_actions: result = await self._execute_action(action, db) results.append(result) db.commit() except Exception as e: print(f"Error executing pending actions: {e}") db.rollback() finally: db.close() return results async def _execute_action( self, action: ScheduledAction, db: Session, ) -> Dict[str, Any]: """ Execute a single scheduled action. Args: action: ScheduledAction to execute db: Database session Returns: Execution result dict """ print(f"Executing action {action.id}: {action.action_type} for unit {action.unit_id}") result = { "action_id": action.id, "action_type": action.action_type, "unit_id": action.unit_id, "scheduled_time": action.scheduled_time.isoformat(), "success": False, "error": None, } try: # Determine which unit to use # If unit_id is specified, use it; otherwise get from location assignment unit_id = action.unit_id if not unit_id: # Get assigned unit from location from backend.models import UnitAssignment assignment = db.query(UnitAssignment).filter( and_( UnitAssignment.location_id == action.location_id, UnitAssignment.status == "active", ) ).first() if not assignment: raise Exception(f"No active unit assigned to location {action.location_id}") unit_id = assignment.unit_id # Execute the action based on type if action.action_type == "start": response = await self._execute_start(action, unit_id, db) elif action.action_type == "stop": response = await self._execute_stop(action, unit_id, db) elif action.action_type == "download": response = await self._execute_download(action, unit_id, db) else: raise Exception(f"Unknown action type: {action.action_type}") # Mark action as completed action.execution_status = "completed" action.executed_at = datetime.utcnow() action.module_response = json.dumps(response) result["success"] = True result["response"] = response print(f"✓ Action {action.id} completed successfully") except Exception as e: # Mark action as failed action.execution_status = "failed" action.executed_at = datetime.utcnow() action.error_message = str(e) result["error"] = str(e) print(f"✗ Action {action.id} failed: {e}") return result async def _execute_start( self, action: ScheduledAction, unit_id: str, db: Session, ) -> Dict[str, Any]: """Execute a 'start' action.""" # Start recording via device controller response = await self.device_controller.start_recording( unit_id, action.device_type, config={}, # TODO: Load config from action.notes or metadata ) # Create recording session session = RecordingSession( id=str(uuid.uuid4()), project_id=action.project_id, location_id=action.location_id, unit_id=unit_id, session_type="sound" if action.device_type == "slm" else "vibration", started_at=datetime.utcnow(), status="recording", session_metadata=json.dumps({"scheduled_action_id": action.id}), ) db.add(session) return { "status": "started", "session_id": session.id, "device_response": response, } async def _execute_stop( self, action: ScheduledAction, unit_id: str, db: Session, ) -> Dict[str, Any]: """Execute a 'stop' action.""" # Stop recording via device controller response = await self.device_controller.stop_recording( unit_id, action.device_type, ) # Find and update the active recording session active_session = db.query(RecordingSession).filter( and_( RecordingSession.location_id == action.location_id, RecordingSession.unit_id == unit_id, RecordingSession.status == "recording", ) ).first() if active_session: active_session.stopped_at = datetime.utcnow() active_session.status = "completed" active_session.duration_seconds = int( (active_session.stopped_at - active_session.started_at).total_seconds() ) return { "status": "stopped", "session_id": active_session.id if active_session else None, "device_response": response, } async def _execute_download( self, action: ScheduledAction, unit_id: str, db: Session, ) -> Dict[str, Any]: """Execute a 'download' action.""" # Get project and location info for file path location = db.query(MonitoringLocation).filter_by(id=action.location_id).first() project = db.query(Project).filter_by(id=action.project_id).first() if not location or not project: raise Exception("Project or location not found") # Build destination path # Example: data/Projects/{project-id}/sound/{location-name}/session-{timestamp}/ session_timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H%M") location_type_dir = "sound" if action.device_type == "slm" else "vibration" destination_path = ( f"data/Projects/{project.id}/{location_type_dir}/" f"{location.name}/session-{session_timestamp}/" ) # Download files via device controller response = await self.device_controller.download_files( unit_id, action.device_type, destination_path, files=None, # Download all files ) # TODO: Create DataFile records for downloaded files return { "status": "downloaded", "destination_path": destination_path, "device_response": response, } # ======================================================================== # Manual Execution (for testing/debugging) # ======================================================================== async def execute_action_by_id(self, action_id: str) -> Dict[str, Any]: """ Manually execute a specific action by ID. Args: action_id: ScheduledAction ID Returns: Execution result """ db = SessionLocal() try: action = db.query(ScheduledAction).filter_by(id=action_id).first() if not action: return {"success": False, "error": "Action not found"} result = await self._execute_action(action, db) db.commit() return result except Exception as e: db.rollback() return {"success": False, "error": str(e)} finally: db.close() # Singleton instance _scheduler_instance: Optional[SchedulerService] = None def get_scheduler() -> SchedulerService: """ Get the scheduler singleton instance. Returns: SchedulerService instance """ global _scheduler_instance if _scheduler_instance is None: _scheduler_instance = SchedulerService() return _scheduler_instance async def start_scheduler(): """Start the global scheduler instance.""" scheduler = get_scheduler() await scheduler.start() def stop_scheduler(): """Stop the global scheduler instance.""" scheduler = get_scheduler() scheduler.stop()