merge full s3 codec decoded #23
@@ -0,0 +1,195 @@
|
|||||||
|
"""Test 12-bit signed packed deltas hypothesis for 30 NN blocks across all loud events.
|
||||||
|
|
||||||
|
For each 30 NN block in each event, identify what samples it should cover
|
||||||
|
(based on the cumulative delta count up to that point) and compare the
|
||||||
|
truth deltas against various 12-bit packing schemes.
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
sys.path.insert(0, ".")
|
||||||
|
from analysis.load_bundle import _parse_txt
|
||||||
|
from minimateplus.waveform_codec import walk_body, find_data_start
|
||||||
|
|
||||||
|
|
||||||
|
CHANNEL_ORDER = ["Vert", "Long", "MicL", "Tran"] # rotation after initial T
|
||||||
|
|
||||||
|
|
||||||
|
def s12(v):
|
||||||
|
"""Sign-extend a 12-bit unsigned value to signed int."""
|
||||||
|
return v if v < 0x800 else v - 0x1000
|
||||||
|
|
||||||
|
|
||||||
|
def unpack_12bit_be(data):
|
||||||
|
"""4 deltas in 6 bytes, BE order: byte[0:1.5], byte[1.5:3], byte[3:4.5], byte[4.5:6]."""
|
||||||
|
# bits 0..47 (MSB-first), split into 4 × 12-bit
|
||||||
|
val = int.from_bytes(data, "big")
|
||||||
|
out = []
|
||||||
|
for i in range(4):
|
||||||
|
d = (val >> (12 * (3 - i))) & 0xFFF
|
||||||
|
out.append(s12(d))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def unpack_12bit_le(data):
|
||||||
|
"""4 deltas in 6 bytes, LE order: bytes packed as 2 × 24-bit groups."""
|
||||||
|
out = []
|
||||||
|
# First 3 bytes contain 2 deltas
|
||||||
|
b0, b1, b2 = data[0], data[1], data[2]
|
||||||
|
d0 = b0 | ((b1 & 0x0F) << 8)
|
||||||
|
d1 = (b1 >> 4) | (b2 << 4)
|
||||||
|
out.append(s12(d0))
|
||||||
|
out.append(s12(d1))
|
||||||
|
# Next 3 bytes contain 2 more deltas
|
||||||
|
b3, b4, b5 = data[3], data[4], data[5]
|
||||||
|
d2 = b3 | ((b4 & 0x0F) << 8)
|
||||||
|
d3 = (b4 >> 4) | (b5 << 4)
|
||||||
|
out.append(s12(d2))
|
||||||
|
out.append(s12(d3))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def unpack_12bit_be_per_triplet(data):
|
||||||
|
"""4 deltas as 2 triplets of (high4, low8) BE within each 3-byte group."""
|
||||||
|
out = []
|
||||||
|
b0, b1, b2 = data[0], data[1], data[2]
|
||||||
|
d0 = (b0 << 4) | (b1 >> 4)
|
||||||
|
d1 = ((b1 & 0x0F) << 8) | b2
|
||||||
|
out.append(s12(d0))
|
||||||
|
out.append(s12(d1))
|
||||||
|
b3, b4, b5 = data[3], data[4], data[5]
|
||||||
|
d2 = (b3 << 4) | (b4 >> 4)
|
||||||
|
d3 = ((b4 & 0x0F) << 8) | b5
|
||||||
|
out.append(s12(d2))
|
||||||
|
out.append(s12(d3))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def truth_deltas_for_block(blocks, block_idx, event_truth, channel):
|
||||||
|
"""For a 30 NN block at block_idx, determine which samples it covers and
|
||||||
|
return the truth deltas for those samples.
|
||||||
|
|
||||||
|
Walks through all blocks before block_idx (within the same segment) and
|
||||||
|
counts how many deltas have been emitted for *channel*, starting from the
|
||||||
|
segment's anchor pair.
|
||||||
|
"""
|
||||||
|
# Find the segment header that contains this block.
|
||||||
|
seg_header_idx = None
|
||||||
|
for j in range(block_idx, -1, -1):
|
||||||
|
if blocks[j].tag_hi == 0x40:
|
||||||
|
seg_header_idx = j
|
||||||
|
break
|
||||||
|
if seg_header_idx is None:
|
||||||
|
# block is in the initial T segment; samples count from sample 2.
|
||||||
|
first_sample_in_segment = 2
|
||||||
|
else:
|
||||||
|
# Anchor pair covers samples [N, N+1] for some N. Subsequent deltas
|
||||||
|
# are samples [N+2, N+2+1, ...]. We don't actually need to know N
|
||||||
|
# for this test — just the relative position within the segment.
|
||||||
|
first_sample_in_segment = 2 # anchor=0,1; deltas start at 2
|
||||||
|
|
||||||
|
# Count deltas from segment-data start to block_idx.
|
||||||
|
delta_count = 0
|
||||||
|
start_block = seg_header_idx + 1 if seg_header_idx is not None else 0
|
||||||
|
for j in range(start_block, block_idx):
|
||||||
|
blk = blocks[j]
|
||||||
|
if blk.tag_hi == 0x10:
|
||||||
|
delta_count += blk.tag_lo # NN nibbles = NN deltas
|
||||||
|
elif blk.tag_hi == 0x20:
|
||||||
|
delta_count += blk.tag_lo # NN int8 deltas
|
||||||
|
elif blk.tag_hi == 0x00:
|
||||||
|
delta_count += blk.tag_lo # RLE zero deltas
|
||||||
|
# Now the 30 NN block carries NN deltas.
|
||||||
|
nn = blocks[block_idx].tag_lo
|
||||||
|
# First sample affected: segment first_sample + delta_count.
|
||||||
|
# But we ALSO need to know which segment this is, since the segment maps
|
||||||
|
# to a specific channel and a specific starting absolute sample index.
|
||||||
|
return first_sample_in_segment + delta_count, nn
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
for stem in ("M529LL1A.SP0", "M529LL1L.JQ0", "M529LL1L.V70",
|
||||||
|
"M529LL1A.SS0", "M529LL1A.SV0"):
|
||||||
|
path = f"tests/fixtures/5-11-26/{stem}"
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
body = f.read()[43:-26]
|
||||||
|
_, samples = _parse_txt(path + ".TXT")
|
||||||
|
blocks = walk_body(body, find_data_start(body))
|
||||||
|
seg_idx = [i for i, b in enumerate(blocks) if b.tag_hi == 0x40]
|
||||||
|
|
||||||
|
# Find all 30 NN blocks in DATA section (not trailer).
|
||||||
|
thirty_blocks = []
|
||||||
|
for bi, b in enumerate(blocks):
|
||||||
|
if b.tag_hi != 0x30:
|
||||||
|
continue
|
||||||
|
# Determine which segment this is in
|
||||||
|
seg_num = None
|
||||||
|
for k, hi in enumerate(seg_idx):
|
||||||
|
next_hi = seg_idx[k + 1] if k + 1 < len(seg_idx) else len(blocks)
|
||||||
|
if hi < bi < next_hi:
|
||||||
|
seg_num = k
|
||||||
|
break
|
||||||
|
if seg_num is None and seg_idx and bi < seg_idx[0]:
|
||||||
|
seg_num = -1 # initial T segment
|
||||||
|
thirty_blocks.append((bi, b, seg_num))
|
||||||
|
|
||||||
|
if not thirty_blocks:
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f"\n=== {stem} ===")
|
||||||
|
for bi, b, seg_num in thirty_blocks:
|
||||||
|
# Channel for this segment
|
||||||
|
if seg_num == -1:
|
||||||
|
channel = "Tran"
|
||||||
|
seg_label = "initial T"
|
||||||
|
else:
|
||||||
|
channel = CHANNEL_ORDER[seg_num % 4]
|
||||||
|
seg_label = f"seg {seg_num}"
|
||||||
|
|
||||||
|
# Count deltas before this block within the same segment.
|
||||||
|
seg_header_idx = seg_idx[seg_num] if seg_num >= 0 else -1
|
||||||
|
start_block = seg_header_idx + 1 if seg_header_idx >= 0 else 0
|
||||||
|
delta_count = 0
|
||||||
|
for j in range(start_block, bi):
|
||||||
|
blk = blocks[j]
|
||||||
|
if blk.tag_hi in (0x10, 0x20, 0x00):
|
||||||
|
delta_count += blk.tag_lo
|
||||||
|
|
||||||
|
# First sample this 30 NN block affects (within the segment)
|
||||||
|
# = anchor positions + delta_count + 2 (since anchor pair was samples 0,1)
|
||||||
|
# But the segment's first absolute sample index in the channel is
|
||||||
|
# (seg_num // 4) * 512 (approximately) if segment 0 is the first V seg.
|
||||||
|
cycle = (seg_num // 4) if seg_num >= 0 else 0
|
||||||
|
base = cycle * 512 + 2 # +2 for anchor pair
|
||||||
|
sample_idx = base + delta_count
|
||||||
|
truth_ch = [round(v * 200) for v in samples[channel]]
|
||||||
|
nn = b.tag_lo
|
||||||
|
|
||||||
|
if sample_idx + nn >= len(truth_ch):
|
||||||
|
print(f" block @ {b.offset} ({seg_label} {channel}): out of truth range")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Get the previous sample so we can compute truth deltas
|
||||||
|
if sample_idx == 0:
|
||||||
|
prev = 0
|
||||||
|
else:
|
||||||
|
prev = truth_ch[sample_idx - 1]
|
||||||
|
truth_deltas = []
|
||||||
|
for k in range(nn):
|
||||||
|
truth_deltas.append(truth_ch[sample_idx + k] - (prev if k == 0 else truth_ch[sample_idx + k - 1]))
|
||||||
|
|
||||||
|
# Try each packing
|
||||||
|
schemes = [
|
||||||
|
("12-bit BE contiguous", unpack_12bit_be(b.data)),
|
||||||
|
("12-bit LE per-triplet", unpack_12bit_le(b.data)),
|
||||||
|
("12-bit BE per-triplet", unpack_12bit_be_per_triplet(b.data)),
|
||||||
|
]
|
||||||
|
print(f" block @ {b.offset:>5} ({seg_label} {channel}, samples {sample_idx}..{sample_idx+nn-1}):")
|
||||||
|
print(f" data: {b.data.hex(' ')}")
|
||||||
|
print(f" truth: {truth_deltas}")
|
||||||
|
for name, pred in schemes:
|
||||||
|
match = "✓" if pred == truth_deltas else " "
|
||||||
|
n_match = sum(1 for x, y in zip(pred, truth_deltas) if x == y)
|
||||||
|
print(f" {match}{n_match}/4 {name}: {pred}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,141 @@
|
|||||||
|
"""Test 30 NN packing by running the real decoder up to each 30 NN block,
|
||||||
|
recording how many samples have been produced for each channel at that point,
|
||||||
|
then checking truth deltas immediately after."""
|
||||||
|
import sys
|
||||||
|
sys.path.insert(0, ".")
|
||||||
|
from analysis.load_bundle import _parse_txt
|
||||||
|
from minimateplus.waveform_codec import walk_body, find_data_start
|
||||||
|
|
||||||
|
|
||||||
|
def s4(n):
|
||||||
|
return n if n < 8 else n - 16
|
||||||
|
|
||||||
|
|
||||||
|
def i8(b):
|
||||||
|
return b if b < 128 else b - 256
|
||||||
|
|
||||||
|
|
||||||
|
def s12(v):
|
||||||
|
return v if v < 0x800 else v - 0x1000
|
||||||
|
|
||||||
|
|
||||||
|
def unpack_12bit_be_contiguous(data):
|
||||||
|
out = []
|
||||||
|
val = int.from_bytes(data, "big")
|
||||||
|
n = len(data) * 8 // 12
|
||||||
|
for i in range(n):
|
||||||
|
d = (val >> (12 * (n - 1 - i))) & 0xFFF
|
||||||
|
out.append(s12(d))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def unpack_12bit_per_triplet_be(data):
|
||||||
|
out = []
|
||||||
|
for i in range(0, len(data), 3):
|
||||||
|
if i + 2 >= len(data):
|
||||||
|
break
|
||||||
|
b0, b1, b2 = data[i], data[i + 1], data[i + 2]
|
||||||
|
d0 = (b0 << 4) | (b1 >> 4)
|
||||||
|
d1 = ((b1 & 0x0F) << 8) | b2
|
||||||
|
out.append(s12(d0))
|
||||||
|
out.append(s12(d1))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def simulate_up_to(blocks, target_block_idx, t_preamble):
|
||||||
|
"""Run the decoder up to block_idx; return per-channel sample lists."""
|
||||||
|
out = {"Tran": [], "Vert": [], "Long": [], "MicL": []}
|
||||||
|
out["Tran"].extend(t_preamble)
|
||||||
|
cur = {"Tran": t_preamble[-1], "Vert": None, "Long": None, "MicL": None}
|
||||||
|
rotation = ["Vert", "Long", "MicL", "Tran"]
|
||||||
|
seg_idx = [j for j, b in enumerate(blocks) if b.tag_hi == 0x40]
|
||||||
|
|
||||||
|
# Determine which channel we're CURRENTLY decoding into
|
||||||
|
current_channel = "Tran"
|
||||||
|
seg_counter = -1 # incremented at each 40 02
|
||||||
|
|
||||||
|
for j in range(target_block_idx):
|
||||||
|
blk = blocks[j]
|
||||||
|
if blk.tag_hi == 0x40:
|
||||||
|
# Switch: extend prev channel, set up new channel
|
||||||
|
seg_counter += 1
|
||||||
|
prev = "Tran" if seg_counter == 0 else rotation[(seg_counter - 1) % 4]
|
||||||
|
new_ch = rotation[seg_counter % 4]
|
||||||
|
if cur[prev] is not None:
|
||||||
|
d0 = int.from_bytes(blk.data[0:2], "big", signed=True)
|
||||||
|
d1 = int.from_bytes(blk.data[2:4], "big", signed=True)
|
||||||
|
cur[prev] += d0; out[prev].append(cur[prev])
|
||||||
|
cur[prev] += d1; out[prev].append(cur[prev])
|
||||||
|
c0 = int.from_bytes(blk.data[14:16], "big", signed=True)
|
||||||
|
c1 = int.from_bytes(blk.data[16:18], "big", signed=True)
|
||||||
|
out[new_ch].extend([c0, c1])
|
||||||
|
cur[new_ch] = c1
|
||||||
|
current_channel = new_ch
|
||||||
|
elif blk.tag_hi == 0x10:
|
||||||
|
for byte in blk.data:
|
||||||
|
for nib in ((byte >> 4) & 0xF, byte & 0xF):
|
||||||
|
cur[current_channel] += s4(nib)
|
||||||
|
out[current_channel].append(cur[current_channel])
|
||||||
|
elif blk.tag_hi == 0x20:
|
||||||
|
for byte in blk.data:
|
||||||
|
cur[current_channel] += i8(byte)
|
||||||
|
out[current_channel].append(cur[current_channel])
|
||||||
|
elif blk.tag_hi == 0x00:
|
||||||
|
for _ in range(blk.tag_lo):
|
||||||
|
out[current_channel].append(cur[current_channel])
|
||||||
|
elif blk.tag_hi == 0x30:
|
||||||
|
# Skip for now — we want to know what comes next
|
||||||
|
pass
|
||||||
|
|
||||||
|
return out, current_channel
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
for stem in ("M529LL1A.SP0", "M529LL1L.JQ0", "M529LL1L.V70",
|
||||||
|
"M529LL1A.SS0", "M529LL1A.SV0"):
|
||||||
|
path = f"tests/fixtures/5-11-26/{stem}"
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
body = f.read()[43:-26]
|
||||||
|
_, samples = _parse_txt(path + ".TXT")
|
||||||
|
blocks = walk_body(body, find_data_start(body))
|
||||||
|
t0 = int.from_bytes(body[3:5], "big", signed=True)
|
||||||
|
t1 = int.from_bytes(body[5:7], "big", signed=True)
|
||||||
|
|
||||||
|
# Find all 30 NN blocks in data section
|
||||||
|
thirty_blocks = [(j, b) for j, b in enumerate(blocks) if b.tag_hi == 0x30]
|
||||||
|
if not thirty_blocks:
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f"\n=== {stem} ===")
|
||||||
|
for j, blk in thirty_blocks:
|
||||||
|
pred, ch = simulate_up_to(blocks, j, [t0, t1])
|
||||||
|
n_pred = len(pred[ch])
|
||||||
|
# The 30 NN block carries NN deltas for channel `ch` starting at sample n_pred
|
||||||
|
truth = [round(v * 200) for v in samples[ch]]
|
||||||
|
if n_pred >= len(truth):
|
||||||
|
continue
|
||||||
|
# Truth deltas: truth[n_pred] - cur, truth[n_pred+1] - truth[n_pred], ...
|
||||||
|
cur_val = pred[ch][-1]
|
||||||
|
nn = blk.tag_lo
|
||||||
|
truth_deltas = []
|
||||||
|
prev = cur_val
|
||||||
|
for k in range(min(nn, len(truth) - n_pred)):
|
||||||
|
truth_deltas.append(truth[n_pred + k] - prev)
|
||||||
|
prev = truth[n_pred + k]
|
||||||
|
|
||||||
|
print(f" block @ {blk.offset:>5} (chan={ch}, after sample {n_pred-1}, "
|
||||||
|
f"NN={nn}, last_val={cur_val}):")
|
||||||
|
print(f" data: {blk.data.hex(' ')}")
|
||||||
|
print(f" truth: {truth_deltas}")
|
||||||
|
schemes = [
|
||||||
|
("12-bit BE contiguous", unpack_12bit_be_contiguous(blk.data)),
|
||||||
|
("12-bit per-triplet BE", unpack_12bit_per_triplet_be(blk.data)),
|
||||||
|
]
|
||||||
|
for name, pred_deltas in schemes:
|
||||||
|
n_match = sum(1 for a, b in zip(pred_deltas, truth_deltas) if a == b)
|
||||||
|
tag = "✓" if pred_deltas == truth_deltas else " "
|
||||||
|
print(f" {tag}{n_match}/{nn} {name}: {pred_deltas[:nn]}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user