codec-re: solve waveform body block framing; per-byte sample mapping still open
Decoded the structural framing of the Blastware waveform body — the bytes between the 21-byte STRT record and the 26-byte file footer. The body is a sequence of tagged variable-length blocks, NOT raw int16 LE. Five tag types (10/20/00/30/40 NN) and their lengths are now confirmed against the 4-event May 2026 fixture bundle. Body splits cleanly into ~16 segments (for a 1280-sample event) separated by 40 02 segment headers carrying a monotonically incrementing uint32 LE counter at bytes [8:12]. What's done: - minimateplus/waveform_codec.py — block walker, segment splitter, segment header parser. decode_waveform_v2 is a stub returning None until the byte-to-sample mapping is solved; client.py is unchanged. - tests/test_waveform_codec.py — 31 tests covering block detection, lengths, contiguous-walk, segment splitting, segment-header parsing, and counter monotonicity. All pass. - tests/fixtures/decode-re-5-8-26/ — bundled fixtures (4 events, BW binary + Blastware ASCII export each). - docs/instantel_protocol_reference.md §7.6.1 — replaced retraction box with the verified structural decoding plus an explicit list of what's still open. What's still open: the per-byte mapping inside 10 NN / 20 NN blocks. 96 channel-permutation × nibble-order × sign-convention combinations were brute-force tested; none match BW's ASCII export to within ±1 ADC count. The codec is more elaborate than uniform 4-bit deltas — likely a hybrid variable-bit-width scheme with segment-anchor resync points. Next recommended step: capture an event with a known calibration tone to pin down magnitude scaling. Walker also bails out partway through event-b (open issue documented in both the module and the protocol reference).
This commit is contained in:
@@ -0,0 +1,93 @@
|
||||
"""Brute-force test channel permutations / nibble orders on event-d (simplest signal)."""
|
||||
import sys
|
||||
import itertools
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
from minimateplus.waveform_codec import walk_body
|
||||
|
||||
|
||||
def s4(n):
|
||||
return n if n < 8 else n - 16
|
||||
|
||||
|
||||
def decode(body, channel_perm, nibble_order, sign_mode, init_from_header):
|
||||
"""Try one decoder configuration on event-d. Returns first 8 cumulative samples per channel."""
|
||||
blocks = walk_body(body)
|
||||
# Initial values from bytes [4:7] if init_from_header else 0
|
||||
if init_from_header:
|
||||
init = [body[4] if body[4] < 128 else body[4] - 256,
|
||||
body[5] if body[5] < 128 else body[5] - 256,
|
||||
body[6] if body[6] < 128 else body[6] - 256,
|
||||
0]
|
||||
else:
|
||||
init = [0, 0, 0, 0]
|
||||
cur = list(init)
|
||||
out = [[init[0]], [init[1]], [init[2]], [init[3]]] # sample 0 = init
|
||||
nibble_idx = 0 # within delta stream; channel = channel_perm[nibble_idx % 4]
|
||||
|
||||
# Walk only the 10 NN data blocks
|
||||
for blk in blocks:
|
||||
if blk.tag_hi != 0x10:
|
||||
continue
|
||||
for byte in blk.data:
|
||||
if nibble_order == 'high_first':
|
||||
nib1, nib2 = (byte >> 4) & 0xF, byte & 0xF
|
||||
else:
|
||||
nib1, nib2 = byte & 0xF, (byte >> 4) & 0xF
|
||||
for nib in (nib1, nib2):
|
||||
if sign_mode == 'signed':
|
||||
delta = s4(nib)
|
||||
else:
|
||||
delta = nib
|
||||
ch = channel_perm[nibble_idx % 4]
|
||||
cur[ch] += delta
|
||||
if (nibble_idx + 1) % 4 == 0:
|
||||
out[0].append(cur[0])
|
||||
out[1].append(cur[1])
|
||||
out[2].append(cur[2])
|
||||
out[3].append(cur[3])
|
||||
nibble_idx += 1
|
||||
if len(out[0]) >= 16:
|
||||
return out
|
||||
return out
|
||||
|
||||
|
||||
def best_match(pred, truth, n=10):
|
||||
"""Sum of squared differences in first n samples."""
|
||||
n = min(n, len(pred), len(truth))
|
||||
return sum((pred[i] - truth[i])**2 for i in range(n))
|
||||
|
||||
|
||||
def main():
|
||||
b = load_bundle("event-d")
|
||||
# truth in 16-count units
|
||||
tr = {ch: [round(v * 200) for v in b.samples[ch]] for ch in ("Tran", "Vert", "Long")}
|
||||
|
||||
print("Truth event-d first 10 samples:")
|
||||
for ch in ("Tran", "Vert", "Long"):
|
||||
print(f" {ch}: {tr[ch][:10]}")
|
||||
|
||||
# Test 96 combinations
|
||||
best = []
|
||||
for perm in itertools.permutations([0, 1, 2, 3]):
|
||||
for nibble_order in ('high_first', 'low_first'):
|
||||
for sign in ('signed', 'unsigned'):
|
||||
for init_h in (False, True):
|
||||
decoded = decode(b.body, perm, nibble_order, sign, init_h)
|
||||
# Score as TVL channel-sum
|
||||
score = sum(
|
||||
best_match(decoded[i], tr[ch], n=10)
|
||||
for i, ch in enumerate(("Tran", "Vert", "Long"))
|
||||
if i < 3
|
||||
)
|
||||
label = f"perm={perm} nib={nibble_order[:1]} sign={sign[:3]} init={init_h}"
|
||||
best.append((score, label, decoded))
|
||||
|
||||
best.sort(key=lambda x: x[0])
|
||||
print(f"\nTop 10 configurations:")
|
||||
for s, lbl, dec in best[:10]:
|
||||
print(f" score={s:>5} {lbl} T={dec[0][:8]} V={dec[1][:8]} L={dec[2][:8]}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,42 @@
|
||||
"""Compare event-c and event-d (same N_samples) to find header vs data bytes."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def main():
|
||||
bc = load_bundle("event-c")
|
||||
bd = load_bundle("event-d")
|
||||
|
||||
# Compare prefixes
|
||||
nc, nd = len(bc.body), len(bd.body)
|
||||
n = min(nc, nd)
|
||||
diffs = []
|
||||
for i in range(n):
|
||||
if bc.body[i] != bd.body[i]:
|
||||
diffs.append(i)
|
||||
print(f"event-c body={nc}, event-d body={nd}")
|
||||
print(f"Total diffs (first {n}): {len(diffs)}")
|
||||
|
||||
# Show common prefix
|
||||
same_prefix = 0
|
||||
for i in range(n):
|
||||
if bc.body[i] == bd.body[i]:
|
||||
same_prefix += 1
|
||||
else:
|
||||
break
|
||||
print(f"Common prefix length: {same_prefix}")
|
||||
print(f"event-c prefix: {bc.body[:same_prefix].hex(' ')}")
|
||||
|
||||
# Look for runs of common bytes
|
||||
print(f"\nFirst 32 diff positions: {diffs[:32]}")
|
||||
|
||||
# Show the "diff fingerprint" of the first 100 bytes
|
||||
print(f"\n pos c d")
|
||||
for i in range(0, 100):
|
||||
marker = " " if bc.body[i] == bd.body[i] else "*"
|
||||
bd_b = bd.body[i] if i < nd else None
|
||||
print(f" {i:>3} {bc.body[i]:02x}{marker} {bd_b:02x}" if bd_b is not None else f" {i:>3} {bc.body[i]:02x}{marker}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,99 @@
|
||||
"""
|
||||
Decoder v1: nibble-pair signed deltas in 10 NN blocks, 4-channel round-robin.
|
||||
"""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def s4(n):
|
||||
return n if n < 8 else n - 16
|
||||
|
||||
|
||||
def walk_blocks(body, start):
|
||||
i = start
|
||||
blocks = []
|
||||
while i + 1 < len(body):
|
||||
t0, t1 = body[i], body[i + 1]
|
||||
if t0 == 0x10 and t1 % 4 == 0 and 0 < t1 <= 0xFC:
|
||||
length = t1 // 2 + 2
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append(("10", t1, data))
|
||||
i += length
|
||||
elif t0 == 0x20 and t1 % 4 == 0 and 0 < t1 <= 0xFC:
|
||||
length = t1 + 2
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append(("20", t1, data))
|
||||
i += length
|
||||
elif t0 == 0x00 and t1 % 4 == 0:
|
||||
blocks.append(("00", t1, b""))
|
||||
i += 2
|
||||
elif t0 == 0x30 and t1 % 4 == 0 and 0 < t1 <= 0x10:
|
||||
length = t1 * 4
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append(("30", t1, data))
|
||||
i += length
|
||||
elif t0 == 0x40 and t1 == 0x02:
|
||||
length = 20
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append(("40", t1, data))
|
||||
i += length
|
||||
else:
|
||||
blocks.append(("??", t0, bytes(body[i:i+8])))
|
||||
break
|
||||
return blocks
|
||||
|
||||
|
||||
def decode_v1(body, start, n_samples):
|
||||
"""Decode by accumulating nibble-pair deltas from all 10 NN blocks."""
|
||||
blocks = walk_blocks(body, start)
|
||||
# 4 channels: T, V, L, M
|
||||
cur = [0, 0, 0, 0]
|
||||
out = [[], [], [], []]
|
||||
sample_index = 0 # how many sample-sets emitted
|
||||
|
||||
for typ, NN, data in blocks:
|
||||
if typ == "10":
|
||||
# 2 nibbles per byte, round-robin TVLM
|
||||
for byte in data:
|
||||
for nib in ((byte >> 4) & 0xF, byte & 0xF):
|
||||
ch = sample_index % 4
|
||||
cur[ch] += s4(nib)
|
||||
out[ch].append(cur[ch])
|
||||
sample_index = (sample_index + 1) // 4 * 4 + (sample_index + 1) % 4 # ?
|
||||
sample_index += 1
|
||||
# We emit per-nibble, but the structure is unclear
|
||||
elif typ == "20":
|
||||
# int8 absolute or delta?
|
||||
for byte in data:
|
||||
v = byte if byte < 128 else byte - 256
|
||||
ch = sample_index % 4
|
||||
cur[ch] = v # treat as absolute
|
||||
out[ch].append(cur[ch])
|
||||
sample_index += 1
|
||||
return out
|
||||
|
||||
|
||||
def main():
|
||||
b = load_bundle("event-c")
|
||||
body = b.body
|
||||
truth_T = [round(v * 200) for v in b.samples["Tran"]]
|
||||
truth_V = [round(v * 200) for v in b.samples["Vert"]]
|
||||
truth_L = [round(v * 200) for v in b.samples["Long"]]
|
||||
|
||||
# Find start
|
||||
for s in range(15):
|
||||
if body[s] == 0x10 and body[s+1] % 4 == 0 and 0 < body[s+1] <= 0xFC:
|
||||
start = s
|
||||
break
|
||||
|
||||
blocks = walk_blocks(body, start)
|
||||
# Print block-by-block what's in each
|
||||
print(f"Total blocks: {len(blocks)}")
|
||||
bytes_processed = 0
|
||||
for typ, NN, data in blocks[:30]:
|
||||
print(f" type={typ} NN=0x{NN:02x} data_len={len(data)} data_hex={data[:32].hex(' ')}{'...' if len(data) > 32 else ''}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,27 @@
|
||||
"""Dump body bytes around a specific offset."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def dump_around(name: str, center: int, radius: int = 96):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
start = max(0, center - radius)
|
||||
end = min(len(body), center + radius)
|
||||
print(f"\n=== {name} body[{start}:{end}] (full body={len(body)}) ===")
|
||||
for i in range(start, end, 32):
|
||||
row = body[i:i+32]
|
||||
marker = " <-- center" if i <= center < i+32 else ""
|
||||
print(f" +{i:>5} {row.hex(' ')}{marker}")
|
||||
|
||||
|
||||
def main():
|
||||
# Look at the trailer transitions
|
||||
trailer_starts = {"event-a": 7047, "event-b": 6475, "event-c": 4043, "event-d": 3941}
|
||||
for name, off in trailer_starts.items():
|
||||
dump_around(name, off, 96)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,18 @@
|
||||
"""Dump the START of each body in 32-byte rows."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-a", "event-c"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
print(f"\n=== {name} body[0:512] (full body={len(body)}, samples={len(b.samples['Tran'])}) ===")
|
||||
for i in range(0, min(512, len(body)), 32):
|
||||
row = body[i:i+32]
|
||||
print(f" +{i:>5} {row.hex(' ')}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,24 @@
|
||||
"""Dump body bytes split into 32-byte rows starting from `start_offset`."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def dump(body: bytes, name: str, start: int, n_rows: int = 30):
|
||||
print(f"\n=== {name} body[{start}:] (full body={len(body)}) ===")
|
||||
end = min(start + 32 * n_rows, len(body))
|
||||
for i in range(start, end, 32):
|
||||
row = body[i:i+32]
|
||||
print(f" +{i:>5} {row.hex(' ')}")
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-a", "event-b", "event-c", "event-d"):
|
||||
b = load_bundle(name)
|
||||
# Print the LAST ~600 bytes of the body to see the tail structure
|
||||
start = max(0, len(b.body) - 32 * 12)
|
||||
dump(b.body, name, start, 12)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,41 @@
|
||||
"""Search for structural repetition in the body bytes."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def find_pattern_offsets(body: bytes, pattern: bytes, max_count=20):
|
||||
out = []
|
||||
i = 0
|
||||
while True:
|
||||
i = body.find(pattern, i)
|
||||
if i < 0:
|
||||
break
|
||||
out.append(i)
|
||||
i += 1
|
||||
if len(out) >= max_count:
|
||||
break
|
||||
return out
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-a", "event-b", "event-c", "event-d"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
print(f"\n=== {name} (body={len(body)}, N_samples={len(b.samples['Tran'])}) ===")
|
||||
|
||||
# Try to find repeating substructures (look for 4-byte 0x10-prefixed markers)
|
||||
for prefix in [b"\x10\x10", b"\x10\x04", b"\x10\x08", b"\x10\x0c", b"\x10\x18",
|
||||
b"\x10\x14", b"\x10\x20", b"\x10\x40", b"\x10\x80", b"\x10\x00",
|
||||
b"\x10\x01", b"\x10\x03", b"\x10\xf0", b"\xf1\x10", b"\x00\x10",
|
||||
b"\x40\x02", b"\x20\x04", b"\x30\x04", b"\x30\x08", b"\x00\x1a"]:
|
||||
offs = find_pattern_offsets(body, prefix, max_count=200)
|
||||
if 1 <= len(offs) <= 1000:
|
||||
# Print first 10 offsets
|
||||
first = offs[:6]
|
||||
last = offs[-3:]
|
||||
print(f" '{prefix.hex()}' x{len(offs):>4} first={first} last={last}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,34 @@
|
||||
"""Find body byte ranges that look like absolute int8 sample data (smooth waveform)."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def looks_like_smooth_int8(buf):
|
||||
"""Convert bytes to int8 and check if successive deltas are small (waveform-like)."""
|
||||
if len(buf) < 8:
|
||||
return 0.0
|
||||
vals = [b if b < 128 else b - 256 for b in buf]
|
||||
diffs = [abs(vals[i+1] - vals[i]) for i in range(len(vals)-1)]
|
||||
avg_diff = sum(diffs) / len(diffs)
|
||||
return avg_diff
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-a", "event-c"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
# Scan with sliding window of 64 bytes; find segments where the bytes look like a smooth wave
|
||||
win = 64
|
||||
scores = []
|
||||
for i in range(len(body) - win):
|
||||
scores.append((i, looks_like_smooth_int8(body[i:i+win])))
|
||||
# Lowest avg_diff means smoothest
|
||||
scores.sort(key=lambda x: x[1])
|
||||
print(f"\n=== {name} (body={len(body)}) — smoothest 10 windows ===")
|
||||
for off, s in scores[:10]:
|
||||
print(f" +{off:>5} avg_diff={s:.2f} bytes={body[off:off+24].hex(' ')}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,23 @@
|
||||
"""Print raw body hex + byte-distribution stats for one event."""
|
||||
from collections import Counter
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-a", "event-b", "event-c", "event-d"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
print(f"\n=== {name} ({len(body)} body bytes) ===")
|
||||
print(f" STRT: {b.strt.hex()}")
|
||||
print(f" body[0:64]: {body[:64].hex()}")
|
||||
print(f" body[64:128]: {body[64:128].hex()}")
|
||||
print(f" body[-32:]: {body[-32:].hex()}")
|
||||
cnt = Counter(body)
|
||||
print(f" top 16 bytes: {[(f'0x{k:02x}', f'{v/len(body):.2%}') for k,v in cnt.most_common(16)]}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,142 @@
|
||||
"""
|
||||
load_bundle.py — extract body bytes from BW binary + parse sample columns from TXT.
|
||||
|
||||
Used by the codec reverse-engineering scripts in this directory.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
BUNDLE_ROOT = os.path.join(os.path.dirname(__file__), "..", "decode-re", "5-8-26")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Bundle:
|
||||
name: str
|
||||
bin_path: str
|
||||
txt_path: str
|
||||
bin: bytes
|
||||
body: bytes # bytes between STRT (43) and footer (last 26)
|
||||
strt: bytes # 21-byte STRT record
|
||||
samples: dict # {"Tran": [...], "Vert": [...], "Long": [...], "MicL": [...]}
|
||||
sample_rate: int
|
||||
rectime_sec: float
|
||||
pretrig_sec: float
|
||||
geo_range_ips: float
|
||||
ppv: dict # {"Tran": float, "Vert": float, "Long": float}
|
||||
mic_pspl: float
|
||||
serial: str
|
||||
|
||||
|
||||
def _parse_txt(path: str) -> dict:
|
||||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||||
text = f.read()
|
||||
|
||||
meta = {}
|
||||
samples = {"Tran": [], "Vert": [], "Long": [], "MicL": []}
|
||||
|
||||
# Find header line that starts the columns ("Tran Vert Long MicL").
|
||||
# Then every line after is sample data (4 tab-separated floats).
|
||||
lines = text.splitlines()
|
||||
header_idx = None
|
||||
for i, line in enumerate(lines):
|
||||
if "Tran" in line and "Vert" in line and "Long" in line and "MicL" in line:
|
||||
# The columns header. Sample lines start a few lines later.
|
||||
header_idx = i
|
||||
break
|
||||
if header_idx is None:
|
||||
raise ValueError(f"no Tran/Vert/Long/MicL header in {path}")
|
||||
|
||||
# Parse meta — quoted lines with "Field : value"
|
||||
for line in lines[:header_idx]:
|
||||
m = re.match(r'^"([^"]+)\s*:\s*([^"]*)"', line.strip())
|
||||
if m:
|
||||
k, v = m.group(1).strip(), m.group(2).strip()
|
||||
meta[k] = v
|
||||
|
||||
# Parse samples
|
||||
for line in lines[header_idx + 1 :]:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
parts = re.split(r"\s+", line)
|
||||
if len(parts) < 4:
|
||||
continue
|
||||
try:
|
||||
t = float(parts[0])
|
||||
v = float(parts[1])
|
||||
l = float(parts[2])
|
||||
m = float(parts[3])
|
||||
except ValueError:
|
||||
continue
|
||||
samples["Tran"].append(t)
|
||||
samples["Vert"].append(v)
|
||||
samples["Long"].append(l)
|
||||
samples["MicL"].append(m)
|
||||
|
||||
return meta, samples
|
||||
|
||||
|
||||
def load_bundle(name: str) -> Bundle:
|
||||
folder = os.path.join(BUNDLE_ROOT, name)
|
||||
files = os.listdir(folder)
|
||||
bin_name = next(f for f in files if not f.endswith(".TXT"))
|
||||
txt_name = next(f for f in files if f.endswith(".TXT"))
|
||||
|
||||
bin_path = os.path.join(folder, bin_name)
|
||||
txt_path = os.path.join(folder, txt_name)
|
||||
|
||||
with open(bin_path, "rb") as f:
|
||||
binary = f.read()
|
||||
|
||||
# Header is 22 bytes; STRT at [22:43]; footer at last 26 bytes.
|
||||
strt = binary[22:43]
|
||||
body = binary[43:-26]
|
||||
|
||||
meta, samples = _parse_txt(txt_path)
|
||||
|
||||
sample_rate = int(re.search(r"(\d+)", meta.get("Sample Rate", "1024")).group(1))
|
||||
rectime_sec = float(re.search(r"([\d.]+)", meta.get("Record Time", "3.0")).group(1))
|
||||
pretrig_sec = float(re.search(r"-?[\d.]+", meta.get("Pre-trigger Length", "0")).group(0))
|
||||
geo_range_ips = float(re.search(r"([\d.]+)", meta.get("Geo Range", "10.0")).group(1))
|
||||
serial = meta.get("Serial Number", "").strip()
|
||||
|
||||
def _f(s):
|
||||
return float(re.search(r"-?[\d.]+", s).group(0))
|
||||
|
||||
ppv = {
|
||||
"Tran": _f(meta.get("Tran PPV", "0")),
|
||||
"Vert": _f(meta.get("Vert PPV", "0")),
|
||||
"Long": _f(meta.get("Long PPV", "0")),
|
||||
}
|
||||
mic_pspl = _f(meta.get("MicL PSPL", "0"))
|
||||
|
||||
return Bundle(
|
||||
name=name,
|
||||
bin_path=bin_path,
|
||||
txt_path=txt_path,
|
||||
bin=binary,
|
||||
body=body,
|
||||
strt=strt,
|
||||
samples=samples,
|
||||
sample_rate=sample_rate,
|
||||
rectime_sec=rectime_sec,
|
||||
pretrig_sec=pretrig_sec,
|
||||
geo_range_ips=geo_range_ips,
|
||||
ppv=ppv,
|
||||
mic_pspl=mic_pspl,
|
||||
serial=serial,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
for name in ("event-a", "event-b", "event-c", "event-d"):
|
||||
b = load_bundle(name)
|
||||
n = len(b.samples["Tran"])
|
||||
print(f"{name}: body={len(b.body):>6} N_samples={n} rate={b.sample_rate} "
|
||||
f"rectime={b.rectime_sec} pretrig={b.pretrig_sec} range={b.geo_range_ips} "
|
||||
f"PPV(T,V,L)={b.ppv['Tran']:.3f},{b.ppv['Vert']:.3f},{b.ppv['Long']:.3f} "
|
||||
f"MicL={b.mic_pspl}")
|
||||
@@ -0,0 +1,67 @@
|
||||
"""Try various nibble-level channel interleavings to find which one matches truth."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def s4(n):
|
||||
return n if n < 8 else n - 16
|
||||
|
||||
|
||||
def run_decoder(body, layout, skip, n_channels=4):
|
||||
"""layout: function nibble_index -> channel_index. Returns list-of-lists per channel."""
|
||||
out = [[] for _ in range(n_channels)]
|
||||
cur = [0] * n_channels
|
||||
nibbles = []
|
||||
for byte in body[skip:]:
|
||||
nibbles.append((byte >> 4) & 0xF)
|
||||
nibbles.append(byte & 0xF)
|
||||
for i, n in enumerate(nibbles):
|
||||
ch = layout(i)
|
||||
cur[ch] += s4(n)
|
||||
out[ch].append(cur[ch])
|
||||
return out
|
||||
|
||||
|
||||
def cmp(pred, truth, n=24):
|
||||
n = min(n, len(pred), len(truth))
|
||||
return [(pred[i], truth[i]) for i in range(n)]
|
||||
|
||||
|
||||
def main():
|
||||
b = load_bundle("event-c")
|
||||
truth_T = [round(v * 200) for v in b.samples["Tran"]]
|
||||
truth_V = [round(v * 200) for v in b.samples["Vert"]]
|
||||
truth_L = [round(v * 200) for v in b.samples["Long"]]
|
||||
print(f"T truth[0:10]: {truth_T[:10]}")
|
||||
print(f"V truth[0:10]: {truth_V[:10]}")
|
||||
print(f"L truth[0:10]: {truth_L[:10]}")
|
||||
|
||||
# Try several nibble->channel layouts (4 channels)
|
||||
layouts = {
|
||||
"interleaved TVLM (0,1,2,3,0,1,2,3,...)": lambda i: i % 4,
|
||||
"interleaved VLMT": lambda i: (i + 3) % 4,
|
||||
"interleaved LMTV": lambda i: (i + 2) % 4,
|
||||
"interleaved MTVL": lambda i: (i + 1) % 4,
|
||||
"byte-based TV LM TV LM (high T low V byte0; high L low M byte1)": lambda i: i % 4,
|
||||
# "chunks of 8 nibbles per channel": each channel gets 8 nibbles in a row
|
||||
"chunks-8 TVLM": lambda i: (i // 8) % 4,
|
||||
"chunks-16 TVLM": lambda i: (i // 16) % 4,
|
||||
# planar (full channel sequential)
|
||||
"planar T(0..N) V(N..2N) L(2N..3N) M(3N..4N)": None, # special
|
||||
}
|
||||
|
||||
for label, layout_fn in layouts.items():
|
||||
if layout_fn is None:
|
||||
continue
|
||||
for skip in (0, 4, 7, 8, 9, 11, 14):
|
||||
out = run_decoder(b.body, layout_fn, skip)
|
||||
# Check first 8 cumulative on each channel
|
||||
print(f" skip={skip:2} {label}")
|
||||
print(f" T_cum[0:10]: {out[0][:10]}")
|
||||
print(f" V_cum[0:10]: {out[1][:10]}")
|
||||
print(f" L_cum[0:10]: {out[2][:10]}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,73 @@
|
||||
"""Try decoding body as 4-bit signed nibble deltas, 4-channel round-robin."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
CHANNELS = ("Tran", "Vert", "Long", "MicL")
|
||||
|
||||
|
||||
def s4(n):
|
||||
"""Sign-extend a 4-bit unsigned to int (0..7 → 0..7, 8..F → -8..-1)."""
|
||||
return n if n < 8 else n - 16
|
||||
|
||||
|
||||
def decode_nibbles(body: bytes, skip_bytes: int = 7, n_channels: int = 4):
|
||||
"""Read body as 2 nibbles per byte; accumulate as deltas for n_channels round-robin."""
|
||||
out = [[] for _ in range(n_channels)]
|
||||
cur = [0] * n_channels
|
||||
ch = 0
|
||||
nibbles = []
|
||||
for byte in body[skip_bytes:]:
|
||||
nibbles.append((byte >> 4) & 0xF)
|
||||
nibbles.append(byte & 0xF)
|
||||
for n in nibbles:
|
||||
cur[ch] += s4(n)
|
||||
out[ch].append(cur[ch])
|
||||
ch = (ch + 1) % n_channels
|
||||
return out
|
||||
|
||||
|
||||
def cmp_to_truth(pred, truth, scale=16):
|
||||
"""Compare predicted ints (in 16-count units) to truth (in 16-count units = txt * 200).
|
||||
Return (max_abs_err, mean_abs_err, n_compared).
|
||||
"""
|
||||
n = min(len(pred), len(truth))
|
||||
errs = []
|
||||
for i in range(n):
|
||||
p = pred[i]
|
||||
t = truth[i]
|
||||
errs.append(abs(p - t))
|
||||
if not errs:
|
||||
return None
|
||||
return (max(errs), sum(errs) / len(errs), n)
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-a", "event-c"):
|
||||
b = load_bundle(name)
|
||||
# Convert TXT samples (in/s) to 16-count units (multiply by 200, since 0.005 in/s = 1)
|
||||
# WAIT: 0.005 in/s = 16 ADC counts. 1 count = 0.000305 in/s.
|
||||
# So in 1-count units: count = txt * (1/0.0003052) ≈ txt * 3276.7
|
||||
# But TXT only has 0.005 resolution so equivalent to 16-count units = txt * 200.
|
||||
truth_in_16 = {ch: [round(v * 200) for v in b.samples[ch]] for ch in CHANNELS[:3]}
|
||||
# MicL is in dB, skip for now
|
||||
|
||||
# Try decoder with skip_bytes = 7
|
||||
decoded = decode_nibbles(b.body, skip_bytes=7, n_channels=4)
|
||||
print(f"\n=== {name} ===")
|
||||
print(f" body={len(b.body)}, nibbles={2*(len(b.body)-7)}, samples_per_ch={len(decoded[0])}")
|
||||
print(f" truth samples per ch: {len(truth_in_16['Tran'])}")
|
||||
# Print first 24 of each
|
||||
for i, chan in enumerate(CHANNELS):
|
||||
pred_first = decoded[i][:24]
|
||||
if chan in truth_in_16:
|
||||
truth_first = truth_in_16[chan][:24]
|
||||
print(f" {chan} pred: {pred_first}")
|
||||
print(f" {chan} truth: {truth_first}")
|
||||
else:
|
||||
print(f" {chan} pred: {pred_first} (truth in dB, skipped)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,44 @@
|
||||
"""Walk the body assuming chunks delimited by 0x10 NN tags. Print each chunk's structure."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def walk(body: bytes, start_offset: int = 7, max_chunks: int = 30):
|
||||
"""Find all positions where byte = 0x10 followed by a multiple-of-4 byte. Print chunks."""
|
||||
chunks = []
|
||||
i = start_offset
|
||||
while i < len(body) - 1:
|
||||
# Find next `10 NN` where NN is multiple of 4 (and not preceded by another 0x10 immediately, which would be data).
|
||||
if body[i] == 0x10 and (body[i+1] % 4 == 0):
|
||||
chunks.append(i)
|
||||
i += 1
|
||||
return chunks
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-c", "event-d"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
positions = []
|
||||
i = 7 # skip 7-byte preamble
|
||||
while i < len(body) - 1:
|
||||
if body[i] == 0x10 and body[i+1] % 4 == 0 and body[i+1] > 0:
|
||||
positions.append(i)
|
||||
i += 2 # skip past tag
|
||||
else:
|
||||
i += 1
|
||||
print(f"\n=== {name} === body={len(body)}, total `10 NN` (NN%4==0, NN>0) tags: {len(positions)}")
|
||||
# Print first 20 chunks: show position, NN, gap to next tag
|
||||
for k in range(min(30, len(positions))):
|
||||
pos = positions[k]
|
||||
NN = body[pos + 1]
|
||||
next_pos = positions[k+1] if k+1 < len(positions) else len(body)
|
||||
gap = next_pos - pos
|
||||
data_bytes = body[pos+2 : next_pos]
|
||||
print(f" chunk[{k:>3}] @ {pos:>5} NN=0x{NN:02x} ({NN:>3}, NN/2={NN//2}) gap={gap:>3} "
|
||||
f"data={data_bytes[:24].hex(' ')}{'...' if len(data_bytes) > 24 else ''}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,50 @@
|
||||
"""Deterministic chunk walker: each chunk = [10 NN][NN/2 bytes data][2 bytes trailer]."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def walk_chunks(body: bytes, start: int = 7):
|
||||
"""Yield (offset, NN, data_bytes, trailer_bytes) tuples."""
|
||||
i = start
|
||||
while i + 1 < len(body):
|
||||
if body[i] != 0x10:
|
||||
break
|
||||
NN = body[i + 1]
|
||||
if NN == 0 or NN > 0x80 or NN % 4 != 0:
|
||||
break
|
||||
chunk_len = NN // 2 + 4
|
||||
if i + chunk_len > len(body):
|
||||
break
|
||||
data = bytes(body[i + 2 : i + 2 + NN // 2])
|
||||
trailer = bytes(body[i + 2 + NN // 2 : i + chunk_len])
|
||||
yield (i, NN, data, trailer)
|
||||
i += chunk_len
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-c", "event-d", "event-a", "event-b"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
chunks = list(walk_chunks(body))
|
||||
print(f"\n=== {name} === body={len(body)} N_samples={len(b.samples['Tran'])}")
|
||||
print(f" chunks parsed: {len(chunks)}")
|
||||
if chunks:
|
||||
last = chunks[-1]
|
||||
end_of_walk = last[0] + last[1] // 2 + 4
|
||||
print(f" walk ended at offset {end_of_walk} (= {len(body) - end_of_walk} bytes from end)")
|
||||
# Stats
|
||||
total_data_bytes = sum(len(c[2]) for c in chunks)
|
||||
print(f" total data bytes: {total_data_bytes}, total nibbles: {2*total_data_bytes}")
|
||||
if name in ("event-c", "event-d"):
|
||||
ratio = (2 * total_data_bytes) / (len(b.samples['Tran']) * 4)
|
||||
print(f" nibbles per (sample × channel): {ratio:.3f}")
|
||||
# Sum of trailer second-byte
|
||||
trailer_sums = [c[3][-1] if c[3] else None for c in chunks]
|
||||
print(f" first 10 chunks: {[(c[0], c[1], c[3].hex()) for c in chunks[:10]]}")
|
||||
# Print last 10 chunks (likely transition to trailer)
|
||||
print(f" last 10 chunks: {[(c[0], c[1], c[3].hex()) for c in chunks[-10:]]}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,51 @@
|
||||
"""Walk chunks; auto-detect preamble length by finding first 10 NN."""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def walk_chunks(body, start, max_NN=0x80):
|
||||
chunks = []
|
||||
i = start
|
||||
while i + 1 < len(body):
|
||||
if body[i] != 0x10:
|
||||
break
|
||||
NN = body[i + 1]
|
||||
if NN == 0 or NN > max_NN or NN % 4 != 0:
|
||||
break
|
||||
chunk_len = NN // 2 + 4
|
||||
if i + chunk_len > len(body):
|
||||
break
|
||||
data = bytes(body[i + 2 : i + 2 + NN // 2])
|
||||
trailer = bytes(body[i + 2 + NN // 2 : i + chunk_len])
|
||||
chunks.append((i, NN, data, trailer))
|
||||
i += chunk_len
|
||||
return chunks, i
|
||||
|
||||
|
||||
def find_first_chunk_start(body):
|
||||
"""Locate first byte that begins a `10 NN` chunk (NN ∈ multiples of 4, 4..0x7C)."""
|
||||
for i in range(20):
|
||||
if body[i] == 0x10 and body[i + 1] % 4 == 0 and 0 < body[i + 1] <= 0x7C:
|
||||
return i
|
||||
return -1
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-c", "event-d", "event-a", "event-b"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
start = find_first_chunk_start(body)
|
||||
chunks, end = walk_chunks(body, start)
|
||||
print(f"\n=== {name} === body={len(body)} N_samples={len(b.samples['Tran'])} start={start}")
|
||||
print(f" chunks parsed: {len(chunks)}, walk ended at {end}")
|
||||
if chunks:
|
||||
print(f" first 5 chunks: {[(c[0], c[1], c[3].hex()) for c in chunks[:5]]}")
|
||||
print(f" last 5 chunks: {[(c[0], c[1], c[3].hex()) for c in chunks[-5:]]}")
|
||||
print(f" bytes around end of walk: {body[end-4:end+12].hex(' ')}")
|
||||
else:
|
||||
print(f" bytes at start: {body[start:start+16].hex(' ')}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,75 @@
|
||||
"""
|
||||
Walker v4: alternate [10 NN] data chunks and [00 NN] (or other) marker tags.
|
||||
|
||||
Hypothesis:
|
||||
- [10 NN]: data block, length NN/2 + 2 bytes (2-byte tag + NN/2 bytes data)
|
||||
- [00 NN]: 2-byte marker block (no data)
|
||||
- [20/30/40 NN]: special blocks with type-dependent length
|
||||
"""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
|
||||
|
||||
def walk(body, start):
|
||||
i = start
|
||||
blocks = []
|
||||
while i + 1 < len(body):
|
||||
t0 = body[i]
|
||||
t1 = body[i + 1]
|
||||
if t0 == 0x10 and t1 % 4 == 0 and 0 < t1 <= 0x80:
|
||||
# data chunk: length NN/2 + 2
|
||||
length = t1 // 2 + 2
|
||||
blocks.append((i, "10", t1, bytes(body[i + 2 : i + length]), length))
|
||||
i += length
|
||||
elif t0 == 0x00 and t1 % 4 == 0:
|
||||
# 2-byte marker
|
||||
blocks.append((i, "00", t1, b"", 2))
|
||||
i += 2
|
||||
elif t0 == 0x20 and t1 % 4 == 0:
|
||||
# type 2 — try length 2+t1/2 (similar to 10) OR fixed
|
||||
length = t1 // 2 + 2
|
||||
blocks.append((i, "20", t1, bytes(body[i + 2 : i + length]), length))
|
||||
i += length
|
||||
elif t0 == 0x30 and t1 % 4 == 0:
|
||||
length = t1 // 2 + 2
|
||||
blocks.append((i, "30", t1, bytes(body[i + 2 : i + length]), length))
|
||||
i += length
|
||||
elif t0 == 0x40 and t1 == 0x02:
|
||||
# Special "footer transition" block — try fixed 22 bytes
|
||||
length = 22
|
||||
blocks.append((i, "40", t1, bytes(body[i + 2 : i + length]), length))
|
||||
i += length
|
||||
else:
|
||||
# Unknown tag — stop
|
||||
blocks.append((i, "??", t0, bytes(body[i:i+8]), 0))
|
||||
break
|
||||
return blocks, i
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-c", "event-d", "event-a", "event-b"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
# Auto-detect start
|
||||
for s in range(15):
|
||||
if body[s] == 0x10 and body[s+1] % 4 == 0 and 0 < body[s+1] <= 0x80:
|
||||
start = s
|
||||
break
|
||||
else:
|
||||
start = 7
|
||||
blocks, end = walk(body, start)
|
||||
# Categorize
|
||||
from collections import Counter
|
||||
types = Counter(b[1] for b in blocks)
|
||||
print(f"\n=== {name} === body={len(body)} N={len(b.samples['Tran'])} start={start}")
|
||||
print(f" total blocks: {len(blocks)}, walk ended at {end}/{len(body)}")
|
||||
print(f" type counts: {dict(types)}")
|
||||
# Print last 5 blocks
|
||||
print(f" last 5 blocks: {[(bb[0], bb[1], bb[2]) for bb in blocks[-5:]]}")
|
||||
if end < len(body):
|
||||
print(f" bytes at end: {body[end:end+24].hex(' ')}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,83 @@
|
||||
"""
|
||||
Walker v5: flexible NN range and multiple block-type lengths.
|
||||
|
||||
Hypothesis:
|
||||
- [10 NN]: 4-bit-delta data block, length = NN/2 + 2
|
||||
- [20 NN]: 8-bit-literal data block, length = NN + 2
|
||||
- [00 NN]: 2-byte marker (no payload)
|
||||
- [30 NN]: trailer/summary block, length = NN*4
|
||||
- [40 NN]: footer-marker block, fixed 22 bytes
|
||||
"""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
from collections import Counter
|
||||
|
||||
|
||||
def walk(body, start, max_blocks=10000):
|
||||
i = start
|
||||
blocks = []
|
||||
while i + 1 < len(body) and len(blocks) < max_blocks:
|
||||
t0 = body[i]
|
||||
t1 = body[i + 1]
|
||||
if t0 == 0x10 and t1 % 4 == 0 and 0 < t1 <= 0xFC:
|
||||
length = t1 // 2 + 2
|
||||
if i + length > len(body):
|
||||
break
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append((i, "10", t1, data, length))
|
||||
i += length
|
||||
elif t0 == 0x20 and t1 % 4 == 0 and 0 < t1 <= 0xFC:
|
||||
length = t1 + 2
|
||||
if i + length > len(body):
|
||||
break
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append((i, "20", t1, data, length))
|
||||
i += length
|
||||
elif t0 == 0x00 and t1 % 4 == 0:
|
||||
# 2-byte marker
|
||||
blocks.append((i, "00", t1, b"", 2))
|
||||
i += 2
|
||||
elif t0 == 0x30 and t1 % 4 == 0:
|
||||
length = t1 * 4
|
||||
if i + length > len(body):
|
||||
break
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append((i, "30", t1, data, length))
|
||||
i += length
|
||||
elif t0 == 0x40 and t1 == 0x02:
|
||||
length = 22
|
||||
if i + length > len(body):
|
||||
break
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append((i, "40", t1, data, length))
|
||||
i += length
|
||||
else:
|
||||
blocks.append((i, "??", t0, bytes(body[i:i+8]), 0))
|
||||
break
|
||||
return blocks, i
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-c", "event-d", "event-a", "event-b"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
for s in range(15):
|
||||
if body[s] == 0x10 and body[s+1] % 4 == 0 and 0 < body[s+1] <= 0xFC:
|
||||
start = s; break
|
||||
else:
|
||||
start = 7
|
||||
blocks, end = walk(body, start)
|
||||
types = Counter(bb[1] for bb in blocks)
|
||||
print(f"\n=== {name} === body={len(body)} N={len(b.samples['Tran'])} start={start}")
|
||||
print(f" total blocks: {len(blocks)}, walk ended at {end}/{len(body)}")
|
||||
print(f" type counts: {dict(types)}")
|
||||
if blocks and blocks[-1][1] == "??":
|
||||
print(f" stopped at byte: 0x{blocks[-1][2]:02x}, prev 5 blocks: {[(bb[0], bb[1], bb[2]) for bb in blocks[-6:-1]]}")
|
||||
# Sum payload sizes by type
|
||||
payload_sizes = {t: sum(len(bb[3]) for bb in blocks if bb[1] == t) for t in types}
|
||||
print(f" payload bytes by type: {payload_sizes}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,68 @@
|
||||
"""
|
||||
Walker v6: handle 40 02 blocks correctly (length 20).
|
||||
|
||||
Block formats:
|
||||
- [10 NN]: 4-bit nibble delta data, length = NN/2 + 2
|
||||
- [20 NN]: int8 literal data, length = NN + 2
|
||||
- [00 NN]: 2-byte marker
|
||||
- [30 NN]: trailer/summary block, length = NN*4
|
||||
- [40 02]: segment header, fixed length 20
|
||||
"""
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
from analysis.load_bundle import load_bundle
|
||||
from collections import Counter
|
||||
|
||||
|
||||
def walk(body, start, max_blocks=10000):
|
||||
i = start
|
||||
blocks = []
|
||||
while i + 1 < len(body) and len(blocks) < max_blocks:
|
||||
t0 = body[i]
|
||||
t1 = body[i + 1]
|
||||
if t0 == 0x10 and t1 % 4 == 0 and 0 < t1 <= 0xFC:
|
||||
length = t1 // 2 + 2
|
||||
elif t0 == 0x20 and t1 % 4 == 0 and 0 < t1 <= 0xFC:
|
||||
length = t1 + 2
|
||||
elif t0 == 0x00 and t1 % 4 == 0:
|
||||
length = 2
|
||||
elif t0 == 0x30 and t1 % 4 == 0 and 0 < t1 <= 0x10:
|
||||
length = t1 * 4
|
||||
elif t0 == 0x40 and t1 == 0x02:
|
||||
length = 20
|
||||
else:
|
||||
blocks.append((i, "??", t0, bytes(body[i:i+8]), 0))
|
||||
break
|
||||
if i + length > len(body):
|
||||
break
|
||||
data = bytes(body[i + 2 : i + length])
|
||||
blocks.append((i, f"{t0:02x}", t1, data, length))
|
||||
i += length
|
||||
return blocks, i
|
||||
|
||||
|
||||
def main():
|
||||
for name in ("event-c", "event-d", "event-a", "event-b"):
|
||||
b = load_bundle(name)
|
||||
body = b.body
|
||||
for s in range(15):
|
||||
if body[s] == 0x10 and body[s+1] % 4 == 0 and 0 < body[s+1] <= 0xFC:
|
||||
start = s; break
|
||||
else:
|
||||
start = 7
|
||||
blocks, end = walk(body, start)
|
||||
types = Counter(bb[1] for bb in blocks)
|
||||
print(f"\n=== {name} === body={len(body)} N={len(b.samples['Tran'])} start={start}")
|
||||
print(f" total blocks: {len(blocks)}, walk ended at {end}/{len(body)}")
|
||||
print(f" type counts: {dict(types)}")
|
||||
if blocks and blocks[-1][1] == "??":
|
||||
print(f" stopped at byte: 0x{blocks[-1][2]:02x} at offset {blocks[-1][0]}")
|
||||
print(f" prev 5 blocks: {[(bb[0], bb[1], bb[2]) for bb in blocks[-6:-1]]}")
|
||||
print(f" bytes around stop: {body[end-4:end+24].hex(' ')}")
|
||||
# Sum
|
||||
payload_sizes = {t: sum(len(bb[3]) for bb in blocks if bb[1] == t) for t in types}
|
||||
print(f" payload bytes by type: {payload_sizes}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user