"""
sfm/database.py — SQLite persistence layer for seismo-relay.

Three tables, all keyed by unit serial number:

    ach_sessions  — one row per inbound ACH call-home
    events        — one row per triggered waveform event
                    (deduped by serial+timestamp)
    monitor_log   — one row per monitoring interval
                    (deduped by serial+start_time)

The DB file lives at the path handed to ``SeismoDb`` — conventionally
``<capture dir>/seismo_relay.db`` (default: bridges/captures/seismo_relay.db).

Usage
-----
    from sfm.database import SeismoDb

    db = SeismoDb("bridges/captures/seismo_relay.db")

    # Write a call-home session
    session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920",
                                       events_downloaded=3, monitor_entries=2,
                                       duration_seconds=47.3)

    # Write events (silently skips duplicates)
    db.insert_events(events, serial="BE11529", session_id=session_id)

    # Write monitor log entries
    db.insert_monitor_log(entries, session_id=session_id)

    # Query
    rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...))
"""

from __future__ import annotations

import contextlib
import datetime
import logging
import sqlite3
import uuid
from pathlib import Path
from typing import Iterator, Optional

from minimateplus.models import Event, MonitorLogEntry

log = logging.getLogger("sfm.database")


# ── Schema ─────────────────────────────────────────────────────────────────────

_SCHEMA = """
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS ach_sessions (
    id                TEXT PRIMARY KEY,   -- UUID
    serial            TEXT NOT NULL,
    session_time      TEXT NOT NULL,      -- ISO-8601 UTC
    peer              TEXT,               -- "ip:port"
    events_downloaded INTEGER NOT NULL DEFAULT 0,
    monitor_entries   INTEGER NOT NULL DEFAULT 0,
    duration_seconds  REAL,
    created_at        TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);

CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time);

CREATE TABLE IF NOT EXISTS events (
    id                 TEXT PRIMARY KEY,  -- UUID
    serial             TEXT NOT NULL,
    waveform_key       TEXT NOT NULL,     -- 8-hex device key (dedup field)
    session_id         TEXT,              -- FK → ach_sessions.id
    timestamp          TEXT,              -- ISO-8601 local time from device
    tran_ppv           REAL,              -- in/s
    vert_ppv           REAL,              -- in/s
    long_ppv           REAL,              -- in/s
    peak_vector_sum    REAL,              -- in/s
    mic_ppv            REAL,              -- psi or dB depending on setup
    project            TEXT,
    client             TEXT,
    operator           TEXT,
    sensor_location    TEXT,
    sample_rate        INTEGER,
    record_type        TEXT,              -- "single_shot" | "continuous"
    false_trigger      INTEGER NOT NULL DEFAULT 0,  -- 0=no, 1=yes (manual flag)
    blastware_filename TEXT,              -- event file within waveform store; extension is per-event (AB0T encodes timestamp)
    blastware_filesize INTEGER,           -- bytes; NULL if no event file saved
    a5_pickle_filename TEXT,              -- ".a5.pkl" sidecar
    sidecar_filename   TEXT,              -- ".sfm.json" review/metadata sidecar
    device_family      TEXT,              -- "series3" (MiniMate Plus / BW) | "series4" (Micromate / Thor) — drives per-family UI rendering (units, labels)
    created_at         TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
    UNIQUE(serial, timestamp)
);

CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);

CREATE TABLE IF NOT EXISTS monitor_log (
    id                TEXT PRIMARY KEY,   -- UUID
    serial            TEXT NOT NULL,
    waveform_key      TEXT NOT NULL,      -- 8-hex device key (dedup field)
    session_id        TEXT,               -- FK → ach_sessions.id
    start_time        TEXT,               -- ISO-8601
    stop_time         TEXT,               -- ISO-8601
    duration_seconds  REAL,
    geo_threshold_ips REAL,               -- in/s
    created_at        TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
    UNIQUE(serial, start_time)
);

CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
"""


# ── SeismoDb class ─────────────────────────────────────────────────────────────

