0466bb4f44
When NN exceeds 0xFC, the codec extends to 12-bit NN by using the
low nibble of the TYPE byte as the high nibble of NN:
1X NN → nibble-delta block, NN = (X << 8) | NN_byte
2X NN → int8-delta block, same NN encoding
Walker and decode_waveform_v2 now handle both narrow (X=0) and wide
(X != 0) forms uniformly.
Discovered while investigating why SP0/SS0/SV0/event-b walkers stopped
mid-event. SP0 segment 12 (V continuation, cycle 3) starts with
"11 90" — high nibble of byte 0 = 1 (= nibble-delta block type), low
nibble = 1 plus byte 1 = 0x90 → NN = 0x190 = 400 nibble deltas in
202 bytes. Walker was rejecting "11" as a non-tag.
Sample count went from 47,364 to 72,972 verified byte-exact:
event-a: 9984 (full) was 9984 (full)
event-b: 6912 (full) was 738
event-c: 3840 (full) was 3840 (full)
event-d: 3840 (full) was 3840 (full)
JQ0: 9984 (full) was 9984 (full)
V70: 9984 (full) was 9984 (full)
SP0: 9984 (full) was 5122
SS0: 9222 (-7 tail) was 1758
SV0: 9222 (-7 tail) was 2114
7 of 9 fixtures now decode end-to-end across all 3 geo channels.
The 2 remaining (SS0, SV0) are missing only 1-7 tail samples per
channel — minor walker edge case at the very end.
74 tests pass (was 71).
519 lines
22 KiB
Python
519 lines
22 KiB
Python
"""
|
||
Tests for minimateplus.waveform_codec — Blastware waveform-file body block walker.
|
||
|
||
These tests lock in the STRUCTURAL framing of the body codec. The byte-to-sample
|
||
mapping is open (see waveform_codec module docstring) — until that's nailed down,
|
||
:func:`decode_waveform_v2` returns ``None`` and there is no per-sample assertion
|
||
to make.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
|
||
import pytest
|
||
|
||
from minimateplus.waveform_codec import (
|
||
WaveformBlock,
|
||
decode_tran_initial,
|
||
decode_waveform_v2,
|
||
decoded_to_adc_counts,
|
||
find_data_start,
|
||
mic_count_to_db,
|
||
parse_segment_header,
|
||
split_segments,
|
||
walk_body,
|
||
)
|
||
|
||
|
||
FIXTURES = os.path.join(
|
||
os.path.dirname(__file__), "fixtures", "decode-re-5-8-26"
|
||
)
|
||
|
||
|
||
def _bw_body(path):
|
||
"""Strip the 22-byte header and 21-byte STRT and 26-byte footer to get the body."""
|
||
with open(path, "rb") as f:
|
||
binary = f.read()
|
||
return binary[43:-26]
|
||
|
||
|
||
# Fixture metadata — bundled BW binaries from a real BE11529 unit, May 8 2026.
|
||
# Each is paired with a Blastware TXT export (the ASCII ground truth).
|
||
FIXTURES_INFO = {
|
||
"event-a": {
|
||
"filename": "M529LKVQ.6S0",
|
||
"n_samples": 3328, # 3.0 s rectime + 0.25 s pretrig at 1024 sps
|
||
"rectime": 3.0,
|
||
},
|
||
"event-b": {
|
||
"filename": "M529LK5Q.RG0",
|
||
"n_samples": 2304, # 2.0 s
|
||
"rectime": 2.0,
|
||
},
|
||
"event-c": {
|
||
"filename": "M529LK44.AB0",
|
||
"n_samples": 1280, # 1.0 s
|
||
"rectime": 1.0,
|
||
},
|
||
"event-d": {
|
||
"filename": "M529LK2V.470",
|
||
"n_samples": 1280,
|
||
"rectime": 1.0,
|
||
},
|
||
}
|
||
|
||
|
||
def _fixture_path(event_name):
|
||
info = FIXTURES_INFO[event_name]
|
||
return os.path.join(FIXTURES, event_name, info["filename"])
|
||
|
||
|
||
# ── Find data start ──────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||
def test_find_data_start_locates_first_block(event_name):
|
||
"""The walker auto-detects the first ``10 NN`` tag within the first 20 bytes."""
|
||
path = _fixture_path(event_name)
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
body = _bw_body(path)
|
||
start = find_data_start(body)
|
||
assert 0 <= start < 20, f"expected start in [0, 20), got {start}"
|
||
assert body[start] in (0x00, 0x10, 0x20, 0x30, 0x40), (
|
||
f"first tag byte 0x{body[start]:02x} not a recognized block type"
|
||
)
|
||
assert body[start + 1] % 4 == 0 or (body[start] == 0x40 and body[start + 1] == 0x02)
|
||
|
||
|
||
def test_find_data_start_canonical_offset_7():
|
||
"""All events have a 7-byte preamble (3-byte magic + 4-byte Tran anchors)."""
|
||
for name in FIXTURES_INFO:
|
||
path = _fixture_path(name)
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
body = _bw_body(path)
|
||
# Sanity: magic
|
||
assert body[0:3] == b"\x00\x02\x00", f"{name}: bad magic"
|
||
# First tag at offset 7
|
||
assert find_data_start(body) == 7, f"{name}: expected start=7"
|
||
|
||
|
||
# ── Block walker ─────────────────────────────────────────────────────────────
|
||
|
||
|
||
def test_walk_body_empty_returns_empty():
|
||
assert walk_body(b"") == []
|
||
|
||
|
||
def test_walk_body_invalid_start_returns_empty():
|
||
# Body that does not begin with a recognized tag.
|
||
assert walk_body(b"\xff\xff\xff\xff", start=0) == []
|
||
|
||
|
||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||
def test_walk_body_produces_blocks(event_name):
|
||
"""The walker should produce a non-empty stream of blocks for every fixture."""
|
||
path = _fixture_path(event_name)
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
body = _bw_body(path)
|
||
blocks = walk_body(body)
|
||
assert len(blocks) > 0
|
||
# All blocks have one of the known tag families. ``1X NN`` / ``2X NN``
|
||
# with X in 0..F are valid (X > 0 means wide-NN encoding).
|
||
for b in blocks:
|
||
assert (b.tag_hi & 0xF0) in (0x10, 0x20, 0x00, 0x30, 0x40), (
|
||
f"unknown tag {b.tag_hi:#04x} at offset {b.offset}"
|
||
)
|
||
|
||
|
||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||
def test_walk_body_block_lengths_consistent(event_name):
|
||
"""Each block's recorded length matches its on-wire footprint."""
|
||
path = _fixture_path(event_name)
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
body = _bw_body(path)
|
||
blocks = walk_body(body)
|
||
for b in blocks:
|
||
# Tag (2 bytes) + payload should equal length.
|
||
assert 2 + len(b.data) == b.length, (
|
||
f"block at {b.offset} length mismatch: tag(2) + data({len(b.data)}) != length({b.length})"
|
||
)
|
||
|
||
|
||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||
def test_walk_body_blocks_contiguous(event_name):
|
||
"""Block n+1 starts exactly where block n ends (no gaps, no overlaps)."""
|
||
path = _fixture_path(event_name)
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
body = _bw_body(path)
|
||
blocks = walk_body(body)
|
||
for i in range(1, len(blocks)):
|
||
prev = blocks[i - 1]
|
||
cur = blocks[i]
|
||
assert cur.offset == prev.offset + prev.length, (
|
||
f"gap/overlap between block {i-1} (off={prev.offset} len={prev.length}) "
|
||
f"and block {i} (off={cur.offset})"
|
||
)
|
||
|
||
|
||
# ── Segment splitting ────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||
def test_split_segments_yields_at_least_one(event_name):
|
||
path = _fixture_path(event_name)
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
body = _bw_body(path)
|
||
blocks = walk_body(body)
|
||
segments = split_segments(blocks)
|
||
assert len(segments) > 0
|
||
|
||
|
||
def test_split_segments_segment_count_at_least_one_per_event():
|
||
"""The walker should produce at least one ``40 02`` segment header per event.
|
||
|
||
Note: the walker currently bails out partway through event-b (still an
|
||
open issue — the body codec uses block lengths the walker doesn't
|
||
handle correctly past offset ~427). The other 3 events walk farther
|
||
and have many segment headers.
|
||
"""
|
||
for name in FIXTURES_INFO:
|
||
path = _fixture_path(name)
|
||
if not os.path.exists(path):
|
||
continue
|
||
body = _bw_body(path)
|
||
blocks = walk_body(body)
|
||
n_40 = sum(1 for b in blocks if b.tag_hi == 0x40)
|
||
assert n_40 >= 1, f"{name}: no 40 02 segment header found"
|
||
|
||
|
||
# ── Segment header parsing ───────────────────────────────────────────────────
|
||
|
||
|
||
def test_parse_segment_header_returns_none_for_non_40():
|
||
block = WaveformBlock(offset=0, tag_hi=0x10, tag_lo=0x04, data=b"\x00\x00", length=4)
|
||
assert parse_segment_header(block) is None
|
||
|
||
|
||
def test_parse_segment_header_decodes_fields():
|
||
"""Decode a known 40 02 block to verify field offsets."""
|
||
# First segment header from event-c at body offset 235:
|
||
# 40 02 00 00 00 00 0a 4b 01 1e 47 00 00 00 02 00 00 01 00 01
|
||
payload = bytes.fromhex("00000000 0a4b011e 47000000 02000001 0001".replace(" ", ""))
|
||
block = WaveformBlock(
|
||
offset=235, tag_hi=0x40, tag_lo=0x02, data=payload, length=20
|
||
)
|
||
decoded = parse_segment_header(block)
|
||
assert decoded is not None
|
||
assert decoded["counter"] == 0x47 # uint32 LE
|
||
assert decoded["fixed_pattern"] == b"\x02\x00\x00\x01"
|
||
assert decoded["anchor_bytes"] == b"\x00\x00\x00\x00"
|
||
|
||
|
||
def test_segment_counter_increments():
|
||
"""The 4-byte counter at bytes [8:12] of each 40 02 payload increments by 1."""
|
||
path = _fixture_path("event-c")
|
||
if not os.path.exists(path):
|
||
pytest.skip("fixture missing")
|
||
body = _bw_body(path)
|
||
blocks = walk_body(body)
|
||
headers = [b for b in blocks if b.tag_hi == 0x40 and b.tag_lo == 0x02]
|
||
counters = [parse_segment_header(b)["counter"] for b in headers]
|
||
assert len(counters) >= 5, "expect at least 5 segments to verify increments"
|
||
# First few counters should be strictly monotonic (the BW counter is global,
|
||
# incrementing across the whole flash buffer; some events may share counter
|
||
# values with the previous event's tail block, so allow non-strict).
|
||
for i in range(1, min(8, len(counters))):
|
||
assert counters[i] >= counters[i - 1], (
|
||
f"counter went backwards: {counters[i-1]} → {counters[i]}"
|
||
)
|
||
|
||
|
||
# ── decode_waveform_v2: currently a stub ─────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||
def test_decode_waveform_v2_returns_dict(event_name):
|
||
"""decode_waveform_v2 returns a dict with all 4 channels (verified 2026-05-11)."""
|
||
path = _fixture_path(event_name)
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
body = _bw_body(path)
|
||
result = decode_waveform_v2(body)
|
||
assert result is not None
|
||
assert set(result.keys()) == {"Tran", "Vert", "Long", "MicL"}
|
||
|
||
|
||
# Multi-channel ground-truth fixtures. Each row: (path, channel, n_to_verify).
|
||
# These lock in the channel-rotation hypothesis: segments cycle T → V → L → M,
|
||
# with each segment header carrying a 2-sample anchor pair (bytes [14:18])
|
||
# for THIS segment's channel plus 2 continuation deltas (bytes [0:4]) for
|
||
# the PREVIOUS channel.
|
||
MULTICHANNEL_FIXTURES = [
|
||
# ALL geo channels fully decoded for every event in the bundle:
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"), "Tran", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"), "Vert", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"), "Long", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"), "Tran", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"), "Vert", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"), "Long", 3328),
|
||
# SP0 (loud all-channels): NOW fully decodes after the wide-NN walker fix.
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"), "Tran", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"), "Vert", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"), "Long", 3328),
|
||
# SS0 / SV0 (loud-from-start): walker now reaches 3072–3078 samples per
|
||
# channel (out of 3079 total). A few tail samples still missing.
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"), "Tran", 3078),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"), "Vert", 3072),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"), "Long", 3072),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"), "Tran", 3078),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"), "Vert", 3072),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"), "Long", 3072),
|
||
# 5-8-26 quiet bundle: events without 30 NN blocks decode FULLY across all channels.
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-a", "M529LKVQ.6S0"), "Tran", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-a", "M529LKVQ.6S0"), "Vert", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-a", "M529LKVQ.6S0"), "Long", 3328),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-c", "M529LK44.AB0"), "Tran", 1280),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-c", "M529LK44.AB0"), "Vert", 1280),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-c", "M529LK44.AB0"), "Long", 1280),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-d", "M529LK2V.470"), "Tran", 1280),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-d", "M529LK2V.470"), "Vert", 1280),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-d", "M529LK2V.470"), "Long", 1280),
|
||
# event-b: 2304 samples × 3 — now fully decodes (was the historical
|
||
# walker-stop case; fixed by wide-NN tag support).
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-b", "M529LK5Q.RG0"), "Tran", 2304),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-b", "M529LK5Q.RG0"), "Vert", 2304),
|
||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||
"event-b", "M529LK5Q.RG0"), "Long", 2304),
|
||
]
|
||
|
||
|
||
@pytest.mark.parametrize("path,channel,n", MULTICHANNEL_FIXTURES)
|
||
def test_decode_waveform_v2_channels_match_truth(path, channel, n):
|
||
"""Decoded channels match the BW ASCII export byte-exact for the verified ranges."""
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
with open(path, "rb") as f:
|
||
body = f.read()[43:-26]
|
||
truth = _full_truth_channel(path, channel)
|
||
decoded = decode_waveform_v2(body)
|
||
assert decoded is not None
|
||
pred = decoded[channel]
|
||
assert len(pred) >= n, f"only {len(pred)} samples decoded, expected ≥ {n}"
|
||
for i in range(n):
|
||
assert pred[i] == truth[i], (
|
||
f"{os.path.basename(path)} {channel}[{i}]: pred={pred[i]} truth={truth[i]}"
|
||
)
|
||
|
||
|
||
# ── decode_tran_initial: confirmed correct against ground truth ──────────────
|
||
|
||
# Bundled fixtures for the high-amplitude 5-11-26 events (PPV ~6-7 in/s).
|
||
# These cracked the Tran codec — see waveform_codec module docstring.
|
||
TRAN_INITIAL_FIXTURES = [
|
||
# (path, expected first N Tran samples in 16-count units, # of samples to verify)
|
||
(
|
||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"),
|
||
[4, 4, 3, 3, 3, 2, 2, 3, 2, 2, 2, 2, 1, 1, 1, 2, 1, 1, 1, 0, 1, 0],
|
||
22,
|
||
),
|
||
(
|
||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"),
|
||
[-89, -89, -91, -91, -92, -93, -94, -94, -94, -94],
|
||
42,
|
||
),
|
||
(
|
||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"),
|
||
[-745, -762, -771, -774, -779, -794, -808, -811, -811, -819],
|
||
46,
|
||
),
|
||
# Vert-heavy event (T near zero) — segment 0 = 510 samples, all decode correctly.
|
||
(
|
||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"),
|
||
[0] * 4 + [-1, 0, 0, -1, -1, 0],
|
||
38,
|
||
),
|
||
# Mic-heavy event (geos all near zero) — segment 0 = 482 samples.
|
||
(
|
||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"),
|
||
[0] * 10,
|
||
6,
|
||
),
|
||
]
|
||
|
||
|
||
def _full_truth(path):
|
||
"""Load Tran samples (in 16-count units) from the BW ASCII export."""
|
||
return _full_truth_channel(path, "Tran")
|
||
|
||
|
||
def _full_truth_channel(path, channel):
|
||
"""Load one channel's samples (in 16-count units) from the BW ASCII export."""
|
||
import glob, re
|
||
col_idx = {"Tran": 0, "Vert": 1, "Long": 2, "MicL": 3}[channel]
|
||
# event-a's TXT has a typo ("M59" vs "M529") — pick the .TXT in the same dir
|
||
# rather than assuming exact-name correspondence.
|
||
txt_path = path + ".TXT"
|
||
if not os.path.exists(txt_path):
|
||
candidates = glob.glob(os.path.join(os.path.dirname(path), "*.TXT"))
|
||
if candidates:
|
||
txt_path = candidates[0]
|
||
with open(txt_path, "r", encoding="utf-8", errors="replace") as f:
|
||
lines = f.read().splitlines()
|
||
header_idx = None
|
||
for i, line in enumerate(lines):
|
||
if "Tran" in line and "Vert" in line and "Long" in line and "MicL" in line:
|
||
header_idx = i
|
||
break
|
||
if header_idx is None:
|
||
return None
|
||
out = []
|
||
for line in lines[header_idx + 1:]:
|
||
parts = re.split(r"\s+", line.strip())
|
||
if len(parts) < 4:
|
||
continue
|
||
try:
|
||
out.append(round(float(parts[col_idx]) * 200))
|
||
except ValueError:
|
||
continue
|
||
return out
|
||
|
||
|
||
@pytest.mark.parametrize("path,expected,n_required", TRAN_INITIAL_FIXTURES)
|
||
def test_decode_tran_initial_matches_ground_truth(path, expected, n_required):
|
||
"""The Tran initial decoder produces values matching the BW ASCII export exactly."""
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
with open(path, "rb") as f:
|
||
raw = f.read()
|
||
body = raw[43:-26]
|
||
decoded = decode_tran_initial(body)
|
||
assert decoded is not None
|
||
# Check first len(expected) samples match exactly.
|
||
for i in range(len(expected)):
|
||
assert decoded[i] == expected[i], (
|
||
f"sample {i}: decoded={decoded[i]} expected={expected[i]}"
|
||
)
|
||
# And we got at least n_required samples decoded.
|
||
assert len(decoded) >= n_required, (
|
||
f"decoded only {len(decoded)} samples, expected at least {n_required}"
|
||
)
|
||
|
||
|
||
def test_decode_tran_initial_handles_empty():
|
||
assert decode_tran_initial(b"") is None
|
||
assert decode_tran_initial(b"not a body") is None
|
||
|
||
|
||
def test_decode_tran_initial_synthetic_body():
|
||
"""A synthetic body with preamble + one 10 04 block decodes correctly."""
|
||
# Magic + T[0]=10 + T[1]=20 in 16-count units.
|
||
# Then 10 04 block with 4 nibbles: (+1, -1, +2, -2)
|
||
# Encoded high-nibble first: 0x1F = (1, -1), 0x2E = (2, -2)
|
||
body = b"\x00\x02\x00\x00\x0a\x00\x14" + b"\x10\x04" + b"\x1f\x2e"
|
||
decoded = decode_tran_initial(body)
|
||
# T[0]=10, T[1]=20, then deltas (+1, -1, +2, -2) from T[1]=20
|
||
assert decoded == [10, 20, 21, 20, 22, 20]
|
||
|
||
|
||
def test_decode_tran_initial_with_rle():
|
||
"""A synthetic body with 00 NN RLE block runs the current Tran value forward."""
|
||
# T[0]=5, T[1]=5, then 00 08 RLE block = 8 zero deltas → T[2..9] = 5
|
||
body = b"\x00\x02\x00\x00\x05\x00\x05" + b"\x00\x08"
|
||
decoded = decode_tran_initial(body)
|
||
assert decoded == [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
|
||
|
||
|
||
def test_decode_tran_initial_full_segment_silent_events():
|
||
"""For events with near-silent Tran, segment 0 (~482-510 samples) decodes fully."""
|
||
for path, _, _ in TRAN_INITIAL_FIXTURES[3:]: # JQ0 (Vert-heavy) and V70 (Mic-heavy)
|
||
if not os.path.exists(path):
|
||
pytest.skip(f"fixture missing: {path}")
|
||
with open(path, "rb") as f:
|
||
body = f.read()[43:-26]
|
||
truth = _full_truth(path)
|
||
decoded = decode_tran_initial(body)
|
||
assert decoded is not None
|
||
# The decoder should produce a clean run of samples; check ALL of them
|
||
# match truth (segment 0 is fully solved for events where T is near zero).
|
||
n = len(decoded)
|
||
for i in range(n):
|
||
assert decoded[i] == truth[i], (
|
||
f"{os.path.basename(path)}: sample {i}: decoded={decoded[i]} truth={truth[i]}"
|
||
)
|
||
# And we should have decoded at least 400 samples (= segment 0 worth).
|
||
assert n >= 400, f"only {n} samples decoded for {path}"
|
||
|
||
|
||
# ── ADC scaling + dB conversion ──────────────────────────────────────────────
|
||
|
||
|
||
def test_decoded_to_adc_counts_geo_scales_by_16():
|
||
"""Geo channels in decoder units (16-count) should multiply by 16 to ADC."""
|
||
decoded = {"Tran": [0, 1, -2, 100], "Vert": [5], "Long": [-10], "MicL": [813]}
|
||
adc = decoded_to_adc_counts(decoded)
|
||
assert adc["Tran"] == [0, 16, -32, 1600]
|
||
assert adc["Vert"] == [80]
|
||
assert adc["Long"] == [-160]
|
||
# Mic passes through unchanged (already ADC counts).
|
||
assert adc["MicL"] == [813]
|
||
|
||
|
||
def test_decoded_to_adc_counts_empty():
|
||
assert decoded_to_adc_counts({}) == {}
|
||
assert decoded_to_adc_counts(
|
||
{"Tran": [], "Vert": [], "Long": [], "MicL": []}
|
||
) == {"Tran": [], "Vert": [], "Long": [], "MicL": []}
|
||
|
||
|
||
def test_mic_count_to_db_zero_is_zero():
|
||
assert mic_count_to_db(0) == 0.0
|
||
|
||
|
||
def test_mic_count_to_db_unit_is_reference():
|
||
"""count = ±1 → ±81.94 dB (the calibration reference)."""
|
||
assert abs(mic_count_to_db(1) - 81.94) < 0.01
|
||
assert abs(mic_count_to_db(-1) - (-81.94)) < 0.01
|
||
|
||
|
||
def test_mic_count_to_db_doubles_every_6db():
|
||
"""Each doubling of |count| adds ~6.02 dB."""
|
||
# count=2 → 87.96 dB (+ 6.02 from 81.94)
|
||
assert abs(mic_count_to_db(2) - 87.96) < 0.05
|
||
# count=4 → 93.98 dB
|
||
assert abs(mic_count_to_db(4) - 93.98) < 0.05
|
||
# count=8 → 100.00 dB
|
||
assert abs(mic_count_to_db(8) - 100.00) < 0.05
|
||
|
||
|
||
def test_mic_count_to_db_v70_peak():
|
||
"""V70 mic peak count 813 → 140.14 dB (matches BW reported PSPL 140.1)."""
|
||
assert abs(mic_count_to_db(813) - 140.14) < 0.1
|
||
# And the negative-direction equivalent
|
||
assert abs(mic_count_to_db(-813) - (-140.14)) < 0.1
|
||
|
||
|
||
# ── End-to-end: decode_a5_frames (production entry point) ───────────────────
|
||
|
||
|
||
def test_decode_a5_frames_empty():
|
||
from minimateplus.waveform_codec import decode_a5_frames
|
||
assert decode_a5_frames([]) is None
|
||
assert decode_a5_frames(None) is None
|