ae0e17b5dc
Three things to make pickup smoother: 1. analysis/README.md (NEW): catalogues the ~25 scratch scripts. Categorizes them as "still useful" / "superseded — keep for archaeology" / "pure exploration". Tells a fresh engineer which files to read first and which to ignore. 2. scratch/next_experiment_skeleton.py (NEW): stub + spec for the segment-channel scoring analyzer. Includes the fixture loader, block walker, and decode-segment-as-channel helper — just enough scaffolding that the next pass starts from "fill in score_segment_against_all_channels()" rather than from scratch. Already runs and confirms 13 segments per 3-sec event with sample starts going to 6590 (way past the 3328 actual samples) — strong evidence that not all segments carry Tran. 3. Removed decode-re/ duplicate. It was a mirror of tests/fixtures/. Analysis scripts that hardcoded decode-re/ paths updated to point at tests/fixtures/. CLAUDE.md note updated: future event uploads go directly into a dated subdirectory under tests/fixtures/. All 40 tests still pass. Skeleton runs.
322 lines
14 KiB
Python
322 lines
14 KiB
Python
"""
|
||
scratch/next_experiment_skeleton.py — segment-channel scoring analyzer.
|
||
|
||
This is the suggested NEXT EXPERIMENT for cracking the waveform body codec.
|
||
The goal is to figure out what segments 1+ contain, since segment 0 = Tran
|
||
is solved but multi-segment continuation diverges from truth at sample ~512.
|
||
|
||
────────────────────────────────────────────────────────────────────────────
|
||
The hypothesis to test
|
||
────────────────────────────────────────────────────────────────────────────
|
||
|
||
Segments rotate through channels:
|
||
|
||
segment 0 → Tran samples 0..509
|
||
segment 1 → Vert samples 0..507
|
||
segment 2 → Long samples 0..507
|
||
segment 3 → MicL samples 0..507
|
||
segment 4 → Tran samples 510..N (continuation)
|
||
...
|
||
|
||
This would explain why segment 0 works perfectly (it's pure Tran) and why
|
||
applying segment 1's blocks as Tran continuation gives wrong values
|
||
(it's actually Vert).
|
||
|
||
────────────────────────────────────────────────────────────────────────────
|
||
What the analyzer should do
|
||
────────────────────────────────────────────────────────────────────────────
|
||
|
||
For each segment in each fixture event:
|
||
|
||
1. Run the segment-0 block-walker + RLE decode (the same algorithm that
|
||
``decode_tran_initial`` uses) over the segment's blocks. Start from
|
||
some anchor value and produce a cumulative trajectory of length =
|
||
number-of-deltas-in-segment.
|
||
|
||
2. For each candidate channel C ∈ {Tran, Vert, Long, MicL}:
|
||
For each candidate anchor location in the segment-header payload
|
||
(try [0:2], [2:4], [4:6], [14:16], [16:18] as int16 BE):
|
||
Compare the decoded trajectory against truth[C] starting from
|
||
the segment's first sample index.
|
||
Score = number of matches (or sum of squared errors).
|
||
|
||
3. Report the best (channel, anchor-location) combination per segment.
|
||
|
||
If the rotation hypothesis is correct, you'll see:
|
||
segment 0 → best score for (Tran, preamble bytes [3:5]) ✓ already known
|
||
segment 1 → best score for (Vert, <some-header-byte>)
|
||
segment 2 → best score for (Long, <some-header-byte>)
|
||
segment 3 → best score for (MicL, <some-header-byte>)
|
||
segment 4 → best score for (Tran, continuing from segment 0's end)
|
||
|
||
If the rotation hypothesis is NOT correct, the scorer will at least narrow
|
||
down what segment 1 actually carries. Maybe channels interleave at finer
|
||
granularity, or maybe segments alternate by something other than channel.
|
||
|
||
────────────────────────────────────────────────────────────────────────────
|
||
Why this is a scoring analyzer, not a hand-written decoder
|
||
────────────────────────────────────────────────────────────────────────────
|
||
|
||
Direct hand-coding ("assume segment 1 is Vert with anchor at byte X") gets
|
||
stuck when the assumption is wrong because the failure mode is silent —
|
||
you get plausible-looking-but-wrong samples and have to manually diff
|
||
against truth to debug.
|
||
|
||
The scorer is brute-force but cheap: every fixture event × every segment ×
|
||
4 channels × 5 anchor-byte candidates is only ~hundreds of comparisons.
|
||
The winning combination jumps out by score.
|
||
|
||
────────────────────────────────────────────────────────────────────────────
|
||
Skeleton
|
||
────────────────────────────────────────────────────────────────────────────
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import re
|
||
import sys
|
||
from dataclasses import dataclass
|
||
from typing import List, Optional, Tuple
|
||
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||
|
||
from minimateplus.waveform_codec import walk_body, find_data_start, WaveformBlock
|
||
|
||
|
||
# ── Reusable pieces ──────────────────────────────────────────────────────────
|
||
|
||
|
||
# Channel names, in the order they are zip()'d against the first four columns
# of each data row by _parse_txt.
CHANNELS = ("Tran", "Vert", "Long", "MicL")

# 1 in/s / 0.005 in/s/LSB.  Multiply BW-export floats by this to get
# 16-count units (the body's native quantization).
LSB_INV = 200
|
||
|
||
|
||
@dataclass
class FixtureEvent:
    """Everything the analyzer knows about one fixture event.

    Instances are assembled by ``load_fixture``.  The field order below is
    the positional-constructor order and must stay as it is.
    """

    # Fixture file name, e.g. "M529LL1A.SP0".
    name: str
    # Path of the binary event file that was read.
    bin_path: str
    # Path of the companion TXT export (``bin_path`` + ".TXT").
    txt_path: str
    # File contents minus the first 43 and last 26 bytes (see load_fixture).
    body: bytes
    # {channel: list of int16-quantized samples}
    truth: dict
    # Block stream obtained by walking ``body``.
    blocks: List[WaveformBlock]
    # Block indices of each 40 02 segment header.
    segment_starts: List[int]
    # For each segment, the truth sample index it starts at.
    segment_sample_starts: List[int]
|
||
|
||
|
||
def s4(n: int) -> int:
    """Reinterpret a 4-bit nibble (0..15) as a signed value (-8..7)."""
    if n >= 8:
        return n - 16
    return n
|
||
|
||
|
||
def i8(b: int) -> int:
    """Reinterpret an unsigned byte (0..255) as a signed int8 (-128..127)."""
    if b >= 128:
        return b - 256
    return b
|
||
|
||
|
||
def load_fixture(name: str) -> FixtureEvent:
    """Locate fixture *name* under tests/fixtures/ and load it.

    Lookup order: two fixed candidate paths first, then a recursive walk of
    the whole fixtures tree, where the first directory containing a file
    called *name* wins.

    Returns a ``FixtureEvent`` carrying the body bytes, the parsed block
    stream, the truth samples read from the companion ``<name>.TXT`` export,
    and the per-segment bookkeeping.

    Raises ``FileNotFoundError`` (with *name* as its argument) when no file
    called *name* exists anywhere under the fixtures directory.
    """
    fixtures_root = os.path.join(
        os.path.dirname(__file__), "..", "tests", "fixtures"
    )

    # Fast path: the fixed locations, checked in order.
    bin_path = None
    for candidate in (
        os.path.join(fixtures_root, "5-11-26", name),
        os.path.join(fixtures_root, "decode-re-5-8-26", "event-a", name),  # not used directly
    ):
        if os.path.exists(candidate):
            bin_path = candidate
            break

    # Slow path: walk the tree (the 5-8 fixtures live in subdirectories).
    if bin_path is None:
        for dirpath, _dirnames, filenames in os.walk(fixtures_root):
            if name in filenames:
                bin_path = os.path.join(dirpath, name)
                break

    if bin_path is None:
        raise FileNotFoundError(name)

    txt_path = bin_path + ".TXT"
    with open(bin_path, "rb") as handle:
        raw = handle.read()

    # Keep everything except the first 43 and last 26 bytes of the file.
    # NOTE(review): presumably fixed-size framing around the waveform body;
    # confirm against the file-format notes.
    body = raw[43:-26]
    truth = _parse_txt(txt_path)
    blocks = walk_body(body, find_data_start(body))

    # A block whose high tag byte is 0x40 is a segment header.
    seg_idx = []
    for position, block in enumerate(blocks):
        if block.tag_hi == 0x40:
            seg_idx.append(position)

    # Segment 0 starts at sample 0; later segments start at the cumulative
    # sample count of the segment(s) before them.  Tran's segment 0 is N
    # samples; if the rotation hypothesis holds, segment 1's data starts at
    # sample 0 of a *different* channel.  The analyzer should try both
    # "continues from previous segment" and "starts at sample 0 of a
    # different channel."
    seg_sample_starts = _compute_segment_sample_starts(blocks, seg_idx)

    return FixtureEvent(
        name=name,
        bin_path=bin_path,
        txt_path=txt_path,
        body=body,
        truth=truth,
        blocks=blocks,
        segment_starts=seg_idx,
        segment_sample_starts=seg_sample_starts,
    )
|
||
|
||
|
||
def _parse_txt(path: str) -> dict:
    """Read a BW ASCII TXT export and return per-channel truth samples.

    The header row is the first line that mentions every name in
    ``CHANNELS``.  Each later row is split on whitespace and its first four
    fields are taken as the ``CHANNELS`` values in order.  Rows with fewer
    than four fields, or whose first four fields are not all parseable as
    floats, are skipped.

    Returns ``{channel: [samples]}``.  Every channel except MicL is scaled by
    ``LSB_INV`` and rounded to an int (16-count units); MicL values are kept
    as the raw floats.  If no header row is found, every channel maps to an
    empty list.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        lines = f.read().splitlines()

    samples = {ch: [] for ch in CHANNELS}

    header_idx = None
    for idx, text in enumerate(lines):
        if all(ch in text for ch in CHANNELS):
            header_idx = idx
            break
    if header_idx is None:
        return samples

    for row in lines[header_idx + 1:]:
        fields = re.split(r"\s+", row.strip())
        if len(fields) < 4:
            continue
        try:
            values = [float(tok) for tok in fields[:4]]
        except ValueError:
            continue
        for ch, value in zip(CHANNELS, values):
            # Geo channels are in in/s and get scaled by LSB_INV; MicL is in
            # dB(L), which doesn't quantize the same way, so it is left raw
            # (the scorer should treat MicL specially).
            if ch == "MicL":
                samples[ch].append(value)
            else:
                samples[ch].append(round(value * LSB_INV))
    return samples
