Files
seismo-relay/minimateplus/framing.py
T
serversdown 429c6ac87a feat(protocol): implement v0.14.0 SUB 5A protocol rewrite with enhanced chunk handling and new helpers
test: add regression tests for v0.14.x SUB 5A protocol fixes
refactor(logging): change warning logs to debug for less verbosity in write_blastware_file
2026-05-06 14:18:31 -04:00

757 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
framing.py — DLE frame codec for the Instantel MiniMate Plus RS-232 protocol.
Wire format:
BW→S3 (our requests): [ACK=0x41] [STX=0x02] [stuffed payload+chk] [ETX=0x03]
S3→BW (device replies): [DLE=0x10] [STX=0x02] [stuffed payload+chk] [DLE=0x10] [ETX=0x03]
The ACK 0x41 byte often precedes S3 frames too — it is silently discarded
by the streaming parser.
De-stuffed payload layout:
BW→S3 request frame:
[0] CMD 0x10 (BW request marker)
[1] flags 0x00
[2] SUB command sub-byte
[3] 0x00 always zero in captured frames
[4] 0x00 always zero in captured frames
[5] OFFSET two-step offset: 0x00 = length-probe, DATA_LEN = data-request
[6-15] zero padding (total de-stuffed payload = 16 bytes)
S3→BW response frame:
[0] CMD 0x00 (S3 response marker)
[1] flags 0x10
[2] SUB response sub-byte (= 0xFF - request SUB)
[3] PAGE_HI high byte of page address (always 0x00 in observed frames)
[4] PAGE_LO low byte (always 0x00 in observed frames)
[5+] data payload data section (composite inner frames for large responses)
DLE stuffing rule: any 0x10 byte in the payload is doubled on the wire (0x10 → 0x10 0x10).
This applies to the checksum byte too.
Confirmed from live captures (s3_parser.py validation + raw_bw.bin / raw_s3.bin).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
# ── Protocol byte constants ───────────────────────────────────────────────────
DLE = 0x10 # Data Link Escape
STX = 0x02 # Start of text
ETX = 0x03 # End of text
ACK = 0x41 # Acknowledgement / frame-start marker (BW side)
BW_CMD = 0x10 # CMD byte value in BW→S3 frames
S3_CMD = 0x00 # CMD byte value in S3→BW frames
S3_FLAGS = 0x10 # flags byte value in S3→BW frames
# BW read-command payload size: 5 header bytes + 11 padding bytes = 16 total.
# Confirmed from captured raw_bw.bin: all read-command frames carry exactly 16
# de-stuffed bytes (excluding the appended checksum).
_BW_PAYLOAD_SIZE = 16
# ── DLE stuffing / de-stuffing ────────────────────────────────────────────────
def dle_stuff(data: bytes) -> bytes:
"""Escape literal 0x10 bytes: 0x10 → 0x10 0x10."""
out = bytearray()
for b in data:
if b == DLE:
out.append(DLE)
out.append(b)
return bytes(out)
def dle_unstuff(data: bytes) -> bytes:
"""Remove DLE stuffing: 0x10 0x10 → 0x10."""
out = bytearray()
i = 0
while i < len(data):
b = data[i]
if b == DLE and i + 1 < len(data) and data[i + 1] == DLE:
out.append(DLE)
i += 2
else:
out.append(b)
i += 1
return bytes(out)
# ── Checksum ─────────────────────────────────────────────────────────────────
def checksum(payload: bytes) -> int:
"""SUM8: sum of all de-stuffed payload bytes, mod 256."""
return sum(payload) & 0xFF
# ── BW→S3 frame builder ───────────────────────────────────────────────────────
# SUB byte for 5A — used by build_5a_frame below (protocol.py has the full
# constant set; defined here to avoid a circular import).
SUB_5A = 0x5A
def build_5a_frame(offset_word: int, raw_params: bytes) -> bytes:
"""
Build a BW→S3 frame for SUB 5A (BULK_WAVEFORM_STREAM) that exactly
matches Blastware's wire output.
SUB 5A uses a DIFFERENT frame format from all other read commands:
1. The offset field (bytes [4:6]) is written RAW — the 0x10 in
offset_hi=0x10 is NOT DLE-stuffed, unlike build_bw_frame().
2. The checksum uses a DLE-aware sum: for each 0x10 XX pair in the
stuffed section, only XX contributes; lone bytes contribute normally.
This differs from the standard SUM8 checksum on the unstuffed payload.
Both differences are confirmed from the 1-2-26 BW TX capture (all 10 frames
verified against this algorithm on 2026-04-02).
Args:
offset_word: 16-bit offset. For probe/chunks/metadata pages this is
`0x1002`. For the proper TERM frame this is computed by
`bulk_waveform_term_v2()` from the STRT-derived
`end_offset`.
raw_params: 10, 11, or 12 params bytes (from `bulk_waveform_params`
for probes/samples, `bulk_waveform_term_v2` for TERM, or
a manually-built 12-byte block for the metadata pages
0x1002 / 0x1004). See gotcha #3 below — params region
uses partial DLE stuffing of 0x10 bytes.
Returns:
Complete frame bytes: [ACK][STX][stuffed_section][chk][ETX]
"""
if len(raw_params) not in (10, 11, 12):
# 10 = termination params; 11 = regular probe / chunk params;
# 12 = metadata-page params (extra trailing 0x00 — BW byte-perfect quirk
# for the two fixed metadata reads at counter=0x1002 and 0x1004).
raise ValueError(f"raw_params must be 10/11/12 bytes, got {len(raw_params)}")
# Build stuffed section between STX and checksum
s = bytearray()
s += b"\x10\x10" # DLE-stuffed BW_CMD
s += b"\x00" # flags
s += bytes([SUB_5A]) # sub = 0x5A
s += b"\x00" # field3
s += bytes([(offset_word >> 8) & 0xFF, # offset_hi — raw, NOT stuffed
offset_word & 0xFF]) # offset_lo
# Params — partial DLE stuffing of 0x10 bytes (CONFIRMED 2026-05-05).
#
# The device's de-stuffing rule for params is:
# • `10 10` → de-stuffs to `10`
# • `10 02/03/04` → kept literal (these are inner-frame markers)
# • `10 X` other → de-stuffs to just `X` (drops the 0x10)
#
# So for any 0x10 byte in the *logical* params that is followed by a
# byte NOT in {0x02, 0x03, 0x04, 0x10}, we must double the 0x10 on the
# wire (`10 X` → `10 10 X`) so the device's de-stuffer reproduces the
# original `10 X` pair. Without this, counter values with `0x10` in
# the high byte (e.g. counter=0x1000 has params bytes `10 00`) are
# silently corrupted to `0x__00` on the device side, and the device
# responds for the wrong address — for counter=0x1000 it returns the
# probe response (counter=0x0000), which contains the file header +
# STRT. That STRT block then lands in the assembled file body and
# Blastware rejects the file as malformed.
#
# Confirmed against BW capture 5-1-26 / bwcap3sec frame 20: params
# logical bytes `00 01 11 10 00 00 00 00 00 00 00` (counter=0x1000)
# are encoded on the wire as `00 01 11 10 10 00 00 00 00 00 00 00`.
# BW frames 13/14 (meta @ 0x1002 / 0x1004) leave `10 02` and `10 04`
# raw — the device handles those literal pairs correctly.
i = 0
while i < len(raw_params):
b = raw_params[i]
s.append(b)
if (
b == 0x10
and i + 1 < len(raw_params)
and raw_params[i + 1] not in (0x02, 0x03, 0x04, 0x10)
):
s.append(0x10) # double the 0x10 so it survives device de-stuffing
i += 1
# DLE-aware checksum: for 0x10 XX pairs count XX; for lone bytes count them
chk, i = 0, 0
while i < len(s):
if s[i] == DLE and i + 1 < len(s):
chk = (chk + s[i + 1]) & 0xFF
i += 2
else:
chk = (chk + s[i]) & 0xFF
i += 1
return bytes([ACK, STX]) + bytes(s) + bytes([chk, ETX])
def build_bw_frame(sub: int, offset: int = 0, params: bytes = bytes(10)) -> bytes:
"""
Build a BW→S3 read-command frame.
The payload is always 16 de-stuffed bytes:
[BW_CMD, 0x00, sub, 0x00, 0x00, offset] + params(10 bytes)
Confirmed from BW capture analysis: payload[3] and payload[4] are always
0x00 across all observed read commands. The two-step offset lives at
payload[5]: 0x00 for the length-probe step, DATA_LEN for the data-fetch step.
The 10 params bytes (payload[6..15]) are zero for standard reads. For
keyed reads (SUBs 0A, 0C) the 4-byte waveform key lives at params[4..7]
(= payload[10..13]). For token-based reads (SUBs 1E, 1F) a single token
byte lives at params[6] (= payload[12]). Use waveform_key_params() and
token_params() helpers to build these safely.
Wire output: [ACK] [STX] dle_stuff(payload + checksum) [ETX]
Args:
sub: SUB command byte (e.g. 0x01 = FULL_CONFIG_READ)
offset: Value placed at payload[5].
Pass 0 for the probe step; pass DATA_LENGTHS[sub] for the data step.
params: 10 bytes placed at payload[6..15]. Default: all zeros.
Returns:
Complete frame bytes ready to write to the serial port / socket.
"""
if len(params) != 10:
raise ValueError(f"params must be exactly 10 bytes, got {len(params)}")
if offset > 0xFFFF:
raise ValueError(f"offset must fit in uint16, got {offset:#06x}")
# offset is a uint16 split across bytes [4] (high) and [5] (low).
# For all standard reads (offset ≤ 0xFF), byte[4] = 0x00 — consistent with
# every captured BW frame. For large payloads (e.g. SUB 1A / E5 at 0x082A),
# byte[4] carries the high byte. 🔶 INFERRED — confirm once E5 is captured.
offset_hi = (offset >> 8) & 0xFF
offset_lo = offset & 0xFF
payload = bytes([BW_CMD, 0x00, sub, 0x00, offset_hi, offset_lo]) + params
chk = checksum(payload)
wire = bytes([ACK, STX]) + dle_stuff(payload + bytes([chk])) + bytes([ETX])
return wire
def build_bw_write_frame(
sub: int,
data: bytes,
*,
offset: int = 0,
params: bytes = bytes(10),
) -> bytes:
"""
Build a BW→S3 write-command frame.
Write frames extend the standard 16-byte read header with a variable-length
data payload. They use a different checksum formula from read frames.
**CRITICAL: Write frames use minimal DLE stuffing.**
Unlike read frames (build_bw_frame), write frames do NOT apply full DLE
stuffing to the payload. Only the BW_CMD byte (0x10) at position [0] is
doubled to 0x10 0x10 on the wire. All other bytes — flags, sub, offset,
params, data, and checksum — are written RAW with no stuffing, even if they
contain 0x10 bytes (e.g. offset_hi=0x10 for compliance chunks, or 0x10
bytes in the write data payload).
Confirmed from 3-11-26 BW TX capture (frames 102112): all 11 write frames
match the rule "double BW_CMD only; everything else raw." ✅ 2026-04-07.
Wire layout:
[41] ACK
[02] STX
[10 10] BW_CMD doubled (the ONLY DLE stuffing applied)
[00] flags
[sub] write command byte (0x680x83)
[00] always zero
[hi][lo] offset as uint16 BE (raw; NOT stuffed even if hi=0x10)
[params] 10 bytes (raw)
[data] variable-length write payload (raw; NOT stuffed)
[chk] checksum byte (raw; NOT stuffed even if 0x10)
[03] ETX
De-stuffed payload (for checksum computation):
[0] BW_CMD 0x10
[1] flags 0x00
[2] SUB write command byte
[3] 0x00 always zero
[4] offset_hi
[5] offset_lo
[6:16] params 10 bytes
[16:] data write payload
[-1] chk
**Checksum formula (confirmed 2026-03-12 from 3-11-26 BW TX capture):**
chk = (sum(b for b in payload[2:] if b != 0x10) + 0x10) % 256
where payload = destuffed content BEFORE appending chk.
This skips all 0x10 bytes in payload[2:] (sub onwards), including any
0x10 bytes in the offset, params, data, and the checksum byte itself.
The offset field [4:6] meaning per write SUB:
- SUBs 68, 69, 82 (single-chunk writes): offset = data[1] + 2, where
data[1] is an embedded length field in the write payload.
Confirmed from capture: 68→0x5A (data[1]=0x58+2), 82→0x1C
(data[1]=0x1A+2), 69→0xCA (data[1]=0xC8+2).
- SUB 71 (multi-chunk compliance): 0x1004 for full chunks, 0x002C
for the final partial chunk.
- Confirm frames (72, 73, 74, 83): offset=0, no data.
Args:
sub: Write command SUB byte.
data: Write payload (variable length; empty for confirm frames).
offset: 16-bit value placed at [4:6]. See per-SUB notes above.
params: 10 bytes placed at [6:16]. All-zero for most writes; compliance
chunk writes use chunk-specific values.
Returns:
Complete frame bytes ready to write to the transport.
"""
if len(params) != 10:
raise ValueError(f"params must be exactly 10 bytes, got {len(params)}")
if offset > 0xFFFF:
raise ValueError(f"offset must fit in uint16, got {offset:#06x}")
offset_hi = (offset >> 8) & 0xFF
offset_lo = offset & 0xFF
# Destuffed payload (used only for checksum; not sent directly)
payload_no_chk = bytes([BW_CMD, 0x00, sub, 0x00, offset_hi, offset_lo]) + params + data
# Large-frame checksum: sum payload[2:] skipping all 0x10 bytes, add 0x10.
# Applied to the destuffed representation — confirms correctly against
# all 11 write frames in the 3-11-26/170151 BW TX capture. ✅
chk = (sum(b for b in payload_no_chk[2:] if b != 0x10) + 0x10) & 0xFF
# Wire construction: only BW_CMD is doubled; everything else is raw.
# Do NOT use dle_stuff() here — that would incorrectly double 0x10 bytes
# in the offset, params, and data sections.
wire = (
bytes([ACK, STX]) # Frame prefix (not part of payload)
+ bytes([BW_CMD, BW_CMD]) # BW_CMD doubled (only DLE stuffing applied)
+ payload_no_chk[1:] # flags, sub, offset, params, data — RAW
+ bytes([chk]) # checksum — RAW
+ bytes([ETX]) # Frame terminator
)
return wire
def waveform_key_params(key4: bytes) -> bytes:
"""
Build the 10-byte params block that carries a 4-byte waveform key.
Used for SUBs 0A (WAVEFORM_HEADER) and 0C (WAVEFORM_RECORD).
The key goes at params[4..7], which maps to payload[10..13].
Confirmed from 3-31-26 capture: 0A and 0C request frames carry the
4-byte record address at payload[10..13]. Probe and data-fetch steps
carry the same key in both frames.
Args:
key4: exactly 4 bytes — the opaque waveform record address returned
by the EVENT_HEADER (1E) or EVENT_ADVANCE (1F) response.
Returns:
10-byte params block with key embedded at positions [4..7].
"""
if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
p = bytearray(10)
p[4:8] = key4
return bytes(p)
def token_params(token: int = 0) -> bytes:
"""
Build the 10-byte params block that carries a single token byte.
Used for SUBs 1E (EVENT_HEADER) and 1F (EVENT_ADVANCE).
The token goes at params[7], which maps to payload[13].
Confirmed from BOTH 3-31-26 and 4-3-26 BW TX captures:
raw params bytes: 00 00 00 00 00 00 00 fe 00 00
token is at index 7 (not 6 — that was wrong).
- token=0x00: first-event read / browse mode (no download marking)
- token=0xfe: download mode (causes 1F to skip partial bins and
advance to the next full record)
The device echoes the token at data[8] of the S3 response (payload[13]),
distinct from the next-event key at data[11:15] (payload[16:20]).
Args:
token: single byte to place at params[7] / payload[13].
Returns:
10-byte params block with token at position [7].
"""
p = bytearray(10)
p[7] = token
return bytes(p)
def bulk_waveform_params(key4: bytes, counter: int, *, is_probe: bool = False) -> bytes:
"""
Build the 10-byte params block for SUB 5A (BULK_WAVEFORM_STREAM) requests.
Confirmed 2026-04-02 from 1-2-26 BW TX capture analysis:
Probe / first request (is_probe=True, counter=0):
params[0] = 0x00
params[1:5] = key4 (all 4 key bytes; counter overlaps key4[2:4] = 0x0000)
params[5:] = zeros
Regular chunk requests (is_probe=False):
params[0] = 0x00
params[1:3] = key4[0:2] (first 2 key bytes as session handle)
params[3:5] = counter (BE uint16) (chunk position, increments by 0x0400)
params[5:] = zeros
Termination request: DO NOT use this helper — see bulk_waveform_term_params().
Args:
key4: 4-byte waveform key from EVENT_HEADER (1E) response.
counter: Chunk position counter (uint16 BE). Pass 0 for probe.
is_probe: If True, embed full key4 (probe step only).
Returns:
11-byte params block. (BW confirmed: chunk frames carry 11 params bytes,
not 10; the extra trailing 0x00 was confirmed from 1-2-26 wire capture
on 2026-04-02.)
"""
if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
p = bytearray(11) # 11 bytes confirmed from BW wire capture
p[0] = 0x00
p[1] = key4[0]
p[2] = key4[1]
if is_probe:
# Full key4; counter=0 is implied (overlaps with key4[2:4] which must be 0x0000)
p[3] = key4[2]
p[4] = key4[3]
else:
p[3] = (counter >> 8) & 0xFF
p[4] = counter & 0xFF
return bytes(p)
def bulk_waveform_term_params(key4: bytes, counter: int) -> bytes:
"""
⛔ DEPRECATED — DO NOT USE IN NEW CODE.
This is the v1 termination params helper, paired with the broken
`_BULK_TERM_OFFSET = 0x005A` magic offset_word. Together they produce a
~100-byte device-side terminator response that does NOT contain the
partial-last-chunk waveform tail or the 26-byte file footer. Files
reconstructed using this terminator are missing their last ~512 bytes of
waveform data and have a synthesized footer that disagrees with what BW
would have written.
**For new code, use `bulk_waveform_term_v2(key4, end_offset, last_chunk_counter)`**
which computes the correct offset_word + params from the STRT-derived
`end_offset`. v2 produces wire bytes that match BW exactly across all
tested events (4-27-26 / 5-1-26 / 5-4-26 captures).
This function is retained ONLY for the defensive fallback path in
`read_bulk_waveform_stream()` that triggers when STRT parsing fails or no
chunks are fetched (= a malformed event or an unexpected device state).
The fallback already logs a WARNING when it activates; if you see that
warning, the bug is upstream — STRT should have been parseable.
"""
if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
p = bytearray(10)
p[0] = key4[0]
p[1] = key4[1]
p[2] = (counter >> 8) & 0xFF
return bytes(p)
def bulk_waveform_term_v2(
key4: bytes,
end_offset: int,
last_chunk_counter: int,
) -> tuple[int, bytes]:
"""
Compute the SUB 5A TERM frame's offset_word and 10-byte params block.
Confirmed across 3 events (4-27-26 + 5-1-26 captures):
next_boundary = last_chunk_counter + 0x0200
offset_word = end_offset - next_boundary (residual byte count)
params[0] = key4[0] (= 0x01 on every observed device)
params[1] = key4[1] (= 0x11)
params[2] = (next_boundary >> 8) & 0xFF
params[3] = next_boundary & 0xFF
params[4:10] = zeros
Verification:
| end_offset | last_chunk | next_boundary | offset_word | params[2:4] |
| 0x1ABE | 0x1800 | 0x1A00 | 0x00BE | 1A 00 |
| 0x21F2 | 0x1E00 | 0x2000 | 0x01F2 | 20 00 |
| 0x417E | 0x3E38 | 0x4038 | 0x0146 | 40 38 |
The device receives `requested_address = (params[2] << 8) | offset_word`
and replies with `(end_offset - next_boundary)` bytes of waveform tail
starting at `next_boundary` — including the 26-byte file footer.
Args:
key4: 4-byte waveform key for this event.
end_offset: Event-end pointer (= `(end_key[2] << 8) | end_key[3]`
from the STRT record at data[23:27] of A5[0]).
last_chunk_counter: Counter of the last full 0x0200-byte chunk fetched
(the chunk that covers [last_chunk_counter,
last_chunk_counter + 0x0200)).
Returns:
(offset_word, params10) tuple. Pass as
`build_5a_frame(offset_word, params)`.
Raises:
ValueError: on inconsistent inputs.
"""
if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
next_boundary = last_chunk_counter + 0x0200
if next_boundary > 0xFFFF:
raise ValueError(
f"next_boundary 0x{next_boundary:04X} exceeds uint16; check inputs"
)
if end_offset <= last_chunk_counter:
raise ValueError(
f"end_offset 0x{end_offset:04X} must be > "
f"last_chunk_counter 0x{last_chunk_counter:04X}"
)
offset_word = end_offset - next_boundary
if offset_word < 0:
# Last chunk overshot end_offset; caller should have stopped one chunk
# earlier. Treat as zero residual.
offset_word = 0
if offset_word > 0xFFFF:
raise ValueError(
f"offset_word 0x{offset_word:04X} exceeds uint16"
)
p = bytearray(10)
p[0] = key4[0]
p[1] = key4[1]
p[2] = (next_boundary >> 8) & 0xFF
p[3] = next_boundary & 0xFF
return offset_word, bytes(p)
# ── End-offset extraction from STRT record ────────────────────────────────────
STRT_MARKER = b"STRT"
def parse_strt_end_offset(a5_data: bytes) -> Optional[int]:
"""
Extract the event-end offset from the STRT record in an A5 response payload.
The first A5 response (the probe response, or the first chunk for events
with non-zero start_key[2:4]) contains a STRT record at byte offset 17 of
`data`. Layout:
data[17:21] "STRT"
data[21:23] ff fe sentinel
data[23:27] end_key ← 4-byte key of where this event ENDS
data[27:31] start_key
...
Returns `(end_key[2] << 8) | end_key[3]` — the absolute device-buffer
address where the event ends. Use this to bound the chunk loop and to
compute the TERM frame.
Verified end_offset values:
| event start_key | end_key | end_offset |
| 01110000 | 01111ABE | 0x1ABE |
| 01110000 | 011121F2 | 0x21F2 |
| 011121F2 | 0111417E | 0x417E |
Args:
a5_data: The `data` field of an A5 response frame (frame.data).
Returns:
The end_offset (uint16) if STRT is found, else None.
"""
pos = a5_data.find(STRT_MARKER)
if pos < 0 or pos + 10 > len(a5_data):
return None
# data[pos+4:pos+6] is "ff fe"; data[pos+6:pos+10] is end_key.
end_key = a5_data[pos + 6 : pos + 10]
if len(end_key) < 4:
return None
return (end_key[2] << 8) | end_key[3]
# ── Pre-built POLL frames ─────────────────────────────────────────────────────
#
# POLL (SUB 0x5B) uses the same two-step pattern as all other reads — the
# hardcoded length 0x30 lives at payload[5], exactly as in build_bw_frame().
POLL_PROBE = build_bw_frame(0x5B, 0x00) # length-probe POLL (offset = 0)
POLL_DATA = build_bw_frame(0x5B, 0x30) # data-request POLL (offset = 0x30)
# Session-reset signal (ACK + ETX, no STX/payload).
# Confirmed from 4-8-26 BW TX captures: Blastware sends this 2-byte sequence
# immediately before the first POLL probe, and again between the POLL probe
# and the POLL data request. Required to wake a unit that is actively
# monitoring — without it the unit does not respond to POLL over TCP.
# Harmless for idle units (they respond to POLL regardless).
SESSION_RESET = bytes([0x41, 0x03])
# ── S3 response dataclass ─────────────────────────────────────────────────────
@dataclass
class S3Frame:
"""A fully parsed and de-stuffed S3→BW response frame."""
sub: int # response SUB byte (e.g. 0xA4 = POLL_RESPONSE)
page_hi: int # PAGE_HI from header (= data length on step-2 length response)
page_lo: int # PAGE_LO from header
data: bytes # payload data section (payload[5:], checksum already stripped)
checksum_valid: bool
chk_byte: int = 0 # actual checksum byte received from wire (body[-1])
# needed for waveform file reconstruction: when the last data byte
# is 0x10 and chk_byte ∈ {0x02, 0x03, 0x04}, the DLE+chk pair
# must be included in the DLE-strip operation to correctly
# reconstruct the Blastware binary body.
@property
def page_key(self) -> int:
"""Combined 16-bit page address / length: (page_hi << 8) | page_lo."""
return (self.page_hi << 8) | self.page_lo
# ── Streaming S3 frame parser ─────────────────────────────────────────────────
class S3FrameParser:
"""
Incremental byte-stream parser for S3→BW response frames.
Feed incoming bytes with feed(). Complete, valid frames are returned
immediately and also accumulated in self.frames.
State machine:
IDLE — scanning for DLE (0x10)
SEEN_DLE — saw DLE, waiting for STX (0x02) to start a frame
IN_FRAME — collecting de-stuffed payload bytes; bare ETX ends frame
IN_FRAME_DLE — inside frame, saw DLE; DLE continues stuffing;
DLE+ETX is treated as literal data (NOT a frame end),
which lets inner-frame terminators pass through intact
Wire format confirmed from captures:
[DLE=0x10] [STX=0x02] [stuffed payload+chk] [bare ETX=0x03]
The ETX is NOT preceded by a DLE on the wire. DLE+ETX sequences that
appear inside the payload are inner-frame terminators and must be
treated as literal data.
ACK (0x41) bytes and arbitrary non-DLE bytes in IDLE state are silently
discarded (covers device boot string "Operating System" and keepalive ACKs).
"""
_IDLE = 0
_SEEN_DLE = 1
_IN_FRAME = 2
_IN_FRAME_DLE = 3
def __init__(self) -> None:
self._state = self._IDLE
self._body = bytearray() # accumulates de-stuffed frame bytes
self.frames: list[S3Frame] = []
self.bytes_fed: int = 0 # cumulative raw bytes fed since last reset
def reset(self) -> None:
self._state = self._IDLE
self._body.clear()
self.bytes_fed = 0
def feed(self, data: bytes) -> list[S3Frame]:
"""
Process a chunk of incoming bytes.
Returns a list of S3Frame objects completed during this call.
All completed frames are also appended to self.frames.
"""
self.bytes_fed += len(data)
completed: list[S3Frame] = []
for b in data:
frame = self._step(b)
if frame is not None:
completed.append(frame)
self.frames.append(frame)
return completed
def _step(self, b: int) -> Optional[S3Frame]:
"""Process one byte. Returns a completed S3Frame or None."""
if self._state == self._IDLE:
if b == DLE:
self._state = self._SEEN_DLE
# ACK, boot strings, garbage — silently ignored
elif self._state == self._SEEN_DLE:
if b == STX:
self._body.clear()
self._state = self._IN_FRAME
else:
# Stray DLE not followed by STX — back to idle
self._state = self._IDLE
elif self._state == self._IN_FRAME:
if b == DLE:
self._state = self._IN_FRAME_DLE
elif b == ETX:
# Bare ETX = real frame terminator (confirmed from captures)
frame = self._finalise()
self._state = self._IDLE
return frame
else:
self._body.append(b)
elif self._state == self._IN_FRAME_DLE:
if b == DLE:
# DLE DLE → literal 0x10 in payload
self._body.append(DLE)
self._state = self._IN_FRAME
elif b == ETX:
# DLE+ETX inside a frame is an inner-frame terminator, NOT
# the outer frame end. Treat as literal data and continue.
self._body.append(DLE)
self._body.append(ETX)
self._state = self._IN_FRAME
else:
# Unexpected DLE + byte — treat both as literal data and continue
self._body.append(DLE)
self._body.append(b)
self._state = self._IN_FRAME
return None
def _finalise(self) -> Optional[S3Frame]:
"""
Called when DLE+ETX is seen. Validates checksum and builds S3Frame.
Returns None if the frame is too short or structurally invalid.
"""
body = bytes(self._body)
# Minimum valid frame: 5-byte header + at least 1 checksum byte = 6
if len(body) < 6:
return None
raw_payload = body[:-1] # everything except the trailing checksum byte
chk_received = body[-1]
chk_computed = checksum(raw_payload)
if len(raw_payload) < 5:
return None
# Validate CMD byte — we only accept S3→BW response frames here
if raw_payload[0] != S3_CMD:
return None
return S3Frame(
sub = raw_payload[2],
page_hi = raw_payload[3],
page_lo = raw_payload[4],
data = raw_payload[5:],
checksum_valid = (chk_received == chk_computed),
chk_byte = chk_received,
)