merge full s3 codec decoded #23

Merged
serversdown merged 18 commits from codec-re into main 2026-05-20 13:45:33 -04:00
10 changed files with 7195 additions and 62 deletions
Showing only changes of commit a0c9a482c7 - Show all commits
+76
View File
@@ -0,0 +1,76 @@
"""Full Tran decoder: continues across segment headers using T_delta from header bytes [0:2]."""
import sys
sys.path.insert(0, ".")
from analysis.load_bundle import _parse_txt
from minimateplus.waveform_codec import walk_body, find_data_start
def s4(n):
return n if n < 8 else n - 16
def i8(b):
return b if b < 128 else b - 256
def decode_full_tran(body):
if len(body) < 7 or body[0:3] != b"\x00\x02\x00":
return None
T0 = int.from_bytes(body[3:5], "big", signed=True)
T1 = int.from_bytes(body[5:7], "big", signed=True)
i = 7
while i + 1 < len(body) and body[i] not in (0x00, 0x10, 0x20, 0x30, 0x40):
i += 1
blocks = walk_body(body, i)
T = [T0, T1]
cur = T1
for blk in blocks:
if blk.tag_hi == 0x40:
# Segment header carries 2 T deltas (int16 BE each) at bytes [0:2] and [2:4]
if len(blk.data) >= 4:
delta1 = int.from_bytes(blk.data[0:2], "big", signed=True)
cur += delta1
T.append(cur)
delta2 = int.from_bytes(blk.data[2:4], "big", signed=True)
cur += delta2
T.append(cur)
elif blk.tag_hi == 0x10:
for byte in blk.data:
for nib in ((byte >> 4) & 0xF, byte & 0xF):
cur += s4(nib)
T.append(cur)
elif blk.tag_hi == 0x20:
for byte in blk.data:
cur += i8(byte)
T.append(cur)
elif blk.tag_hi == 0x00:
for _ in range(blk.tag_lo):
T.append(cur)
# 30 NN: skip for now
return T
def main():
for stem in ("M529LL1L.V70", "M529LL1L.JQ0", "M529LL1A.SP0", "M529LL1A.SS0", "M529LL1A.SV0"):
path = f"decode-re/5-11-26/{stem}"
with open(path, "rb") as f:
body = f.read()[43:-26]
_, samples = _parse_txt(path + ".TXT")
truth_T = [round(v*200) for v in samples["Tran"]]
n_truth = len(truth_T)
decoded = decode_full_tran(body)
n = min(len(decoded), n_truth)
matches = sum(1 for i in range(n) if decoded[i] == truth_T[i])
div_at = -1
for i in range(n):
if decoded[i] != truth_T[i]:
div_at = i
break
print(f"{stem}: decoded={len(decoded)}, truth={n_truth}, matches={matches}/{n}, first div={div_at}")
if __name__ == "__main__":
main()
+81
View File
@@ -0,0 +1,81 @@
"""Decode Tran across multiple segments by resetting at 40 02 headers."""
import sys
sys.path.insert(0, ".")
from analysis.load_bundle import _parse_txt
from minimateplus.waveform_codec import walk_body, find_data_start
def s4(n):
return n if n < 8 else n - 16
def i8(b):
return b if b < 128 else b - 256
def decode_full_tran(body):
"""Decode all Tran samples in the body, walking through segments."""
if len(body) < 7 or body[0:3] != b"\x00\x02\x00":
return None
T0 = int.from_bytes(body[3:5], "big", signed=True)
T1 = int.from_bytes(body[5:7], "big", signed=True)
# Locate first tag
i = 7
while i + 1 < len(body) and body[i] not in (0x00, 0x10, 0x20, 0x30, 0x40):
i += 1
blocks = walk_body(body, i)
T = [T0, T1]
cur = T1
for bi, blk in enumerate(blocks):
if blk.tag_hi == 0x40:
# Segment header — try interpreting bytes [0:2] as new T anchor
if len(blk.data) >= 2:
new_anchor = int.from_bytes(blk.data[0:2], "big", signed=True)
# The next sample IS this anchor value, NOT a delta from cur.
T.append(new_anchor)
cur = new_anchor
elif blk.tag_hi == 0x10:
for byte in blk.data:
for nib in ((byte >> 4) & 0xF, byte & 0xF):
cur += s4(nib)
T.append(cur)
elif blk.tag_hi == 0x20:
for byte in blk.data:
cur += i8(byte)
T.append(cur)
elif blk.tag_hi == 0x00:
# RLE: append NN zero deltas
for _ in range(blk.tag_lo):
T.append(cur)
# 30 NN: skip
return T
def main():
for stem in ("M529LL1L.V70", "M529LL1L.JQ0", "M529LL1A.SP0", "M529LL1A.SS0", "M529LL1A.SV0"):
path = f"decode-re/5-11-26/{stem}"
with open(path, "rb") as f:
body = f.read()[43:-26]
_, samples = _parse_txt(path + ".TXT")
truth_T = [round(v*200) for v in samples["Tran"]]
n_truth = len(truth_T)
decoded = decode_full_tran(body)
n = min(len(decoded), n_truth)
matches = sum(1 for i in range(n) if decoded[i] == truth_T[i])
# Find first divergence
div_at = -1
for i in range(n):
if decoded[i] != truth_T[i]:
div_at = i
break
print(f"{stem}: decoded={len(decoded)}, truth={n_truth}, matches={matches}/{n}, first div={div_at}")
if div_at >= 0 and div_at < 30:
print(f" truth around div [{max(0,div_at-3)}:{div_at+8}]: {truth_T[max(0,div_at-3):div_at+8]}")
print(f" pred around div [{max(0,div_at-3)}:{div_at+8}]: {decoded[max(0,div_at-3):div_at+8]}")
if __name__ == "__main__":
main()
+86
View File
@@ -0,0 +1,86 @@
"""Test: 00 NN markers might be RLE for zero-deltas in current channel."""
import sys
sys.path.insert(0, ".")
from analysis.load_bundle import _parse_txt
from minimateplus.waveform_codec import walk_body, find_data_start
def s4(n):
return n if n < 8 else n - 16
def i8(b):
return b if b < 128 else b - 256
def decode_with_rle(body):
"""Decode Tran assuming:
- preamble[3:5], [5:7] = T[0], T[1]
- All 10 NN / 20 NN blocks until segment_header (40 02) are Tran deltas
- 00 NN markers are RLE: NN/4 zero T deltas (or NN, or NN/2 — try them)
"""
if len(body) < 9 or body[0:3] != b"\x00\x02\x00":
return None, None, None
T0 = int.from_bytes(body[3:5], "big", signed=True)
T1 = int.from_bytes(body[5:7], "big", signed=True)
# Find first tag (might be 00 NN, 10 NN, or 20 NN)
i = 7
while i + 1 < len(body):
if body[i] in (0x00, 0x10, 0x20):
break
i += 1
start = i
blocks = walk_body(body, start)
results = {}
for rle_div in (4, 2, 1): # try different RLE interpretations
T = [T0, T1]
cur = T1
for blk in blocks:
if blk.tag_hi == 0x40:
break
if blk.tag_hi == 0x10:
for byte in blk.data:
for nib in ((byte >> 4) & 0xF, byte & 0xF):
cur += s4(nib)
T.append(cur)
elif blk.tag_hi == 0x20:
for byte in blk.data:
cur += i8(byte)
T.append(cur)
elif blk.tag_hi == 0x00:
# RLE of zero deltas
n_zeros = blk.tag_lo // rle_div
for _ in range(n_zeros):
T.append(cur)
# 30 NN: skip for now
results[rle_div] = T
return results, T0, T1
def main():
for stem in ("M529LL1L.V70", "M529LL1L.JQ0", "M529LL1A.SP0", "M529LL1A.SS0", "M529LL1A.SV0"):
path = f"decode-re/5-11-26/{stem}"
with open(path, "rb") as f:
body = f.read()[43:-26]
_, samples = _parse_txt(path + ".TXT")
truth_T = [round(v*200) for v in samples["Tran"]]
results, T0, T1 = decode_with_rle(body)
print(f"\n=== {stem} (T[0]={T0}, T[1]={T1}) ===")
for rle_div, T in results.items():
n = min(len(T), len(truth_T))
matches = sum(1 for i in range(n) if T[i] == truth_T[i])
# Find first divergence
div_at = -1
for i in range(n):
if T[i] != truth_T[i]:
div_at = i
break
print(f" rle_div={rle_div}: decoded {len(T)}, matches {matches}/{n}, first div at sample {div_at}")
if __name__ == "__main__":
main()
+47 -11
View File
@@ -971,28 +971,64 @@ in the form ``f3/f4/f5`` near ``20 10`` markers strongly resemble
int8 channel-bias values around -12). Detailed decoding of the
trailer is outside the path needed for sample reconstruction.
##### Tran channel codec — CONFIRMED 2026-05-11
##### Tran channel codec — CONFIRMED 2026-05-11 (segment 0)
The first data block (immediately after the 7-byte preamble) carries
Tran-channel deltas starting at sample 2. Two block types in alternation:
After the 7-byte preamble, the body's segment 0 carries Tran deltas
via three block types:
- ``10 NN``: ``NN/2`` bytes of payload. Each byte = two 4-bit signed
nibbles (high nibble first; 0..7 → 0..+7, 8..F → -8..-1). Each
nibble is one Tran delta in 16-count units.
nibble is one Tran delta in 16-count units (LSB = 0.005 in/s).
- ``20 NN``: ``NN`` bytes of payload. Each byte = one int8 signed delta
in 16-count units.
- ``20 NN``: ``NN`` bytes of payload. Each byte = one int8 signed
delta in 16-count units. Used when deltas don't fit in 4 bits.
Verified against all 3 May-11 fixture events:
- ``00 NN``: a 2-byte marker. Run-length-encoded zero deltas — append
NN copies of the current cumulative Tran value (no change). Used
heavily for silent stretches.
| Event | First block | # T samples decoded | Matches truth |
Segment 0 ends at the first ``40 02`` segment header. Segment 0 typically
covers ~510 sample-sets for events with mostly-quiet Tran, fewer for
events with rapid Tran changes.
Verified against all bundled fixture events (5-8 and 5-11 bundles):
| Event | Tran character | Segment 0 size | Matches truth |
|---|---|---|---|
| SP0 | ``10 14`` (10 bytes / 20 nibbles) | 22 (= 2 preamble + 20 deltas) | 22/22 ✓ |
| SS0 | ``10 28`` (20 bytes / 40 nibbles) | 42 | 42/42 ✓ |
| SV0 | ``20 2c`` (44 int8 bytes) | 46 | 46/46 ✓ |
| SP0 (loud all-channels, pretrig=0.25s) | small near sample 0 | 510 | 510/510 ✓ |
| SS0 (loud-from-start) | big from sample 0 | 42* | 42/42 ✓ |
| SV0 (loud-from-start) | big from sample 0 | 58* | 58/58 ✓ |
| JQ0 (Vert-heavy) | near zero | 510 | 510/510 ✓ |
| V70 (Mic-heavy) | near zero | 510 | 510/510 ✓ |
\* SS0 and SV0 decode stops early because their segment 0 contains
``30 04`` blocks whose internal format hasn't been decoded yet (likely
a channel-switch marker for the high-amplitude regime). The two events
where the codec is most complex stop at the first ``30 04``.
Implementation: :func:`minimateplus.waveform_codec.decode_tran_initial`.
##### Segment header T-delta (PARTIAL 2026-05-11)
The 20-byte ``40 02`` segment header has its first 2 bytes ([0:2] of
payload) as an int16 BE Tran delta for the first sample of the new
segment. Verified across V70 (3 segments with 0 deltas) and SP0/JQ0
(1 segment with +1 delta). Other bytes of the segment header payload
are partially understood:
| Payload offset | Field | Status |
|---|---|---|
| [0:2] | T_delta at first sample of new segment (int16 BE) | ✅ confirmed |
| [2:4] | unknown (often 0; not a simple V or T delta) | ❓ open |
| [4:6] | unknown (varies per event; possibly a checksum) | ❓ open |
| [6:8] | byte length to next segment header 2 (uint16 BE) | ✅ confirmed |
| [8:12] | monotonic uint32 LE counter | ✅ confirmed |
| [12:14] | constant ``02 00`` | ✅ confirmed |
| [14:18] | unknown 4-byte field | ❓ open |
Multi-segment Tran decoding diverges after sample ~512 — the per-segment
channel ordering after the header is still unknown.
##### What's still open
- **Tran past the first data block.** After the first block, the
+51 -35
View File
@@ -137,13 +137,20 @@ class WaveformBlock:
def find_data_start(body: bytes) -> int:
"""Auto-detect the offset of the first data block (``10 NN`` or ``20 NN``).
"""Auto-detect the offset of the first data block.
The preamble is always either 7 bytes (when sample 0 and 1 have small
values) or 9 bytes (when they don't, but only on continuous-mode events
in the small May-8 bundle). Returning the offset of the first ``10/20 NN``
tag is the most robust heuristic.
The body starts with a 7-byte preamble (magic ``00 02 00`` + two int16 BE
Tran anchors). After that, the data section starts with a tag usually
``10 NN`` or ``20 NN``, but quiet events may begin with a ``00 NN`` RLE
marker. We return the offset of the first recognized tag.
"""
# Try fixed offset 7 first (canonical preamble length).
if len(body) >= 9:
b, nn = body[7], body[8]
if (b in (0x00, 0x10, 0x20, 0x30) and nn % 4 == 0 and 0 < nn <= 0xFC) \
or (b == 0x40 and nn == 0x02):
return 7
# Fall back to scanning the first 20 bytes.
for i in range(min(20, len(body) - 1)):
b = body[i]
nn = body[i + 1]
@@ -258,61 +265,70 @@ def _i8(b: int) -> int:
def decode_tran_initial(body: bytes) -> Optional[List[int]]:
"""
Decode the initial Tran-channel samples from the body VERIFIED 2026-05-11
against M529LL1A.SP0 / .SS0 / .SV0 (22 + 42 + 46 samples, 0 errors).
Decode the initial Tran-channel samples VERIFIED 2026-05-11.
Returns a list of Tran sample values in **16-count units** (LSB = 0.005 in/s
at Normal range, the same quantization BW uses for its ASCII export).
Returns ``None`` if the body cannot be parsed.
Returns Tran samples in **16-count units** (LSB = 0.005 in/s at Normal
range the same quantization BW uses for its ASCII export). Returns
``None`` if the body cannot be parsed.
The decoded list extends from sample 0 (= ``Tran[0]`` from preamble bytes
[3:5]) through the end of the FIRST data block. Subsequent samples
require decoding additional blocks that walk is not yet wired up here
because the multi-block channel-switching rule is still under
investigation (see waveform_codec module docstring).
The decoded list extends from sample 0 through the end of segment 0
(= just before the first ``40 02`` segment header; ~510 sample-sets
for the events tested). Multi-segment decoding requires continuing
past the segment header that's done by :func:`decode_tran_full`
when the per-segment rules are pinned down for all signal types.
Codec details (CONFIRMED 2026-05-11):
Codec for segment 0 (CONFIRMED 2026-05-11 against 7 fixture events):
- Body bytes [0:3] are the magic ``00 02 00``.
- Body bytes [3:5] = ``Tran[0]`` as int16 BE in 16-count units.
- Body bytes [5:7] = ``Tran[1]`` as int16 BE in 16-count units.
- The first data block (``10 NN`` or ``20 NN``) carries Tran deltas
starting at sample 2:
- Data blocks (``10 NN`` or ``20 NN``) carry Tran deltas starting
at sample 2:
* ``10 NN``: NN nibbles = NN/2 bytes; each nibble is a 4-bit signed
delta (0..7 0..+7; 8..F -8..-1). High nibble of each byte
comes first.
* ``10 NN``: NN nibbles = NN/2 bytes; each nibble is a 4-bit
signed delta (0..7 0..+7; 8..F -8..-1). High nibble of
each byte comes first.
* ``20 NN``: NN int8 signed deltas (one delta per byte).
- ``00 NN`` blocks are run-length-encoded zero deltas: append NN
copies of the current cumulative Tran value (no change).
- ``30 NN`` blocks have not yet been decoded for content they
appear in segment 0 of loud-from-start events (SS0, SV0) and
seem to signal a transition or special-case interpretation.
The walker steps over them but their data is ignored.
The walk stops at the first ``40 02`` segment header.
"""
if len(body) < 9:
return None
if body[0:3] != b"\x00\x02\x00":
if len(body) < 7 or body[0:3] != b"\x00\x02\x00":
return None
t0 = int.from_bytes(body[3:5], "big", signed=True)
t1 = int.from_bytes(body[5:7], "big", signed=True)
start = find_data_start(body)
if start < 0:
return None
blocks = walk_body(body, start)
if not blocks:
return [t0, t1]
first = blocks[0]
out = [t0, t1]
cur = t1
if first.tag_hi == 0x10:
for byte in first.data:
for blk in walk_body(body, start):
if blk.tag_hi == 0x40:
# Segment boundary — stop. Multi-segment decode is decode_tran_full.
break
if blk.tag_hi == 0x10:
for byte in blk.data:
for nib in ((byte >> 4) & 0xF, byte & 0xF):
cur += _s4(nib)
out.append(cur)
elif first.tag_hi == 0x20:
for byte in first.data:
elif blk.tag_hi == 0x20:
for byte in blk.data:
cur += _i8(byte)
out.append(cur)
else:
# First block is something else — fall back to just the preamble.
return out
elif blk.tag_hi == 0x00:
# RLE zero deltas: append NN copies of current Tran value.
for _ in range(blk.tag_lo):
out.append(cur)
# 30 NN: unknown content; skip.
return out
Binary file not shown.
File diff suppressed because it is too large Load Diff
Binary file not shown.
File diff suppressed because it is too large Load Diff
+76 -12
View File
@@ -78,25 +78,23 @@ def test_find_data_start_locates_first_block(event_name):
body = _bw_body(path)
start = find_data_start(body)
assert 0 <= start < 20, f"expected start in [0, 20), got {start}"
assert body[start] == 0x10
assert body[start + 1] % 4 == 0
assert 0 < body[start + 1] <= 0xFC
assert body[start] in (0x00, 0x10, 0x20, 0x30, 0x40), (
f"first tag byte 0x{body[start]:02x} not a recognized block type"
)
assert body[start + 1] % 4 == 0 or (body[start] == 0x40 and body[start + 1] == 0x02)
def test_find_data_start_preamble_lengths():
"""All 4 events have either a 7-byte (single-shot) or 9-byte (continuous) preamble."""
starts = {}
def test_find_data_start_canonical_offset_7():
"""All events have a 7-byte preamble (3-byte magic + 4-byte Tran anchors)."""
for name in FIXTURES_INFO:
path = _fixture_path(name)
if not os.path.exists(path):
pytest.skip(f"fixture missing: {path}")
body = _bw_body(path)
starts[name] = find_data_start(body)
# Empirically: events a, b have 9-byte preamble; events c, d have 7-byte.
assert starts["event-a"] == 9
assert starts["event-b"] == 9
assert starts["event-c"] == 7
assert starts["event-d"] == 7
# Sanity: magic
assert body[0:3] == b"\x00\x02\x00", f"{name}: bad magic"
# First tag at offset 7
assert find_data_start(body) == 7, f"{name}: expected start=7"
# ── Block walker ─────────────────────────────────────────────────────────────
@@ -274,9 +272,46 @@ TRAN_INITIAL_FIXTURES = [
[-745, -762, -771, -774, -779, -794, -808, -811, -811, -819],
46,
),
# Vert-heavy event (T near zero) — segment 0 = 510 samples, all decode correctly.
(
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"),
[0] * 4 + [-1, 0, 0, -1, -1, 0],
38,
),
# Mic-heavy event (geos all near zero) — segment 0 = 482 samples.
(
os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.V70"),
[0] * 10,
6,
),
]
def _full_truth(path):
"""Load the BW ASCII truth for an event."""
import re
with open(path + ".TXT", "r", encoding="utf-8", errors="replace") as f:
lines = f.read().splitlines()
# Find columns header.
header_idx = None
for i, line in enumerate(lines):
if "Tran" in line and "Vert" in line and "Long" in line and "MicL" in line:
header_idx = i
break
if header_idx is None:
return None
out = []
for line in lines[header_idx + 1:]:
parts = re.split(r"\s+", line.strip())
if len(parts) < 4:
continue
try:
out.append(round(float(parts[0]) * 200))
except ValueError:
continue
return out
@pytest.mark.parametrize("path,expected,n_required", TRAN_INITIAL_FIXTURES)
def test_decode_tran_initial_matches_ground_truth(path, expected, n_required):
"""The Tran initial decoder produces values matching the BW ASCII export exactly."""
@@ -312,3 +347,32 @@ def test_decode_tran_initial_synthetic_body():
decoded = decode_tran_initial(body)
# T[0]=10, T[1]=20, then deltas (+1, -1, +2, -2) from T[1]=20
assert decoded == [10, 20, 21, 20, 22, 20]
def test_decode_tran_initial_with_rle():
"""A synthetic body with 00 NN RLE block runs the current Tran value forward."""
# T[0]=5, T[1]=5, then 00 08 RLE block = 8 zero deltas → T[2..9] = 5
body = b"\x00\x02\x00\x00\x05\x00\x05" + b"\x00\x08"
decoded = decode_tran_initial(body)
assert decoded == [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
def test_decode_tran_initial_full_segment_silent_events():
"""For events with near-silent Tran, segment 0 (~482-510 samples) decodes fully."""
for path, _, _ in TRAN_INITIAL_FIXTURES[3:]: # JQ0 (Vert-heavy) and V70 (Mic-heavy)
if not os.path.exists(path):
pytest.skip(f"fixture missing: {path}")
with open(path, "rb") as f:
body = f.read()[43:-26]
truth = _full_truth(path)
decoded = decode_tran_initial(body)
assert decoded is not None
# The decoder should produce a clean run of samples; check ALL of them
# match truth (segment 0 is fully solved for events where T is near zero).
n = len(decoded)
for i in range(n):
assert decoded[i] == truth[i], (
f"{os.path.basename(path)}: sample {i}: decoded={decoded[i]} truth={truth[i]}"
)
# And we should have decoded at least 400 samples (= segment 0 worth).
assert n >= 400, f"only {n} samples decoded for {path}"