""" Automatic Database Backup Scheduler Handles scheduled automatic backups of the database """ import schedule import time import threading from datetime import datetime from typing import Optional import logging from app.seismo.services.database_backup import DatabaseBackupService logger = logging.getLogger(__name__) class BackupScheduler: """Manages automatic database backups on a schedule""" def __init__(self, db_path: str = "./data/seismo_fleet.db", backups_dir: str = "./data/backups"): self.backup_service = DatabaseBackupService(db_path=db_path, backups_dir=backups_dir) self.scheduler_thread: Optional[threading.Thread] = None self.is_running = False # Default settings self.backup_interval_hours = 24 # Daily backups self.keep_count = 10 # Keep last 10 backups self.enabled = False def configure(self, interval_hours: int = 24, keep_count: int = 10, enabled: bool = True): """ Configure backup scheduler settings Args: interval_hours: Hours between automatic backups keep_count: Number of backups to retain enabled: Whether automatic backups are enabled """ self.backup_interval_hours = interval_hours self.keep_count = keep_count self.enabled = enabled logger.info(f"Backup scheduler configured: interval={interval_hours}h, keep={keep_count}, enabled={enabled}") def create_automatic_backup(self): """Create an automatic backup and cleanup old ones""" if not self.enabled: logger.info("Automatic backups are disabled, skipping") return try: timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC") description = f"Automatic backup - {timestamp}" logger.info("Creating automatic backup...") snapshot = self.backup_service.create_snapshot(description=description) logger.info(f"Automatic backup created: {snapshot['filename']} ({snapshot['size_mb']} MB)") # Cleanup old backups cleanup_result = self.backup_service.cleanup_old_snapshots(keep_count=self.keep_count) if cleanup_result['deleted'] > 0: logger.info(f"Cleaned up {cleanup_result['deleted']} old snapshots") return snapshot except Exception as e: logger.error(f"Automatic backup failed: {str(e)}") return None def start(self): """Start the backup scheduler in a background thread""" if self.is_running: logger.warning("Backup scheduler is already running") return if not self.enabled: logger.info("Backup scheduler is disabled, not starting") return logger.info(f"Starting backup scheduler (every {self.backup_interval_hours} hours)") # Clear any existing scheduled jobs schedule.clear() # Schedule the backup job schedule.every(self.backup_interval_hours).hours.do(self.create_automatic_backup) # Also run immediately on startup self.create_automatic_backup() # Start the scheduler thread self.is_running = True self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True) self.scheduler_thread.start() logger.info("Backup scheduler started successfully") def _run_scheduler(self): """Internal method to run the scheduler loop""" while self.is_running: schedule.run_pending() time.sleep(60) # Check every minute def stop(self): """Stop the backup scheduler""" if not self.is_running: logger.warning("Backup scheduler is not running") return logger.info("Stopping backup scheduler...") self.is_running = False schedule.clear() if self.scheduler_thread: self.scheduler_thread.join(timeout=5) logger.info("Backup scheduler stopped") def get_status(self) -> dict: """Get current scheduler status""" next_run = None if self.is_running and schedule.jobs: next_run = schedule.jobs[0].next_run.isoformat() if schedule.jobs[0].next_run else None return { "enabled": self.enabled, "running": self.is_running, "interval_hours": self.backup_interval_hours, "keep_count": self.keep_count, "next_run": next_run } # Global scheduler instance _scheduler_instance: Optional[BackupScheduler] = None def get_backup_scheduler() -> BackupScheduler: """Get or create the global backup scheduler instance""" global _scheduler_instance if _scheduler_instance is None: _scheduler_instance = BackupScheduler() return _scheduler_instance