Files
seismo-relay/tests/test_write_frames.py
T

437 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
test_write_frames.py — Verify write frame construction against BW capture.
Validates that build_bw_write_frame() reproduces the exact wire bytes that
Blastware sent during the 3-11-26/170151 compliance-config write session.
Frames tested (BW TX frame indices 102-112):
102 — SUB 0x68 event index write
103 — SUB 0x73 confirm B
104 — SUB 0x71 compliance write chunk 1
105 — SUB 0x71 compliance write chunk 2
106 — SUB 0x71 compliance write chunk 3
107 — SUB 0x72 confirm A
108 — SUB 0x82 trigger config write
109 — SUB 0x83 trigger confirm
110 — SUB 0x69 waveform data write
111 — SUB 0x74 confirm C
112 — SUB 0x72 confirm A (end of sequence)
Run:
python -m pytest tests/test_write_frames.py -v
or:
python tests/test_write_frames.py
"""
from __future__ import annotations
import os
import sys
import pytest
# Allow running from the project root without installation
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus.framing import build_bw_write_frame
# ── Capture loading ────────────────────────────────────────────────────────────
# Location of the raw BW capture, expressed relative to the directory that
# holds this test module.
_CAPTURE_RELATIVE_PARTS = (
    "..",
    "bridges",
    "captures",
    "3-11-26",
    "raw_bw_20260311_170151.bin",
)
CAPTURE_PATH = os.path.join(os.path.dirname(__file__), *_CAPTURE_RELATIVE_PARTS)
def _load_bw_frames(path: str) -> list[bytes]:
"""
Parse a raw BW capture file into a list of BW frames.
BW frames start with ACK=0x41 followed by STX=0x02. The frame boundary is
the position of the NEXT 0x41 0x02 sequence (the ETX=0x03 terminator is the
last byte before the next frame start).
NOTE: A naive scan for ETX=0x03 fails because 0x03 can appear inside the
DLE-stuffed payload. This parser uses consecutive 0x41 0x02 starts as
boundaries, which is safe because the ACK byte (0x41) is never DLE-stuffed.
"""
with open(path, "rb") as f:
raw = f.read()
boundaries: list[int] = []
i = 0
while i < len(raw) - 1:
if raw[i] == 0x41 and raw[i + 1] == 0x02:
boundaries.append(i)
i += 1
boundaries.append(len(raw))
frames = []
for k in range(len(boundaries) - 1):
frames.append(raw[boundaries[k] : boundaries[k + 1]])
return frames
def _destuff(data: bytes) -> bytes:
"""Undo DLE stuffing: replace every 0x10 0x10 pair with a single 0x10."""
result = bytearray()
k = 0
while k < len(data):
if data[k] == 0x10 and k + 1 < len(data) and data[k + 1] == 0x10:
result.append(0x10)
k += 2
else:
result.append(data[k])
k += 1
return bytes(result)
def _decode_bw_frame(wire: bytes) -> tuple[int, int, bytes, bytes, int]:
    """
    Split a BW wire frame into its fields.

    The first two bytes (ACK, STX) and the last byte (ETX) of ``wire`` are
    dropped, and the remainder is de-stuffed before the fields are read.

    Returns:
        (sub, offset, params, data, chk), all taken from the de-stuffed body:
            sub    — body[2], the SUB byte
            offset — big-endian uint16 formed from body[4] and body[5]
            params — body[6:16], the 10-byte params field
            data   — body[16:-1], the write payload bytes
            chk    — body[-1], the checksum byte
    """
    body = _destuff(wire[2:-1])

    sub_byte = body[2]
    # High byte first: body[4] is the most significant byte of the offset.
    offset_be = body[4] * 0x100 + body[5]

    return sub_byte, offset_be, body[6:16], body[16:-1], body[-1]
# ── Test fixtures ──────────────────────────────────────────────────────────────
@pytest.fixture(scope="module")
def bw_frames() -> list[bytes]:
    """Module-scoped fixture: the frames parsed from CAPTURE_PATH.

    When the capture file does not exist, the requesting tests are skipped
    rather than failed.
    """
    capture_present = os.path.exists(CAPTURE_PATH)
    if capture_present:
        return _load_bw_frames(CAPTURE_PATH)
    pytest.skip(f"Capture file not found: {CAPTURE_PATH}")
# ── Individual frame tests ─────────────────────────────────────────────────────
class TestWriteFrameReconstruction:
    """Rebuild each captured write frame with build_bw_write_frame() and require identical wire bytes."""

    def test_frame_102_event_index_write_sub68(self, bw_frames: list[bytes]) -> None:
        """Frame 102 — SUB 0x68 event index write."""
        wire = bw_frames[102]
        sub, offset, params, payload, _chk = _decode_bw_frame(wire)

        assert sub == 0x68
        assert params == bytes(10)

        # Feed the decoded offset, params and data straight back into the builder.
        rebuilt = build_bw_write_frame(0x68, payload, offset=offset, params=params)
        assert rebuilt == wire, (
            f"SUB 0x68 wire mismatch\n"
            f" built: {rebuilt.hex()}\n"
            f" capt: {wire.hex()}"
        )

    def test_frame_103_confirm_b_sub73(self, bw_frames: list[bytes]) -> None:
        """Frame 103 — SUB 0x73 confirm B, which carries no data."""
        wire = bw_frames[103]
        sub, offset, _params, payload, _chk = _decode_bw_frame(wire)

        assert sub == 0x73
        assert payload == b""
        assert offset == 0x0000

        # The builder is called without explicit offset/params here.
        rebuilt = build_bw_write_frame(0x73, b"")
        assert rebuilt == wire

    def test_frame_104_compliance_chunk1_sub71(self, bw_frames: list[bytes]) -> None:
        """Frame 104 — SUB 0x71 compliance write, chunk 1 (1027 data bytes)."""
        wire = bw_frames[104]
        sub, offset, params, payload, _chk = _decode_bw_frame(wire)

        assert sub == 0x71
        assert offset == 0x1004
        assert params == bytes(10)
        assert len(payload) == 1027

        rebuilt = build_bw_write_frame(0x71, payload, offset=0x1004, params=bytes(10))
        assert rebuilt == wire

    def test_frame_105_compliance_chunk2_sub71(self, bw_frames: list[bytes]) -> None:
        """Frame 105 — SUB 0x71 compliance write, chunk 2 (1055 data bytes)."""
        wire = bw_frames[105]
        sub, offset, params, payload, _chk = _decode_bw_frame(wire)
        expected_params = bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00])

        assert sub == 0x71
        assert offset == 0x1004
        assert params == expected_params
        assert len(payload) == 1055

        rebuilt = build_bw_write_frame(0x71, payload, offset=0x1004, params=expected_params)
        assert rebuilt == wire

    def test_frame_106_compliance_chunk3_sub71(self, bw_frames: list[bytes]) -> None:
        """Frame 106 — SUB 0x71 compliance write, chunk 3 (46 data bytes)."""
        wire = bw_frames[106]
        sub, offset, params, payload, _chk = _decode_bw_frame(wire)
        expected_params = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

        assert sub == 0x71
        assert offset == 0x002C
        assert params == expected_params
        assert len(payload) == 46

        rebuilt = build_bw_write_frame(0x71, payload, offset=0x002C, params=expected_params)
        assert rebuilt == wire

    def test_frame_107_confirm_a_sub72(self, bw_frames: list[bytes]) -> None:
        """Frame 107 — SUB 0x72 confirm A following the compliance write."""
        wire = bw_frames[107]
        sub, offset, _params, payload, _chk = _decode_bw_frame(wire)

        assert sub == 0x72
        assert payload == b""
        assert offset == 0x0000

        rebuilt = build_bw_write_frame(0x72, b"")
        assert rebuilt == wire

    def test_frame_108_trigger_config_write_sub82(self, bw_frames: list[bytes]) -> None:
        """Frame 108 — SUB 0x82 trigger config write (29 data bytes)."""
        wire = bw_frames[108]
        sub, offset, params, payload, _chk = _decode_bw_frame(wire)

        assert sub == 0x82
        assert params == bytes(10)
        assert len(payload) == 29

        # The captured offset is expected to equal data[1] + 2.
        assert offset == payload[1] + 2, (
            f"Trigger write offset formula mismatch: "
            f"data[1]={payload[1]} → expected {payload[1]+2}, got {offset}"
        )

        rebuilt = build_bw_write_frame(0x82, payload, offset=offset, params=params)
        assert rebuilt == wire

    def test_frame_109_trigger_confirm_sub83(self, bw_frames: list[bytes]) -> None:
        """Frame 109 — SUB 0x83 trigger confirm, which carries no data."""
        wire = bw_frames[109]
        decoded = _decode_bw_frame(wire)
        sub, payload = decoded[0], decoded[3]

        assert sub == 0x83
        assert payload == b""

        rebuilt = build_bw_write_frame(0x83, b"")
        assert rebuilt == wire

    def test_frame_110_waveform_data_write_sub69(self, bw_frames: list[bytes]) -> None:
        """Frame 110 — SUB 0x69 waveform data write (204 data bytes)."""
        wire = bw_frames[110]
        sub, offset, params, payload, _chk = _decode_bw_frame(wire)

        assert sub == 0x69
        assert params == bytes(10)
        assert len(payload) == 204

        # The captured offset is expected to equal data[1] + 2.
        assert offset == payload[1] + 2, (
            f"Waveform write offset formula mismatch: "
            f"data[1]={payload[1]} → expected {payload[1]+2}, got {offset}"
        )

        rebuilt = build_bw_write_frame(0x69, payload, offset=offset, params=params)
        assert rebuilt == wire

    def test_frame_111_confirm_c_sub74(self, bw_frames: list[bytes]) -> None:
        """Frame 111 — SUB 0x74 confirm C following the waveform data write."""
        wire = bw_frames[111]
        decoded = _decode_bw_frame(wire)
        sub, payload = decoded[0], decoded[3]

        assert sub == 0x74
        assert payload == b""

        rebuilt = build_bw_write_frame(0x74, b"")
        assert rebuilt == wire

    def test_frame_112_confirm_a_sub72_end(self, bw_frames: list[bytes]) -> None:
        """Frame 112 — SUB 0x72 confirm A that closes the write sequence."""
        wire = bw_frames[112]
        decoded = _decode_bw_frame(wire)
        sub, payload = decoded[0], decoded[3]

        assert sub == 0x72
        assert payload == b""

        rebuilt = build_bw_write_frame(0x72, b"")
        assert rebuilt == wire
class TestOffsetFormula:
    """Single-chunk write commands must satisfy offset == data[1] + 2."""

    def test_event_index_offset_formula(self, bw_frames: list[bytes]) -> None:
        """SUB 0x68 (frame 102) obeys offset = data[1] + 2."""
        decoded = _decode_bw_frame(bw_frames[102])
        offset, payload = decoded[1], decoded[3]
        assert offset == payload[1] + 2

    def test_trigger_config_offset_formula(self, bw_frames: list[bytes]) -> None:
        """SUB 0x82 (frame 108) obeys offset = data[1] + 2."""
        decoded = _decode_bw_frame(bw_frames[108])
        offset, payload = decoded[1], decoded[3]
        assert offset == payload[1] + 2

    def test_waveform_data_offset_formula(self, bw_frames: list[bytes]) -> None:
        """SUB 0x69 (frame 110) obeys offset = data[1] + 2."""
        decoded = _decode_bw_frame(bw_frames[110])
        offset, payload = decoded[1], decoded[3]
        assert offset == payload[1] + 2
class TestChecksumVerification:
    """Recompute the checksum of every captured write frame and compare it with the stored byte."""

    def _verify_checksum(self, wire: bytes, label: str) -> None:
        """Assert that the checksum stored in ``wire`` matches a recomputation.

        The frame is stripped of its first two bytes and last byte, then
        de-stuffed. The expected checksum is the low 8 bits of 0x10 plus the
        sum of body[2:-1], skipping every byte equal to 0x10. ``label`` is
        only used to identify the frame in the failure message.
        """
        body = _destuff(wire[2:-1])
        stored = body[-1]

        running = 0x10
        for value in body[2:-1]:
            if value != 0x10:
                running += value
        computed = running & 0xFF

        assert computed == stored, (
            f"{label}: checksum mismatch — computed=0x{computed:02X}, got=0x{stored:02X}"
        )

    def test_all_write_frame_checksums(self, bw_frames: list[bytes]) -> None:
        """Check frames 102 through 112 in order; the first mismatch fails the test."""
        labelled_frames = (
            (102, "SUB 0x68 event index write"),
            (103, "SUB 0x73 confirm B"),
            (104, "SUB 0x71 compliance chunk 1"),
            (105, "SUB 0x71 compliance chunk 2"),
            (106, "SUB 0x71 compliance chunk 3"),
            (107, "SUB 0x72 confirm A"),
            (108, "SUB 0x82 trigger config write"),
            (109, "SUB 0x83 trigger confirm"),
            (110, "SUB 0x69 waveform data write"),
            (111, "SUB 0x74 confirm C"),
            (112, "SUB 0x72 confirm A (end)"),
        )
        for frame_no, description in labelled_frames:
            self._verify_checksum(bw_frames[frame_no], f"Frame {frame_no} ({description})")
class TestComplianceChunkSizes:
    """Pin the size, params and offset of the three compliance write chunks (frames 104-106)."""

    def test_chunk1_size(self, bw_frames: list[bytes]) -> None:
        """Frame 104 carries 1027 data bytes."""
        chunk = _decode_bw_frame(bw_frames[104])[3]
        assert len(chunk) == 1027, f"Chunk 1 should be 1027 bytes, got {len(chunk)}"

    def test_chunk2_size(self, bw_frames: list[bytes]) -> None:
        """Frame 105 carries 1055 data bytes."""
        chunk = _decode_bw_frame(bw_frames[105])[3]
        assert len(chunk) == 1055, f"Chunk 2 should be 1055 bytes, got {len(chunk)}"

    def test_chunk3_size(self, bw_frames: list[bytes]) -> None:
        """Frame 106 carries 46 data bytes."""
        chunk = _decode_bw_frame(bw_frames[106])[3]
        assert len(chunk) == 46, f"Chunk 3 should be 46 bytes, got {len(chunk)}"

    def test_total_compliance_data(self, bw_frames: list[bytes]) -> None:
        """The three chunks together carry 2128 data bytes."""
        total = 0
        for frame_no in (104, 105, 106):
            total += len(_decode_bw_frame(bw_frames[frame_no])[3])
        assert total == 2128, f"Total compliance write data should be 2128 bytes, got {total}"

    def test_chunk1_params(self, bw_frames: list[bytes]) -> None:
        """Frame 104 has an all-zero params field."""
        chunk_params = _decode_bw_frame(bw_frames[104])[2]
        assert chunk_params == bytes(10)

    def test_chunk2_params(self, bw_frames: list[bytes]) -> None:
        """Frame 105 params have 0x10 0x04 at positions 3 and 4, zero elsewhere."""
        chunk_params = _decode_bw_frame(bw_frames[105])[2]
        assert chunk_params == bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00])

    def test_chunk3_params(self, bw_frames: list[bytes]) -> None:
        """Frame 106 params have 0x08 at position 2, zero elsewhere."""
        chunk_params = _decode_bw_frame(bw_frames[106])[2]
        assert chunk_params == bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

    def test_chunk1_offset(self, bw_frames: list[bytes]) -> None:
        """Frame 104 offset is 0x1004."""
        chunk_offset = _decode_bw_frame(bw_frames[104])[1]
        assert chunk_offset == 0x1004

    def test_chunk2_offset(self, bw_frames: list[bytes]) -> None:
        """Frame 105 offset is 0x1004."""
        chunk_offset = _decode_bw_frame(bw_frames[105])[1]
        assert chunk_offset == 0x1004

    def test_chunk3_offset(self, bw_frames: list[bytes]) -> None:
        """Frame 106 offset is 0x002C."""
        chunk_offset = _decode_bw_frame(bw_frames[106])[1]
        assert chunk_offset == 0x002C
# ── Standalone runner ──────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Standalone mode: print a per-frame table for frames 102-112 showing
    # whether the stored checksum matches a recomputation and whether
    # build_bw_write_frame() reproduces the captured bytes. Exits with
    # status 1 on any failure.
    if not os.path.exists(CAPTURE_PATH):
        print(f"ERROR: Capture file not found: {CAPTURE_PATH}")
        sys.exit(1)

    frames = _load_bw_frames(CAPTURE_PATH)
    print(f"Loaded {len(frames)} BW frames from capture")

    write_frame_indices = list(range(102, 113))

    # Report a short capture explicitly instead of dying with an IndexError
    # partway through the table.
    if len(frames) <= write_frame_indices[-1]:
        print(
            f"ERROR: Capture has only {len(frames)} frames; "
            f"need at least {write_frame_indices[-1] + 1}"
        )
        sys.exit(1)

    all_pass = True
    print()
    print(f"{'Frame':>6} {'SUB':>5} {'Offset':>8} {'DataLen':>8} {'Chk OK':>7} {'Rebuilt':>8}")
    print("-" * 60)
    for idx in write_frame_indices:
        wire = frames[idx]
        sub, offset, params, data, chk = _decode_bw_frame(wire)

        # Same checksum rule as the pytest checks: 0x10 plus the sum of
        # payload[2:-1] with 0x10 bytes skipped, truncated to 8 bits.
        payload = _destuff(wire[2:-1])
        computed = (sum(b for b in payload[2:-1] if b != 0x10) + 0x10) & 0xFF
        chk_ok = computed == chk

        built = build_bw_write_frame(sub, data, offset=offset, params=params)
        rebuilt_ok = built == wire

        frame_ok = chk_ok and rebuilt_ok
        # ASCII markers so each column actually shows the outcome.
        status = "PASS" if frame_ok else "FAIL"
        print(
            f" {idx:4d} 0x{sub:02X} 0x{offset:04X} {len(data):8d} "
            f"{'OK' if chk_ok else 'FAIL':>7} {'OK' if rebuilt_ok else 'FAIL':>8} {status}"
        )
        if not frame_ok:
            all_pass = False

    print()
    if all_pass:
        print(f"All {len(write_frame_indices)} write frames verified ✅")
    else:
        print("FAILURES DETECTED ❌")
        sys.exit(1)