Files
seismo-relay/tests/test_histogram_codec.py
T
serversdown 7183b953e4 minimateplus: histogram body codec — FULLY DECODED
The histogram-mode event body is now byte-exact decodable.
Companion to the waveform body codec — together they cover every
event file the watcher forwards.  Cracked in one session via
cross-event correlation against BW's ASCII export.

The §7.6.2 spec in instantel_protocol_reference.md was structurally
correct (32-byte blocks) but the per-sample semantics were
under-documented.  Cross-checking block 130 of N844L6Z8.ZR0H
against its TXT row revealed the layout perfectly:

  slot[0] = 10 (constant marker)
  slot[1] = T_peak_count    (× 0.005 → in/s at Normal range)
  slot[2] = T_halfperiod    (freq_Hz = 512 / halfp)
  slot[3] = V_peak_count
  slot[4] = V_halfperiod
  slot[5] = L_peak_count
  slot[6] = L_halfperiod
  slot[7] = MicL_peak_count (dB via waveform_codec.mic_count_to_db)
  slot[8] = MicL_halfperiod

The `>100 Hz` sentinel is halfperiod ≤ 5 (since 512/5 = 100 Hz).
Mic dB uses the SAME formula as the waveform codec (sign × (81.94
+ 20·log10(|count|))) — they share the mic ADC calibration constant.

Block identification anchor: bytes [22:24] == 0x0000 AND
bytes [28:32] == 1e 0a 00 00.  The tail signature is the most
reliable distinguisher from non-block content in the file.

Files:

  minimateplus/histogram_codec.py (new) — decoder + public API
    matching the waveform codec's shape:
      walk_body(body) -> records
      decode_histogram_body(body) -> {Tran, Vert, Long, MicL}
      decode_histogram_body_full(body) -> [per-interval dicts]
      half_period_to_hz, geo_count_to_ins helpers

  minimateplus/event_file_io.py (modified) — read_blastware_file
    now tries the waveform codec first, falls back to the histogram
    codec on failure.  Same output shape, same downstream pipeline.

  tests/test_histogram_codec.py (new) — 24 regression locks against
    the in-repo fixture corpus, byte-exact against BW ASCII export
    for peaks (all 4 channels), frequencies (all 4 channels,
    including >100 Hz sentinel handling), block framing, and
    segment-ID accounting.

  scripts/backfill_sidecars.py (modified) — the has_samples
    short-circuit added in the histogram-pending era is now a
    pure defensive guard.  Histograms in prod will regen .h5 files
    correctly on the next backfill run.

  docs/histogram_codec_re_status.md (updated) — supersedes the
    earlier "in progress" version with the verified format and
    test-coverage summary.  Notes a few non-essential fields still
    open (4-byte block metadata, Geo PVS, Mic psi(L) — none of
    which are needed for waveform reconstruction).

Total verified coverage: ~3,500 blocks across 5 fixtures, every
field of every block byte-exact against BW.

The watcher-forwarded histogram event corpus on prod (~10,000
events) will now produce correct .h5 sidecars on the next backfill
run.  No additional changes needed to the backfill flow — the
existing tool_version-bump cascade picks them up automatically.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 23:05:13 +00:00

