d506ebc103
the BE9558 / BE18003 extension-byte case

The bytes at [7]/[11]/[15]/[19] are an annotation field (purpose still unclear — empirically non-zero on intervals with sub-Hz or unmeasurable freq), NOT the high byte of the peak count. The N844 fixture corpus the original RE was done against had zero values in those bytes for every block, so uint8 and uint16 LE were equivalent there — but on real BE9558 Tran-drift events and BE18003 Histogram+Continuous events the uint16 LE interpretation produced peaks up to 268 in/s and 35× inflated PVS sums.

Cross-correlated against BW's per-interval ASCII export on:
- K558LKZU/LL1P/LL3K → 100% T/V/L/M peak match (1435 blocks each)
- T003LKZR/LL0O/LL1M → 100% T/V/L, 99.3% M (0.05 dB rounding only)
- N599LKZS/LL0L → 100% all channels
- N844 fixture corpus → 100% all channels (unchanged)

Annotations preserved on every record for future RE; the defensive _MAX_PEAK_COUNT bound is no longer needed (uint8 maxes at 1.275 in/s, well below any physical limit).

Synthetic regression test added using the verbatim K558LKZU.RE0H interval-12 block.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
386 lines
14 KiB
Python
386 lines
14 KiB
Python
"""
|
||
test_histogram_codec.py — regression locks for the histogram body codec.
|
||
|
||
The codec is verified byte-exact against BW's ASCII export across the
|
||
in-repo histogram fixture bundle. Each test cross-checks decoded
|
||
binary fields against the corresponding .TXT row.
|
||
|
||
Run:
|
||
python -m pytest tests/test_histogram_codec.py -q
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
import pytest
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
from minimateplus.blastware_file import _WAVEFORM_HEADER_SIZE
|
||
from minimateplus.histogram_codec import (
|
||
_BLOCK_SIZE,
|
||
decode_histogram_body,
|
||
decode_histogram_body_full,
|
||
geo_count_to_ins,
|
||
half_period_to_hz,
|
||
walk_body,
|
||
)
|
||
from minimateplus.waveform_codec import mic_count_to_db
|
||
|
||
|
||
# Directory the fixture-based tests read from: two levels up from this file,
# then ``example-events/histogram``.  Tests skip when a fixture is absent.
_FIXTURE_DIR = Path(__file__).resolve().parent.parent / "example-events" / "histogram"
|
||
|
||
|
||
def _extract_body(path: Path) -> bytes:
    """Return the body bytes of the BW event file at *path*.

    The body is the span between the STRT record and the 26-byte footer.
    It starts at ``_WAVEFORM_HEADER_SIZE + 21``.  The footer is taken to be
    the first ``0E 08`` marker at or after that offset which still leaves
    room for 26 bytes and whose bytes [4:6], read big-endian, look like a
    year in 2015..2050.  When no marker qualifies, the final 26 bytes of
    the file are assumed to be the footer.
    """
    marker = b"\x0e\x08"
    raw = path.read_bytes()
    body_start = _WAVEFORM_HEADER_SIZE + 21
    # A footer candidate must fit completely inside the file.
    last_footer_start = len(raw) - 26

    candidate = raw.find(marker, body_start)
    while 0 <= candidate <= last_footer_start:
        year = int.from_bytes(raw[candidate + 4:candidate + 6], "big")
        if 2015 <= year <= 2050:
            return raw[body_start:candidate]
        # Marker bytes without a plausible year: keep scanning.
        candidate = raw.find(marker, candidate + 1)

    # No validated footer found: fall back to "last 26 bytes are the footer".
    return raw[body_start:last_footer_start]
|
||
|
||
|
||
def _parse_txt_rows(path: Path) -> list[tuple[str, list]]:
    """Read a histogram .TXT export into ``[(time_str, [10 col values]), …]``.

    The table is located by the first line that (stripped) starts with
    ``Tran`` followed by whitespace; data begins three lines later (2-row
    header + units row).  Only tab-separated lines with exactly 11 fields
    are kept: field 0 is the time string, the other ten are cell values.

    Cell conversion:
      - a cell starting with ``>`` (the BW ``>100`` Hz display sentinel) → ``None``
      - any other cell that is not a valid float → ``None``
      - otherwise the cell as ``float``

    Returns an empty list when no ``Tran`` header line is found.
    """

    def _cell(token: str):
        # Convert one table cell to float, or None for sentinel/non-numeric.
        token = token.strip()
        if token.startswith(">"):
            return None  # ">100 Hz" sentinel
        try:
            return float(token)
        except ValueError:
            return None

    lines = path.read_text().splitlines()
    first_data = next(
        (
            idx + 3  # skip 2-row header + units row
            for idx, line in enumerate(lines)
            if re.match(r"^Tran\s+", line.strip())
        ),
        None,
    )
    if first_data is None:
        return []

    rows: list[tuple[str, list]] = []
    for line in lines[first_data:]:
        time_cell, *cells = line.split("\t")
        if len(cells) != 10:
            continue
        rows.append((time_cell.strip(), [_cell(c) for c in cells]))
    return rows
|
||
|
||
|
||
# ── Block-walker plumbing ────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.parametrize(
    "fixture",
    [
        "N844L20G.630H",
        "N844L21H.2R0H",
        "N844L6Z8.ZR0H",
        "N844L6XE.BH0H",
        "N844L23B.ND0H",
    ],
)
def test_walk_body_returns_records(fixture: str):
    """Walking each fixture's body yields more than 100 block records.

    Skipped when the fixture file is not present.
    """
    path = _FIXTURE_DIR / fixture
    if not path.exists():
        pytest.skip(f"fixture missing: {path}")

    body = _extract_body(path)
    records = walk_body(body)
    assert len(records) > 100, f"expected hundreds of blocks, got {len(records)}"
|
||
|
||
|
||
def test_walk_body_record_count_matches_txt_intervals():
    """The walker's block count agrees with the .TXT interval count.

    A difference of one is tolerated: the last interval may have been
    truncated when the recording was stopped.  Skipped when either the
    binary or the .TXT fixture is absent.
    """
    bin_path = _FIXTURE_DIR / "N844L20G.630H"
    txt_path = _FIXTURE_DIR / "N844L20G_630H_ASCII.TXT"
    if not (bin_path.exists() and txt_path.exists()):
        pytest.skip("fixture missing")

    records = walk_body(_extract_body(bin_path))
    txt_rows = _parse_txt_rows(txt_path)

    # Off-by-one allowed (final block may have been mid-write at stop).
    delta = len(records) - len(txt_rows)
    assert -1 <= delta <= 1, (
        f"binary {len(records)} blocks vs TXT {len(txt_rows)} intervals"
    )
|
||
|
||
|
||
def test_walk_body_segment_id_increments_every_256_blocks():
    """Segments 0, 1 and 2 hold exactly 256 blocks; segment 3 holds the rest.

    Checks the per-``segment_id`` block counts of N844L20G (noted in the
    original test as 791 blocks → 256+256+256+23).  Skipped when the
    fixture is absent.
    """
    path = _FIXTURE_DIR / "N844L20G.630H"
    if not path.exists():
        pytest.skip("fixture missing")

    from collections import Counter

    records = walk_body(_extract_body(path))
    seg_counts = Counter(r["segment_id"] for r in records)

    # The first three segments must be full.
    for full_segment in (0, 1, 2):
        assert seg_counts[full_segment] == 256
    # Whatever is left over lands in segment 3.
    assert seg_counts[3] == len(records) - 3 * 256
|
||
|
||
|
||
# ── Field-by-field decode verification against .TXT ground truth ─────────────
|
||
|
||
|
||
@pytest.mark.parametrize("fixture", [
    "N844L20G.630H",
    "N844L6Z8.ZR0H",
    "N844L6XE.BH0H",
    "N844L23B.ND0H",
])
def test_decoded_geo_peaks_match_txt(fixture: str):
    """Decoded Tran/Vert/Long peaks match the paired .TXT export.

    For every interval present in both the binary and the .TXT, the peak
    count converted with ``geo_count_to_ins`` must be within 0.0005 in/s
    of the .TXT cell.  Skipped when either fixture file is absent.
    """
    bin_path = _FIXTURE_DIR / fixture
    txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
    if not bin_path.exists() or not txt_path.exists():
        pytest.skip("fixture missing")

    records = walk_body(_extract_body(bin_path))
    txt_rows = _parse_txt_rows(txt_path)
    n = min(len(records), len(txt_rows))
    assert n > 0

    # (label, record key, TXT column) — TXT cols 0/2/4 are T/V/L peak in in/s.
    peak_fields = (("T", "t_peak", 0), ("V", "v_peak", 2), ("L", "l_peak", 4))

    # zip() stops at the shorter sequence, i.e. after the first n intervals.
    for i, (rec, (_ts, txt)) in enumerate(zip(records, txt_rows)):
        for slot, key, col in peak_fields:
            decoded_ips = geo_count_to_ins(rec[key])
            expected = txt[col]
            # The TXT parser yields None for non-numeric cells; fail with a
            # readable message rather than a TypeError in the subtraction.
            assert expected is not None, (
                f"{fixture} block {i} {slot}_peak: TXT cell is not numeric"
            )
            assert abs(decoded_ips - expected) < 0.0005, (
                f"{fixture} block {i} {slot}_peak: "
                f"decoded={decoded_ips:.4f} vs txt={expected:.4f}"
            )
|
||
|
||
|
||
@pytest.mark.parametrize("fixture", [
    "N844L6Z8.ZR0H",
    "N844L6XE.BH0H",
])
def test_decoded_geo_freqs_match_txt(fixture: str):
    """Decoded Tran/Vert/Long half-periods match the .TXT freq columns.

    For cells the .TXT parser could read as a number, the decoded
    frequency must be within 1 Hz.  For cells parsed as ``None`` (such as
    the ``>100`` sentinel), the codec must return ``None`` or a value
    above 100 Hz.  Skipped when either fixture file is absent.
    """
    bin_path = _FIXTURE_DIR / fixture
    txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
    if not bin_path.exists() or not txt_path.exists():
        pytest.skip("fixture missing")

    records = walk_body(_extract_body(bin_path))
    txt_rows = _parse_txt_rows(txt_path)
    n = min(len(records), len(txt_rows))
    # Without this guard an empty walk or an unparsed .TXT would make the
    # loop below a no-op and the test would pass without checking anything.
    assert n > 0, (
        f"{fixture}: nothing to compare "
        f"({len(records)} blocks, {len(txt_rows)} TXT rows)"
    )

    # (label, record key, TXT column)
    freq_fields = (("T", "t_halfp", 1), ("V", "v_halfp", 3), ("L", "l_halfp", 5))

    for i, (rec, (_ts, txt)) in enumerate(zip(records, txt_rows)):
        for slot, key, col in freq_fields:
            decoded_hz = half_period_to_hz(rec[key])
            expected = txt[col]
            if expected is None:
                # TXT shows `>100 Hz` — codec must yield None or > 100 Hz.
                assert decoded_hz is None or decoded_hz > 100, (
                    f"{fixture} block {i} {slot}_freq: codec says "
                    f"{decoded_hz} but TXT says >100"
                )
                continue
            # TXT rounds; allow ±1 Hz
            assert decoded_hz is not None
            assert abs(decoded_hz - expected) < 1.0, (
                f"{fixture} block {i} {slot}_freq: "
                f"decoded={decoded_hz:.2f} Hz vs txt={expected:.2f} Hz"
            )
|
||
|
||
|
||
@pytest.mark.parametrize("fixture", [
    "N844L6XE.BH0H",
    "N844L23B.ND0H",
    "N844L6Z8.ZR0H",
])
def test_decoded_mic_db_matches_txt(fixture: str):
    """Decoded MicL peak counts, converted to dB(L), match the .TXT column.

    ``mic_count_to_db(rec["m_peak"])`` must be within 0.1 dB of TXT column
    8 for every interval whose TXT cell is numeric; non-numeric cells are
    skipped.  Skipped entirely when either fixture file is absent.
    """
    bin_path = _FIXTURE_DIR / fixture
    txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
    if not bin_path.exists() or not txt_path.exists():
        pytest.skip("fixture missing")

    records = walk_body(_extract_body(bin_path))
    txt_rows = _parse_txt_rows(txt_path)
    n = min(len(records), len(txt_rows))
    # Without this guard an empty walk or an unparsed .TXT would make the
    # loop below a no-op and the test would pass without checking anything.
    assert n > 0, (
        f"{fixture}: nothing to compare "
        f"({len(records)} blocks, {len(txt_rows)} TXT rows)"
    )

    for i, (rec, (_ts, txt)) in enumerate(zip(records, txt_rows)):
        # TXT col 8 = MicL dB(L)
        decoded_db = mic_count_to_db(rec["m_peak"])
        expected = txt[8]
        if expected is None:
            continue
        # BW rounds to 1 decimal place for display.  Tolerance 0.1 dB
        # absorbs both rounding modes (truncate vs round-half-even).
        assert abs(decoded_db - expected) < 0.1, (
            f"{fixture} block {i} M_dB: "
            f"decoded={decoded_db:.2f} dB vs txt={expected:.2f} dB"
        )
|
||
|
||
|
||
@pytest.mark.parametrize("fixture", [
    "N844L20G.630H",
    "N844L6Z8.ZR0H",
])
def test_decoded_mic_freq_matches_txt(fixture: str):
    """Decoded MicL half-period → frequency matches TXT column 9.

    Numeric TXT cells must agree within 1 Hz.  Cells parsed as ``None``
    (such as the ``>100`` sentinel) require the codec to return ``None``
    or a value above 100 Hz.  Skipped when either fixture file is absent.
    """
    bin_path = _FIXTURE_DIR / fixture
    txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
    if not bin_path.exists() or not txt_path.exists():
        pytest.skip("fixture missing")

    records = walk_body(_extract_body(bin_path))
    txt_rows = _parse_txt_rows(txt_path)
    n = min(len(records), len(txt_rows))
    # Without this guard an empty walk or an unparsed .TXT would make the
    # loop below a no-op and the test would pass without checking anything.
    assert n > 0, (
        f"{fixture}: nothing to compare "
        f"({len(records)} blocks, {len(txt_rows)} TXT rows)"
    )

    for i, (rec, (_ts, txt)) in enumerate(zip(records, txt_rows)):
        decoded_hz = half_period_to_hz(rec["m_halfp"])
        expected = txt[9]
        if expected is None:
            assert decoded_hz is None or decoded_hz > 100
            continue
        assert decoded_hz is not None
        assert abs(decoded_hz - expected) < 1.0, (
            f"{fixture} block {i} M_freq: "
            f"decoded={decoded_hz:.2f} Hz vs txt={expected:.2f} Hz"
        )
|
||
|
||
|
||
# ── Public API ───────────────────────────────────────────────────────────────
|
||
|
||
|
||
def test_decode_histogram_body_returns_four_channels():
    """``decode_histogram_body`` returns the 4-channel dict shape.

    The result must have exactly the keys Tran/Vert/Long/MicL, every
    channel must be the same length (one value per histogram interval),
    and that length must exceed 100 for this fixture.  Skipped when the
    fixture is absent.
    """
    path = _FIXTURE_DIR / "N844L20G.630H"
    if not path.exists():
        pytest.skip("fixture missing")

    body = _extract_body(path)
    decoded = decode_histogram_body(body)
    assert decoded is not None
    assert set(decoded.keys()) == {"Tran", "Vert", "Long", "MicL"}

    # Every other channel must be as long as Tran.
    n = len(decoded["Tran"])
    other_lengths = [len(decoded[ch]) for ch in ("Vert", "Long", "MicL")]
    assert other_lengths == [n, n, n]
    assert n > 100
|
||
|
||
|
||
def test_decode_histogram_body_returns_none_for_non_histogram():
    """A body shaped like waveform mode (leading 00 02 00) decodes to None."""
    waveform_prefix = b"\x00\x02\x00"
    zero_padding = b"\x00" * 100
    result = decode_histogram_body(waveform_prefix + zero_padding)
    assert result is None
|
||
|
||
|
||
def test_decode_histogram_body_returns_none_for_garbage():
    """256 bytes of 0xFF contain no valid blocks, so the decoder returns None."""
    garbage = b"\xff" * 256
    result = decode_histogram_body(garbage)
    assert result is None
|
||
|
||
|
||
def test_decode_histogram_body_full_preserves_frequency_data():
    """The structured-record API exposes the per-channel half-period fields.

    The first record from ``decode_histogram_body_full`` must carry at
    least the segment/counter fields, the four peak and four half-period
    fields, and ``meta_var``; extra keys are allowed.  Skipped when the
    fixture is absent.
    """
    path = _FIXTURE_DIR / "N844L20G.630H"
    if not path.exists():
        pytest.skip("fixture missing")

    records = decode_histogram_body_full(_extract_body(path))
    assert records is not None

    r0 = records[0]
    expected_fields = {
        "segment_id", "block_ctr",
        "t_peak", "t_halfp", "v_peak", "v_halfp",
        "l_peak", "l_halfp", "m_peak", "m_halfp",
        "meta_var",
    }
    # Superset check: nothing from the required set may be missing.
    missing = expected_fields - set(r0.keys())
    assert not missing
|
||
|
||
|
||
# ── Helpers ──────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def test_half_period_to_hz_sentinel():
    """Half-periods 5 and 1 decode to None (the `>100 Hz` sentinel).

    A half-period of 6 is the first non-sentinel value checked and must
    decode to about 85.33 Hz.
    """
    for sentinel_halfp in (5, 1):
        assert half_period_to_hz(sentinel_halfp) is None

    # halfp=6 gives 512/6 = 85.3 Hz — below the >100 threshold
    assert half_period_to_hz(6) == pytest.approx(85.33, abs=0.01)
|
||
|
||
|
||
def test_geo_count_to_ins_scale():
    """``geo_count_to_ins`` scales 1 count to 0.005 in/s (Normal range)."""
    for count, expected_ips in ((1, 0.005), (10, 0.050)):
        assert geo_count_to_ins(count) == pytest.approx(expected_ips)

    # Zero counts must map to exactly zero.
    assert geo_count_to_ins(0) == 0.0
|
||
|
||
|
||
# ── Regression: peak is uint8 byte[N], NOT uint16 LE byte[N:N+2] ────────────
#
# One 32-byte block copied verbatim from K558LKZU.RE0H (BE9558), interval 12:
# a real field event in which the Tran channel had developed a DC offset and
# produced sub-Hz drift content the device couldn't characterize.  In that
# situation the annotation byte at [7] is non-zero (0xd2 here).
#
# Reading [6:8] as uint16 LE, as the legacy codec did, gives
# T_peak = 0xd203 = 53763 counts → about 268 in/s, which is physically
# impossible.  Byte [6] alone (3 counts) gives the correct 0.015 in/s.
# Verified against the paired BW ASCII export.
_K558_INTERVAL_12_BLOCK = bytes.fromhex(
    # bytes.fromhex ignores the spaces, so no explicit stripping is needed.
    "00 00 0c 01 0a 00 03 d2 45 00 02 00 02 00 02 00 "
    "02 00 10 00 06 00 00 00 0e 91 2f 00 1e 0a 00 00"
)
|
||
|
||
|
||
def test_extension_byte_does_not_inflate_peak():
    """Annotation bytes at [7]/[11]/[15]/[19] must not leak into peak counts.

    Decoding the single K558 interval-12 block must give T_peak = 3
    (uint8 byte[6]), not 53763 (uint16 LE byte[6:8]).  Half-periods stay
    uint16 LE, and the annotation bytes are reported separately.
    """
    records = decode_histogram_body_full(_K558_INTERVAL_12_BLOCK)
    assert records is not None
    assert len(records) == 1

    r = records[0]
    assert r["t_peak"] == 3, f"T_peak should be 3 (uint8), got {r['t_peak']}"
    for peak_key, expected_count in (("v_peak", 2), ("l_peak", 2), ("m_peak", 16)):
        assert r[peak_key] == expected_count

    # Half-periods unchanged — still uint16 LE.
    assert r["t_halfp"] == 0x0045  # 69 → 7.4 Hz
    assert r["m_halfp"] == 6       # → 85.3 Hz

    # Annotation byte is preserved (for future RE) but does not affect peak.
    assert r["annotations"] == (0xd2, 0x00, 0x00, 0x00)
|
||
|
||
|
||
def test_extension_byte_decoded_to_correct_in_s():
    """End-to-end check of the channel-grouped output for the K558 block.

    Tran must be 3 counts = 0.015 in/s (not 53763 counts = 268 in/s), and
    the other channels must carry their single-byte peak counts.
    """
    channels = decode_histogram_body(_K558_INTERVAL_12_BLOCK)
    assert channels is not None

    tran = channels["Tran"]
    assert tran == [3]
    assert geo_count_to_ins(tran[0]) == pytest.approx(0.015)

    for name, expected_series in (("Vert", [2]), ("Long", [2]), ("MicL", [16])):
        assert channels[name] == expected_series
|