""" test_5a_protocol.py — Regression test for the v0.14.x SUB 5A protocol fixes. Verifies that SFM's framing helpers reproduce Blastware's exact wire bytes for every 5A request frame in the 5-1-26 "bwcap3sec" capture, AND that the file builder produces a byte-identical file when fed the BW capture's A5 responses. Together these two tests protect all four v0.14.x fixes: v0.14.0 — STRT-bounded chunk walk (probe @ 0, metadata pages @ 0x1002 + 0x1004, samples @ 0x0600..0x1E00 step 0x0200, TERM at residual) v0.14.1 — event-N probe counter is `start_offset`, not `start_offset+0x46` (covered by the multi-event captures, not this 3-sec event-1 capture — but the helpers are the same code path) v0.14.2 — file body assembly is contiguous concatenation, no de-duplication v0.14.3 — partial DLE stuffing of `0x10` bytes in 5A params (counter=0x1000 wire bytes are `10 10 00`, not `10 00`) If any of these fixes regresses, this test fails immediately with a clear byte-level diff. Run: python -m pytest tests/test_5a_protocol.py -v or: python tests/test_5a_protocol.py """ from __future__ import annotations import os import sys import pytest # Allow running from the project root without installation sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from minimateplus.framing import ( S3FrameParser, build_5a_frame, bulk_waveform_params, bulk_waveform_term_v2, ) # ── Capture loading ──────────────────────────────────────────────────────────── ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Reference BW MITM capture: BW saving a 3-sec event 0 (start_key=01110000, # end_offset=0x21F2). 17 5A frames: probe + 2 metadata pages + 13 samples + TERM. 
BW_TX_PATH = os.path.join(
    ROOT,
    "bridges/captures/5-1-26/comcheck/bwcap3sec/"
    "raw_bw_20260501_165723_copy_3sec_waveform_to_disk.bin",
)
BW_S3_PATH = os.path.join(
    ROOT,
    "bridges/captures/5-1-26/comcheck/bwcap3sec/"
    "raw_s3_20260501_165723_copy_3sec_waveform_to_disk.bin",
)

# BW's saved Blastware file for the same event (used for file-builder verification).
BW_SAVED_FILE = os.path.join(
    ROOT,
    "example-events/decode_test/5-1-26/bw/M529LKIQ.G10",
)


def _read_bytes(path: str) -> bytes:
    """Return the full contents of *path*, closing the file handle on exit."""
    with open(path, "rb") as fh:
        return fh.read()


def _split_bw_frames(data: bytes) -> list[bytes]:
    """Split BW TX bytes into individual frames (ACK STX … bare ETX).

    A frame starts at a ``0x41 0x02`` byte pair and ends at the first
    unescaped ``0x03``.  A ``0x10`` byte that is followed by another byte is
    skipped together with that byte, so an escaped ``0x03`` does not end the
    frame.  A trailing frame with no terminating ``0x03`` is dropped.

    Returns:
        The frames in wire order, each including its leading ``0x41 0x02``
        and trailing ``0x03``.
    """
    frames: list[bytes] = []
    i = 0
    while i < len(data):
        # Resynchronise: advance until the two-byte frame start is found.
        if data[i] != 0x41 or i + 1 >= len(data) or data[i + 1] != 0x02:
            i += 1
            continue
        j = i + 2
        while j < len(data):
            if data[j] == 0x03:
                break
            if data[j] == 0x10 and j + 1 < len(data):
                # Escape pair: skip the escaped byte as well.
                j += 2
                continue
            j += 1
        if j >= len(data):
            # Ran off the end without a terminator: incomplete frame.
            break
        frames.append(data[i : j + 1])
        i = j + 1
    return frames


@pytest.fixture(scope="module")
def bw_5a_frames() -> list[bytes]:
    """All 5A frames from the BW TX capture, in wire order.

    Skips the dependent tests when the capture file is not present.
    """
    if not os.path.exists(BW_TX_PATH):
        pytest.skip(f"BW capture not found: {BW_TX_PATH}")
    raw = _read_bytes(BW_TX_PATH)
    frames = [
        f
        for f in _split_bw_frames(raw)
        if len(f) >= 6 and f[5] == 0x5A  # body[3] == 0x5A (SUB)
    ]
    assert len(frames) == 17, f"expected 17 5A frames in capture, got {len(frames)}"
    return frames


@pytest.fixture(scope="module")
def bw_a5_frames():
    """All A5 (response) frames from the matching S3 capture.

    Skips the dependent tests when the capture file is not present.
    """
    if not os.path.exists(BW_S3_PATH):
        pytest.skip(f"BW S3 capture not found: {BW_S3_PATH}")
    raw = _read_bytes(BW_S3_PATH)
    p = S3FrameParser()
    p.feed(raw)
    a5 = [f for f in p.frames if f.sub == 0xA5]
    assert len(a5) == 17, f"expected 17 A5 frames in capture, got {len(a5)}"
    return a5


# ── 5A request frame byte-perfect verification ────────────────────────────────

KEY4 = bytes.fromhex("01110000")  # start_key for the 3-sec event 0
END_OFFSET = 0x21F2  # parsed from STRT in the BW capture
LAST_CHUNK_COUNTER = 0x1E00  # last full 0x0200-byte chunk before TERM

SAMPLE_COUNTERS = (
    0x0600, 0x0800, 0x0A00, 0x0C00, 0x0E00, 0x1000, 0x1200,
    0x1400, 0x1600, 0x1800, 0x1A00, 0x1C00, 0x1E00,
)


