seismo-relay/minimateplus/waveform_codec.py
Claude 85f4bcfe86 codec: wire decode_waveform_v2 into production; add MicL dB helper
Replaces the broken legacy int16 LE decoder in client.py with the
verified multi-channel codec.  Three changes:

1. blastware_file.extract_body_bytes(a5_frames) — new helper that
   factors out the body-reconstruction logic from write_blastware_file
   so both writers (BW binary) and decoders (sample arrays) can use
   the same canonical bytes.

2. waveform_codec.decode_a5_frames(a5_frames) — production entry point.
   Returns the raw_samples dict consumers expect (Tran/Vert/Long as
   int16 ADC counts; MicL as native ADC counts).  Internally:
     A5 frames → extract_body_bytes → decode_waveform_v2
                → decoded_to_adc_counts (geos ×16; mic pass-through)

3. waveform_codec.mic_count_to_db(count) — MicL ADC → dB(L) per BW's
   display formula:
     dB = sign(count) × (81.94 + 20 × log10(|count|))   for |count| ≥ 1
   Verified against V70 fixture: count=813 → 140.14 dB (BW PSPL 140.1).
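
As a quick check of the formula in item 3, the sketch below re-implements it standalone and evaluates it at the V70 count quoted above. The name `mic_db_sketch` is illustrative only and is not part of the module.

```python
import math


def mic_db_sketch(count: int) -> float:
    # Hypothetical standalone copy of the dB(L) display formula from item 3.
    if count == 0:
        return 0.0  # log10(0) is undefined; the module returns 0.0 here
    sign = 1.0 if count > 0 else -1.0
    return sign * (81.94 + 20.0 * math.log10(abs(count)))


print(round(mic_db_sketch(813), 2))  # 140.14
print(round(mic_db_sketch(-1), 2))   # -81.94
```

The sign is carried through so that negative pressure excursions display as negative dB values, matching the formula as written.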

client.py:_decode_a5_waveform is reduced to a thin wrapper that calls
decode_a5_frames and populates event.raw_samples.  Original implementation
preserved as _decode_a5_waveform_LEGACY (dead code; reference only).

Also fixed a tail-end bug in decode_waveform_v2 where trailer-section
"40 02" markers (containing ASCII serial bytes, NOT real segment headers)
were being mis-interpreted, producing 2 spurious samples per channel at
the end of each event.  Added bytes [12:14] == "02 00" validation to
reject non-header markers.

7 new pytest tests cover the new helpers and dB conversion.  Total:
71 passing (up from 64).

Known limitation (carried over from before): the walker still stops
mid-event on the loudest fixtures (SP0/SS0/SV0/event-b) at some
mid-segment edge cases not yet characterized.  Every sample reached
is decoded correctly; the walker just doesn't reach all of them.
Loud events still yield 5,000–15,000 byte-exact samples each.
2026-05-20 17:28:54 +00:00

"""
waveform_codec.py — block-walker and verified decoder for the MiniMate Plus
waveform-file body.
FULLY DECODED 2026-05-11. Every block type, every channel, and the
channel-rotation rule are verified byte-exact against BW's ASCII export
across the 9-event fixture bundle (47,364 ADC samples, zero errors).
The Blastware waveform-file body — the bytes between the 21-byte STRT
record and the 26-byte file footer — is a tagged variable-length block
stream with a custom delta + RLE codec. (Not raw int16 LE, which was
the historical wrong assumption that produced ±32K noise on every event.)
Current status:
- Block framing: ✅ solved (5 block types and lengths all confirmed)
- Per-channel decode: ✅ solved (Tran / Vert / Long / MicL all byte-exact)
- Channel rotation: ✅ Tran → Vert → Long → MicL per segment
- Segment header: ✅ fully decoded (anchor pair + prev-channel extension)
- 30 NN packed-delta block: ✅ NN × 12-bit signed deltas in NN/4 groups
- MicL → dB(L) conversion: ✅ ``mic_count_to_db`` matches BW display
- Production wiring: ✅ ``client.py:_decode_a5_waveform`` uses the new
codec (via ``decode_a5_frames``). ``.h5`` sidecars now render
correctly.
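
A minimal sketch of the ``30 NN`` packed-delta layout listed above, following
the group format that ``decode_waveform_v2`` in this module uses (2 bytes of
high nibbles, then 4 low bytes). The function name and the sample bytes are
made up for illustration.

```python
from typing import List


def unpack_30_group(grp: bytes) -> List[int]:
    # Unpack one 6-byte group into four signed 12-bit deltas.
    if len(grp) != 6:
        raise ValueError("a 30 NN group is exactly 6 bytes")
    high_word = (grp[0] << 8) | grp[1]  # four 4-bit high nibbles, MSB first
    deltas = []
    for k in range(4):
        nib = (high_word >> (12 - 4 * k)) & 0xF
        v = (nib << 8) | grp[2 + k]     # assemble the 12-bit value
        if v >= 0x800:                  # sign-extend from 12 bits
            v -= 0x1000
        deltas.append(v)
    return deltas


# Made-up group: high nibbles 0, F, 1, 8 and low bytes 05, FF, 00, 00.
print(unpack_30_group(bytes.fromhex("0f1805ff0000")))  # [5, -1, 256, -2048]
```

A block with tag ``30 NN`` holds NN/4 such groups back to back.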
Known limitations:
- Walker stops early on the loudest events (SP0, SS0, SV0, event-b) at
some mid-segment edge cases not yet fully characterized. Every
sample reached IS correct; the walker just doesn't reach all of
them yet. The cleanly-decoded subset is still ~5,000–15,000 samples
per loud event.
────────────────────────────────────────────────────────────────────────────
Body layout (CONFIRMED 2026-05-11 against 8 fixture events)
────────────────────────────────────────────────────────────────────────────
[7-byte preamble] [stream of tagged blocks] [trailer]
The preamble is always exactly 7 bytes:
body[0:3] = 00 02 00 magic
body[3:5] = Tran[0] int16 BE in 16-count units (LSB = 0.005 in/s)
body[5:7] = Tran[1] int16 BE in 16-count units
(Earlier drafts of this module described a "7-or-9-byte preamble";
that was wrong — single-shot and continuous events both use 7 bytes.
The "extra 2 bytes" on continuous events were the first ``00 NN`` RLE
marker, not part of the preamble.)
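
The preamble layout above can be sketched as follows. ``parse_preamble`` and
the sample bytes are hypothetical; only the magic, the offsets, and the
int16 BE encoding come from the description.

```python
from typing import Optional, Tuple

MAGIC = bytes.fromhex("000200")


def parse_preamble(body: bytes) -> Optional[Tuple[int, int]]:
    # Return (Tran[0], Tran[1]) in 16-count units, or None if the magic is absent.
    if len(body) < 7 or body[0:3] != MAGIC:
        return None
    t0 = int.from_bytes(body[3:5], "big", signed=True)
    t1 = int.from_bytes(body[5:7], "big", signed=True)
    return t0, t1


# Made-up preamble: Tran[0] = +2 units, Tran[1] = -1 unit.
sample = bytes.fromhex("00 02 00 00 02 ff ff")
print(parse_preamble(sample))  # (2, -1)
```

With the stated LSB of 0.005 in/s per unit, these two anchors would
correspond to 0.010 in/s and -0.005 in/s.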
Block types and lengths (all confirmed):
| Tag | Length | Meaning |
|----------|-----------------------|----------------------------------------|
| ``10 NN``| NN/2 + 2 bytes | 4-bit nibble deltas (2 per byte; high |
| | | nibble first; signed 0..7 / 8..F = -8..-1)|
| ``20 NN``| NN + 2 bytes | int8 signed deltas (1 per byte) |
| ``00 NN``| 2 bytes | RLE: append NN copies of current value |
| ``30 NN``| NN*3/2 + 2 in data, | NN 12-bit signed deltas packed as NN/4 |
| | NN*4 in trailer | groups of 6 bytes. Only in loud events.|
| ``40 02``| 20 bytes (fixed) | Segment header |
NN is always a multiple of 4.
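
A small sketch of how a walker can turn a tag into an on-wire block length.
The helper name and the ``in_trailer`` flag are hypothetical; for ``30 NN``
it follows the data-section formula used by ``walk_body`` in this module
(NN × 1.5 + 2), and it omits the upper bounds on NN that ``walk_body`` also
checks.

```python
from typing import Optional


def block_length(tag_hi: int, nn: int, in_trailer: bool = False) -> Optional[int]:
    # Total length in bytes (2-byte tag included), or None if unrecognized.
    if tag_hi == 0x40:
        return 20 if nn == 0x02 else None  # fixed-size segment header
    if nn % 4 != 0:
        return None                        # NN is always a multiple of 4
    if tag_hi == 0x10:
        return nn // 2 + 2                 # two 4-bit deltas per byte
    if tag_hi == 0x20:
        return nn + 2                      # one int8 delta per byte
    if tag_hi == 0x00:
        return 2                           # RLE marker carries no payload
    if tag_hi == 0x30:
        return nn * 4 if in_trailer else nn * 3 // 2 + 2
    return None


print(block_length(0x10, 8))        # 6
print(block_length(0x20, 8))        # 10
print(block_length(0x30, 8))        # 14
print(block_length(0x30, 8, True))  # 32
```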
────────────────────────────────────────────────────────────────────────────
Tran channel, segment 0 (CONFIRMED 2026-05-11)
────────────────────────────────────────────────────────────────────────────
Segment 0 — everything before the first ``40 02`` segment header — encodes
Tran samples only. Starting from preamble anchors Tran[0] and Tran[1],
each subsequent block contributes to the running Tran value:
10 NN → append NN deltas (4-bit signed nibbles)
20 NN → append NN deltas (int8 signed bytes)
00 NN → append NN copies of the current value (RLE zeros)
40 02 → segment 0 ends; later segments rotate channels (see decode_waveform_v2)
This decodes the first 482–510 samples of Tran for each event with zero
errors against BW's ASCII export. The exact segment-0 sample count
varies per event (it's bounded by a fixed device-flash byte budget, not
a fixed sample count — quiet events fit more samples because zero
deltas pack into ``00 NN`` markers compactly).
Implementation: :func:`decode_tran_initial`.
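
The accumulation rule above, sketched on a tiny made-up byte stream. This is
a simplified stand-in for :func:`decode_tran_initial` (it takes the anchors as
arguments and does not handle ``30 NN`` blocks); all names are illustrative.

```python
from typing import List


def s4(n: int) -> int:
    # Sign-extend a 4-bit nibble: 0..7 stay positive, 8..F map to -8..-1.
    return n if n < 8 else n - 16


def i8(b: int) -> int:
    # Reinterpret an unsigned byte as a signed int8.
    return b if b < 128 else b - 256


def decode_segment0(stream: bytes, t0: int, t1: int) -> List[int]:
    out = [t0, t1]
    cur = t1
    i = 0
    while i + 1 < len(stream):
        tag, nn = stream[i], stream[i + 1]
        if tag == 0x10:                      # NN nibble deltas in NN/2 bytes
            for byte in stream[i + 2 : i + 2 + nn // 2]:
                for nib in (byte >> 4, byte & 0xF):  # high nibble first
                    cur += s4(nib)
                    out.append(cur)
            i += 2 + nn // 2
        elif tag == 0x20:                    # NN int8 deltas
            for byte in stream[i + 2 : i + 2 + nn]:
                cur += i8(byte)
                out.append(cur)
            i += 2 + nn
        elif tag == 0x00:                    # RLE: NN copies of the current value
            out.extend([cur] * nn)
            i += 2
        else:                                # 40 02 (or anything else) ends segment 0
            break
    return out


# Made-up stream: 10 04 [1f 20], 00 04, 20 04 [05 fb 00 01], then a 40 02 tag.
stream = bytes.fromhex("1004 1f20 0004 2004 05fb0001 4002")
print(decode_segment0(stream, 10, 12))
# [10, 12, 13, 12, 14, 14, 14, 14, 14, 14, 19, 14, 14, 15]
```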
────────────────────────────────────────────────────────────────────────────
Segment header (40 02, 20 bytes total)
────────────────────────────────────────────────────────────────────────────
The 18-byte payload of the ``40 02`` block:
| Offset | Field | Status |
|-----------|---------------------------------------------|-------------|
| [0:2] | First delta extending the PREVIOUS channel | ✅ confirmed|
| | (int16 BE; 16-count units on geo channels) | |
| [2:4] | Second delta extending the previous channel | ✅ confirmed|
| [4:6] | Unknown (varies; possibly checksum) | ❓ open |
| [6:8] | Byte length to next segment header 2 | ✅ confirmed|
| | (uint16 BE; useful for walker pre-scan) | |
| [8:12] | Monotonic uint32 LE counter | ✅ confirmed|
| | (starts ~0x47, increments by 1 per segment) | |
| [12:14] | Constant ``02 00`` | ✅ confirmed|
| [14:18] | Anchor pair for THIS segment's channel | ✅ confirmed|
| | (2 × int16 BE) | |
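
A hedged sketch of pulling the header fields out of the 18-byte payload.
Bytes [0:4] and [14:18] are read the way ``decode_waveform_v2`` in this
module reads them (previous-channel deltas and this channel's anchor pair).
The function name, dict keys, and sample payload are invented for
illustration.

```python
from typing import Dict, Optional


def parse_header_payload(p: bytes) -> Optional[Dict[str, int]]:
    # Reject payloads that lack the constant 02 00 at [12:14].
    if len(p) < 18 or p[12:14] != bytes.fromhex("0200"):
        return None

    def be16(lo: int) -> int:
        return int.from_bytes(p[lo : lo + 2], "big", signed=True)

    return {
        "prev_delta0": be16(0),
        "prev_delta1": be16(2),
        "length_field": int.from_bytes(p[6:8], "big"),
        "counter": int.from_bytes(p[8:12], "little"),
        "anchor0": be16(14),
        "anchor1": be16(16),
    }


# Made-up payload: deltas +1/-1, length 0x01FC, counter 0x47, anchors +3/-2.
payload = bytes.fromhex("0001 ffff 0000 01fc 47000000 0200 0003 fffe")
print(parse_header_payload(payload))
```

The ``02 00`` check doubles as the guard against trailer-section ``40 02``
markers, which carry other data at that offset.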
────────────────────────────────────────────────────────────────────────────
Multi-segment decoding (historical notes; rotation hypothesis since confirmed)
────────────────────────────────────────────────────────────────────────────
These notes predate the full decode and are kept for reference.
After segment 0 ends and the segment header T_delta is consumed,
applying segment 1's blocks as Tran continuation produces values that
diverge from truth by sample ~512. The block structure inside segment
1 is IDENTICAL to segment 0 (same alternating 10 NN / 00 NN pattern),
and the delta budget matches the segment size exactly (V70 segment 1
has 264 nibble-deltas + 244 RLE zeros = 508 = the segment's sample
count). But the cumulative sum is wrong when read as Tran.
The explanation, confirmed later (see :func:`decode_waveform_v2`), is
that segments rotate channels:
segment 0 → Tran samples 0..509
segment 1 → Vert samples 0..507
segment 2 → Long samples 0..507
segment 3 → Mic samples 0..507
segment 4 → Tran samples 510..N (continuation)
...
This is consistent with the segment-1 block sums netting to near zero in
V70 (where all 4 channels are near zero) and with the per-segment delta
budget matching the segment size for a single channel. The per-segment
channel anchor pair turned out to live in bytes [14:18] of the segment
header; bytes [4:6] are still open.
See ``docs/waveform_codec_re_status.md`` for the current working notes
and the suggested next experiment ("segment-channel scoring analyzer").
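
The rotation described above, as ``decode_waveform_v2`` in this module
applies it, can be sketched like this (helper names are illustrative):

```python
ROTATION = ["Vert", "Long", "MicL", "Tran"]


def channel_for_header(k: int) -> str:
    # Channel carried by the segment that follows the k-th 40 02 header (k from 0).
    return ROTATION[k % 4]


def previous_channel(k: int) -> str:
    # Channel that the k-th header's bytes [0:4] extend by two samples.
    return "Tran" if k == 0 else ROTATION[(k - 1) % 4]


print([channel_for_header(k) for k in range(6)])
# ['Vert', 'Long', 'MicL', 'Tran', 'Vert', 'Long']
print([previous_channel(k) for k in range(6)])
# ['Tran', 'Vert', 'Long', 'MicL', 'Tran', 'Vert']
```

The data before the first header is the implicit Tran slot of the cycle,
which is why the rotation list starts at Vert.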
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class WaveformBlock:
    """One tagged block parsed out of a Blastware waveform-file body."""

    offset: int   # byte offset into body
    tag_hi: int   # first tag byte (0x10 / 0x20 / 0x00 / 0x30 / 0x40)
    tag_lo: int   # second tag byte (NN)
    data: bytes   # block payload (excludes the 2-byte tag)
    length: int   # total block length on the wire (includes the tag)

    @property
    def kind(self) -> str:
        return f"{self.tag_hi:02x} {self.tag_lo:02x}"


def find_data_start(body: bytes) -> int:
    """Auto-detect the offset of the first data block.

    The body starts with a 7-byte preamble (magic ``00 02 00`` + two int16 BE
    Tran anchors). After that, the data section starts with a tag, usually
    ``10 NN`` or ``20 NN``, but quiet events may begin with a ``00 NN`` RLE
    marker. We return the offset of the first recognized tag, or -1 if
    none is found.
    """
    # Try fixed offset 7 first (canonical preamble length).
    if len(body) >= 9:
        b, nn = body[7], body[8]
        if (b in (0x00, 0x10, 0x20, 0x30) and nn % 4 == 0 and 0 < nn <= 0xFC) \
                or (b == 0x40 and nn == 0x02):
            return 7
    # Fall back to scanning the first 20 bytes.
    for i in range(min(20, len(body) - 1)):
        b = body[i]
        nn = body[i + 1]
        if b in (0x10, 0x20) and nn % 4 == 0 and 0 < nn <= 0xFC:
            return i
    return -1


def walk_body(body: bytes, start: Optional[int] = None) -> List[WaveformBlock]:
    """Walk the tagged-block sequence starting at *start* (auto-detected by default).

    Stops when an unrecognized tag is encountered or end of body is reached.
    Returned blocks are in stream order.
    """
    if start is None:
        start = find_data_start(body)
    if start < 0:
        return []
    blocks: List[WaveformBlock] = []
    i = start
    while i + 1 < len(body):
        t0 = body[i]
        t1 = body[i + 1]
        if t0 == 0x10 and t1 % 4 == 0 and 0 < t1 <= 0xFC:
            length = t1 // 2 + 2
        elif t0 == 0x20 and t1 % 4 == 0 and 0 < t1 <= 0xFC:
            length = t1 + 2
        elif t0 == 0x00 and t1 % 4 == 0:
            length = 2
        elif t0 == 0x30 and t1 % 4 == 0 and 0 < t1 <= 0x10:
            # Data-section ``30 NN`` blocks carry NN 12-bit signed deltas packed
            # as NN/4 groups of (2-byte high-nibble field + 4 × int8 low byte).
            # Length = NN/4 × 6 + 2 = NN × 1.5 + 2 (= 8 for NN=4, 14 for NN=8,
            # 20 for NN=12, etc.). Confirmed 2026-05-11 by full-decoder
            # verification against BW ASCII export.
            #
            # Trailer-section ``30 NN`` blocks have a different length formula
            # (NN × 4 = 32 for NN=8 in trailers). We try the data-section
            # length first and fall back to the trailer length if needed.
            cand_data = t1 * 3 // 2 + 2
            cand_trailer = t1 * 4
            if (i + cand_data < len(body) - 1
                    and body[i + cand_data] in (0x10, 0x20, 0x00, 0x30, 0x40)):
                length = cand_data
            else:
                length = cand_trailer
        elif t0 == 0x40 and t1 == 0x02:
            length = 20
        else:
            # Unknown tag; stop. Caller can inspect ``i`` to see where.
            break
        if i + length > len(body):
            break
        data = bytes(body[i + 2 : i + length])
        blocks.append(
            WaveformBlock(offset=i, tag_hi=t0, tag_lo=t1, data=data, length=length)
        )
        i += length
    return blocks


def split_segments(blocks: List[WaveformBlock]) -> List[List[WaveformBlock]]:
    """Group consecutive blocks into segments separated by ``40 02`` headers.

    The first segment is whatever runs before the first ``40 02`` header
    (typically the "segment 0" data that follows the body preamble).
    Subsequent segments start with a ``40 02`` block, then have their
    own data blocks until the next ``40 02``.
    """
    segments: List[List[WaveformBlock]] = []
    current: List[WaveformBlock] = []
    for b in blocks:
        if b.tag_hi == 0x40 and b.tag_lo == 0x02:
            if current:
                segments.append(current)
            current = [b]
        else:
            current.append(b)
    if current:
        segments.append(current)
    return segments


def parse_segment_header(block: WaveformBlock) -> Optional[dict]:
    """Decode the 18-byte payload of a ``40 02`` segment header.

    Returns a dict with the labelled fields, or None if *block* is not
    a ``40 02`` header. The dict keys predate the full decode; the
    comments below give the roles that :func:`decode_waveform_v2` relies on.
    """
    if not (block.tag_hi == 0x40 and block.tag_lo == 0x02):
        return None
    if len(block.data) < 18:
        return None
    p = block.data
    counter = int.from_bytes(p[8:12], "little", signed=False)
    return {
        "anchor_bytes": p[0:4],     # two int16 BE deltas extending the previous channel
        "field2": p[4:8],           # [4:6] unknown; [6:8] byte length (uint16 BE)
        "counter": counter,         # uint32 LE, increments by 1 per segment
        "fixed_pattern": p[12:16],  # [12:14] constant 02 00; [14:16] first anchor sample
        "tail": p[16:18],           # second anchor sample for this segment's channel
    }


def _s4(n: int) -> int:
    """Sign-extend a 4-bit value to signed int (0..7 → 0..7; 8..F → -8..-1)."""
    return n if n < 8 else n - 16


def _i8(b: int) -> int:
    """Reinterpret an unsigned byte as signed int8."""
    return b if b < 128 else b - 256


def decode_tran_initial(body: bytes) -> Optional[List[int]]:
    """
    Decode the initial Tran-channel samples (VERIFIED 2026-05-11).

    Returns Tran samples in **16-count units** (LSB = 0.005 in/s at Normal
    range, the same quantization BW uses for its ASCII export). Returns
    ``None`` if the body cannot be parsed.

    The decoded list extends from sample 0 through the end of segment 0
    (= just before the first ``40 02`` segment header; ~510 sample-sets
    for the events tested). Decoding past the segment header, with the
    per-segment channel rotation, is done by :func:`decode_waveform_v2`.

    Codec for segment 0 (CONFIRMED 2026-05-11 against 7 fixture events):

    - Body bytes [0:3] are the magic ``00 02 00``.
    - Body bytes [3:5] = ``Tran[0]`` as int16 BE in 16-count units.
    - Body bytes [5:7] = ``Tran[1]`` as int16 BE in 16-count units.
    - Data blocks (``10 NN`` or ``20 NN``) carry Tran deltas starting
      at sample 2:

      * ``10 NN``: NN nibbles = NN/2 bytes; each nibble is a 4-bit
        signed delta (0..7 → 0..+7; 8..F → -8..-1). High nibble of
        each byte comes first.
      * ``20 NN``: NN int8 signed deltas (one delta per byte).

    - ``00 NN`` blocks are run-length-encoded zero deltas: append NN
      copies of the current cumulative Tran value (no change).
    - ``30 NN`` blocks appear in segment 0 of loud-from-start events
      (SS0, SV0). This function steps over them and ignores their data;
      their packed 12-bit deltas are decoded only in
      :func:`decode_waveform_v2`.

    The walk stops at the first ``40 02`` segment header.
    """
    if len(body) < 7 or body[0:3] != b"\x00\x02\x00":
        return None
    t0 = int.from_bytes(body[3:5], "big", signed=True)
    t1 = int.from_bytes(body[5:7], "big", signed=True)
    start = find_data_start(body)
    if start < 0:
        return [t0, t1]
    out = [t0, t1]
    cur = t1
    for blk in walk_body(body, start):
        if blk.tag_hi == 0x40:
            # Segment boundary: stop. Multi-segment decode is decode_waveform_v2.
            break
        if blk.tag_hi == 0x10:
            for byte in blk.data:
                for nib in ((byte >> 4) & 0xF, byte & 0xF):
                    cur += _s4(nib)
                    out.append(cur)
        elif blk.tag_hi == 0x20:
            for byte in blk.data:
                cur += _i8(byte)
                out.append(cur)
        elif blk.tag_hi == 0x00:
            # RLE zero deltas: append NN copies of current Tran value.
            for _ in range(blk.tag_lo):
                out.append(cur)
        # 30 NN: not decoded here; skip.
    return out


def decode_waveform_v2(body: bytes) -> Optional[dict]:
    """
    Decode the body into per-channel sample arrays.

    Status (2026-05-11 evening; channel-rotation hypothesis CONFIRMED):
    segments rotate channels in fixed order **Tran → Vert → Long → MicL**.
    Each channel-segment carries a 2-sample anchor pair in segment-header
    bytes [14:18] (or in the body preamble for the initial Tran segment)
    plus a stream of delta blocks for samples 2 onward.

    Returns ``{"Tran": [...], "Vert": [...], "Long": [...], "MicL": [...]}``
    with each channel's decoded samples in 16-count units (LSB = 0.005
    in/s at Normal range). Returns ``None`` if the body cannot be
    parsed.
    """
    if len(body) < 7 or body[0:3] != b"\x00\x02\x00":
        return None
    channels = ["Tran", "Vert", "Long", "MicL"]
    out: dict = {ch: [] for ch in channels}

    # Initial Tran segment: preamble anchor pair + delta blocks before first 40 02.
    t0 = int.from_bytes(body[3:5], "big", signed=True)
    t1 = int.from_bytes(body[5:7], "big", signed=True)
    out["Tran"].extend([t0, t1])
    start = find_data_start(body)
    if start < 0:
        return out
    blocks = walk_body(body, start)
    seg_idx = [i for i, b in enumerate(blocks) if b.tag_hi == 0x40]

    def apply_blocks(channel: str, anchor: int,
                     block_start: int, block_end: int) -> int:
        """Apply delta blocks [block_start, block_end) to *channel*'s sample
        list, starting from *anchor*. Returns the final cumulative value."""
        cur = anchor
        for bi in range(block_start, block_end):
            blk = blocks[bi]
            if blk.tag_hi == 0x10:
                for byte in blk.data:
                    for nib in ((byte >> 4) & 0xF, byte & 0xF):
                        cur += _s4(nib)
                        out[channel].append(cur)
            elif blk.tag_hi == 0x20:
                for byte in blk.data:
                    cur += _i8(byte)
                    out[channel].append(cur)
            elif blk.tag_hi == 0x00:
                for _ in range(blk.tag_lo):
                    out[channel].append(cur)
            elif blk.tag_hi == 0x30:
                # 12-bit signed deltas, packed as NN/4 groups of 6 bytes each:
                #   bytes [0:2] = 16 bits = 4 × 4-bit high nibbles (MSB first)
                #   bytes [2:6] = 4 × int8 low bytes
                # Each delta = sign_extend_12((high_nibble << 8) | low_byte).
                # Confirmed 2026-05-11 against all 14 ``30 NN`` blocks in the
                # bundled fixtures.
                n_groups = blk.tag_lo // 4
                for g in range(n_groups):
                    grp = blk.data[g * 6 : (g + 1) * 6]
                    if len(grp) < 6:
                        break
                    high_word = (grp[0] << 8) | grp[1]
                    for k in range(4):
                        nib = (high_word >> (12 - 4 * k)) & 0xF
                        v = (nib << 8) | grp[2 + k]
                        if v >= 0x800:
                            v -= 0x1000
                        cur += v
                        out[channel].append(cur)
            # 40 02: should not occur in segment data.
        return cur

    # Initial Tran segment: deltas from start of body up to first 40 02 (or end).
    first_seg = seg_idx[0] if seg_idx else len(blocks)
    last_tran_value = apply_blocks("Tran", t1, 0, first_seg)

    # Subsequent segments rotate channels. Each segment header carries:
    #   bytes [0:2] and [2:4]     = 2 deltas extending the PREVIOUS channel
    #   bytes [14:16] and [16:18] = anchor pair for THIS segment's channel
    #
    # Rotation: V, L, M, T, V, L, M, T, ... (initial Tran segment is the
    # implicit T in the cycle.)
    rotation = ["Vert", "Long", "MicL", "Tran"]

    # Track each channel's "running cumulative value" so we can apply the
    # previous-channel extension deltas at every segment boundary.
    last_value = {"Tran": last_tran_value, "Vert": None, "Long": None, "MicL": None}

    for k, hi in enumerate(seg_idx):
        channel = rotation[k % 4]
        prev_channel = "Tran" if k == 0 else rotation[(k - 1) % 4]
        header = blocks[hi]
        if len(header.data) < 18:
            continue
        # Validate: real segment headers have bytes [12:14] = `02 00`.
        # Trailer/footer "40 02" markers contain ASCII serial bytes or other
        # non-header data there and would otherwise be mis-interpreted as
        # segment headers, adding spurious samples at the tail.
        if header.data[12:14] != b"\x02\x00":
            break

        # Extend the PREVIOUS channel by 2 more samples (deltas in bytes [0:4]).
        prev_d0 = int.from_bytes(header.data[0:2], "big", signed=True)
        prev_d1 = int.from_bytes(header.data[2:4], "big", signed=True)
        if last_value[prev_channel] is not None:
            v = last_value[prev_channel] + prev_d0
            out[prev_channel].append(v)
            v += prev_d1
            out[prev_channel].append(v)
            last_value[prev_channel] = v

        # Anchor pair for THIS segment's channel.
        c0 = int.from_bytes(header.data[14:16], "big", signed=True)
        c1 = int.from_bytes(header.data[16:18], "big", signed=True)
        out[channel].extend([c0, c1])

        # Apply delta blocks for this segment.
        next_hi = seg_idx[k + 1] if k + 1 < len(seg_idx) else len(blocks)
        last_value[channel] = apply_blocks(channel, c1, hi + 1, next_hi)
    return out
# ── ADC-scale conversion helpers ────────────────────────────────────────────
# Scaling factor: decode_waveform_v2 produces geo-channel samples in the BW
# display quantization (16-count units, LSB = 0.005 in/s at Normal range).
# The legacy consumer pipeline (sfm/event_hdf5.py) expects raw_samples in
# 1-count ADC units (× full_scale / 32768 → physical). To plug the new
# decoder in without rewriting consumers, multiply geo values by 16.
#
# Mic samples are already in raw ADC counts (decoded value 1 = 1 mic ADC count
# = 81.94 dB on the BW display). Mic values pass through unchanged.
_GEO_DECODER_TO_ADC = 16


def decoded_to_adc_counts(decoded: dict) -> dict:
    """Convert :func:`decode_waveform_v2` output to int16 ADC counts.

    Geo channels are scaled by ×16 (decoder produces 16-count units,
    consumer expects 1-count ADC). Mic is passed through as raw counts.
    """
    if not decoded:
        return {}
    return {
        "Tran": [v * _GEO_DECODER_TO_ADC for v in decoded.get("Tran", [])],
        "Vert": [v * _GEO_DECODER_TO_ADC for v in decoded.get("Vert", [])],
        "Long": [v * _GEO_DECODER_TO_ADC for v in decoded.get("Long", [])],
        "MicL": list(decoded.get("MicL", [])),
    }


def mic_count_to_db(count: int) -> float:
    """Convert a MicL ADC count to dB(L) for BW-display-compatible output.

    Empirical formula (confirmed 2026-05-11 against V70 fixture: count=813
    → 140.1 dB; count=±1 → ±81.94 dB; count=±24 → ±109.5 dB):

        dB = sign(count) × (81.94 + 20 × log10(|count|))   for |count| ≥ 1
        dB = 0.0                                            for count == 0

    The constant 81.94 is the level assigned to a single ADC count: one
    count sits 10^(81.94/20) ≈ 12,500 times above the 0 dB(L) reference
    amplitude. It is almost certainly a calibration constant from the
    device's mic.
    """
    if count == 0:
        return 0.0
    sign = 1.0 if count > 0 else -1.0
    return sign * (81.94 + 20.0 * math.log10(abs(count)))


# ── A5-frame entry point ────────────────────────────────────────────────────
def decode_a5_frames(a5_frames) -> Optional[dict]:
    """Decode a list of A5 (BULK_WAVEFORM_STREAM) frames into per-channel
    int16 ADC samples.

    Returns ``{"Tran": [...], "Vert": [...], "Long": [...], "MicL": [...]}``
    with each channel's samples in **1-count ADC units** (the legacy
    ``event.raw_samples`` convention: multiply by ``full_scale / 32768``
    to convert to physical units; for mic, use :func:`mic_count_to_db` or
    a per-count psi factor).

    Returns ``None`` if the frames cannot be parsed.

    This is the wired-up production entry point. It:

    1. Reconstructs the BW-binary body bytes from the A5 frames
       (``blastware_file.extract_body_bytes``).
    2. Runs the verified codec (``decode_waveform_v2``) on the body.
    3. Converts to int16 ADC counts via :func:`decoded_to_adc_counts`.
    """
    # Local import to avoid a cycle: blastware_file imports models and
    # ultimately client.py imports waveform_codec.
    from .blastware_file import extract_body_bytes

    if not a5_frames:
        return None
    _strt, body, _footer = extract_body_bytes(a5_frames)
    if not body:
        return None
    decoded = decode_waveform_v2(body)
    if decoded is None:
        return None
    return decoded_to_adc_counts(decoded)