"""
scratch/next_experiment_skeleton.py — segment-channel scoring analyzer.

This is the suggested NEXT EXPERIMENT for cracking the waveform body codec.
The goal is to figure out what segments 1+ contain, since segment 0 = Tran
is solved but multi-segment continuation diverges from truth at sample ~512.

────────────────────────────────────────────────────────────────────────────
The hypothesis to test
────────────────────────────────────────────────────────────────────────────

Segments rotate through channels:

    segment 0  → Tran samples 0..509
    segment 1  → Vert samples 0..507
    segment 2  → Long samples 0..507
    segment 3  → Mic  samples 0..507
    segment 4  → Tran samples 510..N  (continuation)
    ...

This would explain why segment 0 works perfectly (it's pure Tran) and why
applying segment 1's blocks as Tran continuation gives wrong values (it's
actually Vert).

────────────────────────────────────────────────────────────────────────────
What the analyzer should do
────────────────────────────────────────────────────────────────────────────

For each segment in each fixture event:

  1. Run the segment-0 block-walker + RLE decode (the same algorithm that
     ``decode_tran_initial`` uses) over the segment's blocks.  Start from
     some anchor value and produce a cumulative trajectory of
     length = number-of-deltas-in-segment.

  2. For each candidate channel C ∈ {Tran, Vert, Long, MicL}:
       For each candidate anchor location in the segment-header payload
       (try [0:2], [2:4], [4:6], [14:16], [16:18] as int16 BE):
         Compare the decoded trajectory against truth[C] starting from the
         segment's first sample index.
         Score = number of matches (or sum of squared errors).

  3. Report the best (channel, anchor-location) combination per segment.

If the rotation hypothesis is correct, you'll see:

    segment 0 → best score for (Tran, preamble bytes [3:5])  ✓ already known
    segment 1 → best score for (Vert, anchor TBD)
    segment 2 → best score for (Long, anchor TBD)
    segment 3 → best score for (MicL, anchor TBD)
    segment 4 → best score for (Tran, continuing from segment 0's end)

If the rotation hypothesis is NOT correct, the scorer will at least narrow
down what segment 1 actually carries.  Maybe channels interleave at finer
granularity, or maybe segments alternate by something other than channel.

────────────────────────────────────────────────────────────────────────────
Why this is a scoring analyzer, not a hand-written decoder
────────────────────────────────────────────────────────────────────────────

Direct hand-coding ("assume segment 1 is Vert with anchor at byte X") gets
stuck when the assumption is wrong because the failure mode is silent — you
get plausible-looking-but-wrong samples and have to manually diff against
truth to debug.

The scorer is brute-force but cheap: every fixture event × every segment ×
4 channels × 5 anchor-byte candidates is only ~hundreds of comparisons.  The
winning combination jumps out by score.

────────────────────────────────────────────────────────────────────────────
Skeleton
────────────────────────────────────────────────────────────────────────────

Status of the code below: ``score_segment_against_all_channels`` does NOT
yet search anchor locations in the segment-header payload.  Instead it
slides over every truth sample index ``s``, uses ``truth[s]`` as the anchor,
and counts matches.  It scores only Tran / Vert / Long; MicL truth is kept
as raw floats by ``_parse_txt`` and is skipped.
"""

from __future__ import annotations

import os
import re
import sys
from dataclasses import dataclass
from typing import List, Optional, Tuple

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from minimateplus.waveform_codec import walk_body, find_data_start, WaveformBlock


# ── Reusable pieces ──────────────────────────────────────────────────────────

CHANNELS = ("Tran", "Vert", "Long", "MicL")
LSB_INV = 200  # 1 in/s / 0.005 in/s/LSB; multiply BW-export floats by this
               # to get 16-count units (the body's native quantization).


@dataclass
class FixtureEvent:
    """One fixture event: raw body bytes, parsed blocks and truth samples."""

    name: str                         # e.g. "M529LL1A.SP0"
    bin_path: str
    txt_path: str
    body: bytes
    truth: dict                       # {channel: list of int16-quantized samples}
                                      # (MicL entries are left as raw floats)
    blocks: List[WaveformBlock]
    segment_starts: List[int]         # block indices of each 40 02 segment header
    segment_sample_starts: List[int]  # for each segment, the truth sample index it starts at


def s4(n: int) -> int:
    """4-bit signed nibble decode."""
    return n if n < 8 else n - 16


def i8(b: int) -> int:
    """int8 reinterpret of unsigned byte."""
    return b if b < 128 else b - 256