|
||
|
||
|
||
def _compute_segment_sample_starts(
    blocks: List[WaveformBlock], seg_idx: List[int]
) -> List[int]:
    """Return the running sample count at each segment header.

    The count assumes every block is a Tran continuation, which makes it
    useful as one candidate for segment-1-as-Tran tests.  The scorer should
    ALSO try "segment 1 starts at sample 0 of a new channel", as the
    rotation hypothesis predicts.

    Args:
        blocks: the full block stream; only ``tag_hi`` and ``tag_lo`` of
            each block are read.
        seg_idx: block indices of the segment headers.

    Returns:
        One entry per index of ``blocks`` that appears in ``seg_idx``, in
        block order: the cumulative sample count *before* that block.
    """
    # Set membership keeps the per-block check O(1) instead of scanning the
    # seg_idx list for every block.
    header_positions = set(seg_idx)
    # Tag families that contribute tag_lo samples under this hypothesis.
    # 30 NN and 40 02 don't contribute samples.
    sample_tags = (0x10, 0x20, 0x00)

    starts: List[int] = []
    cum = 2  # T[0] + T[1] from preamble
    for i, b in enumerate(blocks):
        if i in header_positions:
            starts.append(cum)
        if b.tag_hi in sample_tags:
            cum += b.tag_lo
    return starts