class SeismoDb:
    """
    Thin SQLite wrapper for seismo-relay persistence.

    Thread-safe: each call opens, uses, and closes its own connection with
    check_same_thread=False and WAL mode enabled. For the ACH server's
    single-writer / occasional-reader pattern this is more than sufficient.

    Every public method runs inside one transaction (committed on success,
    rolled back if the method raises) via the private ``_session()`` helper.
    """

    def __init__(self, db_path: str | Path) -> None:
        """Create the parent directory if needed, then create/migrate the schema."""
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_schema()
        log.info("SeismoDb initialised at %s", self.db_path)

    # ── Internal helpers ───────────────────────────────────────────────────────

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection with Row access, WAL mode and FK enforcement.

        The caller owns the returned connection and must close it; prefer
        ``_session()``, which does that automatically.
        """
        conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode = WAL")
        conn.execute("PRAGMA foreign_keys = ON")
        return conn

    @contextlib.contextmanager
    def _session(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection; commit/rollback on exit, then always close it.

        ``with sqlite3.Connection`` on its own only commits or rolls back —
        it never closes — so using ``_connect()`` directly as a context
        manager leaks one connection (and its file handles) per call.
        """
        conn = self._connect()
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    def _init_schema(self) -> None:
        """Create missing tables/indexes, then run in-place migrations."""
        with self._session() as conn:
            conn.executescript(_SCHEMA)
            self._migrate(conn)

    def _migrate(self, conn: sqlite3.Connection) -> None:
        """Apply in-place schema migrations for existing databases.

        Each step inspects the live schema first, so re-running against an
        already-migrated database is a no-op. The two table rebuilds run in
        explicit transactions: ``executescript`` otherwise executes in
        autocommit mode, and a failure half-way through would strand the data
        in ``*_old`` while the freshly created empty table makes the next
        start-up skip the migration.
        """
        # Migration 1: change events UNIQUE from (serial, waveform_key) [or any
        # waveform_key-based variant] to (serial, timestamp).
        # Rationale: device key counter resets to 01110000 after every erase, so
        # waveform_key is not a stable dedup field across erase cycles. The event
        # timestamp (from the device clock) is the correct natural key.
        row = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='events'"
        ).fetchone()
        if row and "UNIQUE(serial, timestamp)" not in row[0]:
            log.info("_migrate: rebuilding events table — UNIQUE(serial, timestamp)")
            # NOTE: the positional `SELECT *` copy assumes the legacy table has
            # the same 18-column layout as the table created here.
            conn.executescript("""
                BEGIN;
                ALTER TABLE events RENAME TO events_old;
                CREATE TABLE events (
                    id              TEXT PRIMARY KEY,
                    serial          TEXT NOT NULL,
                    waveform_key    TEXT NOT NULL,
                    session_id      TEXT,
                    timestamp       TEXT,
                    tran_ppv        REAL,
                    vert_ppv        REAL,
                    long_ppv        REAL,
                    peak_vector_sum REAL,
                    mic_ppv         REAL,
                    project         TEXT,
                    client          TEXT,
                    operator        TEXT,
                    sensor_location TEXT,
                    sample_rate     INTEGER,
                    record_type     TEXT,
                    false_trigger   INTEGER NOT NULL DEFAULT 0,
                    created_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                    UNIQUE(serial, timestamp)
                );
                INSERT OR IGNORE INTO events SELECT * FROM events_old;
                DROP TABLE events_old;
                CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
                CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
                CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);
                COMMIT;
            """)
            log.info("_migrate: events table rebuilt OK")

        # Migration 1b: add Blastware-file columns to existing events tables.
        # New columns are NULLable so old rows just read NULL.
        existing_cols = {
            r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()
        }
        for col, ddl in (
            ("blastware_filename", "TEXT"),
            ("blastware_filesize", "INTEGER"),
            ("a5_pickle_filename", "TEXT"),
            ("sidecar_filename", "TEXT"),
            ("device_family", "TEXT"),
        ):
            if col not in existing_cols:
                log.info("_migrate: events ADD COLUMN %s %s", col, ddl)
                # Identifiers come from the literal tuple above, never from input.
                conn.execute(f"ALTER TABLE events ADD COLUMN {col} {ddl}")

        # Migration 1c: backfill device_family for existing rows by sniffing
        # the device-native binary filename's extension. Thor (Micromate
        # Series IV) writes `.IDFH` / `.IDFW`; MiniMate Plus (Series III)
        # writes `.AB0*` / `.N00`-style Blastware extensions, so every other
        # filename is treated as Series III. We do this here rather than from
        # sidecars so the migration is fully self-contained (doesn't need the
        # waveform-store root) and runs at DB-init time. Only fills NULL
        # device_family so re-runs are no-ops.
        rebackfill = conn.execute(
            "SELECT COUNT(*) FROM events WHERE device_family IS NULL"
        ).fetchone()
        if rebackfill and rebackfill[0] > 0:
            log.info("_migrate: backfilling device_family for %d events", rebackfill[0])
            # Series IV (Thor IDF) — extension is exactly .IDFH or .IDFW
            conn.execute(
                """
                UPDATE events
                   SET device_family = 'series4'
                 WHERE device_family IS NULL
                   AND (
                        UPPER(blastware_filename) LIKE '%.IDFH'
                     OR UPPER(blastware_filename) LIKE '%.IDFW'
                   )
                """
            )
            # Everything else with a filename → Series III (Blastware family)
            conn.execute(
                """
                UPDATE events
                   SET device_family = 'series3'
                 WHERE device_family IS NULL
                   AND blastware_filename IS NOT NULL
                """
            )
            # Rows with no filename (e.g. older monitor_log-derived events)
            # stay NULL — UI handles NULL as "unknown family".
            remaining = conn.execute(
                "SELECT COUNT(*) FROM events WHERE device_family IS NULL"
            ).fetchone()[0]
            log.info("_migrate: device_family backfill complete (remaining NULL=%d)", remaining)

        # Migration 2: change monitor_log UNIQUE from (serial, waveform_key) to
        # (serial, start_time) — same reasoning as events.
        row = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='monitor_log'"
        ).fetchone()
        if row and "UNIQUE(serial, start_time)" not in row[0]:
            log.info("_migrate: rebuilding monitor_log table — UNIQUE(serial, start_time)")
            conn.executescript("""
                BEGIN;
                ALTER TABLE monitor_log RENAME TO monitor_log_old;
                CREATE TABLE monitor_log (
                    id                TEXT PRIMARY KEY,
                    serial            TEXT NOT NULL,
                    waveform_key      TEXT NOT NULL,
                    session_id        TEXT,
                    start_time        TEXT,
                    stop_time         TEXT,
                    duration_seconds  REAL,
                    geo_threshold_ips REAL,
                    created_at        TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                    UNIQUE(serial, start_time)
                );
                INSERT OR IGNORE INTO monitor_log SELECT * FROM monitor_log_old;
                DROP TABLE monitor_log_old;
                CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
                CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
                CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
                COMMIT;
            """)
            log.info("_migrate: monitor_log table rebuilt OK")

    @staticmethod
    def _iso(dt: Optional[datetime.datetime]) -> Optional[str]:
        """Return ``dt.isoformat()``, or None when ``dt`` is None."""
        return dt.isoformat() if dt is not None else None

    @staticmethod
    def _new_id() -> str:
        """Return a fresh random UUID4 string for use as a primary key."""
        return str(uuid.uuid4())

    @staticmethod
    def _event_filters(
        serial: Optional[str],
        from_dt: Optional[datetime.datetime],
        to_dt: Optional[datetime.datetime],
        false_trigger: Optional[bool],
    ) -> tuple[list[str], list]:
        """Build the AND-ed WHERE clauses and parameters shared by the
        event query and bulk-delete methods. Unset filters add nothing."""
        clauses: list[str] = []
        params: list = []
        if serial:
            clauses.append("serial = ?")
            params.append(serial)
        if from_dt:
            clauses.append("timestamp >= ?")
            params.append(from_dt.isoformat())
        if to_dt:
            clauses.append("timestamp <= ?")
            params.append(to_dt.isoformat())
        if false_trigger is not None:
            clauses.append("false_trigger = ?")
            params.append(1 if false_trigger else 0)
        return clauses, params

    # ── ACH sessions ──────────────────────────────────────────────────────────

    def insert_ach_session(
        self,
        *,
        serial: str,
        peer: Optional[str] = None,
        events_downloaded: int = 0,
        monitor_entries: int = 0,
        duration_seconds: Optional[float] = None,
        session_time: Optional[datetime.datetime] = None,
    ) -> str:
        """Insert a new ACH session row. Returns the new session UUID.

        ``session_time`` defaults to the current UTC time, stored as a naive
        ISO-8601 string (no offset suffix) exactly as earlier versions wrote it.
        """
        sid = self._new_id()
        if session_time is None:
            # utcnow() is deprecated since Python 3.12; dropping tzinfo keeps
            # the stored text format unchanged.
            session_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        ts = self._iso(session_time)
        with self._session() as conn:
            conn.execute(
                """
                INSERT INTO ach_sessions
                    (id, serial, session_time, peer,
                     events_downloaded, monitor_entries, duration_seconds)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (sid, serial, ts, peer,
                 events_downloaded, monitor_entries, duration_seconds),
            )
        log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d",
                  sid, serial, events_downloaded, monitor_entries)
        return sid

    def get_sessions(
        self,
        serial: Optional[str] = None,
        limit: int = 50,
    ) -> list[dict]:
        """Return recent ACH sessions, newest first.

        When ``serial`` is given only that unit's sessions are returned;
        at most ``limit`` rows come back either way.
        """
        with self._session() as conn:
            if serial:
                rows = conn.execute(
                    "SELECT * FROM ach_sessions WHERE serial=? "
                    "ORDER BY session_time DESC LIMIT ?",
                    (serial, limit),
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT * FROM ach_sessions ORDER BY session_time DESC LIMIT ?",
                    (limit,),
                ).fetchall()
            return [dict(r) for r in rows]

    # ── Events ────────────────────────────────────────────────────────────────

    def insert_events(
        self,
        events: list[Event],
        *,
        serial: str,
        session_id: Optional[str] = None,
        waveform_records: Optional[dict[str, dict]] = None,
        device_family: Optional[str] = None,
    ) -> tuple[int, int]:
        """
        Insert triggered events. Silently skips duplicates (serial+timestamp).
        Returns (inserted, skipped).

        Events without a waveform key are counted as skipped and not stored.

        ``waveform_records`` (optional): dict keyed by event waveform_key (hex)
        whose value is a record from ``WaveformStore.save()``:
            {"filename": str, "filesize": int, "a5_pickle_filename": str}
        For events whose key is in this dict, the matching columns are
        populated. If a row with the same (serial, timestamp) already exists
        (dedup hit), the matching waveform record is upserted onto the
        existing row so a re-download via the live endpoint refreshes the
        file metadata.

        ``device_family`` (optional): "series3" (MiniMate Plus / Blastware) or
        "series4" (Micromate / Thor). Drives per-family UI rendering — most
        importantly the mic-unit convention (psi vs dB(L)). Set on every
        insert; on UPSERT a non-None value overwrites the stored one (latest
        writer wins) while None leaves the stored value alone.
        """
        inserted = skipped = 0
        wave_recs = waveform_records or {}
        with self._session() as conn:
            for ev in events:
                key = ev._waveform_key.hex() if ev._waveform_key else None
                if key is None:
                    skipped += 1
                    continue

                ts = None
                if ev.timestamp:
                    try:
                        # Rebuild from whole-second fields so the stored text
                        # is always second-resolution ISO-8601.
                        ts = datetime.datetime(
                            ev.timestamp.year,
                            ev.timestamp.month,
                            ev.timestamp.day,
                            ev.timestamp.hour,
                            ev.timestamp.minute,
                            ev.timestamp.second,
                        ).isoformat()
                    except Exception:
                        # Deliberate best-effort: keep whatever text the
                        # timestamp object offers rather than drop the event.
                        ts = str(ev.timestamp)

                pv = ev.peak_values
                pi = ev.project_info
                rec = wave_recs.get(key) or {}

                # Column values shared by the INSERT and the UPSERT below,
                # in the order both statements list them.
                measured = (
                    pv.tran if pv else None,
                    pv.vert if pv else None,
                    pv.long if pv else None,
                    pv.peak_vector_sum if pv else None,
                    pv.micl if pv else None,
                    pi.project if pi else None,
                    pi.client if pi else None,
                    pi.operator if pi else None,
                    pi.sensor_location if pi else None,
                    ev.sample_rate,
                    ev.record_type,
                )
                files = (
                    rec.get("filename"),
                    rec.get("filesize"),
                    rec.get("a5_pickle_filename"),
                    rec.get("sidecar_filename"),
                )

                try:
                    conn.execute(
                        """
                        INSERT INTO events
                            (id, serial, waveform_key, session_id, timestamp,
                             tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv,
                             project, client, operator, sensor_location,
                             sample_rate, record_type,
                             blastware_filename, blastware_filesize,
                             a5_pickle_filename, sidecar_filename,
                             device_family)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (self._new_id(), serial, key, session_id, ts)
                        + measured
                        + files
                        + (device_family,),
                    )
                    inserted += 1
                except sqlite3.IntegrityError:
                    skipped += 1
                    # UPSERT path: a row for this (serial, timestamp) already
                    # exists. Refresh every device-authoritative field from
                    # the new data so that a re-import with better data (e.g.
                    # a watcher re-forward where the previous attempt missed
                    # the paired BW ASCII report) replaces stale peaks /
                    # project info / sample_rate.
                    #
                    # Preserved (not in this UPDATE):
                    #   id, waveform_key, session_id, created_at — immutable / FK
                    #   false_trigger — operator review state
                    #
                    # Behaviour change vs prior versions: this UPDATE used
                    # to only refresh filename / filesize / a5_pickle /
                    # sidecar fields. As a result, the first insert's
                    # broken-codec peak values were locked in forever even
                    # if subsequent re-forwards arrived with correct
                    # report-derived values. Now every re-import lifts the
                    # DB row up to whatever the latest Event carries.
                    #
                    # NOTE(review): when no waveform record accompanies the
                    # re-import, the four file columns are overwritten with
                    # NULL (only device_family is COALESCE-protected) —
                    # confirm that dropping the file pointers is intended.
                    conn.execute(
                        """
                        UPDATE events
                           SET tran_ppv           = ?,
                               vert_ppv           = ?,
                               long_ppv           = ?,
                               peak_vector_sum    = ?,
                               mic_ppv            = ?,
                               project            = ?,
                               client             = ?,
                               operator           = ?,
                               sensor_location    = ?,
                               sample_rate        = ?,
                               record_type        = ?,
                               blastware_filename = ?,
                               blastware_filesize = ?,
                               a5_pickle_filename = ?,
                               sidecar_filename   = ?,
                               device_family      = COALESCE(?, device_family)
                         WHERE serial = ? AND timestamp = ?
                        """,
                        measured + files + (device_family, serial, ts),
                    )
        log.debug("insert_events serial=%s inserted=%d skipped=%d",
                  serial, inserted, skipped)
        return inserted, skipped

    def get_event(self, event_id: str) -> Optional[dict]:
        """Return one event row by id, or None."""
        with self._session() as conn:
            row = conn.execute(
                "SELECT * FROM events WHERE id = ?",
                (event_id,),
            ).fetchone()
            return dict(row) if row else None

    def query_events(
        self,
        serial: Optional[str] = None,
        from_dt: Optional[datetime.datetime] = None,
        to_dt: Optional[datetime.datetime] = None,
        false_trigger: Optional[bool] = None,
        limit: int = 500,
        offset: int = 0,
    ) -> list[dict]:
        """Query events with optional filters. Returns newest first.

        Filters combine with AND; ``from_dt`` / ``to_dt`` are inclusive bounds
        compared against the stored ISO timestamp text.
        """
        clauses, params = self._event_filters(serial, from_dt, to_dt, false_trigger)
        where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
        params += [limit, offset]
        with self._session() as conn:
            rows = conn.execute(
                f"SELECT * FROM events {where} "
                f"ORDER BY timestamp DESC LIMIT ? OFFSET ?",
                params,
            ).fetchall()
            return [dict(r) for r in rows]

    def set_false_trigger(self, event_id: str, value: bool) -> bool:
        """Set or clear the false_trigger flag on an event. Returns True if found."""
        with self._session() as conn:
            cur = conn.execute(
                "UPDATE events SET false_trigger=? WHERE id=?",
                (1 if value else 0, event_id),
            )
            return cur.rowcount > 0

    def delete_event(self, event_id: str) -> Optional[dict]:
        """
        Hard-delete one event row by id. Returns the deleted row (so the
        caller can clean up any on-disk files referenced by it) or None
        if no row matched.
        """
        with self._session() as conn:
            row = conn.execute(
                "SELECT * FROM events WHERE id = ?",
                (event_id,),
            ).fetchone()
            if row is None:
                return None
            conn.execute("DELETE FROM events WHERE id = ?", (event_id,))
            return dict(row)

    def delete_events_bulk(
        self,
        serial: Optional[str] = None,
        from_dt: Optional[datetime.datetime] = None,
        to_dt: Optional[datetime.datetime] = None,
        false_trigger: Optional[bool] = None,
        ids: Optional[list[str]] = None,
    ) -> list[dict]:
        """
        Hard-delete events matching the given filters. Returns the list of
        deleted row dicts. Refuses to delete with no filters at all (would
        wipe the whole table) — raises ValueError.

        Filter semantics match query_events: serial / from_dt / to_dt /
        false_trigger combine with AND. `ids` is an additional inclusion
        list (event_id IN (...)); if supplied alongside other filters, only
        rows matching all conditions are deleted. An empty or None `ids`
        adds no condition.
        """
        clauses, params = self._event_filters(serial, from_dt, to_dt, false_trigger)
        if ids:
            placeholders = ",".join("?" * len(ids))
            clauses.append(f"id IN ({placeholders})")
            params.extend(ids)

        if not clauses:
            raise ValueError(
                "delete_events_bulk refuses to delete with no filters "
                "(would wipe the entire events table)"
            )

        where = "WHERE " + " AND ".join(clauses)
        with self._session() as conn:
            # Select and delete share one transaction, so the returned rows
            # are exactly the rows removed.
            rows = conn.execute(
                f"SELECT * FROM events {where}",
                params,
            ).fetchall()
            if rows:
                conn.execute(f"DELETE FROM events {where}", params)
            return [dict(r) for r in rows]

    def update_event_review(self, event_id: str, review: dict) -> bool:
        """
        Sync derived index columns from a sidecar's `review` block.

        Currently the only derived index is `events.false_trigger` — kept in
        sync so `/db/events?false_trigger=true` queries don't have to scan
        every sidecar JSON on disk. The sidecar JSON itself remains the
        source of truth for the full review state.

        Returns True when the row exists, False otherwise (also False when
        `review` is not a dict). No-op fields (review without
        `false_trigger`) leave the column untouched.
        """
        if not isinstance(review, dict):
            return False
        if "false_trigger" not in review:
            # Nothing derived to update; just confirm the row exists.
            with self._session() as conn:
                row = conn.execute(
                    "SELECT 1 FROM events WHERE id=?",
                    (event_id,),
                ).fetchone()
                return row is not None
        flag = 1 if review.get("false_trigger") else 0
        with self._session() as conn:
            cur = conn.execute(
                "UPDATE events SET false_trigger=? WHERE id=?",
                (flag, event_id),
            )
            return cur.rowcount > 0

    # ── Monitor log ───────────────────────────────────────────────────────────

    def insert_monitor_log(
        self,
        entries: list[MonitorLogEntry],
        *,
        session_id: Optional[str] = None,
    ) -> tuple[int, int]:
        """
        Insert monitor log entries. Silently skips duplicates (serial+start_time).
        Returns (inserted, skipped).

        An entry without a serial is stored under the empty string.
        """
        inserted = skipped = 0
        with self._session() as conn:
            for e in entries:
                try:
                    conn.execute(
                        """
                        INSERT INTO monitor_log
                            (id, serial, waveform_key, session_id,
                             start_time, stop_time, duration_seconds,
                             geo_threshold_ips)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            self._new_id(),
                            e.serial or "",
                            e.key,
                            session_id,
                            self._iso(e.start_time),
                            self._iso(e.stop_time),
                            e.duration_seconds,
                            e.geo_threshold_ips,
                        ),
                    )
                    inserted += 1
                except sqlite3.IntegrityError:
                    skipped += 1
        log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped)
        return inserted, skipped

    def query_monitor_log(
        self,
        serial: Optional[str] = None,
        from_dt: Optional[datetime.datetime] = None,
        to_dt: Optional[datetime.datetime] = None,
        limit: int = 500,
        offset: int = 0,
    ) -> list[dict]:
        """Query monitor log entries with optional filters. Returns newest first.

        ``from_dt`` / ``to_dt`` are inclusive bounds on ``start_time``.
        """
        clauses: list[str] = []
        params: list = []
        if serial:
            clauses.append("serial = ?")
            params.append(serial)
        if from_dt:
            clauses.append("start_time >= ?")
            params.append(from_dt.isoformat())
        if to_dt:
            clauses.append("start_time <= ?")
            params.append(to_dt.isoformat())
        where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
        params += [limit, offset]
        with self._session() as conn:
            rows = conn.execute(
                f"SELECT * FROM monitor_log {where} "
                f"ORDER BY start_time DESC LIMIT ? OFFSET ?",
                params,
            ).fetchall()
            return [dict(r) for r in rows]

    # ── Fleet overview ────────────────────────────────────────────────────────

    def query_units(self) -> list[dict]:
        """
        Return one row per known serial with summary stats.

        Aggregates from BOTH source tables:
          - `events`       — populated by every ingest path (live ACH,
                             /db/import/blastware_file from the
                             series3-watcher forwarder, etc.)
          - `ach_sessions` — only populated by the live ACH server; empty
                             for events that came in via the BW-importer
                             route.

        Earlier this method only joined on `ach_sessions`, which made
        watcher-forwarded units invisible to the SFM webapp's fleet overview
        even though their events were correctly populated in `events`. Now
        we union the two and surface every serial that has activity in
        either table.

        Fields:
            serial                — unit serial number (e.g. "BE11529")
            last_seen             — most recent of MAX(events.timestamp) and
                                    MAX(ach_sessions.session_time), compared
                                    as ISO text
            total_events          — COUNT(*) from `events` (the authoritative
                                    count regardless of ingest path)
            total_monitor_entries — from `ach_sessions`, 0 when absent
            total_sessions        — COUNT(*) from `ach_sessions`, 0 when absent
        """
        with self._session() as conn:
            event_stats = {
                row["serial"]: row
                for row in conn.execute(
                    """
                    SELECT serial,
                           MAX(timestamp) AS last_event_at,
                           COUNT(*)       AS total_events
                      FROM events
                     GROUP BY serial
                    """,
                ).fetchall()
            }
            session_stats = {
                row["serial"]: row
                for row in conn.execute(
                    """
                    SELECT serial,
                           MAX(session_time)    AS last_session_at,
                           SUM(monitor_entries) AS total_monitor_entries,
                           COUNT(*)             AS total_sessions
                      FROM ach_sessions
                     GROUP BY serial
                    """,
                ).fetchall()
            }

            all_serials = set(event_stats) | set(session_stats)
            units = []
            for serial in all_serials:
                e = event_stats.get(serial)
                s = session_stats.get(serial)
                last_event_at = e["last_event_at"] if e else None
                last_session_at = s["last_session_at"] if s else None
                # Prefer whichever timestamp is more recent
                last_seen = max(
                    (t for t in (last_event_at, last_session_at) if t),
                    default=None,
                )
                units.append({
                    "serial": serial,
                    "last_seen": last_seen,
                    "total_events": e["total_events"] if e else 0,
                    "total_monitor_entries": s["total_monitor_entries"] if s else 0,
                    "total_sessions": s["total_sessions"] if s else 0,
                })

        # Sort by last_seen desc; serials with no timestamp at all sink to the bottom.
        units.sort(key=lambda u: u.get("last_seen") or "", reverse=True)
        return units