def load_fixture(name: str) -> FixtureEvent:
    """Load a fixture event with its truth values and parsed block stream.

    Raises FileNotFoundError when *name* cannot be located under
    tests/fixtures/ (or when its companion ``.TXT`` export is missing, via
    ``open``).
    """
    # Find the fixture (search both subdirs of tests/fixtures/).
    base = os.path.join(os.path.dirname(__file__), "..", "tests", "fixtures")
    candidates = [
        os.path.join(base, "5-11-26", name),
        os.path.join(base, "decode-re-5-8-26", "event-a", name),  # not used directly
    ]
    bin_path = next((c for c in candidates if os.path.exists(c)), None)
    if bin_path is None:
        # Try a glob walk for the 5-8 fixtures (they're in subdirs).
        for root, _, files in os.walk(base):
            if name in files:
                bin_path = os.path.join(root, name)
                break
    if bin_path is None:
        raise FileNotFoundError(name)

    txt_path = bin_path + ".TXT"
    with open(bin_path, "rb") as f:
        raw = f.read()
    # Drop the first 43 and last 26 bytes of the file to get the body.
    body = raw[43:-26]
    truth = _parse_txt(txt_path)
    blocks = walk_body(body, find_data_start(body))
    seg_idx = [i for i, b in enumerate(blocks) if b.tag_hi == 0x40]

    # Segment 0 starts at sample 0; subsequent segments start at the
    # cumulative sample count from previous segment(s).  Tran's segment 0
    # is N samples; if rotation hypothesis is correct, segment 1's data
    # starts at sample 0 for a *different* channel.  The analyzer should
    # try both "continues from previous segment" and "starts at sample 0
    # of a different channel."
    seg_sample_starts = _compute_segment_sample_starts(blocks, seg_idx)

    return FixtureEvent(
        name=name,
        bin_path=bin_path,
        txt_path=txt_path,
        body=body,
        truth=truth,
        blocks=blocks,
        segment_starts=seg_idx,
        segment_sample_starts=seg_sample_starts,
    )


