""" histogram_codec.py — decoder for MiniMate Plus histogram-mode event bodies. FULLY DECODED 2026-05-20. Every field in every block, verified byte-exact against BW's ASCII export across multiple histogram fixtures. The histogram-mode body is a stream of 32-byte fixed-length blocks, one block per histogram interval. Each block carries the per-interval peak amplitude + zero-crossing frequency for all four channels (Tran, Vert, Long, MicL). ──────────────────────────────────────────────────────────────────────────── Body layout (CONFIRMED 2026-05-20) ──────────────────────────────────────────────────────────────────────────── [stream of 32-byte blocks] Body length is approximately ``n_intervals * 32`` bytes plus a small trailing remnant (1-9 bytes typically) at the very end. Walker should iterate 32-stride and stop before the tail. ──────────────────────────────────────────────────────────────────────────── 32-byte block layout ──────────────────────────────────────────────────────────────────────────── [0] 0x00 always-zero tag [1] segment_id (uint8) 0x00..0x03 — 256 blocks per segment [2:4] block_ctr (uint16 LE) resets each segment (0x0100, 0x0101, …) [4:6] 0x000a (uint16 LE) constant marker (= 10) [6] T_peak_count uint8 Tran peak (count × 0.005 → in/s, max 1.275 in/s) [7] T_annotation uint8 empirically non-zero on intervals with sub-Hz or unmeasurable Tran freq; meaning not fully RE'd [8:10] T_halfperiod uint16 LE Tran half-period in samples (freq = 512 / halfp Hz) [10] V_peak_count uint8 [11] V_annotation uint8 [12:14] V_halfperiod uint16 LE [14] L_peak_count uint8 [15] L_annotation uint8 [16:18] L_halfperiod uint16 LE [18] M_peak_count uint8 MicL peak (count → dB via mic_count_to_db) [19] M_annotation uint8 [20:22] M_halfperiod uint16 LE MicL half-period in samples (freq = 512 / halfp Hz) [22:24] 0x00 0x00 constant [24:28] 4-byte variable purpose unknown (possibly CRC or timestamp delta) [28:32] 0x1e 0x0a 0x00 0x00 constant block-end signature NOTE on peak-count 
width: an earlier interpretation treated the peak fields as uint16 LE
spanning [6:8] / [10:12] / [14:16] / [18:20].  That happened to be
byte-exact against the N844 fixture corpus only because every annotation
byte in those fixtures was zero, making ``uint16 LE == uint8``.
Cross-correlating BE9558 (K558) Tran-drift and BE18003 (T003)
Histogram+Continuous events against the BW ASCII export proved the peak is
uint8 alone; see test_histogram_codec.py and
docs/histogram_codec_re_status.md.

Block-identification anchor: ``block[22:24] == b"\\x00\\x00"`` AND
``block[28:32] == b"\\x1e\\x0a\\x00\\x00"``.  This is the reliable
distinguisher from non-block content in the file.  ``_is_data_block``
additionally requires ``block[0] == 0x00`` and the ``0x000a`` marker at
[4:6].

────────────────────────────────────────────────────────────────────────────
Per-channel encoding
────────────────────────────────────────────────────────────────────────────

Geophone channels (Tran, Vert, Long):
  - peak_count × 0.005 = peak amplitude in in/s at Normal range
  - half-period in samples → freq_Hz = 512 / half-period

Microphone channel (MicL):
  - peak_count → dB via the same formula used by the waveform codec:
        dB = sign(c) × (81.94 + 20·log10(|c|))    for |c| ≥ 1
        dB = 0                                     for c == 0
  - half-period → freq_Hz = 512 / half-period (same as geo)

Frequency `>100 Hz` sentinel: the device emits half-period ≤ 5 when the
measured zero-crossing rate exceeds the geophone's measurement range
(512 / 5 = 102.4 Hz; the BW display rounds anything > 100 to ">100").

────────────────────────────────────────────────────────────────────────────
Output shape
────────────────────────────────────────────────────────────────────────────

``decode_histogram_body`` returns a per-channel dict matching the waveform
codec's shape so the rest of the pipeline (.h5 writer, sidecar, viewer)
consumes it without special-casing:

    {"Tran": [peak_count_i for each interval i],
     "Vert": [peak_count_i ...],
     "Long": [peak_count_i ...],
     "MicL": [peak_count_i ...]}

Values are in **16-count units for geo** (LSB = 0.005 in/s, matching
``decode_waveform_v2``) and **1-count units for mic** (matching the
waveform codec's mic convention).  Run through
``waveform_codec.decoded_to_adc_counts`` to scale geo to 1-count ADC.

Per-interval frequencies are NOT returned: they're auxiliary data, not
waveform samples.  Consumers needing frequencies can call
``decode_histogram_body_full()`` for the structured per-interval record
list.
"""
from __future__ import annotations

