Files
seismo-relay/sfm/database.py
T
serversdown c641d5fc10 feat: v0.15.0
### Added

- **Layered event storage architecture.**  Each event now lands as four
  files in the per-serial waveform store, each with a clear role:

  - `<filename>` — the Blastware-readable binary (BW file).  Untouched.
  - `<filename>.a5.pkl` — the raw 5A frames (regenerative source).
  - `<filename>.h5` — clean per-channel waveform arrays in physical
    units (in/s for geo, psi for mic) plus event metadata (HDF5 with
    gzip compression).  This is the canonical format for downstream
    analysis tools.
  - `<filename>.sfm.json` — the modern review/metadata sidecar (peaks,
    project, source provenance, review state, extensions).

  SQLite (`seismo_relay.db`) is the searchable index over all four.

- **Plot-ready waveform JSON (`sfm.plot.v1`).**  The `/device/event/{idx}/waveform`
  and `/db/events/{id}/waveform.json` endpoints now return samples in
  physical units with explicit time-axis metadata, peak markers, and
  per-channel unit hints — no more guessing the ADC-to-velocity scale
  client-side.  The webapp waveform viewer was rewritten to consume
  this shape.

- **In-app waveform viewer accuracy fix.**  The standalone SFM webapp
  viewer was scaling geophone amplitudes by `geoAdcScale / 32767`
  (≈ 6.206 / 32767), where `geoAdcScale = 6.206053` is the device's
  *in/s per V* hardware constant — not the ADC-counts-to-velocity
  factor.  This silently scaled every plot ~38% too low for Normal-range
  geophones (the correct full-scale is 10.0 in/s, or 1.25 in/s for
  Sensitive).  Conversion is now done server-side using the geo_range
  from compliance config; the client just plots.

- New `sfm/event_hdf5.py` module: `write_event_hdf5()`,
  `read_event_hdf5()`, plus a plot-JSON helper.
- Backfill script extended to also emit `.h5` for existing events.

### Dependencies

- Added `h5py>=3.10` and `numpy>=1.24` for the HDF5 storage layer.
- Added `python-multipart>=0.0.7` (required by FastAPI for the
  `/db/import/blastware_file` endpoint introduced in this release).
2026-05-08 04:39:51 +00:00

585 lines
25 KiB
Python

"""
sfm/database.py — SQLite persistence layer for seismo-relay.
Three tables, all keyed by unit serial number:
ach_sessions — one row per inbound ACH call-home
events — one row per triggered waveform event (deduped by serial+timestamp)
monitor_log — one row per monitoring interval (deduped by serial+start_time)
The DB file lives at:
<output_dir>/seismo_relay.db (default: bridges/captures/seismo_relay.db)
Usage
-----
from sfm.database import SeismoDb
db = SeismoDb("bridges/captures/seismo_relay.db")
# Write a call-home session
session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920",
events_downloaded=3, monitor_entries=2,
duration_seconds=47.3)
# Write events (silently skips duplicates)
db.insert_events(events, serial="BE11529", session_id=session_id)
# Write monitor log entries
db.insert_monitor_log(entries, session_id=session_id)
# Query
rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...))
"""
from __future__ import annotations

import contextlib
import datetime
import logging
import sqlite3
import uuid
from pathlib import Path
from typing import Optional

from minimateplus.models import Event, MonitorLogEntry
# Module-level logger shared by everything in this persistence layer.
log = logging.getLogger("sfm.database")
# ── Schema ─────────────────────────────────────────────────────────────────────
# DDL script run through executescript() by SeismoDb._init_schema().  Every
# CREATE statement is guarded by IF NOT EXISTS, so running it against an
# existing database leaves existing tables and indexes alone; column additions
# and constraint changes for older databases are handled by SeismoDb._migrate().
#
# NOTE(review): the session_id columns are labelled "FK → ach_sessions.id" in
# the inline SQL comments, but no REFERENCES clause is declared, so nothing in
# this DDL makes SQLite enforce that relationship even with foreign_keys = ON.
#
# Dedup keys: events are unique per (serial, timestamp) and monitor_log rows
# per (serial, start_time); waveform_key is stored but is not part of either
# UNIQUE constraint.
_SCHEMA = """
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS ach_sessions (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
session_time TEXT NOT NULL, -- ISO-8601 UTC
peer TEXT, -- "ip:port"
events_downloaded INTEGER NOT NULL DEFAULT 0,
monitor_entries INTEGER NOT NULL DEFAULT 0,
duration_seconds REAL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time);
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field)
session_id TEXT, -- FK → ach_sessions.id
timestamp TEXT, -- ISO-8601 local time from device
tran_ppv REAL, -- in/s
vert_ppv REAL, -- in/s
long_ppv REAL, -- in/s
peak_vector_sum REAL, -- in/s
mic_ppv REAL, -- psi or dB depending on setup
project TEXT,
client TEXT,
operator TEXT,
sensor_location TEXT,
sample_rate INTEGER,
record_type TEXT, -- "single_shot" | "continuous"
false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag)
blastware_filename TEXT, -- event file within waveform store; extension is per-event (AB0T encodes timestamp)
blastware_filesize INTEGER, -- bytes; NULL if no event file saved
a5_pickle_filename TEXT, -- "<filename>.a5.pkl" sidecar
sidecar_filename TEXT, -- "<filename>.sfm.json" review/metadata sidecar
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, timestamp)
);
CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);
CREATE TABLE IF NOT EXISTS monitor_log (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field)
session_id TEXT, -- FK → ach_sessions.id
start_time TEXT, -- ISO-8601
stop_time TEXT, -- ISO-8601
duration_seconds REAL,
geo_threshold_ips REAL, -- in/s
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, start_time)
);
CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
"""
# ── SeismoDb class ─────────────────────────────────────────────────────────────
class SeismoDb:
    """
    Thin SQLite wrapper for seismo-relay persistence.

    No connection is shared between calls: every method obtains its own
    connection from ``_connect()``, which opens the database file with
    ``check_same_thread=False`` and switches it to WAL journal mode.  For the
    ACH server's single-writer / occasional-reader pattern this is more than
    sufficient.

    All rows are returned to callers as plain ``dict`` objects.
    """
