296 lines
9.3 KiB
Python
296 lines
9.3 KiB
Python
"""
Calibration Sync Service

Pulls device-reported calibration dates from SFM event sidecars and updates
RosterUnit.last_calibrated when the device has a newer record than what
Terra-View has stored.

Conflict rule: events-as-truth, but don't go backwards.
  - If the newest event's calibration_date == unit.last_calibrated → no-op.
  - If the last UnitHistory change for last_calibrated is newer than the
    newest event's timestamp → skip (a manual edit was made after this
    event landed; manual wins until a fresher event arrives).
  - Otherwise → write the event's calibration_date, recompute
    next_calibration_due, and log a UnitHistory row with source='sfm_event'.
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from datetime import datetime, date, timedelta
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
import httpx
|
|
import schedule
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.database import SessionLocal
|
|
from backend.models import RosterUnit, UnitHistory, UserPreferences
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Root URL of the SFM service. The SFM_BASE_URL environment variable, when
# set, overrides the localhost default.
SFM_BASE_URL = os.environ.get("SFM_BASE_URL", "http://localhost:8200")
|
|
|
|
|
|
def _get_cal_interval(db: Session) -> int:
    """Return the calibration interval in days.

    Reads ``calibration_interval_days`` from the first ``UserPreferences``
    row. Falls back to 365 when there is no row or the stored value is
    empty/zero.
    """
    prefs = db.query(UserPreferences).first()
    configured_days = prefs.calibration_interval_days if prefs else None
    return configured_days or 365
|
|
|
|
|
|
def _parse_event_ts(value: Any) -> Optional[datetime]:
|
|
if not value:
|
|
return None
|
|
if isinstance(value, datetime):
|
|
return value.replace(tzinfo=None) if value.tzinfo else value
|
|
try:
|
|
s = str(value).replace("Z", "")
|
|
if "+" in s:
|
|
s = s.split("+", 1)[0]
|
|
return datetime.fromisoformat(s)
|
|
except (ValueError, TypeError):
|
|
logger.warning(f"Could not parse event timestamp: {value!r}")
|
|
return None
|
|
|
|
|
|
def _parse_cal_date(value: Any) -> Optional[date]:
|
|
if not value:
|
|
return None
|
|
if isinstance(value, date) and not isinstance(value, datetime):
|
|
return value
|
|
if isinstance(value, datetime):
|
|
return value.date()
|
|
try:
|
|
return datetime.fromisoformat(str(value)).date()
|
|
except (ValueError, TypeError):
|
|
try:
|
|
return datetime.strptime(str(value), "%Y-%m-%d").date()
|
|
except (ValueError, TypeError):
|
|
logger.warning(f"Could not parse calibration_date: {value!r}")
|
|
return None
|
|
|
|
|
|
async def _get_latest_event(client: httpx.AsyncClient, serial: str) -> Optional[Dict[str, Any]]:
    """Fetch a single event for ``serial`` from the SFM ``/db/events`` endpoint.

    Requests ``limit=1`` and returns the first entry of the response's
    ``events`` list, or ``None`` when the list is empty or missing. HTTP
    errors and undecodable bodies are logged as warnings and also yield
    ``None``.

    NOTE(review): presumably the endpoint returns newest-first, so the single
    event is the latest one — confirm against the SFM API.
    """
    endpoint = f"{SFM_BASE_URL}/db/events"
    query = {"serial": serial, "limit": 1}
    try:
        response = await client.get(endpoint, params=query)
        response.raise_for_status()
        payload = response.json()
        found = payload.get("events", [])
        if not found:
            return None
        return found[0]
    except (httpx.HTTPError, ValueError) as exc:
        logger.warning(f"Failed to fetch latest event for {serial}: {exc}")
        return None
|
|
|
|
|
|
async def _get_event_sidecar(client: httpx.AsyncClient, event_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the sidecar document for one event from SFM.

    Returns the decoded JSON body of ``/db/events/{event_id}/sidecar``. HTTP
    errors and undecodable bodies are logged as warnings and yield ``None``.
    """
    sidecar_url = f"{SFM_BASE_URL}/db/events/{event_id}/sidecar"
    try:
        response = await client.get(sidecar_url)
        response.raise_for_status()
        body = response.json()
    except (httpx.HTTPError, ValueError) as exc:
        logger.warning(f"Failed to fetch sidecar for event {event_id}: {exc}")
        return None
    return body
