""" test_waveform_store.py — unit tests for sfm/waveform_store.py and the SeismoDb columns + insert_events upsert path that the store depends on. These tests exercise the *store + DB plumbing* in isolation — they do not re-test write_blastware_file (covered separately) and do not require a live device or a wire capture. Run: python -m pytest tests/test_waveform_store.py -v """ from __future__ import annotations import os import sys import datetime from pathlib import Path try: import pytest except ImportError: # allow running standalone without pytest installed pytest = None # type: ignore sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from minimateplus.framing import S3Frame from minimateplus.models import Event, Timestamp # ── Test fixtures ────────────────────────────────────────────────────────────── def _make_synthetic_event() -> tuple[Event, list[S3Frame]]: """ Build a minimal Event + a 3-frame A5 stream that satisfies write_blastware_file's STRT-extraction path. Frame 0 (probe): contains a STRT record at the canonical position so write_blastware_file finds it without falling back. Frame 1 (sample): 0x0200 bytes of zeros at page_key=0x0010 (sample marker). Frame 2 (TERM): page_key=0x0000 marks the terminator. """ key4 = bytes.fromhex("01110000") rectime = 3 strt = b"STRT" + b"\xff\xfe" + key4 + key4 + bytes(7) + bytes([rectime]) # Probe payload prefix: 7 zero bytes then STRT (matches blastware_file._strip # logic which looks for STRT in data[7:]). Tail with 32 zero bytes of fake # body so reconstruction has something to slice. probe_data = bytes(7) + strt + bytes(32) probe = S3Frame(sub=0xA5, page_hi=0x10, page_lo=0x00, data=probe_data, checksum_valid=True, chk_byte=0x00) sample = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x10, data=bytes(7) + bytes(0x0200), checksum_valid=True, chk_byte=0x00) term = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x00, data=bytes(7) + bytes(64), checksum_valid=True, chk_byte=0x00) ev = Event(index=0) ev._waveform_key = key4 ev.timestamp = Timestamp( raw=b"", flag=0x10, year=2026, unknown_byte=0, month=5, day=6, hour=12, minute=34, second=56, ) ev.rectime_seconds = rectime ev.record_type = "Waveform" ev._a5_frames = [probe, sample, term] return ev, [probe, sample, term] # ── Frame round-trip ─────────────────────────────────────────────────────────── def test_frame_dict_round_trip(): """_frame_to_dict and _dict_to_frame must round-trip every field.""" from sfm.waveform_store import _dict_to_frame, _frame_to_dict f = S3Frame( sub=0xA5, page_hi=0x12, page_lo=0x34, data=b"\x10\x02\x00\xab\xcd", checksum_valid=False, chk_byte=0x42, ) d = _frame_to_dict(f) g = _dict_to_frame(d) assert g.sub == f.sub assert g.page_hi == f.page_hi assert g.page_lo == f.page_lo assert g.data == f.data assert g.checksum_valid == f.checksum_valid assert g.chk_byte == f.chk_byte # ── Store save/load round-trip ───────────────────────────────────────────────── def test_waveform_store_save_load_round_trip(tmp_path: Path): """save() writes both files; load_a5() returns equivalent frames.""" from sfm.waveform_store import WaveformStore store = WaveformStore(tmp_path / "waveforms") ev, frames = _make_synthetic_event() rec = store.save(ev, serial="BE11529", a5_frames=frames) assert rec["filename"].startswith("M529") assert rec["filesize"] > 0 assert rec["a5_pickle_filename"] == rec["filename"] + ".a5.pkl" bw_path = store.open_blastware("BE11529", rec["filename"]) assert bw_path is not None assert bw_path.exists() assert bw_path.stat().st_size == rec["filesize"] # Sidecar exists and round-trips loaded = store.load_a5("BE11529", rec["filename"]) assert loaded is not None assert len(loaded) == len(frames) for orig, got in zip(frames, loaded): assert got.sub == orig.sub assert got.page_hi == orig.page_hi assert got.page_lo == orig.page_lo assert got.data == orig.data def test_waveform_store_missing_returns_none(tmp_path: Path): """open_blastware / load_a5 return None for nonexistent entries.""" from sfm.waveform_store import WaveformStore store = WaveformStore(tmp_path / "waveforms") assert store.open_blastware("BE99999", "no_such.G10") is None assert store.load_a5("BE99999", "no_such.G10") is None def test_waveform_store_idempotent_save(tmp_path: Path): """Saving the same event twice produces the same .G10 bytes.""" from sfm.waveform_store import WaveformStore store = WaveformStore(tmp_path / "waveforms") ev, frames = _make_synthetic_event() rec1 = store.save(ev, serial="BE11529", a5_frames=frames) bw_path = store.open_blastware("BE11529", rec1["filename"]) bytes1 = bw_path.read_bytes() rec2 = store.save(ev, serial="BE11529", a5_frames=frames) bytes2 = bw_path.read_bytes() assert rec1["filename"] == rec2["filename"] assert bytes1 == bytes2 # ── DB integration ──────────────────────────────────────────────────────────── def test_seismodb_persists_waveform_columns(tmp_path: Path): """insert_events writes the new columns when waveform_records is supplied.""" from sfm.database import SeismoDb db = SeismoDb(tmp_path / "seismo_relay.db") ev, _ = _make_synthetic_event() rec = { "filename": "M529LKIQ.G10", "filesize": 8708, "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", } inserted, skipped = db.insert_events( [ev], serial="BE11529", waveform_records={ev._waveform_key.hex(): rec}, ) assert inserted == 1 assert skipped == 0 rows = db.query_events(serial="BE11529") assert len(rows) == 1 row = rows[0] assert row["blastware_filename"] == rec["filename"] assert row["blastware_filesize"] == rec["filesize"] assert row["a5_pickle_filename"] == rec["a5_pickle_filename"] # get_event by id returns the same fields row2 = db.get_event(row["id"]) assert row2 is not None assert row2["blastware_filename"] == rec["filename"] def test_seismodb_dedup_upserts_waveform_fields(tmp_path: Path): """Re-inserting the same (serial, timestamp) refreshes waveform fields.""" from sfm.database import SeismoDb db = SeismoDb(tmp_path / "seismo_relay.db") ev, _ = _make_synthetic_event() db.insert_events([ev], serial="BE11529") # no waveform record yet rows = db.query_events(serial="BE11529") assert rows[0]["blastware_filename"] is None rec = { "filename": "M529LKIQ.G10", "filesize": 4242, "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", } inserted, skipped = db.insert_events( [ev], serial="BE11529", waveform_records={ev._waveform_key.hex(): rec}, ) assert inserted == 0 # dedup'd assert skipped == 1 rows = db.query_events(serial="BE11529") assert rows[0]["blastware_filename"] == rec["filename"] assert rows[0]["blastware_filesize"] == 4242 def test_seismodb_migration_adds_columns(tmp_path: Path): """An existing DB without the new columns gets them added on init.""" import sqlite3 db_path = tmp_path / "old.db" # Build a "v0" events table without the new columns. with sqlite3.connect(str(db_path)) as conn: conn.executescript(""" CREATE TABLE events ( id TEXT PRIMARY KEY, serial TEXT NOT NULL, waveform_key TEXT NOT NULL, session_id TEXT, timestamp TEXT, tran_ppv REAL, vert_ppv REAL, long_ppv REAL, peak_vector_sum REAL, mic_ppv REAL, project TEXT, client TEXT, operator TEXT, sensor_location TEXT, sample_rate INTEGER, record_type TEXT, false_trigger INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), UNIQUE(serial, timestamp) ); INSERT INTO events (id, serial, waveform_key, timestamp) VALUES ('legacy-id', 'BE11529', '01110000', '2026-04-01T12:00:00'); """) # Initialise SeismoDb against the old DB — migration should run. from sfm.database import SeismoDb db = SeismoDb(db_path) rows = db.query_events(serial="BE11529") assert len(rows) == 1 assert rows[0]["blastware_filename"] is None assert "blastware_filesize" in rows[0] assert "a5_pickle_filename" in rows[0] if __name__ == "__main__": if pytest is not None: pytest.main([__file__, "-v"]) else: # Standalone runner — does not require pytest. import inspect import tempfile import traceback as _tb passed = failed = 0 for _name, _fn in sorted(globals().items()): if not _name.startswith("test_") or not callable(_fn): continue try: _sig = inspect.signature(_fn) if "tmp_path" in _sig.parameters: with tempfile.TemporaryDirectory() as _td: _fn(Path(_td)) else: _fn() print(f"PASS {_name}") passed += 1 except Exception: print(f"FAIL {_name}") _tb.print_exc() failed += 1 print(f"\n{passed} passed, {failed} failed") sys.exit(0 if failed == 0 else 1)