chore: modular monolith folder split (no behavior change)

This commit is contained in:
serversdwn
2026-01-08 20:54:30 +00:00
parent 893cb96e8d
commit 991aaca34b
90 changed files with 16129 additions and 28 deletions

View File

@@ -0,0 +1,192 @@
"""
Database Backup and Restore Service
Handles full database snapshots, restoration, and remote synchronization
"""
import os
import shutil
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
import json
class DatabaseBackupService:
"""Manages database backup operations"""
def __init__(self, db_path: str = "./data/seismo_fleet.db", backups_dir: str = "./data/backups"):
self.db_path = Path(db_path)
self.backups_dir = Path(backups_dir)
self.backups_dir.mkdir(parents=True, exist_ok=True)
def create_snapshot(self, description: Optional[str] = None) -> Dict:
"""
Create a full database snapshot using SQLite backup API
Returns snapshot metadata
"""
if not self.db_path.exists():
raise FileNotFoundError(f"Database not found at {self.db_path}")
# Generate snapshot filename with timestamp
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
snapshot_name = f"snapshot_{timestamp}.db"
snapshot_path = self.backups_dir / snapshot_name
# Get database size before backup
db_size = self.db_path.stat().st_size
try:
# Use SQLite backup API for safe backup (handles concurrent access)
source_conn = sqlite3.connect(str(self.db_path))
dest_conn = sqlite3.connect(str(snapshot_path))
# Perform the backup
with dest_conn:
source_conn.backup(dest_conn)
source_conn.close()
dest_conn.close()
# Create metadata
metadata = {
"filename": snapshot_name,
"created_at": timestamp,
"created_at_iso": datetime.utcnow().isoformat(),
"description": description or "Manual snapshot",
"size_bytes": snapshot_path.stat().st_size,
"size_mb": round(snapshot_path.stat().st_size / (1024 * 1024), 2),
"original_db_size_bytes": db_size,
"type": "manual"
}
# Save metadata as JSON sidecar file
metadata_path = self.backups_dir / f"{snapshot_name}.meta.json"
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
return metadata
except Exception as e:
# Clean up partial snapshot if it exists
if snapshot_path.exists():
snapshot_path.unlink()
raise Exception(f"Snapshot creation failed: {str(e)}")
def list_snapshots(self) -> List[Dict]:
"""
List all available snapshots with metadata
Returns list sorted by creation date (newest first)
"""
snapshots = []
for db_file in sorted(self.backups_dir.glob("snapshot_*.db"), reverse=True):
metadata_file = self.backups_dir / f"{db_file.name}.meta.json"
if metadata_file.exists():
with open(metadata_file, 'r') as f:
metadata = json.load(f)
else:
# Fallback for legacy snapshots without metadata
stat_info = db_file.stat()
metadata = {
"filename": db_file.name,
"created_at": datetime.fromtimestamp(stat_info.st_mtime).strftime("%Y%m%d_%H%M%S"),
"created_at_iso": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
"description": "Legacy snapshot",
"size_bytes": stat_info.st_size,
"size_mb": round(stat_info.st_size / (1024 * 1024), 2),
"type": "manual"
}
snapshots.append(metadata)
return snapshots
def delete_snapshot(self, filename: str) -> bool:
"""Delete a snapshot and its metadata"""
snapshot_path = self.backups_dir / filename
metadata_path = self.backups_dir / f"{filename}.meta.json"
if not snapshot_path.exists():
raise FileNotFoundError(f"Snapshot {filename} not found")
snapshot_path.unlink()
if metadata_path.exists():
metadata_path.unlink()
return True
def restore_snapshot(self, filename: str, create_backup_before_restore: bool = True) -> Dict:
"""
Restore database from a snapshot
Creates a safety backup before restoring if requested
"""
snapshot_path = self.backups_dir / filename
if not snapshot_path.exists():
raise FileNotFoundError(f"Snapshot {filename} not found")
if not self.db_path.exists():
raise FileNotFoundError(f"Database not found at {self.db_path}")
backup_info = None
# Create safety backup before restore
if create_backup_before_restore:
backup_info = self.create_snapshot(description="Auto-backup before restore")
try:
# Replace database file
shutil.copy2(str(snapshot_path), str(self.db_path))
return {
"message": "Database restored successfully",
"restored_from": filename,
"restored_at": datetime.utcnow().isoformat(),
"backup_created": backup_info["filename"] if backup_info else None
}
except Exception as e:
raise Exception(f"Restore failed: {str(e)}")
def get_database_stats(self) -> Dict:
"""Get statistics about the current database"""
if not self.db_path.exists():
return {"error": "Database not found"}
conn = sqlite3.connect(str(self.db_path))
cursor = conn.cursor()
# Get table counts
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = cursor.fetchall()
table_stats = {}
total_rows = 0
for (table_name,) in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cursor.fetchone()[0]
table_stats[table_name] = count
total_rows += count
conn.close()
db_size = self.db_path.stat().st_size
return {
"database_path": str(self.db_path),
"size_bytes": db_size,
"size_mb": round(db_size / (1024 * 1024), 2),
"total_rows": total_rows,
"tables": table_stats,
"last_modified": datetime.fromtimestamp(self.db_path.stat().st_mtime).isoformat()
}
def download_snapshot(self, filename: str) -> Path:
"""Get the file path for downloading a snapshot"""
snapshot_path = self.backups_dir / filename
if not snapshot_path.exists():
raise FileNotFoundError(f"Snapshot {filename} not found")
return snapshot_path