|
||
|
||
|
||
# ── The core algorithm: decode a segment's blocks as deltas ─────────────────
|
||
|
||
|
||
def decode_segment_as_channel(
    blocks: List[WaveformBlock],
    seg_start_block_idx: int,
    seg_end_block_idx: int,
    anchor: int,
) -> List[int]:
    """Decode ``blocks[seg_start_block_idx:seg_end_block_idx]`` with the segment-0 rules.

    Starting from *anchor*, each delta is added to a running value and the
    running value is recorded.  The anchor itself is NOT part of the output:
    the first returned value is anchor + first_delta.

    Per-block handling, keyed on ``tag_hi``:
      * 0x10 - every data byte holds two signed 4-bit deltas, high nibble
        first.
      * 0x20 - every data byte is one signed 8-bit delta.
      * 0x00 - the current value is repeated ``tag_lo`` times.
      * anything else (30 NN: content unknown; 40 02: the segment header,
        which shouldn't appear inside segment data) is skipped.

    Returns the list of cumulative sample values, one per delta or repeat.
    """
    trajectory: List[int] = []
    value = anchor
    for idx in range(seg_start_block_idx, seg_end_block_idx):
        block = blocks[idx]
        kind = block.tag_hi
        if kind == 0x00:
            # Run of unchanged samples.
            trajectory.extend([value] * block.tag_lo)
        elif kind == 0x20:
            for raw in block.data:
                value += i8(raw)
                trajectory.append(value)
        elif kind == 0x10:
            for raw in block.data:
                value += s4((raw >> 4) & 0xF)
                trajectory.append(value)
                value += s4(raw & 0xF)
                trajectory.append(value)
    return trajectory
