From 3711b11bda31ca6265089309e9a29657c71cf806 Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 6 May 2026 19:03:38 +0000 Subject: [PATCH] feat: add waveform store handling --- bridges/ach_server.py | 54 ++++++- sfm/database.py | 66 +++++++- sfm/server.py | 197 ++++++++++++++++++++++- sfm/waveform_store.py | 164 +++++++++++++++++++ tests/test_waveform_store.py | 302 +++++++++++++++++++++++++++++++++++ 5 files changed, 777 insertions(+), 6 deletions(-) create mode 100644 sfm/waveform_store.py create mode 100644 tests/test_waveform_store.py diff --git a/bridges/ach_server.py b/bridges/ach_server.py index edd4f2e..25d988c 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -70,6 +70,7 @@ from minimateplus.transport import SocketTransport from minimateplus.client import MiniMateClient from minimateplus.models import DeviceInfo, Event, MonitorLogEntry from sfm.database import SeismoDb +from sfm.waveform_store import WaveformStore log = logging.getLogger("ach_server") @@ -139,6 +140,7 @@ class AchSession: max_events: Optional[int], state_path: Path, db: "SeismoDb", + store: "WaveformStore", clear_after_download: bool = False, restart_monitoring: bool = False, ) -> None: @@ -150,6 +152,7 @@ class AchSession: self.max_events = max_events self.state_path = state_path self.db = db + self.store = store self.clear_after_download = clear_after_download self.restart_monitoring = restart_monitoring @@ -407,8 +410,37 @@ class AchSession: if skipped: log.info(" (skipped %d already-downloaded event(s))", skipped) + # ── Persist .G10 / A5 sidecars to the waveform store ── + # Saves ride alongside the existing JSON dump so the on-disk + # .G10 and the events.json reference the same set of events. + waveform_records: dict[str, dict] = {} + for ev in new_events: + if not ev._a5_frames: + continue + try: + rec = self.store.save( + ev, + serial=serial or "UNKNOWN", + a5_frames=ev._a5_frames, + ) + if ev._waveform_key is not None: + waveform_records[ev._waveform_key.hex()] = rec + log.info( + " [WAVE] saved %s (%d bytes)", + rec["filename"], rec["filesize"], + ) + except Exception as exc: + key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????" + log.warning( + " [WARN] Waveform store save failed for %s: %s", + key_hex, exc, + ) + if new_events: - _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) + _save_json( + session_dir / "events.json", + [_event_to_dict(e, waveform_records) for e in new_events], + ) for ev in new_events: pv = ev.peak_values @@ -467,7 +499,10 @@ class AchSession: _session_start = datetime.datetime.now() try: _ev_ins, _ev_skip = self.db.insert_events( - new_events, serial=serial or self.peer, session_id=None + new_events, + serial=serial or self.peer, + session_id=None, + waveform_records=waveform_records, ) _ml_ins, _ml_skip = self.db.insert_monitor_log( new_monitor_entries, session_id=None @@ -592,7 +627,10 @@ def _device_info_to_dict(d: DeviceInfo) -> dict: } -def _event_to_dict(e: Event) -> dict: +def _event_to_dict( + e: Event, + waveform_records: Optional[dict[str, dict]] = None, +) -> dict: pv = e.peak_values pi = e.project_info peaks = {} @@ -611,6 +649,11 @@ def _event_to_dict(e: Event) -> dict: for ch, vals in e.raw_samples.items() } samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" + + rec: dict = {} + if waveform_records and e._waveform_key is not None: + rec = waveform_records.get(e._waveform_key.hex(), {}) or {} + return { "timestamp": str(e.timestamp) if e.timestamp else None, "project": pi.project if pi else None, @@ -619,6 +662,9 @@ def _event_to_dict(e: Event) -> dict: "sensor_location": pi.sensor_location if pi else None, "peaks": peaks, "raw_samples_preview": samples, + "blastware_filename": rec.get("filename"), + "blastware_filesize": rec.get("filesize"), + "a5_pickle_filename": rec.get("a5_pickle_filename"), } @@ -640,6 +686,7 @@ def serve(args: argparse.Namespace) -> None: output_dir.mkdir(parents=True, exist_ok=True) state_path = output_dir / "ach_state.json" db = SeismoDb(output_dir / "seismo_relay.db") + store = WaveformStore(output_dir / "waveforms") server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -694,6 +741,7 @@ def serve(args: argparse.Namespace) -> None: max_events=max_ev, state_path=state_path, db=db, + store=store, clear_after_download=args.clear_after_download, restart_monitoring=args.restart_monitoring, ) diff --git a/sfm/database.py b/sfm/database.py index 110ba7a..0f3c648 100644 --- a/sfm/database.py +++ b/sfm/database.py @@ -81,6 +81,9 @@ CREATE TABLE IF NOT EXISTS events ( sample_rate INTEGER, record_type TEXT, -- "single_shot" | "continuous" false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag) + blastware_filename TEXT, -- e.g. "M529LKIQ.G10" — within waveform store + blastware_filesize INTEGER, -- bytes; NULL if no .G10 saved + a5_pickle_filename TEXT, -- ".a5.pkl" sidecar created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), UNIQUE(serial, timestamp) ); @@ -184,6 +187,20 @@ class SeismoDb: """) log.info("_migrate: events table rebuilt OK") + # Migration 1b: add Blastware-file columns to existing events tables. + # New columns are NULLable so old rows just read NULL. + existing_cols = { + r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall() + } + for col, ddl in ( + ("blastware_filename", "TEXT"), + ("blastware_filesize", "INTEGER"), + ("a5_pickle_filename", "TEXT"), + ): + if col not in existing_cols: + log.info("_migrate: events ADD COLUMN %s %s", col, ddl) + conn.execute(f"ALTER TABLE events ADD COLUMN {col} {ddl}") + # Migration 2: change monitor_log UNIQUE from (serial, waveform_key) to # (serial, start_time) — same reasoning as events. row = conn.execute( @@ -282,12 +299,24 @@ class SeismoDb: *, serial: str, session_id: Optional[str] = None, + waveform_records: Optional[dict[str, dict]] = None, ) -> tuple[int, int]: """ Insert triggered events. Silently skips duplicates (serial+timestamp). Returns (inserted, skipped). + + ``waveform_records`` (optional): dict keyed by event waveform_key (hex) + whose value is a record from ``WaveformStore.save()``: + {"filename": str, "filesize": int, "a5_pickle_filename": str} + + For events whose key is in this dict, the matching columns are + populated. If a row with the same (serial, timestamp) already exists + (dedup hit), the matching waveform record is upserted onto the + existing row so a re-download via the live endpoint refreshes the + file metadata. """ inserted = skipped = 0 + wave_recs = waveform_records or {} with self._connect() as conn: for ev in events: key = ev._waveform_key.hex() if ev._waveform_key else None @@ -307,6 +336,7 @@ class SeismoDb: pv = ev.peak_values pi = ev.project_info + rec = wave_recs.get(key) or {} try: conn.execute( @@ -315,8 +345,9 @@ class SeismoDb: (id, serial, waveform_key, session_id, timestamp, tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv, project, client, operator, sensor_location, - sample_rate, record_type) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + sample_rate, record_type, + blastware_filename, blastware_filesize, a5_pickle_filename) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( self._new_id(), serial, key, session_id, ts, @@ -331,16 +362,47 @@ class SeismoDb: pi.sensor_location if pi else None, ev.sample_rate, ev.record_type, + rec.get("filename"), + rec.get("filesize"), + rec.get("a5_pickle_filename"), ), ) inserted += 1 except sqlite3.IntegrityError: skipped += 1 + # Upsert waveform fields onto the existing dedup row so a + # re-download via the live endpoint refreshes filename / + # size / sidecar without churning the rest of the row. + if rec and ts: + conn.execute( + """ + UPDATE events + SET blastware_filename = ?, + blastware_filesize = ?, + a5_pickle_filename = ? + WHERE serial = ? AND timestamp = ? + """, + ( + rec.get("filename"), + rec.get("filesize"), + rec.get("a5_pickle_filename"), + serial, + ts, + ), + ) log.debug("insert_events serial=%s inserted=%d skipped=%d", serial, inserted, skipped) return inserted, skipped + def get_event(self, event_id: str) -> Optional[dict]: + """Return one event row by id, or None.""" + with self._connect() as conn: + row = conn.execute( + "SELECT * FROM events WHERE id = ?", (event_id,), + ).fetchone() + return dict(row) if row else None + def query_events( self, serial: Optional[str] = None, diff --git a/sfm/server.py b/sfm/server.py index 9683254..1f0b525 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -47,7 +47,7 @@ from typing import Optional try: from fastapi import Body, FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware - from fastapi.responses import FileResponse, JSONResponse + from fastapi.responses import FileResponse, JSONResponse, StreamingResponse from pydantic import BaseModel import uvicorn except ImportError: @@ -63,8 +63,10 @@ from minimateplus.protocol import ProtocolError from minimateplus.models import CallHomeConfig, ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT from minimateplus.blastware_file import write_blastware_file, blastware_filename +from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform from sfm.cache import SFMCache, get_cache from sfm.database import SeismoDb +from sfm.waveform_store import WaveformStore logging.basicConfig( level=logging.INFO, @@ -101,6 +103,7 @@ app.add_middleware( _DEFAULT_DB_PATH = Path(__file__).parent.parent / "bridges" / "captures" / "seismo_relay.db" _db: Optional[SeismoDb] = None +_store: Optional[WaveformStore] = None def _get_db() -> SeismoDb: @@ -110,6 +113,18 @@ def _get_db() -> SeismoDb: return _db +def _get_store() -> WaveformStore: + """ + Persistent .G10 + A5-sidecar store, rooted at /waveforms/. + Mirrors the layout used by bridges/ach_server.py so files saved by ACH + ingestion and by live SFM downloads share one canonical location. + """ + global _store + if _store is None: + _store = WaveformStore(_get_db().db_path.parent / "waveforms") + return _store + + # ── Live device cache ───────────────────────────────────────────────────────── # In-memory cache for live device data. Avoids re-dialing the device on every # request when the data hasn't changed. @@ -946,6 +961,27 @@ def device_event_blastware_file( out_path, len(a5_frames), serial, ) + # Promote to canonical persistent store + DB row so this event is + # queryable via /db/events afterwards (matches the ACH ingestion path). + if serial != "UNKNOWN" and ev._waveform_key is not None: + try: + rec = _get_store().save(ev, serial=serial, a5_frames=a5_frames) + _get_db().insert_events( + [ev], + serial=serial, + waveform_records={ev._waveform_key.hex(): rec}, + ) + log.info( + "blastware_file: persisted to store (%s, %d bytes)", + rec["filename"], rec["filesize"], + ) + except Exception as exc: + log.warning( + "blastware_file: persistent store save failed: %s " + "— temp file still served", + exc, + ) + return FileResponse( path=str(out_path), filename=filename, @@ -1435,6 +1471,165 @@ def db_set_false_trigger( return {"status": "ok", "event_id": event_id, "false_trigger": value} +# ── /db/events/{id} — waveform file accessors ───────────────────────────────── +# +# These endpoints serve files from the persistent WaveformStore, so a Blastware +# file or its decoded JSON for a previously-ingested ACH event can be fetched +# without re-dialing the device. + +@app.get("/db/events/{event_id}/blastware_file") +def db_event_blastware_file(event_id: str) -> FileResponse: + """ + Return the Blastware-format waveform file (.G10/.W/.H/etc.) for a + previously-ingested event. 404 if the event is unknown or has no + .G10 in the store (events ingested before the store was wired will + show this — re-download via the live endpoint to populate). + """ + row = _get_db().get_event(event_id) + if row is None: + raise HTTPException(status_code=404, detail=f"Event {event_id} not found") + serial = row.get("serial") + filename = row.get("blastware_filename") + if not serial or not filename: + raise HTTPException( + status_code=404, + detail=( + f"Event {event_id} has no Blastware file in the store. " + "Re-download via the live endpoint to populate." + ), + ) + bw_path = _get_store().open_blastware(serial, filename) + if bw_path is None: + raise HTTPException( + status_code=410, + detail=f"Stored file missing on disk: {filename}", + ) + return FileResponse( + path=str(bw_path), + filename=filename, + media_type="application/octet-stream", + ) + + +@app.get("/db/events/{event_id}/waveform.json") +def db_event_waveform_json(event_id: str) -> dict: + """ + Return the decoded raw_samples + metadata for a stored event in the + same JSON shape as `/device/event/{index}/waveform`. + + Reads the `.a5.pkl` sidecar from the store, rebuilds an Event in + memory, runs the standard A5 decoders, and serialises the result. + """ + row = _get_db().get_event(event_id) + if row is None: + raise HTTPException(status_code=404, detail=f"Event {event_id} not found") + serial = row.get("serial") + filename = row.get("blastware_filename") + a5_name = row.get("a5_pickle_filename") + if not serial or not filename or not a5_name: + raise HTTPException( + status_code=404, + detail=( + f"Event {event_id} has no A5 sidecar in the store. " + "Re-download via the live endpoint to populate." + ), + ) + a5_frames = _get_store().load_a5(serial, filename) + if not a5_frames: + raise HTTPException( + status_code=410, + detail=f"A5 sidecar missing or unreadable: {a5_name}", + ) + + # Rebuild a minimal Event from DB fields, then decode A5 onto it. + ev = Event(index=-1) + try: + _decode_a5_metadata_into(a5_frames, ev) + except Exception as exc: + log.warning("db_event_waveform_json: metadata decode failed: %s", exc) + try: + _decode_a5_waveform(a5_frames, ev) + except Exception as exc: + log.error("db_event_waveform_json: waveform decode failed: %s", exc, exc_info=True) + raise HTTPException(status_code=500, detail=f"Waveform decode failed: {exc}") from exc + + raw = getattr(ev, "raw_samples", None) or {} + samples_decoded = len(raw.get("Tran", [])) + return { + "event_id": event_id, + "serial": serial, + "record_type": ev.record_type or row.get("record_type"), + "timestamp": _serialise_timestamp(ev.timestamp) or row.get("timestamp"), + "total_samples": ev.total_samples, + "pretrig_samples": ev.pretrig_samples, + "rectime_seconds": ev.rectime_seconds, + "samples_decoded": samples_decoded, + "sample_rate": ev.sample_rate or row.get("sample_rate"), + "peak_values": _serialise_peak_values(ev.peak_values) or { + "transverse": row.get("tran_ppv"), + "vertical": row.get("vert_ppv"), + "longitudinal": row.get("long_ppv"), + "vector_sum": row.get("peak_vector_sum"), + "mic": row.get("mic_ppv"), + }, + "channels": raw, + } + + +@app.get("/db/units/{serial}/waveforms.zip") +def db_unit_waveforms_zip( + serial: str, + from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"), + to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"), + limit: int = Query(5000, description="Hard cap on events bundled (default 5000)"), +) -> StreamingResponse: + """ + Stream a ZIP of all .G10/.W files for a serial in the optional date range. + Events without a stored Blastware file are silently skipped. + """ + import io + import zipfile + + from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None + to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None + + rows = _get_db().query_events( + serial=serial, + from_dt=from_parsed, + to_dt=to_parsed, + limit=limit, + offset=0, + ) + store = _get_store() + + buf = io.BytesIO() + written = 0 + with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for row in rows: + fn = row.get("blastware_filename") + if not fn: + continue + bw_path = store.open_blastware(serial, fn) + if bw_path is None: + continue + zf.write(bw_path, arcname=fn) + written += 1 + + if written == 0: + raise HTTPException( + status_code=404, + detail=f"No stored Blastware files found for serial {serial} in range", + ) + + buf.seek(0) + safe_serial = serial.replace("/", "_") + headers = { + "Content-Disposition": f'attachment; filename="{safe_serial}_waveforms.zip"', + "X-Waveform-Count": str(written), + } + return StreamingResponse(buf, media_type="application/zip", headers=headers) + + @app.get("/db/monitor_log") def db_monitor_log( serial: Optional[str] = Query(None, description="Filter by unit serial"), diff --git a/sfm/waveform_store.py b/sfm/waveform_store.py new file mode 100644 index 0000000..edf097a --- /dev/null +++ b/sfm/waveform_store.py @@ -0,0 +1,164 @@ +""" +sfm/waveform_store.py — On-disk store for Blastware-format waveform files. + +Layout (flat per-serial): + + // ← .G10 / .W / .H / etc. (Blastware-readable) + //.a5.pkl ← pickled list of A5 S3Frame dicts + +`` is whatever `minimateplus.blastware_file.blastware_filename` +produces for the event (encodes serial + timestamp + record type). Filenames +never collide for the same physical event. + +The `.a5.pkl` sidecar lets the .G10 be regenerated later if the encoder +changes — captures the raw 5A frame stream as serializable dicts so the +schema isn't tied to the `S3Frame` dataclass layout. +""" + +from __future__ import annotations + +import logging +import pickle +from pathlib import Path +from typing import Optional + +from minimateplus.blastware_file import blastware_filename, write_blastware_file +from minimateplus.framing import S3Frame +from minimateplus.models import Event + +log = logging.getLogger("sfm.waveform_store") + +A5_PICKLE_VERSION = 1 + + +def _frame_to_dict(f: S3Frame) -> dict: + return { + "sub": f.sub, + "page_hi": f.page_hi, + "page_lo": f.page_lo, + "data": bytes(f.data), + "chk_byte": f.chk_byte, + "checksum_valid": f.checksum_valid, + } + + +def _dict_to_frame(d: dict) -> S3Frame: + return S3Frame( + sub=d["sub"], + page_hi=d["page_hi"], + page_lo=d["page_lo"], + data=bytes(d["data"]), + checksum_valid=d.get("checksum_valid", True), + chk_byte=d.get("chk_byte", 0), + ) + + +class WaveformStore: + """ + Persistent store for Blastware-format waveform files + their A5 source frames. + + Thread safety: write_blastware_file is single-shot; concurrent saves of the + *same* filename would race, but the filename encodes second-resolution + timestamps + serial, so collisions across threads/processes are vanishingly + unlikely in practice. + """ + + def __init__(self, root: str | Path) -> None: + self.root = Path(root) + self.root.mkdir(parents=True, exist_ok=True) + log.info("WaveformStore root=%s", self.root) + + # ── path helpers ──────────────────────────────────────────────────────────── + + def _serial_dir(self, serial: str) -> Path: + d = self.root / serial + d.mkdir(parents=True, exist_ok=True) + return d + + def paths_for(self, serial: str, filename: str) -> tuple[Path, Path]: + """Return (blastware_path, a5_pickle_path) for a given serial+filename.""" + d = self._serial_dir(serial) + return d / filename, d / f"{filename}.a5.pkl" + + def open_blastware(self, serial: str, filename: str) -> Optional[Path]: + """Return absolute path to an existing .G10 file or None.""" + bw_path, _ = self.paths_for(serial, filename) + return bw_path if bw_path.exists() else None + + # ── save / load ───────────────────────────────────────────────────────────── + + def save( + self, + ev: Event, + serial: str, + a5_frames: list[S3Frame], + ) -> dict: + """ + Write the .G10 file and the .a5.pkl sidecar for one event. + + Returns a record dict suitable for persisting alongside the DB row: + + { + "filename": "M529LKIQ.G10", + "filesize": 8708, + "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + } + + Idempotent: if the .G10 already exists, it is overwritten with the + freshly-encoded version (same bytes for the same a5_frames). + """ + if not a5_frames: + raise ValueError("WaveformStore.save: a5_frames is empty") + if not serial: + raise ValueError("WaveformStore.save: serial is required") + + filename = blastware_filename(ev, serial) + bw_path, a5_path = self.paths_for(serial, filename) + + # 1. encode the .G10 + # Delete any stale file at this path so partial writes never leak + # trailing bytes from a previous larger file (matches the live + # endpoint's defensive unlink). + try: + bw_path.unlink() + except FileNotFoundError: + pass + write_blastware_file(ev, a5_frames, bw_path) + filesize = bw_path.stat().st_size + + # 2. write the .a5.pkl sidecar + try: + a5_path.unlink() + except FileNotFoundError: + pass + payload = { + "version": A5_PICKLE_VERSION, + "frames": [_frame_to_dict(f) for f in a5_frames], + } + with a5_path.open("wb") as fp: + pickle.dump(payload, fp, protocol=pickle.HIGHEST_PROTOCOL) + + log.info( + "WaveformStore.save serial=%s filename=%s filesize=%d frames=%d", + serial, filename, filesize, len(a5_frames), + ) + return { + "filename": filename, + "filesize": filesize, + "a5_pickle_filename": a5_path.name, + } + + def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]: + """ + Re-hydrate the pickled A5 frame stream for a stored event. + Returns None if the sidecar is missing. + """ + _, a5_path = self.paths_for(serial, filename) + if not a5_path.exists(): + return None + with a5_path.open("rb") as fp: + payload = pickle.load(fp) + if not isinstance(payload, dict) or "frames" not in payload: + log.warning("WaveformStore.load_a5: malformed sidecar at %s", a5_path) + return None + return [_dict_to_frame(d) for d in payload["frames"]] diff --git a/tests/test_waveform_store.py b/tests/test_waveform_store.py new file mode 100644 index 0000000..4207b25 --- /dev/null +++ b/tests/test_waveform_store.py @@ -0,0 +1,302 @@ +""" +test_waveform_store.py — unit tests for sfm/waveform_store.py and the +SeismoDb columns + insert_events upsert path that the store depends on. + +These tests exercise the *store + DB plumbing* in isolation — they do not +re-test write_blastware_file (covered separately) and do not require a live +device or a wire capture. + +Run: + python -m pytest tests/test_waveform_store.py -v +""" + +from __future__ import annotations + +import os +import sys +import datetime +from pathlib import Path + +try: + import pytest +except ImportError: # allow running standalone without pytest installed + pytest = None # type: ignore + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from minimateplus.framing import S3Frame +from minimateplus.models import Event, Timestamp + + +# ── Test fixtures ────────────────────────────────────────────────────────────── + + +def _make_synthetic_event() -> tuple[Event, list[S3Frame]]: + """ + Build a minimal Event + a 3-frame A5 stream that satisfies + write_blastware_file's STRT-extraction path. + + Frame 0 (probe): contains a STRT record at the canonical position so + write_blastware_file finds it without falling back. + Frame 1 (sample): 0x0200 bytes of zeros at page_key=0x0010 (sample marker). + Frame 2 (TERM): page_key=0x0000 marks the terminator. + """ + key4 = bytes.fromhex("01110000") + rectime = 3 + strt = b"STRT" + b"\xff\xfe" + key4 + key4 + bytes(7) + bytes([rectime]) + + # Probe payload prefix: 7 zero bytes then STRT (matches blastware_file._strip + # logic which looks for STRT in data[7:]). Tail with 32 zero bytes of fake + # body so reconstruction has something to slice. + probe_data = bytes(7) + strt + bytes(32) + probe = S3Frame(sub=0xA5, page_hi=0x10, page_lo=0x00, data=probe_data, + checksum_valid=True, chk_byte=0x00) + + sample = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x10, + data=bytes(7) + bytes(0x0200), checksum_valid=True, + chk_byte=0x00) + + term = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x00, + data=bytes(7) + bytes(64), checksum_valid=True, + chk_byte=0x00) + + ev = Event(index=0) + ev._waveform_key = key4 + ev.timestamp = Timestamp( + raw=b"", + flag=0x10, + year=2026, + unknown_byte=0, + month=5, + day=6, + hour=12, + minute=34, + second=56, + ) + ev.rectime_seconds = rectime + ev.record_type = "Waveform" + ev._a5_frames = [probe, sample, term] + return ev, [probe, sample, term] + + +# ── Frame round-trip ─────────────────────────────────────────────────────────── + + +def test_frame_dict_round_trip(): + """_frame_to_dict and _dict_to_frame must round-trip every field.""" + from sfm.waveform_store import _dict_to_frame, _frame_to_dict + + f = S3Frame( + sub=0xA5, page_hi=0x12, page_lo=0x34, + data=b"\x10\x02\x00\xab\xcd", + checksum_valid=False, + chk_byte=0x42, + ) + d = _frame_to_dict(f) + g = _dict_to_frame(d) + assert g.sub == f.sub + assert g.page_hi == f.page_hi + assert g.page_lo == f.page_lo + assert g.data == f.data + assert g.checksum_valid == f.checksum_valid + assert g.chk_byte == f.chk_byte + + +# ── Store save/load round-trip ───────────────────────────────────────────────── + + +def test_waveform_store_save_load_round_trip(tmp_path: Path): + """save() writes both files; load_a5() returns equivalent frames.""" + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + ev, frames = _make_synthetic_event() + + rec = store.save(ev, serial="BE11529", a5_frames=frames) + + assert rec["filename"].startswith("M529") + assert rec["filesize"] > 0 + assert rec["a5_pickle_filename"] == rec["filename"] + ".a5.pkl" + + bw_path = store.open_blastware("BE11529", rec["filename"]) + assert bw_path is not None + assert bw_path.exists() + assert bw_path.stat().st_size == rec["filesize"] + + # Sidecar exists and round-trips + loaded = store.load_a5("BE11529", rec["filename"]) + assert loaded is not None + assert len(loaded) == len(frames) + for orig, got in zip(frames, loaded): + assert got.sub == orig.sub + assert got.page_hi == orig.page_hi + assert got.page_lo == orig.page_lo + assert got.data == orig.data + + +def test_waveform_store_missing_returns_none(tmp_path: Path): + """open_blastware / load_a5 return None for nonexistent entries.""" + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + assert store.open_blastware("BE99999", "no_such.G10") is None + assert store.load_a5("BE99999", "no_such.G10") is None + + +def test_waveform_store_idempotent_save(tmp_path: Path): + """Saving the same event twice produces the same .G10 bytes.""" + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + ev, frames = _make_synthetic_event() + + rec1 = store.save(ev, serial="BE11529", a5_frames=frames) + bw_path = store.open_blastware("BE11529", rec1["filename"]) + bytes1 = bw_path.read_bytes() + + rec2 = store.save(ev, serial="BE11529", a5_frames=frames) + bytes2 = bw_path.read_bytes() + + assert rec1["filename"] == rec2["filename"] + assert bytes1 == bytes2 + + +# ── DB integration ──────────────────────────────────────────────────────────── + + +def test_seismodb_persists_waveform_columns(tmp_path: Path): + """insert_events writes the new columns when waveform_records is supplied.""" + from sfm.database import SeismoDb + + db = SeismoDb(tmp_path / "seismo_relay.db") + ev, _ = _make_synthetic_event() + + rec = { + "filename": "M529LKIQ.G10", + "filesize": 8708, + "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + } + inserted, skipped = db.insert_events( + [ev], + serial="BE11529", + waveform_records={ev._waveform_key.hex(): rec}, + ) + assert inserted == 1 + assert skipped == 0 + + rows = db.query_events(serial="BE11529") + assert len(rows) == 1 + row = rows[0] + assert row["blastware_filename"] == rec["filename"] + assert row["blastware_filesize"] == rec["filesize"] + assert row["a5_pickle_filename"] == rec["a5_pickle_filename"] + + # get_event by id returns the same fields + row2 = db.get_event(row["id"]) + assert row2 is not None + assert row2["blastware_filename"] == rec["filename"] + + +def test_seismodb_dedup_upserts_waveform_fields(tmp_path: Path): + """Re-inserting the same (serial, timestamp) refreshes waveform fields.""" + from sfm.database import SeismoDb + + db = SeismoDb(tmp_path / "seismo_relay.db") + ev, _ = _make_synthetic_event() + + db.insert_events([ev], serial="BE11529") # no waveform record yet + rows = db.query_events(serial="BE11529") + assert rows[0]["blastware_filename"] is None + + rec = { + "filename": "M529LKIQ.G10", + "filesize": 4242, + "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + } + inserted, skipped = db.insert_events( + [ev], + serial="BE11529", + waveform_records={ev._waveform_key.hex(): rec}, + ) + assert inserted == 0 # dedup'd + assert skipped == 1 + rows = db.query_events(serial="BE11529") + assert rows[0]["blastware_filename"] == rec["filename"] + assert rows[0]["blastware_filesize"] == 4242 + + +def test_seismodb_migration_adds_columns(tmp_path: Path): + """An existing DB without the new columns gets them added on init.""" + import sqlite3 + + db_path = tmp_path / "old.db" + # Build a "v0" events table without the new columns. + with sqlite3.connect(str(db_path)) as conn: + conn.executescript(""" + CREATE TABLE events ( + id TEXT PRIMARY KEY, + serial TEXT NOT NULL, + waveform_key TEXT NOT NULL, + session_id TEXT, + timestamp TEXT, + tran_ppv REAL, + vert_ppv REAL, + long_ppv REAL, + peak_vector_sum REAL, + mic_ppv REAL, + project TEXT, + client TEXT, + operator TEXT, + sensor_location TEXT, + sample_rate INTEGER, + record_type TEXT, + false_trigger INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + UNIQUE(serial, timestamp) + ); + INSERT INTO events + (id, serial, waveform_key, timestamp) + VALUES + ('legacy-id', 'BE11529', '01110000', + '2026-04-01T12:00:00'); + """) + + # Initialise SeismoDb against the old DB — migration should run. + from sfm.database import SeismoDb + + db = SeismoDb(db_path) + rows = db.query_events(serial="BE11529") + assert len(rows) == 1 + assert rows[0]["blastware_filename"] is None + assert "blastware_filesize" in rows[0] + assert "a5_pickle_filename" in rows[0] + + +if __name__ == "__main__": + if pytest is not None: + pytest.main([__file__, "-v"]) + else: + # Standalone runner — does not require pytest. + import inspect + import tempfile + import traceback as _tb + + passed = failed = 0 + for _name, _fn in sorted(globals().items()): + if not _name.startswith("test_") or not callable(_fn): + continue + try: + _sig = inspect.signature(_fn) + if "tmp_path" in _sig.parameters: + with tempfile.TemporaryDirectory() as _td: + _fn(Path(_td)) + else: + _fn() + print(f"PASS {_name}") + passed += 1 + except Exception: + print(f"FAIL {_name}") + _tb.print_exc() + failed += 1 + print(f"\n{passed} passed, {failed} failed") + sys.exit(0 if failed == 0 else 1)