Feat: Scheduler implemented, WIP
This commit is contained in:
@@ -4,22 +4,29 @@ Scheduler Service
|
||||
Executes scheduled actions for Projects system.
|
||||
Monitors pending scheduled actions and executes them by calling device modules (SLMM/SFM).
|
||||
|
||||
Extended to support recurring schedules:
|
||||
- Generates ScheduledActions from RecurringSchedule patterns
|
||||
- Cleans up old completed/failed actions
|
||||
|
||||
This service runs as a background task in FastAPI, checking for pending actions
|
||||
every minute and executing them when their scheduled time arrives.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, List, Dict, Any
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import and_
|
||||
|
||||
from backend.database import SessionLocal
|
||||
from backend.models import ScheduledAction, RecordingSession, MonitoringLocation, Project
|
||||
from backend.models import ScheduledAction, RecordingSession, MonitoringLocation, Project, RecurringSchedule
|
||||
from backend.services.device_controller import get_device_controller, DeviceControllerError
|
||||
import uuid
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SchedulerService:
|
||||
"""
|
||||
@@ -62,11 +69,26 @@ class SchedulerService:
|
||||
|
||||
async def _run_loop(self):
|
||||
"""Main scheduler loop."""
|
||||
# Track when we last generated recurring actions (do this once per hour)
|
||||
last_generation_check = datetime.utcnow() - timedelta(hours=1)
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
# Execute pending actions
|
||||
await self.execute_pending_actions()
|
||||
|
||||
# Generate actions from recurring schedules (every hour)
|
||||
now = datetime.utcnow()
|
||||
if (now - last_generation_check).total_seconds() >= 3600:
|
||||
await self.generate_recurring_actions()
|
||||
last_generation_check = now
|
||||
|
||||
# Cleanup old actions (also every hour, during generation cycle)
|
||||
if (now - last_generation_check).total_seconds() < 60:
|
||||
await self.cleanup_old_actions()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Scheduler error: {e}")
|
||||
logger.error(f"Scheduler error: {e}", exc_info=True)
|
||||
# Continue running even if there's an error
|
||||
|
||||
await asyncio.sleep(self.check_interval)
|
||||
@@ -194,11 +216,34 @@ class SchedulerService:
|
||||
db: Session,
|
||||
) -> Dict[str, Any]:
|
||||
"""Execute a 'start' action."""
|
||||
# Parse action notes for automation settings
|
||||
auto_increment_index = False
|
||||
try:
|
||||
if action.notes:
|
||||
notes_data = json.loads(action.notes)
|
||||
auto_increment_index = notes_data.get("auto_increment_index", False)
|
||||
except json.JSONDecodeError:
|
||||
pass # Notes is plain text, not JSON
|
||||
|
||||
# If auto_increment_index is enabled, increment the store index before starting
|
||||
increment_response = None
|
||||
if auto_increment_index and action.device_type == "slm":
|
||||
try:
|
||||
logger.info(f"Auto-incrementing store index for unit {unit_id}")
|
||||
increment_response = await self.device_controller.increment_index(
|
||||
unit_id,
|
||||
action.device_type,
|
||||
)
|
||||
logger.info(f"Index incremented: {increment_response}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to increment index for {unit_id}: {e}")
|
||||
# Continue with start anyway - don't fail the whole action
|
||||
|
||||
# Start recording via device controller
|
||||
response = await self.device_controller.start_recording(
|
||||
unit_id,
|
||||
action.device_type,
|
||||
config={}, # TODO: Load config from action.notes or metadata
|
||||
config={},
|
||||
)
|
||||
|
||||
# Create recording session
|
||||
@@ -210,7 +255,11 @@ class SchedulerService:
|
||||
session_type="sound" if action.device_type == "slm" else "vibration",
|
||||
started_at=datetime.utcnow(),
|
||||
status="recording",
|
||||
session_metadata=json.dumps({"scheduled_action_id": action.id}),
|
||||
session_metadata=json.dumps({
|
||||
"scheduled_action_id": action.id,
|
||||
"auto_increment_index": auto_increment_index,
|
||||
"increment_response": increment_response,
|
||||
}),
|
||||
)
|
||||
db.add(session)
|
||||
|
||||
@@ -218,6 +267,8 @@ class SchedulerService:
|
||||
"status": "started",
|
||||
"session_id": session.id,
|
||||
"device_response": response,
|
||||
"index_incremented": auto_increment_index,
|
||||
"increment_response": increment_response,
|
||||
}
|
||||
|
||||
async def _execute_stop(
|
||||
@@ -295,6 +346,90 @@ class SchedulerService:
|
||||
"device_response": response,
|
||||
}
|
||||
|
||||
# ========================================================================
|
||||
# Recurring Schedule Generation
|
||||
# ========================================================================
|
||||
|
||||
async def generate_recurring_actions(self) -> int:
|
||||
"""
|
||||
Generate ScheduledActions from all enabled recurring schedules.
|
||||
|
||||
Runs once per hour to generate actions for the next 7 days.
|
||||
|
||||
Returns:
|
||||
Total number of actions generated
|
||||
"""
|
||||
db = SessionLocal()
|
||||
total_generated = 0
|
||||
|
||||
try:
|
||||
from backend.services.recurring_schedule_service import get_recurring_schedule_service
|
||||
|
||||
service = get_recurring_schedule_service(db)
|
||||
schedules = service.get_enabled_schedules()
|
||||
|
||||
if not schedules:
|
||||
logger.debug("No enabled recurring schedules found")
|
||||
return 0
|
||||
|
||||
logger.info(f"Generating actions for {len(schedules)} recurring schedule(s)")
|
||||
|
||||
for schedule in schedules:
|
||||
try:
|
||||
actions = service.generate_actions_for_schedule(schedule, horizon_days=7)
|
||||
total_generated += len(actions)
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating actions for schedule {schedule.id}: {e}")
|
||||
|
||||
if total_generated > 0:
|
||||
logger.info(f"Generated {total_generated} scheduled actions from recurring schedules")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in generate_recurring_actions: {e}", exc_info=True)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
return total_generated
|
||||
|
||||
async def cleanup_old_actions(self, retention_days: int = 30) -> int:
|
||||
"""
|
||||
Remove old completed/failed actions to prevent database bloat.
|
||||
|
||||
Args:
|
||||
retention_days: Keep actions newer than this many days
|
||||
|
||||
Returns:
|
||||
Number of actions cleaned up
|
||||
"""
|
||||
db = SessionLocal()
|
||||
cleaned = 0
|
||||
|
||||
try:
|
||||
cutoff = datetime.utcnow() - timedelta(days=retention_days)
|
||||
|
||||
old_actions = db.query(ScheduledAction).filter(
|
||||
and_(
|
||||
ScheduledAction.execution_status.in_(["completed", "failed", "cancelled"]),
|
||||
ScheduledAction.executed_at < cutoff,
|
||||
)
|
||||
).all()
|
||||
|
||||
cleaned = len(old_actions)
|
||||
for action in old_actions:
|
||||
db.delete(action)
|
||||
|
||||
if cleaned > 0:
|
||||
db.commit()
|
||||
logger.info(f"Cleaned up {cleaned} old scheduled actions (>{retention_days} days)")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up old actions: {e}")
|
||||
db.rollback()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
return cleaned
|
||||
|
||||
# ========================================================================
|
||||
# Manual Execution (for testing/debugging)
|
||||
# ========================================================================
|
||||
|
||||
Reference in New Issue
Block a user