"""Test 12-bit signed packed deltas hypothesis for 30 NN blocks across all loud events. For each 30 NN block in each event, identify what samples it should cover (based on the cumulative delta count up to that point) and compare the truth deltas against various 12-bit packing schemes. """ import sys sys.path.insert(0, ".") from analysis.load_bundle import _parse_txt from minimateplus.waveform_codec import walk_body, find_data_start CHANNEL_ORDER = ["Vert", "Long", "MicL", "Tran"] # rotation after initial T def s12(v): """Sign-extend a 12-bit unsigned value to signed int.""" return v if v < 0x800 else v - 0x1000 def unpack_12bit_be(data): """4 deltas in 6 bytes, BE order: byte[0:1.5], byte[1.5:3], byte[3:4.5], byte[4.5:6].""" # bits 0..47 (MSB-first), split into 4 × 12-bit val = int.from_bytes(data, "big") out = [] for i in range(4): d = (val >> (12 * (3 - i))) & 0xFFF out.append(s12(d)) return out def unpack_12bit_le(data): """4 deltas in 6 bytes, LE order: bytes packed as 2 × 24-bit groups.""" out = [] # First 3 bytes contain 2 deltas b0, b1, b2 = data[0], data[1], data[2] d0 = b0 | ((b1 & 0x0F) << 8) d1 = (b1 >> 4) | (b2 << 4) out.append(s12(d0)) out.append(s12(d1)) # Next 3 bytes contain 2 more deltas b3, b4, b5 = data[3], data[4], data[5] d2 = b3 | ((b4 & 0x0F) << 8) d3 = (b4 >> 4) | (b5 << 4) out.append(s12(d2)) out.append(s12(d3)) return out def unpack_12bit_be_per_triplet(data): """4 deltas as 2 triplets of (high4, low8) BE within each 3-byte group.""" out = [] b0, b1, b2 = data[0], data[1], data[2] d0 = (b0 << 4) | (b1 >> 4) d1 = ((b1 & 0x0F) << 8) | b2 out.append(s12(d0)) out.append(s12(d1)) b3, b4, b5 = data[3], data[4], data[5] d2 = (b3 << 4) | (b4 >> 4) d3 = ((b4 & 0x0F) << 8) | b5 out.append(s12(d2)) out.append(s12(d3)) return out def truth_deltas_for_block(blocks, block_idx, event_truth, channel): """For a 30 NN block at block_idx, determine which samples it covers and return the truth deltas for those samples. Walks through all blocks before block_idx (within the same segment) and counts how many deltas have been emitted for *channel*, starting from the segment's anchor pair. """ # Find the segment header that contains this block. seg_header_idx = None for j in range(block_idx, -1, -1): if blocks[j].tag_hi == 0x40: seg_header_idx = j break if seg_header_idx is None: # block is in the initial T segment; samples count from sample 2. first_sample_in_segment = 2 else: # Anchor pair covers samples [N, N+1] for some N. Subsequent deltas # are samples [N+2, N+2+1, ...]. We don't actually need to know N # for this test — just the relative position within the segment. first_sample_in_segment = 2 # anchor=0,1; deltas start at 2 # Count deltas from segment-data start to block_idx. delta_count = 0 start_block = seg_header_idx + 1 if seg_header_idx is not None else 0 for j in range(start_block, block_idx): blk = blocks[j] if blk.tag_hi == 0x10: delta_count += blk.tag_lo # NN nibbles = NN deltas elif blk.tag_hi == 0x20: delta_count += blk.tag_lo # NN int8 deltas elif blk.tag_hi == 0x00: delta_count += blk.tag_lo # RLE zero deltas # Now the 30 NN block carries NN deltas. nn = blocks[block_idx].tag_lo # First sample affected: segment first_sample + delta_count. # But we ALSO need to know which segment this is, since the segment maps # to a specific channel and a specific starting absolute sample index. return first_sample_in_segment + delta_count, nn def main(): for stem in ("M529LL1A.SP0", "M529LL1L.JQ0", "M529LL1L.V70", "M529LL1A.SS0", "M529LL1A.SV0"): path = f"tests/fixtures/5-11-26/{stem}" with open(path, "rb") as f: body = f.read()[43:-26] _, samples = _parse_txt(path + ".TXT") blocks = walk_body(body, find_data_start(body)) seg_idx = [i for i, b in enumerate(blocks) if b.tag_hi == 0x40] # Find all 30 NN blocks in DATA section (not trailer). thirty_blocks = [] for bi, b in enumerate(blocks): if b.tag_hi != 0x30: continue # Determine which segment this is in seg_num = None for k, hi in enumerate(seg_idx): next_hi = seg_idx[k + 1] if k + 1 < len(seg_idx) else len(blocks) if hi < bi < next_hi: seg_num = k break if seg_num is None and seg_idx and bi < seg_idx[0]: seg_num = -1 # initial T segment thirty_blocks.append((bi, b, seg_num)) if not thirty_blocks: continue print(f"\n=== {stem} ===") for bi, b, seg_num in thirty_blocks: # Channel for this segment if seg_num == -1: channel = "Tran" seg_label = "initial T" else: channel = CHANNEL_ORDER[seg_num % 4] seg_label = f"seg {seg_num}" # Count deltas before this block within the same segment. seg_header_idx = seg_idx[seg_num] if seg_num >= 0 else -1 start_block = seg_header_idx + 1 if seg_header_idx >= 0 else 0 delta_count = 0 for j in range(start_block, bi): blk = blocks[j] if blk.tag_hi in (0x10, 0x20, 0x00): delta_count += blk.tag_lo # First sample this 30 NN block affects (within the segment) # = anchor positions + delta_count + 2 (since anchor pair was samples 0,1) # But the segment's first absolute sample index in the channel is # (seg_num // 4) * 512 (approximately) if segment 0 is the first V seg. cycle = (seg_num // 4) if seg_num >= 0 else 0 base = cycle * 512 + 2 # +2 for anchor pair sample_idx = base + delta_count truth_ch = [round(v * 200) for v in samples[channel]] nn = b.tag_lo if sample_idx + nn >= len(truth_ch): print(f" block @ {b.offset} ({seg_label} {channel}): out of truth range") continue # Get the previous sample so we can compute truth deltas if sample_idx == 0: prev = 0 else: prev = truth_ch[sample_idx - 1] truth_deltas = [] for k in range(nn): truth_deltas.append(truth_ch[sample_idx + k] - (prev if k == 0 else truth_ch[sample_idx + k - 1])) # Try each packing schemes = [ ("12-bit BE contiguous", unpack_12bit_be(b.data)), ("12-bit LE per-triplet", unpack_12bit_le(b.data)), ("12-bit BE per-triplet", unpack_12bit_be_per_triplet(b.data)), ] print(f" block @ {b.offset:>5} ({seg_label} {channel}, samples {sample_idx}..{sample_idx+nn-1}):") print(f" data: {b.data.hex(' ')}") print(f" truth: {truth_deltas}") for name, pred in schemes: match = "✓" if pred == truth_deltas else " " n_match = sum(1 for x, y in zip(pred, truth_deltas) if x == y) print(f" {match}{n_match}/4 {name}: {pred}") if __name__ == "__main__": main()