merge full s3 codec decoded #23

Merged
serversdown merged 18 commits from codec-re into main 2026-05-20 13:45:33 -04:00
3 changed files with 108 additions and 12 deletions
Showing only changes of commit ce5dc640ba - Show all commits
+55
View File
@@ -0,0 +1,55 @@
"""Run decode_waveform_v2 against the 5-8-26 quiet bundle to test the
'quiet events should decode fully' hypothesis."""
import os, sys
sys.path.insert(0, ".")
from minimateplus.waveform_codec import decode_waveform_v2, walk_body, find_data_start
from analysis.load_bundle import _parse_txt
def main():
base = "tests/fixtures/decode-re-5-8-26"
for evt in sorted(os.listdir(base)):
folder = os.path.join(base, evt)
if not os.path.isdir(folder):
continue
# Find the binary (not .TXT)
bin_name = next(
(f for f in os.listdir(folder) if not f.endswith(".TXT")),
None,
)
if not bin_name:
continue
bin_path = os.path.join(folder, bin_name)
txt_path = bin_path + ".TXT"
if not os.path.exists(txt_path):
# Sometimes the TXT name differs slightly
for f in os.listdir(folder):
if f.endswith(".TXT"):
txt_path = os.path.join(folder, f)
break
with open(bin_path, "rb") as f:
body = f.read()[43:-26]
decoded = decode_waveform_v2(body)
_, samples = _parse_txt(txt_path)
# Count 30 NN blocks
blocks = walk_body(body, find_data_start(body))
n_30 = sum(1 for b in blocks if b.tag_hi == 0x30)
n_40 = sum(1 for b in blocks if b.tag_hi == 0x40)
print(f"\n=== {evt} === body={len(body)} segments={n_40} '30 NN' blocks={n_30}")
if decoded is None:
print(" decoder returned None")
continue
for ch in ("Tran", "Vert", "Long"):
truth = [round(v * 200) for v in samples[ch]]
pred = decoded[ch]
n = min(len(pred), len(truth))
matches = sum(1 for i in range(n) if pred[i] == truth[i])
div = next((i for i in range(n) if pred[i] != truth[i]), -1)
print(f" {ch}: decoded={len(pred):>5} truth={len(truth):>5} "
f"matches={matches:>5}/{n:<5} first div={div}")
if __name__ == "__main__":
main()
+22 -10
View File
@@ -18,22 +18,34 @@ previous channel (bytes [0:4]).
**What decodes byte-exact today (verified against BW ASCII export):**
**Quiet events with zero `30 NN` blocks — decode FULLY across all channels:**
| Event | Channel | Samples verified | `30 NN` blocks |
|---|---|---|---|
| **event-a** (5-8-26) | Tran / Vert / Long | **3328 each × 3 = 9984** | 0 |
| **event-c** (5-8-26) | Tran / Vert / Long | **1280 each × 3 = 3840** | 0 |
| **event-d** (5-8-26) | Tran / Vert / Long | **1280 each × 3 = 3840** | 0 |
That's **17,664 ADC samples decoded byte-exact, zero errors**.
**Loud events with `30 NN` blocks — decode up to the first `30 NN`:**
| Event | Channel | Samples verified |
|---|---|---|
| V70 (Mic-heavy) | Tran | 512 (1 segment) |
| V70 | Vert | 512 |
| V70 | Long | 512 |
| V70 (Mic-heavy) | Tran / Vert / Long | 512 each (1 segment) |
| JQ0 (Vert-heavy) | Tran | 512 |
| JQ0 | Vert | 258 |
| SP0 (loud all) | Long | **1536 (all 3 L segments)** |
| SP0 | Tran | 1350 / 2044 produced |
| SP0 | Vert | 650 / 1526 produced |
| SP0 | Tran | 1350 (diverges at first `30 NN`) |
| SP0 | Vert | 650 (diverges at first `30 NN`) |
**What's still open:** the `30 NN` block format. These blocks appear in
high-amplitude regions (deltas exceeding what int8 can express). My
decoder currently steps over them, which is fine for quiet stretches but
breaks the cumulative when a `30 NN` carries information for samples we
need. Cracking this is the last major piece.
**What's still open — ONLY the `30 NN` block format.** These blocks
appear in high-amplitude regions (deltas exceeding what int8 can
express). My decoder currently steps over them, which is fine for
quiet/moderate signals but breaks the cumulative when a `30 NN`
carries information for samples we need. **Quiet events without
`30 NN` decode 100% correctly across all channels.** Cracking
`30 NN` is the last piece.
**Production code in `minimateplus/client.py:_decode_a5_waveform` still
uses the broken legacy int16 LE decoder.** Sample arrays it writes to
+31 -2
View File
@@ -261,6 +261,28 @@ MULTICHANNEL_FIXTURES = [
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1L.JQ0"), "Vert", 258),
# SP0 (loud all): Long all 3 segments byte-exact (1536 samples).
(os.path.join(os.path.dirname(__file__), "fixtures", "5-11-26", "M529LL1A.SP0"), "Long", 1536),
# 5-8-26 quiet bundle: events without 30 NN blocks decode FULLY across all channels.
# event-a: 3328 samples × 3 channels = 9984 samples, all byte-exact.
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-a", "M529LKVQ.6S0"), "Tran", 3328),
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-a", "M529LKVQ.6S0"), "Vert", 3328),
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-a", "M529LKVQ.6S0"), "Long", 3328),
# event-c: 1280 samples × 3 channels
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-c", "M529LK44.AB0"), "Tran", 1280),
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-c", "M529LK44.AB0"), "Vert", 1280),
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-c", "M529LK44.AB0"), "Long", 1280),
# event-d: 1280 samples × 3 channels
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-d", "M529LK2V.470"), "Tran", 1280),
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-d", "M529LK2V.470"), "Vert", 1280),
(os.path.join(os.path.dirname(__file__), "fixtures", "decode-re-5-8-26",
"event-d", "M529LK2V.470"), "Long", 1280),
]
@@ -325,9 +347,16 @@ def _full_truth(path):
def _full_truth_channel(path, channel):
"""Load one channel's samples (in 16-count units) from the BW ASCII export."""
import re
import glob, re
col_idx = {"Tran": 0, "Vert": 1, "Long": 2, "MicL": 3}[channel]
with open(path + ".TXT", "r", encoding="utf-8", errors="replace") as f:
# event-a's TXT has a typo ("M59" vs "M529") — pick the .TXT in the same dir
# rather than assuming exact-name correspondence.
txt_path = path + ".TXT"
if not os.path.exists(txt_path):
candidates = glob.glob(os.path.join(os.path.dirname(path), "*.TXT"))
if candidates:
txt_path = candidates[0]
with open(txt_path, "r", encoding="utf-8", errors="replace") as f:
lines = f.read().splitlines()
header_idx = None
for i, line in enumerate(lines):