437 lines
16 KiB
Python
437 lines
16 KiB
Python
"""
|
||
test_write_frames.py — Verify write frame construction against BW capture.

Validates that build_bw_write_frame() reproduces the exact wire bytes that
Blastware sent during the 3-11-26/170151 compliance-config write session.

Frames tested (BW TX frame indices 102–112):
    102 — SUB 0x68 event index write
    103 — SUB 0x73 confirm B
    104 — SUB 0x71 compliance write chunk 1
    105 — SUB 0x71 compliance write chunk 2
    106 — SUB 0x71 compliance write chunk 3
    107 — SUB 0x72 confirm A
    108 — SUB 0x82 trigger config write
    109 — SUB 0x83 trigger confirm
    110 — SUB 0x69 waveform data write
    111 — SUB 0x74 confirm C
    112 — SUB 0x72 confirm A (end of sequence)

Run:
    python -m pytest tests/test_write_frames.py -v
or:
    python tests/test_write_frames.py
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import sys
|
||
|
||
import pytest
|
||
|
||
# Allow running from the project root without installation
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
from minimateplus.framing import build_bw_write_frame
|
||
|
||
|
||
# ── Capture loading ────────────────────────────────────────────────────────────
|
||
|
||
# Raw capture file used by every test in this module, located relative to this
# file: <this directory>/../bridges/captures/3-11-26/raw_bw_20260311_170151.bin
CAPTURE_PATH = os.path.join(
    os.path.dirname(__file__), "..", "bridges", "captures",
    "3-11-26", "raw_bw_20260311_170151.bin",
)
|
||
|
||
|
||
def _load_bw_frames(path: str) -> list[bytes]:
    """Split a raw BW capture file into a list of BW frames.

    A frame start is any position where ACK (0x41) is immediately followed by
    STX (0x02). Each frame runs from its start up to, but not including, the
    next start; the last frame runs to the end of the file. Bytes that precede
    the first start marker are dropped.

    Scanning for the ETX (0x03) terminator instead is not reliable, because
    0x03 can appear inside the DLE-stuffed payload.

    NOTE(review): splitting on 0x41 0x02 assumes that byte pair never occurs
    inside a frame's payload — confirm against the protocol.

    Args:
        path: Path of the raw capture file; it is read in binary mode.

    Returns:
        The frames in file order. Empty when the file contains no
        0x41 0x02 start marker.
    """
    with open(path, "rb") as f:
        raw = f.read()

    # Let bytes.find do the scanning instead of a per-byte Python loop.
    start_marker = b"\x41\x02"
    starts: list[int] = []
    pos = raw.find(start_marker)
    while pos != -1:
        starts.append(pos)
        pos = raw.find(start_marker, pos + 1)

    # Frame k ends where frame k+1 begins; the final frame ends at EOF.
    ends = starts[1:] + [len(raw)]
    return [raw[begin:end] for begin, end in zip(starts, ends)]
|
||
|
||
|
||
def _destuff(data: bytes) -> bytes:
    """Undo DLE stuffing: replace every 0x10 0x10 pair with a single 0x10.

    Pairs are matched left to right without overlapping, so a run of three
    0x10 bytes collapses to two. A lone 0x10 is copied through unchanged.

    Args:
        data: Stuffed bytes (any bytes-like object).

    Returns:
        The de-stuffed data as an immutable ``bytes`` object.
    """
    # bytes.replace performs the same left-to-right, non-overlapping pair
    # substitution as a manual index loop; bytes() keeps the return type
    # fixed for bytearray/memoryview input.
    return bytes(data).replace(b"\x10\x10", b"\x10")
|
||
|
||
|
||
def _decode_bw_frame(wire: bytes) -> tuple[int, int, bytes, bytes, int]:
    """Split one BW wire frame into its header fields, data and checksum.

    The first two wire bytes (ACK + STX) and the last wire byte (ETX) are
    dropped, the remainder is de-stuffed with _destuff(), and the fields are
    read from fixed positions of the resulting payload.

    Returns:
        A tuple ``(sub, offset, params, data, chk)``:
            sub    — payload[2]
            offset — payload[4] as the high byte and payload[5] as the low byte
            params — payload[6:16] (10 bytes)
            data   — payload[16:-1]
            chk    — payload[-1]

    Raises:
        IndexError: when the de-stuffed payload is too short for the
            single-byte reads (fewer than 6 bytes).
    """
    body = _destuff(wire[2:-1])  # ACK+STX off the front, ETX off the back

    sub_byte = body[2]
    high, low = body[4], body[5]
    offset = low | (high << 8)

    return sub_byte, offset, body[6:16], body[16:-1], body[-1]
|
||
|
||
|
||
# ── Test fixtures ──────────────────────────────────────────────────────────────
|
||
|
||
@pytest.fixture(scope="module")
def bw_frames() -> list[bytes]:
    """Module-scoped fixture: the BW frames parsed from CAPTURE_PATH.

    Calls pytest.skip() when the capture file does not exist, so tests that
    request this fixture are skipped rather than failed.
    """
    capture_missing = not os.path.exists(CAPTURE_PATH)
    if capture_missing:
        pytest.skip(f"Capture file not found: {CAPTURE_PATH}")
    return _load_bw_frames(CAPTURE_PATH)
|
||
|
||
|
||
# ── Individual frame tests ─────────────────────────────────────────────────────
|
||
|
||
class TestWriteFrameReconstruction:
    """Round-trip captured write frames through build_bw_write_frame().

    Each test decodes one captured frame, checks the decoded fields it cares
    about, rebuilds the frame with build_bw_write_frame() and requires the
    result to equal the captured wire bytes.
    """

    def test_frame_102_event_index_write_sub68(self, bw_frames: list[bytes]) -> None:
        """Frame 102 — SUB 0x68 event index write."""
        captured = bw_frames[102]
        sub, offset, params, data, _chk = _decode_bw_frame(captured)

        assert sub == 0x68
        assert params == bytes(10)

        # Feed the decoded offset, params and data straight back to the builder.
        rebuilt = build_bw_write_frame(0x68, data, offset=offset, params=params)
        assert rebuilt == captured, (
            f"SUB 0x68 wire mismatch\n"
            f" built: {rebuilt.hex()}\n"
            f" capt: {captured.hex()}"
        )

    def test_frame_103_confirm_b_sub73(self, bw_frames: list[bytes]) -> None:
        """Frame 103 — SUB 0x73 confirm B, a confirm frame with no data."""
        captured = bw_frames[103]
        sub, offset, _params, data, _chk = _decode_bw_frame(captured)

        assert sub == 0x73
        assert data == b""
        assert offset == 0x0000

        # Built with the builder's default offset and params.
        rebuilt = build_bw_write_frame(0x73, b"")
        assert rebuilt == captured

    def test_frame_104_compliance_chunk1_sub71(self, bw_frames: list[bytes]) -> None:
        """Frame 104 — SUB 0x71 compliance write, chunk 1 (1027 data bytes)."""
        captured = bw_frames[104]
        sub, offset, params, data, _chk = _decode_bw_frame(captured)

        expected_offset = 0x1004
        expected_params = bytes(10)

        assert sub == 0x71
        assert offset == expected_offset
        assert params == expected_params
        assert len(data) == 1027

        rebuilt = build_bw_write_frame(
            0x71,
            data,
            offset=expected_offset,
            params=expected_params,
        )
        assert rebuilt == captured

    def test_frame_105_compliance_chunk2_sub71(self, bw_frames: list[bytes]) -> None:
        """Frame 105 — SUB 0x71 compliance write, chunk 2 (1055 data bytes)."""
        captured = bw_frames[105]
        sub, offset, params, data, _chk = _decode_bw_frame(captured)

        expected_offset = 0x1004
        expected_params = bytes.fromhex("00 00 00 10 04 00 00 00 00 00")

        assert sub == 0x71
        assert offset == expected_offset
        assert params == expected_params
        assert len(data) == 1055

        rebuilt = build_bw_write_frame(
            0x71,
            data,
            offset=expected_offset,
            params=expected_params,
        )
        assert rebuilt == captured

    def test_frame_106_compliance_chunk3_sub71(self, bw_frames: list[bytes]) -> None:
        """Frame 106 — SUB 0x71 compliance write, chunk 3 (46 data bytes)."""
        captured = bw_frames[106]
        sub, offset, params, data, _chk = _decode_bw_frame(captured)

        expected_offset = 0x002C
        expected_params = bytes.fromhex("00 00 08 00 00 00 00 00 00 00")

        assert sub == 0x71
        assert offset == expected_offset
        assert params == expected_params
        assert len(data) == 46

        rebuilt = build_bw_write_frame(
            0x71,
            data,
            offset=expected_offset,
            params=expected_params,
        )
        assert rebuilt == captured

    def test_frame_107_confirm_a_sub72(self, bw_frames: list[bytes]) -> None:
        """Frame 107 — SUB 0x72 confirm A following the compliance write."""
        captured = bw_frames[107]
        sub, offset, _params, data, _chk = _decode_bw_frame(captured)

        assert sub == 0x72
        assert data == b""
        assert offset == 0x0000

        rebuilt = build_bw_write_frame(0x72, b"")
        assert rebuilt == captured

    def test_frame_108_trigger_config_write_sub82(self, bw_frames: list[bytes]) -> None:
        """Frame 108 — SUB 0x82 trigger config write (29 data bytes)."""
        captured = bw_frames[108]
        sub, offset, params, data, _chk = _decode_bw_frame(captured)

        assert sub == 0x82
        assert params == bytes(10)
        assert len(data) == 29

        # The captured offset is expected to equal data[1] + 2.
        assert offset == data[1] + 2, (
            f"Trigger write offset formula mismatch: "
            f"data[1]={data[1]} → expected {data[1]+2}, got {offset}"
        )

        rebuilt = build_bw_write_frame(0x82, data, offset=offset, params=params)
        assert rebuilt == captured

    def test_frame_109_trigger_confirm_sub83(self, bw_frames: list[bytes]) -> None:
        """Frame 109 — SUB 0x83 trigger confirm, no data."""
        captured = bw_frames[109]
        sub, _offset, _params, data, _chk = _decode_bw_frame(captured)

        assert sub == 0x83
        assert data == b""

        rebuilt = build_bw_write_frame(0x83, b"")
        assert rebuilt == captured

    def test_frame_110_waveform_data_write_sub69(self, bw_frames: list[bytes]) -> None:
        """Frame 110 — SUB 0x69 waveform data write (204 data bytes)."""
        captured = bw_frames[110]
        sub, offset, params, data, _chk = _decode_bw_frame(captured)

        assert sub == 0x69
        assert params == bytes(10)
        assert len(data) == 204

        # The captured offset is expected to equal data[1] + 2.
        assert offset == data[1] + 2, (
            f"Waveform write offset formula mismatch: "
            f"data[1]={data[1]} → expected {data[1]+2}, got {offset}"
        )

        rebuilt = build_bw_write_frame(0x69, data, offset=offset, params=params)
        assert rebuilt == captured

    def test_frame_111_confirm_c_sub74(self, bw_frames: list[bytes]) -> None:
        """Frame 111 — SUB 0x74 confirm C following the waveform data write."""
        captured = bw_frames[111]
        sub, _offset, _params, data, _chk = _decode_bw_frame(captured)

        assert sub == 0x74
        assert data == b""

        rebuilt = build_bw_write_frame(0x74, b"")
        assert rebuilt == captured

    def test_frame_112_confirm_a_sub72_end(self, bw_frames: list[bytes]) -> None:
        """Frame 112 — SUB 0x72 confirm A that ends the write sequence."""
        captured = bw_frames[112]
        sub, _offset, _params, data, _chk = _decode_bw_frame(captured)

        assert sub == 0x72
        assert data == b""

        rebuilt = build_bw_write_frame(0x72, b"")
        assert rebuilt == captured
|
||
|
||
|
||
class TestOffsetFormula:
    """Check that offset == data[1] + 2 holds for the single-chunk write frames."""

    def test_event_index_offset_formula(self, bw_frames: list[bytes]) -> None:
        """Frame 102 (SUB 0x68): offset equals data[1] + 2."""
        decoded = _decode_bw_frame(bw_frames[102])
        offset, data = decoded[1], decoded[3]
        assert offset == data[1] + 2

    def test_trigger_config_offset_formula(self, bw_frames: list[bytes]) -> None:
        """Frame 108 (SUB 0x82): offset equals data[1] + 2."""
        decoded = _decode_bw_frame(bw_frames[108])
        offset, data = decoded[1], decoded[3]
        assert offset == data[1] + 2

    def test_waveform_data_offset_formula(self, bw_frames: list[bytes]) -> None:
        """Frame 110 (SUB 0x69): offset equals data[1] + 2."""
        decoded = _decode_bw_frame(bw_frames[110])
        offset, data = decoded[1], decoded[3]
        assert offset == data[1] + 2
|
||
|
||
|
||
class TestChecksumVerification:
    """Check the large-frame DLE-aware checksum of every captured write frame."""

    def _verify_checksum(self, wire: bytes, label: str) -> None:
        """Assert that the checksum stored in *wire* matches a recomputed one.

        The frame is unwrapped (ACK+STX and ETX dropped) and de-stuffed. The
        expected checksum is the sum of payload[2:-1] with every 0x10 byte
        left out, plus 0x10, reduced to 8 bits; it is compared with the last
        payload byte. *label* only prefixes the failure message.
        """
        payload = _destuff(wire[2:-1])
        stored = payload[-1]

        covered = payload[2:-1]
        # Subtracting 0x10 once per 0x10 byte equals summing only the bytes
        # that are not 0x10.
        total = sum(covered) - 0x10 * covered.count(0x10)
        computed = (total + 0x10) & 0xFF

        assert computed == stored, (
            f"{label}: checksum mismatch — computed=0x{computed:02X}, got=0x{stored:02X}"
        )

    def test_all_write_frame_checksums(self, bw_frames: list[bytes]) -> None:
        """Run the checksum check over frames 102–112, in index order."""
        labels_by_index = {
            102: "SUB 0x68 event index write",
            103: "SUB 0x73 confirm B",
            104: "SUB 0x71 compliance chunk 1",
            105: "SUB 0x71 compliance chunk 2",
            106: "SUB 0x71 compliance chunk 3",
            107: "SUB 0x72 confirm A",
            108: "SUB 0x82 trigger config write",
            109: "SUB 0x83 trigger confirm",
            110: "SUB 0x69 waveform data write",
            111: "SUB 0x74 confirm C",
            112: "SUB 0x72 confirm A (end)",
        }
        for frame_no in labels_by_index:
            description = labels_by_index[frame_no]
            self._verify_checksum(bw_frames[frame_no], f"Frame {frame_no} ({description})")
|
||
|
||
|
||
class TestComplianceChunkSizes:
    """Check data length, params and offset of the three compliance chunks.

    Frames 104, 105 and 106 are the SUB 0x71 compliance write chunks 1–3.
    _decode_bw_frame() returns (sub, offset, params, data, chk), so index 1
    is the offset, index 2 the params and index 3 the data.
    """

    def test_chunk1_size(self, bw_frames: list[bytes]) -> None:
        """Chunk 1 carries 1027 data bytes."""
        data = _decode_bw_frame(bw_frames[104])[3]
        assert len(data) == 1027, f"Chunk 1 should be 1027 bytes, got {len(data)}"

    def test_chunk2_size(self, bw_frames: list[bytes]) -> None:
        """Chunk 2 carries 1055 data bytes."""
        data = _decode_bw_frame(bw_frames[105])[3]
        assert len(data) == 1055, f"Chunk 2 should be 1055 bytes, got {len(data)}"

    def test_chunk3_size(self, bw_frames: list[bytes]) -> None:
        """Chunk 3 carries 46 data bytes."""
        data = _decode_bw_frame(bw_frames[106])[3]
        assert len(data) == 46, f"Chunk 3 should be 46 bytes, got {len(data)}"

    def test_total_compliance_data(self, bw_frames: list[bytes]) -> None:
        """The three chunks together carry 2128 data bytes."""
        total = 0
        for frame_no in (104, 105, 106):
            total += len(_decode_bw_frame(bw_frames[frame_no])[3])
        assert total == 2128, f"Total compliance write data should be 2128 bytes, got {total}"

    def test_chunk1_params(self, bw_frames: list[bytes]) -> None:
        """Chunk 1 params are ten zero bytes."""
        params = _decode_bw_frame(bw_frames[104])[2]
        assert params == bytes(10)

    def test_chunk2_params(self, bw_frames: list[bytes]) -> None:
        """Chunk 2 params are 00 00 00 10 04 followed by five zero bytes."""
        params = _decode_bw_frame(bw_frames[105])[2]
        assert params == bytes.fromhex("00 00 00 10 04 00 00 00 00 00")

    def test_chunk3_params(self, bw_frames: list[bytes]) -> None:
        """Chunk 3 params are 00 00 08 followed by seven zero bytes."""
        params = _decode_bw_frame(bw_frames[106])[2]
        assert params == bytes.fromhex("00 00 08 00 00 00 00 00 00 00")

    def test_chunk1_offset(self, bw_frames: list[bytes]) -> None:
        """Chunk 1 offset is 0x1004."""
        offset = _decode_bw_frame(bw_frames[104])[1]
        assert offset == 0x1004

    def test_chunk2_offset(self, bw_frames: list[bytes]) -> None:
        """Chunk 2 offset is 0x1004."""
        offset = _decode_bw_frame(bw_frames[105])[1]
        assert offset == 0x1004

    def test_chunk3_offset(self, bw_frames: list[bytes]) -> None:
        """Chunk 3 offset is 0x002C."""
        offset = _decode_bw_frame(bw_frames[106])[1]
        assert offset == 0x002C
|
||
|
||
|
||
# ── Standalone runner ──────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
    # Script mode: print a per-frame report for frames 102–112 and exit with
    # status 1 if the capture is missing or any frame fails a check.
    if not os.path.exists(CAPTURE_PATH):
        print(f"ERROR: Capture file not found: {CAPTURE_PATH}")
        sys.exit(1)

    frames = _load_bw_frames(CAPTURE_PATH)
    print(f"Loaded {len(frames)} BW frames from capture")

    write_frame_indices = list(range(102, 113))
    any_failed = False

    def _mark(ok: bool) -> str:
        """Return the pass/fail glyph used in the report columns."""
        return "✅" if ok else "❌"

    print()
    print(f"{'Frame':>6} {'SUB':>5} {'Offset':>8} {'DataLen':>8} {'Chk OK':>7} {'Rebuilt':>8}")
    print("-" * 60)

    for idx in write_frame_indices:
        wire = frames[idx]
        sub, offset, params, data, chk = _decode_bw_frame(wire)

        # Recompute the checksum: sum of payload[2:-1] without its 0x10 bytes,
        # plus 0x10, reduced to 8 bits.
        covered = _destuff(wire[2:-1])[2:-1]
        computed = (sum(covered) - 0x10 * covered.count(0x10) + 0x10) & 0xFF
        chk_ok = computed == chk

        # Rebuild the frame from its decoded fields and compare wire bytes.
        rebuilt_ok = build_bw_write_frame(sub, data, offset=offset, params=params) == wire

        frame_ok = chk_ok and rebuilt_ok
        print(
            f" {idx:4d} 0x{sub:02X} 0x{offset:04X} {len(data):8d} "
            f"{_mark(chk_ok):>7} {_mark(rebuilt_ok):>8} {_mark(frame_ok)}"
        )
        if not frame_ok:
            any_failed = True

    print()
    if any_failed:
        print("FAILURES DETECTED ❌")
        sys.exit(1)
    print("All 11 write frames verified ✅")
|