""" test_histogram_codec.py — regression locks for the histogram body codec. The codec is verified byte-exact against BW's ASCII export across the in-repo histogram fixture bundle. Each test cross-checks decoded binary fields against the corresponding .TXT row. Run: python -m pytest tests/test_histogram_codec.py -q """ from __future__ import annotations import os import re import sys from pathlib import Path import pytest sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from minimateplus.blastware_file import _WAVEFORM_HEADER_SIZE from minimateplus.histogram_codec import ( _BLOCK_SIZE, decode_histogram_body, decode_histogram_body_full, geo_count_to_ins, half_period_to_hz, walk_body, ) from minimateplus.waveform_codec import mic_count_to_db _FIXTURE_DIR = Path(__file__).resolve().parent.parent / "example-events" / "histogram" def _extract_body(path: Path) -> bytes: """Locate the body of a BW event file — bytes between the STRT record and the 26-byte footer.""" raw = path.read_bytes() body_start = _WAVEFORM_HEADER_SIZE + 21 pos = body_start footer_pos = -1 while True: pos = raw.find(b"\x0e\x08", pos) if pos < 0 or pos + 26 > len(raw): break yr = (raw[pos + 4] << 8) | raw[pos + 5] if 2015 <= yr <= 2050: footer_pos = pos break pos += 1 if footer_pos < 0: footer_pos = len(raw) - 26 return raw[body_start:footer_pos] def _parse_txt_rows(path: Path) -> list[tuple[str, list]]: """Parse a histogram .TXT into ``[(time_str, [10 col values]), …]``. Special tokens: - ``">100"`` (the BW-display sentinel for freq > 100 Hz) → ``None`` - non-numeric → ``None`` """ text = path.read_text() lines = text.splitlines() hdr = None for i, line in enumerate(lines): if re.match(r"^Tran\s+", line.strip()): hdr = i + 3 # skip 2-row header + units row break if hdr is None: return [] rows: list[tuple[str, list]] = [] for line in lines[hdr:]: parts = line.split("\t") if len(parts) != 11: continue vals: list = [] for p in parts[1:]: s = p.strip() if s.startswith(">"): vals.append(None) # ">100 Hz" sentinel continue try: vals.append(float(s)) except ValueError: vals.append(None) rows.append((parts[0].strip(), vals)) return rows # ── Block-walker plumbing ──────────────────────────────────────────────────── @pytest.mark.parametrize("fixture", [ "N844L20G.630H", "N844L21H.2R0H", "N844L6Z8.ZR0H", "N844L6XE.BH0H", "N844L23B.ND0H", ]) def test_walk_body_returns_records(fixture: str): """Walker yields at least one valid block per fixture.""" path = _FIXTURE_DIR / fixture if not path.exists(): pytest.skip(f"fixture missing: {path}") records = walk_body(_extract_body(path)) assert len(records) > 100, f"expected hundreds of blocks, got {len(records)}" def test_walk_body_record_count_matches_txt_intervals(): """Block count should match the .TXT interval count (off-by-one at the tail is acceptable — last interval may be truncated at recording stop).""" bin_path = _FIXTURE_DIR / "N844L20G.630H" txt_path = _FIXTURE_DIR / "N844L20G_630H_ASCII.TXT" if not bin_path.exists() or not txt_path.exists(): pytest.skip("fixture missing") records = walk_body(_extract_body(bin_path)) txt_rows = _parse_txt_rows(txt_path) # Allow off-by-one (final block may have been mid-write at stop) assert abs(len(records) - len(txt_rows)) <= 1, ( f"binary {len(records)} blocks vs TXT {len(txt_rows)} intervals" ) def test_walk_body_segment_id_increments_every_256_blocks(): """Segment ID advances 0→1→2→… after every 256 blocks within one event.""" path = _FIXTURE_DIR / "N844L20G.630H" if not path.exists(): pytest.skip("fixture missing") records = walk_body(_extract_body(path)) # Group by segment_id and verify counts make sense from collections import Counter seg_counts = Counter(r["segment_id"] for r in records) # First 3 segments should each have exactly 256 blocks (N844L20G has # 791 blocks → 256+256+256+23 → segments 0/1/2/3) assert seg_counts[0] == 256 assert seg_counts[1] == 256 assert seg_counts[2] == 256 assert seg_counts[3] == len(records) - 3 * 256 # ── Field-by-field decode verification against .TXT ground truth ───────────── @pytest.mark.parametrize("fixture", [ "N844L20G.630H", "N844L6Z8.ZR0H", "N844L6XE.BH0H", "N844L23B.ND0H", ]) def test_decoded_geo_peaks_match_txt(fixture: str): """For every block, decoded Tran/Vert/Long peak (count × 0.005) matches the corresponding .TXT cell.""" bin_path = _FIXTURE_DIR / fixture txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT") if not bin_path.exists() or not txt_path.exists(): pytest.skip("fixture missing") records = walk_body(_extract_body(bin_path)) txt_rows = _parse_txt_rows(txt_path) n = min(len(records), len(txt_rows)) assert n > 0 for i in range(n): rec = records[i] _ts, txt = txt_rows[i] # TXT cols 0/2/4 are T/V/L peak in in/s for slot, key in (("T", "t_peak"), ("V", "v_peak"), ("L", "l_peak")): col = {"T": 0, "V": 2, "L": 4}[slot] decoded_ips = geo_count_to_ins(rec[key]) expected = txt[col] assert abs(decoded_ips - expected) < 0.0005, ( f"{fixture} block {i} {slot}_peak: " f"decoded={decoded_ips:.4f} vs txt={expected:.4f}" ) @pytest.mark.parametrize("fixture", [ "N844L6Z8.ZR0H", "N844L6XE.BH0H", ]) def test_decoded_geo_freqs_match_txt(fixture: str): """Decoded half-period → Hz matches the .TXT freq column for blocks where the freq is in-range (not the `>100 Hz` sentinel).""" bin_path = _FIXTURE_DIR / fixture txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT") if not bin_path.exists() or not txt_path.exists(): pytest.skip("fixture missing") records = walk_body(_extract_body(bin_path)) txt_rows = _parse_txt_rows(txt_path) n = min(len(records), len(txt_rows)) for i in range(n): rec = records[i] _ts, txt = txt_rows[i] for slot, key, col in (("T", "t_halfp", 1), ("V", "v_halfp", 3), ("L", "l_halfp", 5)): decoded_hz = half_period_to_hz(rec[key]) expected = txt[col] if expected is None: # TXT shows `>100 Hz` — codec should also yield None assert decoded_hz is None or decoded_hz > 100, ( f"{fixture} block {i} {slot}_freq: codec says " f"{decoded_hz} but TXT says >100" ) continue # TXT rounds; allow ±1 Hz assert decoded_hz is not None assert abs(decoded_hz - expected) < 1.0, ( f"{fixture} block {i} {slot}_freq: " f"decoded={decoded_hz:.2f} Hz vs txt={expected:.2f} Hz" ) @pytest.mark.parametrize("fixture", [ "N844L6XE.BH0H", "N844L23B.ND0H", "N844L6Z8.ZR0H", ]) def test_decoded_mic_db_matches_txt(fixture: str): """Decoded MicL peak count → dB(L) via mic_count_to_db matches the .TXT dB(L) column.""" bin_path = _FIXTURE_DIR / fixture txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT") if not bin_path.exists() or not txt_path.exists(): pytest.skip("fixture missing") records = walk_body(_extract_body(bin_path)) txt_rows = _parse_txt_rows(txt_path) n = min(len(records), len(txt_rows)) for i in range(n): rec = records[i] _ts, txt = txt_rows[i] # TXT col 8 = MicL dB(L) decoded_db = mic_count_to_db(rec["m_peak"]) expected = txt[8] if expected is None: continue # BW rounds to 1 decimal place for display. Tolerance 0.1 dB # absorbs both rounding modes (truncate vs round-half-even). assert abs(decoded_db - expected) < 0.1, ( f"{fixture} block {i} M_dB: " f"decoded={decoded_db:.2f} dB vs txt={expected:.2f} dB" ) @pytest.mark.parametrize("fixture", [ "N844L20G.630H", "N844L6Z8.ZR0H", ]) def test_decoded_mic_freq_matches_txt(fixture: str): """Decoded MicL half-period → freq matches the .TXT col 9 freq.""" bin_path = _FIXTURE_DIR / fixture txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT") if not bin_path.exists() or not txt_path.exists(): pytest.skip("fixture missing") records = walk_body(_extract_body(bin_path)) txt_rows = _parse_txt_rows(txt_path) n = min(len(records), len(txt_rows)) for i in range(n): rec = records[i] _ts, txt = txt_rows[i] decoded_hz = half_period_to_hz(rec["m_halfp"]) expected = txt[9] if expected is None: assert decoded_hz is None or decoded_hz > 100 continue assert decoded_hz is not None assert abs(decoded_hz - expected) < 1.0, ( f"{fixture} block {i} M_freq: " f"decoded={decoded_hz:.2f} Hz vs txt={expected:.2f} Hz" ) # ── Public API ─────────────────────────────────────────────────────────────── def test_decode_histogram_body_returns_four_channels(): """The public API returns the standard 4-channel dict shape.""" path = _FIXTURE_DIR / "N844L20G.630H" if not path.exists(): pytest.skip("fixture missing") decoded = decode_histogram_body(_extract_body(path)) assert decoded is not None assert set(decoded.keys()) == {"Tran", "Vert", "Long", "MicL"} # All channels same length (one value per histogram interval) n = len(decoded["Tran"]) assert all(len(decoded[ch]) == n for ch in ("Vert", "Long", "MicL")) assert n > 100 def test_decode_histogram_body_returns_none_for_non_histogram(): """A waveform-mode body (starts with 00 02 00) doesn't decode as a histogram body.""" fake_waveform_body = b"\x00\x02\x00" + b"\x00" * 100 assert decode_histogram_body(fake_waveform_body) is None def test_decode_histogram_body_returns_none_for_garbage(): """Bytes that don't form valid blocks return None.""" assert decode_histogram_body(b"\xff" * 256) is None def test_decode_histogram_body_full_preserves_frequency_data(): """The structured-record API preserves the per-channel half-period fields that the flat-channel API drops.""" path = _FIXTURE_DIR / "N844L20G.630H" if not path.exists(): pytest.skip("fixture missing") records = decode_histogram_body_full(_extract_body(path)) assert records is not None r0 = records[0] expected_fields = { "segment_id", "block_ctr", "t_peak", "t_halfp", "v_peak", "v_halfp", "l_peak", "l_halfp", "m_peak", "m_halfp", "meta_var", } assert set(r0.keys()) >= expected_fields # ── Helpers ────────────────────────────────────────────────────────────────── def test_half_period_to_hz_sentinel(): """Half-period ≤ 5 returns None (the `>100 Hz` sentinel).""" assert half_period_to_hz(5) is None assert half_period_to_hz(1) is None # halfp=6 gives 512/6 = 85.3 Hz — below the >100 threshold assert half_period_to_hz(6) == pytest.approx(85.33, abs=0.01) def test_geo_count_to_ins_scale(): """1 count = 0.005 in/s at Normal range.""" assert geo_count_to_ins(1) == pytest.approx(0.005) assert geo_count_to_ins(10) == pytest.approx(0.050) assert geo_count_to_ins(0) == 0.0