429c6ac87a
test: add regression tests for v0.14.x SUB 5A protocol fixes refactor(logging): change warning logs to debug for less verbosity in write_blastware_file
253 lines
8.9 KiB
Python
253 lines
8.9 KiB
Python
"""
|
|
test_5a_protocol.py — Regression test for the v0.14.x SUB 5A protocol fixes.
|
|
|
|
Verifies that SFM's framing helpers reproduce Blastware's exact wire bytes
|
|
for every 5A request frame in the 5-1-26 "bwcap3sec" capture, AND that the
|
|
file builder produces a byte-identical file when fed the BW capture's A5
|
|
responses.
|
|
|
|
Together these two groups of tests protect all four v0.14.x fixes:
|
|
|
|
v0.14.0 — STRT-bounded chunk walk (probe @ 0, metadata pages @ 0x1002 +
|
|
0x1004, samples @ 0x0600..0x1E00 step 0x0200, TERM at residual)
|
|
v0.14.1 — event-N probe counter is `start_offset`, not `start_offset+0x46`
|
|
(covered by the multi-event captures, not this 3-sec event-1
|
|
capture — but the helpers are the same code path)
|
|
v0.14.2 — file body assembly is contiguous concatenation, no de-duplication
|
|
v0.14.3 — partial DLE stuffing of `0x10` bytes in 5A params (counter=0x1000
|
|
wire bytes are `10 10 00`, not `10 00`)
|
|
|
|
If any of these fixes regresses, this test fails immediately with a clear
|
|
byte-level diff.
|
|
|
|
Run:
|
|
python -m pytest tests/test_5a_protocol.py -v
|
|
or:
|
|
python tests/test_5a_protocol.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
|
|
import pytest
|
|
|
|
# Allow running from the project root without installation
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from minimateplus.framing import (
|
|
S3FrameParser,
|
|
build_5a_frame,
|
|
bulk_waveform_params,
|
|
bulk_waveform_term_v2,
|
|
)
|
|
|
|
|
|
# ── Capture loading ────────────────────────────────────────────────────────────
|
|
|
|
# Two directory levels above this file (the directory that holds the folder
# containing this test module); all fixture paths below are relative to it.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT = os.path.dirname(_THIS_DIR)

# Reference BW MITM capture: BW saving a 3-sec event 0 (start_key=01110000,
# end_offset=0x21F2). 17 5A frames: probe + 2 metadata pages + 13 samples + TERM.
# The TX and S3 halves of the capture sit side by side in one directory; the
# trailing slash is kept so plain string concatenation yields the final path.
_BW_CAPTURE_DIR = "bridges/captures/5-1-26/comcheck/bwcap3sec/"

BW_TX_PATH = os.path.join(
    ROOT,
    _BW_CAPTURE_DIR + "raw_bw_20260501_165723_copy_3sec_waveform_to_disk.bin",
)
BW_S3_PATH = os.path.join(
    ROOT,
    _BW_CAPTURE_DIR + "raw_s3_20260501_165723_copy_3sec_waveform_to_disk.bin",
)

# BW's saved Blastware file for the same event (used for file-builder verification).
BW_SAVED_FILE = os.path.join(ROOT, "example-events/decode_test/5-1-26/bw/M529LKIQ.G10")
|
|
|
|
|
|
def _split_bw_frames(data: bytes) -> list[bytes]:
|
|
"""Split BW TX bytes into individual frames (ACK STX … bare ETX)."""
|
|
frames: list[bytes] = []
|
|
i = 0
|
|
while i < len(data):
|
|
if data[i] != 0x41 or i + 1 >= len(data) or data[i + 1] != 0x02:
|
|
i += 1
|
|
continue
|
|
j = i + 2
|
|
while j < len(data):
|
|
if data[j] == 0x03:
|
|
break
|
|
if data[j] == 0x10 and j + 1 < len(data):
|
|
j += 2
|
|
continue
|
|
j += 1
|
|
if j >= len(data):
|
|
break
|
|
frames.append(data[i : j + 1])
|
|
i = j + 1
|
|
return frames
|
|
|
|
|
|
@pytest.fixture(scope="module")
def bw_5a_frames() -> list[bytes]:
    """All 5A frames from the BW TX capture, in wire order.

    Skips the requesting tests when the capture file is not present.  The
    capture is split into frames with ``_split_bw_frames`` and only frames
    whose byte at index 5 is ``0x5A`` are kept.

    Returns:
        The 17 raw 5A request frames, in the order they appear in the capture.
    """
    if not os.path.exists(BW_TX_PATH):
        pytest.skip(f"BW capture not found: {BW_TX_PATH}")

    # Read through a context manager so the handle is closed deterministically.
    with open(BW_TX_PATH, "rb") as capture:
        raw = capture.read()

    frames = [
        f for f in _split_bw_frames(raw)
        # Index 5 of the frame is body[3] (the SUB byte): the frame starts with
        # the two-byte ACK STX header.  The length guard keeps f[5] in range.
        if len(f) >= 6 and f[5] == 0x5A
    ]
    assert len(frames) == 17, f"expected 17 5A frames in capture, got {len(frames)}"
    return frames
|
|
|
|
|
|
@pytest.fixture(scope="module")
def bw_a5_frames():
    """All A5 (response) frames from the matching S3 capture.

    Skips the requesting tests when the capture file is not present.  The raw
    bytes are fed to an ``S3FrameParser`` and only the parsed frames whose
    ``sub`` equals ``0xA5`` are kept.

    Returns:
        The 17 parsed A5 frames, in the order the parser produced them.
    """
    if not os.path.exists(BW_S3_PATH):
        pytest.skip(f"BW S3 capture not found: {BW_S3_PATH}")

    # Read through a context manager so the handle is closed deterministically.
    with open(BW_S3_PATH, "rb") as capture:
        raw = capture.read()

    p = S3FrameParser()
    p.feed(raw)
    a5 = [f for f in p.frames if f.sub == 0xA5]
    assert len(a5) == 17, f"expected 17 A5 frames in capture, got {len(a5)}"
    return a5
|
|
|
|
|
|
# ── 5A request frame byte-perfect verification ────────────────────────────────
|
|
|
|
# start_key for the 3-sec event 0
KEY4 = bytes.fromhex("01110000")

# parsed from STRT in the BW capture
END_OFFSET = 0x21F2

# last full 0x0200-byte chunk before TERM
LAST_CHUNK_COUNTER = 0x1E00

# Sample-chunk counters requested after the probe and metadata pages:
# 0x0600, 0x0800, ... up to and including LAST_CHUNK_COUNTER (13 values).
_FIRST_SAMPLE_COUNTER = 0x0600
_SAMPLE_STEP = 0x0200
SAMPLE_COUNTERS = tuple(
    range(_FIRST_SAMPLE_COUNTER, LAST_CHUNK_COUNTER + _SAMPLE_STEP, _SAMPLE_STEP)
)
|
|
|
|
|
|
def _meta_params(key: bytes, counter: int) -> bytes:
|
|
"""Build the 12-byte metadata-page params block (matches BW for 0x1002 / 0x1004)."""
|
|
return bytes(
|
|
[
|
|
0x00, key[0], key[1],
|
|
(counter >> 8) & 0xFF, counter & 0xFF,
|
|
0, 0, 0, 0, 0, 0, 0,
|
|
]
|
|
)
|
|
|
|
|
|
def test_probe_frame_byte_perfect(bw_5a_frames):
    """Probe @ counter=0x0000 (frame 0).

    Builds the probe request with ``bulk_waveform_params(..., is_probe=True)``
    and compares it byte-for-byte with the first captured 5A frame.
    """
    probe_params = bulk_waveform_params(KEY4, 0, is_probe=True)
    built = build_5a_frame(0x1002, probe_params)
    captured = bw_5a_frames[0]
    assert built == captured, f"\nSFM: {built.hex()}\nBW: {captured.hex()}"
|
|
|
|
|
|
@pytest.mark.parametrize("idx,counter", [(1, 0x1002), (2, 0x1004)])
def test_metadata_page_frames_byte_perfect(bw_5a_frames, idx, counter):
    """Metadata pages @ counter=0x1002 and 0x1004 (frames 1 and 2).

    Each case builds the request from ``_meta_params`` and compares it
    byte-for-byte with the captured 5A frame at position ``idx``.
    """
    page_params = _meta_params(KEY4, counter)
    built = build_5a_frame(0x1002, page_params)
    captured = bw_5a_frames[idx]
    assert built == captured, f"\nSFM: {built.hex()}\nBW: {captured.hex()}"
|
|
|
|
|
|
@pytest.mark.parametrize("i,counter", list(enumerate(SAMPLE_COUNTERS)))
def test_sample_chunk_frames_byte_perfect(bw_5a_frames, i, counter):
    """
    Sample chunks @ counter=0x0600..0x1E00, step 0x0200 (frames 3..15).

    Critically, frame 8 (counter=0x1000) requires the v0.14.3 partial DLE
    stuffing fix — wire params include `10 10 00` for the counter, not `10 00`.
    """
    chunk_params = bulk_waveform_params(KEY4, counter)
    built = build_5a_frame(0x1002, chunk_params)
    # The probe and the two metadata pages occupy capture positions 0..2, so
    # the i-th sample request is found three frames further in.
    captured = bw_5a_frames[i + 3]
    assert built == captured, (
        f"\ncounter=0x{counter:04X}\nSFM: {built.hex()}\nBW: {captured.hex()}"
    )
|
|
|
|
|
|
def test_term_frame_byte_perfect(bw_5a_frames):
    """TERM frame at residual (frame 16).

    ``bulk_waveform_term_v2`` supplies both the offset word and the params for
    the final request; the built frame must equal the last captured 5A frame.
    """
    term_offset_word, term_params = bulk_waveform_term_v2(
        KEY4, END_OFFSET, LAST_CHUNK_COUNTER
    )
    built = build_5a_frame(term_offset_word, term_params)
    captured = bw_5a_frames[16]
    assert built == captured, f"\nSFM: {built.hex()}\nBW: {captured.hex()}"
|
|
|
|
|
|
def test_strt_end_offset_parsing(bw_a5_frames):
    """The probe response (A5[0]) carries STRT at byte 17 with end_offset=0x21F2."""
    from minimateplus.framing import parse_strt_end_offset

    probe_response = bw_a5_frames[0]
    parsed = parse_strt_end_offset(probe_response.data)
    # The message is built only on failure; a None result is printed as the
    # word "None" instead of being pushed through the hex format.
    assert parsed == END_OFFSET, (
        f"expected end_offset=0x{END_OFFSET:04X}, got "
        + ("None" if parsed is None else f"0x{parsed:04X}")
    )
|
|
|
|
|
|
# ── File builder byte-perfect verification ────────────────────────────────────
|
|
|
|
def test_blastware_file_builder_byte_perfect(bw_a5_frames):
    """
    Feed the BW capture's A5 frames into write_blastware_file() and verify the
    output is byte-identical to BW's saved M529LKIQ.G10 reference file.

    This protects the v0.14.2 strip-removal fix and the file-builder skip
    values (probe=38, meta=13, samples=12, TERM=11).

    The test is skipped when the reference file is not present.  On a content
    mismatch it fails with the offset of the first differing byte and a short
    hex window from both files, with a ``^^`` marker under the differing byte.
    """
    if not os.path.exists(BW_SAVED_FILE):
        pytest.skip(f"BW saved file not found: {BW_SAVED_FILE}")

    import tempfile

    from minimateplus.blastware_file import write_blastware_file
    from minimateplus.models import Event

    ev = Event(index=0)
    ev._waveform_key = KEY4
    ev.rectime_seconds = 3
    ev.timestamp = None  # let the builder pull the footer from the TERM frame

    # Only the temp file's name is needed; the handle is closed on leaving the
    # `with` so the builder can (re)open the path itself.
    with tempfile.NamedTemporaryFile(suffix=".G10", delete=False) as tf:
        tmp_path = tf.name
    try:
        write_blastware_file(ev, bw_a5_frames, tmp_path)
        # Close the read handle before unlink: removing a file that is still
        # open fails on Windows, and not every interpreter closes it promptly.
        with open(tmp_path, "rb") as produced:
            sfm_bytes = produced.read()
    finally:
        os.unlink(tmp_path)

    with open(BW_SAVED_FILE, "rb") as reference:
        bw_bytes = reference.read()

    assert len(sfm_bytes) == len(bw_bytes), (
        f"file size mismatch: SFM={len(sfm_bytes)} BW={len(bw_bytes)}"
    )

    if sfm_bytes != bw_bytes:
        # Find first diff for actionable error message.  The lengths are equal
        # and the contents differ, so a differing position always exists.
        i = next(
            pos
            for pos, (bw_byte, sfm_byte) in enumerate(zip(bw_bytes, sfm_bytes))
            if bw_byte != sfm_byte
        )
        ctx_start = max(0, i - 8)
        ctx_end = min(len(bw_bytes), i + 16)
        bw_label = " BW : "
        sfm_label = " SFM: "
        # .hex() prints two characters per byte, so the marker is indented by
        # the label width plus two columns for every byte before the diff.
        caret_indent = " " * (len(bw_label) + 2 * (i - ctx_start))
        pytest.fail(
            f"file diverges at byte 0x{i:04X}\n"
            f"{bw_label}{bw_bytes[ctx_start:ctx_end].hex()}\n"
            f"{sfm_label}{sfm_bytes[ctx_start:ctx_end].hex()}\n"
            f"{caret_indent}^^"
        )
|
|
|
|
|
|
# ── Standalone runner ─────────────────────────────────────────────────────────
|
|
|
|
if __name__ == "__main__":
    # Standalone run: execute this module's tests verbosely under pytest and
    # exit the process with pytest's return value.
    raise SystemExit(pytest.main([__file__, "-v"]))
|