def _parse_txt(path: str) -> dict:
    """Parse BW ASCII TXT export into {channel: [int_samples_in_16_count_units]}.

    The header row is the first line containing every name in CHANNELS; if
    there is none, every channel maps to an empty list.  Rows after the
    header with fewer than four fields, or whose first four fields are not
    all floats, are skipped.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        lines = f.read().splitlines()

    header_idx = next(
        (i for i, text in enumerate(lines) if all(ch in text for ch in CHANNELS)),
        None,
    )
    if header_idx is None:
        return {ch: [] for ch in CHANNELS}

    out = {ch: [] for ch in CHANNELS}
    for line in lines[header_idx + 1:]:
        parts = re.split(r"\s+", line.strip())
        if len(parts) < 4:
            continue
        try:
            vals = [float(p) for p in parts[:4]]
        except ValueError:
            continue
        for ch, v in zip(CHANNELS, vals):
            # Multiply by LSB_INV; geo channels are in in/s, MicL is in dB(L)
            # (which doesn't quantize the same way — leaving raw for MicL is fine,
            # the scorer should treat MicL specially).
            out[ch].append(round(v * LSB_INV) if ch != "MicL" else v)
    return out


def _compute_segment_sample_starts(
    blocks: List[WaveformBlock], seg_idx: List[int]
) -> List[int]:
    """Cumulative sample-count up to each segment header (if all blocks
    treated as Tran continuation).  Useful as one candidate for
    segment-1-Tran tests.  The scorer should ALSO try "segment 1 starts at
    sample 0 of a new channel" as the rotation hypothesis predicts.

    NOTE(review): every counted block adds ``tag_lo``, whereas
    ``decode_segment_as_channel`` emits two samples per data byte for 0x10
    blocks — this assumes ``tag_lo`` is already a sample count; confirm.
    """
    header_indices = set(seg_idx)  # O(1) membership inside the loop
    starts = []
    cum = 2  # T[0] + T[1] from preamble
    for i, b in enumerate(blocks):
        if i in header_indices:
            starts.append(cum)
        # 30 NN and 40 02 don't contribute samples (for this hypothesis)
        if b.tag_hi in (0x10, 0x20, 0x00):
            cum += b.tag_lo
    return starts


# ── The core algorithm: decode a segment's blocks as deltas ─────────────────

def decode_segment_as_channel(
    blocks: List[WaveformBlock],
    seg_start_block_idx: int,
    seg_end_block_idx: int,
    anchor: int,
) -> List[int]:
    """Apply the segment-0 codec rules to a range of blocks, starting from
    *anchor*.  Returns a list of cumulative sample values (one per delta).

    Blocks ``seg_start_block_idx`` (inclusive) to ``seg_end_block_idx``
    (exclusive) are walked.  Does NOT include the anchor itself in the
    output — the first returned value is anchor + first_delta.
    """
    out = []
    cur = anchor
    for bi in range(seg_start_block_idx, seg_end_block_idx):
        blk = blocks[bi]
        if blk.tag_hi == 0x10:
            # Two signed 4-bit deltas per byte, high nibble first.
            for byte in blk.data:
                for nib in ((byte >> 4) & 0xF, byte & 0xF):
                    cur += s4(nib)
                    out.append(cur)
        elif blk.tag_hi == 0x20:
            # One signed 8-bit delta per byte.
            for byte in blk.data:
                cur += i8(byte)
                out.append(cur)
        elif blk.tag_hi == 0x00:
            # Run: repeat the current value tag_lo times.
            for _ in range(blk.tag_lo):
                out.append(cur)
        # 30 NN: skip (content unknown)
        # 40 02: shouldn't appear in segment data (it's the segment header)
    return out


def score_against_truth(
    decoded: List[int],
    truth: List[int],
    truth_start: int,
) -> Tuple[int, int]:
    """Compare *decoded* to truth[truth_start : truth_start + len(decoded)].

    Returns (n_matches, n_compared).  Returns (0, 0) when the window is
    empty or when *truth_start* is negative (a negative start would
    otherwise wrap around through negative indexing and score garbage).
    """
    if truth_start < 0:
        return (0, 0)
    n = min(len(decoded), len(truth) - truth_start)
    if n <= 0:
        return (0, 0)
    matches = sum(1 for i in range(n) if decoded[i] == truth[truth_start + i])
    return (matches, n)


# ── TODO for the next pass ──────────────────────────────────────────────────
# (First-pass implementation below; the anchor-location search described in
# the module docstring is still to do.)

def score_segment_against_all_channels(
    event: FixtureEvent,
    segment_index: int,
) -> List[Tuple[str, int, int, int]]:
    """For segment *segment_index* of *event*, find the best
    (channel, start_sample) fit.

    The segment's blocks are decoded once with anchor 0.  Then, for each
    channel C in (Tran, Vert, Long) and each starting truth-sample index s,
    truth[C][s] is used as the anchor and the n re-anchored decoded values
    are compared element-wise against truth[C][s+1 : s+n+1].  The s with
    the most matches wins (the earliest s on ties).

    Returns one row per scorable channel,
    (channel_name, start_sample, n_matches, n_compared), sorted by
    match-count descending.  start_sample is -1 when no s produced any
    match.  Channels with no truth, or fewer than n + 1 truth samples, are
    omitted; an empty list is returned when the segment decodes to nothing.

    Cost is O(len(truth) * n) per channel.
    """
    # Block range of this segment: from the segment header (inclusive) up to
    # the next segment header (exclusive), or end-of-blocks.
    seg_header_idx = event.segment_starts[segment_index]
    next_header_idx = (
        event.segment_starts[segment_index + 1]
        if segment_index + 1 < len(event.segment_starts)
        else len(event.blocks)
    )

    # Decode the segment's data blocks (skip the segment-header block itself).
    # Use anchor=0 — we'll re-anchor when scoring against each channel.
    deltas_trajectory = decode_segment_as_channel(
        event.blocks, seg_header_idx + 1, next_header_idx, anchor=0
    )
    if not deltas_trajectory:
        return []

    n = len(deltas_trajectory)
    results = []
    # MicL is left out: its truth values are raw floats, not quantized ints.
    for ch in ("Tran", "Vert", "Long"):
        truth = event.truth.get(ch)
        if not truth or len(truth) < n + 1:
            continue
        # For each candidate starting sample s in truth, check if applying
        # the deltas starting from truth[s] reproduces truth[s+1:s+n+1].
        best = (0, -1)
        for s in range(len(truth) - n):
            anchor = truth[s]
            # deltas_trajectory was computed from anchor=0, so the
            # re-anchored trajectory is anchor + deltas_trajectory[i].
            # Count total matches rather than stopping at the first
            # mismatch — it gives a more robust score.
            matches = sum(
                1
                for i, rel in enumerate(deltas_trajectory)
                if truth[s + i + 1] == anchor + rel
            )
            if matches > best[0]:
                best = (matches, s)
        results.append((ch, best[1], best[0], n))

    results.sort(key=lambda r: -r[2])
    return results


# ── Driver ──────────────────────────────────────────────────────────────────

def main():
    """Run the analyzer on all loud-bundle events and print best scores."""
    events = ["M529LL1A.SP0", "M529LL1A.SS0", "M529LL1A.SV0",
              "M529LL1L.JQ0", "M529LL1L.V70"]
    for name in events:
        try:
            event = load_fixture(name)
        except FileNotFoundError:
            print(f"{name}: fixture not found")
            continue

        print(f"\n=== {name} ===")
        print(f" body bytes: {len(event.body)}")
        print(f" blocks: {len(event.blocks)}")
        print(f" segments: {len(event.segment_starts)}")
        print(" segment sample-starts (if all blocks are 1 channel):")
        for si, sample_start in enumerate(event.segment_sample_starts):
            print(f" seg {si}: sample {sample_start}")

        for si in range(len(event.segment_starts)):
            results = score_segment_against_all_channels(event, si)
            if not results:
                print(f" seg {si}: (no scorable data)")
                continue
            top = results[0]
            # Flag a fit as convincing when more than 90% of samples match.
            tag = "✓" if top[2] / max(top[3], 1) > 0.9 else " "
            print(f" seg {si}: best fit {tag} = {top[0]:<5} "
                  f"starting at sample {top[1]:>5}, {top[2]:>4}/{top[3]:<4} match"
                  + (f" (next: {results[1][0]} @{results[1][1]} {results[1][2]}/{results[1][3]})"
                     if len(results) > 1 else ""))


if __name__ == "__main__":
    main()