diff --git a/backend/main.py b/backend/main.py index db48b2f..8e8219d 100644 --- a/backend/main.py +++ b/backend/main.py @@ -144,9 +144,14 @@ app.include_router(fleet_calendar.router) from backend.routers import deployments app.include_router(deployments.router) +# Calibration sync router (SFM-driven cal date updates) +from backend.routers import calibration +app.include_router(calibration.router) + # Start scheduler service and device status monitor on application startup from backend.services.scheduler import start_scheduler, stop_scheduler from backend.services.device_status_monitor import start_device_status_monitor, stop_device_status_monitor +from backend.services.calibration_sync import get_calibration_sync_scheduler @app.on_event("startup") async def startup_event(): @@ -159,6 +164,10 @@ async def startup_event(): await start_device_status_monitor() logger.info("Device status monitor started") + logger.info("Starting calibration sync scheduler...") + get_calibration_sync_scheduler().start() + logger.info("Calibration sync scheduler started") + @app.on_event("shutdown") def shutdown_event(): """Clean up services on app shutdown""" @@ -170,6 +179,10 @@ def shutdown_event(): stop_scheduler() logger.info("Scheduler service stopped") + logger.info("Stopping calibration sync scheduler...") + get_calibration_sync_scheduler().stop() + logger.info("Calibration sync scheduler stopped") + # Legacy routes from the original backend from backend import routes as legacy_routes diff --git a/backend/routers/calibration.py b/backend/routers/calibration.py new file mode 100644 index 0000000..f0edd52 --- /dev/null +++ b/backend/routers/calibration.py @@ -0,0 +1,31 @@ +""" +Calibration Sync Router + +Endpoints for triggering and inspecting the SFM-driven calibration sync. +The scheduled job runs daily; this router is what the "Sync now" button in +Settings calls, plus a status endpoint for diagnostics. +""" + +from fastapi import APIRouter +from typing import Dict, Any + +from backend.services.calibration_sync import ( + sync_all_calibrations, + get_calibration_sync_scheduler, +) + +router = APIRouter(prefix="/api/calibration", tags=["calibration"]) + + +@router.post("/sync") +async def trigger_calibration_sync() -> Dict[str, Any]: + """Run a full calibration sync now and return the summary.""" + summary = await sync_all_calibrations() + get_calibration_sync_scheduler().last_run = summary + return summary + + +@router.get("/sync/status") +def calibration_sync_status() -> Dict[str, Any]: + """Return scheduler status and the most recent run's summary.""" + return get_calibration_sync_scheduler().status() diff --git a/backend/services/calibration_sync.py b/backend/services/calibration_sync.py new file mode 100644 index 0000000..085fe48 --- /dev/null +++ b/backend/services/calibration_sync.py @@ -0,0 +1,295 @@ +""" +Calibration Sync Service + +Pulls device-reported calibration dates from SFM event sidecars and updates +RosterUnit.last_calibrated when the device has a newer record than what +Terra-View has stored. + +Conflict rule: events-as-truth, but don't go backwards. + - If the newest event's calibration_date == unit.last_calibrated → no-op. + - If the last UnitHistory change for last_calibrated is newer than the + newest event's timestamp → skip (a manual edit was made after this + event landed; manual wins until a fresher event arrives). + - Otherwise → write the event's calibration_date, recompute + next_calibration_due, and log a UnitHistory row with source='sfm_event'. +""" + +import asyncio +import logging +import os +import threading +import time +from datetime import datetime, date, timedelta +from typing import Optional, Dict, Any, List + +import httpx +import schedule +from sqlalchemy.orm import Session + +from backend.database import SessionLocal +from backend.models import RosterUnit, UnitHistory, UserPreferences + +logger = logging.getLogger(__name__) + +SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200") + + +def _get_cal_interval(db: Session) -> int: + prefs = db.query(UserPreferences).first() + if prefs and prefs.calibration_interval_days: + return prefs.calibration_interval_days + return 365 + + +def _parse_event_ts(value: Any) -> Optional[datetime]: + if not value: + return None + if isinstance(value, datetime): + return value.replace(tzinfo=None) if value.tzinfo else value + try: + s = str(value).replace("Z", "") + if "+" in s: + s = s.split("+", 1)[0] + return datetime.fromisoformat(s) + except (ValueError, TypeError): + logger.warning(f"Could not parse event timestamp: {value!r}") + return None + + +def _parse_cal_date(value: Any) -> Optional[date]: + if not value: + return None + if isinstance(value, date) and not isinstance(value, datetime): + return value + if isinstance(value, datetime): + return value.date() + try: + return datetime.fromisoformat(str(value)).date() + except (ValueError, TypeError): + try: + return datetime.strptime(str(value), "%Y-%m-%d").date() + except (ValueError, TypeError): + logger.warning(f"Could not parse calibration_date: {value!r}") + return None + + +async def _get_latest_event(client: httpx.AsyncClient, serial: str) -> Optional[Dict[str, Any]]: + try: + resp = await client.get( + f"{SFM_BASE_URL}/db/events", + params={"serial": serial, "limit": 1}, + ) + resp.raise_for_status() + data = resp.json() + events = data.get("events", []) + return events[0] if events else None + except (httpx.HTTPError, ValueError) as e: + logger.warning(f"Failed to fetch latest event for {serial}: {e}") + return None + + +async def _get_event_sidecar(client: httpx.AsyncClient, event_id: str) -> Optional[Dict[str, Any]]: + try: + resp = await client.get(f"{SFM_BASE_URL}/db/events/{event_id}/sidecar") + resp.raise_for_status() + return resp.json() + except (httpx.HTTPError, ValueError) as e: + logger.warning(f"Failed to fetch sidecar for event {event_id}: {e}") + return None + + +async def sync_unit_calibration( + db: Session, + unit: RosterUnit, + client: httpx.AsyncClient, +) -> Dict[str, Any]: + """Sync calibration for one seismograph unit. Returns a result dict.""" + result: Dict[str, Any] = { + "unit_id": unit.id, + "action": "checked", + "old": unit.last_calibrated.isoformat() if unit.last_calibrated else None, + "new": None, + "event_id": None, + } + + event = await _get_latest_event(client, unit.id) + if not event: + result["action"] = "no_event" + return result + + sidecar = await _get_event_sidecar(client, event["id"]) + if not sidecar: + result["action"] = "no_sidecar" + return result + + device = sidecar.get("device") or {} + event_cal = _parse_cal_date(device.get("calibration_date")) + if not event_cal: + result["action"] = "no_cal_in_sidecar" + return result + + result["event_id"] = event["id"] + result["new"] = event_cal.isoformat() + + if unit.last_calibrated == event_cal: + result["action"] = "already_in_sync" + return result + + event_ts = _parse_event_ts(event.get("timestamp")) + last_change = ( + db.query(UnitHistory) + .filter( + UnitHistory.unit_id == unit.id, + UnitHistory.field_name == "last_calibrated", + ) + .order_by(UnitHistory.changed_at.desc()) + .first() + ) + + if last_change and event_ts and last_change.changed_at > event_ts: + result["action"] = "skipped_manual_newer" + return result + + old_cal = unit.last_calibrated + unit.last_calibrated = event_cal + unit.next_calibration_due = event_cal + timedelta(days=_get_cal_interval(db)) + + db.add(UnitHistory( + unit_id=unit.id, + change_type="calibration_status_change", + field_name="last_calibrated", + old_value=old_cal.strftime("%Y-%m-%d") if old_cal else None, + new_value=event_cal.strftime("%Y-%m-%d"), + source="sfm_event", + notes=f"Synced from event {event['id']}", + )) + + result["action"] = "updated" + return result + + +async def sync_all_calibrations(db: Optional[Session] = None) -> Dict[str, Any]: + """Sync calibration for every non-retired seismograph. + + If `db` is provided the caller owns the session and commit. Otherwise + a session is opened, committed, and closed locally — this is what the + scheduled job uses. + """ + owns_session = db is None + if owns_session: + db = SessionLocal() + + summary: Dict[str, Any] = { + "started_at": datetime.utcnow().isoformat(), + "checked": 0, + "updated": 0, + "skipped_manual_newer": 0, + "already_in_sync": 0, + "no_event": 0, + "no_sidecar": 0, + "no_cal_in_sidecar": 0, + "errors": 0, + "results": [], + } + + try: + units = ( + db.query(RosterUnit) + .filter( + RosterUnit.retired == False, + RosterUnit.device_type == "seismograph", + ) + .all() + ) + + async with httpx.AsyncClient(timeout=15.0) as client: + for unit in units: + summary["checked"] += 1 + try: + r = await sync_unit_calibration(db, unit, client) + except Exception as e: + logger.exception(f"Error syncing calibration for {unit.id}") + summary["errors"] += 1 + summary["results"].append({"unit_id": unit.id, "action": "error", "error": str(e)}) + continue + + summary["results"].append(r) + action = r["action"] + if action in summary: + summary[action] += 1 + + if owns_session: + db.commit() + + finally: + if owns_session: + db.close() + + summary["finished_at"] = datetime.utcnow().isoformat() + logger.info( + f"Calibration sync done: checked={summary['checked']} " + f"updated={summary['updated']} skipped_manual={summary['skipped_manual_newer']} " + f"in_sync={summary['already_in_sync']} errors={summary['errors']}" + ) + return summary + + +# --------------------------------------------------------------------------- +# Background scheduler — runs once daily. Modeled on backup_scheduler. +# --------------------------------------------------------------------------- + +class CalibrationSyncScheduler: + """Runs sync_all_calibrations() once per day at a fixed local time.""" + + def __init__(self, run_at: str = "03:15"): + self.run_at = run_at + self.is_running = False + self.thread: Optional[threading.Thread] = None + self.last_run: Optional[Dict[str, Any]] = None + + def _job_wrapper(self): + """Run the async sync in a fresh event loop (we're on a worker thread).""" + try: + self.last_run = asyncio.run(sync_all_calibrations()) + except Exception as e: + logger.exception(f"Calibration sync job failed: {e}") + self.last_run = {"error": str(e), "finished_at": datetime.utcnow().isoformat()} + + def start(self): + if self.is_running: + return + logger.info(f"Starting calibration sync scheduler (daily at {self.run_at})") + schedule.every().day.at(self.run_at).do(self._job_wrapper) + self.is_running = True + self.thread = threading.Thread(target=self._loop, daemon=True) + self.thread.start() + + def _loop(self): + while self.is_running: + schedule.run_pending() + time.sleep(60) + + def stop(self): + if not self.is_running: + return + logger.info("Stopping calibration sync scheduler") + self.is_running = False + if self.thread: + self.thread.join(timeout=5) + + def status(self) -> Dict[str, Any]: + return { + "running": self.is_running, + "run_at": self.run_at, + "last_run": self.last_run, + } + + +_scheduler: Optional[CalibrationSyncScheduler] = None + + +def get_calibration_sync_scheduler() -> CalibrationSyncScheduler: + global _scheduler + if _scheduler is None: + _scheduler = CalibrationSyncScheduler() + return _scheduler diff --git a/templates/settings.html b/templates/settings.html index c97b3b1..59d8a6a 100644 --- a/templates/settings.html +++ b/templates/settings.html @@ -472,6 +472,20 @@ + +
+

