minimateplus: wire read_blastware_file to verified body codec #24
@@ -27,6 +27,7 @@ from typing import Optional, Union
|
||||
from .models import Event, PeakValues, ProjectInfo, Timestamp
|
||||
from . import blastware_file as _bw # avoid circular reference at module load
|
||||
from .bw_ascii_report import BwAsciiReport
|
||||
from .waveform_codec import decode_waveform_v2, decoded_to_adc_counts
|
||||
|
||||
# Reference pressure for dB(L) → psi conversion (20 µPa expressed in psi).
|
||||
# Same constant as sfm/sfm_webapp.html so server-side and browser-side
|
||||
@@ -755,11 +756,28 @@ def read_blastware_file(path: Union[str, Path]) -> Event:
|
||||
ts1 = _bw._decode_ts_be(footer[2:10])
|
||||
ts2 = _bw._decode_ts_be(footer[10:18])
|
||||
|
||||
# Body: first 6 bytes are the preamble (00 00 ff ff ff ff). Strip
|
||||
# them before decoding samples. Any trailing tail past the last
|
||||
# full sample-set is silently truncated by _decode_samples_4ch.
|
||||
sample_bytes = body[6:] if body[:6].hex() in ("0000ffffffff", "0000FFFFFFFF") else body
|
||||
samples = _decode_samples_4ch_int16_le(sample_bytes)
|
||||
# Body: decode via the verified BW waveform-body codec. The body
|
||||
# starts with the codec's 7-byte preamble ``00 02 00 [Tran[0] BE]
|
||||
# [Tran[1] BE]`` and continues with the tagged-block stream the codec
|
||||
# walks. See ``minimateplus/waveform_codec.py`` + ``docs/waveform_codec_re_status.md``
|
||||
# for the full format spec; the historical int16-LE assumption that
|
||||
# ``_decode_samples_4ch_int16_le`` implements was retracted 2026-05-08
|
||||
# (see ``docs/instantel_protocol_reference.md`` §7.6.1).
|
||||
#
|
||||
# If decode fails (malformed file, truncated body, synthetic test
|
||||
# input), fall back to empty channels — the rest of the event
|
||||
# (timestamp, waveform_key, project strings) is still recoverable
|
||||
# and useful. The peaks-from-samples helper handles empty input
|
||||
# gracefully.
|
||||
decoded = decode_waveform_v2(body)
|
||||
if decoded is None:
|
||||
log.warning(
|
||||
"%s: waveform body codec failed to decode (body starts %s) — "
|
||||
"raw_samples will be empty", path, body[:8].hex(" "),
|
||||
)
|
||||
samples = {"Tran": [], "Vert": [], "Long": [], "MicL": []}
|
||||
else:
|
||||
samples = decoded_to_adc_counts(decoded)
|
||||
|
||||
# Metadata strings (label-anchored search across the body).
|
||||
project = _find_first_string(body, b"Project:")
|
||||
|
||||
@@ -294,6 +294,97 @@ def test_read_blastware_file_round_trip(tmp_path: Path):
|
||||
assert parsed.peak_values.peak_vector_sum == 0.0
|
||||
|
||||
|
||||
_BW_CODEC_FIXTURES = [
|
||||
# (path, expected_n_samples_per_channel, BW-reported Vert PPV in/s for sanity)
|
||||
("tests/fixtures/decode-re-5-8-26/event-a/M529LKVQ.6S0", 3328, 0.780),
|
||||
("tests/fixtures/decode-re-5-8-26/event-b/M529LK5Q.RG0", 2304, 0.505),
|
||||
("tests/fixtures/decode-re-5-8-26/event-c/M529LK44.AB0", 1280, 0.610),
|
||||
("tests/fixtures/decode-re-5-8-26/event-d/M529LK2V.470", 1280, 0.565),
|
||||
("tests/fixtures/5-11-26/M529LL1L.V70", 3328, 0.010),
|
||||
("tests/fixtures/5-11-26/M529LL1L.JQ0", 3328, 3.465),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("path,expected_n,expected_ppv", _BW_CODEC_FIXTURES)
|
||||
def test_read_blastware_file_decodes_via_codec(path: str, expected_n: int, expected_ppv: float):
|
||||
"""Regression lock: ``read_blastware_file()`` must use the verified
|
||||
waveform-body codec (``minimateplus.waveform_codec``), not the
|
||||
retracted int16-LE assumption.
|
||||
|
||||
Verifies against the real BW fixture corpus: every event in the
|
||||
bundled fixtures must produce the expected per-channel sample count
|
||||
and a Vert PPV close to BW's own reported value. Catches any
|
||||
accidental regression of the body decoder back to the old
|
||||
``_decode_samples_4ch_int16_le`` path (which produced ±32K noise
|
||||
on every event, giving wildly wrong PPVs).
|
||||
"""
|
||||
repo_root = Path(__file__).resolve().parent.parent
|
||||
full_path = repo_root / path
|
||||
if not full_path.exists():
|
||||
pytest.skip(f"fixture missing: {full_path}")
|
||||
|
||||
ev = event_file_io.read_blastware_file(full_path)
|
||||
assert ev.raw_samples is not None
|
||||
for ch in ("Tran", "Vert", "Long"):
|
||||
assert len(ev.raw_samples[ch]) == expected_n, (
|
||||
f"{ch}: expected {expected_n} samples, got {len(ev.raw_samples[ch])}"
|
||||
)
|
||||
|
||||
# PPV check: the codec produces decoded samples in 1-count ADC units;
|
||||
# _peaks_from_samples scales by GEO_NORMAL_FS_INS / 32767. BW's own
|
||||
# PPV is computed at slightly different precision/interpolation, so
|
||||
# we allow a 0.2 in/s tolerance — well under the broken-decoder
|
||||
# signature (which would produce ~10 in/s saturation).
|
||||
assert ev.peak_values is not None
|
||||
assert abs(ev.peak_values.vert - expected_ppv) < 0.2, (
|
||||
f"Vert PPV {ev.peak_values.vert:.3f} differs from BW's "
|
||||
f"{expected_ppv:.3f} by >0.2 in/s — codec regression?"
|
||||
)
|
||||
|
||||
|
||||
def test_read_blastware_file_v70_samples_match_txt_truth():
|
||||
"""Strongest regression lock: every one of V70's 3328 decoded
|
||||
sample-sets must match the .TXT ground truth table within the
|
||||
0.005 in/s display quantum."""
|
||||
repo_root = Path(__file__).resolve().parent.parent
|
||||
bw_path = repo_root / "tests/fixtures/5-11-26/M529LL1L.V70"
|
||||
txt_path = repo_root / "tests/fixtures/5-11-26/M529LL1L.V70.TXT"
|
||||
if not bw_path.exists() or not txt_path.exists():
|
||||
pytest.skip(f"V70 fixture missing")
|
||||
|
||||
import re
|
||||
ev = event_file_io.read_blastware_file(bw_path)
|
||||
|
||||
# Parse .TXT ground truth sample table
|
||||
text = txt_path.read_text()
|
||||
lines = text.splitlines()
|
||||
hdr_idx = next(i for i, line in enumerate(lines)
|
||||
if re.match(r"^Tran\s+Vert\s+Long\s+MicL?", line.strip()))
|
||||
truth = []
|
||||
for line in lines[hdr_idx + 1:]:
|
||||
parts = line.strip().split()
|
||||
if len(parts) != 4:
|
||||
continue
|
||||
try:
|
||||
truth.append([float(x) for x in parts])
|
||||
except ValueError:
|
||||
continue
|
||||
assert len(truth) == 3328, f"expected 3328 truth rows, got {len(truth)}"
|
||||
|
||||
def adc_to_ins(count):
|
||||
return count / 32767.0 * 10.0
|
||||
|
||||
for i, truth_row in enumerate(truth):
|
||||
for ch_idx, ch_name in enumerate(("Tran", "Vert", "Long")):
|
||||
decoded_ips = adc_to_ins(ev.raw_samples[ch_name][i])
|
||||
truth_ips = truth_row[ch_idx]
|
||||
# 0.003 in/s tolerance: <0.005 quantum + small float precision room
|
||||
assert abs(decoded_ips - truth_ips) < 0.003, (
|
||||
f"row {i} {ch_name}: decoded {decoded_ips:+.4f} vs "
|
||||
f"truth {truth_ips:+.4f} (delta {decoded_ips - truth_ips:+.4f})"
|
||||
)
|
||||
|
||||
|
||||
def test_save_imported_bw_with_paired_report(tmp_path: Path):
|
||||
"""save_imported_bw + a paired BW ASCII report fold the report's
|
||||
rich derived fields into the sidecar. This is the daemon-forwarded
|
||||
|
||||
Reference in New Issue
Block a user