import struct
from typing import List, Optional, Tuple

# Block-end signature: constant `1e 0a 00 00` in bytes [28:32] of every
# real data block.  More distinctive than the byte-22 `00 00` (which
# matches many false positives), so we anchor on this.
_BLOCK_TAIL = b"\x1e\x0a\x00\x00"
_BLOCK_SIZE = 32

# Marker value (uint16 LE) at block[4:6] of every histogram data block.
# Used as additional validation that we're looking at a real block.
_BLOCK_MARKER = 10

# Geo peak scaling: stored as "count × 0.005 in/s" where 1 count = one
# 0.005 in/s display quantum.  Equivalent to the waveform codec's
# 16-count-unit output (1 unit = 0.005 in/s = 16 ADC counts).
_GEO_LSB_INS = 0.005

# Frequency formula: freq_Hz = _FREQ_NUMERATOR / half_period_samples.
# Empirically determined to be 512 (= sample_rate / 2, where sample rate
# is 1024 sps for the standard MiniMate Plus configuration).
_FREQ_NUMERATOR = 512


def _is_data_block(block: bytes) -> bool:
    """Tight identification of a histogram data block."""
    if len(block) < _BLOCK_SIZE:
        return False
    if block[28:32] != _BLOCK_TAIL:
        return False
    if block[22:24] != b"\x00\x00":
        return False
    if block[0] != 0x00:
        return False
    marker = block[4] | (block[5] << 8)
    if marker != _BLOCK_MARKER:
        return False
    return True


def _decode_block(block: bytes) -> Optional[dict]:
    """Decode one 32-byte histogram block.

    Caller must have validated with ``_is_data_block`` first.  Returns a
    record with per-channel peak counts (uint8) and half-periods
    (uint16 LE).
    """
    # Peak counts are uint8 at bytes [6] / [10] / [14] / [18].  The
    # adjacent bytes [7] / [11] / [15] / [19] hold an annotation field
    # whose meaning isn't fully understood (empirically non-zero in
    # intervals with sub-Hz or unmeasurable geo frequencies, mostly
    # zero otherwise; see test fixtures from BE9558/BE18003 corpora).
    # Crucially, those annotation bytes are NOT the high byte of the
    # peak count: cross-correlating against BW's per-interval ASCII
    # export proves the peak is uint8 alone.
    #
    # Reading the peak as uint16 LE (the original interpretation) was
    # accidentally correct only because every block in the N844 fixture
    # corpus had a zero annotation byte; non-N844 events with non-zero
    # annotation bytes decoded to physically impossible peaks (e.g.
    # 268 in/s per channel) and produced 35× inflated PVS sums when
    # first run against prod data.  See histogram_codec_re_status.md.
    t_peak = block[6]
    v_peak = block[10]
    l_peak = block[14]
    m_peak = block[18]

    t_halfp = block[8] | (block[9] << 8)
    v_halfp = block[12] | (block[13] << 8)
    l_halfp = block[16] | (block[17] << 8)
    m_halfp = block[20] | (block[21] << 8)

    segment_id = block[1]
    block_ctr = block[2] | (block[3] << 8)
    var_meta = bytes(block[24:28])
    annotations = (block[7], block[11], block[15], block[19])

    return {
        "segment_id": segment_id,
        "block_ctr": block_ctr,
        "t_peak": t_peak, "t_halfp": t_halfp,
        "v_peak": v_peak, "v_halfp": v_halfp,
        "l_peak": l_peak, "l_halfp": l_halfp,
        "m_peak": m_peak, "m_halfp": m_halfp,
        "meta_var": var_meta,
        "annotations": annotations,
    }


