db management system added
This commit is contained in:
192
backend/services/database_backup.py
Normal file
192
backend/services/database_backup.py
Normal file
@@ -0,0 +1,192 @@
|
||||
"""
|
||||
Database Backup and Restore Service
|
||||
Handles full database snapshots, restoration, and remote synchronization
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Optional
|
||||
import json
|
||||
|
||||
|
||||
class DatabaseBackupService:
|
||||
"""Manages database backup operations"""
|
||||
|
||||
def __init__(self, db_path: str = "./data/seismo_fleet.db", backups_dir: str = "./data/backups"):
|
||||
self.db_path = Path(db_path)
|
||||
self.backups_dir = Path(backups_dir)
|
||||
self.backups_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def create_snapshot(self, description: Optional[str] = None) -> Dict:
|
||||
"""
|
||||
Create a full database snapshot using SQLite backup API
|
||||
Returns snapshot metadata
|
||||
"""
|
||||
if not self.db_path.exists():
|
||||
raise FileNotFoundError(f"Database not found at {self.db_path}")
|
||||
|
||||
# Generate snapshot filename with timestamp
|
||||
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
|
||||
snapshot_name = f"snapshot_{timestamp}.db"
|
||||
snapshot_path = self.backups_dir / snapshot_name
|
||||
|
||||
# Get database size before backup
|
||||
db_size = self.db_path.stat().st_size
|
||||
|
||||
try:
|
||||
# Use SQLite backup API for safe backup (handles concurrent access)
|
||||
source_conn = sqlite3.connect(str(self.db_path))
|
||||
dest_conn = sqlite3.connect(str(snapshot_path))
|
||||
|
||||
# Perform the backup
|
||||
with dest_conn:
|
||||
source_conn.backup(dest_conn)
|
||||
|
||||
source_conn.close()
|
||||
dest_conn.close()
|
||||
|
||||
# Create metadata
|
||||
metadata = {
|
||||
"filename": snapshot_name,
|
||||
"created_at": timestamp,
|
||||
"created_at_iso": datetime.utcnow().isoformat(),
|
||||
"description": description or "Manual snapshot",
|
||||
"size_bytes": snapshot_path.stat().st_size,
|
||||
"size_mb": round(snapshot_path.stat().st_size / (1024 * 1024), 2),
|
||||
"original_db_size_bytes": db_size,
|
||||
"type": "manual"
|
||||
}
|
||||
|
||||
# Save metadata as JSON sidecar file
|
||||
metadata_path = self.backups_dir / f"{snapshot_name}.meta.json"
|
||||
with open(metadata_path, 'w') as f:
|
||||
json.dump(metadata, f, indent=2)
|
||||
|
||||
return metadata
|
||||
|
||||
except Exception as e:
|
||||
# Clean up partial snapshot if it exists
|
||||
if snapshot_path.exists():
|
||||
snapshot_path.unlink()
|
||||
raise Exception(f"Snapshot creation failed: {str(e)}")
|
||||
|
||||
def list_snapshots(self) -> List[Dict]:
|
||||
"""
|
||||
List all available snapshots with metadata
|
||||
Returns list sorted by creation date (newest first)
|
||||
"""
|
||||
snapshots = []
|
||||
|
||||
for db_file in sorted(self.backups_dir.glob("snapshot_*.db"), reverse=True):
|
||||
metadata_file = self.backups_dir / f"{db_file.name}.meta.json"
|
||||
|
||||
if metadata_file.exists():
|
||||
with open(metadata_file, 'r') as f:
|
||||
metadata = json.load(f)
|
||||
else:
|
||||
# Fallback for legacy snapshots without metadata
|
||||
stat_info = db_file.stat()
|
||||
metadata = {
|
||||
"filename": db_file.name,
|
||||
"created_at": datetime.fromtimestamp(stat_info.st_mtime).strftime("%Y%m%d_%H%M%S"),
|
||||
"created_at_iso": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
|
||||
"description": "Legacy snapshot",
|
||||
"size_bytes": stat_info.st_size,
|
||||
"size_mb": round(stat_info.st_size / (1024 * 1024), 2),
|
||||
"type": "manual"
|
||||
}
|
||||
|
||||
snapshots.append(metadata)
|
||||
|
||||
return snapshots
|
||||
|
||||
def delete_snapshot(self, filename: str) -> bool:
|
||||
"""Delete a snapshot and its metadata"""
|
||||
snapshot_path = self.backups_dir / filename
|
||||
metadata_path = self.backups_dir / f"{filename}.meta.json"
|
||||
|
||||
if not snapshot_path.exists():
|
||||
raise FileNotFoundError(f"Snapshot {filename} not found")
|
||||
|
||||
snapshot_path.unlink()
|
||||
if metadata_path.exists():
|
||||
metadata_path.unlink()
|
||||
|
||||
return True
|
||||
|
||||
def restore_snapshot(self, filename: str, create_backup_before_restore: bool = True) -> Dict:
|
||||
"""
|
||||
Restore database from a snapshot
|
||||
Creates a safety backup before restoring if requested
|
||||
"""
|
||||
snapshot_path = self.backups_dir / filename
|
||||
|
||||
if not snapshot_path.exists():
|
||||
raise FileNotFoundError(f"Snapshot {filename} not found")
|
||||
|
||||
if not self.db_path.exists():
|
||||
raise FileNotFoundError(f"Database not found at {self.db_path}")
|
||||
|
||||
backup_info = None
|
||||
|
||||
# Create safety backup before restore
|
||||
if create_backup_before_restore:
|
||||
backup_info = self.create_snapshot(description="Auto-backup before restore")
|
||||
|
||||
try:
|
||||
# Replace database file
|
||||
shutil.copy2(str(snapshot_path), str(self.db_path))
|
||||
|
||||
return {
|
||||
"message": "Database restored successfully",
|
||||
"restored_from": filename,
|
||||
"restored_at": datetime.utcnow().isoformat(),
|
||||
"backup_created": backup_info["filename"] if backup_info else None
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Restore failed: {str(e)}")
|
||||
|
||||
def get_database_stats(self) -> Dict:
|
||||
"""Get statistics about the current database"""
|
||||
if not self.db_path.exists():
|
||||
return {"error": "Database not found"}
|
||||
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get table counts
|
||||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
|
||||
tables = cursor.fetchall()
|
||||
|
||||
table_stats = {}
|
||||
total_rows = 0
|
||||
|
||||
for (table_name,) in tables:
|
||||
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
|
||||
count = cursor.fetchone()[0]
|
||||
table_stats[table_name] = count
|
||||
total_rows += count
|
||||
|
||||
conn.close()
|
||||
|
||||
db_size = self.db_path.stat().st_size
|
||||
|
||||
return {
|
||||
"database_path": str(self.db_path),
|
||||
"size_bytes": db_size,
|
||||
"size_mb": round(db_size / (1024 * 1024), 2),
|
||||
"total_rows": total_rows,
|
||||
"tables": table_stats,
|
||||
"last_modified": datetime.fromtimestamp(self.db_path.stat().st_mtime).isoformat()
|
||||
}
|
||||
|
||||
def download_snapshot(self, filename: str) -> Path:
|
||||
"""Get the file path for downloading a snapshot"""
|
||||
snapshot_path = self.backups_dir / filename
|
||||
if not snapshot_path.exists():
|
||||
raise FileNotFoundError(f"Snapshot {filename} not found")
|
||||
return snapshot_path
|
||||
Reference in New Issue
Block a user