diff --git a/CHANGELOG.md b/CHANGELOG.md index 23f02f7..315f3aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,44 @@ All notable changes to seismo-relay are documented here. --- +## v0.11.0 — 2026-04-13 + +### Added + +- **`sfm/database.py` — SeismoDb** — SQLite persistence layer for all ACH data. + Three tables, all unit-keyed by serial number: + - `ach_sessions` — one row per inbound call-home: serial, timestamp, peer IP, + events_downloaded, monitor_entries, duration_seconds + - `events` — one row per triggered waveform event: serial, waveform_key (dedup), + timestamp, Tran/Vert/Long/VectorSum/Mic PPV, project/client/operator/sensor_location + strings, sample_rate, record_type, false_trigger flag + - `monitor_log` — one row per monitoring interval: serial, waveform_key (dedup), + start_time, stop_time, duration_seconds, geo_threshold_ips + - WAL mode, per-request connections — safe for the single-writer / occasional-reader + ACH server pattern + - Deduplication by `(serial, waveform_key)` UNIQUE constraint — re-runs and repeat + call-homes never produce duplicate rows + +- **`ach_server.py` — DB integration** — after each successful call-home, writes new + events and monitor log entries to `seismo_relay.db` then records the session in + `ach_sessions`. DB write failures are logged as warnings and do not abort the session. + +- **`sfm/server.py` — DB read endpoints**: + - `GET /db/units` — distinct serials with last_seen, total_events, total_monitor_entries + - `GET /db/events` — query events with serial / date range / false_trigger filters + - `GET /db/monitor_log` — query monitoring intervals + - `GET /db/sessions` — query ACH call-home sessions + - `PATCH /db/events/{id}/false_trigger` — flag/unflag false triggers (for review UI) + +### Architecture + +- seismo-relay DB is unit-keyed only — no project concepts. Project aggregation is + terra-view's responsibility via `UnitAssignment` / `DeploymentRecord` + date range + queries against the SFM DB endpoints. +- DB file lives at `bridges/captures/seismo_relay.db` by default. + +--- + ## v0.10.0 — 2026-04-11 ### Added diff --git a/CLAUDE.md b/CLAUDE.md index 0ac0bc5..f1a2836 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -416,6 +416,8 @@ for 0x10 records). ## SFM REST API (sfm/server.py) +### Live device endpoints (connect to device per-request) + ``` GET /device/info?port=COM5 ← serial GET /device/info?host=1.2.3.4&tcp_port=9034 ← cellular @@ -428,6 +430,19 @@ POST /device/monitor/stop?host=1.2.3.4&tcp_port=9034 ← stop recording Server retries once on `ProtocolError` for TCP connections (handles cold-boot timing). +### DB read endpoints (query seismo_relay.db written by ach_server.py) + +``` +GET /db/units ← all known serials + summary stats +GET /db/events?serial=BE11529&from_dt=&to_dt=&limit= ← triggered events, newest first +GET /db/monitor_log?serial=BE11529&from_dt=&to_dt= ← monitoring intervals, newest first +GET /db/sessions?serial=BE11529&limit=50 ← ACH call-home sessions, newest first +PATCH /db/events/{id}/false_trigger?value=true ← flag/unflag false triggers +``` + +DB file: `bridges/captures/seismo_relay.db` (default; override with `--db-path` at startup). +All DB endpoints are read-only except `PATCH /db/events/{id}/false_trigger`. + --- ## Key wire captures (reference material) diff --git a/bridges/ach_server.py b/bridges/ach_server.py index 8742e81..f3fc6e4 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -68,6 +68,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from minimateplus.transport import SocketTransport from minimateplus.client import MiniMateClient from minimateplus.models import DeviceInfo, Event, MonitorLogEntry +from sfm.database import SeismoDb log = logging.getLogger("ach_server") @@ -136,6 +137,7 @@ class AchSession: events_only: bool, max_events: Optional[int], state_path: Path, + db: "SeismoDb", clear_after_download: bool = False, ) -> None: self.sock = sock @@ -145,6 +147,7 @@ class AchSession: self.events_only = events_only self.max_events = max_events self.state_path = state_path + self.db = db self.clear_after_download = clear_after_download def run(self) -> None: @@ -408,6 +411,30 @@ class AchSession: exc, ) + # ── Persist to SQLite DB ───────────────────────────────────── + _session_start = datetime.datetime.now() + try: + _ev_ins, _ev_skip = self.db.insert_events( + new_events, serial=serial or self.peer, session_id=None + ) + _ml_ins, _ml_skip = self.db.insert_monitor_log( + new_monitor_entries, session_id=None + ) + _session_id = self.db.insert_ach_session( + serial=serial or self.peer, + peer=self.peer, + events_downloaded=_ev_ins, + monitor_entries=_ml_ins, + duration_seconds=(datetime.datetime.now() - _session_start).total_seconds(), + session_time=_session_start, + ) + log.info( + " [DB] session=%s events +%d (skip %d) monitor +%d (skip %d)", + _session_id[:8], _ev_ins, _ev_skip, _ml_ins, _ml_skip, + ) + except Exception as exc: + log.warning(" [WARN] DB write failed: %s -- continuing", exc) + # ── Optional: erase device memory after successful download ──── erased_successfully = False if self.clear_after_download and new_events: @@ -549,6 +576,7 @@ def serve(args: argparse.Namespace) -> None: output_dir = Path(args.output) output_dir.mkdir(parents=True, exist_ok=True) state_path = output_dir / "ach_state.json" + db = SeismoDb(output_dir / "seismo_relay.db") server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -601,6 +629,7 @@ def serve(args: argparse.Namespace) -> None: events_only=args.events_only, max_events=max_ev, state_path=state_path, + db=db, clear_after_download=args.clear_after_download, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") diff --git a/sfm/database.py b/sfm/database.py new file mode 100644 index 0000000..e0172e2 --- /dev/null +++ b/sfm/database.py @@ -0,0 +1,406 @@ +""" +sfm/database.py — SQLite persistence layer for seismo-relay. + +Three tables, all keyed by unit serial number: + + ach_sessions — one row per inbound ACH call-home + events — one row per triggered waveform event (deduped by serial+key) + monitor_log — one row per monitoring interval (deduped by serial+key) + +The DB file lives at: + /seismo_relay.db (default: bridges/captures/seismo_relay.db) + +Usage +----- + from sfm.database import SeismoDb + + db = SeismoDb("bridges/captures/seismo_relay.db") + + # Write a call-home session + session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920", + events_downloaded=3, monitor_entries=2, + duration_seconds=47.3) + + # Write events (silently skips duplicates) + db.insert_events(events, serial="BE11529", session_id=session_id) + + # Write monitor log entries + db.insert_monitor_log(entries, session_id=session_id) + + # Query + rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...)) +""" + +from __future__ import annotations + +import datetime +import logging +import sqlite3 +import uuid +from pathlib import Path +from typing import Optional + +from minimateplus.models import Event, MonitorLogEntry + +log = logging.getLogger("sfm.database") + +# ── Schema ───────────────────────────────────────────────────────────────────── + +_SCHEMA = """ +PRAGMA journal_mode = WAL; +PRAGMA foreign_keys = ON; + +CREATE TABLE IF NOT EXISTS ach_sessions ( + id TEXT PRIMARY KEY, -- UUID + serial TEXT NOT NULL, + session_time TEXT NOT NULL, -- ISO-8601 UTC + peer TEXT, -- "ip:port" + events_downloaded INTEGER NOT NULL DEFAULT 0, + monitor_entries INTEGER NOT NULL DEFAULT 0, + duration_seconds REAL, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) +); +CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial); +CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time); + +CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, -- UUID + serial TEXT NOT NULL, + waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field) + session_id TEXT, -- FK → ach_sessions.id + timestamp TEXT, -- ISO-8601 local time from device + tran_ppv REAL, -- in/s + vert_ppv REAL, -- in/s + long_ppv REAL, -- in/s + peak_vector_sum REAL, -- in/s + mic_ppv REAL, -- psi or dB depending on setup + project TEXT, + client TEXT, + operator TEXT, + sensor_location TEXT, + sample_rate INTEGER, + record_type TEXT, -- "single_shot" | "continuous" + false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag) + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + UNIQUE(serial, waveform_key) +); +CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial); +CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp); +CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id); + +CREATE TABLE IF NOT EXISTS monitor_log ( + id TEXT PRIMARY KEY, -- UUID + serial TEXT NOT NULL, + waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field) + session_id TEXT, -- FK → ach_sessions.id + start_time TEXT, -- ISO-8601 + stop_time TEXT, -- ISO-8601 + duration_seconds REAL, + geo_threshold_ips REAL, -- in/s + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + UNIQUE(serial, waveform_key) +); +CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial); +CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time); +CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id); +""" + + +# ── SeismoDb class ───────────────────────────────────────────────────────────── + +class SeismoDb: + """ + Thin SQLite wrapper for seismo-relay persistence. + + Thread-safe: each call opens, uses, and closes a connection with + check_same_thread=False and WAL mode enabled. For the ACH server's + single-writer / occasional-reader pattern this is more than sufficient. + """ + + def __init__(self, db_path: str | Path) -> None: + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._init_schema() + log.info("SeismoDb initialised at %s", self.db_path) + + # ── Internal helpers ─────────────────────────────────────────────────────── + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(str(self.db_path), check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode = WAL") + conn.execute("PRAGMA foreign_keys = ON") + return conn + + def _init_schema(self) -> None: + with self._connect() as conn: + conn.executescript(_SCHEMA) + + @staticmethod + def _iso(dt: Optional[datetime.datetime]) -> Optional[str]: + return dt.isoformat() if dt is not None else None + + @staticmethod + def _new_id() -> str: + return str(uuid.uuid4()) + + # ── ACH sessions ────────────────────────────────────────────────────────── + + def insert_ach_session( + self, + *, + serial: str, + peer: Optional[str] = None, + events_downloaded: int = 0, + monitor_entries: int = 0, + duration_seconds: Optional[float] = None, + session_time: Optional[datetime.datetime] = None, + ) -> str: + """Insert a new ACH session row. Returns the new session UUID.""" + sid = self._new_id() + ts = self._iso(session_time or datetime.datetime.utcnow()) + with self._connect() as conn: + conn.execute( + """ + INSERT INTO ach_sessions + (id, serial, session_time, peer, + events_downloaded, monitor_entries, duration_seconds) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + (sid, serial, ts, peer, + events_downloaded, monitor_entries, duration_seconds), + ) + log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d", + sid, serial, events_downloaded, monitor_entries) + return sid + + def get_sessions( + self, + serial: Optional[str] = None, + limit: int = 50, + ) -> list[dict]: + """Return recent ACH sessions, newest first.""" + with self._connect() as conn: + if serial: + rows = conn.execute( + "SELECT * FROM ach_sessions WHERE serial=? " + "ORDER BY session_time DESC LIMIT ?", + (serial, limit), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM ach_sessions ORDER BY session_time DESC LIMIT ?", + (limit,), + ).fetchall() + return [dict(r) for r in rows] + + # ── Events ──────────────────────────────────────────────────────────────── + + def insert_events( + self, + events: list[Event], + *, + serial: str, + session_id: Optional[str] = None, + ) -> tuple[int, int]: + """ + Insert triggered events. Silently skips duplicates (serial+waveform_key). + Returns (inserted, skipped). + """ + inserted = skipped = 0 + with self._connect() as conn: + for ev in events: + key = ev._waveform_key.hex() if ev._waveform_key else None + if key is None: + skipped += 1 + continue + + ts = None + if ev.timestamp: + try: + ts = datetime.datetime( + ev.timestamp.year, ev.timestamp.month, ev.timestamp.day, + ev.timestamp.hour, ev.timestamp.minute, ev.timestamp.second, + ).isoformat() + except Exception: + ts = str(ev.timestamp) + + pv = ev.peak_values + pi = ev.project_info + + try: + conn.execute( + """ + INSERT INTO events + (id, serial, waveform_key, session_id, timestamp, + tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv, + project, client, operator, sensor_location, + sample_rate, record_type) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + self._new_id(), serial, key, session_id, ts, + pv.tran if pv else None, + pv.vert if pv else None, + pv.long if pv else None, + pv.peak_vector_sum if pv else None, + pv.micl if pv else None, + pi.project if pi else None, + pi.client if pi else None, + pi.operator if pi else None, + pi.sensor_location if pi else None, + ev.sample_rate, + ev.record_type, + ), + ) + inserted += 1 + except sqlite3.IntegrityError: + skipped += 1 + + log.debug("insert_events serial=%s inserted=%d skipped=%d", + serial, inserted, skipped) + return inserted, skipped + + def query_events( + self, + serial: Optional[str] = None, + from_dt: Optional[datetime.datetime] = None, + to_dt: Optional[datetime.datetime] = None, + false_trigger: Optional[bool] = None, + limit: int = 500, + offset: int = 0, + ) -> list[dict]: + """Query events with optional filters. Returns newest first.""" + clauses: list[str] = [] + params: list = [] + + if serial: + clauses.append("serial = ?") + params.append(serial) + if from_dt: + clauses.append("timestamp >= ?") + params.append(from_dt.isoformat()) + if to_dt: + clauses.append("timestamp <= ?") + params.append(to_dt.isoformat()) + if false_trigger is not None: + clauses.append("false_trigger = ?") + params.append(1 if false_trigger else 0) + + where = ("WHERE " + " AND ".join(clauses)) if clauses else "" + params += [limit, offset] + + with self._connect() as conn: + rows = conn.execute( + f"SELECT * FROM events {where} " + f"ORDER BY timestamp DESC LIMIT ? OFFSET ?", + params, + ).fetchall() + return [dict(r) for r in rows] + + def set_false_trigger(self, event_id: str, value: bool) -> bool: + """Set or clear the false_trigger flag on an event. Returns True if found.""" + with self._connect() as conn: + cur = conn.execute( + "UPDATE events SET false_trigger=? WHERE id=?", + (1 if value else 0, event_id), + ) + return cur.rowcount > 0 + + # ── Monitor log ─────────────────────────────────────────────────────────── + + def insert_monitor_log( + self, + entries: list[MonitorLogEntry], + *, + session_id: Optional[str] = None, + ) -> tuple[int, int]: + """ + Insert monitor log entries. Silently skips duplicates (serial+waveform_key). + Returns (inserted, skipped). + """ + inserted = skipped = 0 + with self._connect() as conn: + for e in entries: + try: + conn.execute( + """ + INSERT INTO monitor_log + (id, serial, waveform_key, session_id, + start_time, stop_time, duration_seconds, + geo_threshold_ips) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + self._new_id(), + e.serial or "", + e.key, + session_id, + self._iso(e.start_time), + self._iso(e.stop_time), + e.duration_seconds, + e.geo_threshold_ips, + ), + ) + inserted += 1 + except sqlite3.IntegrityError: + skipped += 1 + + log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped) + return inserted, skipped + + def query_monitor_log( + self, + serial: Optional[str] = None, + from_dt: Optional[datetime.datetime] = None, + to_dt: Optional[datetime.datetime] = None, + limit: int = 500, + offset: int = 0, + ) -> list[dict]: + """Query monitor log entries with optional filters. Returns newest first.""" + clauses: list[str] = [] + params: list = [] + + if serial: + clauses.append("serial = ?") + params.append(serial) + if from_dt: + clauses.append("start_time >= ?") + params.append(from_dt.isoformat()) + if to_dt: + clauses.append("start_time <= ?") + params.append(to_dt.isoformat()) + + where = ("WHERE " + " AND ".join(clauses)) if clauses else "" + params += [limit, offset] + + with self._connect() as conn: + rows = conn.execute( + f"SELECT * FROM monitor_log {where} " + f"ORDER BY start_time DESC LIMIT ? OFFSET ?", + params, + ).fetchall() + return [dict(r) for r in rows] + + # ── Fleet overview ──────────────────────────────────────────────────────── + + def query_units(self) -> list[dict]: + """ + Return one row per known serial with summary stats: + last_seen, total_events, total_monitor_entries. + """ + with self._connect() as conn: + rows = conn.execute( + """ + SELECT + s.serial, + MAX(s.session_time) AS last_seen, + SUM(s.events_downloaded) AS total_events, + SUM(s.monitor_entries) AS total_monitor_entries, + COUNT(*) AS total_sessions + FROM ach_sessions s + GROUP BY s.serial + ORDER BY last_seen DESC + """ + ).fetchall() + return [dict(r) for r in rows] diff --git a/sfm/server.py b/sfm/server.py index 0ae36ba..db0e1a7 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -34,6 +34,7 @@ or: from __future__ import annotations +import datetime import logging import sys from pathlib import Path @@ -58,6 +59,7 @@ from minimateplus import MiniMateClient from minimateplus.protocol import ProtocolError from minimateplus.models import ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT +from sfm.database import SeismoDb logging.basicConfig( level=logging.INFO, @@ -88,6 +90,21 @@ app.add_middleware( ) +# ── DB ──────────────────────────────────────────────────────────────────────── +# Shared SeismoDb instance. Path can be overridden by --db-path at startup, +# or defaults to bridges/captures/seismo_relay.db relative to the repo root. + +_DEFAULT_DB_PATH = Path(__file__).parent.parent / "bridges" / "captures" / "seismo_relay.db" +_db: Optional[SeismoDb] = None + + +def _get_db() -> SeismoDb: + global _db + if _db is None: + _db = SeismoDb(_DEFAULT_DB_PATH) + return _db + + # ── Serialisers ──────────────────────────────────────────────────────────────── # Plain dict helpers — avoids a Pydantic dependency in the library layer. @@ -698,6 +715,108 @@ def device_monitor_stop( return {"status": "stopped"} +# ── DB read endpoints ───────────────────────────────────────────────────────── +# +# These endpoints expose the seismo-relay SQLite DB written by ach_server.py. +# All queries are read-only. Terra-view calls these to build project event +# views, unit history panels, and (eventually) vibration summary reports. + + +@app.get("/db/units") +def db_units() -> list[dict]: + """ + Return one row per known serial with summary stats: + last_seen, total_events, total_monitor_entries, total_sessions. + """ + return _get_db().query_units() + + +@app.get("/db/events") +def db_events( + serial: Optional[str] = Query(None, description="Filter by unit serial (e.g. BE11529)"), + from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"), + to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"), + false_trigger: Optional[bool] = Query(None, description="Filter by false_trigger flag"), + limit: int = Query(500, description="Max rows to return (default 500)"), + offset: int = Query(0, description="Pagination offset"), +) -> dict: + """ + Query triggered events from the DB. + + Returns events newest-first. All filter params are optional. + + Example: + GET /db/events?serial=BE11529&from_dt=2026-04-01&limit=100 + """ + from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None + to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None + + rows = _get_db().query_events( + serial=serial, + from_dt=from_parsed, + to_dt=to_parsed, + false_trigger=false_trigger, + limit=limit, + offset=offset, + ) + return {"count": len(rows), "events": rows} + + +@app.patch("/db/events/{event_id}/false_trigger") +def db_set_false_trigger( + event_id: str, + value: bool = Query(..., description="True to flag as false trigger, False to clear"), +) -> dict: + """ + Set or clear the false_trigger flag on a single event. + + Used by the terra-view event review UI. + Returns 404 if the event_id is not found. + """ + found = _get_db().set_false_trigger(event_id, value) + if not found: + raise HTTPException(status_code=404, detail=f"Event {event_id} not found") + return {"status": "ok", "event_id": event_id, "false_trigger": value} + + +@app.get("/db/monitor_log") +def db_monitor_log( + serial: Optional[str] = Query(None, description="Filter by unit serial"), + from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"), + to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"), + limit: int = Query(500, description="Max rows to return"), + offset: int = Query(0, description="Pagination offset"), +) -> dict: + """ + Query monitor log entries (continuous monitoring intervals) from the DB. + + Returns entries newest-first. + """ + from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None + to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None + + rows = _get_db().query_monitor_log( + serial=serial, + from_dt=from_parsed, + to_dt=to_parsed, + limit=limit, + offset=offset, + ) + return {"count": len(rows), "entries": rows} + + +@app.get("/db/sessions") +def db_sessions( + serial: Optional[str] = Query(None, description="Filter by unit serial"), + limit: int = Query(50, description="Max rows to return"), +) -> dict: + """ + Query ACH call-home sessions from the DB, newest first. + """ + rows = _get_db().get_sessions(serial=serial, limit=limit) + return {"count": len(rows), "sessions": rows} + + # ── Entry point ──────────────────────────────────────────────────────────────── if __name__ == "__main__":