|
||
|
||
|
||
def score_against_truth(
    decoded: List[int],
    truth: List[int],
    truth_start: int,
) -> Tuple[int, int]:
    """Count exact matches between *decoded* and a window of *truth*.

    The window is ``truth[truth_start : truth_start + len(decoded)]``,
    truncated at the end of *truth*.

    Args:
        decoded: candidate sample values.
        truth: reference samples for one channel.
        truth_start: index in *truth* that ``decoded[0]`` is aligned to.

    Returns:
        ``(n_matches, n_compared)``.  ``(0, 0)`` when the window is empty:
        *decoded* is empty, *truth_start* is at or past the end of *truth*,
        or *truth_start* is negative.
    """
    # A negative start would otherwise wrap around through Python's negative
    # indexing and silently score against the *tail* of truth, producing
    # false matches.  Treat it as "no overlap", like a start past the end.
    if truth_start < 0:
        return (0, 0)

    n = min(len(decoded), len(truth) - truth_start)
    if n <= 0:
        return (0, 0)

    window = truth[truth_start:truth_start + n]
    matches = sum(1 for got, want in zip(decoded, window) if got == want)
    return (matches, n)
|
||
|
||
|
||
# ── TODO for the next pass ──────────────────────────────────────────────────
|
||
|
||
|
||
def score_segment_against_all_channels(
    event: FixtureEvent,
    segment_index: int,
) -> List[Tuple[str, str, int, int, int]]:
    """Score one segment of *event* under every (channel, anchor) hypothesis.

    NOT IMPLEMENTED YET - this stub is the heart of the experiment and is
    what the next pass should fill in.  It always raises
    ``NotImplementedError``.

    Intended contract
    -----------------
    Decode segment *segment_index* as each channel with each candidate
    anchor source, score every decode against truth, and return rows of

        (channel_name, anchor_source_label, anchor_value, n_matches, n_compared)

    sorted by match count, best first.

    Anchor sources to try:
      - "header[0:2]"    int16 BE from segment header bytes [0:2]
      - "header[2:4]"    int16 BE from segment header bytes [2:4]
      - "header[4:6]"    int16 BE from segment header bytes [4:6]
      - "header[14:16]"  int16 BE from segment header bytes [14:16]
      - "header[16:18]"  int16 BE from segment header bytes [16:18]
      - "channel[0]"     truth[channel][0]
                         (= "this segment starts at sample 0 of this channel")
      - "channel[prev]"  truth[channel][segment_sample_starts[segment_index] - 1]
                         (= "this segment continues from sample N-1 of this channel")

    Every combination of (channel, anchor source, "starts at sample X of
    channel") gets decoded and scored.
    """
    # TODO: implement this.
    raise NotImplementedError("This is the next experiment to run.")
|
||
|
||
|
||
# ── Driver ──────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def main():
    """Load each loud-bundle fixture and print its block/segment summary.

    Fixtures whose files cannot be found are reported and skipped.  Per-segment
    best-fit scoring is not wired in yet; see the note at the end of the loop.
    """
    fixture_names = (
        "M529LL1A.SP0",
        "M529LL1A.SS0",
        "M529LL1A.SV0",
        "M529LL1L.JQ0",
        "M529LL1L.V70",
    )
    for name in fixture_names:
        try:
            event = load_fixture(name)
        except FileNotFoundError:
            print(f"{name}: fixture not found")
            continue

        print(f"\n=== {name} ===")
        print(f" body bytes: {len(event.body)}")
        print(f" blocks: {len(event.blocks)}")
        print(f" segments: {len(event.segment_starts)}")
        print(" segment sample-starts (if all blocks are 1 channel):")
        for seg_no, first_sample in enumerate(event.segment_sample_starts):
            print(f" seg {seg_no}: sample {first_sample}")

        # When score_segment_against_all_channels is implemented:
        # for si in range(len(event.segment_starts)):
        #     results = score_segment_against_all_channels(event, si)
        #     best = results[0]
        #     print(f" seg {si}: best fit = {best}")


if __name__ == "__main__":
    main()
|