6ac126e05c
User uploaded 3 high-amplitude events (PPV 6-7 in/s — shook the geophone
hard) to decode-re/5-11-26/. These cracked the Tran codec:
- Preamble bytes [3:5] and [5:7] = Tran[0] and Tran[1] as int16 BE
in 16-count units (LSB = 0.005 in/s). Confirmed across all 7
fixtures.
- First data block carries Tran deltas from sample 2 onward:
* 10 NN block: NN/2 bytes of payload, each byte = two 4-bit signed
nibble deltas (high nibble first)
* 20 NN block: NN int8 signed deltas
Verified 22+42+46 = 110 Tran samples across SP0/SS0/SV0 with 0 errors
against BW's ASCII export.
Why the earlier 96-combination brute force failed: the quiet 5-8
events all had T[0] = T[1] ≈ 0 so the preamble's per-channel encoding
was undetectable. Loud events made the encoding obvious.
What's solved:
- minimateplus.waveform_codec.decode_tran_initial: returns first
N Tran samples in 16-count units for any body.
- Walker length formula for in-data 30 NN blocks (NN*2 instead of NN*4).
- Walker now handles bodies that start with 20 NN (in addition to 10 NN).
What's still open:
- Tran past the first data block (multi-block channel switching).
- Vert / Long / MicL channel encodings.
- Walker correctness past offset ~427 in event-b.
Tests: 36 pass. decode_waveform_v2 still returns None — the full
multi-channel decoder is not wired up. decode_tran_initial is the
new verified entry point.
Files: minimateplus/waveform_codec.py, tests/test_waveform_codec.py
(adds 5-11-26 fixtures + decode_tran_initial tests), and
docs/instantel_protocol_reference.md §7.6.1 (Tran codec spec).
315 lines
12 KiB
Python
315 lines
12 KiB
Python
"""
Tests for minimateplus.waveform_codec — Blastware waveform-file body block walker.

Most of these tests lock in the STRUCTURAL framing of the body codec (data-start
detection, block walking, segment splitting, segment-header parsing). The full
byte-to-sample mapping is still open (see waveform_codec module docstring), so
:func:`decode_waveform_v2` is expected to return ``None``.

The exception is :func:`decode_tran_initial`: its tests at the end of this module
make per-sample assertions against hard-coded expected values, described in the
tests as taken from the Blastware ASCII export.
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
import pytest
|
|
|
|
from minimateplus.waveform_codec import (
|
|
WaveformBlock,
|
|
decode_tran_initial,
|
|
decode_waveform_v2,
|
|
find_data_start,
|
|
parse_segment_header,
|
|
split_segments,
|
|
walk_body,
|
|
)
|
|
|
|
|
|
# Directory holding the bundled 5-8-26 fixtures, resolved relative to this file.
FIXTURES = os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26")
|
|
|
|
|
|
def _bw_body(path):
    """Read a BW binary from ``path`` and return only its body bytes.

    The first 43 bytes (22-byte header followed by the 21-byte STRT record)
    and the last 26 bytes (footer) are dropped.
    """
    leading = 22 + 21  # header + STRT
    trailing = 26  # footer
    with open(path, "rb") as fh:
        raw = fh.read()
    return raw[leading:-trailing]
|
|
|
|
|
|
# Fixture metadata — bundled BW binaries from a real BE11529 unit, May 8 2026.
|
|
# Each is paired with a Blastware TXT export (the ASCII ground truth).
|
|
# One row per bundled event: (event name, BW binary filename, sample count,
# record time in seconds). Expanded below into the per-event metadata dicts.
FIXTURES_INFO = {
    name: {"filename": filename, "n_samples": n_samples, "rectime": rectime}
    for name, filename, n_samples, rectime in (
        # 3.0 s rectime + 0.25 s pretrig at 1024 sps
        ("event-a", "M529LKVQ.6S0", 3328, 3.0),
        # 2.0 s
        ("event-b", "M529LK5Q.RG0", 2304, 2.0),
        # 1.0 s
        ("event-c", "M529LK44.AB0", 1280, 1.0),
        ("event-d", "M529LK2V.470", 1280, 1.0),
    )
}
|
|
|
|
|
|
def _fixture_path(event_name):
    """Return the on-disk path of the BW binary bundled for ``event_name``.

    The file lives at ``FIXTURES/<event_name>/<filename>``, where the filename
    comes from ``FIXTURES_INFO``. Raises ``KeyError`` for an unknown event.
    """
    filename = FIXTURES_INFO[event_name]["filename"]
    return os.path.join(FIXTURES, event_name, filename)
|
|
|
|
|
|
# ── Find data start ──────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO))
def test_find_data_start_locates_first_block(event_name):
    """``find_data_start`` lands on a plausible ``10 NN`` tag near the body start.

    The reported offset must fall within the first 20 body bytes, point at a
    ``0x10`` byte, and be followed by a non-zero count byte that is a multiple
    of 4 and no larger than ``0xFC``.
    """
    path = _fixture_path(event_name)
    if not os.path.exists(path):
        pytest.skip(f"fixture missing: {path}")

    body = _bw_body(path)
    start = find_data_start(body)

    assert 0 <= start < 20, f"expected start in [0, 20), got {start}"
    assert body[start] == 0x10
    count = body[start + 1]
    assert count % 4 == 0
    assert 0 < count <= 0xFC
|
|
|
|
|
|
def test_find_data_start_preamble_lengths():
    """Pin the preamble length that ``find_data_start`` reports for each event.

    Empirically events a and b have a 9-byte preamble (described as continuous)
    and events c and d a 7-byte one (described as single-shot). The whole test
    is skipped as soon as any fixture file is missing.
    """
    expected_starts = {"event-a": 9, "event-b": 9, "event-c": 7, "event-d": 7}

    starts = {}
    for name in FIXTURES_INFO:
        path = _fixture_path(name)
        if not os.path.exists(path):
            pytest.skip(f"fixture missing: {path}")
        starts[name] = find_data_start(_bw_body(path))

    for name, want in expected_starts.items():
        assert starts[name] == want
|
|
|
|
|
|
# ── Block walker ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_walk_body_empty_returns_empty():
    """Walking a zero-length body yields an empty block list."""
    blocks = walk_body(b"")
    assert blocks == []
|
|
|
|
|
|
def test_walk_body_invalid_start_returns_empty():
    """A body whose first bytes are not a recognized tag yields no blocks."""
    garbage = b"\xff\xff\xff\xff"
    assert walk_body(garbage, start=0) == []
|
|
|
|
|
|
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO))
def test_walk_body_produces_blocks(event_name):
    """Every fixture walks to a non-empty block stream made only of known tags."""
    path = _fixture_path(event_name)
    if not os.path.exists(path):
        pytest.skip(f"fixture missing: {path}")

    blocks = walk_body(_bw_body(path))
    assert len(blocks) > 0

    # The five tag types the walker is known to emit.
    known_tags = (0x10, 0x20, 0x00, 0x30, 0x40)
    for blk in blocks:
        assert blk.tag_hi in known_tags, (
            f"unknown tag {blk.tag_hi:#04x} at offset {blk.offset}"
        )
|
|
|
|
|
|
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO))
def test_walk_body_block_lengths_consistent(event_name):
    """A block's ``length`` equals its 2-byte tag plus its payload size."""
    path = _fixture_path(event_name)
    if not os.path.exists(path):
        pytest.skip(f"fixture missing: {path}")

    tag_size = 2  # every block starts with a two-byte tag
    for blk in walk_body(_bw_body(path)):
        assert tag_size + len(blk.data) == blk.length, (
            f"block at {blk.offset} length mismatch: tag(2) + data({len(blk.data)}) != length({blk.length})"
        )
