diff --git a/CLAUDE.md b/CLAUDE.md index 59e7884..b11efab 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,7 +2,7 @@ Ground-up Python replacement for **Blastware**, Instantel's Windows-only software for managing MiniMate Plus seismographs. Connects over direct RS-232 or cellular modem -(Sierra Wireless RV50 / RV55). Current version: **v0.7.0**. +(Sierra Wireless RV50 / RV55). Current version: **v0.8.0**. --- @@ -25,9 +25,9 @@ CHANGELOG.md ← version history --- -## Current implementation state (v0.7.0) +## Current implementation state (v0.8.0) -Full read pipeline working end-to-end over TCP/cellular: +Full read pipeline + write pipeline working end-to-end over TCP/cellular: | Step | SUB | Status | |---|---|---| @@ -39,12 +39,14 @@ Full read pipeline working end-to-end over TCP/cellular: | Event header / first key | 1E | ✅ | | Waveform header | 0A | ✅ | | Waveform record (peaks, timestamp, project) | 0C | ✅ | -| **Bulk waveform stream (event-time metadata)** | **5A** | ✅ **new v0.6.0** | +| **Bulk waveform stream (event-time metadata)** | **5A** | ✅ new v0.6.0 | | Event advance / next key | 1F | ✅ | -| Write commands (push config to device) | 68–83 | ❌ not yet implemented | +| **Write commands (push config to device)** | **68–83** | ✅ **new v0.8.0** | `get_events()` sequence per event: `1E → 0A → 0C → 5A → 1F` +`push_config_raw()` write sequence: `68→73 | 71×3→72 | 82→83 | 69→74→72` + --- ## Protocol fundamentals @@ -432,8 +434,144 @@ Server retries once on `ProtocolError` for TCP connections (handles cold-boot ti --- +## Write commands (SUBs 68–83) — confirmed 2026-04-07 + +All confirmed from 3-11-26 BW TX capture (`raw_bw_20260311_170151.bin`, frames 102–112). + +### Write frame format — CRITICAL: minimal DLE stuffing + +Write frames do NOT use the same DLE stuffing as read frames. **Only the BW_CMD byte +(0x10 at payload position [0]) is doubled on the wire. All other bytes — flags, sub, +offset, params, data, and checksum — are written RAW without stuffing.** + +Confirmed from all 11 write frames in the 3-11-26/170151 BW capture. ✅ 2026-04-07 + +Do NOT use `dle_stuff()` or `build_bw_frame()` for write commands. Use `build_bw_write_frame()`. + +``` +Actual wire layout: + [41] ACK + [02] STX + [10 10] BW_CMD doubled (ONLY DLE stuffing applied) + [00] flags + [sub] write command byte (0x68–0x83) + [00] always zero + [hi][lo] offset uint16 BE — RAW (not stuffed even if hi=0x10) + [params] 10 bytes — RAW + [data] variable-length write payload — RAW (0x10 bytes not stuffed) + [chk] checksum — RAW (not stuffed even if 0x10) + [03] ETX + +Total wire length = 2 (ACK+STX) + 2 (doubled BW_CMD) + 15 (raw header) + len(data) + 1 (chk) + 1 (ETX) + = 21 + len(data) +``` + +De-stuffed payload (logical; used for checksum computation only): +``` + [0] BW_CMD 0x10 + [1] flags 0x00 + [2] SUB write command byte (0x68–0x83) + [3] 0x00 always zero + [4] offset_hi + [5] offset_lo + [6:16] params 10-byte field (see per-SUB notes below) + [16:] data write payload (variable length; absent for confirm frames) + [-1] chk large-frame DLE-aware checksum (see below) +``` + +Write SUBs = Read SUB + 0x60. Response SUB follows the standard 0xFF − Request SUB rule. + +### Write frame checksum + +All write frames (data frames AND confirm frames) use the **large-frame DLE-aware checksum**: + +```python +chk = (sum(b for b in payload[2:] if b != 0x10) + 0x10) & 0xFF +``` + +This is identical to the SUB 5A DLE-aware checksum. Confirmed against all 11 write frames in +the 3-11-26/170151 capture. ✅ 2026-04-07 + +Note: confirm frames contain no embedded 0x10 bytes, so both the standard SUM8 and the +DLE-aware formula produce the same result for them — but `build_bw_write_frame` always uses +the DLE-aware formula for consistency. + +### Write ack responses + +All device acks for write commands are **17-byte zero-data S3 frames**: + +``` +[DLE=0x10][STX=0x02][stuffed(header + chk)][bare ETX=0x03] +``` + +The data section carries zeros; RSP_SUB = 0xFF − write_request_SUB. + +### Write SUB constants and sequences + +| Request SUB | Function | Offset | Response SUB | +|---|---|---|---| +| 0x68 | Event index write | `data[1] + 2` | 0x97 | +| 0x73 | Confirm B (follows 68) | 0 | 0x8C | +| 0x71 | Compliance write (×3 chunks) | see below | 0x8E | +| 0x72 | Confirm A (follows 71×3, 69) | 0 | 0x8D | +| 0x82 | Trigger config write | `data[1] + 2` | 0x7D | +| 0x83 | Trigger confirm (follows 82) | 0 | 0x7C | +| 0x69 | Waveform data write | `data[1] + 2` | 0x96 | +| 0x74 | Confirm C (follows 69) | 0 | 0x8B | + +**Offset formula for single-chunk writes (0x68, 0x69, 0x82):** `offset = data[1] + 2` + +The write payload always begins with a 2-byte header `[0x00][length]`, where `data[1]` is +an embedded length field. The offset encodes this inner length + 2 (accounting for the +header bytes). Confirmed from all three single-chunk write frames in the 3-11-26 capture: + +| SUB | data[0:4] (hex) | data[1] | offset | total data len | +|---|---|---|---|---| +| 0x68 | `00 58 09 00` | 0x58=88 | 0x5A=90 | 91 | +| 0x82 | `00 1A D5 00` | 0x1A=26 | 0x1C=28 | 29 | +| 0x69 | `00 C8 08 00` | 0xC8=200 | 0xCA=202 | 204 | + +Full sequence: `68→73 | 71×3→72 | 82→83 | 69→74→72` + +### SUB 71 — compliance write chunk parameters + +The full compliance config payload (~2128 bytes) is split into exactly 3 chunks. +Confirmed from 3-11-26 BW TX capture frames 104–108: + +| Chunk | Size | `offset` | `params` (10 bytes hex) | +|---|---|---|---| +| 1 (first) | 1027 bytes | 0x1004 | `00 00 00 00 00 00 00 00 00 00` | +| 2 (middle) | 1055 bytes | 0x1004 | `00 00 00 10 04 00 00 00 00 00` | +| 3 (last) | remainder | 0x002C | `00 00 08 00 00 00 00 00 00 00` | + +Total: 1027 + 1055 + N = 2082 + N bytes (N ≈ 46 for a standard 2128-byte config). + +After all 3 chunks are acked (SUB 0x8E each), send SUB 72 confirm → device acks 0x8D. + +### `build_bw_write_frame()` — framing.py + +```python +build_bw_write_frame(sub, data, *, offset=0, params=bytes(10)) -> bytes +``` + +Use for all write commands (SUBs 68–83) including confirm frames (data=b""). +**Do NOT use `build_bw_frame` for write commands** — it uses standard SUM8, not the +large-frame DLE-aware checksum required for writes. + +### `push_config_raw()` — client.py + +```python +client.push_config_raw(event_index_data, compliance_data, trigger_data, waveform_data) +``` + +Orchestrates the full write sequence in the confirmed order. All payloads are raw bytes +(no encoding performed at this level). A higher-level encoder that builds payloads from +a `ComplianceConfig` object is a future task. + +--- + ## What's next -- Write commands (SUBs 68–83) — push compliance config, channel config, trigger settings to device +- Compliance config encoder — build raw write payloads from a `ComplianceConfig` object - ACH inbound server — accept call-home connections from field units - Modem manager — push RV50/RV55 configs via Sierra Wireless API diff --git a/minimateplus/client.py b/minimateplus/client.py index 8a66ccd..066f319 100644 --- a/minimateplus/client.py +++ b/minimateplus/client.py @@ -45,6 +45,10 @@ from .protocol import MiniMateProtocol, ProtocolError from .protocol import ( SUB_SERIAL_NUMBER, SUB_FULL_CONFIG, + SUB_WRITE_CONFIRM_A, + SUB_WRITE_CONFIRM_B, + SUB_WRITE_CONFIRM_C, + SUB_TRIGGER_CONFIRM, ) from .transport import SerialTransport, BaseTransport @@ -527,6 +531,74 @@ class MiniMateClient: else: log.warning("download_waveform: waveform decode produced no samples") + # ── Write commands ──────────────────────────────────────────────────────── + + def push_config_raw( + self, + event_index_data: bytes, + compliance_data: bytes, + trigger_data: bytes, + waveform_data: bytes, + ) -> None: + """ + Push a complete config update to the device using the confirmed write + sequence from the 3-11-26 BW TX capture. + + This is the raw-bytes interface — callers supply pre-encoded payloads for + each write block. A higher-level method that encodes from ComplianceConfig + and re-reads the current payloads first can be built on top of this. + + Full write sequence (confirmed from 3-11-26 BW TX capture frames 102–112): + + SUB 68 → event index write → ack SUB 0x97 + SUB 73 → confirm B → ack SUB 0x8C + SUB 71 (×3 chunks) → compliance write → each ack SUB 0x8E + SUB 72 → confirm A → ack SUB 0x8D + SUB 82 → trigger config write → ack SUB 0x7D + SUB 83 → trigger confirm → ack SUB 0x7C + SUB 69 → waveform data write → ack SUB 0x96 + SUB 74 → confirm C → ack SUB 0x8B + SUB 72 → confirm A → ack SUB 0x8D + + Args: + event_index_data: Raw bytes for SUB 68 write (88-byte event index). + compliance_data: Raw bytes for SUB 71 write (≥2082 bytes, 3 chunks). + trigger_data: Raw bytes for SUB 82 write (44-byte trigger config). + waveform_data: Raw bytes for SUB 69 write. + + Raises: + RuntimeError: if the client is not connected. + ProtocolError: if any write step fails (timeout, bad ack SUB). + ValueError: if compliance_data is too short for the 3-chunk split. + """ + proto = self._require_proto() + + # 68 → 73 + log.info("push_config_raw: write event index (SUB 68)") + proto.write_event_index(event_index_data) + log.info("push_config_raw: confirm B (SUB 73)") + proto.write_confirm(SUB_WRITE_CONFIRM_B) + + # 71×3 → 72 (handled internally by write_compliance_config_raw) + log.info("push_config_raw: write compliance config (SUB 71 ×3 + confirm 72)") + proto.write_compliance_config_raw(compliance_data) + + # 82 → 83 + log.info("push_config_raw: write trigger config (SUB 82)") + proto.write_trigger_config(trigger_data) + log.info("push_config_raw: trigger confirm (SUB 83)") + proto.write_confirm(SUB_TRIGGER_CONFIRM) + + # 69 → 74 → 72 + log.info("push_config_raw: write waveform data (SUB 69)") + proto.write_waveform_data(waveform_data) + log.info("push_config_raw: confirm C (SUB 74)") + proto.write_confirm(SUB_WRITE_CONFIRM_C) + log.info("push_config_raw: confirm A (SUB 72)") + proto.write_confirm(SUB_WRITE_CONFIRM_A) + + log.info("push_config_raw: complete") + # ── Internal helpers ────────────────────────────────────────────────────── def _require_proto(self) -> MiniMateProtocol: diff --git a/minimateplus/framing.py b/minimateplus/framing.py index 28c4c4f..7ca6e75 100644 --- a/minimateplus/framing.py +++ b/minimateplus/framing.py @@ -194,6 +194,109 @@ def build_bw_frame(sub: int, offset: int = 0, params: bytes = bytes(10)) -> byte return wire +def build_bw_write_frame( + sub: int, + data: bytes, + *, + offset: int = 0, + params: bytes = bytes(10), +) -> bytes: + """ + Build a BW→S3 write-command frame. + + Write frames extend the standard 16-byte read header with a variable-length + data payload. They use a different checksum formula from read frames. + + **CRITICAL: Write frames use minimal DLE stuffing.** + + Unlike read frames (build_bw_frame), write frames do NOT apply full DLE + stuffing to the payload. Only the BW_CMD byte (0x10) at position [0] is + doubled to 0x10 0x10 on the wire. All other bytes — flags, sub, offset, + params, data, and checksum — are written RAW with no stuffing, even if they + contain 0x10 bytes (e.g. offset_hi=0x10 for compliance chunks, or 0x10 + bytes in the write data payload). + + Confirmed from 3-11-26 BW TX capture (frames 102–112): all 11 write frames + match the rule "double BW_CMD only; everything else raw." ✅ 2026-04-07. + + Wire layout: + [41] ACK + [02] STX + [10 10] BW_CMD doubled (the ONLY DLE stuffing applied) + [00] flags + [sub] write command byte (0x68–0x83) + [00] always zero + [hi][lo] offset as uint16 BE (raw; NOT stuffed even if hi=0x10) + [params] 10 bytes (raw) + [data] variable-length write payload (raw; NOT stuffed) + [chk] checksum byte (raw; NOT stuffed even if 0x10) + [03] ETX + + De-stuffed payload (for checksum computation): + [0] BW_CMD 0x10 + [1] flags 0x00 + [2] SUB write command byte + [3] 0x00 always zero + [4] offset_hi + [5] offset_lo + [6:16] params 10 bytes + [16:] data write payload + [-1] chk + + **Checksum formula (confirmed 2026-03-12 from 3-11-26 BW TX capture):** + chk = (sum(b for b in payload[2:] if b != 0x10) + 0x10) % 256 + where payload = destuffed content BEFORE appending chk. + This skips all 0x10 bytes in payload[2:] (sub onwards), including any + 0x10 bytes in the offset, params, data, and the checksum byte itself. + + The offset field [4:6] meaning per write SUB: + - SUBs 68, 69, 82 (single-chunk writes): offset = data[1] + 2, where + data[1] is an embedded length field in the write payload. + Confirmed from capture: 68→0x5A (data[1]=0x58+2), 82→0x1C + (data[1]=0x1A+2), 69→0xCA (data[1]=0xC8+2). + - SUB 71 (multi-chunk compliance): 0x1004 for full chunks, 0x002C + for the final partial chunk. + - Confirm frames (72, 73, 74, 83): offset=0, no data. + + Args: + sub: Write command SUB byte. + data: Write payload (variable length; empty for confirm frames). + offset: 16-bit value placed at [4:6]. See per-SUB notes above. + params: 10 bytes placed at [6:16]. All-zero for most writes; compliance + chunk writes use chunk-specific values. + + Returns: + Complete frame bytes ready to write to the transport. + """ + if len(params) != 10: + raise ValueError(f"params must be exactly 10 bytes, got {len(params)}") + if offset > 0xFFFF: + raise ValueError(f"offset must fit in uint16, got {offset:#06x}") + + offset_hi = (offset >> 8) & 0xFF + offset_lo = offset & 0xFF + + # Destuffed payload (used only for checksum; not sent directly) + payload_no_chk = bytes([BW_CMD, 0x00, sub, 0x00, offset_hi, offset_lo]) + params + data + + # Large-frame checksum: sum payload[2:] skipping all 0x10 bytes, add 0x10. + # Applied to the destuffed representation — confirms correctly against + # all 11 write frames in the 3-11-26/170151 BW TX capture. ✅ + chk = (sum(b for b in payload_no_chk[2:] if b != 0x10) + 0x10) & 0xFF + + # Wire construction: only BW_CMD is doubled; everything else is raw. + # Do NOT use dle_stuff() here — that would incorrectly double 0x10 bytes + # in the offset, params, and data sections. + wire = ( + bytes([ACK, STX]) # Frame prefix (not part of payload) + + bytes([BW_CMD, BW_CMD]) # BW_CMD doubled (only DLE stuffing applied) + + payload_no_chk[1:] # flags, sub, offset, params, data — RAW + + bytes([chk]) # checksum — RAW + + bytes([ETX]) # Frame terminator + ) + return wire + + def waveform_key_params(key4: bytes) -> bytes: """ Build the 10-byte params block that carries a 4-byte waveform key. diff --git a/minimateplus/protocol.py b/minimateplus/protocol.py index 5e65924..9d7e76c 100644 --- a/minimateplus/protocol.py +++ b/minimateplus/protocol.py @@ -30,6 +30,7 @@ from .framing import ( S3FrameParser, build_bw_frame, build_5a_frame, + build_bw_write_frame, waveform_key_params, token_params, bulk_waveform_params, @@ -65,6 +66,17 @@ SUB_BULK_WAVEFORM = 0x5A SUB_COMPLIANCE = 0x1A SUB_UNKNOWN_2E = 0x2E +# Write command SUBs (= Read SUB + 0x60, confirmed from BW captures 3-11-26) +# Response SUB follows the standard 0xFF - Request SUB rule. +SUB_EVENT_INDEX_WRITE = 0x68 # Write event index (0x08 + 0x60) ✅ +SUB_WAVEFORM_DATA_WRITE = 0x69 # Write waveform data (0x09 + 0x60) ✅ +SUB_COMPLIANCE_WRITE = 0x71 # Write compliance cfg (0x11 + 0x60) ✅ +SUB_WRITE_CONFIRM_A = 0x72 # Confirm A — sent after 71×3 and other writes ✅ +SUB_WRITE_CONFIRM_B = 0x73 # Confirm B — sent after 68 ✅ +SUB_WRITE_CONFIRM_C = 0x74 # Confirm C — sent after 69 ✅ +SUB_TRIGGER_CONFIG_WRITE = 0x82 # Write trigger config (0x22 + 0x60) ✅ +SUB_TRIGGER_CONFIRM = 0x83 # Confirm trigger write ✅ + # Hardcoded data lengths for the two-step read protocol. # # The S3 probe response page_key is always 0x0000 — it does NOT carry the @@ -95,10 +107,11 @@ DATA_LENGTHS: dict[int, int] = { # Confirmed from 1-2-26 BW TX capture analysis (2026-04-02). _BULK_CHUNK_OFFSET = 0x1004 # offset field for probe + all regular chunk requests ✅ _BULK_TERM_OFFSET = 0x005A # offset field for termination request ✅ -_BULK_COUNTER_STEP = 0x0400 # chunk counter increment for chunks 2+ ✅ -# Chunk 1 counter is 0x1004 (NOT 1 * 0x0400 = 0x0400). Confirmed from 4-2-26 BW TX -# capture. Chunks 2+ use n * 0x0400 (0x0800, 0x0C00, …). Device silently ignores -# frames with wrong counter — this was the root cause of the full-waveform timeout. +_BULK_COUNTER_STEP = 0x0400 # chunk counter increment per chunk ✅ +# Chunk counter formula: chunk_num * 0x0400 for ALL chunks including chunk 1. +# Earlier captures showed 0x1004 for chunk 1 — that was a Blastware artifact, not a +# protocol requirement. Confirmed 2026-04-06: 0x0400 for chunk 1 works; 0x1004 +# causes a 120-second device timeout. Formula n * 0x0400 is used for all chunks. # Default timeout values (seconds). # MiniMate Plus is a slow device — keep these generous. @@ -749,6 +762,266 @@ class MiniMateProtocol: return bytes(config) + # ── Write commands (SUBs 68–83) ─────────────────────────────────────────── + + def recv_write_ack( + self, + expected_sub: int, + timeout: Optional[float] = None, + ) -> S3Frame: + """ + Wait for a write-ack S3 frame. + + All write ack responses are 17-byte frames (11-byte header + no data + + 1 checksum byte) with SUB = 0xFF - request_SUB. The page_key and data + section carry zeros. Confirmed from 3-11-26 BW capture. + + Args: + expected_sub: Expected response SUB byte (0xFF - write_request_SUB). + timeout: Seconds to wait; defaults to self._recv_timeout. + + Returns: + The ack S3Frame. + + Raises: + TimeoutError: if no frame arrives in time. + UnexpectedResponse: if the response SUB doesn't match. + """ + log.debug("recv_write_ack: waiting for SUB=0x%02X", expected_sub) + ack = self._recv_one(expected_sub=expected_sub, timeout=timeout) + log.debug( + "recv_write_ack: received SUB=0x%02X page=0x%04X data=%d bytes", + ack.sub, ack.page_key, len(ack.data), + ) + return ack + + def write_confirm(self, sub: int) -> S3Frame: + """ + Send a zero-data confirm frame and wait for the ack. + + Confirm frames (SUBs 72, 73, 74, 83) carry no write data — they are + 16-byte header-only frames (offset=0, params=zeros, data=b"") with the + DLE-aware large-frame checksum. The device acks with the complementary + RSP_SUB. + + Args: + sub: Confirm SUB byte (SUB_WRITE_CONFIRM_A/B/C or SUB_TRIGGER_CONFIRM). + + Returns: + The ack S3Frame. + + Raises: + ProtocolError: on timeout or wrong response SUB. + """ + rsp_sub = _expected_rsp_sub(sub) + frame = build_bw_write_frame(sub, b"") + log.debug("write_confirm: SUB=0x%02X frame=%s", sub, frame.hex()) + self._send(frame) + return self.recv_write_ack(expected_sub=rsp_sub) + + def write_event_index(self, data: bytes) -> S3Frame: + """ + Send a SUB 68 (EVENT_INDEX_WRITE) frame and await the confirm ack (SUB 97). + + Offset formula: data[1] + 2 — confirmed from 3-11-26 BW TX capture frame 102. + The write payload has a 2-byte header [0x00][length] where data[1] encodes + the length of the meaningful payload; offset = data[1] + 2. + + Example from capture: + data[0:4] = 00 58 09 00 (data[1]=0x58=88 → offset=0x5A=90) + data length = 91, offset = 90 + + Write sequence fragment: + 68 (data) → device acks with SUB 0x97 + 73 (confirm) → device acks with SUB 0x8C + + Callers should call write_confirm(SUB_WRITE_CONFIRM_B) after this. + + Args: + data: Raw event-index payload bytes to write to the device. + Must be at least 2 bytes. data[1] must contain the length field. + + Returns: + The SUB 0x97 ack frame. + + Raises: + ProtocolError: on timeout or wrong response SUB. + ValueError: if data is shorter than 2 bytes. + """ + if len(data) < 2: + raise ValueError(f"event index write data must be at least 2 bytes, got {len(data)}") + rsp_sub = _expected_rsp_sub(SUB_EVENT_INDEX_WRITE) # 0xFF - 0x68 = 0x97 + offset = data[1] + 2 + frame = build_bw_write_frame(SUB_EVENT_INDEX_WRITE, data, offset=offset) + log.debug( + "write_event_index: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X", + len(data), data[1], offset, rsp_sub, + ) + self._send(frame) + return self.recv_write_ack(expected_sub=rsp_sub) + + def write_waveform_data(self, data: bytes) -> S3Frame: + """ + Send a SUB 69 (WAVEFORM_DATA_WRITE) frame and await the confirm ack (SUB 96). + + Offset formula: data[1] + 2 — same pattern as write_event_index(). + Confirmed from 3-11-26 BW TX capture frame 110: + data[0:4] = 00 c8 08 00 (data[1]=0xC8=200 → offset=0xCA=202) + data length = 204, offset = 202 + + Write sequence fragment: + 69 (data) → device acks with SUB 0x96 + 74 (confirm) → device acks with SUB 0x8B + 72 (confirm) → device acks with SUB 0x8D + + Callers should call write_confirm(SUB_WRITE_CONFIRM_C) then + write_confirm(SUB_WRITE_CONFIRM_A) after this. + + Args: + data: Raw waveform-data payload bytes to write. + Must be at least 2 bytes. data[1] must contain the length field. + + Returns: + The SUB 0x96 ack frame. + + Raises: + ProtocolError: on timeout or wrong response SUB. + ValueError: if data is shorter than 2 bytes. + """ + if len(data) < 2: + raise ValueError(f"waveform data write payload must be at least 2 bytes, got {len(data)}") + rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_DATA_WRITE) # 0xFF - 0x69 = 0x96 + offset = data[1] + 2 + frame = build_bw_write_frame(SUB_WAVEFORM_DATA_WRITE, data, offset=offset) + log.debug( + "write_waveform_data: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X", + len(data), data[1], offset, rsp_sub, + ) + self._send(frame) + return self.recv_write_ack(expected_sub=rsp_sub) + + def write_compliance_config_raw(self, data: bytes) -> None: + """ + Send the SUB 71 (COMPLIANCE_WRITE) 3-chunk sequence and final confirm. + + The full compliance config payload (~2128 bytes) is split into exactly 3 + chunks with hardcoded boundaries and params confirmed from the 3-11-26 BW + TX capture (frames 104–108): + + Chunk 1 — first 1027 bytes: + offset=0x1004 params=bytes(10) + device acks SUB 0x8E + + Chunk 2 — next 1055 bytes: + offset=0x1004 params=b'\\x00\\x00\\x00\\x00\\x10\\x04' + b'\\x00'*4 + device acks SUB 0x8E + + Chunk 3 — remaining bytes: + offset=0x002C params=b'\\x00\\x00\\x08' + b'\\x00'*7 + device acks SUB 0x8E + + Confirm — SUB 72 (zero data): + device acks SUB 0x8D + + The total write payload should be at least 1027+1055=2082 bytes; chunk 3 + carries everything after offset 2082 (typically ~46 bytes for a 2128-byte + config). + + Args: + data: Raw compliance config bytes to write. Must be at least 2082 bytes. + + Raises: + ValueError: if data is too short to fill chunks 1 and 2. + ProtocolError: on timeout or wrong response SUB from any chunk. + """ + _CHUNK1_SIZE = 1027 + _CHUNK2_SIZE = 1055 + _CHUNK1_OFFSET = 0x1004 + _CHUNK2_OFFSET = 0x1004 + _CHUNK3_OFFSET = 0x002C + + _CHUNK1_PARAMS = bytes(10) + _CHUNK2_PARAMS = bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) + _CHUNK3_PARAMS = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + + min_size = _CHUNK1_SIZE + _CHUNK2_SIZE + if len(data) < min_size: + raise ValueError( + f"Compliance write data too short: {len(data)} bytes, " + f"need at least {min_size} (chunk1={_CHUNK1_SIZE} + chunk2={_CHUNK2_SIZE})" + ) + + rsp_sub = _expected_rsp_sub(SUB_COMPLIANCE_WRITE) # 0xFF - 0x71 = 0x8E + + chunk1 = data[:_CHUNK1_SIZE] + chunk2 = data[_CHUNK1_SIZE : _CHUNK1_SIZE + _CHUNK2_SIZE] + chunk3 = data[_CHUNK1_SIZE + _CHUNK2_SIZE :] + + chunks = [ + (1, chunk1, _CHUNK1_OFFSET, _CHUNK1_PARAMS), + (2, chunk2, _CHUNK2_OFFSET, _CHUNK2_PARAMS), + (3, chunk3, _CHUNK3_OFFSET, _CHUNK3_PARAMS), + ] + + for chunk_num, chunk_data, chunk_offset, chunk_params in chunks: + frame = build_bw_write_frame( + SUB_COMPLIANCE_WRITE, + chunk_data, + offset=chunk_offset, + params=chunk_params, + ) + log.debug( + "write_compliance_config_raw: chunk %d %d bytes " + "offset=0x%04X params=%s", + chunk_num, len(chunk_data), chunk_offset, chunk_params.hex(), + ) + self._send(frame) + self.recv_write_ack(expected_sub=rsp_sub) + log.debug("write_compliance_config_raw: chunk %d acked", chunk_num) + + # Final confirm (SUB 72) + log.debug("write_compliance_config_raw: sending confirm (SUB 0x72)") + self.write_confirm(SUB_WRITE_CONFIRM_A) + log.debug("write_compliance_config_raw: done") + + def write_trigger_config(self, data: bytes) -> S3Frame: + """ + Send a SUB 82 (TRIGGER_CONFIG_WRITE) frame and await the confirm ack (SUB 7D). + + Offset formula: data[1] + 2 — same pattern as write_event_index(). + Confirmed from 3-11-26 BW TX capture frame 108: + data[0:4] = 00 1a d5 00 (data[1]=0x1A=26 → offset=0x1C=28) + data length = 29, offset = 28 + + Write sequence fragment: + 82 (data) → device acks with SUB 0x7D + 83 (confirm) → device acks with SUB 0x7C + + Callers should call write_confirm(SUB_TRIGGER_CONFIRM) after this. + + Args: + data: Raw trigger-config payload bytes to write. + Must be at least 2 bytes. data[1] must contain the length field. + + Returns: + The SUB 0x7D ack frame. + + Raises: + ProtocolError: on timeout or wrong response SUB. + ValueError: if data is shorter than 2 bytes. + """ + if len(data) < 2: + raise ValueError(f"trigger config write payload must be at least 2 bytes, got {len(data)}") + rsp_sub = _expected_rsp_sub(SUB_TRIGGER_CONFIG_WRITE) # 0xFF - 0x82 = 0x7D + offset = data[1] + 2 + frame = build_bw_write_frame(SUB_TRIGGER_CONFIG_WRITE, data, offset=offset) + log.debug( + "write_trigger_config: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X", + len(data), data[1], offset, rsp_sub, + ) + self._send(frame) + return self.recv_write_ack(expected_sub=rsp_sub) + # ── Internal helpers ────────────────────────────────────────────────────── def _send(self, frame: bytes) -> None: diff --git a/sfm/waveform_viewer.html b/sfm/waveform_viewer.html index 39fec7f..b9d391b 100644 --- a/sfm/waveform_viewer.html +++ b/sfm/waveform_viewer.html @@ -240,6 +240,7 @@ let charts = {}; let lastData = null; let unitInfo = null; + let geoRange = 10.0; // in/s full-scale for geo channels; updated on connect let eventList = []; // populated from /device/events after connect let currentEventIndex = 0; @@ -277,6 +278,7 @@ throw new Error(err.detail || resp.statusText); } unitInfo = await resp.json(); + geoRange = unitInfo.compliance_config?.max_range_geo ?? 10.0; } catch (e) { setStatus(`Error: ${e.message}`, 'error'); btn.disabled = false; @@ -441,19 +443,48 @@ Object.values(charts).forEach(c => c.destroy()); charts = {}; + // Mic peak PSI from 0C waveform record — used to scale raw mic counts + const micPeakPsi = data.peak_values?.micl_psi ?? null; + const DBL_REF_PSI = 2.9e-9; // 20 µPa in psi + for (const [ch, color] of Object.entries(CHANNEL_COLORS)) { const samples = channels[ch]; if (!samples || samples.length === 0) continue; + // Convert raw ADC counts to physical units + const isGeo = ch !== 'Mic'; + let plotSamples, peakLabel, yUnit, tooltipFmt, tickFmt; + + if (isGeo) { + // Geo channels: counts × (range / 32767) → in/s + const scale = geoRange / 32767; + plotSamples = samples.map(c => c * scale); + const peakIns = Math.max(...plotSamples.map(Math.abs)); + peakLabel = `${peakIns.toFixed(5)} in/s`; + yUnit = 'in/s'; + tooltipFmt = v => `${ch}: ${v.toFixed(5)} in/s`; + tickFmt = v => v.toFixed(4); + } else { + // Mic: derive psi/count scale from the 0C peak value, display as psi; show dBL in header + const peakCounts = Math.max(...samples.map(Math.abs)); + const micScale = (micPeakPsi !== null && peakCounts > 0) + ? Math.abs(micPeakPsi) / peakCounts + : 1.0; + plotSamples = samples.map(c => c * micScale); + const peakPsi = Math.max(...plotSamples.map(Math.abs)); + const peakDbl = peakPsi > 0 ? 20 * Math.log10(peakPsi / DBL_REF_PSI) : -Infinity; + peakLabel = `${peakDbl.toFixed(1)} dBL (${peakPsi.toExponential(3)} psi)`; + yUnit = 'psi'; + tooltipFmt = v => `${ch}: ${v.toExponential(3)} psi`; + tickFmt = v => v.toExponential(1); + } + const wrap = document.createElement('div'); wrap.className = 'chart-wrap'; const lbl = document.createElement('div'); lbl.className = `chart-label ch-${ch.toLowerCase()}`; - - // Compute peak for label - const peak = Math.max(...samples.map(Math.abs)); - lbl.textContent = `${ch} — peak ${peak.toLocaleString()} counts`; + lbl.textContent = `${ch} — peak ${peakLabel}`; wrap.appendChild(lbl); const canvasWrap = document.createElement('div'); @@ -466,11 +497,11 @@ // Downsample for rendering if very long (keep chart responsive) const MAX_POINTS = 4000; let renderTimes = times; - let renderData = samples; - if (samples.length > MAX_POINTS) { - const step = Math.ceil(samples.length / MAX_POINTS); + let renderData = plotSamples; + if (plotSamples.length > MAX_POINTS) { + const step = Math.ceil(plotSamples.length / MAX_POINTS); renderTimes = times.filter((_, i) => i % step === 0); - renderData = samples.filter((_, i) => i % step === 0); + renderData = plotSamples.filter((_, i) => i % step === 0); } const chart = new Chart(canvas, { @@ -496,10 +527,9 @@ intersect: false, callbacks: { title: items => `t = ${items[0].label} ms`, - label: item => `${ch}: ${item.raw.toLocaleString()} counts`, + label: item => tooltipFmt(item.raw), }, }, - // Trigger line annotation (drawn manually via afterDraw) }, scales: { x: { @@ -513,8 +543,18 @@ grid: { color: '#21262d' }, }, y: { - ticks: { color: '#484f58', maxTicksLimit: 5 }, + ticks: { + color: '#484f58', + maxTicksLimit: 5, + callback: v => tickFmt(v), + }, grid: { color: '#21262d' }, + title: { + display: true, + text: yUnit, + color: '#484f58', + font: { size: 10 }, + }, }, }, }, diff --git a/tests/test_write_frames.py b/tests/test_write_frames.py new file mode 100644 index 0000000..a1efc5a --- /dev/null +++ b/tests/test_write_frames.py @@ -0,0 +1,436 @@ +""" +test_write_frames.py — Verify write frame construction against BW capture. + +Validates that build_bw_write_frame() reproduces the exact wire bytes that +Blastware sent during the 3-11-26/170151 compliance-config write session. + +Frames tested (BW TX frame indices 102–112): + 102 — SUB 0x68 event index write + 103 — SUB 0x73 confirm B + 104 — SUB 0x71 compliance write chunk 1 + 105 — SUB 0x71 compliance write chunk 2 + 106 — SUB 0x71 compliance write chunk 3 + 107 — SUB 0x72 confirm A + 108 — SUB 0x82 trigger config write + 109 — SUB 0x83 trigger confirm + 110 — SUB 0x69 waveform data write + 111 — SUB 0x74 confirm C + 112 — SUB 0x72 confirm A (end of sequence) + +Run: + python -m pytest tests/test_write_frames.py -v +or: + python tests/test_write_frames.py +""" + +from __future__ import annotations + +import os +import sys + +import pytest + +# Allow running from the project root without installation +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from minimateplus.framing import build_bw_write_frame + + +# ── Capture loading ──────────────────────────────────────────────────────────── + +CAPTURE_PATH = os.path.join( + os.path.dirname(__file__), + "..", + "bridges", + "captures", + "3-11-26", + "raw_bw_20260311_170151.bin", +) + + +def _load_bw_frames(path: str) -> list[bytes]: + """ + Parse a raw BW capture file into a list of BW frames. + + BW frames start with ACK=0x41 followed by STX=0x02. The frame boundary is + the position of the NEXT 0x41 0x02 sequence (the ETX=0x03 terminator is the + last byte before the next frame start). + + NOTE: A naive scan for ETX=0x03 fails because 0x03 can appear inside the + DLE-stuffed payload. This parser uses consecutive 0x41 0x02 starts as + boundaries, which is safe because the ACK byte (0x41) is never DLE-stuffed. + """ + with open(path, "rb") as f: + raw = f.read() + + boundaries: list[int] = [] + i = 0 + while i < len(raw) - 1: + if raw[i] == 0x41 and raw[i + 1] == 0x02: + boundaries.append(i) + i += 1 + boundaries.append(len(raw)) + + frames = [] + for k in range(len(boundaries) - 1): + frames.append(raw[boundaries[k] : boundaries[k + 1]]) + return frames + + +def _destuff(data: bytes) -> bytes: + """Undo DLE stuffing: replace every 0x10 0x10 pair with a single 0x10.""" + result = bytearray() + k = 0 + while k < len(data): + if data[k] == 0x10 and k + 1 < len(data) and data[k + 1] == 0x10: + result.append(0x10) + k += 2 + else: + result.append(data[k]) + k += 1 + return bytes(result) + + +def _decode_bw_frame(wire: bytes) -> tuple[int, int, bytes, bytes, int]: + """ + Decode a BW wire frame into its components. + + Returns: + (sub, offset, params, data, chk) + sub — SUB byte (payload[2]) + offset — uint16 from payload[4:6] + params — 10-byte params field (payload[6:16]) + data — write payload bytes (payload[16:-1]) + chk — checksum byte (payload[-1]) + """ + inner = wire[2:-1] # strip ACK+STX and trailing ETX + payload = _destuff(inner) + sub = payload[2] + offset = (payload[4] << 8) | payload[5] + params = payload[6:16] + data = payload[16:-1] + chk = payload[-1] + return sub, offset, params, data, chk + + +# ── Test fixtures ────────────────────────────────────────────────────────────── + +@pytest.fixture(scope="module") +def bw_frames() -> list[bytes]: + if not os.path.exists(CAPTURE_PATH): + pytest.skip(f"Capture file not found: {CAPTURE_PATH}") + return _load_bw_frames(CAPTURE_PATH) + + +# ── Individual frame tests ───────────────────────────────────────────────────── + +class TestWriteFrameReconstruction: + """Verify build_bw_write_frame() reproduces the exact wire bytes from the capture.""" + + def test_frame_102_event_index_write_sub68(self, bw_frames: list[bytes]) -> None: + """SUB 0x68 — event index write (frame 102).""" + cap_wire = bw_frames[102] + sub_cap, offset_cap, params_cap, data_cap, chk_cap = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x68 + assert params_cap == bytes(10) + + # Reconstruct using build_bw_write_frame with the same data and offset + built = build_bw_write_frame(0x68, data_cap, offset=offset_cap, params=params_cap) + assert built == cap_wire, ( + f"SUB 0x68 wire mismatch\n" + f" built: {built.hex()}\n" + f" capt: {cap_wire.hex()}" + ) + + def test_frame_103_confirm_b_sub73(self, bw_frames: list[bytes]) -> None: + """SUB 0x73 — confirm B (zero-data confirm frame 103).""" + cap_wire = bw_frames[103] + sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x73 + assert data_cap == b"" + assert offset_cap == 0x0000 + + built = build_bw_write_frame(0x73, b"") + assert built == cap_wire + + def test_frame_104_compliance_chunk1_sub71(self, bw_frames: list[bytes]) -> None: + """SUB 0x71 chunk 1 — 1027-byte compliance write (frame 104).""" + cap_wire = bw_frames[104] + sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x71 + assert offset_cap == 0x1004 + assert params_cap == bytes(10) + assert len(data_cap) == 1027 + + built = build_bw_write_frame( + 0x71, data_cap, + offset=0x1004, + params=bytes(10), + ) + assert built == cap_wire + + def test_frame_105_compliance_chunk2_sub71(self, bw_frames: list[bytes]) -> None: + """SUB 0x71 chunk 2 — 1055-byte compliance write (frame 105).""" + cap_wire = bw_frames[105] + sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) + + _CHUNK2_PARAMS = bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) + + assert sub_cap == 0x71 + assert offset_cap == 0x1004 + assert params_cap == _CHUNK2_PARAMS + assert len(data_cap) == 1055 + + built = build_bw_write_frame( + 0x71, data_cap, + offset=0x1004, + params=_CHUNK2_PARAMS, + ) + assert built == cap_wire + + def test_frame_106_compliance_chunk3_sub71(self, bw_frames: list[bytes]) -> None: + """SUB 0x71 chunk 3 — 46-byte compliance write (frame 106).""" + cap_wire = bw_frames[106] + sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) + + _CHUNK3_PARAMS = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + + assert sub_cap == 0x71 + assert offset_cap == 0x002C + assert params_cap == _CHUNK3_PARAMS + assert len(data_cap) == 46 + + built = build_bw_write_frame( + 0x71, data_cap, + offset=0x002C, + params=_CHUNK3_PARAMS, + ) + assert built == cap_wire + + def test_frame_107_confirm_a_sub72(self, bw_frames: list[bytes]) -> None: + """SUB 0x72 — confirm A after compliance write (frame 107).""" + cap_wire = bw_frames[107] + sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x72 + assert data_cap == b"" + assert offset_cap == 0x0000 + + built = build_bw_write_frame(0x72, b"") + assert built == cap_wire + + def test_frame_108_trigger_config_write_sub82(self, bw_frames: list[bytes]) -> None: + """SUB 0x82 — trigger config write (frame 108).""" + cap_wire = bw_frames[108] + sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x82 + assert params_cap == bytes(10) + assert len(data_cap) == 29 + + # Verify offset formula: data[1] + 2 + assert offset_cap == data_cap[1] + 2, ( + f"Trigger write offset formula mismatch: " + f"data[1]={data_cap[1]} → expected {data_cap[1]+2}, got {offset_cap}" + ) + + built = build_bw_write_frame( + 0x82, data_cap, + offset=offset_cap, + params=params_cap, + ) + assert built == cap_wire + + def test_frame_109_trigger_confirm_sub83(self, bw_frames: list[bytes]) -> None: + """SUB 0x83 — trigger confirm (frame 109).""" + cap_wire = bw_frames[109] + sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x83 + assert data_cap == b"" + + built = build_bw_write_frame(0x83, b"") + assert built == cap_wire + + def test_frame_110_waveform_data_write_sub69(self, bw_frames: list[bytes]) -> None: + """SUB 0x69 — waveform data write (frame 110).""" + cap_wire = bw_frames[110] + sub_cap, offset_cap, params_cap, data_cap, _ = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x69 + assert params_cap == bytes(10) + assert len(data_cap) == 204 + + # Verify offset formula: data[1] + 2 + assert offset_cap == data_cap[1] + 2, ( + f"Waveform write offset formula mismatch: " + f"data[1]={data_cap[1]} → expected {data_cap[1]+2}, got {offset_cap}" + ) + + built = build_bw_write_frame( + 0x69, data_cap, + offset=offset_cap, + params=params_cap, + ) + assert built == cap_wire + + def test_frame_111_confirm_c_sub74(self, bw_frames: list[bytes]) -> None: + """SUB 0x74 — confirm C after waveform data write (frame 111).""" + cap_wire = bw_frames[111] + sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x74 + assert data_cap == b"" + + built = build_bw_write_frame(0x74, b"") + assert built == cap_wire + + def test_frame_112_confirm_a_sub72_end(self, bw_frames: list[bytes]) -> None: + """SUB 0x72 — final confirm A at end of write sequence (frame 112).""" + cap_wire = bw_frames[112] + sub_cap, _, _, data_cap, _ = _decode_bw_frame(cap_wire) + + assert sub_cap == 0x72 + assert data_cap == b"" + + built = build_bw_write_frame(0x72, b"") + assert built == cap_wire + + +class TestOffsetFormula: + """Verify the offset = data[1] + 2 formula for single-chunk write commands.""" + + def test_event_index_offset_formula(self, bw_frames: list[bytes]) -> None: + """Frame 102 (SUB 0x68): offset = data[1] + 2.""" + _, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[102]) + assert offset_cap == data_cap[1] + 2 + + def test_trigger_config_offset_formula(self, bw_frames: list[bytes]) -> None: + """Frame 108 (SUB 0x82): offset = data[1] + 2.""" + _, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[108]) + assert offset_cap == data_cap[1] + 2 + + def test_waveform_data_offset_formula(self, bw_frames: list[bytes]) -> None: + """Frame 110 (SUB 0x69): offset = data[1] + 2.""" + _, offset_cap, _, data_cap, _ = _decode_bw_frame(bw_frames[110]) + assert offset_cap == data_cap[1] + 2 + + +class TestChecksumVerification: + """Verify large-frame DLE-aware checksum for all write frames.""" + + def _verify_checksum(self, wire: bytes, label: str) -> None: + inner = wire[2:-1] + payload = _destuff(inner) + chk = payload[-1] + computed = (sum(b for b in payload[2:-1] if b != 0x10) + 0x10) & 0xFF + assert computed == chk, ( + f"{label}: checksum mismatch — computed=0x{computed:02X}, got=0x{chk:02X}" + ) + + def test_all_write_frame_checksums(self, bw_frames: list[bytes]) -> None: + write_frames = { + 102: "SUB 0x68 event index write", + 103: "SUB 0x73 confirm B", + 104: "SUB 0x71 compliance chunk 1", + 105: "SUB 0x71 compliance chunk 2", + 106: "SUB 0x71 compliance chunk 3", + 107: "SUB 0x72 confirm A", + 108: "SUB 0x82 trigger config write", + 109: "SUB 0x83 trigger confirm", + 110: "SUB 0x69 waveform data write", + 111: "SUB 0x74 confirm C", + 112: "SUB 0x72 confirm A (end)", + } + for idx, label in write_frames.items(): + self._verify_checksum(bw_frames[idx], f"Frame {idx} ({label})") + + +class TestComplianceChunkSizes: + """Verify compliance write chunk sizes and sequence.""" + + def test_chunk1_size(self, bw_frames: list[bytes]) -> None: + _, _, _, data, _ = _decode_bw_frame(bw_frames[104]) + assert len(data) == 1027, f"Chunk 1 should be 1027 bytes, got {len(data)}" + + def test_chunk2_size(self, bw_frames: list[bytes]) -> None: + _, _, _, data, _ = _decode_bw_frame(bw_frames[105]) + assert len(data) == 1055, f"Chunk 2 should be 1055 bytes, got {len(data)}" + + def test_chunk3_size(self, bw_frames: list[bytes]) -> None: + _, _, _, data, _ = _decode_bw_frame(bw_frames[106]) + assert len(data) == 46, f"Chunk 3 should be 46 bytes, got {len(data)}" + + def test_total_compliance_data(self, bw_frames: list[bytes]) -> None: + total = sum( + len(_decode_bw_frame(bw_frames[i])[3]) for i in [104, 105, 106] + ) + assert total == 2128, f"Total compliance write data should be 2128 bytes, got {total}" + + def test_chunk1_params(self, bw_frames: list[bytes]) -> None: + _, _, params, _, _ = _decode_bw_frame(bw_frames[104]) + assert params == bytes(10) + + def test_chunk2_params(self, bw_frames: list[bytes]) -> None: + _, _, params, _, _ = _decode_bw_frame(bw_frames[105]) + assert params == bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) + + def test_chunk3_params(self, bw_frames: list[bytes]) -> None: + _, _, params, _, _ = _decode_bw_frame(bw_frames[106]) + assert params == bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + + def test_chunk1_offset(self, bw_frames: list[bytes]) -> None: + _, offset, _, _, _ = _decode_bw_frame(bw_frames[104]) + assert offset == 0x1004 + + def test_chunk2_offset(self, bw_frames: list[bytes]) -> None: + _, offset, _, _, _ = _decode_bw_frame(bw_frames[105]) + assert offset == 0x1004 + + def test_chunk3_offset(self, bw_frames: list[bytes]) -> None: + _, offset, _, _, _ = _decode_bw_frame(bw_frames[106]) + assert offset == 0x002C + + +# ── Standalone runner ────────────────────────────────────────────────────────── + +if __name__ == "__main__": + if not os.path.exists(CAPTURE_PATH): + print(f"ERROR: Capture file not found: {CAPTURE_PATH}") + sys.exit(1) + + frames = _load_bw_frames(CAPTURE_PATH) + print(f"Loaded {len(frames)} BW frames from capture") + + write_frame_indices = list(range(102, 113)) + all_pass = True + print() + print(f"{'Frame':>6} {'SUB':>5} {'Offset':>8} {'DataLen':>8} {'Chk OK':>7} {'Rebuilt':>8}") + print("-" * 60) + for idx in write_frame_indices: + wire = frames[idx] + sub, offset, params, data, chk = _decode_bw_frame(wire) + payload = _destuff(wire[2:-1]) + computed = (sum(b for b in payload[2:-1] if b != 0x10) + 0x10) & 0xFF + chk_ok = computed == chk + + built = build_bw_write_frame(sub, data, offset=offset, params=params) + rebuilt_ok = built == wire + + status = "✅" if (chk_ok and rebuilt_ok) else "❌" + print( + f" {idx:4d} 0x{sub:02X} 0x{offset:04X} {len(data):8d} " + f"{'✅' if chk_ok else '❌':>7} {'✅' if rebuilt_ok else '❌':>8} {status}" + ) + if not (chk_ok and rebuilt_ok): + all_pass = False + + print() + if all_pass: + print("All 11 write frames verified ✅") + else: + print("FAILURES DETECTED ❌") + sys.exit(1)