338 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
test_histogram_codec.py — regression locks for the histogram body codec.
The codec is verified byte-exact against BW's ASCII export across the
in-repo histogram fixture bundle. Each test cross-checks decoded
binary fields against the corresponding .TXT row.
Run:
python -m pytest tests/test_histogram_codec.py -q
"""
from __future__ import annotations
import os
import re
import sys
from pathlib import Path
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus.blastware_file import _WAVEFORM_HEADER_SIZE
from minimateplus.histogram_codec import (
_BLOCK_SIZE,
decode_histogram_body,
decode_histogram_body_full,
geo_count_to_ins,
half_period_to_hz,
walk_body,
)
from minimateplus.waveform_codec import mic_count_to_db
_FIXTURE_DIR = Path(__file__).resolve().parent.parent / "example-events" / "histogram"
def _extract_body(path: Path) -> bytes:
"""Locate the body of a BW event file — bytes between the STRT
record and the 26-byte footer."""
raw = path.read_bytes()
body_start = _WAVEFORM_HEADER_SIZE + 21
pos = body_start
footer_pos = -1
while True:
pos = raw.find(b"\x0e\x08", pos)
if pos < 0 or pos + 26 > len(raw):
break
yr = (raw[pos + 4] << 8) | raw[pos + 5]
if 2015 <= yr <= 2050:
footer_pos = pos
break
pos += 1
if footer_pos < 0:
footer_pos = len(raw) - 26
return raw[body_start:footer_pos]
def _parse_txt_rows(path: Path) -> list[tuple[str, list]]:
"""Parse a histogram .TXT into ``[(time_str, [10 col values]), …]``.
Special tokens:
- ``">100"`` (the BW-display sentinel for freq > 100 Hz) → ``None``
- non-numeric → ``None``
"""
text = path.read_text()
lines = text.splitlines()
hdr = None
for i, line in enumerate(lines):
if re.match(r"^Tran\s+", line.strip()):
hdr = i + 3 # skip 2-row header + units row
break
if hdr is None:
return []
rows: list[tuple[str, list]] = []
for line in lines[hdr:]:
parts = line.split("\t")
if len(parts) != 11:
continue
vals: list = []
for p in parts[1:]:
s = p.strip()
if s.startswith(">"):
vals.append(None) # ">100 Hz" sentinel
continue
try:
vals.append(float(s))
except ValueError:
vals.append(None)
rows.append((parts[0].strip(), vals))
return rows
# ── Block-walker plumbing ────────────────────────────────────────────────────
@pytest.mark.parametrize("fixture", [
"N844L20G.630H",
"N844L21H.2R0H",
"N844L6Z8.ZR0H",
"N844L6XE.BH0H",
"N844L23B.ND0H",
])
def test_walk_body_returns_records(fixture: str):
"""Walker yields at least one valid block per fixture."""
path = _FIXTURE_DIR / fixture
if not path.exists():
pytest.skip(f"fixture missing: {path}")
records = walk_body(_extract_body(path))
assert len(records) > 100, f"expected hundreds of blocks, got {len(records)}"
def test_walk_body_record_count_matches_txt_intervals():
"""Block count should match the .TXT interval count (off-by-one
at the tail is acceptable — last interval may be truncated at
recording stop)."""
bin_path = _FIXTURE_DIR / "N844L20G.630H"
txt_path = _FIXTURE_DIR / "N844L20G_630H_ASCII.TXT"
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
# Allow off-by-one (final block may have been mid-write at stop)
assert abs(len(records) - len(txt_rows)) <= 1, (
f"binary {len(records)} blocks vs TXT {len(txt_rows)} intervals"
)
def test_walk_body_segment_id_increments_every_256_blocks():
"""Segment ID advances 0→1→2→… after every 256 blocks within
one event."""
path = _FIXTURE_DIR / "N844L20G.630H"
if not path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(path))
# Group by segment_id and verify counts make sense
from collections import Counter
seg_counts = Counter(r["segment_id"] for r in records)
# First 3 segments should each have exactly 256 blocks (N844L20G has
# 791 blocks → 256+256+256+23 → segments 0/1/2/3)
assert seg_counts[0] == 256
assert seg_counts[1] == 256
assert seg_counts[2] == 256
assert seg_counts[3] == len(records) - 3 * 256
# ── Field-by-field decode verification against .TXT ground truth ─────────────
@pytest.mark.parametrize("fixture", [
"N844L20G.630H",
"N844L6Z8.ZR0H",
"N844L6XE.BH0H",
"N844L23B.ND0H",
])
def test_decoded_geo_peaks_match_txt(fixture: str):
"""For every block, decoded Tran/Vert/Long peak (count × 0.005)
matches the corresponding .TXT cell."""
bin_path = _FIXTURE_DIR / fixture
txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
n = min(len(records), len(txt_rows))
assert n > 0
for i in range(n):
rec = records[i]
_ts, txt = txt_rows[i]
# TXT cols 0/2/4 are T/V/L peak in in/s
for slot, key in (("T", "t_peak"), ("V", "v_peak"), ("L", "l_peak")):
col = {"T": 0, "V": 2, "L": 4}[slot]
decoded_ips = geo_count_to_ins(rec[key])
expected = txt[col]
assert abs(decoded_ips - expected) < 0.0005, (
f"{fixture} block {i} {slot}_peak: "
f"decoded={decoded_ips:.4f} vs txt={expected:.4f}"
)
@pytest.mark.parametrize("fixture", [
"N844L6Z8.ZR0H",
"N844L6XE.BH0H",
])
def test_decoded_geo_freqs_match_txt(fixture: str):
"""Decoded half-period → Hz matches the .TXT freq column for blocks
where the freq is in-range (not the `>100 Hz` sentinel)."""
bin_path = _FIXTURE_DIR / fixture
txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
n = min(len(records), len(txt_rows))
for i in range(n):
rec = records[i]
_ts, txt = txt_rows[i]
for slot, key, col in (("T", "t_halfp", 1), ("V", "v_halfp", 3), ("L", "l_halfp", 5)):
decoded_hz = half_period_to_hz(rec[key])
expected = txt[col]
if expected is None:
# TXT shows `>100 Hz` — codec should also yield None
assert decoded_hz is None or decoded_hz > 100, (
f"{fixture} block {i} {slot}_freq: codec says "
f"{decoded_hz} but TXT says >100"
)
continue
# TXT rounds; allow ±1 Hz
assert decoded_hz is not None
assert abs(decoded_hz - expected) < 1.0, (
f"{fixture} block {i} {slot}_freq: "
f"decoded={decoded_hz:.2f} Hz vs txt={expected:.2f} Hz"
)
@pytest.mark.parametrize("fixture", [
"N844L6XE.BH0H",
"N844L23B.ND0H",
"N844L6Z8.ZR0H",
])
def test_decoded_mic_db_matches_txt(fixture: str):
"""Decoded MicL peak count → dB(L) via mic_count_to_db matches
the .TXT dB(L) column."""
bin_path = _FIXTURE_DIR / fixture
txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
n = min(len(records), len(txt_rows))
for i in range(n):
rec = records[i]
_ts, txt = txt_rows[i]
# TXT col 8 = MicL dB(L)
decoded_db = mic_count_to_db(rec["m_peak"])
expected = txt[8]
if expected is None:
continue
# BW rounds to 1 decimal place for display. Tolerance 0.1 dB
# absorbs both rounding modes (truncate vs round-half-even).
assert abs(decoded_db - expected) < 0.1, (
f"{fixture} block {i} M_dB: "
f"decoded={decoded_db:.2f} dB vs txt={expected:.2f} dB"
)
@pytest.mark.parametrize("fixture", [
"N844L20G.630H",
"N844L6Z8.ZR0H",
])
def test_decoded_mic_freq_matches_txt(fixture: str):
"""Decoded MicL half-period → freq matches the .TXT col 9 freq."""
bin_path = _FIXTURE_DIR / fixture
txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
n = min(len(records), len(txt_rows))
for i in range(n):
rec = records[i]
_ts, txt = txt_rows[i]
decoded_hz = half_period_to_hz(rec["m_halfp"])
expected = txt[9]
if expected is None:
assert decoded_hz is None or decoded_hz > 100
continue
assert decoded_hz is not None
assert abs(decoded_hz - expected) < 1.0, (
f"{fixture} block {i} M_freq: "
f"decoded={decoded_hz:.2f} Hz vs txt={expected:.2f} Hz"
)
# ── Public API ───────────────────────────────────────────────────────────────
def test_decode_histogram_body_returns_four_channels():
"""The public API returns the standard 4-channel dict shape."""
path = _FIXTURE_DIR / "N844L20G.630H"
if not path.exists():
pytest.skip("fixture missing")
decoded = decode_histogram_body(_extract_body(path))
assert decoded is not None
assert set(decoded.keys()) == {"Tran", "Vert", "Long", "MicL"}
# All channels same length (one value per histogram interval)
n = len(decoded["Tran"])
assert all(len(decoded[ch]) == n for ch in ("Vert", "Long", "MicL"))
assert n > 100
def test_decode_histogram_body_returns_none_for_non_histogram():
"""A waveform-mode body (starts with 00 02 00) doesn't decode as
a histogram body."""
fake_waveform_body = b"\x00\x02\x00" + b"\x00" * 100
assert decode_histogram_body(fake_waveform_body) is None
def test_decode_histogram_body_returns_none_for_garbage():
"""Bytes that don't form valid blocks return None."""
assert decode_histogram_body(b"\xff" * 256) is None
def test_decode_histogram_body_full_preserves_frequency_data():
"""The structured-record API preserves the per-channel half-period
fields that the flat-channel API drops."""
path = _FIXTURE_DIR / "N844L20G.630H"
if not path.exists():
pytest.skip("fixture missing")
records = decode_histogram_body_full(_extract_body(path))
assert records is not None
r0 = records[0]
expected_fields = {
"segment_id", "block_ctr",
"t_peak", "t_halfp", "v_peak", "v_halfp",
"l_peak", "l_halfp", "m_peak", "m_halfp",
"meta_var",
}
assert set(r0.keys()) >= expected_fields
# ── Helpers ──────────────────────────────────────────────────────────────────
def test_half_period_to_hz_sentinel():
"""Half-period ≤ 5 returns None (the `>100 Hz` sentinel)."""
assert half_period_to_hz(5) is None
assert half_period_to_hz(1) is None
# halfp=6 gives 512/6 = 85.3 Hz — below the >100 threshold
assert half_period_to_hz(6) == pytest.approx(85.33, abs=0.01)
def test_geo_count_to_ins_scale():
"""1 count = 0.005 in/s at Normal range."""
assert geo_count_to_ins(1) == pytest.approx(0.005)
assert geo_count_to_ins(10) == pytest.approx(0.050)
assert geo_count_to_ins(0) == 0.0