def _meta_params(key: bytes, counter: int) -> bytes:
    """Build the 12-byte metadata-page params block (matches BW for 0x1002 / 0x1004).

    Layout: ``00``, ``key[0]``, ``key[1]``, counter high byte, counter low
    byte, followed by seven zero bytes.
    """
    return bytes(
        [
            0x00,
            key[0],
            key[1],
            (counter >> 8) & 0xFF,
            counter & 0xFF,
            0, 0, 0, 0, 0, 0, 0,
        ]
    )


def test_probe_frame_byte_perfect(bw_5a_frames):
    """Probe @ counter=0x0000 (frame 0)."""
    sfm = build_5a_frame(0x1002, bulk_waveform_params(KEY4, 0, is_probe=True))
    assert sfm == bw_5a_frames[0], (
        f"\nSFM: {sfm.hex()}\nBW: {bw_5a_frames[0].hex()}"
    )


@pytest.mark.parametrize("idx,counter", [(1, 0x1002), (2, 0x1004)])
def test_metadata_page_frames_byte_perfect(bw_5a_frames, idx, counter):
    """Metadata pages @ counter=0x1002 and 0x1004 (frames 1 and 2)."""
    sfm = build_5a_frame(0x1002, _meta_params(KEY4, counter))
    assert sfm == bw_5a_frames[idx], (
        f"\nSFM: {sfm.hex()}\nBW: {bw_5a_frames[idx].hex()}"
    )


@pytest.mark.parametrize("i,counter", list(enumerate(SAMPLE_COUNTERS)))
def test_sample_chunk_frames_byte_perfect(bw_5a_frames, i, counter):
    """
    Sample chunks @ counter=0x0600..0x1E00, step 0x0200 (frames 3..15).

    Critically, frame 8 (counter=0x1000) requires the v0.14.3 partial DLE
    stuffing fix — wire params include `10 10 00` for the counter, not `10 00`.
    """
    sfm = build_5a_frame(0x1002, bulk_waveform_params(KEY4, counter))
    # Frames 0..2 are the probe and the two metadata pages.
    bw_idx = 3 + i
    assert sfm == bw_5a_frames[bw_idx], (
        f"\ncounter=0x{counter:04X}"
        f"\nSFM: {sfm.hex()}"
        f"\nBW: {bw_5a_frames[bw_idx].hex()}"
    )


def test_term_frame_byte_perfect(bw_5a_frames):
    """TERM frame at residual (frame 16)."""
    offset_word, params = bulk_waveform_term_v2(KEY4, END_OFFSET, LAST_CHUNK_COUNTER)
    sfm = build_5a_frame(offset_word, params)
    assert sfm == bw_5a_frames[16], (
        f"\nSFM: {sfm.hex()}\nBW: {bw_5a_frames[16].hex()}"
    )


def test_strt_end_offset_parsing(bw_a5_frames):
    """The probe response (A5[0]) carries STRT at byte 17 with end_offset=0x21F2."""
    from minimateplus.framing import parse_strt_end_offset

    end_offset = parse_strt_end_offset(bw_a5_frames[0].data)
    assert end_offset == END_OFFSET, (
        f"expected end_offset=0x{END_OFFSET:04X}, got "
        f"{f'0x{end_offset:04X}' if end_offset is not None else 'None'}"
    )


# ── File builder byte-perfect verification ────────────────────────────────────


def test_blastware_file_builder_byte_perfect(bw_a5_frames):
    """
    Feed the BW capture's A5 frames into write_blastware_file() and verify the
    output is byte-identical to BW's saved M529LKIQ.G10 reference file.

    This protects the v0.14.2 strip-removal fix and the file-builder skip
    values (probe=38, meta=13, samples=12, TERM=11).
    """
    if not os.path.exists(BW_SAVED_FILE):
        pytest.skip(f"BW saved file not found: {BW_SAVED_FILE}")

    import tempfile

    from minimateplus.blastware_file import write_blastware_file
    from minimateplus.models import Event

    ev = Event(index=0)
    ev._waveform_key = KEY4
    ev.rectime_seconds = 3
    ev.timestamp = None  # let the builder pull the footer from the TERM frame

    # Reserve a path only; the builder opens and writes the file itself.
    with tempfile.NamedTemporaryFile(suffix=".G10", delete=False) as tf:
        tmp_path = tf.name
    try:
        write_blastware_file(ev, bw_a5_frames, tmp_path)
        sfm_bytes = _read_bytes(tmp_path)
    finally:
        os.unlink(tmp_path)

    bw_bytes = _read_bytes(BW_SAVED_FILE)

    assert len(sfm_bytes) == len(bw_bytes), (
        f"file size mismatch: SFM={len(sfm_bytes)} BW={len(bw_bytes)}"
    )
    if sfm_bytes != bw_bytes:
        # Find first diff for actionable error message.  Lengths are equal
        # here, so zip() visits every byte pair and a mismatch must exist.
        first_diff = next(
            i
            for i, (bw_b, sfm_b) in enumerate(zip(bw_bytes, sfm_bytes))
            if bw_b != sfm_b
        )
        ctx_start = max(0, first_diff - 8)
        ctx_end = min(len(bw_bytes), first_diff + 16)
        # Each byte prints as two hex digits, so the caret offset is doubled.
        caret_pad = " " * (2 * (first_diff - ctx_start))
        pytest.fail(
            f"file diverges at byte 0x{first_diff:04X}\n"
            f"  BW : {bw_bytes[ctx_start:ctx_end].hex()}\n"
            f"  SFM: {sfm_bytes[ctx_start:ctx_end].hex()}\n"
            f"       {caret_pad}^^"
        )


# ── Standalone runner ─────────────────────────────────────────────────────────

if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-v"]))