|
|
|
|
|
|
async def sync_unit_calibration(
    db: Session,
    unit: RosterUnit,
    client: httpx.AsyncClient,
) -> Dict[str, Any]:
    """Sync calibration for one seismograph unit. Returns a result dict.

    The result always carries ``unit_id``, ``action``, ``old``, ``new`` and
    ``event_id``. ``action`` ends up as one of ``no_event``, ``no_sidecar``,
    ``no_cal_in_sidecar``, ``already_in_sync``, ``skipped_manual_newer`` or
    ``updated``.

    On ``updated`` the unit's ``last_calibrated`` and ``next_calibration_due``
    are modified and a ``UnitHistory`` row is added to the session. Nothing
    is committed here.

    NOTE(review): the recency check does not look at the history row's
    ``source``, so any ``last_calibrated`` history row newer than the event
    timestamp is treated as a newer manual edit — confirm that is intended.
    """
    result: Dict[str, Any] = dict(
        unit_id=unit.id,
        action="checked",
        old=unit.last_calibrated.isoformat() if unit.last_calibrated else None,
        new=None,
        event_id=None,
    )

    def _finish(action: str) -> Dict[str, Any]:
        # Stamp the final action and hand the shared result dict back.
        result["action"] = action
        return result

    event = await _get_latest_event(client, unit.id)
    if not event:
        return _finish("no_event")

    sidecar = await _get_event_sidecar(client, event["id"])
    if not sidecar:
        return _finish("no_sidecar")

    device_info = sidecar.get("device") or {}
    event_cal = _parse_cal_date(device_info.get("calibration_date"))
    if not event_cal:
        return _finish("no_cal_in_sidecar")

    result.update(event_id=event["id"], new=event_cal.isoformat())

    if unit.last_calibrated == event_cal:
        return _finish("already_in_sync")

    event_ts = _parse_event_ts(event.get("timestamp"))

    # Most recent recorded change to this unit's last_calibrated field.
    history_query = db.query(UnitHistory).filter(
        UnitHistory.unit_id == unit.id,
        UnitHistory.field_name == "last_calibrated",
    )
    last_change = history_query.order_by(UnitHistory.changed_at.desc()).first()

    # A change recorded after the event's timestamp wins over the event.
    if last_change and event_ts and last_change.changed_at > event_ts:
        return _finish("skipped_manual_newer")

    old_cal = unit.last_calibrated
    unit.last_calibrated = event_cal
    unit.next_calibration_due = event_cal + timedelta(days=_get_cal_interval(db))

    day_format = "%Y-%m-%d"
    history_row = UnitHistory(
        unit_id=unit.id,
        change_type="calibration_status_change",
        field_name="last_calibrated",
        old_value=old_cal.strftime(day_format) if old_cal else None,
        new_value=event_cal.strftime(day_format),
        source="sfm_event",
        notes=f"Synced from event {event['id']}",
    )
    db.add(history_row)

    return _finish("updated")
