codec-re: channel rotation CONFIRMED — full multi-channel decoder works

The segment-channel scoring analyzer (from scratch/next_experiment_skeleton.py)
ran and immediately confirmed the rotation hypothesis:

  SP0 seg 0: best fit Vert  508/508  ✓
  SP0 seg 1: best fit Long  508/508  ✓
  SP0 seg 3: best fit Tran  508/508  ✓  (Tran continuation)
  SP0 seg 5: best fit Long  508/508  ✓
  SP0 seg 9: best fit Long  508/508  ✓
  V70 seg 0: best fit Vert  508/508  ✓
  V70 seg 1: best fit Long  508/508  ✓

Channels rotate Tran → Vert → Long → MicL per 40 02 segment header.

Also discovered the segment header has DOUBLE duty: bytes [14:18] anchor
the NEW segment's channel (2 samples as int16 BE in 16-count units), AND
bytes [0:4] extend the PREVIOUS channel by 2 more samples (2 deltas as
int16 BE).  This is the same "2 anchors + delta stream" structure as the
body preamble for Tran.

decode_waveform_v2 now returns full per-channel sample dicts.
Byte-exact verified ranges:
  V70: Tran 512, Vert 512, Long 512   (all first segments)
  JQ0: Tran 512, Vert 258
  SP0: Long 1536 (all 3 L segments)

Still open: the 30 NN block format (high-amplitude packed deltas) —
appears mid-segment when single-byte deltas can't carry the magnitude.

6 new tests bring the count to 46.  All passing.
This commit is contained in:
Claude
2026-05-12 03:57:38 +00:00
committed by serversdown
parent ae0e17b5dc
commit 07675626dc
6 changed files with 365 additions and 136 deletions
+89 -12
View File
@@ -350,17 +350,94 @@ def decode_waveform_v2(body: bytes) -> Optional[dict]:
"""
Decode the body into per-channel sample arrays.
Returns ``None`` because the full multi-channel decoder is not yet
wired up. Tran is partially solved — see :func:`decode_tran_initial`
for the initial portion (verified against ground-truth BW exports).
Status (2026-05-11 evening — channel-rotation hypothesis CONFIRMED):
segments rotate channels in fixed order **Tran → Vert → Long → MicL**.
Each channel-segment carries a 2-sample anchor pair in segment-header
bytes [14:18] (or in the body preamble for the initial Tran segment)
plus a stream of delta blocks for samples 2 onward.
Status (2026-05-11):
- Tran[0:N] correctly decoded by ``decode_tran_initial`` for the
first N samples of every fixture (where N = 22 / 42 / 46
depending on event).
- Subsequent Tran samples + all Vert / Long / MicL samples: open.
The block stream after the first data block likely interleaves
channels with ``30 NN`` channel-switch markers, but the exact
switching rule is still under investigation.
Returns ``{"Tran": [...], "Vert": [...], "Long": [...], "MicL": [...]}``
with each channel's decoded samples in 16-count units (LSB = 0.005
in/s at Normal range). Returns ``None`` if the body cannot be
parsed.
"""
return None
if len(body) < 7 or body[0:3] != b"\x00\x02\x00":
return None
channels = ["Tran", "Vert", "Long", "MicL"]
out: dict = {ch: [] for ch in channels}
# Initial Tran segment: preamble anchor pair + delta blocks before first 40 02.
t0 = int.from_bytes(body[3:5], "big", signed=True)
t1 = int.from_bytes(body[5:7], "big", signed=True)
out["Tran"].extend([t0, t1])
start = find_data_start(body)
if start < 0:
return out
blocks = walk_body(body, start)
seg_idx = [i for i, b in enumerate(blocks) if b.tag_hi == 0x40]
def apply_blocks(channel: str, anchor: int,
block_start: int, block_end: int) -> int:
"""Apply delta blocks [block_start, block_end) to *channel*'s sample
list, starting from *anchor*. Returns the final cumulative value."""
cur = anchor
for bi in range(block_start, block_end):
blk = blocks[bi]
if blk.tag_hi == 0x10:
for byte in blk.data:
for nib in ((byte >> 4) & 0xF, byte & 0xF):
cur += _s4(nib)
out[channel].append(cur)
elif blk.tag_hi == 0x20:
for byte in blk.data:
cur += _i8(byte)
out[channel].append(cur)
elif blk.tag_hi == 0x00:
for _ in range(blk.tag_lo):
out[channel].append(cur)
# 30 NN: unknown content; skip.
# 40 02: should not occur in segment data.
return cur
# Initial Tran segment: deltas from start of body up to first 40 02 (or end).
first_seg = seg_idx[0] if seg_idx else len(blocks)
last_tran_value = apply_blocks("Tran", t1, 0, first_seg)
# Subsequent segments rotate channels. Each segment header carries:
# bytes [0:2] and [2:4] = 2 deltas extending the PREVIOUS channel
# bytes [14:16] and [16:18] = anchor pair for THIS segment's channel
#
# Rotation: V, L, M, T, V, L, M, T, ... (initial Tran segment is the
# implicit T in the cycle.)
rotation = ["Vert", "Long", "MicL", "Tran"]
# Track each channel's "running cumulative value" so we can apply the
# previous-channel extension deltas at every segment boundary.
last_value = {"Tran": last_tran_value, "Vert": None, "Long": None, "MicL": None}
for k, hi in enumerate(seg_idx):
channel = rotation[k % 4]
prev_channel = "Tran" if k == 0 else rotation[(k - 1) % 4]
header = blocks[hi]
if len(header.data) < 18:
continue
# Extend the PREVIOUS channel by 2 more samples (deltas in bytes [0:4]).
prev_d0 = int.from_bytes(header.data[0:2], "big", signed=True)
prev_d1 = int.from_bytes(header.data[2:4], "big", signed=True)
if last_value[prev_channel] is not None:
v = last_value[prev_channel] + prev_d0
out[prev_channel].append(v)
v += prev_d1
out[prev_channel].append(v)
last_value[prev_channel] = v
# Anchor pair for THIS segment's channel.
c0 = int.from_bytes(header.data[14:16], "big", signed=True)
c1 = int.from_bytes(header.data[16:18], "big", signed=True)
out[channel].extend([c0, c1])
# Apply delta blocks for this segment.
next_hi = seg_idx[k + 1] if k + 1 < len(seg_idx) else len(blocks)
last_value[channel] = apply_blocks(channel, c1, hi + 1, next_hi)
return out