|
|
|
|
|
|
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO))
def test_walk_body_blocks_contiguous(event_name):
    """Consecutive blocks abut: each starts where its predecessor ends."""
    path = _fixture_path(event_name)
    if not os.path.exists(path):
        pytest.skip(f"fixture missing: {path}")

    blocks = walk_body(_bw_body(path))

    # Pair every block with its successor; ``i`` is the successor's index.
    for i, (prev, cur) in enumerate(zip(blocks, blocks[1:]), start=1):
        expected_offset = prev.offset + prev.length
        assert cur.offset == expected_offset, (
            f"gap/overlap between block {i-1} (off={prev.offset} len={prev.length}) "
            f"and block {i} (off={cur.offset})"
        )
|
|
|
|
|
|
# ── Segment splitting ────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO))
def test_split_segments_yields_at_least_one(event_name):
    """Splitting the walked blocks of any fixture produces at least one segment."""
    path = _fixture_path(event_name)
    if not os.path.exists(path):
        pytest.skip(f"fixture missing: {path}")

    segments = split_segments(walk_body(_bw_body(path)))
    assert len(segments) > 0
|
|
|
|
|
|
def test_split_segments_segment_count_at_least_one_per_event():
    """The walker should produce at least one ``40 02`` segment header per event.

    Only the high tag byte (``0x40``) is checked here; the low byte is not
    inspected.

    Note: the walker currently bails out partway through event-b (still an
    open issue — the body codec uses block lengths the walker doesn't
    handle correctly past offset ~427). The other 3 events walk farther
    and have many segment headers.

    Fixtures that are missing on disk are passed over individually. If none
    is present at all, the test is reported as skipped rather than passing
    without having checked anything.
    """
    checked = 0
    for name in FIXTURES_INFO:
        path = _fixture_path(name)
        if not os.path.exists(path):
            continue
        checked += 1
        blocks = walk_body(_bw_body(path))
        n_40 = sum(1 for b in blocks if b.tag_hi == 0x40)
        assert n_40 >= 1, f"{name}: no 40 02 segment header found"

    if not checked:
        # Avoid a vacuous pass when the fixture directory is absent.
        pytest.skip("no fixtures present")
