Files
seismo-relay/tests/test_event_file_io.py
T
serversdown fa9d3cdef2 read_blastware_file: leave peak_values=None when samples can't be decoded
Fixes a data-loss bug discovered while dry-running the backfill against
the prod store.

Symptom: every histogram event in the store has its body decoded by
read_blastware_file → codec returns None → samples = empty dict →
``ev.peak_values = _peaks_from_samples(empty)`` returns
``PeakValues(0, 0, 0, 0, 0)`` (NOT None).  The backfill script's
existing "seed from DB row when peak_values is None" branch then
correctly *skips* the seeding, and the all-zeros PeakValues flows into
``db.insert_events()``'s UPSERT path, OVERWRITING the existing good DB
peak values for that event (which were populated from the paired BW
ASCII report at ingest).

Net effect: running the backfill on prod would have wiped the PPV /
mic / vector-sum columns for ~10,000 histogram events.

Fix: only compute peaks-from-samples when there are actually samples.
For events the codec couldn't decode (histogram-mode bodies, until
the §7.6.2 histogram codec is wired in), leave peak_values=None as
the "we don't know" signal.  Downstream consumers:

  - backfill_sidecars.py — its existing ``if ev.peak_values is None:``
    branch (line 243) seeds from the DB row, preserving the real
    BW-report peaks across the regen.
  - WaveformStore.save_imported_bw — apply_report_to_event overlays
    peaks from the paired BW ASCII report when one was uploaded.
    Histogram imports without a paired report end up with NULL peaks
    in the DB, which is correct (better than zeros — clearly says
    "no peak data available" rather than "peaks are exactly zero").

Updated the existing synthetic-event round-trip test to expect
peak_values=None for the no-real-body case, which is the truth now.

The 7 fixture-corpus regression tests for real BW waveforms continue
to pass — those have decodable samples, so peak_values is still
populated from the codec output as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 20:30:53 +00:00

558 lines
22 KiB
Python

