Merge pull request 'feat: add waveform store handling' (#16) from sfm-waveform-store into main

Reviewed-on: #16
This commit was merged in pull request #16.
This commit is contained in:
2026-05-08 15:03:32 -04:00
19 changed files with 5188 additions and 462 deletions
+209
View File
@@ -0,0 +1,209 @@
"""
test_cache_invalidation.py — verify post-erase key-reuse correctness.
The device's event-key counter resets to 0x01110000 after every memory erase,
so a bare-key dedup (the old behaviour) silently treats a freshly-recorded
event 0 as if it were the previously-downloaded one. These tests exercise
the (key, timestamp)-based eviction logic in:
- bridges/ach_server.py (state-file migration + force flag)
- sfm/live_cache.py (LiveCache.set_events / set_waveform; imported below as _LiveCache)
Run:
python tests/test_cache_invalidation.py
"""
from __future__ import annotations
import json
import os
import sys
import tempfile
from pathlib import Path
try:
import pytest
except ImportError:
pytest = None # type: ignore
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# ── ACH state migration ───────────────────────────────────────────────────────
def test_ach_state_legacy_migration(tmp_path: Path):
    """
    A v1 state file (flat `downloaded_keys` list) is upgraded by _load_state
    into the v2 `downloaded_events` mapping.  Every migrated key maps to an
    empty timestamp string, so a later (key, ts) comparison can never match
    and the event is downloaded afresh.
    """
    from bridges.ach_server import _load_state

    serial = "BE11529"
    old_keys = ["01110000", "0111245a"]
    state_file = tmp_path / "ach_state.json"
    state_file.write_text(json.dumps({
        serial: {
            "downloaded_keys": old_keys,
            "max_downloaded_key": "0111245a",
            "last_seen": "2026-04-11T01:04:36",
            "serial": serial,
            "peer": "63.43.212.232:51920",
        },
    }))

    entry = _load_state(state_file)[serial]

    # The legacy list is gone, replaced by key -> "" placeholders.
    assert "downloaded_keys" not in entry
    assert entry["downloaded_events"] == dict.fromkeys(old_keys, "")
    # The high-water-mark key is carried over untouched.
    assert entry["max_downloaded_key"] == "0111245a"
def test_ach_state_v2_passes_through(tmp_path: Path):
    """An already-v2 state file keeps its `downloaded_events` map unchanged."""
    from bridges.ach_server import _load_state

    serial = "BE11529"
    events = {
        "01110000": "2026-04-15T14:23:45",
        "0111245a": "2026-04-16T09:01:12",
    }
    state_file = tmp_path / "ach_state.json"
    state_file.write_text(json.dumps({
        serial: {
            "downloaded_events": events,
            "max_downloaded_key": "0111245a",
            "serial": serial,
        },
    }))

    reloaded = _load_state(state_file)

    assert reloaded[serial]["downloaded_events"] == events
def test_ach_state_missing_returns_empty(tmp_path: Path):
    """Loading a state path that does not exist yields {} instead of raising."""
    from bridges.ach_server import _load_state

    missing = tmp_path / "absent.json"
    assert _load_state(missing) == {}
# ── _LiveCache eviction ───────────────────────────────────────────────────────
def _ev(index: int, key: str, ts: str) -> dict:
return {"index": index, "waveform_key": key, "timestamp": ts}
def test_live_cache_set_events_no_eviction_when_keys_match():
    """Re-sending identical (key, ts) events at every index leaves the cache intact."""
    from sfm.live_cache import LiveCache as _LiveCache

    conn = "tcp:1.2.3.4:12345"
    first = ("01110000", "2026-04-15T14:23:45")
    second = ("0111245a", "2026-04-16T09:01:12")

    def batch():
        # A fresh list each call, so the cache never sees the same object twice.
        return [_ev(0, *first), _ev(1, *second)]

    cache = _LiveCache()
    cache.set_events(conn, 2, batch())
    cache.set_waveform(conn, 0, _ev(0, *first))

    # Identical events again — nothing may be evicted.
    cache.set_events(conn, 2, batch())

    assert cache._events[conn][0] == 2
    assert (conn, 0) in cache._waveforms
def test_live_cache_set_events_flushes_on_post_erase_collision():
    """
    After an erase the device reuses key 01110000 for index 0, but with a new
    timestamp.  set_events must treat that as a different recording and drop
    the waveform cached for the device under the old timestamp.
    """
    from sfm.live_cache import LiveCache as _LiveCache

    key = "01110000"
    old_ts = "2026-04-15T14:23:45"
    new_ts = "2026-05-06T12:34:56"
    conn = "tcp:1.2.3.4:12345"
    cache = _LiveCache()

    # Session one: event list plus its waveform, both stamped with old_ts.
    cache.set_events(conn, 1, [_ev(0, key, old_ts)])
    cache.set_waveform(conn, 0, _ev(0, key, old_ts))
    assert (conn, 0) in cache._waveforms

    # Session two, post-erase: same index, same key, different timestamp.
    cache.set_events(conn, 1, [_ev(0, key, new_ts)])

    # The prior session's waveform is evicted; the fresh event list is cached.
    assert (conn, 0) not in cache._waveforms
    assert cache._events[conn][1][0]["timestamp"] == new_ts
def test_live_cache_set_waveform_flushes_on_mismatch():
    """A (key, ts) mismatch seen by set_waveform alone also evicts stale entries."""
    from sfm.live_cache import LiveCache as _LiveCache

    conn = "tcp:1.2.3.4:12345"
    new_ts = "2026-05-06T12:34:56"
    cache = _LiveCache()

    # Two waveforms from the same pre-erase era.
    cache.set_waveform(conn, 0, _ev(0, "01110000", "2026-04-15T14:23:45"))
    cache.set_waveform(conn, 1, _ev(1, "0111245a", "2026-04-16T09:01:12"))

    # Index 0 comes back with the same key but a new timestamp.
    cache.set_waveform(conn, 0, _ev(0, "01110000", new_ts))

    # Index 1 belongs to the old era and must not survive next to the new entry.
    assert (conn, 1) not in cache._waveforms
    # Index 0 now holds the replacement.
    assert cache._waveforms[(conn, 0)]["timestamp"] == new_ts
def test_live_cache_partial_signature_does_not_flush():
    """
    An incoming entry that lacks waveform_key OR timestamp cannot prove a
    (key, ts) mismatch, so it must NOT trigger eviction.  This avoids
    spurious flushes from legacy / partial event shapes.

    A second cached index acts as the witness: if the partial entry wrongly
    triggered a device-wide flush, that index would disappear.
    """
    from sfm.live_cache import LiveCache as _LiveCache

    c = _LiveCache()
    conn = "tcp:1.2.3.4:12345"
    c.set_waveform(conn, 0, _ev(0, "01110000", "2026-04-15T14:23:45"))
    # Witness entry at another index — it only survives if nothing is flushed.
    c.set_waveform(conn, 1, _ev(1, "0111245a", "2026-04-16T09:01:12"))

    # Incoming entry missing the timestamp — cannot prove a mismatch.
    c.set_waveform(conn, 0, {"index": 0, "waveform_key": "01110000"})

    # The index-0 row is overwritten by the new entry...
    assert (conn, 0) in c._waveforms
    # ...and the other index is left alone.
    assert (conn, 1) in c._waveforms
if __name__ == "__main__":
    if pytest is not None:
        # pytest.main() returns the run's exit code; hand it to sys.exit() so
        # a failing run makes this script exit non-zero, matching the
        # fallback runner below.
        sys.exit(pytest.main([__file__, "-v"]))
    else:
        # Fallback runner for environments without pytest: call every
        # module-level test_* callable, supplying a temporary directory to
        # tests that declare a tmp_path parameter.
        import inspect
        import traceback as _tb
        passed = failed = 0
        for _name, _fn in sorted(globals().items()):
            if not _name.startswith("test_") or not callable(_fn):
                continue
            try:
                _sig = inspect.signature(_fn)
                if "tmp_path" in _sig.parameters:
                    with tempfile.TemporaryDirectory() as _td:
                        _fn(Path(_td))
                else:
                    _fn()
                print(f"PASS {_name}")
                passed += 1
            except Exception:
                # Report the failure and keep going so every test gets a run.
                print(f"FAIL {_name}")
                _tb.print_exc()
                failed += 1
        print(f"\n{passed} passed, {failed} failed")
        sys.exit(0 if failed == 0 else 1)
+348
View File
@@ -0,0 +1,348 @@
"""
test_event_file_io.py — sidecar write/read/patch round-trips,
WaveformStore sidecar integration, and the BW-import path.
Run:
python tests/test_event_file_io.py
"""
from __future__ import annotations
import json
import os
import sys
import tempfile
from pathlib import Path
try:
import pytest
except ImportError:
pytest = None # type: ignore
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus import event_file_io
from minimateplus.framing import S3Frame
from minimateplus.models import Event, Timestamp
# ── Fixtures shared with test_waveform_store.py ───────────────────────────────
def _make_synthetic_event() -> tuple[Event, list[S3Frame]]:
    """Return a minimal Event together with the three-frame A5 stream behind it.

    Same shape as the fixture in tests/test_waveform_store.py: just enough
    for write_blastware_file to emit a parseable file.

    The STRT record is exactly 21 bytes and carries rectime_seconds at byte
    18, the layout `_decode_a5_waveform` expects (and which
    `read_blastware_file()` reads back).
    """
    key4 = bytes.fromhex("01110000")
    rectime = 3

    def a5(page_hi: int, page_lo: int, data: bytes) -> S3Frame:
        # Every frame in the stream is an A5 frame with a valid checksum.
        return S3Frame(sub=0xA5, page_hi=page_hi, page_lo=page_lo, data=data,
                       checksum_valid=True, chk_byte=0x00)

    # STRT layout: tag | ff fe | end_key (per data[23:27] in CLAUDE.md) |
    # start_key (per data[27:31]) | 4 zero bytes | rectime @18 | 2 zero bytes.
    strt = (
        b"STRT"
        + b"\xff\xfe"
        + key4
        + key4
        + bytes(4)
        + bytes([rectime])
        + bytes(2)
    )
    probe = a5(0x10, 0x00, bytes(7) + strt + bytes(32))
    sample = a5(0x00, 0x10, bytes(7) + bytes(0x0200))

    # A valid 26-byte footer (0e 08 + ts1 + ts2 + 6 const + 2 crc) sits at
    # the END of the terminator's payload so write_blastware_file finds the
    # real `0e 08` marker instead of falling back to slicing the last 26
    # bytes of zero garbage.
    # ts byte order: [day][month][year_HI][year_LO][0x00][hour][min][sec]
    ts1 = bytes([6, 5, 0x07, 0xea, 0, 12, 34, 56])  # 2026-05-06 12:34:56
    ts2 = bytes([6, 5, 0x07, 0xea, 0, 12, 35, 6])   # ts1 + ~10s
    footer = b"\x0e\x08" + ts1 + ts2 + b"\x00\x01\x00\x02\x00\x00" + b"\x00\x00"
    assert len(footer) == 26
    # 11 prefix + 38 pad + 26 footer = 75 bytes
    term = a5(0x00, 0x00, bytes(11) + bytes(38) + footer)

    frames = [probe, sample, term]

    ev = Event(index=0)
    ev._waveform_key = key4
    ev.timestamp = Timestamp(
        raw=b"",
        flag=0x10,
        year=2026,
        unknown_byte=0,
        month=5,
        day=6,
        hour=12,
        minute=34,
        second=56,
    )
    ev.rectime_seconds = rectime
    ev.record_type = "Waveform"
    # The event keeps its own list object, distinct from the one returned.
    ev._a5_frames = list(frames)
    return ev, frames
# ── Sidecar write/read round-trip ─────────────────────────────────────────────
def test_event_to_sidecar_dict_shape():
    """event_to_sidecar_dict fills every top-level section, with a blank review block."""
    ev, _frames = _make_synthetic_event()
    digest = "abcd" * 16
    sidecar = event_file_io.event_to_sidecar_dict(
        ev,
        serial="BE11529",
        blastware_filename="M529LKIQ.7M0W",
        blastware_filesize=1024,
        blastware_sha256=digest,
        source_kind="sfm-live",
        a5_pickle_filename="M529LKIQ.7M0W.a5.pkl",
    )

    # Envelope
    assert sidecar["schema_version"] == event_file_io.SCHEMA_VERSION
    assert sidecar["kind"] == event_file_io.SIDECAR_KIND

    # Event identity
    event_block = sidecar["event"]
    assert event_block["serial"] == "BE11529"
    assert event_block["timestamp"] == "2026-05-06T12:34:56"
    assert event_block["waveform_key"] == "01110000"

    # File + provenance
    assert sidecar["blastware"]["sha256"] == digest
    assert sidecar["source"]["kind"] == "sfm-live"

    # Review starts blank and no extensions are present.
    blank_review = {
        "false_trigger": False,
        "reviewer": None,
        "reviewed_at": None,
        "notes": "",
    }
    assert sidecar["review"] == blank_review
    assert sidecar["extensions"] == {}
def test_sidecar_write_and_read_round_trip(tmp_path: Path):
    """A sidecar written to disk reads back with the same event/blastware/source data."""
    ev, _frames = _make_synthetic_event()
    original = event_file_io.event_to_sidecar_dict(
        ev,
        serial="BE11529",
        blastware_filename="M529LKIQ.7M0W",
        blastware_filesize=1024,
        blastware_sha256="x" * 64,
        source_kind="sfm-ach",
    )
    sidecar_path = tmp_path / "M529LKIQ.7M0W.sfm.json"

    event_file_io.write_sidecar(sidecar_path, original)
    reloaded = event_file_io.read_sidecar(sidecar_path)

    for section in ("event", "blastware"):
        assert reloaded[section] == original[section]
    assert reloaded["source"]["kind"] == "sfm-ach"
def test_sidecar_rejects_unsupported_schema_version(tmp_path: Path):
    """read_sidecar raises ValueError naming schema_version for a newer-than-supported file."""
    future_doc = {
        "schema_version": event_file_io.SCHEMA_VERSION + 1,
        "kind": event_file_io.SIDECAR_KIND,
    }
    path = tmp_path / "future.sfm.json"
    path.write_text(json.dumps(future_doc))

    # Written without pytest.raises so the standalone runner works too.
    try:
        event_file_io.read_sidecar(path)
    except ValueError as exc:
        assert "schema_version" in str(exc)
    else:
        raise AssertionError("read_sidecar should have rejected unsupported version")
def test_sidecar_extensions_survive_round_trip(tmp_path: Path):
    """Forward-compat: an unknown key under `extensions` is still there after write + read."""
    ev, _frames = _make_synthetic_event()
    gps = {"lat": 40.7, "lon": -74.0}
    sidecar = event_file_io.event_to_sidecar_dict(
        ev,
        serial="BE11529",
        blastware_filename="X",
        blastware_filesize=0,
        blastware_sha256="",
        source_kind="sfm-live",
        extensions={"vendor.acme.gps": gps},
    )
    path = tmp_path / "x.sfm.json"

    event_file_io.write_sidecar(path, sidecar)
    reloaded = event_file_io.read_sidecar(path)

    assert reloaded["extensions"]["vendor.acme.gps"]["lat"] == 40.7
def test_sidecar_patch_review_stamps_reviewed_at(tmp_path: Path):
    """patch_sidecar applies review edits, sets reviewed_at, and persists the result."""
    ev, _frames = _make_synthetic_event()
    sidecar_path = tmp_path / "patch.sfm.json"
    initial = event_file_io.event_to_sidecar_dict(
        ev,
        serial="BE11529",
        blastware_filename="X",
        blastware_filesize=0,
        blastware_sha256="",
        source_kind="sfm-live",
    )
    event_file_io.write_sidecar(sidecar_path, initial)

    edits = {"false_trigger": True, "notes": "truck thump", "reviewer": "brian"}
    patched = event_file_io.patch_sidecar(sidecar_path, review=edits)

    # The returned document reflects every edit plus a non-empty stamp.
    review = patched["review"]
    assert review["false_trigger"] is True
    assert review["notes"] == "truck thump"
    assert review["reviewer"] == "brian"
    assert review["reviewed_at"], "reviewed_at must be auto-stamped"

    # The change is on disk as well.
    persisted = event_file_io.read_sidecar(sidecar_path)
    assert persisted["review"]["false_trigger"] is True
# ── WaveformStore integration ─────────────────────────────────────────────────
def test_waveform_store_save_writes_sidecar(tmp_path: Path):
    """WaveformStore.save records a sidecar whose contents agree with the save record."""
    from sfm.waveform_store import WaveformStore

    ev, frames = _make_synthetic_event()
    store = WaveformStore(tmp_path / "waveforms")
    rec = store.save(ev, serial="BE11529", a5_frames=frames, source_kind="sfm-live")

    assert rec["sidecar_filename"].endswith(".sfm.json")
    assert rec["sha256"] and len(rec["sha256"]) == 64

    sidecar = store.load_sidecar("BE11529", rec["filename"])
    assert sidecar is not None

    blastware = sidecar["blastware"]
    assert blastware["filename"] == rec["filename"]
    assert blastware["sha256"] == rec["sha256"]

    source = sidecar["source"]
    assert source["kind"] == "sfm-live"
    # The .a5.pkl reference must match the filename reported by save().
    assert source["a5_pickle_filename"] == rec["a5_pickle_filename"]
def test_waveform_store_save_preserves_review_across_resave(tmp_path: Path):
    """A user's review edits must still be in the sidecar after the event is saved again."""
    from sfm.waveform_store import WaveformStore

    serial = "BE11529"
    ev, frames = _make_synthetic_event()
    store = WaveformStore(tmp_path / "waveforms")

    first = store.save(ev, serial=serial, a5_frames=frames)
    filename = first["filename"]

    # The user marks the event as a false trigger and leaves a note.
    store.patch_sidecar(
        serial,
        filename,
        review={"false_trigger": True, "notes": "hello"},
    )

    # Saving again (e.g. a Force-refresh re-download) must not wipe the edits.
    store.save(ev, serial=serial, a5_frames=frames)

    review = store.load_sidecar(serial, filename)["review"]
    assert review["false_trigger"] is True
    assert review["notes"] == "hello"
def test_waveform_store_patch_sidecar_returns_none_when_missing(tmp_path: Path):
    """Patching a sidecar that was never written returns None."""
    from sfm.waveform_store import WaveformStore

    empty_store = WaveformStore(tmp_path / "waveforms")
    result = empty_store.patch_sidecar("BE99999", "no.such.W", review={"notes": "x"})
    assert result is None
# ── DB integration: sidecar_filename column + update_event_review ─────────────
def test_seismodb_persists_sidecar_filename_and_review_sync(tmp_path: Path):
    """insert_events stores sidecar_filename; update_event_review syncs false_trigger."""
    from sfm.database import SeismoDb

    serial = "BE11529"
    ev, _frames = _make_synthetic_event()
    db = SeismoDb(tmp_path / "seismo_relay.db")
    rec = {
        "filename": "M529LKIQ.7M0W",
        "filesize": 8708,
        "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
        "sidecar_filename": "M529LKIQ.7M0W.sfm.json",
    }

    inserted, _skipped = db.insert_events(
        [ev],
        serial=serial,
        waveform_records={ev._waveform_key.hex(): rec},
    )
    assert inserted == 1

    row = db.query_events(serial=serial)[0]
    event_id = row["id"]
    assert row["sidecar_filename"] == rec["sidecar_filename"]

    # update_event_review keeps the false_trigger column in step with the sidecar.
    assert db.update_event_review(event_id, {"false_trigger": True}) is True
    refreshed = db.get_event(event_id)
    assert refreshed["false_trigger"] == 1

    # A review block without false_trigger is a no-op, but the row exists → True.
    assert db.update_event_review(event_id, {"notes": "x"}) is True
# ── BW-file reader (read_blastware_file) ─────────────────────────────────────
def test_read_blastware_file_round_trip(tmp_path: Path):
    """Write a BW file, read it back: key, timestamp and rectime must survive."""
    from minimateplus.blastware_file import blastware_filename, write_blastware_file

    ev, frames = _make_synthetic_event()
    target = tmp_path / blastware_filename(ev, "BE11529")
    write_blastware_file(ev, frames, target)

    parsed = event_file_io.read_blastware_file(target)

    assert parsed._waveform_key == ev._waveform_key
    assert parsed.rectime_seconds == ev.rectime_seconds

    # The timestamp comes back via the footer; compare it field by field.
    assert parsed.timestamp is not None
    for part in ("year", "month", "day", "hour", "minute", "second"):
        assert getattr(parsed.timestamp, part) == getattr(ev.timestamp, part)

    # A BW file carries no recoverable A5 source.
    assert parsed._a5_frames is None

    # Peaks are computed from the samples (synthetic = all zero → zero peaks).
    assert parsed.peak_values is not None
    assert parsed.peak_values.peak_vector_sum == 0.0
def test_save_imported_bw_round_trip(tmp_path: Path):
    """save_imported_bw stores a byte-identical copy plus a `bw-import` sidecar."""
    from minimateplus.blastware_file import blastware_filename, write_blastware_file
    from sfm.waveform_store import WaveformStore

    serial = "BE11529"

    # First create a BW file that lives outside the store.
    ev, frames = _make_synthetic_event()
    fname = blastware_filename(ev, serial)
    src = tmp_path / fname
    write_blastware_file(ev, frames, src)

    store = WaveformStore(tmp_path / "waveforms")
    _parsed, rec = store.save_imported_bw(src.read_bytes(), source_path=src)

    assert rec["filename"] == fname
    assert rec["a5_pickle_filename"] is None  # no A5 source for BW imports

    sidecar = store.load_sidecar(serial, fname)
    assert sidecar is not None
    source = sidecar["source"]
    assert source["kind"] == "bw-import"
    assert source["a5_pickle_filename"] is None

    # The stored binary must equal the source byte-for-byte (it is a copy).
    stored = store.open_blastware(serial, fname)
    assert stored is not None
    assert stored.read_bytes() == src.read_bytes()
if __name__ == "__main__":
    if pytest is not None:
        # pytest.main() returns the run's exit code; hand it to sys.exit() so
        # a failing run makes this script exit non-zero, matching the
        # fallback runner below.
        sys.exit(pytest.main([__file__, "-v"]))
    else:
        # Fallback runner for environments without pytest: call every
        # module-level test_* callable, supplying a temporary directory to
        # tests that declare a tmp_path parameter.
        import inspect
        import traceback as _tb
        passed = failed = 0
        for _name, _fn in sorted(globals().items()):
            if not _name.startswith("test_") or not callable(_fn):
                continue
            try:
                _sig = inspect.signature(_fn)
                if "tmp_path" in _sig.parameters:
                    with tempfile.TemporaryDirectory() as _td:
                        _fn(Path(_td))
                else:
                    _fn()
                print(f"PASS {_name}")
                passed += 1
            except Exception:
                # Report the failure and keep going so every test gets a run.
                print(f"FAIL {_name}")
                _tb.print_exc()
                failed += 1
        print(f"\n{passed} passed, {failed} failed")
        sys.exit(0 if failed == 0 else 1)
+296
View File
@@ -0,0 +1,296 @@
"""
test_event_hdf5.py — HDF5 codec round-trip + plot.v1 JSON shape sanity.
Run:
python tests/test_event_hdf5.py
"""
from __future__ import annotations
import os
import sys
import tempfile
from pathlib import Path
try:
import pytest
except ImportError:
pytest = None # type: ignore
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus.framing import S3Frame
from minimateplus.models import Event, PeakValues, ProjectInfo, Timestamp
from sfm import event_hdf5
# ── Fixtures ──────────────────────────────────────────────────────────────────
def _make_event_with_samples(n: int = 256) -> Event:
    """Build an Event carrying `n` synthetic int16 ADC samples per channel.

    Channel content:
      - Tran: ramp from -16384 to +16383 (peak ≈ 5 in/s for Normal range)
      - Vert: a single full-scale spike at index n//2 (peak = 10 in/s)
      - Long: all zeros
      - MicL: small ramp from 0 to 5000

    Peak values are attached the way the device's 0C record would supply
    them; the HDF5 writer uses them for the mic per-count factor.
    """
    # Guard the ramp denominator so n == 0 and n == 1 do not divide by zero.
    span = max(n - 1, 1)
    spike_at = n // 2
    channels = {
        "Tran": [int((i / span) * 32767 - 16384) for i in range(n)],
        "Vert": [32767 if i == spike_at else 0 for i in range(n)],
        "Long": [0 for _ in range(n)],
        "MicL": [int((i / span) * 5000) for i in range(n)],
    }

    ev = Event(index=0)
    ev._waveform_key = bytes.fromhex("01110000")
    ev.timestamp = Timestamp(
        raw=b"",
        flag=0x10,
        year=2026,
        unknown_byte=0,
        month=5,
        day=7,
        hour=10,
        minute=0,
        second=0,
    )
    ev.record_type = "Waveform"
    ev.sample_rate = 1024
    ev.pretrig_samples = n // 4
    ev.total_samples = n
    ev.rectime_seconds = n / 1024.0
    ev.raw_samples = channels
    ev.peak_values = PeakValues(
        tran=5.0,
        vert=10.0,
        long=0.0,
        peak_vector_sum=10.0,
        micl=0.001,
    )
    ev.project_info = ProjectInfo(
        project="TestProj",
        client="TestClient",
        operator="brian",
        sensor_location="loc-A",
    )
    return ev
# ── HDF5 round-trip ───────────────────────────────────────────────────────────
def test_hdf5_round_trip_preserves_metadata(tmp_path: Path):
    """Attributes written by write_event_hdf5 come back unchanged from read_event_hdf5."""
    ev = _make_event_with_samples()
    h5 = tmp_path / "test.h5"
    event_hdf5.write_event_hdf5(h5, ev, serial="BE11529", geo_range="normal")

    attrs = event_hdf5.read_event_hdf5(h5)["attrs"]

    # Attributes that must match exactly.
    exact = {
        "schema_version": event_hdf5.SCHEMA_VERSION,
        "kind": event_hdf5.HDF5_KIND,
        "serial": "BE11529",
        "waveform_key": "01110000",
        "sample_rate": 1024,
        "pretrig_samples": 64,
        "geo_range": "normal",
        "geo_full_scale_ips": 10.0,
        "project": "TestProj",
        "client": "TestClient",
        "operator": "brian",
    }
    for name, want in exact.items():
        assert attrs[name] == want

    # Float attrs may round-trip with tiny precision noise.
    assert abs(attrs["peak_tran_ips"] - 5.0) < 1e-6
    assert abs(attrs["peak_vert_ips"] - 10.0) < 1e-6
def test_hdf5_samples_in_physical_units_normal_range(tmp_path: Path):
    """With Normal range (FS = 10 in/s) the full-scale Vert spike (32767
    counts) is stored as ≈ 10 * 32767/32768 in/s, in a float32 dataset."""
    ev = _make_event_with_samples()
    h5 = tmp_path / "n.h5"
    event_hdf5.write_event_hdf5(h5, ev, serial="BE11529", geo_range="normal")

    vert = event_hdf5.read_event_hdf5(h5)["samples"]["Vert"]

    assert vert.dtype.name == "float32"
    largest_magnitude = max(abs(v) for v in vert)
    assert largest_magnitude > 9.99  # full-scale ≈ 10.0
    # The spike at n//2 was 32767 ADC counts.
    expected_peak = 10.0 * 32767 / 32768
    assert abs(max(vert) - expected_peak) < 1e-3
def test_hdf5_samples_in_physical_units_sensitive_range(tmp_path: Path):
    """Same fixture under Sensitive range: full scale is 1.250 in/s."""
    ev = _make_event_with_samples()
    h5 = tmp_path / "s.h5"
    event_hdf5.write_event_hdf5(h5, ev, serial="BE11529", geo_range="sensitive")

    vert = event_hdf5.read_event_hdf5(h5)["samples"]["Vert"]

    expected_peak = 1.250 * 32767 / 32768
    deviation = abs(max(vert) - expected_peak)
    assert deviation < 1e-4
def test_hdf5_includes_int16_samples(tmp_path: Path):
    """The HDF5 file also carries the raw samples as int16 datasets."""
    ev = _make_event_with_samples()
    h5 = tmp_path / "i.h5"
    event_hdf5.write_event_hdf5(h5, ev, serial="BE11529")

    raw = event_hdf5.read_event_hdf5(h5)["samples_int16"]

    assert raw is not None
    assert "Tran" in raw
    assert raw["Vert"].dtype.name == "int16"
def test_hdf5_rejects_unsupported_schema(tmp_path: Path):
    """read_event_hdf5 raises ValueError naming schema_version for an unknown version."""
    import h5py

    h5 = tmp_path / "future.h5"
    # A file holding nothing but the envelope attributes, with a bogus version.
    with h5py.File(h5, "w") as handle:
        handle.attrs["schema_version"] = 99
        handle.attrs["kind"] = event_hdf5.HDF5_KIND

    # Written without pytest.raises so the standalone runner works too.
    try:
        event_hdf5.read_event_hdf5(h5)
    except ValueError as exc:
        assert "schema_version" in str(exc)
    else:
        raise AssertionError("read_event_hdf5 should reject unsupported schema_version")
# ── plot.v1 JSON shape ────────────────────────────────────────────────────────
def test_event_to_plot_json_shape():
    """event_to_plot_json emits the sfm.plot.v1 envelope, time axis and four channels."""
    ev = _make_event_with_samples()
    plot = event_hdf5.event_to_plot_json(ev, serial="BE11529", geo_range="normal")

    # Envelope
    assert plot["schema"] == "sfm.plot.v1"
    assert plot["serial"] == "BE11529"
    assert plot["geo_range"] == "normal"
    assert plot["geo_full_scale_ips"] == 10.0
    assert plot["trigger_ms"] == 0.0

    # Time axis
    axis = plot["time_axis"]
    assert axis["sample_rate"] == 1024
    assert axis["pretrig_samples"] == 64
    assert axis["n_samples"] == 256
    # t0_ms = -pretrig * dt_ms = -64 * (1000/1024) ≈ -62.5
    assert abs(axis["t0_ms"] - (-64 * 1000 / 1024)) < 1e-3
    assert abs(axis["dt_ms"] - (1000 / 1024)) < 1e-6

    # Channels
    chans = plot["channels"]
    for name in ("Tran", "Vert", "Long", "MicL"):
        assert name in chans, f"missing channel: {name}"
        entry = chans[name]
        assert entry["unit"] in ("in/s", "psi")
        for required in ("values", "peak", "peak_t_ms"):
            assert required in entry

    # Values are in physical units: the Vert peak is ≈ 10 in/s.
    assert max(chans["Vert"]["values"]) > 9.99
def test_event_to_plot_json_peak_t_ms_locates_dirac():
    """The full-scale Vert spike at sample n//2 must yield
    peak_t_ms = (n//2 - pretrig) * dt_ms."""
    ev = _make_event_with_samples(n=256)
    plot = event_hdf5.event_to_plot_json(ev, serial="BE11529")

    dt_ms = 1000 / 1024
    expected = (128 - 64) * dt_ms  # = 62.5 ms
    reported = plot["channels"]["Vert"]["peak_t_ms"]
    assert abs(reported - expected) < 1e-2
def test_plot_json_from_hdf5_round_trip(tmp_path: Path):
    """plot_json_from_hdf5 (from disk) agrees with event_to_plot_json (from memory)."""
    ev = _make_event_with_samples()
    h5 = tmp_path / "rt.h5"
    event_hdf5.write_event_hdf5(h5, ev, serial="BE11529", geo_range="normal")

    j_disk = event_hdf5.plot_json_from_hdf5(h5, event_id="abc-123")
    j_mem = event_hdf5.event_to_plot_json(
        ev, serial="BE11529", geo_range="normal", event_id="abc-123",
    )

    # Top-level fields must agree.
    top_level = (
        "schema", "serial", "geo_range", "geo_full_scale_ips",
        "trigger_ms", "record_type", "waveform_key", "event_id",
    )
    for k in top_level:
        assert j_disk.get(k) == j_mem.get(k), f"mismatch on {k}"

    # So must the time-axis bookkeeping.
    for axis_field in ("sample_rate", "n_samples"):
        assert j_disk["time_axis"][axis_field] == j_mem["time_axis"][axis_field]

    # Sample values must match within float32 precision.
    for ch in ("Tran", "Vert", "Long", "MicL"):
        from_disk = j_disk["channels"][ch]["values"]
        from_mem = j_mem["channels"][ch]["values"]
        assert len(from_disk) == len(from_mem)
        if from_disk:
            mx = max(abs(x - y) for x, y in zip(from_disk, from_mem))
            assert mx < 1e-3, f"{ch}: max diff {mx}"
# ── WaveformStore integration with HDF5 ───────────────────────────────────────
def _make_synthetic_event_for_save() -> tuple[Event, list[S3Frame]]:
    """Return a sample-bearing Event plus a minimal A5 stream for WaveformStore.save.

    Same flavour as the fixture in test_event_file_io.py, but built on
    _make_event_with_samples so the event also carries raw samples.
    """
    ev = _make_event_with_samples(n=128)
    key4 = ev._waveform_key
    # rectime is stored in a single byte; fall back to 1 when it truncates to 0.
    rectime = int(ev.rectime_seconds or 0) or 1

    def a5(page_hi: int, page_lo: int, data: bytes) -> S3Frame:
        # Every frame in the stream is an A5 frame with a valid checksum.
        return S3Frame(sub=0xA5, page_hi=page_hi, page_lo=page_lo, data=data,
                       checksum_valid=True, chk_byte=0x00)

    # Minimum 3-frame stream (probe + sample + term).  Per the original
    # fixture notes, the encoder only really needs the STRT in the probe,
    # a non-zero body, and a footer in the terminator.
    # STRT: tag | ff fe | key | key | 4 zero bytes | rectime @18 | 2 zero bytes.
    strt = (
        b"STRT"
        + b"\xff\xfe"
        + key4
        + key4
        + bytes(4)
        + bytes([rectime])
        + bytes(2)
    )
    probe = a5(0x10, 0x00, bytes(7) + strt + bytes(32))
    sample = a5(0x00, 0x10, bytes(7) + bytes(0x0200))

    footer = (
        b"\x0e\x08"
        + bytes([7, 5, 0x07, 0xea, 0, 10, 0, 0])
        + bytes([7, 5, 0x07, 0xea, 0, 10, 0, 1])
        + b"\x00\x01\x00\x02\x00\x00\x00\x00"
    )
    term = a5(0x00, 0x00, bytes(11) + bytes(38) + footer)

    frames = [probe, sample, term]
    # The event keeps its own list object, distinct from the one returned.
    ev._a5_frames = list(frames)
    return ev, frames
def test_waveform_store_save_emits_hdf5(tmp_path: Path):
    """WaveformStore.save reports and writes an .h5 file with matching metadata."""
    from sfm.waveform_store import WaveformStore

    serial = "BE11529"
    ev, frames = _make_synthetic_event_for_save()
    store = WaveformStore(tmp_path / "waveforms")

    rec = store.save(ev, serial=serial, a5_frames=frames, geo_range="normal")
    assert rec["hdf5_filename"], "hdf5_filename should be present in save() record"

    h5 = store.hdf5_path_for(serial, rec["filename"])
    assert h5.exists(), "WaveformStore.save should produce a .h5 file"

    # The file read back from disk must carry the event's metadata.
    attrs = event_hdf5.read_event_hdf5(h5)["attrs"]
    assert attrs["serial"] == serial
    assert attrs["geo_range"] == "normal"
if __name__ == "__main__":
    if pytest is not None:
        # pytest.main() returns the run's exit code; hand it to sys.exit() so
        # a failing run makes this script exit non-zero, matching the
        # fallback runner below.
        sys.exit(pytest.main([__file__, "-v"]))
    else:
        # Fallback runner for environments without pytest: call every
        # module-level test_* callable, supplying a temporary directory to
        # tests that declare a tmp_path parameter.
        import inspect
        import traceback as _tb
        passed = failed = 0
        for _name, _fn in sorted(globals().items()):
            if not _name.startswith("test_") or not callable(_fn):
                continue
            try:
                _sig = inspect.signature(_fn)
                if "tmp_path" in _sig.parameters:
                    with tempfile.TemporaryDirectory() as _td:
                        _fn(Path(_td))
                else:
                    _fn()
                print(f"PASS {_name}")
                passed += 1
            except Exception:
                # Report the failure and keep going so every test gets a run.
                print(f"FAIL {_name}")
                _tb.print_exc()
                failed += 1
        print(f"\n{passed} passed, {failed} failed")
        sys.exit(0 if failed == 0 else 1)
+302
View File
@@ -0,0 +1,302 @@
"""
test_waveform_store.py — unit tests for sfm/waveform_store.py and the
SeismoDb columns + insert_events upsert path that the store depends on.
These tests exercise the *store + DB plumbing* in isolation — they do not
re-test write_blastware_file (covered separately) and do not require a live
device or a wire capture.
Run:
python -m pytest tests/test_waveform_store.py -v
"""
from __future__ import annotations
import os
import sys
import datetime
from pathlib import Path
try:
import pytest
except ImportError: # allow running standalone without pytest installed
pytest = None # type: ignore
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus.framing import S3Frame
from minimateplus.models import Event, Timestamp
# ── Test fixtures ──────────────────────────────────────────────────────────────
def _make_synthetic_event() -> tuple[Event, list[S3Frame]]:
    """
    Return a minimal Event plus a 3-frame A5 stream that satisfies
    write_blastware_file's STRT-extraction path.

    Frame 0 (probe):  holds a STRT record at the canonical position so
                      write_blastware_file finds it without falling back.
    Frame 1 (sample): 0x0200 zero bytes at page_key=0x0010 (sample marker).
    Frame 2 (TERM):   page_key=0x0000 marks the terminator.
    """
    key4 = bytes.fromhex("01110000")
    rectime = 3

    def a5(page_hi: int, page_lo: int, data: bytes) -> S3Frame:
        # Every frame in the stream is an A5 frame with a valid checksum.
        return S3Frame(sub=0xA5, page_hi=page_hi, page_lo=page_lo, data=data,
                       checksum_valid=True, chk_byte=0x00)

    # NOTE(review): this STRT is 22 bytes with rectime at offset 21, whereas
    # the fixture in tests/test_event_file_io.py documents a 21-byte STRT
    # with rectime at offset 18 — confirm which layout the writer expects.
    strt = b"".join((
        b"STRT",
        b"\xff\xfe",
        key4,
        key4,
        bytes(7),
        bytes([rectime]),
    ))

    # Probe payload: 7 zero bytes, then STRT (blastware_file._strip looks for
    # STRT in data[7:]), then 32 zero bytes of fake body so reconstruction
    # has something to slice.
    probe = a5(0x10, 0x00, bytes(7) + strt + bytes(32))
    sample = a5(0x00, 0x10, bytes(7) + bytes(0x0200))
    term = a5(0x00, 0x00, bytes(7) + bytes(64))

    frames = [probe, sample, term]

    ev = Event(index=0)
    ev._waveform_key = key4
    ev.timestamp = Timestamp(
        raw=b"", flag=0x10, year=2026, unknown_byte=0,
        month=5, day=6, hour=12, minute=34, second=56,
    )
    ev.rectime_seconds = rectime
    ev.record_type = "Waveform"
    # The event keeps its own list object, distinct from the one returned.
    ev._a5_frames = list(frames)
    return ev, frames
# ── Frame round-trip ───────────────────────────────────────────────────────────
def test_frame_dict_round_trip():
    """Every S3Frame field must survive _frame_to_dict followed by _dict_to_frame."""
    from sfm.waveform_store import _dict_to_frame, _frame_to_dict

    original = S3Frame(
        sub=0xA5,
        page_hi=0x12,
        page_lo=0x34,
        data=b"\x10\x02\x00\xab\xcd",
        checksum_valid=False,
        chk_byte=0x42,
    )

    restored = _dict_to_frame(_frame_to_dict(original))

    for field in ("sub", "page_hi", "page_lo", "data", "checksum_valid", "chk_byte"):
        assert getattr(restored, field) == getattr(original, field)
# ── Store save/load round-trip ─────────────────────────────────────────────────
def test_waveform_store_save_load_round_trip(tmp_path: Path):
    """save() writes the BW file and the A5 pickle; load_a5() returns equivalent frames."""
    from sfm.waveform_store import WaveformStore

    serial = "BE11529"
    ev, frames = _make_synthetic_event()
    store = WaveformStore(tmp_path / "waveforms")

    rec = store.save(ev, serial=serial, a5_frames=frames)
    filename = rec["filename"]

    # The save record describes what was written.
    assert filename.startswith("M529")
    assert rec["filesize"] > 0
    assert rec["a5_pickle_filename"] == filename + ".a5.pkl"

    # The BW file is on disk with the reported size.
    bw_path = store.open_blastware(serial, filename)
    assert bw_path is not None
    assert bw_path.exists()
    assert bw_path.stat().st_size == rec["filesize"]

    # The stored A5 frames come back with the same header fields and payload.
    loaded = store.load_a5(serial, filename)
    assert loaded is not None
    assert len(loaded) == len(frames)
    for orig, got in zip(frames, loaded):
        for field in ("sub", "page_hi", "page_lo", "data"):
            assert getattr(got, field) == getattr(orig, field)
def test_waveform_store_missing_returns_none(tmp_path: Path):
    """Both open_blastware and load_a5 answer None for an entry that was never saved."""
    from sfm.waveform_store import WaveformStore

    empty_store = WaveformStore(tmp_path / "waveforms")
    unknown = ("BE99999", "no_such.7M0W")

    assert empty_store.open_blastware(*unknown) is None
    assert empty_store.load_a5(*unknown) is None
def test_waveform_store_idempotent_save(tmp_path: Path):
    """Saving the same event twice yields the same filename and identical file bytes."""
    from sfm.waveform_store import WaveformStore

    serial = "BE11529"
    ev, frames = _make_synthetic_event()
    store = WaveformStore(tmp_path / "waveforms")

    first = store.save(ev, serial=serial, a5_frames=frames)
    bw_path = store.open_blastware(serial, first["filename"])
    before = bw_path.read_bytes()

    second = store.save(ev, serial=serial, a5_frames=frames)
    after = bw_path.read_bytes()

    assert first["filename"] == second["filename"]
    assert before == after
# ── DB integration ────────────────────────────────────────────────────────────
def test_seismodb_persists_waveform_columns(tmp_path: Path):
    """insert_events stores the waveform columns when waveform_records is given."""
    from sfm.database import SeismoDb

    serial = "BE11529"
    ev, _frames = _make_synthetic_event()
    db = SeismoDb(tmp_path / "seismo_relay.db")
    rec = {
        "filename": "M529LKIQ.7M0W",
        "filesize": 8708,
        "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
    }

    counts = db.insert_events(
        [ev],
        serial=serial,
        waveform_records={ev._waveform_key.hex(): rec},
    )
    inserted, skipped = counts
    assert inserted == 1
    assert skipped == 0

    rows = db.query_events(serial=serial)
    assert len(rows) == 1
    stored = rows[0]
    assert stored["blastware_filename"] == rec["filename"]
    assert stored["blastware_filesize"] == rec["filesize"]
    assert stored["a5_pickle_filename"] == rec["a5_pickle_filename"]

    # Fetching the row by id exposes the same waveform field.
    by_id = db.get_event(stored["id"])
    assert by_id is not None
    assert by_id["blastware_filename"] == rec["filename"]
def test_seismodb_dedup_upserts_waveform_fields(tmp_path: Path):
    """A duplicate (serial, timestamp) insert is skipped but refreshes the waveform fields."""
    from sfm.database import SeismoDb

    serial = "BE11529"
    ev, _frames = _make_synthetic_event()
    db = SeismoDb(tmp_path / "seismo_relay.db")

    # First insert carries no waveform record.
    db.insert_events([ev], serial=serial)
    assert db.query_events(serial=serial)[0]["blastware_filename"] is None

    rec = {
        "filename": "M529LKIQ.7M0W",
        "filesize": 4242,
        "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
    }
    inserted, skipped = db.insert_events(
        [ev],
        serial=serial,
        waveform_records={ev._waveform_key.hex(): rec},
    )
    assert inserted == 0  # dedup'd
    assert skipped == 1

    # The existing row now carries the waveform fields.
    refreshed = db.query_events(serial=serial)[0]
    assert refreshed["blastware_filename"] == rec["filename"]
    assert refreshed["blastware_filesize"] == 4242
def test_seismodb_migration_adds_columns(tmp_path: Path):
    """An existing DB without the new columns gets them added on init.

    A "v0" events table (no waveform columns) holding one legacy row is
    created by hand; opening it through SeismoDb must leave the row readable
    with the new columns present.
    """
    import contextlib
    import sqlite3

    db_path = tmp_path / "old.db"
    # Build a "v0" events table without the new columns.
    # sqlite3's own context manager only commits/rolls back — it does not
    # close the connection — so closing() is used to release the file handle
    # before SeismoDb opens the same database.
    with contextlib.closing(sqlite3.connect(str(db_path))) as conn:
        conn.executescript("""
            CREATE TABLE events (
                id TEXT PRIMARY KEY,
                serial TEXT NOT NULL,
                waveform_key TEXT NOT NULL,
                session_id TEXT,
                timestamp TEXT,
                tran_ppv REAL,
                vert_ppv REAL,
                long_ppv REAL,
                peak_vector_sum REAL,
                mic_ppv REAL,
                project TEXT,
                client TEXT,
                operator TEXT,
                sensor_location TEXT,
                sample_rate INTEGER,
                record_type TEXT,
                false_trigger INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                UNIQUE(serial, timestamp)
            );
            INSERT INTO events
            (id, serial, waveform_key, timestamp)
            VALUES
            ('legacy-id', 'BE11529', '01110000',
            '2026-04-01T12:00:00');
        """)
        conn.commit()

    # Initialise SeismoDb against the old DB — migration should run.
    from sfm.database import SeismoDb

    db = SeismoDb(db_path)
    rows = db.query_events(serial="BE11529")
    assert len(rows) == 1
    # The legacy row has no waveform data, but the new columns exist.
    assert rows[0]["blastware_filename"] is None
    assert "blastware_filesize" in rows[0]
    assert "a5_pickle_filename" in rows[0]
if __name__ == "__main__":
    if pytest is not None:
        # pytest.main() returns the run's exit code; hand it to sys.exit() so
        # a failing run makes this script exit non-zero, matching the
        # standalone runner below.
        sys.exit(pytest.main([__file__, "-v"]))
    else:
        # Standalone runner — does not require pytest.  Calls every
        # module-level test_* callable, supplying a temporary directory to
        # tests that declare a tmp_path parameter.
        import inspect
        import tempfile
        import traceback as _tb
        passed = failed = 0
        for _name, _fn in sorted(globals().items()):
            if not _name.startswith("test_") or not callable(_fn):
                continue
            try:
                _sig = inspect.signature(_fn)
                if "tmp_path" in _sig.parameters:
                    with tempfile.TemporaryDirectory() as _td:
                        _fn(Path(_td))
                else:
                    _fn()
                print(f"PASS {_name}")
                passed += 1
            except Exception:
                # Report the failure and keep going so every test gets a run.
                print(f"FAIL {_name}")
                _tb.print_exc()
                failed += 1
        print(f"\n{passed} passed, {failed} failed")
        sys.exit(0 if failed == 0 else 1)