def __init__(self, db_path: str | Path) -> None:
    """
    Bind the wrapper to the database file at ``db_path``.

    Missing parent directories are created first; the schema is then
    created (or migrated in place) via ``_init_schema()`` before the
    constructor returns.
    """
    target = Path(db_path)
    self.db_path = target
    target.parent.mkdir(parents=True, exist_ok=True)
    self._init_schema()
    log.info("SeismoDb initialised at %s", target)
# ── Internal helpers ───────────────────────────────────────────────────────
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _init_schema(self) -> None:
    """
    Create the schema if needed, then migrate older databases in place.

    Runs the ``_SCHEMA`` script followed by ``_migrate()`` on a single
    connection.  The connection is committed (or rolled back on error) and
    then explicitly closed: sqlite3's own connection context manager only
    ends the transaction, it never closes the connection.
    """
    with contextlib.closing(self._connect()) as conn, conn:
        conn.executescript(_SCHEMA)
        self._migrate(conn)
def _migrate(self, conn: sqlite3.Connection) -> None:
"""Apply in-place schema migrations for existing databases."""
# Migration 1: change events UNIQUE from (serial, waveform_key) [or any
# waveform_key-based variant] to (serial, timestamp).
# Rationale: device key counter resets to 01110000 after every erase, so
# waveform_key is not a stable dedup field across erase cycles. The event
# timestamp (from the device clock) is the correct natural key.
row = conn.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name='events'"
).fetchone()
if row and "UNIQUE(serial, timestamp)" not in row[0]:
log.info("_migrate: rebuilding events table — UNIQUE(serial, timestamp)")
conn.executescript("""
ALTER TABLE events RENAME TO events_old;
CREATE TABLE events (
id TEXT PRIMARY KEY,
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL,
session_id TEXT,
timestamp TEXT,
tran_ppv REAL,
vert_ppv REAL,
long_ppv REAL,
peak_vector_sum REAL,
mic_ppv REAL,
project TEXT,
client TEXT,
operator TEXT,
sensor_location TEXT,
sample_rate INTEGER,
record_type TEXT,
false_trigger INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, timestamp)
);
INSERT OR IGNORE INTO events SELECT * FROM events_old;
DROP TABLE events_old;
CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);
""")
log.info("_migrate: events table rebuilt OK")
# Migration 1b: add Blastware-file columns to existing events tables.
# New columns are NULLable so old rows just read NULL.
existing_cols = {
r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()
}
for col, ddl in (
("blastware_filename", "TEXT"),
("blastware_filesize", "INTEGER"),
("a5_pickle_filename", "TEXT"),
("sidecar_filename", "TEXT"),
):
if col not in existing_cols:
log.info("_migrate: events ADD COLUMN %s %s", col, ddl)
conn.execute(f"ALTER TABLE events ADD COLUMN {col} {ddl}")
# Migration 2: change monitor_log UNIQUE from (serial, waveform_key) to
# (serial, start_time) — same reasoning as events.
row = conn.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name='monitor_log'"
).fetchone()
if row and "UNIQUE(serial, start_time)" not in row[0]:
log.info("_migrate: rebuilding monitor_log table — UNIQUE(serial, start_time)")
conn.executescript("""
ALTER TABLE monitor_log RENAME TO monitor_log_old;
CREATE TABLE monitor_log (
id TEXT PRIMARY KEY,
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL,
session_id TEXT,
start_time TEXT,
stop_time TEXT,
duration_seconds REAL,
geo_threshold_ips REAL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, start_time)
);
INSERT OR IGNORE INTO monitor_log SELECT * FROM monitor_log_old;
DROP TABLE monitor_log_old;
CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
""")
log.info("_migrate: monitor_log table rebuilt OK")
@staticmethod
def _iso(dt: Optional[datetime.datetime]) -> Optional[str]:
return dt.isoformat() if dt is not None else None
@staticmethod
def _new_id() -> str:
return str(uuid.uuid4())
# ── ACH sessions ──────────────────────────────────────────────────────────
def insert_ach_session(
self,
*,
serial: str,
peer: Optional[str] = None,
events_downloaded: int = 0,
monitor_entries: int = 0,
duration_seconds: Optional[float] = None,
session_time: Optional[datetime.datetime] = None,
) -> str:
"""Insert a new ACH session row. Returns the new session UUID."""
sid = self._new_id()
ts = self._iso(session_time or datetime.datetime.utcnow())
with self._connect() as conn:
conn.execute(
"""
INSERT INTO ach_sessions
(id, serial, session_time, peer,
events_downloaded, monitor_entries, duration_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(sid, serial, ts, peer,
events_downloaded, monitor_entries, duration_seconds),
)
log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d",
sid, serial, events_downloaded, monitor_entries)
return sid
def get_sessions(
self,
serial: Optional[str] = None,
limit: int = 50,
) -> list[dict]:
"""Return recent ACH sessions, newest first."""
with self._connect() as conn:
if serial:
rows = conn.execute(
"SELECT * FROM ach_sessions WHERE serial=? "
"ORDER BY session_time DESC LIMIT ?",
(serial, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM ach_sessions ORDER BY session_time DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
# ── Events ────────────────────────────────────────────────────────────────
def insert_events(
self,
events: list[Event],
*,
serial: str,
session_id: Optional[str] = None,
waveform_records: Optional[dict[str, dict]] = None,
) -> tuple[int, int]:
"""
Insert triggered events. Silently skips duplicates (serial+timestamp).
Returns (inserted, skipped).
``waveform_records`` (optional): dict keyed by event waveform_key (hex)
whose value is a record from ``WaveformStore.save()``:
{"filename": str, "filesize": int, "a5_pickle_filename": str}
For events whose key is in this dict, the matching columns are
populated. If a row with the same (serial, timestamp) already exists
(dedup hit), the matching waveform record is upserted onto the
existing row so a re-download via the live endpoint refreshes the
file metadata.
"""
inserted = skipped = 0
wave_recs = waveform_records or {}
with self._connect() as conn:
for ev in events:
key = ev._waveform_key.hex() if ev._waveform_key else None
if key is None:
skipped += 1
continue
ts = None
if ev.timestamp:
try:
ts = datetime.datetime(
ev.timestamp.year, ev.timestamp.month, ev.timestamp.day,
ev.timestamp.hour, ev.timestamp.minute, ev.timestamp.second,
).isoformat()
except Exception:
ts = str(ev.timestamp)
pv = ev.peak_values
pi = ev.project_info
rec = wave_recs.get(key) or {}
try:
conn.execute(
"""
INSERT INTO events
(id, serial, waveform_key, session_id, timestamp,
tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv,
project, client, operator, sensor_location,
sample_rate, record_type,
blastware_filename, blastware_filesize,
a5_pickle_filename, sidecar_filename)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
self._new_id(), serial, key, session_id, ts,
pv.tran if pv else None,
pv.vert if pv else None,
pv.long if pv else None,
pv.peak_vector_sum if pv else None,
pv.micl if pv else None,
pi.project if pi else None,
pi.client if pi else None,
pi.operator if pi else None,
pi.sensor_location if pi else None,
ev.sample_rate,
ev.record_type,
rec.get("filename"),
rec.get("filesize"),
rec.get("a5_pickle_filename"),
rec.get("sidecar_filename"),
),
)
inserted += 1
except sqlite3.IntegrityError:
skipped += 1
# Upsert waveform fields onto the existing dedup row so a
# re-download via the live endpoint refreshes filename /
# size / sidecar without churning the rest of the row.
if rec and ts:
conn.execute(
"""
UPDATE events
SET blastware_filename = ?,
blastware_filesize = ?,
a5_pickle_filename = ?,
sidecar_filename = ?
WHERE serial = ? AND timestamp = ?
""",
(
rec.get("filename"),
rec.get("filesize"),
rec.get("a5_pickle_filename"),
rec.get("sidecar_filename"),
serial,
ts,
),
)
log.debug("insert_events serial=%s inserted=%d skipped=%d",
serial, inserted, skipped)
return inserted, skipped
def get_event(self, event_id: str) -> Optional[dict]:
"""Return one event row by id, or None."""
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM events WHERE id = ?", (event_id,),
).fetchone()
return dict(row) if row else None
def query_events(
self,
serial: Optional[str] = None,
from_dt: Optional[datetime.datetime] = None,
to_dt: Optional[datetime.datetime] = None,
false_trigger: Optional[bool] = None,
limit: int = 500,
offset: int = 0,
) -> list[dict]:
"""Query events with optional filters. Returns newest first."""
clauses: list[str] = []
params: list = []
if serial:
clauses.append("serial = ?")
params.append(serial)
if from_dt:
clauses.append("timestamp >= ?")
params.append(from_dt.isoformat())
if to_dt:
clauses.append("timestamp <= ?")
params.append(to_dt.isoformat())
if false_trigger is not None:
clauses.append("false_trigger = ?")
params.append(1 if false_trigger else 0)
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
params += [limit, offset]
with self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM events {where} "
f"ORDER BY timestamp DESC LIMIT ? OFFSET ?",
params,
).fetchall()
return [dict(r) for r in rows]
def set_false_trigger(self, event_id: str, value: bool) -> bool:
"""Set or clear the false_trigger flag on an event. Returns True if found."""
with self._connect() as conn:
cur = conn.execute(
"UPDATE events SET false_trigger=? WHERE id=?",
(1 if value else 0, event_id),
)
return cur.rowcount > 0
def update_event_review(self, event_id: str, review: dict) -> bool:
"""
Sync derived index columns from a sidecar's `review` block.
Currently the only derived index is `events.false_trigger` — kept
in sync so `/db/events?false_trigger=true` queries don't have to
scan every sidecar JSON on disk. The sidecar JSON itself remains
the source of truth for the full review state.
Returns True when the row exists, False otherwise. No-op fields
(review without `false_trigger`) leave the column untouched.
"""
if not isinstance(review, dict):
return False
if "false_trigger" not in review:
# Nothing derived to update; just confirm the row exists.
with self._connect() as conn:
row = conn.execute(
"SELECT 1 FROM events WHERE id=?", (event_id,),
).fetchone()
return row is not None
flag = 1 if review.get("false_trigger") else 0
with self._connect() as conn:
cur = conn.execute(
"UPDATE events SET false_trigger=? WHERE id=?",
(flag, event_id),
)
return cur.rowcount > 0
# ── Monitor log ───────────────────────────────────────────────────────────
def insert_monitor_log(
self,
entries: list[MonitorLogEntry],
*,
session_id: Optional[str] = None,
) -> tuple[int, int]:
"""
Insert monitor log entries. Silently skips duplicates (serial+start_time).
Returns (inserted, skipped).
"""
inserted = skipped = 0
with self._connect() as conn:
for e in entries:
try:
conn.execute(
"""
INSERT INTO monitor_log
(id, serial, waveform_key, session_id,
start_time, stop_time, duration_seconds,
geo_threshold_ips)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
self._new_id(),
e.serial or "",
e.key,
session_id,
self._iso(e.start_time),
self._iso(e.stop_time),
e.duration_seconds,
e.geo_threshold_ips,
),
)
inserted += 1
except sqlite3.IntegrityError:
skipped += 1
log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped)
return inserted, skipped
def query_monitor_log(
self,
serial: Optional[str] = None,
from_dt: Optional[datetime.datetime] = None,
to_dt: Optional[datetime.datetime] = None,
limit: int = 500,
offset: int = 0,
) -> list[dict]:
"""Query monitor log entries with optional filters. Returns newest first."""
clauses: list[str] = []
params: list = []
if serial:
clauses.append("serial = ?")
params.append(serial)
if from_dt:
clauses.append("start_time >= ?")
params.append(from_dt.isoformat())
if to_dt:
clauses.append("start_time <= ?")
params.append(to_dt.isoformat())
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
params += [limit, offset]
with self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM monitor_log {where} "
f"ORDER BY start_time DESC LIMIT ? OFFSET ?",
params,
).fetchall()
return [dict(r) for r in rows]
# ── Fleet overview ────────────────────────────────────────────────────────
def query_units(self) -> list[dict]:
"""
Return one row per known serial with summary stats:
last_seen, total_events, total_monitor_entries.
"""
with self._connect() as conn:
rows = conn.execute(
"""
SELECT
s.serial,
MAX(s.session_time) AS last_seen,
SUM(s.events_downloaded) AS total_events,
SUM(s.monitor_entries) AS total_monitor_entries,
COUNT(*) AS total_sessions
FROM ach_sessions s
GROUP BY s.serial
ORDER BY last_seen DESC
"""
).fetchall()
return [dict(r) for r in rows]