"""
Scheduler Service

Executes scheduled actions for Projects system.
Monitors pending scheduled actions and executes them by calling device modules (SLMM/SFM).

Extended to support recurring schedules:
- Generates ScheduledActions from RecurringSchedule patterns
- Cleans up old completed/failed actions

This service runs as a background task in FastAPI, checking for pending actions
every minute and executing them when their scheduled time arrives.
"""
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from sqlalchemy import and_
from sqlalchemy.orm import Session

from backend.database import SessionLocal
from backend.models import ScheduledAction, RecordingSession, MonitoringLocation, Project, RecurringSchedule
from backend.services.alert_service import get_alert_service
from backend.services.device_controller import get_device_controller, DeviceControllerError
# Module-level logger for this service.
logger = logging.getLogger(__name__)
class SchedulerService:
    """
    Service for executing scheduled actions.

    Usage:
        scheduler = SchedulerService()
        await scheduler.start()  # Start background loop
        scheduler.stop()         # Stop background loop
    """
    def __init__(self, check_interval: int = 60) -> None:
        """
        Initialize scheduler.

        Args:
            check_interval: Seconds between checks for pending actions (default: 60)
        """
        # Seconds between wake-ups of the background loop.
        self.check_interval = check_interval
        # True while the background loop should keep iterating.
        self.running = False
        # Handle to the asyncio task running _run_loop (None until start()).
        self.task: Optional[asyncio.Task] = None
        # Shared controller used to issue commands to device modules (SLMM/SFM).
        self.device_controller = get_device_controller()
