Merge pull request 'merge full s3 codec decoded' (#23) from codec-re into main
Reviewed-on: #23
This commit was merged in pull request #23.
This commit is contained in:
Vendored
BIN
Binary file not shown.
Vendored
BIN
Binary file not shown.
+3386
File diff suppressed because it is too large
Load Diff
Vendored
BIN
Binary file not shown.
+3137
File diff suppressed because it is too large
Load Diff
Vendored
BIN
Binary file not shown.
+3137
File diff suppressed because it is too large
Load Diff
Vendored
BIN
Binary file not shown.
+3387
File diff suppressed because it is too large
Load Diff
Vendored
BIN
Binary file not shown.
+3387
File diff suppressed because it is too large
Load Diff
Binary file not shown.
+3386
File diff suppressed because it is too large
Load Diff
Binary file not shown.
File diff suppressed because it is too large
Load Diff
Binary file not shown.
File diff suppressed because it is too large
Load Diff
Binary file not shown.
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,518 @@
|
||||
"""
|
||||
Tests for minimateplus.waveform_codec — Blastware waveform-file body block walker.
|
||||
|
||||
These tests lock in the STRUCTURAL framing of the body codec. The byte-to-sample
|
||||
mapping is open (see waveform_codec module docstring) — until that's nailed down,
|
||||
:func:`decode_waveform_v2` returns ``None`` and there is no per-sample assertion
|
||||
to make.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from minimateplus.waveform_codec import (
|
||||
WaveformBlock,
|
||||
decode_tran_initial,
|
||||
decode_waveform_v2,
|
||||
decoded_to_adc_counts,
|
||||
find_data_start,
|
||||
mic_count_to_db,
|
||||
parse_segment_header,
|
||||
split_segments,
|
||||
walk_body,
|
||||
)
|
||||
|
||||
|
||||
FIXTURES = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "decode-re-5-8-26"
|
||||
)
|
||||
|
||||
|
||||
def _bw_body(path):
|
||||
"""Strip the 22-byte header and 21-byte STRT and 26-byte footer to get the body."""
|
||||
with open(path, "rb") as f:
|
||||
binary = f.read()
|
||||
return binary[43:-26]
|
||||
|
||||
|
||||
# Fixture metadata — bundled BW binaries from a real BE11529 unit, May 8 2026.
|
||||
# Each is paired with a Blastware TXT export (the ASCII ground truth).
|
||||
FIXTURES_INFO = {
|
||||
"event-a": {
|
||||
"filename": "M529LKVQ.6S0",
|
||||
"n_samples": 3328, # 3.0 s rectime + 0.25 s pretrig at 1024 sps
|
||||
"rectime": 3.0,
|
||||
},
|
||||
"event-b": {
|
||||
"filename": "M529LK5Q.RG0",
|
||||
"n_samples": 2304, # 2.0 s
|
||||
"rectime": 2.0,
|
||||
},
|
||||
"event-c": {
|
||||
"filename": "M529LK44.AB0",
|
||||
"n_samples": 1280, # 1.0 s
|
||||
"rectime": 1.0,
|
||||
},
|
||||
"event-d": {
|
||||
"filename": "M529LK2V.470",
|
||||
"n_samples": 1280,
|
||||
"rectime": 1.0,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _fixture_path(event_name):
|
||||
info = FIXTURES_INFO[event_name]
|
||||
return os.path.join(FIXTURES, event_name, info["filename"])
|
||||
|
||||
|
||||
# ── Find data start ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||||
def test_find_data_start_locates_first_block(event_name):
|
||||
"""The walker auto-detects the first ``10 NN`` tag within the first 20 bytes."""
|
||||
path = _fixture_path(event_name)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
body = _bw_body(path)
|
||||
start = find_data_start(body)
|
||||
assert 0 <= start < 20, f"expected start in [0, 20), got {start}"
|
||||
assert body[start] in (0x00, 0x10, 0x20, 0x30, 0x40), (
|
||||
f"first tag byte 0x{body[start]:02x} not a recognized block type"
|
||||
)
|
||||
assert body[start + 1] % 4 == 0 or (body[start] == 0x40 and body[start + 1] == 0x02)
|
||||
|
||||
|
||||
def test_find_data_start_canonical_offset_7():
|
||||
"""All events have a 7-byte preamble (3-byte magic + 4-byte Tran anchors)."""
|
||||
for name in FIXTURES_INFO:
|
||||
path = _fixture_path(name)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
body = _bw_body(path)
|
||||
# Sanity: magic
|
||||
assert body[0:3] == b"\x00\x02\x00", f"{name}: bad magic"
|
||||
# First tag at offset 7
|
||||
assert find_data_start(body) == 7, f"{name}: expected start=7"
|
||||
|
||||
|
||||
# ── Block walker ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_walk_body_empty_returns_empty():
|
||||
assert walk_body(b"") == []
|
||||
|
||||
|
||||
def test_walk_body_invalid_start_returns_empty():
|
||||
# Body that does not begin with a recognized tag.
|
||||
assert walk_body(b"\xff\xff\xff\xff", start=0) == []
|
||||
|
||||
|
||||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||||
def test_walk_body_produces_blocks(event_name):
|
||||
"""The walker should produce a non-empty stream of blocks for every fixture."""
|
||||
path = _fixture_path(event_name)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
body = _bw_body(path)
|
||||
blocks = walk_body(body)
|
||||
assert len(blocks) > 0
|
||||
# All blocks have one of the known tag families. ``1X NN`` / ``2X NN``
|
||||
# with X in 0..F are valid (X > 0 means wide-NN encoding).
|
||||
for b in blocks:
|
||||
assert (b.tag_hi & 0xF0) in (0x10, 0x20, 0x00, 0x30, 0x40), (
|
||||
f"unknown tag {b.tag_hi:#04x} at offset {b.offset}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||||
def test_walk_body_block_lengths_consistent(event_name):
|
||||
"""Each block's recorded length matches its on-wire footprint."""
|
||||
path = _fixture_path(event_name)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
body = _bw_body(path)
|
||||
blocks = walk_body(body)
|
||||
for b in blocks:
|
||||
# Tag (2 bytes) + payload should equal length.
|
||||
assert 2 + len(b.data) == b.length, (
|
||||
f"block at {b.offset} length mismatch: tag(2) + data({len(b.data)}) != length({b.length})"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||||
def test_walk_body_blocks_contiguous(event_name):
|
||||
"""Block n+1 starts exactly where block n ends (no gaps, no overlaps)."""
|
||||
path = _fixture_path(event_name)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
body = _bw_body(path)
|
||||
blocks = walk_body(body)
|
||||
for i in range(1, len(blocks)):
|
||||
prev = blocks[i - 1]
|
||||
cur = blocks[i]
|
||||
assert cur.offset == prev.offset + prev.length, (
|
||||
f"gap/overlap between block {i-1} (off={prev.offset} len={prev.length}) "
|
||||
f"and block {i} (off={cur.offset})"
|
||||
)
|
||||
|
||||
|
||||
# ── Segment splitting ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||||
def test_split_segments_yields_at_least_one(event_name):
|
||||
path = _fixture_path(event_name)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
body = _bw_body(path)
|
||||
blocks = walk_body(body)
|
||||
segments = split_segments(blocks)
|
||||
assert len(segments) > 0
|
||||
|
||||
|
||||
def test_split_segments_segment_count_at_least_one_per_event():
|
||||
"""The walker should produce at least one ``40 02`` segment header per event.
|
||||
|
||||
Note: the walker currently bails out partway through event-b (still an
|
||||
open issue — the body codec uses block lengths the walker doesn't
|
||||
handle correctly past offset ~427). The other 3 events walk farther
|
||||
and have many segment headers.
|
||||
"""
|
||||
for name in FIXTURES_INFO:
|
||||
path = _fixture_path(name)
|
||||
if not os.path.exists(path):
|
||||
continue
|
||||
body = _bw_body(path)
|
||||
blocks = walk_body(body)
|
||||
n_40 = sum(1 for b in blocks if b.tag_hi == 0x40)
|
||||
assert n_40 >= 1, f"{name}: no 40 02 segment header found"
|
||||
|
||||
|
||||
# ── Segment header parsing ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_parse_segment_header_returns_none_for_non_40():
|
||||
block = WaveformBlock(offset=0, tag_hi=0x10, tag_lo=0x04, data=b"\x00\x00", length=4)
|
||||
assert parse_segment_header(block) is None
|
||||
|
||||
|
||||
def test_parse_segment_header_decodes_fields():
|
||||
"""Decode a known 40 02 block to verify field offsets."""
|
||||
# First segment header from event-c at body offset 235:
|
||||
# 40 02 00 00 00 00 0a 4b 01 1e 47 00 00 00 02 00 00 01 00 01
|
||||
payload = bytes.fromhex("00000000 0a4b011e 47000000 02000001 0001".replace(" ", ""))
|
||||
block = WaveformBlock(
|
||||
offset=235, tag_hi=0x40, tag_lo=0x02, data=payload, length=20
|
||||
)
|
||||
decoded = parse_segment_header(block)
|
||||
assert decoded is not None
|
||||
assert decoded["counter"] == 0x47 # uint32 LE
|
||||
assert decoded["fixed_pattern"] == b"\x02\x00\x00\x01"
|
||||
assert decoded["anchor_bytes"] == b"\x00\x00\x00\x00"
|
||||
|
||||
|
||||
def test_segment_counter_increments():
|
||||
"""The 4-byte counter at bytes [8:12] of each 40 02 payload increments by 1."""
|
||||
path = _fixture_path("event-c")
|
||||
if not os.path.exists(path):
|
||||
pytest.skip("fixture missing")
|
||||
body = _bw_body(path)
|
||||
blocks = walk_body(body)
|
||||
headers = [b for b in blocks if b.tag_hi == 0x40 and b.tag_lo == 0x02]
|
||||
counters = [parse_segment_header(b)["counter"] for b in headers]
|
||||
assert len(counters) >= 5, "expect at least 5 segments to verify increments"
|
||||
# First few counters should be strictly monotonic (the BW counter is global,
|
||||
# incrementing across the whole flash buffer; some events may share counter
|
||||
# values with the previous event's tail block, so allow non-strict).
|
||||
for i in range(1, min(8, len(counters))):
|
||||
assert counters[i] >= counters[i - 1], (
|
||||
f"counter went backwards: {counters[i-1]} → {counters[i]}"
|
||||
)
|
||||
|
||||
|
||||
# ── decode_waveform_v2: currently a stub ─────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.parametrize("event_name", list(FIXTURES_INFO.keys()))
|
||||
def test_decode_waveform_v2_returns_dict(event_name):
|
||||
"""decode_waveform_v2 returns a dict with all 4 channels (verified 2026-05-11)."""
|
||||
path = _fixture_path(event_name)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
body = _bw_body(path)
|
||||
result = decode_waveform_v2(body)
|
||||
assert result is not None
|
||||
assert set(result.keys()) == {"Tran", "Vert", "Long", "MicL"}
|
||||
|
||||
|
||||
# Multi-channel ground-truth fixtures. Each row: (path, channel, n_to_verify).
|
||||
# These lock in the channel-rotation hypothesis: segments cycle T → V → L → M,
|
||||
# with each segment header carrying a 2-sample anchor pair (bytes [14:18])
|
||||
# for THIS segment's channel plus 2 continuation deltas (bytes [0:4]) for
|
||||
# the PREVIOUS channel.
|
||||
MULTICHANNEL_FIXTURES = [
|
||||
# ALL geo channels fully decoded for every event in the bundle:
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"), "Tran", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"), "Vert", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"), "Long", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"), "Tran", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"), "Vert", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"), "Long", 3328),
|
||||
# SP0 (loud all-channels): NOW fully decodes after the wide-NN walker fix.
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"), "Tran", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"), "Vert", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"), "Long", 3328),
|
||||
# SS0 / SV0 (loud-from-start): walker now reaches 3072–3078 samples per
|
||||
# channel (out of 3079 total). A few tail samples still missing.
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"), "Tran", 3078),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"), "Vert", 3072),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"), "Long", 3072),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"), "Tran", 3078),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"), "Vert", 3072),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"), "Long", 3072),
|
||||
# 5-8-26 quiet bundle: events without 30 NN blocks decode FULLY across all channels.
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-a", "M529LKVQ.6S0"), "Tran", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-a", "M529LKVQ.6S0"), "Vert", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-a", "M529LKVQ.6S0"), "Long", 3328),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-c", "M529LK44.AB0"), "Tran", 1280),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-c", "M529LK44.AB0"), "Vert", 1280),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-c", "M529LK44.AB0"), "Long", 1280),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-d", "M529LK2V.470"), "Tran", 1280),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-d", "M529LK2V.470"), "Vert", 1280),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-d", "M529LK2V.470"), "Long", 1280),
|
||||
# event-b: 2304 samples × 3 — now fully decodes (was the historical
|
||||
# walker-stop case; fixed by wide-NN tag support).
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-b", "M529LK5Q.RG0"), "Tran", 2304),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-b", "M529LK5Q.RG0"), "Vert", 2304),
|
||||
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
|
||||
"event-b", "M529LK5Q.RG0"), "Long", 2304),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("path,channel,n", MULTICHANNEL_FIXTURES)
|
||||
def test_decode_waveform_v2_channels_match_truth(path, channel, n):
|
||||
"""Decoded channels match the BW ASCII export byte-exact for the verified ranges."""
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
with open(path, "rb") as f:
|
||||
body = f.read()[43:-26]
|
||||
truth = _full_truth_channel(path, channel)
|
||||
decoded = decode_waveform_v2(body)
|
||||
assert decoded is not None
|
||||
pred = decoded[channel]
|
||||
assert len(pred) >= n, f"only {len(pred)} samples decoded, expected ≥ {n}"
|
||||
for i in range(n):
|
||||
assert pred[i] == truth[i], (
|
||||
f"{os.path.basename(path)} {channel}[{i}]: pred={pred[i]} truth={truth[i]}"
|
||||
)
|
||||
|
||||
|
||||
# ── decode_tran_initial: confirmed correct against ground truth ──────────────
|
||||
|
||||
# Bundled fixtures for the high-amplitude 5-11-26 events (PPV ~6-7 in/s).
|
||||
# These cracked the Tran codec — see waveform_codec module docstring.
|
||||
TRAN_INITIAL_FIXTURES = [
|
||||
# (path, expected first N Tran samples in 16-count units, # of samples to verify)
|
||||
(
|
||||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"),
|
||||
[4, 4, 3, 3, 3, 2, 2, 3, 2, 2, 2, 2, 1, 1, 1, 2, 1, 1, 1, 0, 1, 0],
|
||||
22,
|
||||
),
|
||||
(
|
||||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SS0"),
|
||||
[-89, -89, -91, -91, -92, -93, -94, -94, -94, -94],
|
||||
42,
|
||||
),
|
||||
(
|
||||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SV0"),
|
||||
[-745, -762, -771, -774, -779, -794, -808, -811, -811, -819],
|
||||
46,
|
||||
),
|
||||
# Vert-heavy event (T near zero) — segment 0 = 510 samples, all decode correctly.
|
||||
(
|
||||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"),
|
||||
[0] * 4 + [-1, 0, 0, -1, -1, 0],
|
||||
38,
|
||||
),
|
||||
# Mic-heavy event (geos all near zero) — segment 0 = 482 samples.
|
||||
(
|
||||
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"),
|
||||
[0] * 10,
|
||||
6,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def _full_truth(path):
|
||||
"""Load Tran samples (in 16-count units) from the BW ASCII export."""
|
||||
return _full_truth_channel(path, "Tran")
|
||||
|
||||
|
||||
def _full_truth_channel(path, channel):
|
||||
"""Load one channel's samples (in 16-count units) from the BW ASCII export."""
|
||||
import glob, re
|
||||
col_idx = {"Tran": 0, "Vert": 1, "Long": 2, "MicL": 3}[channel]
|
||||
# event-a's TXT has a typo ("M59" vs "M529") — pick the .TXT in the same dir
|
||||
# rather than assuming exact-name correspondence.
|
||||
txt_path = path + ".TXT"
|
||||
if not os.path.exists(txt_path):
|
||||
candidates = glob.glob(os.path.join(os.path.dirname(path), "*.TXT"))
|
||||
if candidates:
|
||||
txt_path = candidates[0]
|
||||
with open(txt_path, "r", encoding="utf-8", errors="replace") as f:
|
||||
lines = f.read().splitlines()
|
||||
header_idx = None
|
||||
for i, line in enumerate(lines):
|
||||
if "Tran" in line and "Vert" in line and "Long" in line and "MicL" in line:
|
||||
header_idx = i
|
||||
break
|
||||
if header_idx is None:
|
||||
return None
|
||||
out = []
|
||||
for line in lines[header_idx + 1:]:
|
||||
parts = re.split(r"\s+", line.strip())
|
||||
if len(parts) < 4:
|
||||
continue
|
||||
try:
|
||||
out.append(round(float(parts[col_idx]) * 200))
|
||||
except ValueError:
|
||||
continue
|
||||
return out
|
||||
|
||||
|
||||
@pytest.mark.parametrize("path,expected,n_required", TRAN_INITIAL_FIXTURES)
|
||||
def test_decode_tran_initial_matches_ground_truth(path, expected, n_required):
|
||||
"""The Tran initial decoder produces values matching the BW ASCII export exactly."""
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
with open(path, "rb") as f:
|
||||
raw = f.read()
|
||||
body = raw[43:-26]
|
||||
decoded = decode_tran_initial(body)
|
||||
assert decoded is not None
|
||||
# Check first len(expected) samples match exactly.
|
||||
for i in range(len(expected)):
|
||||
assert decoded[i] == expected[i], (
|
||||
f"sample {i}: decoded={decoded[i]} expected={expected[i]}"
|
||||
)
|
||||
# And we got at least n_required samples decoded.
|
||||
assert len(decoded) >= n_required, (
|
||||
f"decoded only {len(decoded)} samples, expected at least {n_required}"
|
||||
)
|
||||
|
||||
|
||||
def test_decode_tran_initial_handles_empty():
|
||||
assert decode_tran_initial(b"") is None
|
||||
assert decode_tran_initial(b"not a body") is None
|
||||
|
||||
|
||||
def test_decode_tran_initial_synthetic_body():
|
||||
"""A synthetic body with preamble + one 10 04 block decodes correctly."""
|
||||
# Magic + T[0]=10 + T[1]=20 in 16-count units.
|
||||
# Then 10 04 block with 4 nibbles: (+1, -1, +2, -2)
|
||||
# Encoded high-nibble first: 0x1F = (1, -1), 0x2E = (2, -2)
|
||||
body = b"\x00\x02\x00\x00\x0a\x00\x14" + b"\x10\x04" + b"\x1f\x2e"
|
||||
decoded = decode_tran_initial(body)
|
||||
# T[0]=10, T[1]=20, then deltas (+1, -1, +2, -2) from T[1]=20
|
||||
assert decoded == [10, 20, 21, 20, 22, 20]
|
||||
|
||||
|
||||
def test_decode_tran_initial_with_rle():
|
||||
"""A synthetic body with 00 NN RLE block runs the current Tran value forward."""
|
||||
# T[0]=5, T[1]=5, then 00 08 RLE block = 8 zero deltas → T[2..9] = 5
|
||||
body = b"\x00\x02\x00\x00\x05\x00\x05" + b"\x00\x08"
|
||||
decoded = decode_tran_initial(body)
|
||||
assert decoded == [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
|
||||
|
||||
|
||||
def test_decode_tran_initial_full_segment_silent_events():
|
||||
"""For events with near-silent Tran, segment 0 (~482-510 samples) decodes fully."""
|
||||
for path, _, _ in TRAN_INITIAL_FIXTURES[3:]: # JQ0 (Vert-heavy) and V70 (Mic-heavy)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"fixture missing: {path}")
|
||||
with open(path, "rb") as f:
|
||||
body = f.read()[43:-26]
|
||||
truth = _full_truth(path)
|
||||
decoded = decode_tran_initial(body)
|
||||
assert decoded is not None
|
||||
# The decoder should produce a clean run of samples; check ALL of them
|
||||
# match truth (segment 0 is fully solved for events where T is near zero).
|
||||
n = len(decoded)
|
||||
for i in range(n):
|
||||
assert decoded[i] == truth[i], (
|
||||
f"{os.path.basename(path)}: sample {i}: decoded={decoded[i]} truth={truth[i]}"
|
||||
)
|
||||
# And we should have decoded at least 400 samples (= segment 0 worth).
|
||||
assert n >= 400, f"only {n} samples decoded for {path}"
|
||||
|
||||
|
||||
# ── ADC scaling + dB conversion ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_decoded_to_adc_counts_geo_scales_by_16():
|
||||
"""Geo channels in decoder units (16-count) should multiply by 16 to ADC."""
|
||||
decoded = {"Tran": [0, 1, -2, 100], "Vert": [5], "Long": [-10], "MicL": [813]}
|
||||
adc = decoded_to_adc_counts(decoded)
|
||||
assert adc["Tran"] == [0, 16, -32, 1600]
|
||||
assert adc["Vert"] == [80]
|
||||
assert adc["Long"] == [-160]
|
||||
# Mic passes through unchanged (already ADC counts).
|
||||
assert adc["MicL"] == [813]
|
||||
|
||||
|
||||
def test_decoded_to_adc_counts_empty():
|
||||
assert decoded_to_adc_counts({}) == {}
|
||||
assert decoded_to_adc_counts(
|
||||
{"Tran": [], "Vert": [], "Long": [], "MicL": []}
|
||||
) == {"Tran": [], "Vert": [], "Long": [], "MicL": []}
|
||||
|
||||
|
||||
def test_mic_count_to_db_zero_is_zero():
|
||||
assert mic_count_to_db(0) == 0.0
|
||||
|
||||
|
||||
def test_mic_count_to_db_unit_is_reference():
|
||||
"""count = ±1 → ±81.94 dB (the calibration reference)."""
|
||||
assert abs(mic_count_to_db(1) - 81.94) < 0.01
|
||||
assert abs(mic_count_to_db(-1) - (-81.94)) < 0.01
|
||||
|
||||
|
||||
def test_mic_count_to_db_doubles_every_6db():
|
||||
"""Each doubling of |count| adds ~6.02 dB."""
|
||||
# count=2 → 87.96 dB (+ 6.02 from 81.94)
|
||||
assert abs(mic_count_to_db(2) - 87.96) < 0.05
|
||||
# count=4 → 93.98 dB
|
||||
assert abs(mic_count_to_db(4) - 93.98) < 0.05
|
||||
# count=8 → 100.00 dB
|
||||
assert abs(mic_count_to_db(8) - 100.00) < 0.05
|
||||
|
||||
|
||||
def test_mic_count_to_db_v70_peak():
|
||||
"""V70 mic peak count 813 → 140.14 dB (matches BW reported PSPL 140.1)."""
|
||||
assert abs(mic_count_to_db(813) - 140.14) < 0.1
|
||||
# And the negative-direction equivalent
|
||||
assert abs(mic_count_to_db(-813) - (-140.14)) < 0.1
|
||||
|
||||
|
||||
# ── End-to-end: decode_a5_frames (production entry point) ───────────────────
|
||||
|
||||
|
||||
def test_decode_a5_frames_empty():
|
||||
from minimateplus.waveform_codec import decode_a5_frames
|
||||
assert decode_a5_frames([]) is None
|
||||
assert decode_a5_frames(None) is None
|
||||
Reference in New Issue
Block a user