codec-re: 30 NN block CRACKED — codec fully decoded
User intuition (16-bit) + 12-bit packing hypothesis + the int16 ADC
range constraint led to the final piece.
30 NN block format (CONFIRMED across all 14 blocks in the fixture
bundle):
NN 12-bit signed deltas packed as NN/4 groups of 6 bytes each.
Within each group:
bytes [0:2] = 16 bits = 4 × 4-bit high nibbles (MSB-first)
bytes [2:6] = 4 × int8 low bytes
delta[k] = sign_extend_12((high_nibble[k] << 8) | low_byte[k])
Block length = NN × 1.5 + 2 bytes (tag included). Earlier walker
used NN × 4 which is only correct in the TRAILER section.
Why 12-bit: ±2047 in 16-count units ≈ ±10 in/s = the geophone's
full-scale range at Normal sensitivity. The codec sizes its widest
delta to cover the worst-case sample-to-sample change.
Results: every decoded sample across all fixture events matches truth
byte-exact. ZERO divergences.
event-a: 9984 samples (full event, all 3 geos)
event-c: 3840 (full event)
event-d: 3840 (full event)
JQ0: 9984 (full event)
V70: 9984 (full event)
SP0: 5122 (walker stops early on edge cases)
SS0: 1758
SV0: 2114
event-b: 738
TOTAL: 47,364 ADC samples verified, zero errors.
Three full 3-sec events decode end-to-end across all three geo
channels. The events where fewer samples decode (SP0/SS0/SV0/event-b)
are limited by walker robustness issues past the first few segments,
NOT by decoder correctness.
64 tests pass (up from 55). Files: minimateplus/waveform_codec.py
(new 30 NN decode + corrected walker length), tests/test_waveform_codec.py
(new full-event regression tests), docs/* (updated status everywhere),
analysis/test_30nn_hybrid.py (new — the analysis script that confirmed
the format).
This commit is contained in:
@@ -0,0 +1,132 @@
|
||||
"""Test the '30 NN data = high-nibbles + int8 low-bytes' hypothesis.
|
||||
|
||||
Layout for `30 04` (6 data bytes, 4 deltas):
|
||||
bytes [0:2] = 16 bits = 4 × 4-bit high-nibbles (MSB first)
|
||||
bytes [2:6] = 4 × int8 low bytes
|
||||
Each delta = 12-bit signed = sign-extend((high_nibble << 8) | low_byte)
|
||||
"""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import _parse_txt
|
||||
from minimateplus.waveform_codec import walk_body, find_data_start
|
||||
|
||||
|
||||
def s4(n):
|
||||
return n if n < 8 else n - 16
|
||||
|
||||
|
||||
def i8(b):
|
||||
return b if b < 128 else b - 256
|
||||
|
||||
|
||||
def sign_extend_12(v):
|
||||
return v if v < 0x800 else v - 0x1000
|
||||
|
||||
|
||||
def decode_30nn(data):
|
||||
"""4 × 12-bit signed deltas (high nibble + low byte).
|
||||
bytes[0:2] hold the 4 high nibbles (MSB first); bytes[2:6] hold the low bytes.
|
||||
"""
|
||||
if len(data) < 6:
|
||||
return []
|
||||
# Read high nibbles from bytes 0-1 (4 nibbles MSB-first)
|
||||
high_word = (data[0] << 8) | data[1]
|
||||
high_nibbles = [
|
||||
(high_word >> 12) & 0xF,
|
||||
(high_word >> 8) & 0xF,
|
||||
(high_word >> 4) & 0xF,
|
||||
high_word & 0xF,
|
||||
]
|
||||
out = []
|
||||
for i in range(4):
|
||||
v = (high_nibbles[i] << 8) | data[2 + i]
|
||||
out.append(sign_extend_12(v))
|
||||
return out
|
||||
|
||||
|
||||
def simulate_up_to(blocks, target_block_idx, t_preamble):
|
||||
"""Run decoder up to block_idx; return per-channel sample lists.
|
||||
NOW with 30 NN decoded too."""
|
||||
out = {"Tran": [], "Vert": [], "Long": [], "MicL": []}
|
||||
out["Tran"].extend(t_preamble)
|
||||
cur = {"Tran": t_preamble[-1], "Vert": None, "Long": None, "MicL": None}
|
||||
rotation = ["Vert", "Long", "MicL", "Tran"]
|
||||
current_channel = "Tran"
|
||||
seg_counter = -1
|
||||
for j in range(target_block_idx):
|
||||
blk = blocks[j]
|
||||
if blk.tag_hi == 0x40:
|
||||
seg_counter += 1
|
||||
prev = "Tran" if seg_counter == 0 else rotation[(seg_counter - 1) % 4]
|
||||
new_ch = rotation[seg_counter % 4]
|
||||
if cur[prev] is not None:
|
||||
d0 = int.from_bytes(blk.data[0:2], "big", signed=True)
|
||||
d1 = int.from_bytes(blk.data[2:4], "big", signed=True)
|
||||
cur[prev] += d0; out[prev].append(cur[prev])
|
||||
cur[prev] += d1; out[prev].append(cur[prev])
|
||||
c0 = int.from_bytes(blk.data[14:16], "big", signed=True)
|
||||
c1 = int.from_bytes(blk.data[16:18], "big", signed=True)
|
||||
out[new_ch].extend([c0, c1])
|
||||
cur[new_ch] = c1
|
||||
current_channel = new_ch
|
||||
elif blk.tag_hi == 0x10:
|
||||
for byte in blk.data:
|
||||
for nib in ((byte >> 4) & 0xF, byte & 0xF):
|
||||
cur[current_channel] += s4(nib)
|
||||
out[current_channel].append(cur[current_channel])
|
||||
elif blk.tag_hi == 0x20:
|
||||
for byte in blk.data:
|
||||
cur[current_channel] += i8(byte)
|
||||
out[current_channel].append(cur[current_channel])
|
||||
elif blk.tag_hi == 0x00:
|
||||
for _ in range(blk.tag_lo):
|
||||
out[current_channel].append(cur[current_channel])
|
||||
elif blk.tag_hi == 0x30:
|
||||
# NEW: decode 30 NN
|
||||
deltas = decode_30nn(blk.data)
|
||||
for d in deltas:
|
||||
cur[current_channel] += d
|
||||
out[current_channel].append(cur[current_channel])
|
||||
return out, current_channel
|
||||
|
||||
|
||||
def main():
|
||||
for stem in ("M529LL1A.SP0", "M529LL1L.JQ0", "M529LL1L.V70",
|
||||
"M529LL1A.SS0", "M529LL1A.SV0"):
|
||||
path = f"tests/fixtures/5-11-26/{stem}"
|
||||
with open(path, "rb") as f:
|
||||
body = f.read()[43:-26]
|
||||
_, samples = _parse_txt(path + ".TXT")
|
||||
blocks = walk_body(body, find_data_start(body))
|
||||
t0 = int.from_bytes(body[3:5], "big", signed=True)
|
||||
t1 = int.from_bytes(body[5:7], "big", signed=True)
|
||||
thirty_blocks = [(j, b) for j, b in enumerate(blocks) if b.tag_hi == 0x30]
|
||||
if not thirty_blocks:
|
||||
continue
|
||||
print(f"\n=== {stem} ===")
|
||||
for j, blk in thirty_blocks:
|
||||
pred, ch = simulate_up_to(blocks, j, [t0, t1])
|
||||
cur_before = pred[ch][-1]
|
||||
truth = [round(v * 200) for v in samples[ch]]
|
||||
n_pred = len(pred[ch])
|
||||
nn = blk.tag_lo
|
||||
if n_pred + nn > len(truth):
|
||||
continue
|
||||
# Decode this 30 NN block with hypothesis
|
||||
pred_deltas = decode_30nn(blk.data)
|
||||
# Compute truth deltas relative to cur_before
|
||||
truth_deltas = []
|
||||
prev = cur_before
|
||||
for k in range(nn):
|
||||
truth_deltas.append(truth[n_pred + k] - prev)
|
||||
prev = truth[n_pred + k]
|
||||
n_match = sum(1 for a, b in zip(pred_deltas, truth_deltas) if a == b)
|
||||
tag = "✓" if pred_deltas == truth_deltas else " "
|
||||
print(f" block @ {blk.offset:>5} (chan={ch}, NN={nn}):")
|
||||
print(f" data: {blk.data.hex(' ')}")
|
||||
print(f" truth: {truth_deltas}")
|
||||
print(f" pred: {pred_deltas} {tag}{n_match}/{nn}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user