"""Test 30 NN packing by running the real decoder up to each 30 NN block, recording how many samples have been produced for each channel at that point, then checking truth deltas immediately after.""" import sys sys.path.insert(0, ".") from analysis.load_bundle import _parse_txt from minimateplus.waveform_codec import walk_body, find_data_start def s4(n): return n if n < 8 else n - 16 def i8(b): return b if b < 128 else b - 256 def s12(v): return v if v < 0x800 else v - 0x1000 def unpack_12bit_be_contiguous(data): out = [] val = int.from_bytes(data, "big") n = len(data) * 8 // 12 for i in range(n): d = (val >> (12 * (n - 1 - i))) & 0xFFF out.append(s12(d)) return out def unpack_12bit_per_triplet_be(data): out = [] for i in range(0, len(data), 3): if i + 2 >= len(data): break b0, b1, b2 = data[i], data[i + 1], data[i + 2] d0 = (b0 << 4) | (b1 >> 4) d1 = ((b1 & 0x0F) << 8) | b2 out.append(s12(d0)) out.append(s12(d1)) return out def simulate_up_to(blocks, target_block_idx, t_preamble): """Run the decoder up to block_idx; return per-channel sample lists.""" out = {"Tran": [], "Vert": [], "Long": [], "MicL": []} out["Tran"].extend(t_preamble) cur = {"Tran": t_preamble[-1], "Vert": None, "Long": None, "MicL": None} rotation = ["Vert", "Long", "MicL", "Tran"] seg_idx = [j for j, b in enumerate(blocks) if b.tag_hi == 0x40] # Determine which channel we're CURRENTLY decoding into current_channel = "Tran" seg_counter = -1 # incremented at each 40 02 for j in range(target_block_idx): blk = blocks[j] if blk.tag_hi == 0x40: # Switch: extend prev channel, set up new channel seg_counter += 1 prev = "Tran" if seg_counter == 0 else rotation[(seg_counter - 1) % 4] new_ch = rotation[seg_counter % 4] if cur[prev] is not None: d0 = int.from_bytes(blk.data[0:2], "big", signed=True) d1 = int.from_bytes(blk.data[2:4], "big", signed=True) cur[prev] += d0; out[prev].append(cur[prev]) cur[prev] += d1; out[prev].append(cur[prev]) c0 = int.from_bytes(blk.data[14:16], "big", signed=True) c1 = int.from_bytes(blk.data[16:18], "big", signed=True) out[new_ch].extend([c0, c1]) cur[new_ch] = c1 current_channel = new_ch elif blk.tag_hi == 0x10: for byte in blk.data: for nib in ((byte >> 4) & 0xF, byte & 0xF): cur[current_channel] += s4(nib) out[current_channel].append(cur[current_channel]) elif blk.tag_hi == 0x20: for byte in blk.data: cur[current_channel] += i8(byte) out[current_channel].append(cur[current_channel]) elif blk.tag_hi == 0x00: for _ in range(blk.tag_lo): out[current_channel].append(cur[current_channel]) elif blk.tag_hi == 0x30: # Skip for now — we want to know what comes next pass return out, current_channel def main(): for stem in ("M529LL1A.SP0", "M529LL1L.JQ0", "M529LL1L.V70", "M529LL1A.SS0", "M529LL1A.SV0"): path = f"tests/fixtures/5-11-26/{stem}" with open(path, "rb") as f: body = f.read()[43:-26] _, samples = _parse_txt(path + ".TXT") blocks = walk_body(body, find_data_start(body)) t0 = int.from_bytes(body[3:5], "big", signed=True) t1 = int.from_bytes(body[5:7], "big", signed=True) # Find all 30 NN blocks in data section thirty_blocks = [(j, b) for j, b in enumerate(blocks) if b.tag_hi == 0x30] if not thirty_blocks: continue print(f"\n=== {stem} ===") for j, blk in thirty_blocks: pred, ch = simulate_up_to(blocks, j, [t0, t1]) n_pred = len(pred[ch]) # The 30 NN block carries NN deltas for channel `ch` starting at sample n_pred truth = [round(v * 200) for v in samples[ch]] if n_pred >= len(truth): continue # Truth deltas: truth[n_pred] - cur, truth[n_pred+1] - truth[n_pred], ... cur_val = pred[ch][-1] nn = blk.tag_lo truth_deltas = [] prev = cur_val for k in range(min(nn, len(truth) - n_pred)): truth_deltas.append(truth[n_pred + k] - prev) prev = truth[n_pred + k] print(f" block @ {blk.offset:>5} (chan={ch}, after sample {n_pred-1}, " f"NN={nn}, last_val={cur_val}):") print(f" data: {blk.data.hex(' ')}") print(f" truth: {truth_deltas}") schemes = [ ("12-bit BE contiguous", unpack_12bit_be_contiguous(blk.data)), ("12-bit per-triplet BE", unpack_12bit_per_triplet_be(blk.data)), ] for name, pred_deltas in schemes: n_match = sum(1 for a, b in zip(pred_deltas, truth_deltas) if a == b) tag = "✓" if pred_deltas == truth_deltas else " " print(f" {tag}{n_match}/{nn} {name}: {pred_deltas[:nn]}") if __name__ == "__main__": main()