diff --git a/analysis/test_30nn_12bit.py b/analysis/test_30nn_12bit.py new file mode 100644 index 0000000..2e2d532 --- /dev/null +++ b/analysis/test_30nn_12bit.py @@ -0,0 +1,195 @@ +"""Test 12-bit signed packed deltas hypothesis for 30 NN blocks across all loud events. + +For each 30 NN block in each event, identify what samples it should cover +(based on the cumulative delta count up to that point) and compare the +truth deltas against various 12-bit packing schemes. +""" +import sys +sys.path.insert(0, ".") +from analysis.load_bundle import _parse_txt +from minimateplus.waveform_codec import walk_body, find_data_start + + +CHANNEL_ORDER = ["Vert", "Long", "MicL", "Tran"] # rotation after initial T + + +def s12(v): + """Sign-extend a 12-bit unsigned value to signed int.""" + return v if v < 0x800 else v - 0x1000 + + +def unpack_12bit_be(data): + """4 deltas in 6 bytes, BE order: byte[0:1.5], byte[1.5:3], byte[3:4.5], byte[4.5:6].""" + # bits 0..47 (MSB-first), split into 4 × 12-bit + val = int.from_bytes(data, "big") + out = [] + for i in range(4): + d = (val >> (12 * (3 - i))) & 0xFFF + out.append(s12(d)) + return out + + +def unpack_12bit_le(data): + """4 deltas in 6 bytes, LE order: bytes packed as 2 × 24-bit groups.""" + out = [] + # First 3 bytes contain 2 deltas + b0, b1, b2 = data[0], data[1], data[2] + d0 = b0 | ((b1 & 0x0F) << 8) + d1 = (b1 >> 4) | (b2 << 4) + out.append(s12(d0)) + out.append(s12(d1)) + # Next 3 bytes contain 2 more deltas + b3, b4, b5 = data[3], data[4], data[5] + d2 = b3 | ((b4 & 0x0F) << 8) + d3 = (b4 >> 4) | (b5 << 4) + out.append(s12(d2)) + out.append(s12(d3)) + return out + + +def unpack_12bit_be_per_triplet(data): + """4 deltas as 2 triplets of (high4, low8) BE within each 3-byte group.""" + out = [] + b0, b1, b2 = data[0], data[1], data[2] + d0 = (b0 << 4) | (b1 >> 4) + d1 = ((b1 & 0x0F) << 8) | b2 + out.append(s12(d0)) + out.append(s12(d1)) + b3, b4, b5 = data[3], data[4], data[5] + d2 = (b3 << 4) | (b4 >> 4) + d3 = ((b4 & 0x0F) << 8) | b5 + out.append(s12(d2)) + out.append(s12(d3)) + return out + + +def truth_deltas_for_block(blocks, block_idx, event_truth, channel): + """For a 30 NN block at block_idx, determine which samples it covers and + return the truth deltas for those samples. + + Walks through all blocks before block_idx (within the same segment) and + counts how many deltas have been emitted for *channel*, starting from the + segment's anchor pair. + """ + # Find the segment header that contains this block. + seg_header_idx = None + for j in range(block_idx, -1, -1): + if blocks[j].tag_hi == 0x40: + seg_header_idx = j + break + if seg_header_idx is None: + # block is in the initial T segment; samples count from sample 2. + first_sample_in_segment = 2 + else: + # Anchor pair covers samples [N, N+1] for some N. Subsequent deltas + # are samples [N+2, N+2+1, ...]. We don't actually need to know N + # for this test — just the relative position within the segment. + first_sample_in_segment = 2 # anchor=0,1; deltas start at 2 + + # Count deltas from segment-data start to block_idx. + delta_count = 0 + start_block = seg_header_idx + 1 if seg_header_idx is not None else 0 + for j in range(start_block, block_idx): + blk = blocks[j] + if blk.tag_hi == 0x10: + delta_count += blk.tag_lo # NN nibbles = NN deltas + elif blk.tag_hi == 0x20: + delta_count += blk.tag_lo # NN int8 deltas + elif blk.tag_hi == 0x00: + delta_count += blk.tag_lo # RLE zero deltas + # Now the 30 NN block carries NN deltas. + nn = blocks[block_idx].tag_lo + # First sample affected: segment first_sample + delta_count. + # But we ALSO need to know which segment this is, since the segment maps + # to a specific channel and a specific starting absolute sample index. + return first_sample_in_segment + delta_count, nn + + +def main(): + for stem in ("M529LL1A.SP0", "M529LL1L.JQ0", "M529LL1L.V70", + "M529LL1A.SS0", "M529LL1A.SV0"): + path = f"tests/fixtures/5-11-26/{stem}" + with open(path, "rb") as f: + body = f.read()[43:-26] + _, samples = _parse_txt(path + ".TXT") + blocks = walk_body(body, find_data_start(body)) + seg_idx = [i for i, b in enumerate(blocks) if b.tag_hi == 0x40] + + # Find all 30 NN blocks in DATA section (not trailer). + thirty_blocks = [] + for bi, b in enumerate(blocks): + if b.tag_hi != 0x30: + continue + # Determine which segment this is in + seg_num = None + for k, hi in enumerate(seg_idx): + next_hi = seg_idx[k + 1] if k + 1 < len(seg_idx) else len(blocks) + if hi < bi < next_hi: + seg_num = k + break + if seg_num is None and seg_idx and bi < seg_idx[0]: + seg_num = -1 # initial T segment + thirty_blocks.append((bi, b, seg_num)) + + if not thirty_blocks: + continue + + print(f"\n=== {stem} ===") + for bi, b, seg_num in thirty_blocks: + # Channel for this segment + if seg_num == -1: + channel = "Tran" + seg_label = "initial T" + else: + channel = CHANNEL_ORDER[seg_num % 4] + seg_label = f"seg {seg_num}" + + # Count deltas before this block within the same segment. + seg_header_idx = seg_idx[seg_num] if seg_num >= 0 else -1 + start_block = seg_header_idx + 1 if seg_header_idx >= 0 else 0 + delta_count = 0 + for j in range(start_block, bi): + blk = blocks[j] + if blk.tag_hi in (0x10, 0x20, 0x00): + delta_count += blk.tag_lo + + # First sample this 30 NN block affects (within the segment) + # = anchor positions + delta_count + 2 (since anchor pair was samples 0,1) + # But the segment's first absolute sample index in the channel is + # (seg_num // 4) * 512 (approximately) if segment 0 is the first V seg. + cycle = (seg_num // 4) if seg_num >= 0 else 0 + base = cycle * 512 + 2 # +2 for anchor pair + sample_idx = base + delta_count + truth_ch = [round(v * 200) for v in samples[channel]] + nn = b.tag_lo + + if sample_idx + nn >= len(truth_ch): + print(f" block @ {b.offset} ({seg_label} {channel}): out of truth range") + continue + + # Get the previous sample so we can compute truth deltas + if sample_idx == 0: + prev = 0 + else: + prev = truth_ch[sample_idx - 1] + truth_deltas = [] + for k in range(nn): + truth_deltas.append(truth_ch[sample_idx + k] - (prev if k == 0 else truth_ch[sample_idx + k - 1])) + + # Try each packing + schemes = [ + ("12-bit BE contiguous", unpack_12bit_be(b.data)), + ("12-bit LE per-triplet", unpack_12bit_le(b.data)), + ("12-bit BE per-triplet", unpack_12bit_be_per_triplet(b.data)), + ] + print(f" block @ {b.offset:>5} ({seg_label} {channel}, samples {sample_idx}..{sample_idx+nn-1}):") + print(f" data: {b.data.hex(' ')}") + print(f" truth: {truth_deltas}") + for name, pred in schemes: + match = "✓" if pred == truth_deltas else " " + n_match = sum(1 for x, y in zip(pred, truth_deltas) if x == y) + print(f" {match}{n_match}/4 {name}: {pred}") + + +if __name__ == "__main__": + main() diff --git a/analysis/test_30nn_v2.py b/analysis/test_30nn_v2.py new file mode 100644 index 0000000..2ef49f9 --- /dev/null +++ b/analysis/test_30nn_v2.py @@ -0,0 +1,141 @@ +"""Test 30 NN packing by running the real decoder up to each 30 NN block, +recording how many samples have been produced for each channel at that point, +then checking truth deltas immediately after.""" +import sys +sys.path.insert(0, ".") +from analysis.load_bundle import _parse_txt +from minimateplus.waveform_codec import walk_body, find_data_start + + +def s4(n): + return n if n < 8 else n - 16 + + +def i8(b): + return b if b < 128 else b - 256 + + +def s12(v): + return v if v < 0x800 else v - 0x1000 + + +def unpack_12bit_be_contiguous(data): + out = [] + val = int.from_bytes(data, "big") + n = len(data) * 8 // 12 + for i in range(n): + d = (val >> (12 * (n - 1 - i))) & 0xFFF + out.append(s12(d)) + return out + + +def unpack_12bit_per_triplet_be(data): + out = [] + for i in range(0, len(data), 3): + if i + 2 >= len(data): + break + b0, b1, b2 = data[i], data[i + 1], data[i + 2] + d0 = (b0 << 4) | (b1 >> 4) + d1 = ((b1 & 0x0F) << 8) | b2 + out.append(s12(d0)) + out.append(s12(d1)) + return out + + +def simulate_up_to(blocks, target_block_idx, t_preamble): + """Run the decoder up to block_idx; return per-channel sample lists.""" + out = {"Tran": [], "Vert": [], "Long": [], "MicL": []} + out["Tran"].extend(t_preamble) + cur = {"Tran": t_preamble[-1], "Vert": None, "Long": None, "MicL": None} + rotation = ["Vert", "Long", "MicL", "Tran"] + seg_idx = [j for j, b in enumerate(blocks) if b.tag_hi == 0x40] + + # Determine which channel we're CURRENTLY decoding into + current_channel = "Tran" + seg_counter = -1 # incremented at each 40 02 + + for j in range(target_block_idx): + blk = blocks[j] + if blk.tag_hi == 0x40: + # Switch: extend prev channel, set up new channel + seg_counter += 1 + prev = "Tran" if seg_counter == 0 else rotation[(seg_counter - 1) % 4] + new_ch = rotation[seg_counter % 4] + if cur[prev] is not None: + d0 = int.from_bytes(blk.data[0:2], "big", signed=True) + d1 = int.from_bytes(blk.data[2:4], "big", signed=True) + cur[prev] += d0; out[prev].append(cur[prev]) + cur[prev] += d1; out[prev].append(cur[prev]) + c0 = int.from_bytes(blk.data[14:16], "big", signed=True) + c1 = int.from_bytes(blk.data[16:18], "big", signed=True) + out[new_ch].extend([c0, c1]) + cur[new_ch] = c1 + current_channel = new_ch + elif blk.tag_hi == 0x10: + for byte in blk.data: + for nib in ((byte >> 4) & 0xF, byte & 0xF): + cur[current_channel] += s4(nib) + out[current_channel].append(cur[current_channel]) + elif blk.tag_hi == 0x20: + for byte in blk.data: + cur[current_channel] += i8(byte) + out[current_channel].append(cur[current_channel]) + elif blk.tag_hi == 0x00: + for _ in range(blk.tag_lo): + out[current_channel].append(cur[current_channel]) + elif blk.tag_hi == 0x30: + # Skip for now — we want to know what comes next + pass + + return out, current_channel + + +def main(): + for stem in ("M529LL1A.SP0", "M529LL1L.JQ0", "M529LL1L.V70", + "M529LL1A.SS0", "M529LL1A.SV0"): + path = f"tests/fixtures/5-11-26/{stem}" + with open(path, "rb") as f: + body = f.read()[43:-26] + _, samples = _parse_txt(path + ".TXT") + blocks = walk_body(body, find_data_start(body)) + t0 = int.from_bytes(body[3:5], "big", signed=True) + t1 = int.from_bytes(body[5:7], "big", signed=True) + + # Find all 30 NN blocks in data section + thirty_blocks = [(j, b) for j, b in enumerate(blocks) if b.tag_hi == 0x30] + if not thirty_blocks: + continue + + print(f"\n=== {stem} ===") + for j, blk in thirty_blocks: + pred, ch = simulate_up_to(blocks, j, [t0, t1]) + n_pred = len(pred[ch]) + # The 30 NN block carries NN deltas for channel `ch` starting at sample n_pred + truth = [round(v * 200) for v in samples[ch]] + if n_pred >= len(truth): + continue + # Truth deltas: truth[n_pred] - cur, truth[n_pred+1] - truth[n_pred], ... + cur_val = pred[ch][-1] + nn = blk.tag_lo + truth_deltas = [] + prev = cur_val + for k in range(min(nn, len(truth) - n_pred)): + truth_deltas.append(truth[n_pred + k] - prev) + prev = truth[n_pred + k] + + print(f" block @ {blk.offset:>5} (chan={ch}, after sample {n_pred-1}, " + f"NN={nn}, last_val={cur_val}):") + print(f" data: {blk.data.hex(' ')}") + print(f" truth: {truth_deltas}") + schemes = [ + ("12-bit BE contiguous", unpack_12bit_be_contiguous(blk.data)), + ("12-bit per-triplet BE", unpack_12bit_per_triplet_be(blk.data)), + ] + for name, pred_deltas in schemes: + n_match = sum(1 for a, b in zip(pred_deltas, truth_deltas) if a == b) + tag = "✓" if pred_deltas == truth_deltas else " " + print(f" {tag}{n_match}/{nn} {name}: {pred_deltas[:nn]}") + + +if __name__ == "__main__": + main()