async def start(self):
|
|
"""Start the scheduler background task."""
|
|
if self.running:
|
|
print("Scheduler is already running")
|
|
return
|
|
|
|
self.running = True
|
|
self.task = asyncio.create_task(self._run_loop())
|
|
print(f"Scheduler started (checking every {self.check_interval}s)")
|
|
|
|
def stop(self):
|
|
"""Stop the scheduler background task."""
|
|
self.running = False
|
|
if self.task:
|
|
self.task.cancel()
|
|
print("Scheduler stopped")
|
|
|
|
async def _run_loop(self):
|
|
"""Main scheduler loop."""
|
|
# Track when we last generated recurring actions (do this once per hour)
|
|
last_generation_check = datetime.utcnow() - timedelta(hours=1)
|
|
|
|
while self.running:
|
|
try:
|
|
# Execute pending actions
|
|
await self.execute_pending_actions()
|
|
|
|
# Generate actions from recurring schedules (every hour)
|
|
now = datetime.utcnow()
|
|
if (now - last_generation_check).total_seconds() >= 3600:
|
|
await self.generate_recurring_actions()
|
|
last_generation_check = now
|
|
|
|
# Cleanup old actions (also every hour, during generation cycle)
|
|
if (now - last_generation_check).total_seconds() < 60:
|
|
await self.cleanup_old_actions()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Scheduler error: {e}", exc_info=True)
|
|
# Continue running even if there's an error
|
|
|
|
await asyncio.sleep(self.check_interval)
|
|
|
|
async def execute_pending_actions(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Find and execute all pending scheduled actions that are due.
|
|
|
|
Returns:
|
|
List of execution results
|
|
"""
|
|
db = SessionLocal()
|
|
results = []
|
|
|
|
try:
|
|
# Find pending actions that are due
|
|
now = datetime.utcnow()
|
|
pending_actions = db.query(ScheduledAction).filter(
|
|
and_(
|
|
ScheduledAction.execution_status == "pending",
|
|
ScheduledAction.scheduled_time <= now,
|
|
)
|
|
).order_by(ScheduledAction.scheduled_time).all()
|
|
|
|
if not pending_actions:
|
|
return []
|
|
|
|
print(f"Found {len(pending_actions)} pending action(s) to execute")
|
|
|
|
for action in pending_actions:
|
|
result = await self._execute_action(action, db)
|
|
results.append(result)
|
|
|
|
db.commit()
|
|
|
|
except Exception as e:
|
|
print(f"Error executing pending actions: {e}")
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
return results
|
|
|
|
async def _execute_action(
|
|
self,
|
|
action: ScheduledAction,
|
|
db: Session,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Execute a single scheduled action.
|
|
|
|
Args:
|
|
action: ScheduledAction to execute
|
|
db: Database session
|
|
|
|
Returns:
|
|
Execution result dict
|
|
"""
|
|
print(f"Executing action {action.id}: {action.action_type} for unit {action.unit_id}")
|
|
|
|
result = {
|
|
"action_id": action.id,
|
|
"action_type": action.action_type,
|
|
"unit_id": action.unit_id,
|
|
"scheduled_time": action.scheduled_time.isoformat(),
|
|
"success": False,
|
|
"error": None,
|
|
}
|
|
|
|
try:
|
|
# Determine which unit to use
|
|
# If unit_id is specified, use it; otherwise get from location assignment
|
|
unit_id = action.unit_id
|
|
if not unit_id:
|
|
# Get assigned unit from location
|
|
from backend.models import UnitAssignment
|
|
assignment = db.query(UnitAssignment).filter(
|
|
and_(
|
|
UnitAssignment.location_id == action.location_id,
|
|
UnitAssignment.status == "active",
|
|
)
|
|
).first()
|
|
|
|
if not assignment:
|
|
raise Exception(f"No active unit assigned to location {action.location_id}")
|
|
|
|
unit_id = assignment.unit_id
|
|
|
|
# Execute the action based on type
|
|
if action.action_type == "start":
|
|
response = await self._execute_start(action, unit_id, db)
|
|
elif action.action_type == "stop":
|
|
response = await self._execute_stop(action, unit_id, db)
|
|
elif action.action_type == "download":
|
|
response = await self._execute_download(action, unit_id, db)
|
|
elif action.action_type == "cycle":
|
|
response = await self._execute_cycle(action, unit_id, db)
|
|
else:
|
|
raise Exception(f"Unknown action type: {action.action_type}")
|
|
|
|
# Mark action as completed
|
|
action.execution_status = "completed"
|
|
action.executed_at = datetime.utcnow()
|
|
action.module_response = json.dumps(response)
|
|
|
|
result["success"] = True
|
|
result["response"] = response
|
|
|
|
print(f"✓ Action {action.id} completed successfully")
|
|
|
|
# Create success alert
|
|
try:
|
|
alert_service = get_alert_service(db)
|
|
alert_metadata = response.get("cycle_response", {}) if isinstance(response, dict) else {}
|
|
alert_service.create_schedule_completed_alert(
|
|
schedule_id=action.id,
|
|
action_type=action.action_type,
|
|
unit_id=unit_id,
|
|
project_id=action.project_id,
|
|
location_id=action.location_id,
|
|
metadata=alert_metadata,
|
|
)
|
|
except Exception as alert_err:
|
|
logger.warning(f"Failed to create success alert: {alert_err}")
|
|
|
|
except Exception as e:
|
|
# Mark action as failed
|
|
action.execution_status = "failed"
|
|
action.executed_at = datetime.utcnow()
|
|
action.error_message = str(e)
|
|
|
|
result["error"] = str(e)
|
|
|
|
print(f"✗ Action {action.id} failed: {e}")
|
|
|
|
# Create failure alert
|
|
try:
|
|
alert_service = get_alert_service(db)
|
|
alert_service.create_schedule_failed_alert(
|
|
schedule_id=action.id,
|
|
action_type=action.action_type,
|
|
unit_id=unit_id if 'unit_id' in dir() else action.unit_id,
|
|
error_message=str(e),
|
|
project_id=action.project_id,
|
|
location_id=action.location_id,
|
|
)
|
|
except Exception as alert_err:
|
|
logger.warning(f"Failed to create failure alert: {alert_err}")
|
|
|
|
return result
|
|
|
|
async def _execute_start(
|
|
self,
|
|
action: ScheduledAction,
|
|
unit_id: str,
|
|
db: Session,
|
|
) -> Dict[str, Any]:
|
|
"""Execute a 'start' action using the start_cycle command.
|
|
|
|
start_cycle handles:
|
|
1. Sync device clock to server time
|
|
2. Find next safe index (with overwrite protection)
|
|
3. Start measurement
|
|
"""
|
|
# Execute the full start cycle via device controller
|
|
# SLMM handles clock sync, index increment, and start
|
|
cycle_response = await self.device_controller.start_cycle(
|
|
unit_id,
|
|
action.device_type,
|
|
sync_clock=True,
|
|
)
|
|
|
|
# Create recording session
|
|
session = RecordingSession(
|
|
id=str(uuid.uuid4()),
|
|
project_id=action.project_id,
|
|
location_id=action.location_id,
|
|
unit_id=unit_id,
|
|
session_type="sound" if action.device_type == "slm" else "vibration",
|
|
started_at=datetime.utcnow(),
|
|
status="recording",
|
|
session_metadata=json.dumps({
|
|
"scheduled_action_id": action.id,
|
|
"cycle_response": cycle_response,
|
|
}),
|
|
)
|
|
db.add(session)
|
|
|
|
return {
|
|
"status": "started",
|
|
"session_id": session.id,
|
|
"cycle_response": cycle_response,
|
|
}
|
|
|
|
async def _execute_stop(
|
|
self,
|
|
action: ScheduledAction,
|
|
unit_id: str,
|
|
db: Session,
|
|
) -> Dict[str, Any]:
|
|
"""Execute a 'stop' action using the stop_cycle command.
|
|
|
|
stop_cycle handles:
|
|
1. Stop measurement
|
|
2. Enable FTP
|
|
3. Download measurement folder
|
|
4. Verify download
|
|
"""
|
|
# Parse notes for download preference
|
|
include_download = True
|
|
try:
|
|
if action.notes:
|
|
notes_data = json.loads(action.notes)
|
|
include_download = notes_data.get("include_download", True)
|
|
except json.JSONDecodeError:
|
|
pass # Notes is plain text, not JSON
|
|
|
|
# Execute the full stop cycle via device controller
|
|
# SLMM handles stop, FTP enable, and download
|
|
cycle_response = await self.device_controller.stop_cycle(
|
|
unit_id,
|
|
action.device_type,
|
|
download=include_download,
|
|
)
|
|
|
|
# Find and update the active recording session
|
|
active_session = db.query(RecordingSession).filter(
|
|
and_(
|
|
RecordingSession.location_id == action.location_id,
|
|
RecordingSession.unit_id == unit_id,
|
|
RecordingSession.status == "recording",
|
|
)
|
|
).first()
|
|
|
|
if active_session:
|
|
active_session.stopped_at = datetime.utcnow()
|
|
active_session.status = "completed"
|
|
active_session.duration_seconds = int(
|
|
(active_session.stopped_at - active_session.started_at).total_seconds()
|
|
)
|
|
# Store download info in session metadata
|
|
if cycle_response.get("download_success"):
|
|
try:
|
|
metadata = json.loads(active_session.session_metadata or "{}")
|
|
metadata["downloaded_folder"] = cycle_response.get("downloaded_folder")
|
|
metadata["local_path"] = cycle_response.get("local_path")
|
|
active_session.session_metadata = json.dumps(metadata)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return {
|
|
"status": "stopped",
|
|
"session_id": active_session.id if active_session else None,
|
|
"cycle_response": cycle_response,
|
|
}
|
|
|
|
async def _execute_download(
|
|
self,
|
|
action: ScheduledAction,
|
|
unit_id: str,
|
|
db: Session,
|
|
) -> Dict[str, Any]:
|
|
"""Execute a 'download' action.
|
|
|
|
This handles standalone download actions (not part of stop_cycle).
|
|
The workflow is:
|
|
1. Enable FTP on device
|
|
2. Download current measurement folder
|
|
3. (Optionally disable FTP - left enabled for now)
|
|
"""
|
|
# Get project and location info for file path
|
|
location = db.query(MonitoringLocation).filter_by(id=action.location_id).first()
|
|
project = db.query(Project).filter_by(id=action.project_id).first()
|
|
|
|
if not location or not project:
|
|
raise Exception("Project or location not found")
|
|
|
|
# Build destination path (for logging/metadata reference)
|
|
# Actual download location is managed by SLMM (data/downloads/{unit_id}/)
|
|
session_timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H%M")
|
|
location_type_dir = "sound" if action.device_type == "slm" else "vibration"
|
|
|
|
destination_path = (
|
|
f"data/Projects/{project.id}/{location_type_dir}/"
|
|
f"{location.name}/session-{session_timestamp}/"
|
|
)
|
|
|
|
# Step 1: Disable FTP first to reset any stale connection state
|
|
# Then enable FTP on device
|
|
logger.info(f"Resetting FTP on {unit_id} for download (disable then enable)")
|
|
try:
|
|
await self.device_controller.disable_ftp(unit_id, action.device_type)
|
|
except Exception as e:
|
|
logger.warning(f"FTP disable failed (may already be off): {e}")
|
|
await self.device_controller.enable_ftp(unit_id, action.device_type)
|
|
|
|
# Step 2: Download current measurement folder
|
|
# The slmm_client.download_files() now automatically determines the correct
|
|
# folder based on the device's current index number
|
|
response = await self.device_controller.download_files(
|
|
unit_id,
|
|
action.device_type,
|
|
destination_path,
|
|
files=None, # Download all files in current measurement folder
|
|
)
|
|
|
|
# TODO: Create DataFile records for downloaded files
|
|
|
|
return {
|
|
"status": "downloaded",
|
|
"destination_path": destination_path,
|
|
"device_response": response,
|
|
}
|
|
|
|
    async def _execute_cycle(
        self,
        action: ScheduledAction,
        unit_id: str,
        db: Session,
    ) -> Dict[str, Any]:
        """Execute a full 'cycle' action: stop -> download -> start.

        This combines stop, download, and start into a single action with
        appropriate delays between steps to ensure device stability.

        Workflow:
        0. Pause background polling to prevent command conflicts
        1. Stop measurement (wait 10s)
        2. Disable FTP to reset state (wait 10s)
        3. Enable FTP (wait 10s)
        4. Download current measurement folder
        5. Wait 30s for device to settle
        6. Start new measurement cycle
        7. Re-enable background polling

        Total time: ~70-90 seconds depending on download size

        Args:
            action: The 'cycle' ScheduledAction being executed
            unit_id: Resolved device unit ID
            db: Database session (caller commits)

        Returns:
            Result dict with per-step success/error entries, old/new session
            IDs, and whether polling was paused.
        """
        logger.info(f"[CYCLE] === Starting full cycle for {unit_id} ===")

        result = {
            "status": "cycle_complete",
            "steps": {},
            "old_session_id": None,
            "new_session_id": None,
            "polling_paused": False,
        }

        # Step 0: Pause background polling for this device to prevent command conflicts.
        # NL-43 devices only support one TCP connection at a time.
        logger.info(f"[CYCLE] Step 0: Pausing background polling for {unit_id}")
        polling_was_enabled = False
        try:
            if action.device_type == "slm":
                # Get current polling state so it can be restored in Step 7.
                from backend.services.slmm_client import get_slmm_client
                slmm = get_slmm_client()
                try:
                    polling_config = await slmm.get_device_polling_config(unit_id)
                    polling_was_enabled = polling_config.get("poll_enabled", False)
                except Exception:
                    polling_was_enabled = True  # Assume enabled if can't check

                # Disable polling for the duration of the cycle.
                await slmm.update_device_polling_config(unit_id, poll_enabled=False)
                result["polling_paused"] = True
                logger.info(f"[CYCLE] Background polling paused for {unit_id}")
        except Exception as e:
            logger.warning(f"[CYCLE] Failed to pause polling (continuing anyway): {e}")

        try:
            # Step 1: Stop measurement. A stop failure is tolerated — the
            # device may already be stopped.
            logger.info(f"[CYCLE] Step 1/7: Stopping measurement on {unit_id}")
            try:
                stop_response = await self.device_controller.stop_recording(unit_id, action.device_type)
                result["steps"]["stop"] = {"success": True, "response": stop_response}
                logger.info(f"[CYCLE] Measurement stopped, waiting 10s...")
            except Exception as e:
                logger.warning(f"[CYCLE] Stop failed (may already be stopped): {e}")
                result["steps"]["stop"] = {"success": False, "error": str(e)}

            await asyncio.sleep(10)

            # Step 2: Disable FTP to reset any stale state.
            logger.info(f"[CYCLE] Step 2/7: Disabling FTP on {unit_id}")
            try:
                await self.device_controller.disable_ftp(unit_id, action.device_type)
                result["steps"]["ftp_disable"] = {"success": True}
                logger.info(f"[CYCLE] FTP disabled, waiting 10s...")
            except Exception as e:
                logger.warning(f"[CYCLE] FTP disable failed (may already be off): {e}")
                result["steps"]["ftp_disable"] = {"success": False, "error": str(e)}

            await asyncio.sleep(10)

            # Step 3: Enable FTP.
            logger.info(f"[CYCLE] Step 3/7: Enabling FTP on {unit_id}")
            try:
                await self.device_controller.enable_ftp(unit_id, action.device_type)
                result["steps"]["ftp_enable"] = {"success": True}
                logger.info(f"[CYCLE] FTP enabled, waiting 10s...")
            except Exception as e:
                logger.error(f"[CYCLE] FTP enable failed: {e}")
                result["steps"]["ftp_enable"] = {"success": False, "error": str(e)}
                # Continue anyway - download will fail but we can still try to start

            await asyncio.sleep(10)

            # Step 4: Download current measurement folder.
            logger.info(f"[CYCLE] Step 4/7: Downloading measurement data from {unit_id}")
            location = db.query(MonitoringLocation).filter_by(id=action.location_id).first()
            project = db.query(Project).filter_by(id=action.project_id).first()

            if location and project:
                session_timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H%M")
                location_type_dir = "sound" if action.device_type == "slm" else "vibration"
                destination_path = (
                    f"data/Projects/{project.id}/{location_type_dir}/"
                    f"{location.name}/session-{session_timestamp}/"
                )

                try:
                    download_response = await self.device_controller.download_files(
                        unit_id,
                        action.device_type,
                        destination_path,
                        files=None,
                    )
                    result["steps"]["download"] = {"success": True, "response": download_response}
                    logger.info(f"[CYCLE] Download complete")
                except Exception as e:
                    logger.error(f"[CYCLE] Download failed: {e}")
                    result["steps"]["download"] = {"success": False, "error": str(e)}
            else:
                result["steps"]["download"] = {"success": False, "error": "Project or location not found"}

            # Close out the old recording session (if one was active).
            active_session = db.query(RecordingSession).filter(
                and_(
                    RecordingSession.location_id == action.location_id,
                    RecordingSession.unit_id == unit_id,
                    RecordingSession.status == "recording",
                )
            ).first()

            if active_session:
                active_session.stopped_at = datetime.utcnow()
                active_session.status = "completed"
                active_session.duration_seconds = int(
                    (active_session.stopped_at - active_session.started_at).total_seconds()
                )
                result["old_session_id"] = active_session.id

            # Step 5: Wait for device to settle before starting new measurement.
            logger.info(f"[CYCLE] Step 5/7: Waiting 30s for device to settle...")
            await asyncio.sleep(30)

            # Step 6: Start new measurement cycle.
            logger.info(f"[CYCLE] Step 6/7: Starting new measurement on {unit_id}")
            try:
                cycle_response = await self.device_controller.start_cycle(
                    unit_id,
                    action.device_type,
                    sync_clock=True,
                )
                result["steps"]["start"] = {"success": True, "response": cycle_response}

                # Create new recording session for the fresh measurement.
                new_session = RecordingSession(
                    id=str(uuid.uuid4()),
                    project_id=action.project_id,
                    location_id=action.location_id,
                    unit_id=unit_id,
                    session_type="sound" if action.device_type == "slm" else "vibration",
                    started_at=datetime.utcnow(),
                    status="recording",
                    session_metadata=json.dumps({
                        "scheduled_action_id": action.id,
                        "cycle_response": cycle_response,
                        "action_type": "cycle",
                    }),
                )
                db.add(new_session)
                result["new_session_id"] = new_session.id

                logger.info(f"[CYCLE] New measurement started, session {new_session.id}")

            except Exception as e:
                logger.error(f"[CYCLE] Start failed: {e}")
                result["steps"]["start"] = {"success": False, "error": str(e)}
                raise  # Re-raise to mark the action as failed

        finally:
            # Step 7: Re-enable background polling (always runs, even on failure),
            # but only if we paused it AND it was enabled beforehand.
            if result.get("polling_paused") and polling_was_enabled:
                logger.info(f"[CYCLE] Step 7/7: Re-enabling background polling for {unit_id}")
                try:
                    if action.device_type == "slm":
                        from backend.services.slmm_client import get_slmm_client
                        slmm = get_slmm_client()
                        await slmm.update_device_polling_config(unit_id, poll_enabled=True)
                        logger.info(f"[CYCLE] Background polling re-enabled for {unit_id}")
                except Exception as e:
                    logger.error(f"[CYCLE] Failed to re-enable polling: {e}")
                    # Don't raise - cycle completed, just log the error

        logger.info(f"[CYCLE] === Cycle complete for {unit_id} ===")
        return result
# ========================================================================
|
|
# Recurring Schedule Generation
|
|
# ========================================================================
|
|
|
|
async def generate_recurring_actions(self) -> int:
|
|
"""
|
|
Generate ScheduledActions from all enabled recurring schedules.
|
|
|
|
Runs once per hour to generate actions for the next 7 days.
|
|
|
|
Returns:
|
|
Total number of actions generated
|
|
"""
|
|
db = SessionLocal()
|
|
total_generated = 0
|
|
|
|
try:
|
|
from backend.services.recurring_schedule_service import get_recurring_schedule_service
|
|
|
|
service = get_recurring_schedule_service(db)
|
|
schedules = service.get_enabled_schedules()
|
|
|
|
if not schedules:
|
|
logger.debug("No enabled recurring schedules found")
|
|
return 0
|
|
|
|
logger.info(f"Generating actions for {len(schedules)} recurring schedule(s)")
|
|
|
|
for schedule in schedules:
|
|
try:
|
|
actions = service.generate_actions_for_schedule(schedule, horizon_days=7)
|
|
total_generated += len(actions)
|
|
except Exception as e:
|
|
logger.error(f"Error generating actions for schedule {schedule.id}: {e}")
|
|
|
|
if total_generated > 0:
|
|
logger.info(f"Generated {total_generated} scheduled actions from recurring schedules")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in generate_recurring_actions: {e}", exc_info=True)
|
|
finally:
|
|
db.close()
|
|
|
|
return total_generated
|
|
|
|
async def cleanup_old_actions(self, retention_days: int = 30) -> int:
|
|
"""
|
|
Remove old completed/failed actions to prevent database bloat.
|
|
|
|
Args:
|
|
retention_days: Keep actions newer than this many days
|
|
|
|
Returns:
|
|
Number of actions cleaned up
|
|
"""
|
|
db = SessionLocal()
|
|
cleaned = 0
|
|
|
|
try:
|
|
cutoff = datetime.utcnow() - timedelta(days=retention_days)
|
|
|
|
old_actions = db.query(ScheduledAction).filter(
|
|
and_(
|
|
ScheduledAction.execution_status.in_(["completed", "failed", "cancelled"]),
|
|
ScheduledAction.executed_at < cutoff,
|
|
)
|
|
).all()
|
|
|
|
cleaned = len(old_actions)
|
|
for action in old_actions:
|
|
db.delete(action)
|
|
|
|
if cleaned > 0:
|
|
db.commit()
|
|
logger.info(f"Cleaned up {cleaned} old scheduled actions (>{retention_days} days)")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error cleaning up old actions: {e}")
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
return cleaned
|
|
|
|
# ========================================================================
|
|
# Manual Execution (for testing/debugging)
|
|
# ========================================================================
|
|
|
|
async def execute_action_by_id(self, action_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Manually execute a specific action by ID.
|
|
|
|
Args:
|
|
action_id: ScheduledAction ID
|
|
|
|
Returns:
|
|
Execution result
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
action = db.query(ScheduledAction).filter_by(id=action_id).first()
|
|
if not action:
|
|
return {"success": False, "error": "Action not found"}
|
|
|
|
result = await self._execute_action(action, db)
|
|
db.commit()
|
|
return result
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"success": False, "error": str(e)}
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# Singleton instance
# Module-level cache for the process-wide SchedulerService; created lazily
# by get_scheduler().
_scheduler_instance: Optional[SchedulerService] = None
def get_scheduler() -> SchedulerService:
    """
    Get the scheduler singleton instance.

    Lazily constructs the SchedulerService on first call and reuses it
    thereafter.

    Returns:
        SchedulerService instance
    """
    global _scheduler_instance

    instance = _scheduler_instance
    if instance is None:
        instance = SchedulerService()
        _scheduler_instance = instance
    return instance
async def start_scheduler():
    """Start the global scheduler instance."""
    await get_scheduler().start()
def stop_scheduler():
    """Stop the global scheduler instance."""
    get_scheduler().stop()