""" Tests for minimateplus.waveform_codec — Blastware waveform-file body block walker. These tests lock in the STRUCTURAL framing of the body codec. The byte-to-sample mapping is open (see waveform_codec module docstring) — until that's nailed down, :func:`decode_waveform_v2` returns ``None`` and there is no per-sample assertion to make. """ from __future__ import annotations import os import pytest from minimateplus.waveform_codec import ( WaveformBlock, decode_tran_initial, decode_waveform_v2, find_data_start, parse_segment_header, split_segments, walk_body, ) FIXTURES = os.path.join( os.path.dirname(__file__), "fixtures", "decode-re-5-8-26" ) def _bw_body(path): """Strip the 22-byte header and 21-byte STRT and 26-byte footer to get the body.""" with open(path, "rb") as f: binary = f.read() return binary[43:-26] # Fixture metadata — bundled BW binaries from a real BE11529 unit, May 8 2026. # Each is paired with a Blastware TXT export (the ASCII ground truth). FIXTURES_INFO = { "event-a": { "filename": "M529LKVQ.6S0", "n_samples": 3328, # 3.0 s rectime + 0.25 s pretrig at 1024 sps "rectime": 3.0, }, "event-b": { "filename": "M529LK5Q.RG0", "n_samples": 2304, # 2.0 s "rectime": 2.0, }, "event-c": { "filename": "M529LK44.AB0", "n_samples": 1280, # 1.0 s "rectime": 1.0, }, "event-d": { "filename": "M529LK2V.470", "n_samples": 1280, "rectime": 1.0, }, } def _fixture_path(event_name): info = FIXTURES_INFO[event_name] return os.path.join(FIXTURES, event_name, info["filename"]) # ── Find data start ────────────────────────────────────────────────────────── @pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys())) def test_find_data_start_locates_first_block(event_name): """The walker auto-detects the first ``10 NN`` tag within the first 20 bytes.""" path = _fixture_path(event_name) if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") body = _bw_body(path) start = find_data_start(body) assert 0 <= start < 20, f"expected start in [0, 20), got {start}" assert body[start] in (0x00, 0x10, 0x20, 0x30, 0x40), ( f"first tag byte 0x{body[start]:02x} not a recognized block type" ) assert body[start + 1] % 4 == 0 or (body[start] == 0x40 and body[start + 1] == 0x02) def test_find_data_start_canonical_offset_7(): """All events have a 7-byte preamble (3-byte magic + 4-byte Tran anchors).""" for name in FIXTURES_INFO: path = _fixture_path(name) if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") body = _bw_body(path) # Sanity: magic assert body[0:3] == b"\x00\x02\x00", f"{name}: bad magic" # First tag at offset 7 assert find_data_start(body) == 7, f"{name}: expected start=7" # ── Block walker ───────────────────────────────────────────────────────────── def test_walk_body_empty_returns_empty(): assert walk_body(b"") == [] def test_walk_body_invalid_start_returns_empty(): # Body that does not begin with a recognized tag. assert walk_body(b"\xff\xff\xff\xff", start=0) == [] @pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys())) def test_walk_body_produces_blocks(event_name): """The walker should produce a non-empty stream of blocks for every fixture.""" path = _fixture_path(event_name) if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") body = _bw_body(path) blocks = walk_body(body) assert len(blocks) > 0 # All blocks have one of the 5 known tag types. for b in blocks: assert b.tag_hi in (0x10, 0x20, 0x00, 0x30, 0x40), ( f"unknown tag {b.tag_hi:#04x} at offset {b.offset}" ) @pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys())) def test_walk_body_block_lengths_consistent(event_name): """Each block's recorded length matches its on-wire footprint.""" path = _fixture_path(event_name) if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") body = _bw_body(path) blocks = walk_body(body) for b in blocks: # Tag (2 bytes) + payload should equal length. assert 2 + len(b.data) == b.length, ( f"block at {b.offset} length mismatch: tag(2) + data({len(b.data)}) != length({b.length})" ) @pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys())) def test_walk_body_blocks_contiguous(event_name): """Block n+1 starts exactly where block n ends (no gaps, no overlaps).""" path = _fixture_path(event_name) if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") body = _bw_body(path) blocks = walk_body(body) for i in range(1, len(blocks)): prev = blocks[i - 1] cur = blocks[i] assert cur.offset == prev.offset + prev.length, ( f"gap/overlap between block {i-1} (off={prev.offset} len={prev.length}) " f"and block {i} (off={cur.offset})" ) # ── Segment splitting ──────────────────────────────────────────────────────── @pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys())) def test_split_segments_yields_at_least_one(event_name): path = _fixture_path(event_name) if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") body = _bw_body(path) blocks = walk_body(body) segments = split_segments(blocks) assert len(segments) > 0 def test_split_segments_segment_count_at_least_one_per_event(): """The walker should produce at least one ``40 02`` segment header per event. Note: the walker currently bails out partway through event-b (still an open issue — the body codec uses block lengths the walker doesn't handle correctly past offset ~427). The other 3 events walk farther and have many segment headers. """ for name in FIXTURES_INFO: path = _fixture_path(name) if not os.path.exists(path): continue body = _bw_body(path) blocks = walk_body(body) n_40 = sum(1 for b in blocks if b.tag_hi == 0x40) assert n_40 >= 1, f"{name}: no 40 02 segment header found" # ── Segment header parsing ─────────────────────────────────────────────────── def test_parse_segment_header_returns_none_for_non_40(): block = WaveformBlock(offset=0, tag_hi=0x10, tag_lo=0x04, data=b"\x00\x00", length=4) assert parse_segment_header(block) is None def test_parse_segment_header_decodes_fields(): """Decode a known 40 02 block to verify field offsets.""" # First segment header from event-c at body offset 235: # 40 02 00 00 00 00 0a 4b 01 1e 47 00 00 00 02 00 00 01 00 01 payload = bytes.fromhex("00000000 0a4b011e 47000000 02000001 0001".replace(" ", "")) block = WaveformBlock( offset=235, tag_hi=0x40, tag_lo=0x02, data=payload, length=20 ) decoded = parse_segment_header(block) assert decoded is not None assert decoded["counter"] == 0x47 # uint32 LE assert decoded["fixed_pattern"] == b"\x02\x00\x00\x01" assert decoded["anchor_bytes"] == b"\x00\x00\x00\x00" def test_segment_counter_increments(): """The 4-byte counter at bytes [8:12] of each 40 02 payload increments by 1.""" path = _fixture_path("event-c") if not os.path.exists(path): pytest.skip("fixture missing") body = _bw_body(path) blocks = walk_body(body) headers = [b for b in blocks if b.tag_hi == 0x40 and b.tag_lo == 0x02] counters = [parse_segment_header(b)["counter"] for b in headers] assert len(counters) >= 5, "expect at least 5 segments to verify increments" # First few counters should be strictly monotonic (the BW counter is global, # incrementing across the whole flash buffer; some events may share counter # values with the previous event's tail block, so allow non-strict). for i in range(1, min(8, len(counters))): assert counters[i] >= counters[i - 1], ( f"counter went backwards: {counters[i-1]} → {counters[i]}" ) # ── decode_waveform_v2: currently a stub ───────────────────────────────────── @pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys())) def test_decode_waveform_v2_returns_none_until_verified(event_name): """ The full per-channel decoder is not yet wired up. This test ensures decode_waveform_v2 returns ``None`` so callers know to keep using the legacy decoder. When a verified decoder lands, flip this assertion and add ground-truth tests against the bundled TXT exports. """ path = _fixture_path(event_name) if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") body = _bw_body(path) assert decode_waveform_v2(body) is None # ── decode_tran_initial: confirmed correct against ground truth ────────────── # Bundled fixtures for the high-amplitude 5-11-26 events (PPV ~6-7 in/s). # These cracked the Tran codec — see waveform_codec module docstring. TRAN_INITIAL_FIXTURES = [ # (path, expected first N Tran samples in 16-count units, # of samples to verify) ( os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"), [4, 4, 3, 3, 3, 2, 2, 3, 2, 2, 2, 2, 1, 1, 1, 2, 1, 1, 1, 0, 1, 0], 22, ), ( os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"), [-89, -89, -91, -91, -92, -93, -94, -94, -94, -94], 42, ), ( os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"), [-745, -762, -771, -774, -779, -794, -808, -811, -811, -819], 46, ), # Vert-heavy event (T near zero) — segment 0 = 510 samples, all decode correctly. ( os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"), [0] * 4 + [-1, 0, 0, -1, -1, 0], 38, ), # Mic-heavy event (geos all near zero) — segment 0 = 482 samples. ( os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"), [0] * 10, 6, ), ] def _full_truth(path): """Load the BW ASCII truth for an event.""" import re with open(path + ".TXT", "r", encoding="utf-8", errors="replace") as f: lines = f.read().splitlines() # Find columns header. header_idx = None for i, line in enumerate(lines): if "Tran" in line and "Vert" in line and "Long" in line and "MicL" in line: header_idx = i break if header_idx is None: return None out = [] for line in lines[header_idx + 1:]: parts = re.split(r"\s+", line.strip()) if len(parts) < 4: continue try: out.append(round(float(parts[0]) * 200)) except ValueError: continue return out @pytest.mark.parametrize("path,expected,n_required", TRAN_INITIAL_FIXTURES) def test_decode_tran_initial_matches_ground_truth(path, expected, n_required): """The Tran initial decoder produces values matching the BW ASCII export exactly.""" if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") with open(path, "rb") as f: raw = f.read() body = raw[43:-26] decoded = decode_tran_initial(body) assert decoded is not None # Check first len(expected) samples match exactly. for i in range(len(expected)): assert decoded[i] == expected[i], ( f"sample {i}: decoded={decoded[i]} expected={expected[i]}" ) # And we got at least n_required samples decoded. assert len(decoded) >= n_required, ( f"decoded only {len(decoded)} samples, expected at least {n_required}" ) def test_decode_tran_initial_handles_empty(): assert decode_tran_initial(b"") is None assert decode_tran_initial(b"not a body") is None def test_decode_tran_initial_synthetic_body(): """A synthetic body with preamble + one 10 04 block decodes correctly.""" # Magic + T[0]=10 + T[1]=20 in 16-count units. # Then 10 04 block with 4 nibbles: (+1, -1, +2, -2) # Encoded high-nibble first: 0x1F = (1, -1), 0x2E = (2, -2) body = b"\x00\x02\x00\x00\x0a\x00\x14" + b"\x10\x04" + b"\x1f\x2e" decoded = decode_tran_initial(body) # T[0]=10, T[1]=20, then deltas (+1, -1, +2, -2) from T[1]=20 assert decoded == [10, 20, 21, 20, 22, 20] def test_decode_tran_initial_with_rle(): """A synthetic body with 00 NN RLE block runs the current Tran value forward.""" # T[0]=5, T[1]=5, then 00 08 RLE block = 8 zero deltas → T[2..9] = 5 body = b"\x00\x02\x00\x00\x05\x00\x05" + b"\x00\x08" decoded = decode_tran_initial(body) assert decoded == [5, 5, 5, 5, 5, 5, 5, 5, 5, 5] def test_decode_tran_initial_full_segment_silent_events(): """For events with near-silent Tran, segment 0 (~482-510 samples) decodes fully.""" for path, _, _ in TRAN_INITIAL_FIXTURES[3:]: # JQ0 (Vert-heavy) and V70 (Mic-heavy) if not os.path.exists(path): pytest.skip(f"fixture missing: {path}") with open(path, "rb") as f: body = f.read()[43:-26] truth = _full_truth(path) decoded = decode_tran_initial(body) assert decoded is not None # The decoder should produce a clean run of samples; check ALL of them # match truth (segment 0 is fully solved for events where T is near zero). n = len(decoded) for i in range(n): assert decoded[i] == truth[i], ( f"{os.path.basename(path)}: sample {i}: decoded={decoded[i]} truth={truth[i]}" ) # And we should have decoded at least 400 samples (= segment 0 worth). assert n >= 400, f"only {n} samples decoded for {path}"