def walk_body(body: bytes) -> List[dict]:
    """Walk the body and return one dict per histogram interval.

    Iterates 32-byte strides from offset 0.  Collects a decoded record for
    every block that passes ``_is_data_block`` validation.  Stops when the
    remaining bytes are too short to form a complete block.

    In Histogram+Continuous mode the body interleaves data blocks with
    other 32-byte content (likely continuous-mode waveform blocks) that
    fail the data-block validation; the walker naturally skips them
    without losing 32-byte alignment.  Use ``block_ctr`` from each
    returned record to map back to the original interval index: the
    record list is sparse when other block types are interleaved.
    """
    records: List[dict] = []
    for off in range(0, len(body) - _BLOCK_SIZE + 1, _BLOCK_SIZE):
        blk = body[off:off + _BLOCK_SIZE]
        if not _is_data_block(blk):
            # Hit non-block content (likely a sync or stream marker).
            # Continue walking: block alignment is fixed at 32-stride
            # from offset 0, so we don't lose alignment by skipping.
            continue
        decoded = _decode_block(blk)
        if decoded is None:
            # Defensive guard.  ``_decode_block`` currently always returns
            # a record, but its Optional return type leaves room for
            # rejecting blocks whose peak fields fall outside the
            # plausible range (an undocumented extension).  Skip rather
            # than propagating bogus PVS contributions.
            continue
        records.append(decoded)
    return records


def decode_histogram_body(body: bytes) -> Optional[dict]:
    """Decode a histogram-mode body into per-channel peak-sample arrays.

    Returns ``{"Tran": [...], "Vert": [...], "Long": [...], "MicL": [...]}``
    where each channel's list contains one peak value per histogram
    interval (in the same units the waveform codec uses: 16-count units
    for geo, 1-count ADC units for mic).

    Returns ``None`` if the body doesn't contain any valid histogram
    blocks.

    To convert to physical units:
      - Geo channels: ``count * 0.005`` = peak in in/s at Normal range
        (or run through ``waveform_codec.decoded_to_adc_counts`` first to
        get 1-count ADC values, then ``count / 32767 * 10.0`` for in/s)
      - Mic channel: use ``waveform_codec.mic_count_to_db(count)``
    """
    records = walk_body(body)
    if not records:
        return None
    return {
        "Tran": [r["t_peak"] for r in records],
        "Vert": [r["v_peak"] for r in records],
        "Long": [r["l_peak"] for r in records],
        "MicL": [r["m_peak"] for r in records],
    }


def decode_histogram_body_full(body: bytes) -> Optional[List[dict]]:
    """Decode a histogram-mode body into the full per-interval record list.

    Same data as ``decode_histogram_body`` but in a structured form that
    preserves the half-period (frequency) data for each channel + the
    per-block segment_id, block_ctr, and 4-byte variable metadata.  Useful
    for diagnostic tools, sidecar enrichment, and future-codec work.

    Returns ``None`` if the body has no valid blocks.
    """
    records = walk_body(body)
    return records if records else None


def half_period_to_hz(halfp: int) -> Optional[float]:
    """Convert a half-period in samples to frequency in Hz.

    Returns ``None`` for half-period ≤ 5: the device emits values in that
    range when the measured zero-crossing rate exceeds 100 Hz (the BW
    display reports `>100 Hz` for such cases).  Callers can treat ``None``
    as the `>100 Hz` sentinel.
    """
    if halfp <= 5:
        return None
    return _FREQ_NUMERATOR / halfp


def geo_count_to_ins(count: int) -> float:
    """Convert a histogram geo peak count to in/s at Normal range."""
    return count * _GEO_LSB_INS