feat: updates to 0.8.0 - initial write functions

This commit is contained in:
2026-04-07 02:09:29 -04:00
parent c2ab94f20c
commit bcc044655a
6 changed files with 1083 additions and 21 deletions
+436
View File
@@ -0,0 +1,436 @@
"""
test_write_frames.py — Verify write frame construction against BW capture.
Validates that build_bw_write_frame() reproduces the exact wire bytes that
Blastware sent during the 3-11-26/170151 compliance-config write session.
Frames tested (BW TX frame indices 102112):
102 — SUB 0x68 event index write
103 — SUB 0x73 confirm B
104 — SUB 0x71 compliance write chunk 1
105 — SUB 0x71 compliance write chunk 2
106 — SUB 0x71 compliance write chunk 3
107 — SUB 0x72 confirm A
108 — SUB 0x82 trigger config write
109 — SUB 0x83 trigger confirm
110 — SUB 0x69 waveform data write
111 — SUB 0x74 confirm C
112 — SUB 0x72 confirm A (end of sequence)
Run:
python -m pytest tests/test_write_frames.py -v
or:
python tests/test_write_frames.py
"""
from __future__ import annotations
import os
import sys
import pytest
# Allow running from the project root without installation
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus.framing import build_bw_write_frame
# ── Capture loading ────────────────────────────────────────────────────────────
CAPTURE_PATH = os.path.join(
os.path.dirname(__file__),
"..",
"bridges",
"captures",
"3-11-26",
"raw_bw_20260311_170151.bin",
)
def _load_bw_frames(path: str) -> list[bytes]:
"""
Parse a raw BW capture file into a list of BW frames.
BW frames start with ACK=0x41 followed by STX=0x02. The frame boundary is
the position of the NEXT 0x41 0x02 sequence (the ETX=0x03 terminator is the
last byte before the next frame start).
NOTE: A naive scan for ETX=0x03 fails because 0x03 can appear inside the
DLE-stuffed payload. This parser uses consecutive 0x41 0x02 starts as
boundaries, which is safe because the ACK byte (0x41) is never DLE-stuffed.
"""
with open(path, "rb") as f:
raw = f.read()
boundaries: list[int] = []
i = 0
while i < len(raw) - 1:
if raw[i] == 0x41 and raw[i + 1] == 0x02:
boundaries.append(i)
i += 1
boundaries.append(len(raw))
frames = []
for k in range(len(boundaries) - 1):
frames.append(raw[boundaries[k] : boundaries[k + 1]])
return frames
def _destuff(data: bytes) -> bytes:
"""Undo DLE stuffing: replace every 0x10 0x10 pair with a single 0x10."""
result = bytearray()
k = 0
while k < len(data):
if data[k] == 0x10 and k + 1 < len(data) and data[k + 1] == 0x10:
result.append(0x10)
k += 2
else:
result.append(data[k])
k += 1
return bytes(result)
def _decode_bw_frame(wire: bytes) -> tuple[int, int, bytes, bytes, int]:
"""
Decode a BW wire frame into its components.
Returns:
(sub, offset, params, data, chk)
sub — SUB byte (payload[2])
offset — uint16 from payload[4:6]
params — 10-byte params field (payload[6:16])
data — write payload bytes (payload[16:-1])
chk — checksum byte (payload[-1])
"""
inner = wire[2:-1] # strip ACK+STX and trailing ETX
payload = _destuff(inner)
sub = payload[2]
offset = (payload[4] << 8) | payload[5]
params = payload[6:16]
data = payload[16:-1]
chk = payload[-1]
return sub, offset, params, data, chk
# ── Test fixtures ──────────────────────────────────────────────────────────────
@pytest.fixture(scope="module")
def bw_frames() -> list[bytes]:
if not os.path.exists(CAPTURE_PATH):
pytest.skip(f"Capture file not found: {CAPTURE_PATH}")
return _load_bw_frames(CAPTURE_PATH)
# ── Individual frame tests ─────────────────────────────────────────────────────
class TestWriteFrameReconstruction:
"""Verify build_bw_write_frame() reproduces the exact wire bytes from the capture."""
def test_frame_102_event_index_write_sub68(self, bw_frames: list[bytes]) -> None:
"""SUB 0x68 — event index write (frame 102)."""
cap_wire = bw_frames[102]
sub_cap, offset_cap, params_cap, data_cap, chk_cap = _decode_bw_frame(cap_wire)
assert sub_cap == 0x68
assert params_cap == bytes(10)
# Reconstruct using build_bw_write_frame with the same data and offset
built = build_bw_write_frame(0x68, data_cap, offset=offset_cap, params=params_cap)
assert built == cap_wire, (
f"SUB 0x68 wire mismatch\n"
f" built: {built.hex()}\n"
f" capt: {cap_wire.hex()}"
)
def test_frame_103_confirm_b_sub73(self, bw_frames: list[bytes]) -> None:
"""SUB 0x73 — confirm B (zero-data confirm frame 103)."""
cap_wire = bw_frames[103]
sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire)
assert sub_cap == 0x73
assert data_cap == b""
assert offset_cap == 0x0000
built = build_bw_write_frame(0x73, b"")
assert built == cap_wire
def test_frame_104_compliance_chunk1_sub71(self, bw_frames: list[bytes]) -> None:
"""SUB 0x71 chunk 1 — 1027-byte compliance write (frame 104)."""
cap_wire = bw_frames[104]
sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire)
assert sub_cap == 0x71
assert offset_cap == 0x1004
assert params_cap == bytes(10)
assert len(data_cap) == 1027
built = build_bw_write_frame(
0x71, data_cap,
offset=0x1004,
params=bytes(10),
)
assert built == cap_wire
def test_frame_105_compliance_chunk2_sub71(self, bw_frames: list[bytes]) -> None:
"""SUB 0x71 chunk 2 — 1055-byte compliance write (frame 105)."""
cap_wire = bw_frames[105]
sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire)
_CHUNK2_PARAMS = bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00])
assert sub_cap == 0x71
assert offset_cap == 0x1004
assert params_cap == _CHUNK2_PARAMS
assert len(data_cap) == 1055
built = build_bw_write_frame(
0x71, data_cap,
offset=0x1004,
params=_CHUNK2_PARAMS,
)
assert built == cap_wire
def test_frame_106_compliance_chunk3_sub71(self, bw_frames: list[bytes]) -> None:
"""SUB 0x71 chunk 3 — 46-byte compliance write (frame 106)."""
cap_wire = bw_frames[106]
sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire)
_CHUNK3_PARAMS = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
assert sub_cap == 0x71
assert offset_cap == 0x002C
assert params_cap == _CHUNK3_PARAMS
assert len(data_cap) == 46
built = build_bw_write_frame(
0x71, data_cap,
offset=0x002C,
params=_CHUNK3_PARAMS,
)
assert built == cap_wire
def test_frame_107_confirm_a_sub72(self, bw_frames: list[bytes]) -> None:
"""SUB 0x72 — confirm A after compliance write (frame 107)."""
cap_wire = bw_frames[107]
sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire)
assert sub_cap == 0x72
assert data_cap == b""
assert offset_cap == 0x0000
built = build_bw_write_frame(0x72, b"")
assert built == cap_wire
def test_frame_108_trigger_config_write_sub82(self, bw_frames: list[bytes]) -> None:
"""SUB 0x82 — trigger config write (frame 108)."""
cap_wire = bw_frames[108]
sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire)
assert sub_cap == 0x82
assert params_cap == bytes(10)
assert len(data_cap) == 29
# Verify offset formula: data[1] + 2
assert offset_cap == data_cap[1] + 2, (
f"Trigger write offset formula mismatch: "
f"data[1]={data_cap[1]} → expected {data_cap[1]+2}, got {offset_cap}"
)
built = build_bw_write_frame(
0x82, data_cap,
offset=offset_cap,
params=params_cap,
)
assert built == cap_wire
def test_frame_109_trigger_confirm_sub83(self, bw_frames: list[bytes]) -> None:
"""SUB 0x83 — trigger confirm (frame 109)."""
cap_wire = bw_frames[109]
sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire)
assert sub_cap == 0x83
assert data_cap == b""
built = build_bw_write_frame(0x83, b"")
assert built == cap_wire
def test_frame_110_waveform_data_write_sub69(self, bw_frames: list[bytes]) -> None:
"""SUB 0x69 — waveform data write (frame 110)."""
cap_wire = bw_frames[110]
sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire)
assert sub_cap == 0x69
assert params_cap == bytes(10)
assert len(data_cap) == 204
# Verify offset formula: data[1] + 2
assert offset_cap == data_cap[1] + 2, (
f"Waveform write offset formula mismatch: "
f"data[1]={data_cap[1]} → expected {data_cap[1]+2}, got {offset_cap}"
)
built = build_bw_write_frame(
0x69, data_cap,
offset=offset_cap,
params=params_cap,
)
assert built == cap_wire
def test_frame_111_confirm_c_sub74(self, bw_frames: list[bytes]) -> None:
"""SUB 0x74 — confirm C after waveform data write (frame 111)."""
cap_wire = bw_frames[111]
sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire)
assert sub_cap == 0x74
assert data_cap == b""
built = build_bw_write_frame(0x74, b"")
assert built == cap_wire
def test_frame_112_confirm_a_sub72_end(self, bw_frames: list[bytes]) -> None:
"""SUB 0x72 — final confirm A at end of write sequence (frame 112)."""
cap_wire = bw_frames[112]
sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire)
assert sub_cap == 0x72
assert data_cap == b""
built = build_bw_write_frame(0x72, b"")
assert built == cap_wire
class TestOffsetFormula:
"""Verify the offset = data[1] + 2 formula for single-chunk write commands."""
def test_event_index_offset_formula(self, bw_frames: list[bytes]) -> None:
"""Frame 102 (SUB 0x68): offset = data[1] + 2."""
_, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[102])
assert offset_cap == data_cap[1] + 2
def test_trigger_config_offset_formula(self, bw_frames: list[bytes]) -> None:
"""Frame 108 (SUB 0x82): offset = data[1] + 2."""
_, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[108])
assert offset_cap == data_cap[1] + 2
def test_waveform_data_offset_formula(self, bw_frames: list[bytes]) -> None:
"""Frame 110 (SUB 0x69): offset = data[1] + 2."""
_, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[110])
assert offset_cap == data_cap[1] + 2
class TestChecksumVerification:
"""Verify large-frame DLE-aware checksum for all write frames."""
def _verify_checksum(self, wire: bytes, label: str) -> None:
inner = wire[2:-1]
payload = _destuff(inner)
chk = payload[-1]
computed = (sum(b for b in payload[2:-1] if b != 0x10) + 0x10) & 0xFF
assert computed == chk, (
f"{label}: checksum mismatch — computed=0x{computed:02X}, got=0x{chk:02X}"
)
def test_all_write_frame_checksums(self, bw_frames: list[bytes]) -> None:
write_frames = {
102: "SUB 0x68 event index write",
103: "SUB 0x73 confirm B",
104: "SUB 0x71 compliance chunk 1",
105: "SUB 0x71 compliance chunk 2",
106: "SUB 0x71 compliance chunk 3",
107: "SUB 0x72 confirm A",
108: "SUB 0x82 trigger config write",
109: "SUB 0x83 trigger confirm",
110: "SUB 0x69 waveform data write",
111: "SUB 0x74 confirm C",
112: "SUB 0x72 confirm A (end)",
}
for idx, label in write_frames.items():
self._verify_checksum(bw_frames[idx], f"Frame {idx} ({label})")
class TestComplianceChunkSizes:
"""Verify compliance write chunk sizes and sequence."""
def test_chunk1_size(self, bw_frames: list[bytes]) -> None:
_, _, _, data, _ = _decode_bw_frame(bw_frames[104])
assert len(data) == 1027, f"Chunk 1 should be 1027 bytes, got {len(data)}"
def test_chunk2_size(self, bw_frames: list[bytes]) -> None:
_, _, _, data, _ = _decode_bw_frame(bw_frames[105])
assert len(data) == 1055, f"Chunk 2 should be 1055 bytes, got {len(data)}"
def test_chunk3_size(self, bw_frames: list[bytes]) -> None:
_, _, _, data, _ = _decode_bw_frame(bw_frames[106])
assert len(data) == 46, f"Chunk 3 should be 46 bytes, got {len(data)}"
def test_total_compliance_data(self, bw_frames: list[bytes]) -> None:
total = sum(
len(_decode_bw_frame(bw_frames[i])[3]) for i in [104, 105, 106]
)
assert total == 2128, f"Total compliance write data should be 2128 bytes, got {total}"
def test_chunk1_params(self, bw_frames: list[bytes]) -> None:
_, _, params, _, _ = _decode_bw_frame(bw_frames[104])
assert params == bytes(10)
def test_chunk2_params(self, bw_frames: list[bytes]) -> None:
_, _, params, _, _ = _decode_bw_frame(bw_frames[105])
assert params == bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00])
def test_chunk3_params(self, bw_frames: list[bytes]) -> None:
_, _, params, _, _ = _decode_bw_frame(bw_frames[106])
assert params == bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
def test_chunk1_offset(self, bw_frames: list[bytes]) -> None:
_, offset, _, _, _ = _decode_bw_frame(bw_frames[104])
assert offset == 0x1004
def test_chunk2_offset(self, bw_frames: list[bytes]) -> None:
_, offset, _, _, _ = _decode_bw_frame(bw_frames[105])
assert offset == 0x1004
def test_chunk3_offset(self, bw_frames: list[bytes]) -> None:
_, offset, _, _, _ = _decode_bw_frame(bw_frames[106])
assert offset == 0x002C
# ── Standalone runner ──────────────────────────────────────────────────────────
if __name__ == "__main__":
if not os.path.exists(CAPTURE_PATH):
print(f"ERROR: Capture file not found: {CAPTURE_PATH}")
sys.exit(1)
frames = _load_bw_frames(CAPTURE_PATH)
print(f"Loaded {len(frames)} BW frames from capture")
write_frame_indices = list(range(102, 113))
all_pass = True
print()
print(f"{'Frame':>6} {'SUB':>5} {'Offset':>8} {'DataLen':>8} {'Chk OK':>7} {'Rebuilt':>8}")
print("-" * 60)
for idx in write_frame_indices:
wire = frames[idx]
sub, offset, params, data, chk = _decode_bw_frame(wire)
payload = _destuff(wire[2:-1])
computed = (sum(b for b in payload[2:-1] if b != 0x10) + 0x10) & 0xFF
chk_ok = computed == chk
built = build_bw_write_frame(sub, data, offset=offset, params=params)
rebuilt_ok = built == wire
status = "" if (chk_ok and rebuilt_ok) else ""
print(
f" {idx:4d} 0x{sub:02X} 0x{offset:04X} {len(data):8d} "
f"{'' if chk_ok else '':>7} {'' if rebuilt_ok else '':>8} {status}"
)
if not (chk_ok and rebuilt_ok):
all_pass = False
print()
if all_pass:
print("All 11 write frames verified ✅")
else:
print("FAILURES DETECTED ❌")
sys.exit(1)