c641d5fc10
### Added
- **Layered event storage architecture.** Each event now lands as four
  files in the per-serial waveform store, each with a clear role:
  - `<filename>` — the Blastware-readable binary (BW file). Untouched.
  - `<filename>.a5.pkl` — the raw 5A frames (regenerative source).
  - `<filename>.h5` — clean per-channel waveform arrays in physical
    units (in/s for geo, psi for mic) plus event metadata (HDF5 with
    gzip compression). This is the canonical format for downstream
    analysis tools.
  - `<filename>.sfm.json` — the modern review/metadata sidecar (peaks,
    project, source provenance, review state, extensions).

  SQLite (`seismo_relay.db`) is the searchable index over all four.
- **Plot-ready waveform JSON (`sfm.plot.v1`).** The `/device/event/{idx}/waveform`
and `/db/events/{id}/waveform.json` endpoints now return samples in
physical units with explicit time-axis metadata, peak markers, and
per-channel unit hints — no more guessing the ADC-to-velocity scale
client-side. The webapp waveform viewer was rewritten to consume
this shape.
- **In-app waveform viewer accuracy fix.** The standalone SFM webapp
viewer was scaling geophone amplitudes by `geoAdcScale / 32767`
(≈ 6.206 / 32767), where `geoAdcScale = 6.206053` is the device's
*in/s per V* hardware constant — not the ADC-counts-to-velocity
factor. This silently scaled every plot ~38% too low for Normal-range
geophones (the correct full-scale is 10.0 in/s, or 1.25 in/s for
Sensitive). Conversion is now done server-side using the geo_range
from compliance config; the client just plots.
- New `sfm/event_hdf5.py` module: `write_event_hdf5()`,
`read_event_hdf5()`, plus a plot-JSON helper.
- Backfill script extended to also emit `.h5` for existing events.
### Dependencies
- Added `h5py>=3.10` and `numpy>=1.24` for the HDF5 storage layer.
- Added `python-multipart>=0.0.7` (required by FastAPI for the
`/db/import/blastware_file` endpoint introduced in this release).
297 lines
11 KiB
Python
297 lines
11 KiB
Python
"""
|
|
test_event_hdf5.py — HDF5 codec round-trip + plot.v1 JSON shape sanity.
|
|
|
|
Run:
|
|
python tests/test_event_hdf5.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import pytest
|
|
except ImportError:
|
|
pytest = None # type: ignore
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from minimateplus.framing import S3Frame
|
|
from minimateplus.models import Event, PeakValues, ProjectInfo, Timestamp
|
|
from sfm import event_hdf5
|
|
|
|
|
|
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _make_event_with_samples(n: int = 256) -> Event:
    """Build a synthetic four-channel ``Event`` with ``n`` samples per channel.

    Raw sample content (integer counts):
      - Tran: linear ramp starting at -16384 and ending at 16383
      - Vert: all zeros except one 32767 spike at index ``n // 2``
      - Long: all zeros
      - MicL: linear ramp from 0 up to 5000

    The event is stamped 2026-05-07 10:00:00, uses a sample_rate of 1024
    with ``n // 4`` pre-trigger samples, and carries fixed peak values and
    project info so the tests can check that metadata survives a
    round-trip.
    """
    # Guard the denominator so n == 0 or n == 1 never divides by zero.
    span = max(n - 1, 1)
    ramp_fraction = [i / span for i in range(n)]

    tran_counts = [int(frac * 32767 - 16384) for frac in ramp_fraction]
    mic_counts = [int(frac * 5000) for frac in ramp_fraction]
    long_counts = [0] * n
    vert_counts = [0] * n
    if n:
        # Single full-scale spike in the middle of the record.
        vert_counts[n // 2] = 32767

    stamp = Timestamp(
        raw=b"",
        flag=0x10,
        year=2026,
        unknown_byte=0,
        month=5,
        day=7,
        hour=10,
        minute=0,
        second=0,
    )
    peaks = PeakValues(
        tran=5.0,
        vert=10.0,
        long=0.0,
        peak_vector_sum=10.0,
        micl=0.001,
    )
    project = ProjectInfo(
        project="TestProj",
        client="TestClient",
        operator="brian",
        sensor_location="loc-A",
    )

    event = Event(index=0)
    event._waveform_key = bytes.fromhex("01110000")
    event.timestamp = stamp
    event.record_type = "Waveform"
    event.sample_rate = 1024
    event.pretrig_samples = n // 4
    event.total_samples = n
    event.rectime_seconds = n / 1024.0
    event.raw_samples = {
        "Tran": tran_counts,
        "Vert": vert_counts,
        "Long": long_counts,
        "MicL": mic_counts,
    }
    event.peak_values = peaks
    event.project_info = project
    return event
|
|
|
|
|
|
# ── HDF5 round-trip ───────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_hdf5_round_trip_preserves_metadata(tmp_path: Path):
    """Write the fixture event to HDF5 and verify the attrs read back."""
    target = tmp_path / "test.h5"
    event_hdf5.write_event_hdf5(
        target,
        _make_event_with_samples(),
        serial="BE11529",
        geo_range="normal",
    )
    attrs = event_hdf5.read_event_hdf5(target)["attrs"]

    # Attributes that must come back exactly as written.
    exact_expectations = (
        ("schema_version", event_hdf5.SCHEMA_VERSION),
        ("kind", event_hdf5.HDF5_KIND),
        ("serial", "BE11529"),
        ("waveform_key", "01110000"),
        ("sample_rate", 1024),
        ("pretrig_samples", 64),
        ("geo_range", "normal"),
        ("geo_full_scale_ips", 10.0),
        ("project", "TestProj"),
        ("client", "TestClient"),
        ("operator", "brian"),
    )
    for key, wanted in exact_expectations:
        assert attrs[key] == wanted

    # Float attrs may pick up tiny precision noise on the way through.
    for key, wanted in (("peak_tran_ips", 5.0), ("peak_vert_ips", 10.0)):
        assert abs(attrs[key] - wanted) < 1e-6
|
|
|
|
|
|
def test_hdf5_samples_in_physical_units_normal_range(tmp_path: Path):
    """With geo_range="normal" (full-scale 10 in/s) the 32767-count Vert
    spike should be stored as roughly 10 * 32767/32768 in/s."""
    target = tmp_path / "n.h5"
    event_hdf5.write_event_hdf5(
        target,
        _make_event_with_samples(),
        serial="BE11529",
        geo_range="normal",
    )
    vert_samples = event_hdf5.read_event_hdf5(target)["samples"]["Vert"]

    assert vert_samples.dtype.name == "float32"

    largest_magnitude = max(abs(sample) for sample in vert_samples)
    assert largest_magnitude > 9.99  # full-scale ≈ 10.0

    # The fixture places its spike (32767 counts) at index n//2.
    spike_ips = 10.0 * 32767 / 32768
    assert abs(max(vert_samples) - spike_ips) < 1e-3
|
|
|
|
|
|
def test_hdf5_samples_in_physical_units_sensitive_range(tmp_path: Path):
    """Same fixture written with geo_range="sensitive": the Vert spike
    should scale against a 1.250 in/s full-scale instead."""
    target = tmp_path / "s.h5"
    event_hdf5.write_event_hdf5(
        target,
        _make_event_with_samples(),
        serial="BE11529",
        geo_range="sensitive",
    )
    vert_samples = event_hdf5.read_event_hdf5(target)["samples"]["Vert"]

    spike_ips = 1.250 * 32767 / 32768
    assert abs(max(vert_samples) - spike_ips) < 1e-4
|
|
|
|
|
|
def test_hdf5_includes_int16_samples(tmp_path: Path):
    """The HDF5 read-back exposes the raw samples as int16 arrays too."""
    target = tmp_path / "i.h5"
    event_hdf5.write_event_hdf5(
        target, _make_event_with_samples(), serial="BE11529",
    )
    loaded = event_hdf5.read_event_hdf5(target)

    assert loaded["samples_int16"] is not None
    assert "Tran" in loaded["samples_int16"]
    assert loaded["samples_int16"]["Vert"].dtype.name == "int16"
|
|
|
|
|
|
def test_hdf5_rejects_unsupported_schema(tmp_path: Path):
    """Reading a file whose schema_version was tampered with must raise
    ValueError mentioning ``schema_version``."""
    import h5py

    tampered = tmp_path / "future.h5"
    with h5py.File(tampered, "w") as handle:
        handle.attrs["schema_version"] = 99
        handle.attrs["kind"] = event_hdf5.HDF5_KIND

    # Hand-rolled "expect an exception" check so the test also works when
    # pytest is not installed.
    try:
        event_hdf5.read_event_hdf5(tampered)
    except ValueError as err:
        assert "schema_version" in str(err)
    else:
        raise AssertionError("read_event_hdf5 should reject unsupported schema_version")
|
|
|
|
|
|
# ── plot.v1 JSON shape ────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_event_to_plot_json_shape():
    """Check the top-level keys, time axis and per-channel fields of the
    plot JSON produced directly from an in-memory event."""
    payload = event_hdf5.event_to_plot_json(
        _make_event_with_samples(), serial="BE11529", geo_range="normal",
    )

    top_level = (
        ("schema", "sfm.plot.v1"),
        ("serial", "BE11529"),
        ("geo_range", "normal"),
        ("geo_full_scale_ips", 10.0),
        ("trigger_ms", 0.0),
    )
    for key, wanted in top_level:
        assert payload[key] == wanted

    axis = payload["time_axis"]
    assert axis["sample_rate"] == 1024
    assert axis["pretrig_samples"] == 64
    assert axis["n_samples"] == 256
    # t0_ms = -pretrig * dt_ms = -64 * (1000/1024) = -62.5
    ms_per_sample = 1000 / 1024
    assert abs(axis["t0_ms"] - (-64 * ms_per_sample)) < 1e-3
    assert abs(axis["dt_ms"] - ms_per_sample) < 1e-6

    channels = payload["channels"]
    for name in ("Tran", "Vert", "Long", "MicL"):
        assert name in channels, f"missing channel: {name}"
        entry = channels[name]
        assert entry["unit"] in ("in/s", "psi")
        for field in ("values", "peak", "peak_t_ms"):
            assert field in entry

    # Values are already in physical units: the Vert spike is ≈ 10 in/s.
    assert max(channels["Vert"]["values"]) > 9.99
|
|
|
|
|
|
def test_event_to_plot_json_peak_t_ms_locates_dirac():
    """The Vert spike sits at sample n//2, so its reported peak time
    should be (n//2 - pretrig) * dt_ms relative to the trigger."""
    n = 256
    payload = event_hdf5.event_to_plot_json(
        _make_event_with_samples(n=n), serial="BE11529",
    )

    spike_index = n // 2       # 128
    pretrig = n // 4           # 64, as set by the fixture
    ms_per_sample = 1000 / 1024
    wanted_ms = (spike_index - pretrig) * ms_per_sample  # = 62.5 ms

    reported_ms = payload["channels"]["Vert"]["peak_t_ms"]
    assert abs(reported_ms - wanted_ms) < 1e-2
|
|
|
|
|
|
def test_plot_json_from_hdf5_round_trip(tmp_path: Path):
    """Plot JSON rebuilt from the on-disk HDF5 must agree with plot JSON
    built straight from the in-memory event."""
    event = _make_event_with_samples()
    target = tmp_path / "rt.h5"
    event_hdf5.write_event_hdf5(target, event, serial="BE11529", geo_range="normal")

    from_disk = event_hdf5.plot_json_from_hdf5(target, event_id="abc-123")
    from_memory = event_hdf5.event_to_plot_json(
        event, serial="BE11529", geo_range="normal", event_id="abc-123",
    )

    # Top-level shape parity.
    shared_keys = (
        "schema", "serial", "geo_range", "geo_full_scale_ips",
        "trigger_ms", "record_type", "waveform_key", "event_id",
    )
    for key in shared_keys:
        assert from_disk.get(key) == from_memory.get(key), f"mismatch on {key}"

    for axis_field in ("sample_rate", "n_samples"):
        assert from_disk["time_axis"][axis_field] == from_memory["time_axis"][axis_field]

    # Sample values must agree to within float32 precision.
    for channel in ("Tran", "Vert", "Long", "MicL"):
        disk_values = from_disk["channels"][channel]["values"]
        memory_values = from_memory["channels"][channel]["values"]
        assert len(disk_values) == len(memory_values)
        if not disk_values:
            continue
        worst = max(abs(x - y) for x, y in zip(disk_values, memory_values))
        assert worst < 1e-3, f"{channel}: max diff {worst}"
|
|
|
|
|
|
# ── WaveformStore integration with HDF5 ───────────────────────────────────────
|
|
|
|
|
|
def _make_synthetic_event_for_save() -> tuple[Event, list[S3Frame]]:
    """Return a 128-sample fixture event plus a three-frame A5 stream
    (probe, sample, terminator) so ``WaveformStore.save`` can be exercised.

    The frames are also attached to the event as ``_a5_frames``.
    """
    event = _make_event_with_samples(n=128)
    key = event._waveform_key
    # Whole seconds of record time, never less than 1.
    rectime_byte = int(event.rectime_seconds or 0) or 1

    def _a5(page_hi: int, page_lo: int, data: bytes) -> S3Frame:
        # Every frame in this stream shares sub, checksum flag and chk byte.
        return S3Frame(
            sub=0xA5,
            page_hi=page_hi,
            page_lo=page_lo,
            data=data,
            checksum_valid=True,
            chk_byte=0x00,
        )

    # 21-byte STRT record: magic, 0xFFFE marker, the key twice, then the
    # record time at offset 18.
    strt_record = bytearray(21)
    strt_record[0:4] = b"STRT"
    strt_record[4:6] = b"\xff\xfe"
    strt_record[6:10] = key
    strt_record[10:14] = key
    strt_record[18] = rectime_byte

    probe_frame = _a5(0x10, 0x00, bytes(7) + bytes(strt_record) + bytes(32))
    sample_frame = _a5(0x00, 0x10, bytes(7) + bytes(0x0200))

    # Footer: 2-byte lead-in, two 8-byte stamps that differ only in their
    # final byte, then an 8-byte tail.
    stamp_prefix = [7, 5, 0x07, 0xea, 0, 10, 0]
    footer_bytes = b"".join((
        b"\x0e\x08",
        bytes(stamp_prefix + [0]),
        bytes(stamp_prefix + [1]),
        b"\x00\x01\x00\x02\x00\x00\x00\x00",
    ))
    term_frame = _a5(0x00, 0x00, bytes(11) + bytes(38) + footer_bytes)

    frames = [probe_frame, sample_frame, term_frame]
    # The event gets its own list object, separate from the returned one.
    event._a5_frames = list(frames)
    return event, frames
|
|
|
|
|
|
def test_waveform_store_save_emits_hdf5(tmp_path: Path):
    """Saving through WaveformStore should leave a readable .h5 file whose
    attrs carry the serial and geo_range passed to save()."""
    from sfm.waveform_store import WaveformStore

    serial = "BE11529"
    store = WaveformStore(tmp_path / "waveforms")
    event, a5_stream = _make_synthetic_event_for_save()
    record = store.save(event, serial=serial, a5_frames=a5_stream, geo_range="normal")

    assert record["hdf5_filename"], "hdf5_filename should be present in save() record"

    h5_file = store.hdf5_path_for(serial, record["filename"])
    assert h5_file.exists(), "WaveformStore.save should produce a .h5 file"

    # The stored metadata should reflect what save() was given.
    stored_attrs = event_hdf5.read_event_hdf5(h5_file)["attrs"]
    assert stored_attrs["serial"] == "BE11529"
    assert stored_attrs["geo_range"] == "normal"
|
|
|
|
|
|
if __name__ == "__main__":
    if pytest is None:
        # pytest is unavailable: run every module-level ``test_*`` callable
        # with a tiny stand-in runner, handing a fresh temporary directory
        # to tests that declare a ``tmp_path`` parameter.
        import inspect
        import traceback as _tb

        passed = failed = 0
        for _name, _fn in sorted(globals().items()):
            if not (_name.startswith("test_") and callable(_fn)):
                continue
            try:
                _wants_tmp = "tmp_path" in inspect.signature(_fn).parameters
                if not _wants_tmp:
                    _fn()
                else:
                    with tempfile.TemporaryDirectory() as _scratch:
                        _fn(Path(_scratch))
                print(f"PASS {_name}")
                passed += 1
            except Exception:
                print(f"FAIL {_name}")
                _tb.print_exc()
                failed += 1
        print(f"\n{passed} passed, {failed} failed")
        # Non-zero exit status signals at least one failure.
        sys.exit(1 if failed else 0)
    else:
        pytest.main([__file__, "-v"])
|