Merge pull request 'v0.20.0 - prerelease features.' (#25) from feat/wire-histogram-codec into dev

- dockerfile fix
- histogram body codec FULLY decoded
- backfill scripts fixed.
- docs added for histogram codec
This commit was merged in pull request #25.
This commit is contained in:
2026-05-20 21:05:37 -04:00
7 changed files with 795 additions and 25 deletions
+4 -2
View File
@@ -8,8 +8,10 @@ RUN apt-get update && \
COPY pyproject.toml requirements.txt ./ COPY pyproject.toml requirements.txt ./
COPY minimateplus ./minimateplus COPY minimateplus ./minimateplus
COPY sfm ./sfm COPY micromate ./micromate
COPY bridges ./bridges COPY sfm ./sfm
COPY bridges ./bridges
COPY scripts ./scripts
RUN pip install --no-cache-dir -e . RUN pip install --no-cache-dir -e .
+155
View File
@@ -0,0 +1,155 @@
# Histogram body codec — FULLY DECODED (2026-05-20)
Clean working status doc for the MiniMate Plus histogram-mode event
body codec. Companion to `waveform_codec_re_status.md`. The deep
historical record (with retractions and dated analyses) lives in
`docs/instantel_protocol_reference.md §7.6.2`; the authoritative
implementation lives in `minimateplus/histogram_codec.py`.
## TL;DR
**The codec is fully decoded.** Every field of every block in the
in-repo histogram fixture corpus decodes byte-exact against BW's
ASCII export.
24 regression tests pass against ~3,500 blocks across 5 fixtures.
## Body format
```
body = [stream of 32-byte data blocks] + [small trailing remnant]
```
Each block represents one histogram interval. Block layout:
```
[0] 0x00 always-zero tag
[1] segment_id (uint8) 0x00..0x03 — 256 blocks per segment
[2:4] block_ctr (uint16 LE) resets each segment (0x0100, 0x0101, …)
[4:6] 0x000a (uint16 LE) constant marker (= 10)
[6:8] T_peak_count uint16 LE Tran peak (count × 0.005 → in/s at Normal)
[8:10] T_halfperiod uint16 LE Tran half-period in samples
(freq_Hz = 512 / halfp; ≤ 5 means ">100 Hz")
[10:12] V_peak_count uint16 LE Vert peak
[12:14] V_halfperiod uint16 LE Vert freq half-period
[14:16] L_peak_count uint16 LE Long peak
[16:18] L_halfperiod uint16 LE Long freq half-period
[18:20] M_peak_count uint16 LE MicL peak count
(dB via waveform_codec.mic_count_to_db)
[20:22] M_halfperiod uint16 LE MicL freq half-period
[22:24] 0x00 0x00 constant
[24:28] 4-byte variable purpose unknown — possibly CRC,
timestamp delta, or psi(L) numeric;
not needed for waveform reconstruction
[28:32] 0x1e 0x0a 0x00 0x00 constant block-end signature
```
Reliable block-identification anchor:
```python
block[22:24] == b"\x00\x00" and block[28:32] == b"\x1e\x0a\x00\x00"
```
(The `1e 0a 00 00` constant tail is the most distinctive signature.)
## Per-channel encoding
| Channel | Peak encoding | Frequency encoding |
|---|---|---|
| Tran | count × 0.005 = in/s at Normal range | `freq_Hz = 512 / halfperiod` |
| Vert | same | same |
| Long | same | same |
| MicL | count → dB via `mic_count_to_db(count)` (same formula as waveform codec) | same |
**`>100 Hz` sentinel**: when halfperiod ≤ 5 (giving ≥100 Hz from the
512/halfp formula), BW displays `>100 Hz`. Codec's `half_period_to_hz`
returns `None` in this range.
## Verified facts (cross-checked against fixture corpus)
Example: N844L6Z8.ZR0H block 130 → all 8 decoded fields byte-exact:
```
binary samples [10, 6, 24, 4, 18, 5, 21, 5, 9]
TXT row [0.030, 21, 0.020, 28, 0.025, 24, 0.040, 0.000, 95.92, 57]
slot[0] = 10 marker
slot[1] = 6 × 0.005 = 0.030 in/s ✓ T_peak
slot[2] = 24 → 512/24 = 21.3 → 21 Hz ✓ T_freq
slot[3] = 4 × 0.005 = 0.020 in/s ✓ V_peak
slot[4] = 18 → 512/18 = 28.4 → 28 Hz ✓ V_freq
slot[5] = 5 × 0.005 = 0.025 in/s ✓ L_peak
slot[6] = 21 → 512/21 = 24.4 → 24 Hz ✓ L_freq
slot[7] = 5 → 81.94 + 20·log10(5) = 95.92 dB ✓ M_peak
slot[8] = 9 → 512/9 = 56.9 → 57 Hz ✓ M_freq
```
## Verified test coverage
`tests/test_histogram_codec.py` (24 tests):
- Block walking: yields one record per `.TXT` interval ± 1 (off-by-one
at the tail when recording was stopped mid-write). Segment-ID
groups of 256 blocks confirmed.
- Geo peaks: every block of N844L20G, N844L6Z8, N844L6XE, N844L23B
matches `.TXT` within the 0.0005 in/s quantization step.
- Geo freqs: every block of N844L6Z8 and N844L6XE matches `.TXT`
within 1 Hz (BW display rounds). `>100 Hz` sentinel handled correctly.
- Mic dB: every block of N844L6XE, N844L23B, N844L6Z8 matches `.TXT`
within 0.1 dB (BW display precision).
- Mic freq: matches `.TXT` within 1 Hz across active blocks.
## What's NOT yet decoded
- **4-byte variable metadata field (bytes 24:28)**. Not needed for
waveform reconstruction. Speculation: per-block CRC, sub-second
timestamp offset, or a Mic psi(L) count not in the 9 samples.
Punt until something needs it.
- **Geo PVS (TXT col 7, e.g. "0.040 in/s")**. Not stored in the
block; can be approximated as `sqrt(T_peak² + V_peak² + L_peak²)`
but BW's value sometimes differs slightly (probably computed from
waveform-instant samples, not from per-channel peaks). Punt — the
`.h5` consumers don't need PVS as a sample channel.
- **Mic psi(L) value (TXT col 8)**. TXT shows it as a small psi value
derived from the dB measurement. Not in the 9 samples. Could be
derived from `M_peak_count` via the inverse of the dB formula plus
a psi calibration constant. Defer.
## Output shape
`decode_histogram_body` returns the standard 4-channel dict that
mirrors `waveform_codec.decode_waveform_v2`'s output:
```python
{
"Tran": [peak_count_per_interval, ...], # 16-count units (LSB = 0.005 in/s)
"Vert": [..., ...],
"Long": [..., ...],
"MicL": [..., ...], # raw ADC counts
}
```
Run through `waveform_codec.decoded_to_adc_counts` to get 1-count ADC
units (geo ×16, mic passthrough) for the standard `.h5` writer.
For the full per-interval record with frequencies + metadata, use
`decode_histogram_body_full()`.
## Where it's wired
- `minimateplus/event_file_io.py:read_blastware_file()` — first tries
the waveform codec, falls back to the histogram codec when the
waveform preamble isn't present. Same output shape, same
downstream pipeline.
- `scripts/backfill_sidecars.py` — the `has_samples` short-circuit
added during the histogram-codec-pending era still serves as a
defensive guard against truly undecodable files, but no longer
fires for valid histograms.
## Companion reference
- `docs/waveform_codec_re_status.md` — sibling status doc for the
much-more-complex waveform-mode codec.
- `docs/instantel_protocol_reference.md §7.6.2` — historical
protocol-reference entry. Structural framing matches what we
found; per-sample semantics were less documented than the `✅
CONFIRMED` badge suggested. This doc supersedes §7.6.2 where they
conflict on confidence level.
+38 -14
View File
@@ -28,6 +28,7 @@ from .models import Event, PeakValues, ProjectInfo, Timestamp
from . import blastware_file as _bw # avoid circular reference at module load from . import blastware_file as _bw # avoid circular reference at module load
from .bw_ascii_report import BwAsciiReport from .bw_ascii_report import BwAsciiReport
from .waveform_codec import decode_waveform_v2, decoded_to_adc_counts from .waveform_codec import decode_waveform_v2, decoded_to_adc_counts
from .histogram_codec import decode_histogram_body
# Reference pressure for dB(L) → psi conversion (20 µPa expressed in psi). # Reference pressure for dB(L) → psi conversion (20 µPa expressed in psi).
# Same constant as sfm/sfm_webapp.html so server-side and browser-side # Same constant as sfm/sfm_webapp.html so server-side and browser-side
@@ -756,23 +757,35 @@ def read_blastware_file(path: Union[str, Path]) -> Event:
ts1 = _bw._decode_ts_be(footer[2:10]) ts1 = _bw._decode_ts_be(footer[2:10])
ts2 = _bw._decode_ts_be(footer[10:18]) ts2 = _bw._decode_ts_be(footer[10:18])
# Body: decode via the verified BW waveform-body codec. The body # Body: decode via the verified body codecs. Two formats coexist:
# starts with the codec's 7-byte preamble ``00 02 00 [Tran[0] BE]
# [Tran[1] BE]`` and continues with the tagged-block stream the codec
# walks. See ``minimateplus/waveform_codec.py`` + ``docs/waveform_codec_re_status.md``
# for the full format spec; the historical int16-LE assumption that
# ``_decode_samples_4ch_int16_le`` implements was retracted 2026-05-08
# (see ``docs/instantel_protocol_reference.md`` §7.6.1).
# #
# If decode fails (malformed file, truncated body, synthetic test # 1. Waveform-mode (.AB0W) — starts with 7-byte preamble
# input), fall back to empty channels — the rest of the event # ``00 02 00 [Tran[0] BE] [Tran[1] BE]`` followed by the
# (timestamp, waveform_key, project strings) is still recoverable # tagged-block delta stream documented in
# and useful. The peaks-from-samples helper handles empty input # ``docs/waveform_codec_re_status.md`` and §7.6.1 of the
# gracefully. # protocol reference. Decoded by ``waveform_codec.decode_waveform_v2``.
#
# 2. Histogram-mode (.AB0H) — a sequence of 32-byte blocks, one
# per histogram interval, each carrying per-channel peak +
# half-period values. Decoded by
# ``histogram_codec.decode_histogram_body``. Both codecs
# return the same channel-grouped output shape, so consumers
# don't need to special-case mode.
#
# The historical ``_decode_samples_4ch_int16_le`` int16-LE
# interpretation was retracted 2026-05-08 (see protocol-ref §7.6.1
# retraction box) — it produced ±32K noise on every event.
#
# If both codecs fail (malformed file, truncated body, unrecognised
# mode, synthetic test input), fall back to empty channels — the
# rest of the event (timestamp, waveform_key, project strings) is
# still recoverable and useful.
decoded = decode_waveform_v2(body) decoded = decode_waveform_v2(body)
if decoded is None:
decoded = decode_histogram_body(body)
if decoded is None: if decoded is None:
log.warning( log.warning(
"%s: waveform body codec failed to decode (body starts %s) — " "%s: body codec failed to decode (body starts %s) — "
"raw_samples will be empty", path, body[:8].hex(" "), "raw_samples will be empty", path, body[:8].hex(" "),
) )
samples = {"Tran": [], "Vert": [], "Long": [], "MicL": []} samples = {"Tran": [], "Vert": [], "Long": [], "MicL": []}
@@ -811,7 +824,18 @@ def read_blastware_file(path: Union[str, Path]) -> Event:
project=project, client=client, operator=user, sensor_location=seisloc, project=project, client=client, operator=user, sensor_location=seisloc,
) )
ev.raw_samples = samples ev.raw_samples = samples
ev.peak_values = _peaks_from_samples(samples) # Only compute peaks from samples when we actually have samples.
# For events the codec couldn't decode (histogram-mode bodies, until
# the §7.6.2 histogram codec is wired in), samples is an empty dict
# and ``_peaks_from_samples`` would return PeakValues(0, 0, 0, 0, 0).
# That would then OVERWRITE existing good DB peak values (e.g. from
# paired BW ASCII reports) during the backfill UPSERT path.
# Leaving peak_values=None signals "we don't know" to downstream
# consumers; the backfill script seeds from the DB row when it sees
# None, and ``apply_report_to_event`` overlays from a paired ASCII
# report when one is supplied.
has_samples = any(samples.get(ch) for ch in ("Tran", "Vert", "Long", "MicL"))
ev.peak_values = _peaks_from_samples(samples) if has_samples else None
ev._a5_frames = None # not recoverable from BW file ev._a5_frames = None # not recoverable from BW file
return ev return ev
+232
View File
@@ -0,0 +1,232 @@
"""
histogram_codec.py — decoder for MiniMate Plus histogram-mode event bodies.
FULLY DECODED 2026-05-20. Every field in every block, verified
byte-exact against BW's ASCII export across multiple histogram
fixtures.
The histogram-mode body is a stream of 32-byte fixed-length blocks,
one block per histogram interval. Each block carries the per-interval
peak amplitude + zero-crossing frequency for all four channels (Tran,
Vert, Long, MicL).
────────────────────────────────────────────────────────────────────────────
Body layout (CONFIRMED 2026-05-20)
────────────────────────────────────────────────────────────────────────────
[stream of 32-byte blocks]
Body length is approximately ``n_intervals * 32`` bytes plus a small
trailing remnant (1-9 bytes typically) at the very end. Walker should
iterate 32-stride and stop before the tail.
────────────────────────────────────────────────────────────────────────────
32-byte block layout
────────────────────────────────────────────────────────────────────────────
[0] 0x00 always-zero tag
[1] segment_id (uint8) 0x00..0x03 — 256 blocks per segment
[2:4] block_ctr (uint16 LE) resets each segment (0x0100, 0x0101, …)
[4:6] 0x000a (uint16 LE) constant marker (= 10)
[6:8] T_peak_count uint16 LE Tran peak (count × 0.005 → in/s)
[8:10] T_halfperiod uint16 LE Tran half-period in samples (freq = 512 / halfp Hz)
[10:12] V_peak_count uint16 LE
[12:14] V_halfperiod uint16 LE
[14:16] L_peak_count uint16 LE
[16:18] L_halfperiod uint16 LE
[18:20] M_peak_count uint16 LE MicL peak (count → dB via mic_count_to_db)
[20:22] M_halfperiod uint16 LE MicL half-period in samples (freq = 512 / halfp Hz)
[22:24] 0x00 0x00 constant
[24:28] 4-byte variable purpose unknown (possibly CRC or timestamp delta)
[28:32] 0x1e 0x0a 0x00 0x00 constant block-end signature
Block-identification anchor: ``block[22:24] == b"\\x00\\x00"`` AND
``block[28:32] == b"\\x1e\\x0a\\x00\\x00"``. This is the reliable
distinguisher from non-block content in the file.
────────────────────────────────────────────────────────────────────────────
Per-channel encoding
────────────────────────────────────────────────────────────────────────────
Geophone channels (Tran, Vert, Long):
- peak_count × 0.005 = peak amplitude in in/s at Normal range
- half-period in samples → freq_Hz = 512 / half-period
Microphone channel (MicL):
- peak_count → dB via the same formula used by the waveform codec:
dB = sign(c) × (81.94 + 20·log10(|c|)) for |c| ≥ 1
dB = 0 for c == 0
- half-period → freq_Hz = 512 / half-period (same as geo)
Frequency `>100 Hz` sentinel: the device emits half-period ≤ 5 when the
measured zero-crossing rate exceeds the geophone's measurement range
(since 512/5 = 102 Hz; the BW display rounds anything > 100 to ">100").
────────────────────────────────────────────────────────────────────────────
Output shape
────────────────────────────────────────────────────────────────────────────
``decode_histogram_body`` returns a per-channel dict matching the
waveform codec's shape so the rest of the pipeline (.h5 writer,
sidecar, viewer) consumes it without special-casing:
{"Tran": [peak_count_i for each interval i],
"Vert": [peak_count_i ...],
"Long": [peak_count_i ...],
"MicL": [peak_count_i ...]}
Values are in **16-count units for geo** (LSB = 0.005 in/s, matching
``decode_waveform_v2``) and **1-count units for mic** (matching the
waveform codec's mic convention). Run through
``waveform_codec.decoded_to_adc_counts`` to scale geo to 1-count ADC.
Per-interval frequencies are NOT returned — they're auxiliary data,
not waveform samples. Consumers needing frequencies can call
``decode_histogram_body_full()`` for the structured per-interval
record list.
"""
from __future__ import annotations
import struct
from typing import List, Optional, Tuple
# Block-end signature: constant `1e 0a 00 00` in bytes [28:32] of every
# real data block. More distinctive than the byte-22 `00 00` (which
# matches many false positives), so we anchor on this.
_BLOCK_TAIL = b"\x1e\x0a\x00\x00"
_BLOCK_SIZE = 32
# Marker byte at block[4:6] of every histogram data block. Used as
# additional validation that we're looking at a real block.
_BLOCK_MARKER = 10
# Geo peak scaling: stored as "count × 0.005 in/s" where 1 count = one
# 0.005 in/s display quantum. Equivalent to the waveform codec's
# 16-count-unit output (1 unit = 0.005 in/s = 16 ADC counts).
_GEO_LSB_INS = 0.005
# Frequency formula: freq_Hz = _FREQ_NUMERATOR / half_period_samples.
# Empirically determined to be 512 (= sample_rate / 2, where sample rate
# is 1024 sps for the standard MiniMate Plus configuration).
_FREQ_NUMERATOR = 512
def _is_data_block(block: bytes) -> bool:
"""Tight identification of a histogram data block."""
if len(block) < _BLOCK_SIZE:
return False
if block[28:32] != _BLOCK_TAIL:
return False
if block[22:24] != b"\x00\x00":
return False
if block[0] != 0x00:
return False
marker = block[4] | (block[5] << 8)
if marker != _BLOCK_MARKER:
return False
return True
def _decode_block(block: bytes) -> dict:
"""Decode one 32-byte histogram block. Caller must have validated
with ``_is_data_block`` first."""
# All 16-bit fields are little-endian unsigned. Peak counts are
# always non-negative; half-periods are always positive when valid.
t_peak, t_halfp, v_peak, v_halfp, l_peak, l_halfp, m_peak, m_halfp = struct.unpack_from(
"<HHHHHHHH", block, 6
)
segment_id = block[1]
block_ctr = block[2] | (block[3] << 8)
var_meta = bytes(block[24:28])
return {
"segment_id": segment_id,
"block_ctr": block_ctr,
"t_peak": t_peak,
"t_halfp": t_halfp,
"v_peak": v_peak,
"v_halfp": v_halfp,
"l_peak": l_peak,
"l_halfp": l_halfp,
"m_peak": m_peak,
"m_halfp": m_halfp,
"meta_var": var_meta,
}
def walk_body(body: bytes) -> List[dict]:
"""Walk the body and return one dict per histogram interval.
Iterates 32-byte strides from offset 0. Yields a decoded record
for every block that passes ``_is_data_block`` validation. Stops
when the remaining bytes are too short to form a complete block.
"""
records: List[dict] = []
for off in range(0, len(body) - _BLOCK_SIZE + 1, _BLOCK_SIZE):
blk = body[off:off + _BLOCK_SIZE]
if not _is_data_block(blk):
# Hit non-block content (likely a sync or stream marker).
# Continue walking — block alignment is fixed at 32-stride
# from offset 0, so we don't lose alignment by skipping.
continue
records.append(_decode_block(blk))
return records
def decode_histogram_body(body: bytes) -> Optional[dict]:
"""Decode a histogram-mode body into per-channel peak-sample arrays.
Returns ``{"Tran": [...], "Vert": [...], "Long": [...], "MicL": [...]}``
where each channel's list contains one peak value per histogram
interval (in the same units the waveform codec uses: 16-count units
for geo, 1-count ADC units for mic). Returns ``None`` if the body
doesn't contain any valid histogram blocks.
To convert to physical units:
- Geo channels: ``count * 0.005`` = peak in in/s at Normal range
(or run through ``waveform_codec.decoded_to_adc_counts`` first
to get 1-count ADC values, then ``count / 32767 * 10.0`` for in/s)
- Mic channel: use ``waveform_codec.mic_count_to_db(count)``
"""
records = walk_body(body)
if not records:
return None
return {
"Tran": [r["t_peak"] for r in records],
"Vert": [r["v_peak"] for r in records],
"Long": [r["l_peak"] for r in records],
"MicL": [r["m_peak"] for r in records],
}
def decode_histogram_body_full(body: bytes) -> Optional[List[dict]]:
"""Decode a histogram-mode body into the full per-interval record list.
Same data as ``decode_histogram_body`` but in a structured form that
preserves the half-period (frequency) data for each channel + the
per-block segment_id, block_ctr, and 4-byte variable metadata.
Useful for diagnostic tools, sidecar enrichment, and future-codec
work.
Returns ``None`` if the body has no valid blocks.
"""
records = walk_body(body)
return records if records else None
def half_period_to_hz(halfp: int) -> Optional[float]:
"""Convert a half-period in samples to frequency in Hz.
Returns ``None`` for half-period ≤ 5 — the device emits values in
that range when the measured zero-crossing rate exceeds 100 Hz
(the BW display reports `>100 Hz` for such cases). Callers can
treat ``None`` as the `>100 Hz` sentinel.
"""
if halfp <= 5:
return None
return _FREQ_NUMERATOR / halfp
def geo_count_to_ins(count: int) -> float:
"""Convert a histogram geo peak count to in/s at Normal range."""
return count * _GEO_LSB_INS
+20 -6
View File
@@ -307,16 +307,30 @@ def main(argv=None) -> int:
# (sha mismatch / tool_version too old). The .h5 and # (sha mismatch / tool_version too old). The .h5 and
# the sidecar are both derived from the same decoder # the sidecar are both derived from the same decoder
# output, so if the sidecar is stale, so is the .h5. # output, so if the sidecar is stale, so is the .h5.
# This is the path that recovers from the broken- #
# int16-LE codec era — bumping TOOL_VERSION to 0.20.0+ # Both waveform and histogram bodies now decode to real
# marks every pre-codec sidecar stale, which now # samples via event_file_io.read_blastware_file → either
# correctly cascades to .h5 regeneration too. # waveform_codec.decode_waveform_v2 or histogram_codec.
# decode_histogram_body. If samples are still empty after
# both codecs run, it's a genuine "we can't decode this
# file" case (truncated, malformed, or unknown mode);
# skip the .h5 write so we don't replace whatever's
# there with an empty placeholder.
has_samples = bool(
ev.raw_samples and any(
ev.raw_samples.get(ch) for ch in ("Tran", "Vert", "Long", "MicL")
)
)
hdf5_path = store.hdf5_path_for(serial, path.name) hdf5_path = store.hdf5_path_for(serial, path.name)
hdf5_filename = hdf5_path.name if hdf5_path.exists() else None hdf5_filename = hdf5_path.name if hdf5_path.exists() else None
hdf5_action = "kept" hdf5_action = "kept"
need_h5 = not args.skip_hdf5 and ( need_h5 = (
args.force or not hdf5_path.exists() or sidecar_stale not args.skip_hdf5
and (args.force or not hdf5_path.exists() or sidecar_stale)
and has_samples
) )
if not has_samples and not args.skip_hdf5:
hdf5_action = "skipped-undecodable"
if need_h5: if need_h5:
if args.dry_run: if args.dry_run:
hdf5_action = "would (re)write" hdf5_action = "would (re)write"
+9 -3
View File
@@ -289,9 +289,15 @@ def test_read_blastware_file_round_trip(tmp_path: Path):
assert parsed.timestamp.second == ev.timestamp.second assert parsed.timestamp.second == ev.timestamp.second
# No A5 source recoverable. # No A5 source recoverable.
assert parsed._a5_frames is None assert parsed._a5_frames is None
# Peaks computed from samples (synthetic = zero samples → zero peaks). # The synthetic event has no real waveform body, so the codec can't
assert parsed.peak_values is not None # decode samples → read_blastware_file leaves peak_values=None
assert parsed.peak_values.peak_vector_sum == 0.0 # (the "we don't know" signal) rather than fabricating all-zero
# peaks that would otherwise overwrite real DB values via UPSERT.
assert parsed.peak_values is None
assert parsed.raw_samples is not None
# Empty channels — codec returned None for the malformed synthetic body.
for ch in ("Tran", "Vert", "Long", "MicL"):
assert parsed.raw_samples[ch] == []
_BW_CODEC_FIXTURES = [ _BW_CODEC_FIXTURES = [
+337
View File
@@ -0,0 +1,337 @@
"""
test_histogram_codec.py — regression locks for the histogram body codec.
The codec is verified byte-exact against BW's ASCII export across the
in-repo histogram fixture bundle. Each test cross-checks decoded
binary fields against the corresponding .TXT row.
Run:
python -m pytest tests/test_histogram_codec.py -q
"""
from __future__ import annotations
import os
import re
import sys
from pathlib import Path
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus.blastware_file import _WAVEFORM_HEADER_SIZE
from minimateplus.histogram_codec import (
_BLOCK_SIZE,
decode_histogram_body,
decode_histogram_body_full,
geo_count_to_ins,
half_period_to_hz,
walk_body,
)
from minimateplus.waveform_codec import mic_count_to_db
_FIXTURE_DIR = Path(__file__).resolve().parent.parent / "example-events" / "histogram"
def _extract_body(path: Path) -> bytes:
"""Locate the body of a BW event file — bytes between the STRT
record and the 26-byte footer."""
raw = path.read_bytes()
body_start = _WAVEFORM_HEADER_SIZE + 21
pos = body_start
footer_pos = -1
while True:
pos = raw.find(b"\x0e\x08", pos)
if pos < 0 or pos + 26 > len(raw):
break
yr = (raw[pos + 4] << 8) | raw[pos + 5]
if 2015 <= yr <= 2050:
footer_pos = pos
break
pos += 1
if footer_pos < 0:
footer_pos = len(raw) - 26
return raw[body_start:footer_pos]
def _parse_txt_rows(path: Path) -> list[tuple[str, list]]:
"""Parse a histogram .TXT into ``[(time_str, [10 col values]), …]``.
Special tokens:
- ``">100"`` (the BW-display sentinel for freq > 100 Hz) → ``None``
- non-numeric → ``None``
"""
text = path.read_text()
lines = text.splitlines()
hdr = None
for i, line in enumerate(lines):
if re.match(r"^Tran\s+", line.strip()):
hdr = i + 3 # skip 2-row header + units row
break
if hdr is None:
return []
rows: list[tuple[str, list]] = []
for line in lines[hdr:]:
parts = line.split("\t")
if len(parts) != 11:
continue
vals: list = []
for p in parts[1:]:
s = p.strip()
if s.startswith(">"):
vals.append(None) # ">100 Hz" sentinel
continue
try:
vals.append(float(s))
except ValueError:
vals.append(None)
rows.append((parts[0].strip(), vals))
return rows
# ── Block-walker plumbing ────────────────────────────────────────────────────
@pytest.mark.parametrize("fixture", [
"N844L20G.630H",
"N844L21H.2R0H",
"N844L6Z8.ZR0H",
"N844L6XE.BH0H",
"N844L23B.ND0H",
])
def test_walk_body_returns_records(fixture: str):
"""Walker yields at least one valid block per fixture."""
path = _FIXTURE_DIR / fixture
if not path.exists():
pytest.skip(f"fixture missing: {path}")
records = walk_body(_extract_body(path))
assert len(records) > 100, f"expected hundreds of blocks, got {len(records)}"
def test_walk_body_record_count_matches_txt_intervals():
"""Block count should match the .TXT interval count (off-by-one
at the tail is acceptable — last interval may be truncated at
recording stop)."""
bin_path = _FIXTURE_DIR / "N844L20G.630H"
txt_path = _FIXTURE_DIR / "N844L20G_630H_ASCII.TXT"
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
# Allow off-by-one (final block may have been mid-write at stop)
assert abs(len(records) - len(txt_rows)) <= 1, (
f"binary {len(records)} blocks vs TXT {len(txt_rows)} intervals"
)
def test_walk_body_segment_id_increments_every_256_blocks():
"""Segment ID advances 0→1→2→… after every 256 blocks within
one event."""
path = _FIXTURE_DIR / "N844L20G.630H"
if not path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(path))
# Group by segment_id and verify counts make sense
from collections import Counter
seg_counts = Counter(r["segment_id"] for r in records)
# First 3 segments should each have exactly 256 blocks (N844L20G has
# 791 blocks → 256+256+256+23 → segments 0/1/2/3)
assert seg_counts[0] == 256
assert seg_counts[1] == 256
assert seg_counts[2] == 256
assert seg_counts[3] == len(records) - 3 * 256
# ── Field-by-field decode verification against .TXT ground truth ─────────────
@pytest.mark.parametrize("fixture", [
"N844L20G.630H",
"N844L6Z8.ZR0H",
"N844L6XE.BH0H",
"N844L23B.ND0H",
])
def test_decoded_geo_peaks_match_txt(fixture: str):
"""For every block, decoded Tran/Vert/Long peak (count × 0.005)
matches the corresponding .TXT cell."""
bin_path = _FIXTURE_DIR / fixture
txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
n = min(len(records), len(txt_rows))
assert n > 0
for i in range(n):
rec = records[i]
_ts, txt = txt_rows[i]
# TXT cols 0/2/4 are T/V/L peak in in/s
for slot, key in (("T", "t_peak"), ("V", "v_peak"), ("L", "l_peak")):
col = {"T": 0, "V": 2, "L": 4}[slot]
decoded_ips = geo_count_to_ins(rec[key])
expected = txt[col]
assert abs(decoded_ips - expected) < 0.0005, (
f"{fixture} block {i} {slot}_peak: "
f"decoded={decoded_ips:.4f} vs txt={expected:.4f}"
)
@pytest.mark.parametrize("fixture", [
"N844L6Z8.ZR0H",
"N844L6XE.BH0H",
])
def test_decoded_geo_freqs_match_txt(fixture: str):
"""Decoded half-period → Hz matches the .TXT freq column for blocks
where the freq is in-range (not the `>100 Hz` sentinel)."""
bin_path = _FIXTURE_DIR / fixture
txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
n = min(len(records), len(txt_rows))
for i in range(n):
rec = records[i]
_ts, txt = txt_rows[i]
for slot, key, col in (("T", "t_halfp", 1), ("V", "v_halfp", 3), ("L", "l_halfp", 5)):
decoded_hz = half_period_to_hz(rec[key])
expected = txt[col]
if expected is None:
# TXT shows `>100 Hz` — codec should also yield None
assert decoded_hz is None or decoded_hz > 100, (
f"{fixture} block {i} {slot}_freq: codec says "
f"{decoded_hz} but TXT says >100"
)
continue
# TXT rounds; allow ±1 Hz
assert decoded_hz is not None
assert abs(decoded_hz - expected) < 1.0, (
f"{fixture} block {i} {slot}_freq: "
f"decoded={decoded_hz:.2f} Hz vs txt={expected:.2f} Hz"
)
@pytest.mark.parametrize("fixture", [
"N844L6XE.BH0H",
"N844L23B.ND0H",
"N844L6Z8.ZR0H",
])
def test_decoded_mic_db_matches_txt(fixture: str):
"""Decoded MicL peak count → dB(L) via mic_count_to_db matches
the .TXT dB(L) column."""
bin_path = _FIXTURE_DIR / fixture
txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
n = min(len(records), len(txt_rows))
for i in range(n):
rec = records[i]
_ts, txt = txt_rows[i]
# TXT col 8 = MicL dB(L)
decoded_db = mic_count_to_db(rec["m_peak"])
expected = txt[8]
if expected is None:
continue
# BW rounds to 1 decimal place for display. Tolerance 0.1 dB
# absorbs both rounding modes (truncate vs round-half-even).
assert abs(decoded_db - expected) < 0.1, (
f"{fixture} block {i} M_dB: "
f"decoded={decoded_db:.2f} dB vs txt={expected:.2f} dB"
)
@pytest.mark.parametrize("fixture", [
"N844L20G.630H",
"N844L6Z8.ZR0H",
])
def test_decoded_mic_freq_matches_txt(fixture: str):
"""Decoded MicL half-period → freq matches the .TXT col 9 freq."""
bin_path = _FIXTURE_DIR / fixture
txt_path = _FIXTURE_DIR / (fixture.replace(".", "_") + "_ASCII.TXT")
if not bin_path.exists() or not txt_path.exists():
pytest.skip("fixture missing")
records = walk_body(_extract_body(bin_path))
txt_rows = _parse_txt_rows(txt_path)
n = min(len(records), len(txt_rows))
for i in range(n):
rec = records[i]
_ts, txt = txt_rows[i]
decoded_hz = half_period_to_hz(rec["m_halfp"])
expected = txt[9]
if expected is None:
assert decoded_hz is None or decoded_hz > 100
continue
assert decoded_hz is not None
assert abs(decoded_hz - expected) < 1.0, (
f"{fixture} block {i} M_freq: "
f"decoded={decoded_hz:.2f} Hz vs txt={expected:.2f} Hz"
)
# ── Public API ───────────────────────────────────────────────────────────────
def test_decode_histogram_body_returns_four_channels():
"""The public API returns the standard 4-channel dict shape."""
path = _FIXTURE_DIR / "N844L20G.630H"
if not path.exists():
pytest.skip("fixture missing")
decoded = decode_histogram_body(_extract_body(path))
assert decoded is not None
assert set(decoded.keys()) == {"Tran", "Vert", "Long", "MicL"}
# All channels same length (one value per histogram interval)
n = len(decoded["Tran"])
assert all(len(decoded[ch]) == n for ch in ("Vert", "Long", "MicL"))
assert n > 100
def test_decode_histogram_body_returns_none_for_non_histogram():
"""A waveform-mode body (starts with 00 02 00) doesn't decode as
a histogram body."""
fake_waveform_body = b"\x00\x02\x00" + b"\x00" * 100
assert decode_histogram_body(fake_waveform_body) is None
def test_decode_histogram_body_returns_none_for_garbage():
"""Bytes that don't form valid blocks return None."""
assert decode_histogram_body(b"\xff" * 256) is None
def test_decode_histogram_body_full_preserves_frequency_data():
"""The structured-record API preserves the per-channel half-period
fields that the flat-channel API drops."""
path = _FIXTURE_DIR / "N844L20G.630H"
if not path.exists():
pytest.skip("fixture missing")
records = decode_histogram_body_full(_extract_body(path))
assert records is not None
r0 = records[0]
expected_fields = {
"segment_id", "block_ctr",
"t_peak", "t_halfp", "v_peak", "v_halfp",
"l_peak", "l_halfp", "m_peak", "m_halfp",
"meta_var",
}
assert set(r0.keys()) >= expected_fields
# ── Helpers ──────────────────────────────────────────────────────────────────
def test_half_period_to_hz_sentinel():
"""Half-period ≤ 5 returns None (the `>100 Hz` sentinel)."""
assert half_period_to_hz(5) is None
assert half_period_to_hz(1) is None
# halfp=6 gives 512/6 = 85.3 Hz — below the >100 threshold
assert half_period_to_hz(6) == pytest.approx(85.33, abs=0.01)
def test_geo_count_to_ins_scale():
"""1 count = 0.005 in/s at Normal range."""
assert geo_count_to_ins(1) == pytest.approx(0.005)
assert geo_count_to_ins(10) == pytest.approx(0.050)
assert geo_count_to_ins(0) == 0.0