|
|
|
|
|
|
# ── Segment header parsing ───────────────────────────────────────────────────
|
|
|
|
|
|
def test_parse_segment_header_returns_none_for_non_40():
    """A block whose tag is not ``0x40`` is not parsed as a segment header."""
    data_block = WaveformBlock(
        offset=0,
        tag_hi=0x10,
        tag_lo=0x04,
        data=b"\x00\x00",
        length=4,
    )
    result = parse_segment_header(data_block)
    assert result is None
|
|
|
|
|
|
def test_parse_segment_header_decodes_fields():
    """Parse one known ``40 02`` block and check the decoded field values.

    The block is the first segment header of event-c (body offset 235):
        40 02 00 00 00 00 0a 4b 01 1e 47 00 00 00 02 00 00 01 00 01
    """
    # 18 payload bytes following the 2-byte tag; fromhex skips the spaces.
    payload = bytes.fromhex("00000000 0a4b011e 47000000 02000001 0001")
    header_block = WaveformBlock(
        offset=235,
        tag_hi=0x40,
        tag_lo=0x02,
        data=payload,
        length=20,
    )

    decoded = parse_segment_header(header_block)

    assert decoded is not None
    assert decoded["counter"] == 0x47  # uint32 LE
    assert decoded["fixed_pattern"] == b"\x02\x00\x00\x01"
    assert decoded["anchor_bytes"] == b"\x00\x00\x00\x00"
|
|
|
|
|
|
def test_segment_counter_increments():
    """The counter of successive ``40 02`` headers in event-c never decreases.

    The counter is the 4-byte field at bytes [8:12] of each ``40 02`` payload.
    Only the first eight headers are compared, and equal neighbours are
    accepted: the BW counter is global across the whole flash buffer, so an
    event may share a counter value with the previous event's tail block.
    """
    path = _fixture_path("event-c")
    if not os.path.exists(path):
        pytest.skip("fixture missing")

    counters = [
        parse_segment_header(blk)["counter"]
        for blk in walk_body(_bw_body(path))
        if blk.tag_hi == 0x40 and blk.tag_lo == 0x02
    ]
    assert len(counters) >= 5, "expect at least 5 segments to verify increments"

    # Adjacent pairs (0,1) .. (6,7), truncated if fewer headers exist.
    for prev, cur in zip(counters[:7], counters[1:8]):
        assert cur >= prev, (
            f"counter went backwards: {prev} → {cur}"
        )
