""" test_write_frames.py — Verify write frame construction against BW capture. Validates that build_bw_write_frame() reproduces the exact wire bytes that Blastware sent during the 3-11-26/170151 compliance-config write session. Frames tested (BW TX frame indices 102–112): 102 — SUB 0x68 event index write 103 — SUB 0x73 confirm B 104 — SUB 0x71 compliance write chunk 1 105 — SUB 0x71 compliance write chunk 2 106 — SUB 0x71 compliance write chunk 3 107 — SUB 0x72 confirm A 108 — SUB 0x82 trigger config write 109 — SUB 0x83 trigger confirm 110 — SUB 0x69 waveform data write 111 — SUB 0x74 confirm C 112 — SUB 0x72 confirm A (end of sequence) Run: python -m pytest tests/test_write_frames.py -v or: python tests/test_write_frames.py """ from __future__ import annotations import os import sys import pytest # Allow running from the project root without installation sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from minimateplus.framing import build_bw_write_frame # ── Capture loading ──────────────────────────────────────────────────────────── CAPTURE_PATH = os.path.join( os.path.dirname(__file__), "..", "bridges", "captures", "3-11-26", "raw_bw_20260311_170151.bin", ) def _load_bw_frames(path: str) -> list[bytes]: """ Parse a raw BW capture file into a list of BW frames. BW frames start with ACK=0x41 followed by STX=0x02. The frame boundary is the position of the NEXT 0x41 0x02 sequence (the ETX=0x03 terminator is the last byte before the next frame start). NOTE: A naive scan for ETX=0x03 fails because 0x03 can appear inside the DLE-stuffed payload. This parser uses consecutive 0x41 0x02 starts as boundaries, which is safe because the ACK byte (0x41) is never DLE-stuffed. """ with open(path, "rb") as f: raw = f.read() boundaries: list[int] = [] i = 0 while i < len(raw) - 1: if raw[i] == 0x41 and raw[i + 1] == 0x02: boundaries.append(i) i += 1 boundaries.append(len(raw)) frames = [] for k in range(len(boundaries) - 1): frames.append(raw[boundaries[k] : boundaries[k + 1]]) return frames def _destuff(data: bytes) -> bytes: """Undo DLE stuffing: replace every 0x10 0x10 pair with a single 0x10.""" result = bytearray() k = 0 while k < len(data): if data[k] == 0x10 and k + 1 < len(data) and data[k + 1] == 0x10: result.append(0x10) k += 2 else: result.append(data[k]) k += 1 return bytes(result) def _decode_bw_frame(wire: bytes) -> tuple[int, int, bytes, bytes, int]: """ Decode a BW wire frame into its components. Returns: (sub, offset, params, data, chk) sub — SUB byte (payload[2]) offset — uint16 from payload[4:6] params — 10-byte params field (payload[6:16]) data — write payload bytes (payload[16:-1]) chk — checksum byte (payload[-1]) """ inner = wire[2:-1] # strip ACK+STX and trailing ETX payload = _destuff(inner) sub = payload[2] offset = (payload[4] << 8) | payload[5] params = payload[6:16] data = payload[16:-1] chk = payload[-1] return sub, offset, params, data, chk # ── Test fixtures ────────────────────────────────────────────────────────────── @pytest.fixture(scope="module") def bw_frames() -> list[bytes]: if not os.path.exists(CAPTURE_PATH): pytest.skip(f"Capture file not found: {CAPTURE_PATH}") return _load_bw_frames(CAPTURE_PATH) # ── Individual frame tests ───────────────────────────────────────────────────── class TestWriteFrameReconstruction: """Verify build_bw_write_frame() reproduces the exact wire bytes from the capture.""" def test_frame_102_event_index_write_sub68(self, bw_frames: list[bytes]) -> None: """SUB 0x68 — event index write (frame 102).""" cap_wire = bw_frames[102] sub_cap, offset_cap, params_cap, data_cap, chk_cap = _decode_bw_frame(cap_wire) assert sub_cap == 0x68 assert params_cap == bytes(10) # Reconstruct using build_bw_write_frame with the same data and offset built = build_bw_write_frame(0x68, data_cap, offset=offset_cap, params=params_cap) assert built == cap_wire, ( f"SUB 0x68 wire mismatch\n" f" built: {built.hex()}\n" f" capt: {cap_wire.hex()}" ) def test_frame_103_confirm_b_sub73(self, bw_frames: list[bytes]) -> None: """SUB 0x73 — confirm B (zero-data confirm frame 103).""" cap_wire = bw_frames[103] sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) assert sub_cap == 0x73 assert data_cap == b"" assert offset_cap == 0x0000 built = build_bw_write_frame(0x73, b"") assert built == cap_wire def test_frame_104_compliance_chunk1_sub71(self, bw_frames: list[bytes]) -> None: """SUB 0x71 chunk 1 — 1027-byte compliance write (frame 104).""" cap_wire = bw_frames[104] sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) assert sub_cap == 0x71 assert offset_cap == 0x1004 assert params_cap == bytes(10) assert len(data_cap) == 1027 built = build_bw_write_frame( 0x71, data_cap, offset=0x1004, params=bytes(10), ) assert built == cap_wire def test_frame_105_compliance_chunk2_sub71(self, bw_frames: list[bytes]) -> None: """SUB 0x71 chunk 2 — 1055-byte compliance write (frame 105).""" cap_wire = bw_frames[105] sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) _CHUNK2_PARAMS = bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) assert sub_cap == 0x71 assert offset_cap == 0x1004 assert params_cap == _CHUNK2_PARAMS assert len(data_cap) == 1055 built = build_bw_write_frame( 0x71, data_cap, offset=0x1004, params=_CHUNK2_PARAMS, ) assert built == cap_wire def test_frame_106_compliance_chunk3_sub71(self, bw_frames: list[bytes]) -> None: """SUB 0x71 chunk 3 — 46-byte compliance write (frame 106).""" cap_wire = bw_frames[106] sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) _CHUNK3_PARAMS = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) assert sub_cap == 0x71 assert offset_cap == 0x002C assert params_cap == _CHUNK3_PARAMS assert len(data_cap) == 46 built = build_bw_write_frame( 0x71, data_cap, offset=0x002C, params=_CHUNK3_PARAMS, ) assert built == cap_wire def test_frame_107_confirm_a_sub72(self, bw_frames: list[bytes]) -> None: """SUB 0x72 — confirm A after compliance write (frame 107).""" cap_wire = bw_frames[107] sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) assert sub_cap == 0x72 assert data_cap == b"" assert offset_cap == 0x0000 built = build_bw_write_frame(0x72, b"") assert built == cap_wire def test_frame_108_trigger_config_write_sub82(self, bw_frames: list[bytes]) -> None: """SUB 0x82 — trigger config write (frame 108).""" cap_wire = bw_frames[108] sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) assert sub_cap == 0x82 assert params_cap == bytes(10) assert len(data_cap) == 29 # Verify offset formula: data[1] + 2 assert offset_cap == data_cap[1] + 2, ( f"Trigger write offset formula mismatch: " f"data[1]={data_cap[1]} → expected {data_cap[1]+2}, got {offset_cap}" ) built = build_bw_write_frame( 0x82, data_cap, offset=offset_cap, params=params_cap, ) assert built == cap_wire def test_frame_109_trigger_confirm_sub83(self, bw_frames: list[bytes]) -> None: """SUB 0x83 — trigger confirm (frame 109).""" cap_wire = bw_frames[109] sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire) assert sub_cap == 0x83 assert data_cap == b"" built = build_bw_write_frame(0x83, b"") assert built == cap_wire def test_frame_110_waveform_data_write_sub69(self, bw_frames: list[bytes]) -> None: """SUB 0x69 — waveform data write (frame 110).""" cap_wire = bw_frames[110] sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) assert sub_cap == 0x69 assert params_cap == bytes(10) assert len(data_cap) == 204 # Verify offset formula: data[1] + 2 assert offset_cap == data_cap[1] + 2, ( f"Waveform write offset formula mismatch: " f"data[1]={data_cap[1]} → expected {data_cap[1]+2}, got {offset_cap}" ) built = build_bw_write_frame( 0x69, data_cap, offset=offset_cap, params=params_cap, ) assert built == cap_wire def test_frame_111_confirm_c_sub74(self, bw_frames: list[bytes]) -> None: """SUB 0x74 — confirm C after waveform data write (frame 111).""" cap_wire = bw_frames[111] sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire) assert sub_cap == 0x74 assert data_cap == b"" built = build_bw_write_frame(0x74, b"") assert built == cap_wire def test_frame_112_confirm_a_sub72_end(self, bw_frames: list[bytes]) -> None: """SUB 0x72 — final confirm A at end of write sequence (frame 112).""" cap_wire = bw_frames[112] sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire) assert sub_cap == 0x72 assert data_cap == b"" built = build_bw_write_frame(0x72, b"") assert built == cap_wire class TestOffsetFormula: """Verify the offset = data[1] + 2 formula for single-chunk write commands.""" def test_event_index_offset_formula(self, bw_frames: list[bytes]) -> None: """Frame 102 (SUB 0x68): offset = data[1] + 2.""" _, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[102]) assert offset_cap == data_cap[1] + 2 def test_trigger_config_offset_formula(self, bw_frames: list[bytes]) -> None: """Frame 108 (SUB 0x82): offset = data[1] + 2.""" _, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[108]) assert offset_cap == data_cap[1] + 2 def test_waveform_data_offset_formula(self, bw_frames: list[bytes]) -> None: """Frame 110 (SUB 0x69): offset = data[1] + 2.""" _, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[110]) assert offset_cap == data_cap[1] + 2 class TestChecksumVerification: """Verify large-frame DLE-aware checksum for all write frames.""" def _verify_checksum(self, wire: bytes, label: str) -> None: inner = wire[2:-1] payload = _destuff(inner) chk = payload[-1] computed = (sum(b for b in payload[2:-1] if b != 0x10) + 0x10) & 0xFF assert computed == chk, ( f"{label}: checksum mismatch — computed=0x{computed:02X}, got=0x{chk:02X}" ) def test_all_write_frame_checksums(self, bw_frames: list[bytes]) -> None: write_frames = { 102: "SUB 0x68 event index write", 103: "SUB 0x73 confirm B", 104: "SUB 0x71 compliance chunk 1", 105: "SUB 0x71 compliance chunk 2", 106: "SUB 0x71 compliance chunk 3", 107: "SUB 0x72 confirm A", 108: "SUB 0x82 trigger config write", 109: "SUB 0x83 trigger confirm", 110: "SUB 0x69 waveform data write", 111: "SUB 0x74 confirm C", 112: "SUB 0x72 confirm A (end)", } for idx, label in write_frames.items(): self._verify_checksum(bw_frames[idx], f"Frame {idx} ({label})") class TestComplianceChunkSizes: """Verify compliance write chunk sizes and sequence.""" def test_chunk1_size(self, bw_frames: list[bytes]) -> None: _, _, _, data, _ = _decode_bw_frame(bw_frames[104]) assert len(data) == 1027, f"Chunk 1 should be 1027 bytes, got {len(data)}" def test_chunk2_size(self, bw_frames: list[bytes]) -> None: _, _, _, data, _ = _decode_bw_frame(bw_frames[105]) assert len(data) == 1055, f"Chunk 2 should be 1055 bytes, got {len(data)}" def test_chunk3_size(self, bw_frames: list[bytes]) -> None: _, _, _, data, _ = _decode_bw_frame(bw_frames[106]) assert len(data) == 46, f"Chunk 3 should be 46 bytes, got {len(data)}" def test_total_compliance_data(self, bw_frames: list[bytes]) -> None: total = sum( len(_decode_bw_frame(bw_frames[i])[3]) for i in [104, 105, 106] ) assert total == 2128, f"Total compliance write data should be 2128 bytes, got {total}" def test_chunk1_params(self, bw_frames: list[bytes]) -> None: _, _, params, _, _ = _decode_bw_frame(bw_frames[104]) assert params == bytes(10) def test_chunk2_params(self, bw_frames: list[bytes]) -> None: _, _, params, _, _ = _decode_bw_frame(bw_frames[105]) assert params == bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) def test_chunk3_params(self, bw_frames: list[bytes]) -> None: _, _, params, _, _ = _decode_bw_frame(bw_frames[106]) assert params == bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) def test_chunk1_offset(self, bw_frames: list[bytes]) -> None: _, offset, _, _, _ = _decode_bw_frame(bw_frames[104]) assert offset == 0x1004 def test_chunk2_offset(self, bw_frames: list[bytes]) -> None: _, offset, _, _, _ = _decode_bw_frame(bw_frames[105]) assert offset == 0x1004 def test_chunk3_offset(self, bw_frames: list[bytes]) -> None: _, offset, _, _, _ = _decode_bw_frame(bw_frames[106]) assert offset == 0x002C # ── Standalone runner ────────────────────────────────────────────────────────── if __name__ == "__main__": if not os.path.exists(CAPTURE_PATH): print(f"ERROR: Capture file not found: {CAPTURE_PATH}") sys.exit(1) frames = _load_bw_frames(CAPTURE_PATH) print(f"Loaded {len(frames)} BW frames from capture") write_frame_indices = list(range(102, 113)) all_pass = True print() print(f"{'Frame':>6} {'SUB':>5} {'Offset':>8} {'DataLen':>8} {'Chk OK':>7} {'Rebuilt':>8}") print("-" * 60) for idx in write_frame_indices: wire = frames[idx] sub, offset, params, data, chk = _decode_bw_frame(wire) payload = _destuff(wire[2:-1]) computed = (sum(b for b in payload[2:-1] if b != 0x10) + 0x10) & 0xFF chk_ok = computed == chk built = build_bw_write_frame(sub, data, offset=offset, params=params) rebuilt_ok = built == wire status = "✅" if (chk_ok and rebuilt_ok) else "❌" print( f" {idx:4d} 0x{sub:02X} 0x{offset:04X} {len(data):8d} " f"{'✅' if chk_ok else '❌':>7} {'✅' if rebuilt_ok else '❌':>8} {status}" ) if not (chk_ok and rebuilt_ok): all_pass = False print() if all_pass: print("All 11 write frames verified ✅") else: print("FAILURES DETECTED ❌") sys.exit(1)