- Moved Jinja2 template setup to a shared configuration file (templates_config.py) for consistent usage across routers. - Introduced timezone utilities in a new module (timezone.py) to handle UTC to local time conversions and formatting. - Updated all relevant routers to use the new shared template configuration and timezone filters. - Enhanced templates to utilize local time formatting for various datetime fields, improving user experience with timezone awareness.
560 lines
20 KiB
Python
560 lines
20 KiB
Python
"""
|
|
Recurring Schedule Service
|
|
|
|
Manages recurring schedule definitions and generates ScheduledAction
|
|
instances based on patterns (weekly calendar, simple interval).
|
|
"""
|
|
|
|
import json
|
|
import uuid
|
|
import logging
|
|
from datetime import datetime, timedelta, date, time
|
|
from typing import Optional, List, Dict, Any, Tuple
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_
|
|
|
|
from backend.models import RecurringSchedule, ScheduledAction, MonitoringLocation, UnitAssignment
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Day name mapping
|
|
DAY_NAMES = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
|
|
|
|
|
|
class RecurringScheduleService:
|
|
"""
|
|
Service for managing recurring schedules and generating ScheduledActions.
|
|
|
|
Supports two schedule types:
|
|
- weekly_calendar: Specific days with start/end times
|
|
- simple_interval: Daily stop/download/restart cycles for 24/7 monitoring
|
|
"""
|
|
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def create_schedule(
|
|
self,
|
|
project_id: str,
|
|
location_id: str,
|
|
name: str,
|
|
schedule_type: str,
|
|
device_type: str = "slm",
|
|
unit_id: str = None,
|
|
weekly_pattern: dict = None,
|
|
interval_type: str = None,
|
|
cycle_time: str = None,
|
|
include_download: bool = True,
|
|
auto_increment_index: bool = True,
|
|
timezone: str = "America/New_York",
|
|
) -> RecurringSchedule:
|
|
"""
|
|
Create a new recurring schedule.
|
|
|
|
Args:
|
|
project_id: Project ID
|
|
location_id: Monitoring location ID
|
|
name: Schedule name
|
|
schedule_type: "weekly_calendar" or "simple_interval"
|
|
device_type: "slm" or "seismograph"
|
|
unit_id: Specific unit (optional, can use assignment)
|
|
weekly_pattern: Dict of day patterns for weekly_calendar
|
|
interval_type: "daily" or "hourly" for simple_interval
|
|
cycle_time: Time string "HH:MM" for cycle
|
|
include_download: Whether to download data on cycle
|
|
auto_increment_index: Whether to auto-increment store index before start
|
|
timezone: Timezone for schedule times
|
|
|
|
Returns:
|
|
Created RecurringSchedule
|
|
"""
|
|
schedule = RecurringSchedule(
|
|
id=str(uuid.uuid4()),
|
|
project_id=project_id,
|
|
location_id=location_id,
|
|
unit_id=unit_id,
|
|
name=name,
|
|
schedule_type=schedule_type,
|
|
device_type=device_type,
|
|
weekly_pattern=json.dumps(weekly_pattern) if weekly_pattern else None,
|
|
interval_type=interval_type,
|
|
cycle_time=cycle_time,
|
|
include_download=include_download,
|
|
auto_increment_index=auto_increment_index,
|
|
enabled=True,
|
|
timezone=timezone,
|
|
)
|
|
|
|
# Calculate next occurrence
|
|
schedule.next_occurrence = self._calculate_next_occurrence(schedule)
|
|
|
|
self.db.add(schedule)
|
|
self.db.commit()
|
|
self.db.refresh(schedule)
|
|
|
|
logger.info(f"Created recurring schedule: {name} ({schedule_type})")
|
|
return schedule
|
|
|
|
def update_schedule(
|
|
self,
|
|
schedule_id: str,
|
|
**kwargs,
|
|
) -> Optional[RecurringSchedule]:
|
|
"""
|
|
Update a recurring schedule.
|
|
|
|
Args:
|
|
schedule_id: Schedule to update
|
|
**kwargs: Fields to update
|
|
|
|
Returns:
|
|
Updated schedule or None
|
|
"""
|
|
schedule = self.db.query(RecurringSchedule).filter_by(id=schedule_id).first()
|
|
if not schedule:
|
|
return None
|
|
|
|
for key, value in kwargs.items():
|
|
if hasattr(schedule, key):
|
|
if key == "weekly_pattern" and isinstance(value, dict):
|
|
value = json.dumps(value)
|
|
setattr(schedule, key, value)
|
|
|
|
# Recalculate next occurrence
|
|
schedule.next_occurrence = self._calculate_next_occurrence(schedule)
|
|
|
|
self.db.commit()
|
|
self.db.refresh(schedule)
|
|
|
|
logger.info(f"Updated recurring schedule: {schedule.name}")
|
|
return schedule
|
|
|
|
def delete_schedule(self, schedule_id: str) -> bool:
|
|
"""
|
|
Delete a recurring schedule and its pending generated actions.
|
|
|
|
Args:
|
|
schedule_id: Schedule to delete
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
"""
|
|
schedule = self.db.query(RecurringSchedule).filter_by(id=schedule_id).first()
|
|
if not schedule:
|
|
return False
|
|
|
|
# Delete pending generated actions for this schedule
|
|
# The schedule_id is stored in the notes field as JSON
|
|
pending_actions = self.db.query(ScheduledAction).filter(
|
|
and_(
|
|
ScheduledAction.execution_status == "pending",
|
|
ScheduledAction.notes.like(f'%"schedule_id": "{schedule_id}"%'),
|
|
)
|
|
).all()
|
|
|
|
deleted_count = len(pending_actions)
|
|
for action in pending_actions:
|
|
self.db.delete(action)
|
|
|
|
self.db.delete(schedule)
|
|
self.db.commit()
|
|
|
|
logger.info(f"Deleted recurring schedule: {schedule.name} (and {deleted_count} pending actions)")
|
|
return True
|
|
|
|
def enable_schedule(self, schedule_id: str) -> Optional[RecurringSchedule]:
|
|
"""Enable a disabled schedule."""
|
|
return self.update_schedule(schedule_id, enabled=True)
|
|
|
|
def disable_schedule(self, schedule_id: str) -> Optional[RecurringSchedule]:
|
|
"""Disable a schedule."""
|
|
return self.update_schedule(schedule_id, enabled=False)
|
|
|
|
def generate_actions_for_schedule(
|
|
self,
|
|
schedule: RecurringSchedule,
|
|
horizon_days: int = 7,
|
|
preview_only: bool = False,
|
|
) -> List[ScheduledAction]:
|
|
"""
|
|
Generate ScheduledAction entries for the next N days based on pattern.
|
|
|
|
Args:
|
|
schedule: The recurring schedule
|
|
horizon_days: Days ahead to generate
|
|
preview_only: If True, don't save to DB (for preview)
|
|
|
|
Returns:
|
|
List of generated ScheduledAction instances
|
|
"""
|
|
if not schedule.enabled:
|
|
return []
|
|
|
|
if schedule.schedule_type == "weekly_calendar":
|
|
actions = self._generate_weekly_calendar_actions(schedule, horizon_days)
|
|
elif schedule.schedule_type == "simple_interval":
|
|
actions = self._generate_interval_actions(schedule, horizon_days)
|
|
else:
|
|
logger.warning(f"Unknown schedule type: {schedule.schedule_type}")
|
|
return []
|
|
|
|
if not preview_only and actions:
|
|
for action in actions:
|
|
self.db.add(action)
|
|
|
|
schedule.last_generated_at = datetime.utcnow()
|
|
schedule.next_occurrence = self._calculate_next_occurrence(schedule)
|
|
|
|
self.db.commit()
|
|
logger.info(f"Generated {len(actions)} actions for schedule: {schedule.name}")
|
|
|
|
return actions
|
|
|
|
def _generate_weekly_calendar_actions(
|
|
self,
|
|
schedule: RecurringSchedule,
|
|
horizon_days: int,
|
|
) -> List[ScheduledAction]:
|
|
"""
|
|
Generate actions from weekly calendar pattern.
|
|
|
|
Pattern format:
|
|
{
|
|
"monday": {"enabled": true, "start": "19:00", "end": "07:00"},
|
|
"tuesday": {"enabled": false},
|
|
...
|
|
}
|
|
"""
|
|
if not schedule.weekly_pattern:
|
|
return []
|
|
|
|
try:
|
|
pattern = json.loads(schedule.weekly_pattern)
|
|
except json.JSONDecodeError:
|
|
logger.error(f"Invalid weekly_pattern JSON for schedule {schedule.id}")
|
|
return []
|
|
|
|
actions = []
|
|
tz = ZoneInfo(schedule.timezone)
|
|
now_utc = datetime.utcnow()
|
|
now_local = now_utc.replace(tzinfo=ZoneInfo("UTC")).astimezone(tz)
|
|
|
|
# Get unit_id (from schedule or assignment)
|
|
unit_id = self._resolve_unit_id(schedule)
|
|
|
|
for day_offset in range(horizon_days):
|
|
check_date = now_local.date() + timedelta(days=day_offset)
|
|
day_name = DAY_NAMES[check_date.weekday()]
|
|
day_config = pattern.get(day_name, {})
|
|
|
|
if not day_config.get("enabled", False):
|
|
continue
|
|
|
|
start_time_str = day_config.get("start")
|
|
end_time_str = day_config.get("end")
|
|
|
|
if not start_time_str or not end_time_str:
|
|
continue
|
|
|
|
# Parse times
|
|
start_time = self._parse_time(start_time_str)
|
|
end_time = self._parse_time(end_time_str)
|
|
|
|
if not start_time or not end_time:
|
|
continue
|
|
|
|
# Create start datetime in local timezone
|
|
start_local = datetime.combine(check_date, start_time, tzinfo=tz)
|
|
start_utc = start_local.astimezone(ZoneInfo("UTC")).replace(tzinfo=None)
|
|
|
|
# Handle overnight schedules (end time is next day)
|
|
if end_time <= start_time:
|
|
end_date = check_date + timedelta(days=1)
|
|
else:
|
|
end_date = check_date
|
|
|
|
end_local = datetime.combine(end_date, end_time, tzinfo=tz)
|
|
end_utc = end_local.astimezone(ZoneInfo("UTC")).replace(tzinfo=None)
|
|
|
|
# Skip if start time has already passed
|
|
if start_utc <= now_utc:
|
|
continue
|
|
|
|
# Check if action already exists
|
|
if self._action_exists(schedule.project_id, schedule.location_id, "start", start_utc):
|
|
continue
|
|
|
|
# Build notes with automation metadata
|
|
start_notes = json.dumps({
|
|
"schedule_name": schedule.name,
|
|
"schedule_id": schedule.id,
|
|
"auto_increment_index": schedule.auto_increment_index,
|
|
})
|
|
|
|
# Create START action
|
|
start_action = ScheduledAction(
|
|
id=str(uuid.uuid4()),
|
|
project_id=schedule.project_id,
|
|
location_id=schedule.location_id,
|
|
unit_id=unit_id,
|
|
action_type="start",
|
|
device_type=schedule.device_type,
|
|
scheduled_time=start_utc,
|
|
execution_status="pending",
|
|
notes=start_notes,
|
|
)
|
|
actions.append(start_action)
|
|
|
|
# Create STOP action
|
|
stop_notes = json.dumps({
|
|
"schedule_name": schedule.name,
|
|
"schedule_id": schedule.id,
|
|
})
|
|
stop_action = ScheduledAction(
|
|
id=str(uuid.uuid4()),
|
|
project_id=schedule.project_id,
|
|
location_id=schedule.location_id,
|
|
unit_id=unit_id,
|
|
action_type="stop",
|
|
device_type=schedule.device_type,
|
|
scheduled_time=end_utc,
|
|
execution_status="pending",
|
|
notes=stop_notes,
|
|
)
|
|
actions.append(stop_action)
|
|
|
|
# Create DOWNLOAD action if enabled (1 minute after stop)
|
|
if schedule.include_download:
|
|
download_time = end_utc + timedelta(minutes=1)
|
|
download_notes = json.dumps({
|
|
"schedule_name": schedule.name,
|
|
"schedule_id": schedule.id,
|
|
"schedule_type": "weekly_calendar",
|
|
})
|
|
download_action = ScheduledAction(
|
|
id=str(uuid.uuid4()),
|
|
project_id=schedule.project_id,
|
|
location_id=schedule.location_id,
|
|
unit_id=unit_id,
|
|
action_type="download",
|
|
device_type=schedule.device_type,
|
|
scheduled_time=download_time,
|
|
execution_status="pending",
|
|
notes=download_notes,
|
|
)
|
|
actions.append(download_action)
|
|
|
|
return actions
|
|
|
|
def _generate_interval_actions(
|
|
self,
|
|
schedule: RecurringSchedule,
|
|
horizon_days: int,
|
|
) -> List[ScheduledAction]:
|
|
"""
|
|
Generate actions from simple interval pattern.
|
|
|
|
For daily cycles: stop, download (optional), start at cycle_time each day.
|
|
"""
|
|
if not schedule.cycle_time:
|
|
return []
|
|
|
|
cycle_time = self._parse_time(schedule.cycle_time)
|
|
if not cycle_time:
|
|
return []
|
|
|
|
actions = []
|
|
tz = ZoneInfo(schedule.timezone)
|
|
now_utc = datetime.utcnow()
|
|
now_local = now_utc.replace(tzinfo=ZoneInfo("UTC")).astimezone(tz)
|
|
|
|
# Get unit_id
|
|
unit_id = self._resolve_unit_id(schedule)
|
|
|
|
for day_offset in range(horizon_days):
|
|
check_date = now_local.date() + timedelta(days=day_offset)
|
|
|
|
# Create cycle datetime in local timezone
|
|
cycle_local = datetime.combine(check_date, cycle_time, tzinfo=tz)
|
|
cycle_utc = cycle_local.astimezone(ZoneInfo("UTC")).replace(tzinfo=None)
|
|
|
|
# Skip if time has passed
|
|
if cycle_utc <= now_utc:
|
|
continue
|
|
|
|
# Check if action already exists
|
|
if self._action_exists(schedule.project_id, schedule.location_id, "stop", cycle_utc):
|
|
continue
|
|
|
|
# Build notes with metadata
|
|
stop_notes = json.dumps({
|
|
"schedule_name": schedule.name,
|
|
"schedule_id": schedule.id,
|
|
"cycle_type": "daily",
|
|
})
|
|
|
|
# Create STOP action
|
|
stop_action = ScheduledAction(
|
|
id=str(uuid.uuid4()),
|
|
project_id=schedule.project_id,
|
|
location_id=schedule.location_id,
|
|
unit_id=unit_id,
|
|
action_type="stop",
|
|
device_type=schedule.device_type,
|
|
scheduled_time=cycle_utc,
|
|
execution_status="pending",
|
|
notes=stop_notes,
|
|
)
|
|
actions.append(stop_action)
|
|
|
|
# Create DOWNLOAD action if enabled (1 minute after stop)
|
|
if schedule.include_download:
|
|
download_time = cycle_utc + timedelta(minutes=1)
|
|
download_notes = json.dumps({
|
|
"schedule_name": schedule.name,
|
|
"schedule_id": schedule.id,
|
|
"cycle_type": "daily",
|
|
})
|
|
download_action = ScheduledAction(
|
|
id=str(uuid.uuid4()),
|
|
project_id=schedule.project_id,
|
|
location_id=schedule.location_id,
|
|
unit_id=unit_id,
|
|
action_type="download",
|
|
device_type=schedule.device_type,
|
|
scheduled_time=download_time,
|
|
execution_status="pending",
|
|
notes=download_notes,
|
|
)
|
|
actions.append(download_action)
|
|
|
|
# Create START action (2 minutes after stop, or 1 minute after download)
|
|
start_offset = 2 if schedule.include_download else 1
|
|
start_time = cycle_utc + timedelta(minutes=start_offset)
|
|
start_notes = json.dumps({
|
|
"schedule_name": schedule.name,
|
|
"schedule_id": schedule.id,
|
|
"cycle_type": "daily",
|
|
"auto_increment_index": schedule.auto_increment_index,
|
|
})
|
|
start_action = ScheduledAction(
|
|
id=str(uuid.uuid4()),
|
|
project_id=schedule.project_id,
|
|
location_id=schedule.location_id,
|
|
unit_id=unit_id,
|
|
action_type="start",
|
|
device_type=schedule.device_type,
|
|
scheduled_time=start_time,
|
|
execution_status="pending",
|
|
notes=start_notes,
|
|
)
|
|
actions.append(start_action)
|
|
|
|
return actions
|
|
|
|
def _calculate_next_occurrence(self, schedule: RecurringSchedule) -> Optional[datetime]:
|
|
"""Calculate when the next action should occur."""
|
|
if not schedule.enabled:
|
|
return None
|
|
|
|
tz = ZoneInfo(schedule.timezone)
|
|
now_utc = datetime.utcnow()
|
|
now_local = now_utc.replace(tzinfo=ZoneInfo("UTC")).astimezone(tz)
|
|
|
|
if schedule.schedule_type == "weekly_calendar" and schedule.weekly_pattern:
|
|
try:
|
|
pattern = json.loads(schedule.weekly_pattern)
|
|
except:
|
|
return None
|
|
|
|
# Find next enabled day
|
|
for day_offset in range(8): # Check up to a week ahead
|
|
check_date = now_local.date() + timedelta(days=day_offset)
|
|
day_name = DAY_NAMES[check_date.weekday()]
|
|
day_config = pattern.get(day_name, {})
|
|
|
|
if day_config.get("enabled") and day_config.get("start"):
|
|
start_time = self._parse_time(day_config["start"])
|
|
if start_time:
|
|
start_local = datetime.combine(check_date, start_time, tzinfo=tz)
|
|
start_utc = start_local.astimezone(ZoneInfo("UTC")).replace(tzinfo=None)
|
|
if start_utc > now_utc:
|
|
return start_utc
|
|
|
|
elif schedule.schedule_type == "simple_interval" and schedule.cycle_time:
|
|
cycle_time = self._parse_time(schedule.cycle_time)
|
|
if cycle_time:
|
|
# Find next cycle time
|
|
for day_offset in range(2):
|
|
check_date = now_local.date() + timedelta(days=day_offset)
|
|
cycle_local = datetime.combine(check_date, cycle_time, tzinfo=tz)
|
|
cycle_utc = cycle_local.astimezone(ZoneInfo("UTC")).replace(tzinfo=None)
|
|
if cycle_utc > now_utc:
|
|
return cycle_utc
|
|
|
|
return None
|
|
|
|
def _resolve_unit_id(self, schedule: RecurringSchedule) -> Optional[str]:
|
|
"""Get unit_id from schedule or active assignment."""
|
|
if schedule.unit_id:
|
|
return schedule.unit_id
|
|
|
|
# Try to get from active assignment
|
|
assignment = self.db.query(UnitAssignment).filter(
|
|
and_(
|
|
UnitAssignment.location_id == schedule.location_id,
|
|
UnitAssignment.status == "active",
|
|
)
|
|
).first()
|
|
|
|
return assignment.unit_id if assignment else None
|
|
|
|
def _action_exists(
|
|
self,
|
|
project_id: str,
|
|
location_id: str,
|
|
action_type: str,
|
|
scheduled_time: datetime,
|
|
) -> bool:
|
|
"""Check if an action already exists for this time slot."""
|
|
# Allow 5-minute window for duplicate detection
|
|
time_window_start = scheduled_time - timedelta(minutes=5)
|
|
time_window_end = scheduled_time + timedelta(minutes=5)
|
|
|
|
exists = self.db.query(ScheduledAction).filter(
|
|
and_(
|
|
ScheduledAction.project_id == project_id,
|
|
ScheduledAction.location_id == location_id,
|
|
ScheduledAction.action_type == action_type,
|
|
ScheduledAction.scheduled_time >= time_window_start,
|
|
ScheduledAction.scheduled_time <= time_window_end,
|
|
ScheduledAction.execution_status == "pending",
|
|
)
|
|
).first()
|
|
|
|
return exists is not None
|
|
|
|
@staticmethod
|
|
def _parse_time(time_str: str) -> Optional[time]:
|
|
"""Parse time string "HH:MM" to time object."""
|
|
try:
|
|
parts = time_str.split(":")
|
|
return time(int(parts[0]), int(parts[1]))
|
|
except (ValueError, IndexError):
|
|
return None
|
|
|
|
def get_schedules_for_project(self, project_id: str) -> List[RecurringSchedule]:
|
|
"""Get all recurring schedules for a project."""
|
|
return self.db.query(RecurringSchedule).filter_by(project_id=project_id).all()
|
|
|
|
def get_enabled_schedules(self) -> List[RecurringSchedule]:
|
|
"""Get all enabled recurring schedules."""
|
|
return self.db.query(RecurringSchedule).filter_by(enabled=True).all()
|
|
|
|
|
|
def get_recurring_schedule_service(db: Session) -> RecurringScheduleService:
|
|
"""Get a RecurringScheduleService instance."""
|
|
return RecurringScheduleService(db)
|