|
|
|
|
|
|
# ── decode_waveform_v2: currently a stub ─────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO))
def test_decode_waveform_v2_returns_none_until_verified(event_name):
    """
    ``decode_waveform_v2`` must stay a ``None``-returning stub for now.

    The full per-channel decoder is not wired up yet, and a ``None`` result
    tells callers to keep using the legacy decoder. Once a verified decoder
    lands, invert this assertion and add ground-truth tests against the
    bundled TXT exports.
    """
    path = _fixture_path(event_name)
    if not os.path.exists(path):
        pytest.skip(f"fixture missing: {path}")

    result = decode_waveform_v2(_bw_body(path))
    assert result is None
|
|
|
|
|
|
# ── decode_tran_initial: confirmed correct against ground truth ──────────────
|
|
|
|
# Bundled fixtures for the high-amplitude 5-11-26 events (PPV ~6-7 in/s).
|
|
# These cracked the Tran codec — see waveform_codec module docstring.
|
|
# Each entry: (path to the BW binary, expected leading Tran samples in
# 16-count units, minimum number of samples the decoder must return).
TRAN_INITIAL_FIXTURES = [
    (
        os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", filename),
        expected,
        n_required,
    )
    for filename, expected, n_required in (
        (
            "M529LL1A.SP0",
            [4, 4, 3, 3, 3, 2, 2, 3, 2, 2, 2, 2, 1, 1, 1, 2, 1, 1, 1, 0, 1, 0],
            22,
        ),
        (
            "M529LL1A.SS0",
            [-89, -89, -91, -91, -92, -93, -94, -94, -94, -94],
            42,
        ),
        (
            "M529LL1A.SV0",
            [-745, -762, -771, -774, -779, -794, -808, -811, -811, -819],
            46,
        ),
    )
]
|
|
|
|
|
|
@pytest.mark.parametrize("path,expected,n_required", TRAN_INITIAL_FIXTURES)
def test_decode_tran_initial_matches_ground_truth(path, expected, n_required):
    """The Tran initial decoder produces values matching the BW ASCII export exactly.

    ``expected`` holds the leading samples to compare one by one, and
    ``n_required`` is the minimum number of samples the decoder must return.
    The length checks run before the per-sample comparison so that a short
    decode fails with a descriptive message instead of an ``IndexError``.
    """
    if not os.path.exists(path):
        pytest.skip(f"fixture missing: {path}")

    # Same header/footer stripping as every other fixture-based test.
    body = _bw_body(path)
    decoded = decode_tran_initial(body)
    assert decoded is not None

    # We must have decoded at least n_required samples ...
    assert len(decoded) >= n_required, (
        f"decoded only {len(decoded)} samples, expected at least {n_required}"
    )
    # ... and enough of them to cover every expected value (zip would
    # otherwise silently truncate the comparison).
    assert len(decoded) >= len(expected), (
        f"decoded only {len(decoded)} samples, need {len(expected)} to compare"
    )

    # The first len(expected) samples must match exactly.
    for i, (got, want) in enumerate(zip(decoded, expected)):
        assert got == want, (
            f"sample {i}: decoded={got} expected={want}"
        )
|
|
|
|
|
|
def test_decode_tran_initial_handles_empty():
    """Empty input and input that is not a body both decode to ``None``."""
    for not_a_body in (b"", b"not a body"):
        assert decode_tran_initial(not_a_body) is None
|
|
|
|
|
|
def test_decode_tran_initial_synthetic_body():
    """A hand-built body (preamble + one ``10 04`` block) decodes as expected."""
    # 7-byte preamble: three leading "magic" bytes, then T[0]=10 and T[1]=20
    # as two-byte values in 16-count units.
    preamble = b"\x00\x02\x00" + b"\x00\x0a" + b"\x00\x14"
    # ``10 04`` block carrying 4 nibble deltas, high nibble first:
    # 0x1F -> (+1, -1), 0x2E -> (+2, -2).
    delta_block = b"\x10\x04" + b"\x1f\x2e"

    decoded = decode_tran_initial(preamble + delta_block)

    # T[0], T[1], then the running sum of the deltas starting from T[1]=20.
    assert decoded == [10, 20, 21, 20, 22, 20]
|