"""
test_event_file_io.py — sidecar write/read/patch round-trips,
WaveformStore sidecar integration, and the BW-import path.
Run:
python tests/test_event_file_io.py
"""
from __future__ import annotations
import json
import os
import sys
import tempfile
from pathlib import Path
try:
import pytest
except ImportError:
pytest = None # type: ignore
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus import event_file_io
from minimateplus.framing import S3Frame
from minimateplus.models import Event, Timestamp
# ── Fixtures shared with test_waveform_store.py ───────────────────────────────
def _make_synthetic_event() -> tuple[Event, list[S3Frame]]:
"""Same shape as tests/test_waveform_store.py — minimum viable Event +
A5 stream that makes write_blastware_file emit a parseable file.
STRT is exactly 21 bytes; rectime_seconds lands at byte 18 to match
`_decode_a5_waveform`'s expected layout (which is also what
`read_blastware_file()` reads back)."""
key4 = bytes.fromhex("01110000")
rectime = 3
strt = bytearray(21)
strt[0:4] = b"STRT"
strt[4:6] = b"\xff\xfe"
strt[6:10] = key4 # end_key (per data[23:27] in CLAUDE.md)
strt[10:14] = key4 # start_key (per data[27:31])
strt[18] = rectime
strt = bytes(strt)
probe_data = bytes(7) + strt + bytes(32)
probe = S3Frame(sub=0xA5, page_hi=0x10, page_lo=0x00, data=probe_data,
checksum_valid=True, chk_byte=0x00)
sample = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x10,
data=bytes(7) + bytes(0x0200), checksum_valid=True,
chk_byte=0x00)
# Build a valid 26-byte footer (0e 08 + ts1 + ts2 + 6 const + 2 crc)
# and embed it at the END of the terminator's contribution so
# write_blastware_file finds the real `0e 08` marker rather than
# falling back to slicing the last 26 bytes of zero garbage.
# ts byte order: [day][month][year_HI][year_LO][0x00][hour][min][sec]
footer = (
b"\x0e\x08"
+ bytes([6, 5, 0x07, 0xea, 0, 12, 34, 56]) # ts1 = 2026-05-06 12:34:56
+ bytes([6, 5, 0x07, 0xea, 0, 12, 35, 6]) # ts2 = ts1 + ~10s
+ b"\x00\x01\x00\x02\x00\x00"
+ b"\x00\x00"
)
assert len(footer) == 26
term_data = bytes(11) + bytes(38) + footer # 11 prefix + 38 pad + 26 footer = 75
term = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x00,
data=term_data, checksum_valid=True, chk_byte=0x00)
ev = Event(index=0)
ev._waveform_key = key4
ev.timestamp = Timestamp(
raw=b"", flag=0x10, year=2026, unknown_byte=0,
month=5, day=6, hour=12, minute=34, second=56,
)
ev.rectime_seconds = rectime
ev.record_type = "Waveform"
ev._a5_frames = [probe, sample, term]
return ev, [probe, sample, term]
# ── Sidecar write/read round-trip ─────────────────────────────────────────────
def test_event_to_sidecar_dict_shape():
ev, _ = _make_synthetic_event()
d = event_file_io.event_to_sidecar_dict(
ev,
serial="BE11529",
blastware_filename="M529LKIQ.7M0W",
blastware_filesize=1024,
blastware_sha256="abcd" * 16,
source_kind="sfm-live",
a5_pickle_filename="M529LKIQ.7M0W.a5.pkl",
)
assert d["schema_version"] == event_file_io.SCHEMA_VERSION
assert d["kind"] == event_file_io.SIDECAR_KIND
assert d["event"]["serial"] == "BE11529"
assert d["event"]["timestamp"] == "2026-05-06T12:34:56"
assert d["event"]["waveform_key"] == "01110000"
assert d["blastware"]["sha256"] == "abcd" * 16
assert d["source"]["kind"] == "sfm-live"
assert d["review"] == {
"false_trigger": False, "reviewer": None,
"reviewed_at": None, "notes": "",
}
assert d["extensions"] == {}
def test_sidecar_write_and_read_round_trip(tmp_path: Path):
ev, _ = _make_synthetic_event()
path = tmp_path / "M529LKIQ.7M0W.sfm.json"
src = event_file_io.event_to_sidecar_dict(
ev, serial="BE11529",
blastware_filename="M529LKIQ.7M0W", blastware_filesize=1024,
blastware_sha256="x" * 64, source_kind="sfm-ach",
)
event_file_io.write_sidecar(path, src)
loaded = event_file_io.read_sidecar(path)
assert loaded["event"] == src["event"]
assert loaded["blastware"] == src["blastware"]
assert loaded["source"]["kind"] == "sfm-ach"
def test_sidecar_rejects_unsupported_schema_version(tmp_path: Path):
path = tmp_path / "future.sfm.json"
path.write_text(json.dumps({
"schema_version": event_file_io.SCHEMA_VERSION + 1,
"kind": event_file_io.SIDECAR_KIND,
}))
try:
event_file_io.read_sidecar(path)
except ValueError as exc:
assert "schema_version" in str(exc)
return
raise AssertionError("read_sidecar should have rejected unsupported version")
def test_sidecar_extensions_survive_round_trip(tmp_path: Path):
"""Forward-compat: unknown keys inside `extensions` survive a r/w cycle."""
ev, _ = _make_synthetic_event()
path = tmp_path / "x.sfm.json"
d = event_file_io.event_to_sidecar_dict(
ev, serial="BE11529",
blastware_filename="X", blastware_filesize=0, blastware_sha256="",
source_kind="sfm-live",
extensions={"vendor.acme.gps": {"lat": 40.7, "lon": -74.0}},
)
event_file_io.write_sidecar(path, d)
back = event_file_io.read_sidecar(path)
assert back["extensions"]["vendor.acme.gps"]["lat"] == 40.7
def test_sidecar_patch_review_stamps_reviewed_at(tmp_path: Path):
ev, _ = _make_synthetic_event()
path = tmp_path / "patch.sfm.json"
event_file_io.write_sidecar(
path,
event_file_io.event_to_sidecar_dict(
ev, serial="BE11529",
blastware_filename="X", blastware_filesize=0, blastware_sha256="",
source_kind="sfm-live",
),
)
new = event_file_io.patch_sidecar(
path,
review={"false_trigger": True, "notes": "truck thump", "reviewer": "brian"},
)
assert new["review"]["false_trigger"] is True
assert new["review"]["notes"] == "truck thump"
assert new["review"]["reviewer"] == "brian"
assert new["review"]["reviewed_at"], "reviewed_at must be auto-stamped"
on_disk = event_file_io.read_sidecar(path)
assert on_disk["review"]["false_trigger"] is True
# ── WaveformStore integration ─────────────────────────────────────────────────
def test_waveform_store_save_writes_sidecar(tmp_path: Path):
from sfm.waveform_store import WaveformStore
store = WaveformStore(tmp_path / "waveforms")
ev, frames = _make_synthetic_event()
rec = store.save(ev, serial="BE11529", a5_frames=frames, source_kind="sfm-live")
assert rec["sidecar_filename"].endswith(".sfm.json")
assert rec["sha256"] and len(rec["sha256"]) == 64
sc = store.load_sidecar("BE11529", rec["filename"])
assert sc is not None
assert sc["blastware"]["filename"] == rec["filename"]
assert sc["blastware"]["sha256"] == rec["sha256"]
assert sc["source"]["kind"] == "sfm-live"
# The .a5.pkl reference should match the actual filename on disk.
assert sc["source"]["a5_pickle_filename"] == rec["a5_pickle_filename"]
def test_waveform_store_save_preserves_review_across_resave(tmp_path: Path):
"""Re-saving the same event must preserve a user's prior review edits."""
from sfm.waveform_store import WaveformStore
store = WaveformStore(tmp_path / "waveforms")
ev, frames = _make_synthetic_event()
rec = store.save(ev, serial="BE11529", a5_frames=frames)
# User flips false_trigger and adds a note.
store.patch_sidecar(
"BE11529", rec["filename"],
review={"false_trigger": True, "notes": "hello"},
)
# A second save (e.g. Force refresh re-download) must keep those edits.
store.save(ev, serial="BE11529", a5_frames=frames)
sc = store.load_sidecar("BE11529", rec["filename"])
assert sc["review"]["false_trigger"] is True
assert sc["review"]["notes"] == "hello"
def test_waveform_store_patch_sidecar_returns_none_when_missing(tmp_path: Path):
from sfm.waveform_store import WaveformStore
store = WaveformStore(tmp_path / "waveforms")
out = store.patch_sidecar("BE99999", "no.such.W", review={"notes": "x"})
assert out is None
# ── DB integration: sidecar_filename column + update_event_review ─────────────
def test_seismodb_persists_sidecar_filename_and_review_sync(tmp_path: Path):
from sfm.database import SeismoDb
db = SeismoDb(tmp_path / "seismo_relay.db")
ev, _ = _make_synthetic_event()
rec = {
"filename": "M529LKIQ.7M0W",
"filesize": 8708,
"a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
"sidecar_filename": "M529LKIQ.7M0W.sfm.json",
}
inserted, _ = db.insert_events(
[ev], serial="BE11529",
waveform_records={ev._waveform_key.hex(): rec},
)
assert inserted == 1
rows = db.query_events(serial="BE11529")
row = rows[0]
assert row["sidecar_filename"] == rec["sidecar_filename"]
# update_event_review keeps false_trigger column in sync with sidecar.
assert db.update_event_review(row["id"], {"false_trigger": True}) is True
again = db.get_event(row["id"])
assert again["false_trigger"] == 1
# Empty review block (no false_trigger key) → no-op but row exists.
assert db.update_event_review(row["id"], {"notes": "x"}) is True
# ── BW-file reader (read_blastware_file) ─────────────────────────────────────
def test_read_blastware_file_round_trip(tmp_path: Path):
"""write → read → key/timestamp/rectime survive."""
from minimateplus.blastware_file import write_blastware_file, blastware_filename
ev, frames = _make_synthetic_event()
bw_path = tmp_path / blastware_filename(ev, "BE11529")
write_blastware_file(ev, frames, bw_path)
parsed = event_file_io.read_blastware_file(bw_path)
assert parsed._waveform_key == ev._waveform_key
assert parsed.rectime_seconds == ev.rectime_seconds
# Timestamp lands via the footer; year/month/day/hour/min/sec all survive.
assert parsed.timestamp is not None
assert parsed.timestamp.year == ev.timestamp.year
assert parsed.timestamp.month == ev.timestamp.month
assert parsed.timestamp.day == ev.timestamp.day
assert parsed.timestamp.hour == ev.timestamp.hour
assert parsed.timestamp.minute == ev.timestamp.minute
assert parsed.timestamp.second == ev.timestamp.second
# No A5 source recoverable.
assert parsed._a5_frames is None
# The synthetic event has no real waveform body, so the codec can't
# decode samples → read_blastware_file leaves peak_values=None
# (the "we don't know" signal) rather than fabricating all-zero
# peaks that would otherwise overwrite real DB values via UPSERT.
assert parsed.peak_values is None
assert parsed.raw_samples is not None
# Empty channels — codec returned None for the malformed synthetic body.
for ch in ("Tran", "Vert", "Long", "MicL"):
assert parsed.raw_samples[ch] == []
_BW_CODEC_FIXTURES = [
# (path, expected_n_samples_per_channel, BW-reported Vert PPV in/s for sanity)
("tests/fixtures/decode-re-5-8-26/event-a/M529LKVQ.6S0", 3328, 0.780),
("tests/fixtures/decode-re-5-8-26/event-b/M529LK5Q.RG0", 2304, 0.505),
("tests/fixtures/decode-re-5-8-26/event-c/M529LK44.AB0", 1280, 0.610),
("tests/fixtures/decode-re-5-8-26/event-d/M529LK2V.470", 1280, 0.565),
("tests/fixtures/5-11-26/M529LL1L.V70", 3328, 0.010),
("tests/fixtures/5-11-26/M529LL1L.JQ0", 3328, 3.465),
]
@pytest.mark.parametrize("path,expected_n,expected_ppv", _BW_CODEC_FIXTURES)
def test_read_blastware_file_decodes_via_codec(path: str, expected_n: int, expected_ppv: float):
"""Regression lock: ``read_blastware_file()`` must use the verified
waveform-body codec (``minimateplus.waveform_codec``), not the
retracted int16-LE assumption.
Verifies against the real BW fixture corpus: every event in the
bundled fixtures must produce the expected per-channel sample count
and a Vert PPV close to BW's own reported value. Catches any
accidental regression of the body decoder back to the old
``_decode_samples_4ch_int16_le`` path (which produced ±32K noise
on every event, giving wildly wrong PPVs).
"""
repo_root = Path(__file__).resolve().parent.parent
full_path = repo_root / path
if not full_path.exists():
pytest.skip(f"fixture missing: {full_path}")
ev = event_file_io.read_blastware_file(full_path)
assert ev.raw_samples is not None
for ch in ("Tran", "Vert", "Long"):
assert len(ev.raw_samples[ch]) == expected_n, (
f"{ch}: expected {expected_n} samples, got {len(ev.raw_samples[ch])}"
)
# PPV check: the codec produces decoded samples in 1-count ADC units;
# _peaks_from_samples scales by GEO_NORMAL_FS_INS / 32767. BW's own
# PPV is computed at slightly different precision/interpolation, so
# we allow a 0.2 in/s tolerance — well under the broken-decoder
# signature (which would produce ~10 in/s saturation).
assert ev.peak_values is not None
assert abs(ev.peak_values.vert - expected_ppv) < 0.2, (
f"Vert PPV {ev.peak_values.vert:.3f} differs from BW's "
f"{expected_ppv:.3f} by >0.2 in/s — codec regression?"
)
def test_read_blastware_file_v70_samples_match_txt_truth():
"""Strongest regression lock: every one of V70's 3328 decoded
sample-sets must match the .TXT ground truth table within the
0.005 in/s display quantum."""
repo_root = Path(__file__).resolve().parent.parent
bw_path = repo_root / "tests/fixtures/5-11-26/M529LL1L.V70"
txt_path = repo_root / "tests/fixtures/5-11-26/M529LL1L.V70.TXT"
if not bw_path.exists() or not txt_path.exists():
pytest.skip(f"V70 fixture missing")
import re
ev = event_file_io.read_blastware_file(bw_path)
# Parse .TXT ground truth sample table
text = txt_path.read_text()
lines = text.splitlines()
hdr_idx = next(i for i, line in enumerate(lines)
if re.match(r"^Tran\s+Vert\s+Long\s+MicL?", line.strip()))
truth = []
for line in lines[hdr_idx + 1:]:
parts = line.strip().split()
if len(parts) != 4:
continue
try:
truth.append([float(x) for x in parts])
except ValueError:
continue
assert len(truth) == 3328, f"expected 3328 truth rows, got {len(truth)}"
def adc_to_ins(count):
return count / 32767.0 * 10.0
for i, truth_row in enumerate(truth):
for ch_idx, ch_name in enumerate(("Tran", "Vert", "Long")):
decoded_ips = adc_to_ins(ev.raw_samples[ch_name][i])
truth_ips = truth_row[ch_idx]
# 0.003 in/s tolerance: <0.005 quantum + small float precision room
assert abs(decoded_ips - truth_ips) < 0.003, (
f"row {i} {ch_name}: decoded {decoded_ips:+.4f} vs "
f"truth {truth_ips:+.4f} (delta {decoded_ips - truth_ips:+.4f})"
)
def test_save_imported_bw_with_paired_report(tmp_path: Path):
"""save_imported_bw + a paired BW ASCII report fold the report's
rich derived fields into the sidecar. This is the daemon-forwarded
ACH workflow: BW writes <event>.AB0 and <event>.AB0.TXT side by side;
the daemon ships both; we overlay the report-decoded values onto the
sidecar (peaks, project, plus the rich `bw_report` block)."""
from minimateplus.blastware_file import write_blastware_file, blastware_filename
from sfm.waveform_store import WaveformStore
ev, frames = _make_synthetic_event()
fname = blastware_filename(ev, "BE11529")
src = tmp_path / fname
write_blastware_file(ev, frames, src)
# Use one of the real BW ASCII exports as the paired report.
report_path = (
Path(__file__).parent.parent
/ "decode-re" / "5-8-26" / "event-c" / "M529LK44.AB0.TXT"
)
if not report_path.exists():
import pytest as _pt
_pt.skip("decode-re fixtures not present")
report_bytes = report_path.read_bytes()
store = WaveformStore(tmp_path / "waveforms")
parsed_ev, rec = store.save_imported_bw(
src.read_bytes(),
source_path=src,
bw_report_text=report_bytes,
)
sc = store.load_sidecar("BE11529", fname)
assert sc is not None
# ── bw_report block populated with the rich fields ──────────────────
assert "bw_report" in sc
br = sc["bw_report"]
assert br["available"] is True
assert br["event_type"] == "Full Waveform"
assert br["recording"]["sample_rate_sps"] == 1024
assert br["recording"]["geo_range_ips"] == 10.0
# Per-channel derived stats
assert br["peaks"]["tran"]["ppv_ips"] == 0.065
assert br["peaks"]["vert"]["ppv_ips"] == 0.610
assert br["peaks"]["long"]["ppv_ips"] == 0.070
assert br["peaks"]["vert"]["peak_accel_g"] == 0.437
assert br["peaks"]["vert"]["peak_disp_in"] == 0.006
assert br["peaks"]["tran"]["zc_freq_hz"] == 47.0
assert br["peaks"]["vector_sum"]["ips"] == 0.612
assert br["peaks"]["vector_sum"]["time_s"] == 0.024
# Sensor self-check per channel
assert br["sensor_check"]["tran"]["freq_hz"] == 7.4
assert br["sensor_check"]["tran"]["ratio"] == 3.7
assert br["sensor_check"]["tran"]["result"] == "Passed"
assert br["sensor_check"]["mic"]["amplitude_mv"] == 533.0
# Mic block
assert br["mic"]["weighting"] == "Linear Weighting"
assert br["mic"]["pspl_dbl"] == 88.0
# Monitor log roundtripped
assert len(br["monitor_log"]) == 1
assert "2026-04-23T15:46:16" in br["monitor_log"][0]["start"]
assert br["pc_sw_version"] == "V 10.74"
# ── Overlay onto canonical peak_values ──────────────────────────────
# Report values win over the broken-codec samples-derived peaks.
assert sc["peak_values"]["transverse"] == 0.065
assert sc["peak_values"]["vertical"] == 0.610
assert sc["peak_values"]["longitudinal"] == 0.070
assert sc["peak_values"]["vector_sum"] == 0.612
# Mic PSPL converted to psi (dbl=88 → 10^(88/20) * 2.9e-9)
assert sc["peak_values"]["mic_psi"] is not None
assert 1e-5 < sc["peak_values"]["mic_psi"] < 1e-3
# ── Overlay onto project_info ───────────────────────────────────────
assert sc["project_info"]["project"] == "Test4-21-26"
assert sc["project_info"]["client"] == "Test-Client1"
assert sc["project_info"]["operator"] == "Brian and claude"
assert sc["project_info"]["sensor_location"] == "catbed"
# ── Event timestamp overlaid from report ───────────────────────────
assert sc["event"]["timestamp"] == "2026-04-23T15:56:35"
def test_save_imported_bw_without_report_works_unchanged(tmp_path: Path):
"""Calling save_imported_bw with no bw_report_text behaves exactly
as before — no `bw_report` block, peak_values come from samples."""
from minimateplus.blastware_file import write_blastware_file, blastware_filename
from sfm.waveform_store import WaveformStore
ev, frames = _make_synthetic_event()
fname = blastware_filename(ev, "BE11529")
src = tmp_path / fname
write_blastware_file(ev, frames, src)
store = WaveformStore(tmp_path / "waveforms")
store.save_imported_bw(src.read_bytes(), source_path=src)
sc = store.load_sidecar("BE11529", fname)
assert sc is not None
assert "bw_report" not in sc # block is absent without a report
# Synthetic event has zero samples → peaks all zero (was true before this change)
assert sc["peak_values"]["transverse"] == 0.0
def test_save_imported_bw_round_trip(tmp_path: Path):
"""save_imported_bw stores a copy + sidecar with source.kind = bw-import."""
from minimateplus.blastware_file import write_blastware_file, blastware_filename
from sfm.waveform_store import WaveformStore
# Produce a BW file outside the store.
ev, frames = _make_synthetic_event()
fname = blastware_filename(ev, "BE11529")
src = tmp_path / fname
write_blastware_file(ev, frames, src)
store = WaveformStore(tmp_path / "waveforms")
parsed_ev, rec = store.save_imported_bw(src.read_bytes(), source_path=src)
assert rec["filename"] == fname
assert rec["a5_pickle_filename"] is None # no A5 source for BW imports
# The serial decoded from the BW filename surfaces on the record so
# the import endpoint can use it when calling SeismoDb.insert_events()
# (otherwise forwarded events would all bucket into serial="UNKNOWN").
assert rec["serial"] == "BE11529"
sc = store.load_sidecar("BE11529", fname)
assert sc is not None
assert sc["source"]["kind"] == "bw-import"
assert sc["source"]["a5_pickle_filename"] is None
# The stored binary should match the source byte-for-byte (we just copied).
stored_path = store.open_blastware("BE11529", fname)
assert stored_path is not None
assert stored_path.read_bytes() == src.read_bytes()
if __name__ == "__main__":
if pytest is not None:
pytest.main([__file__, "-v"])
else:
import inspect
import traceback as _tb
passed = failed = 0
for _name, _fn in sorted(globals().items()):
if not _name.startswith("test_") or not callable(_fn):
continue
try:
_sig = inspect.signature(_fn)
if "tmp_path" in _sig.parameters:
with tempfile.TemporaryDirectory() as _td:
_fn(Path(_td))
else:
_fn()
print(f"PASS {_name}")
passed += 1
except Exception:
print(f"FAIL {_name}")
_tb.print_exc()
failed += 1
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)