""" sfm/database.py — SQLite persistence layer for seismo-relay. Three tables, all keyed by unit serial number: ach_sessions — one row per inbound ACH call-home events — one row per triggered waveform event (deduped by serial+key) monitor_log — one row per monitoring interval (deduped by serial+key) The DB file lives at: /seismo_relay.db (default: bridges/captures/seismo_relay.db) Usage ----- from sfm.database import SeismoDb db = SeismoDb("bridges/captures/seismo_relay.db") # Write a call-home session session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920", events_downloaded=3, monitor_entries=2, duration_seconds=47.3) # Write events (silently skips duplicates) db.insert_events(events, serial="BE11529", session_id=session_id) # Write monitor log entries db.insert_monitor_log(entries, session_id=session_id) # Query rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...)) """ from __future__ import annotations import datetime import logging import sqlite3 import uuid from pathlib import Path from typing import Optional from minimateplus.models import Event, MonitorLogEntry log = logging.getLogger("sfm.database") # ── Schema ───────────────────────────────────────────────────────────────────── _SCHEMA = """ PRAGMA journal_mode = WAL; PRAGMA foreign_keys = ON; CREATE TABLE IF NOT EXISTS ach_sessions ( id TEXT PRIMARY KEY, -- UUID serial TEXT NOT NULL, session_time TEXT NOT NULL, -- ISO-8601 UTC peer TEXT, -- "ip:port" events_downloaded INTEGER NOT NULL DEFAULT 0, monitor_entries INTEGER NOT NULL DEFAULT 0, duration_seconds REAL, created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) ); CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial); CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time); CREATE TABLE IF NOT EXISTS events ( id TEXT PRIMARY KEY, -- UUID serial TEXT NOT NULL, waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field) session_id TEXT, -- FK → ach_sessions.id timestamp TEXT, -- ISO-8601 local time from device tran_ppv REAL, -- in/s vert_ppv REAL, -- in/s long_ppv REAL, -- in/s peak_vector_sum REAL, -- in/s mic_ppv REAL, -- psi or dB depending on setup project TEXT, client TEXT, operator TEXT, sensor_location TEXT, sample_rate INTEGER, record_type TEXT, -- "single_shot" | "continuous" false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag) created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), UNIQUE(serial, waveform_key) ); CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial); CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp); CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id); CREATE TABLE IF NOT EXISTS monitor_log ( id TEXT PRIMARY KEY, -- UUID serial TEXT NOT NULL, waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field) session_id TEXT, -- FK → ach_sessions.id start_time TEXT, -- ISO-8601 stop_time TEXT, -- ISO-8601 duration_seconds REAL, geo_threshold_ips REAL, -- in/s created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), UNIQUE(serial, waveform_key) ); CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial); CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time); CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id); """ # ── SeismoDb class ───────────────────────────────────────────────────────────── class SeismoDb: """ Thin SQLite wrapper for seismo-relay persistence. Thread-safe: each call opens, uses, and closes a connection with check_same_thread=False and WAL mode enabled. For the ACH server's single-writer / occasional-reader pattern this is more than sufficient. """ def __init__(self, db_path: str | Path) -> None: self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) self._init_schema() log.info("SeismoDb initialised at %s", self.db_path) # ── Internal helpers ─────────────────────────────────────────────────────── def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect(str(self.db_path), check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode = WAL") conn.execute("PRAGMA foreign_keys = ON") return conn def _init_schema(self) -> None: with self._connect() as conn: conn.executescript(_SCHEMA) @staticmethod def _iso(dt: Optional[datetime.datetime]) -> Optional[str]: return dt.isoformat() if dt is not None else None @staticmethod def _new_id() -> str: return str(uuid.uuid4()) # ── ACH sessions ────────────────────────────────────────────────────────── def insert_ach_session( self, *, serial: str, peer: Optional[str] = None, events_downloaded: int = 0, monitor_entries: int = 0, duration_seconds: Optional[float] = None, session_time: Optional[datetime.datetime] = None, ) -> str: """Insert a new ACH session row. Returns the new session UUID.""" sid = self._new_id() ts = self._iso(session_time or datetime.datetime.utcnow()) with self._connect() as conn: conn.execute( """ INSERT INTO ach_sessions (id, serial, session_time, peer, events_downloaded, monitor_entries, duration_seconds) VALUES (?, ?, ?, ?, ?, ?, ?) """, (sid, serial, ts, peer, events_downloaded, monitor_entries, duration_seconds), ) log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d", sid, serial, events_downloaded, monitor_entries) return sid def get_sessions( self, serial: Optional[str] = None, limit: int = 50, ) -> list[dict]: """Return recent ACH sessions, newest first.""" with self._connect() as conn: if serial: rows = conn.execute( "SELECT * FROM ach_sessions WHERE serial=? " "ORDER BY session_time DESC LIMIT ?", (serial, limit), ).fetchall() else: rows = conn.execute( "SELECT * FROM ach_sessions ORDER BY session_time DESC LIMIT ?", (limit,), ).fetchall() return [dict(r) for r in rows] # ── Events ──────────────────────────────────────────────────────────────── def insert_events( self, events: list[Event], *, serial: str, session_id: Optional[str] = None, ) -> tuple[int, int]: """ Insert triggered events. Silently skips duplicates (serial+waveform_key). Returns (inserted, skipped). """ inserted = skipped = 0 with self._connect() as conn: for ev in events: key = ev._waveform_key.hex() if ev._waveform_key else None if key is None: skipped += 1 continue ts = None if ev.timestamp: try: ts = datetime.datetime( ev.timestamp.year, ev.timestamp.month, ev.timestamp.day, ev.timestamp.hour, ev.timestamp.minute, ev.timestamp.second, ).isoformat() except Exception: ts = str(ev.timestamp) pv = ev.peak_values pi = ev.project_info try: conn.execute( """ INSERT INTO events (id, serial, waveform_key, session_id, timestamp, tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv, project, client, operator, sensor_location, sample_rate, record_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( self._new_id(), serial, key, session_id, ts, pv.tran if pv else None, pv.vert if pv else None, pv.long if pv else None, pv.peak_vector_sum if pv else None, pv.micl if pv else None, pi.project if pi else None, pi.client if pi else None, pi.operator if pi else None, pi.sensor_location if pi else None, ev.sample_rate, ev.record_type, ), ) inserted += 1 except sqlite3.IntegrityError: skipped += 1 log.debug("insert_events serial=%s inserted=%d skipped=%d", serial, inserted, skipped) return inserted, skipped def query_events( self, serial: Optional[str] = None, from_dt: Optional[datetime.datetime] = None, to_dt: Optional[datetime.datetime] = None, false_trigger: Optional[bool] = None, limit: int = 500, offset: int = 0, ) -> list[dict]: """Query events with optional filters. Returns newest first.""" clauses: list[str] = [] params: list = [] if serial: clauses.append("serial = ?") params.append(serial) if from_dt: clauses.append("timestamp >= ?") params.append(from_dt.isoformat()) if to_dt: clauses.append("timestamp <= ?") params.append(to_dt.isoformat()) if false_trigger is not None: clauses.append("false_trigger = ?") params.append(1 if false_trigger else 0) where = ("WHERE " + " AND ".join(clauses)) if clauses else "" params += [limit, offset] with self._connect() as conn: rows = conn.execute( f"SELECT * FROM events {where} " f"ORDER BY timestamp DESC LIMIT ? OFFSET ?", params, ).fetchall() return [dict(r) for r in rows] def set_false_trigger(self, event_id: str, value: bool) -> bool: """Set or clear the false_trigger flag on an event. Returns True if found.""" with self._connect() as conn: cur = conn.execute( "UPDATE events SET false_trigger=? WHERE id=?", (1 if value else 0, event_id), ) return cur.rowcount > 0 # ── Monitor log ─────────────────────────────────────────────────────────── def insert_monitor_log( self, entries: list[MonitorLogEntry], *, session_id: Optional[str] = None, ) -> tuple[int, int]: """ Insert monitor log entries. Silently skips duplicates (serial+waveform_key). Returns (inserted, skipped). """ inserted = skipped = 0 with self._connect() as conn: for e in entries: try: conn.execute( """ INSERT INTO monitor_log (id, serial, waveform_key, session_id, start_time, stop_time, duration_seconds, geo_threshold_ips) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( self._new_id(), e.serial or "", e.key, session_id, self._iso(e.start_time), self._iso(e.stop_time), e.duration_seconds, e.geo_threshold_ips, ), ) inserted += 1 except sqlite3.IntegrityError: skipped += 1 log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped) return inserted, skipped def query_monitor_log( self, serial: Optional[str] = None, from_dt: Optional[datetime.datetime] = None, to_dt: Optional[datetime.datetime] = None, limit: int = 500, offset: int = 0, ) -> list[dict]: """Query monitor log entries with optional filters. Returns newest first.""" clauses: list[str] = [] params: list = [] if serial: clauses.append("serial = ?") params.append(serial) if from_dt: clauses.append("start_time >= ?") params.append(from_dt.isoformat()) if to_dt: clauses.append("start_time <= ?") params.append(to_dt.isoformat()) where = ("WHERE " + " AND ".join(clauses)) if clauses else "" params += [limit, offset] with self._connect() as conn: rows = conn.execute( f"SELECT * FROM monitor_log {where} " f"ORDER BY start_time DESC LIMIT ? OFFSET ?", params, ).fetchall() return [dict(r) for r in rows] # ── Fleet overview ──────────────────────────────────────────────────────── def query_units(self) -> list[dict]: """ Return one row per known serial with summary stats: last_seen, total_events, total_monitor_entries. """ with self._connect() as conn: rows = conn.execute( """ SELECT s.serial, MAX(s.session_time) AS last_seen, SUM(s.events_downloaded) AS total_events, SUM(s.monitor_entries) AS total_monitor_entries, COUNT(*) AS total_sessions FROM ach_sessions s GROUP BY s.serial ORDER BY last_seen DESC """ ).fetchall() return [dict(r) for r in rows]