fix: match BW's 5A frame probe to parse event-time metadata.

This commit is contained in:
Brian Harrison
2026-04-02 16:57:12 -04:00
parent 9bf20803c2
commit 66967e036c
2 changed files with 67 additions and 5 deletions

View File

@@ -90,6 +90,65 @@ def checksum(payload: bytes) -> int:
# ── BW→S3 frame builder ─────────────────────────────────────────────────────── # ── BW→S3 frame builder ───────────────────────────────────────────────────────
# SUB byte for 5A — used by build_5a_frame below (protocol.py has the full
# constant set; defined here to avoid a circular import).
SUB_5A = 0x5A
def build_5a_frame(offset_word: int, raw_params: bytes) -> bytes:
"""
Build a BW→S3 frame for SUB 5A (BULK_WAVEFORM_STREAM) that exactly
matches Blastware's wire output.
SUB 5A uses a DIFFERENT frame format from all other read commands:
1. The offset field (bytes [4:6]) is written RAW — the 0x10 in
offset_hi=0x10 is NOT DLE-stuffed, unlike build_bw_frame().
2. The checksum uses a DLE-aware sum: for each 0x10 XX pair in the
stuffed section, only XX contributes; lone bytes contribute normally.
This differs from the standard SUM8 checksum on the unstuffed payload.
Both differences are confirmed from the 1-2-26 BW TX capture (all 10 frames
verified against this algorithm on 2026-04-02).
Args:
offset_word: 16-bit offset (0x1004 for probe/chunks, 0x005A for term).
raw_params: 10 params bytes (from bulk_waveform_params or
bulk_waveform_term_params). 0x10 bytes in params ARE
DLE-stuffed (BW confirmed this for counter=0x1000 and
counter=0x1004 in the capture).
Returns:
Complete frame bytes: [ACK][STX][stuffed_section][chk][ETX]
"""
if len(raw_params) not in (10, 11):
raise ValueError(f"raw_params must be 10 or 11 bytes, got {len(raw_params)}")
# Build stuffed section between STX and checksum
s = bytearray()
s += b"\x10\x10" # DLE-stuffed BW_CMD
s += b"\x00" # flags
s += bytes([SUB_5A]) # sub = 0x5A
s += b"\x00" # field3
s += bytes([(offset_word >> 8) & 0xFF, # offset_hi — raw, NOT stuffed
offset_word & 0xFF]) # offset_lo
for b in raw_params: # params — DLE-stuffed
if b == DLE:
s.append(DLE)
s.append(b)
# DLE-aware checksum: for 0x10 XX pairs count XX; for lone bytes count them
chk, i = 0, 0
while i < len(s):
if s[i] == DLE and i + 1 < len(s):
chk = (chk + s[i + 1]) & 0xFF
i += 2
else:
chk = (chk + s[i]) & 0xFF
i += 1
return bytes([ACK, STX]) + bytes(s) + bytes([chk, ETX])
def build_bw_frame(sub: int, offset: int = 0, params: bytes = bytes(10)) -> bytes: def build_bw_frame(sub: int, offset: int = 0, params: bytes = bytes(10)) -> bytes:
""" """
Build a BW→S3 read-command frame. Build a BW→S3 read-command frame.
@@ -207,11 +266,13 @@ def bulk_waveform_params(key4: bytes, counter: int, *, is_probe: bool = False) -
is_probe: If True, embed full key4 (probe step only). is_probe: If True, embed full key4 (probe step only).
Returns: Returns:
10-byte params block. 11-byte params block. (BW confirmed: chunk frames carry 11 params bytes,
not 10; the extra trailing 0x00 was confirmed from 1-2-26 wire capture
on 2026-04-02.)
""" """
if len(key4) != 4: if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}") raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
p = bytearray(10) p = bytearray(11) # 11 bytes confirmed from BW wire capture
p[0] = 0x00 p[0] = 0x00
p[1] = key4[0] p[1] = key4[0]
p[2] = key4[1] p[2] = key4[1]

View File

@@ -29,6 +29,7 @@ from .framing import (
S3Frame, S3Frame,
S3FrameParser, S3FrameParser,
build_bw_frame, build_bw_frame,
build_5a_frame,
waveform_key_params, waveform_key_params,
token_params, token_params,
bulk_waveform_params, bulk_waveform_params,
@@ -465,7 +466,7 @@ class MiniMateProtocol:
# ── Step 1: probe ──────────────────────────────────────────────────── # ── Step 1: probe ────────────────────────────────────────────────────
log.debug("5A probe key=%s", key4.hex()) log.debug("5A probe key=%s", key4.hex())
params = bulk_waveform_params(key4, 0, is_probe=True) params = bulk_waveform_params(key4, 0, is_probe=True)
self._send(build_bw_frame(SUB_BULK_WAVEFORM, _BULK_CHUNK_OFFSET, params)) self._send(build_5a_frame(_BULK_CHUNK_OFFSET, params))
rsp = self._recv_one(expected_sub=rsp_sub) rsp = self._recv_one(expected_sub=rsp_sub)
frames_data.append(rsp.data) frames_data.append(rsp.data)
log.debug("5A A5[0] page_key=0x%04X %d bytes", rsp.page_key, len(rsp.data)) log.debug("5A A5[0] page_key=0x%04X %d bytes", rsp.page_key, len(rsp.data))
@@ -475,7 +476,7 @@ class MiniMateProtocol:
counter = chunk_num * _BULK_COUNTER_STEP counter = chunk_num * _BULK_COUNTER_STEP
params = bulk_waveform_params(key4, counter) params = bulk_waveform_params(key4, counter)
log.debug("5A chunk %d counter=0x%04X", chunk_num, counter) log.debug("5A chunk %d counter=0x%04X", chunk_num, counter)
self._send(build_bw_frame(SUB_BULK_WAVEFORM, _BULK_CHUNK_OFFSET, params)) self._send(build_5a_frame(_BULK_CHUNK_OFFSET, params))
rsp = self._recv_one(expected_sub=rsp_sub) rsp = self._recv_one(expected_sub=rsp_sub)
if rsp.page_key == 0x0000: if rsp.page_key == 0x0000:
@@ -505,7 +506,7 @@ class MiniMateProtocol:
"5A termination term_counter=0x%04X offset=0x%04X", "5A termination term_counter=0x%04X offset=0x%04X",
term_counter, _BULK_TERM_OFFSET, term_counter, _BULK_TERM_OFFSET,
) )
self._send(build_bw_frame(SUB_BULK_WAVEFORM, _BULK_TERM_OFFSET, term_params)) self._send(build_5a_frame(_BULK_TERM_OFFSET, term_params))
try: try:
term_rsp = self._recv_one(expected_sub=rsp_sub) term_rsp = self._recv_one(expected_sub=rsp_sub)
log.debug( log.debug(