|
|
|
|
|
|
async def sync_all_calibrations(db: Optional[Session] = None) -> Dict[str, Any]:
    """Sync calibration for every non-retired seismograph.

    If `db` is provided the caller owns the session and commit. Otherwise
    a session is opened, committed, and closed locally — this is what the
    scheduled job uses.

    Returns a summary dict with start/finish timestamps, one counter per
    outcome, and the list of per-unit result dicts. A failure on one unit is
    logged, counted under ``errors``, and does not stop the remaining units.
    """
    owns_session = db is None
    if owns_session:
        db = SessionLocal()

    counter_names = (
        "checked",
        "updated",
        "skipped_manual_newer",
        "already_in_sync",
        "no_event",
        "no_sidecar",
        "no_cal_in_sidecar",
        "errors",
    )
    summary: Dict[str, Any] = {
        "started_at": datetime.utcnow().isoformat(),
        **dict.fromkeys(counter_names, 0),
        "results": [],
    }

    try:
        unit_query = db.query(RosterUnit).filter(
            RosterUnit.retired == False,  # noqa: E712 - SQL expression, not a Python bool test
            RosterUnit.device_type == "seismograph",
        )
        active_units = unit_query.all()

        async with httpx.AsyncClient(timeout=15.0) as client:
            for unit in active_units:
                summary["checked"] += 1
                try:
                    outcome = await sync_unit_calibration(db, unit, client)
                except Exception as exc:
                    # Isolate per-unit failures so one bad unit cannot abort the run.
                    logger.exception(f"Error syncing calibration for {unit.id}")
                    summary["errors"] += 1
                    summary["results"].append(
                        {"unit_id": unit.id, "action": "error", "error": str(exc)}
                    )
                else:
                    summary["results"].append(outcome)
                    outcome_key = outcome["action"]
                    if outcome_key in summary:
                        summary[outcome_key] += 1

        if owns_session:
            db.commit()
    finally:
        if owns_session:
            db.close()

    summary["finished_at"] = datetime.utcnow().isoformat()
    logger.info(
        f"Calibration sync done: checked={summary['checked']} "
        f"updated={summary['updated']} skipped_manual={summary['skipped_manual_newer']} "
        f"in_sync={summary['already_in_sync']} errors={summary['errors']}"
    )
    return summary
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Background scheduler — runs once daily. Modeled on backup_scheduler.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class CalibrationSyncScheduler:
    """Runs sync_all_calibrations() once per day at a fixed local time.

    The daily job is registered on a scheduler instance owned by this object
    instead of the ``schedule`` module's shared default scheduler, so this
    worker thread never runs (or races on) jobs registered by other
    components, and their loops never run this job.

    Attributes:
        run_at: "HH:MM" time of day at which the job is scheduled.
        is_running: True between start() and stop().
        thread: The background polling thread, once started.
        last_run: Summary dict of the most recent run, or an error dict.
    """

    # Seconds between checks for a due job.
    _POLL_SECONDS = 60

    def __init__(self, run_at: str = "03:15"):
        self.run_at = run_at
        self.is_running = False
        self.thread: Optional[threading.Thread] = None
        self.last_run: Optional[Dict[str, Any]] = None
        # Private scheduler: isolates this job from the module-global one.
        self._scheduler = schedule.Scheduler()
        # Handle of the registered daily job, kept so stop() can cancel it.
        self._job = None
        # Signals the polling thread to exit; replaced on every start().
        self._stop_event = threading.Event()

    def _job_wrapper(self):
        """Run the async sync in a fresh event loop (we're on a worker thread)."""
        try:
            self.last_run = asyncio.run(sync_all_calibrations())
        except Exception as e:
            logger.exception(f"Calibration sync job failed: {e}")
            self.last_run = {"error": str(e), "finished_at": datetime.utcnow().isoformat()}

    def start(self):
        """Register the daily job and start the polling thread (idempotent)."""
        if self.is_running:
            return
        logger.info(f"Starting calibration sync scheduler (daily at {self.run_at})")
        # Register first: an invalid run_at raises here and leaves the
        # scheduler cleanly stopped.
        self._job = self._scheduler.every().day.at(self.run_at).do(self._job_wrapper)
        # A fresh event per run means a thread left over from a previous
        # start/stop cycle keeps seeing its own (set) event and still exits.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.is_running = True
        self.thread = threading.Thread(target=self._loop, args=(stop_event,), daemon=True)
        self.thread.start()

    def _loop(self, stop_event: Optional[threading.Event] = None):
        """Poll for due jobs until stopped.

        Args:
            stop_event: Event that ends the loop when set; defaults to the
                scheduler's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self.is_running and not stop_event.is_set():
            self._scheduler.run_pending()
            # Unlike time.sleep(), wait() returns as soon as stop() sets the
            # event, so shutdown does not have to sit out the full interval.
            stop_event.wait(self._POLL_SECONDS)

    def stop(self):
        """Stop the polling thread and unregister the daily job (idempotent)."""
        if not self.is_running:
            return
        logger.info("Stopping calibration sync scheduler")
        self.is_running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)
        # Cancel the job so a later start() does not leave two daily jobs
        # registered.
        if self._job is not None:
            self._scheduler.cancel_job(self._job)
            self._job = None

    def status(self) -> Dict[str, Any]:
        """Return the running flag, configured run time and last run summary."""
        return {
            "running": self.is_running,
            "run_at": self.run_at,
            "last_run": self.last_run,
        }
|
|
|
|
|
|
# Process-wide scheduler instance, created on first request.
_scheduler: Optional[CalibrationSyncScheduler] = None


def get_calibration_sync_scheduler() -> CalibrationSyncScheduler:
    """Return the shared CalibrationSyncScheduler, creating it on first use."""
    global _scheduler
    if _scheduler is not None:
        return _scheduler
    _scheduler = CalibrationSyncScheduler()
    return _scheduler
|