Sync from SFM events

+

+ Reads calibration_date from each seismograph's most recent event sidecar and updates + Last Calibrated when the device reports a newer date than what's stored. + Manual edits made after the latest event are preserved. Runs automatically once a day. +

+ +
+
@@ -890,6 +904,41 @@ async function saveCalibrationDefaults() { } } +async function runCalibrationSync() { + const btn = document.getElementById('cal-sync-btn'); + const out = document.getElementById('cal-sync-result'); + btn.disabled = true; + const originalLabel = btn.textContent; + btn.textContent = 'Syncing…'; + out.textContent = ''; + out.className = 'mt-3 text-sm text-gray-700 dark:text-gray-300'; + + try { + const response = await fetch('/api/calibration/sync', { method: 'POST' }); + const data = await response.json(); + if (!response.ok) { + out.className = 'mt-3 text-sm text-red-600 dark:text-red-400'; + out.textContent = 'Error: ' + (data.detail || response.statusText); + return; + } + const parts = [ + `Checked ${data.checked}`, + `Updated ${data.updated}`, + `Already in sync ${data.already_in_sync}`, + `Manual kept ${data.skipped_manual_newer}`, + `No event ${data.no_event}`, + ]; + if (data.errors) parts.push(`Errors ${data.errors}`); + out.textContent = parts.join(' · '); + } catch (error) { + out.className = 'mt-3 text-sm text-red-600 dark:text-red-400'; + out.textContent = 'Error: ' + error.message; + } finally { + btn.disabled = false; + btn.textContent = originalLabel; + } +} + // ========== DATA TAB - IMPORT/EXPORT ========== // Merge Mode Import