Files
seismo-relay/minimateplus/framing.py
T

607 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
framing.py — DLE frame codec for the Instantel MiniMate Plus RS-232 protocol.
Wire format:
BW→S3 (our requests): [ACK=0x41] [STX=0x02] [stuffed payload+chk] [ETX=0x03]
S3→BW (device replies): [DLE=0x10] [STX=0x02] [stuffed payload+chk] [bare ETX=0x03] (the closing ETX is NOT preceded by DLE — see S3FrameParser)
The ACK 0x41 byte often precedes S3 frames too — it is silently discarded
by the streaming parser.
De-stuffed payload layout:
BW→S3 request frame:
[0] CMD 0x10 (BW request marker)
[1] flags 0x00
[2] SUB command sub-byte
[3] 0x00 always zero in captured frames
[4] 0x00 always zero in captured frames
[5] OFFSET two-step offset: 0x00 = length-probe, DATA_LEN = data-request
[6-15] zero padding (total de-stuffed payload = 16 bytes)
S3→BW response frame:
[0] CMD 0x00 (S3 response marker)
[1] flags 0x10
[2] SUB response sub-byte (= 0xFF - request SUB)
[3] PAGE_HI high byte of page address (always 0x00 in observed frames)
[4] PAGE_LO low byte (always 0x00 in observed frames)
[5+] data payload data section (composite inner frames for large responses)
DLE stuffing rule: any 0x10 byte in the payload is doubled on the wire (0x10 → 0x10 0x10).
This applies to the checksum byte too.
Confirmed from live captures (s3_parser.py validation + raw_bw.bin / raw_s3.bin).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
# ── Protocol byte constants ───────────────────────────────────────────────────
DLE = 0x10 # Data Link Escape
STX = 0x02 # Start of text
ETX = 0x03 # End of text
ACK = 0x41 # Acknowledgement / frame-start marker (BW side)
# NOTE: BW_CMD has the same numeric value as DLE, which is why the frame
# builders emit it doubled (0x10 0x10) on the wire.
BW_CMD = 0x10 # CMD byte value in BW→S3 frames
S3_CMD = 0x00 # CMD byte value in S3→BW frames
S3_FLAGS = 0x10 # flags byte value in S3→BW frames
# BW read-command payload size: 6 header bytes (CMD, flags, SUB, 0x00,
# offset_hi, offset_lo) + 10 params bytes = 16 total, matching the payload
# assembled by build_bw_frame().
# Confirmed from captured raw_bw.bin: all read-command frames carry exactly 16
# de-stuffed bytes (excluding the appended checksum).
_BW_PAYLOAD_SIZE = 16
# ── DLE stuffing / de-stuffing ────────────────────────────────────────────────
def dle_stuff(data: bytes) -> bytes:
    """
    Apply DLE stuffing to *data* for transmission.

    Every literal DLE (0x10) is emitted twice (0x10 → 0x10 0x10); every
    other byte is copied through unchanged.

    Args:
        data: raw (un-stuffed) bytes.

    Returns:
        The stuffed byte string.
    """
    doubled = bytes((DLE, DLE))
    return b"".join(
        doubled if value == DLE else bytes((value,))
        for value in data
    )
def dle_unstuff(data: bytes) -> bytes:
    """
    Reverse DLE stuffing: each 0x10 0x10 pair collapses to a single 0x10.

    Pairs are matched greedily from left to right. A DLE that is not
    followed by another DLE (including a DLE at the very end of the input)
    is copied through unchanged.

    Args:
        data: stuffed bytes as seen on the wire.

    Returns:
        The de-stuffed byte string.
    """
    result = bytearray()
    last_index = len(data) - 1
    drop_next = False
    for pos, value in enumerate(data):
        if drop_next:
            # Second half of a DLE DLE pair — already represented in result.
            drop_next = False
            continue
        if value == DLE and pos < last_index and data[pos + 1] == DLE:
            drop_next = True
        result.append(value)
    return bytes(result)
# ── Checksum ─────────────────────────────────────────────────────────────────
def checksum(payload: bytes) -> int:
    """
    Compute the SUM8 checksum of a de-stuffed payload.

    Args:
        payload: de-stuffed payload bytes (checksum byte not included).

    Returns:
        The sum of all payload bytes reduced to 8 bits (0..255).
    """
    total = sum(payload)
    return total & 0xFF
# ── BW→S3 frame builder ───────────────────────────────────────────────────────
# SUB byte for 5A — used by build_5a_frame below (protocol.py has the full
# constant set; defined here to avoid a circular import).
SUB_5A = 0x5A


def build_5a_frame(offset_word: int, raw_params: bytes) -> bytes:
    """
    Build a BW→S3 frame for SUB 5A (BULK_WAVEFORM_STREAM) that exactly
    matches Blastware's wire output.

    SUB 5A uses a DIFFERENT frame format from all other read commands:

      1. Only the leading BW_CMD byte is doubled. The offset field
         (bytes [4:6]) is written RAW — the 0x10 in offset_hi=0x10 is NOT
         DLE-stuffed, unlike build_bw_frame().
      2. The checksum uses a DLE-aware sum over the on-wire section: for
         each 0x10 XX pair only XX contributes; lone bytes contribute
         normally. This differs from the standard SUM8 checksum on the
         unstuffed payload.

    Both differences are confirmed from the 1-2-26 BW TX capture (all 10
    frames verified against this algorithm on 2026-04-02).

    Args:
        offset_word: 16-bit offset (0x1004 for probe/chunks, 0x005A for
                     term). Only the low 16 bits are used.
        raw_params:  10 or 11 params bytes (from bulk_waveform_params or
                     bulk_waveform_term_params). 0x10 bytes in params are
                     written RAW — NOT DLE-stuffed. Confirmed 2026-04-06 by
                     comparing wire bytes: BW sends bare `10 04` for chunk 1
                     (counter=0x1004), not stuffed `10 10 04`. Device reads
                     params at fixed byte positions; stuffing shifts the
                     bytes and corrupts the counter, causing device to
                     ignore the frame.

    Returns:
        Complete frame bytes: [ACK][STX][section][chk][ETX]

    Raises:
        ValueError: if raw_params is not 10 or 11 bytes long.
    """
    if len(raw_params) not in (10, 11):
        raise ValueError(f"raw_params must be 10 or 11 bytes, got {len(raw_params)}")

    # Section between STX and the checksum, exactly as it appears on the wire.
    header = bytes([
        BW_CMD, BW_CMD,              # BW_CMD, doubled (the only stuffing applied)
        0x00,                        # flags
        SUB_5A,                      # sub = 0x5A
        0x00,                        # field3
        (offset_word >> 8) & 0xFF,   # offset_hi — raw, NOT stuffed
        offset_word & 0xFF,          # offset_lo
    ])
    section = header + bytes(raw_params)  # params — raw, match BW wire format

    # DLE-aware checksum: a DLE that has a following byte is skipped and only
    # its partner is summed; every other byte (including a trailing lone DLE)
    # is summed as-is.
    total = 0
    pos = 0
    end = len(section)
    while pos < end:
        if section[pos] == DLE and pos + 1 < end:
            pos += 1
        total += section[pos]
        pos += 1

    return bytes([ACK, STX]) + section + bytes([total & 0xFF, ETX])
def build_bw_frame(sub: int, offset: int = 0, params: bytes = bytes(10)) -> bytes:
    """
    Build a BW→S3 read-command frame.

    The payload is always 16 de-stuffed bytes:
        [BW_CMD, 0x00, sub, 0x00, offset_hi, offset_lo] + params (10 bytes)

    Confirmed from BW capture analysis: payload[3] is always 0x00, and
    payload[4] is 0x00 for every observed standard read (offset ≤ 0xFF).
    The two-step offset lives at payload[5]: 0x00 for the length-probe step,
    DATA_LEN for the data-fetch step.

    The 10 params bytes (payload[6..15]) are zero for standard reads. For
    keyed reads (SUBs 0A, 0C) the 4-byte waveform key lives at params[4..7]
    (= payload[10..13]). For token-based reads (SUBs 1E, 1F) a single token
    byte lives at params[7] (= payload[13]). Use waveform_key_params() and
    token_params() helpers to build these safely.

    Wire output: [ACK] [STX] dle_stuff(payload + checksum) [ETX]

    Args:
        sub:    SUB command byte (e.g. 0x01 = FULL_CONFIG_READ).
        offset: uint16 placed big-endian at payload[4:6].
                Pass 0 for the probe step; pass DATA_LENGTHS[sub] for the
                data step.
        params: 10 bytes placed at payload[6..15]. Default: all zeros.

    Returns:
        Complete frame bytes ready to write to the serial port / socket.

    Raises:
        ValueError: if params is not exactly 10 bytes, if offset is outside
                    0..0xFFFF, or if sub does not fit in one byte.
    """
    if len(params) != 10:
        raise ValueError(f"params must be exactly 10 bytes, got {len(params)}")
    # Reject negatives as well as overflow: the masks below would otherwise
    # silently wrap a negative offset into a plausible-looking wrong frame.
    if not 0 <= offset <= 0xFFFF:
        raise ValueError(f"offset must fit in uint16, got {offset:#06x}")

    # offset is a uint16 split across bytes [4] (high) and [5] (low).
    # For all standard reads (offset ≤ 0xFF), byte[4] = 0x00 — consistent with
    # every captured BW frame. For large payloads (e.g. SUB 1A / E5 at 0x082A),
    # byte[4] carries the high byte. 🔶 INFERRED — confirm once E5 is captured.
    offset_hi = (offset >> 8) & 0xFF
    offset_lo = offset & 0xFF

    payload = bytes([BW_CMD, 0x00, sub, 0x00, offset_hi, offset_lo]) + params
    chk = checksum(payload)
    # Payload AND checksum are DLE-stuffed; ACK/STX/ETX framing bytes are not.
    return bytes([ACK, STX]) + dle_stuff(payload + bytes([chk])) + bytes([ETX])
def build_bw_write_frame(
    sub: int,
    data: bytes,
    *,
    offset: int = 0,
    params: bytes = bytes(10),
) -> bytes:
    """
    Build a BW→S3 write-command frame.

    Write frames extend the standard 16-byte read header with a
    variable-length data payload. They use a different checksum formula from
    read frames.

    **CRITICAL: Write frames use minimal DLE stuffing.**
    Unlike read frames (build_bw_frame), write frames do NOT apply full DLE
    stuffing to the payload. Only the BW_CMD byte (0x10) at position [0] is
    doubled to 0x10 0x10 on the wire. All other bytes — flags, sub, offset,
    params, data, and checksum — are written RAW with no stuffing, even if
    they contain 0x10 bytes (e.g. offset_hi=0x10 for compliance chunks, or
    0x10 bytes in the write data payload).

    Confirmed from 3-11-26 BW TX capture (frames 102–112): all 11 write
    frames match the rule "double BW_CMD only; everything else raw."
    ✅ 2026-04-07.

    Wire layout:
        [41]      ACK
        [02]      STX
        [10 10]   BW_CMD doubled (the ONLY DLE stuffing applied)
        [00]      flags
        [sub]     write command byte (0x68–0x83)
        [00]      always zero
        [hi][lo]  offset as uint16 BE (raw; NOT stuffed even if hi=0x10)
        [params]  10 bytes (raw)
        [data]    variable-length write payload (raw; NOT stuffed)
        [chk]     checksum byte (raw; NOT stuffed even if 0x10)
        [03]      ETX

    De-stuffed payload (for checksum computation):
        [0]     BW_CMD 0x10
        [1]     flags 0x00
        [2]     SUB write command byte
        [3]     0x00 always zero
        [4]     offset_hi
        [5]     offset_lo
        [6:16]  params 10 bytes
        [16:]   data write payload
    followed on the wire by the checksum byte.

    **Checksum formula (confirmed 2026-03-12 from 3-11-26 BW TX capture):**
        chk = (sum(b for b in payload[2:] if b != 0x10) + 0x10) % 256
    where payload = destuffed content BEFORE appending chk. Every 0x10 byte
    from SUB onwards (offset, params, data) is left out of the sum, and the
    constant 0x10 is then added once.

    The offset field [4:6] meaning per write SUB:
      - SUBs 68, 69, 82 (single-chunk writes): offset = data[1] + 2, where
        data[1] is an embedded length field in the write payload.
        Confirmed from capture: 68→0x5A (data[1]=0x58+2), 82→0x1C
        (data[1]=0x1A+2), 69→0xCA (data[1]=0xC8+2).
      - SUB 71 (multi-chunk compliance): 0x1004 for full chunks, 0x002C
        for the final partial chunk.
      - Confirm frames (72, 73, 74, 83): offset=0, no data.

    Args:
        sub:    Write command SUB byte.
        data:   Write payload (variable length; empty for confirm frames).
        offset: 16-bit value placed at [4:6]. See per-SUB notes above.
        params: 10 bytes placed at [6:16]. All-zero for most writes;
                compliance chunk writes use chunk-specific values.

    Returns:
        Complete frame bytes ready to write to the transport.

    Raises:
        ValueError: if params is not exactly 10 bytes, if offset is outside
                    0..0xFFFF, or if sub does not fit in one byte.
    """
    if len(params) != 10:
        raise ValueError(f"params must be exactly 10 bytes, got {len(params)}")
    # Reject negatives as well as overflow: the masks below would otherwise
    # silently wrap a negative offset into a plausible-looking wrong frame.
    if not 0 <= offset <= 0xFFFF:
        raise ValueError(f"offset must fit in uint16, got {offset:#06x}")

    offset_hi = (offset >> 8) & 0xFF
    offset_lo = offset & 0xFF

    # Destuffed payload (used only for checksum; not sent directly)
    payload_no_chk = bytes([BW_CMD, 0x00, sub, 0x00, offset_hi, offset_lo]) + params + data

    # Large-frame checksum: sum payload[2:] skipping all 0x10 bytes, add 0x10.
    # Applied to the destuffed representation — confirms correctly against
    # all 11 write frames in the 3-11-26/170151 BW TX capture. ✅
    chk = (sum(b for b in payload_no_chk[2:] if b != 0x10) + 0x10) & 0xFF

    # Wire construction: only BW_CMD is doubled; everything else is raw.
    # Do NOT use dle_stuff() here — that would incorrectly double 0x10 bytes
    # in the offset, params, and data sections.
    wire = (
        bytes([ACK, STX])            # Frame prefix (not part of payload)
        + bytes([BW_CMD, BW_CMD])    # BW_CMD doubled (only DLE stuffing applied)
        + payload_no_chk[1:]         # flags, sub, offset, params, data — RAW
        + bytes([chk])               # checksum — RAW
        + bytes([ETX])               # Frame terminator
    )
    return wire
def waveform_key_params(key4: bytes) -> bytes:
    """
    Build the 10-byte params block that carries a 4-byte waveform key.

    Used for SUBs 0A (WAVEFORM_HEADER) and 0C (WAVEFORM_RECORD).
    The key occupies params[4..7], which maps to payload[10..13]; all other
    params bytes are zero.

    Confirmed from 3-31-26 capture: 0A and 0C request frames carry the
    4-byte record address at payload[10..13]. Probe and data-fetch steps
    carry the same key in both frames.

    Args:
        key4: exactly 4 bytes — the opaque waveform record address returned
              by the EVENT_HEADER (1E) or EVENT_ADVANCE (1F) response.

    Returns:
        10-byte params block with key embedded at positions [4..7].

    Raises:
        ValueError: if key4 is not exactly 4 bytes long.
    """
    if len(key4) != 4:
        raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
    leading_zeros = bytes(4)   # params[0..3]
    trailing_zeros = bytes(2)  # params[8..9]
    return leading_zeros + bytes(key4) + trailing_zeros
def token_params(token: int = 0) -> bytes:
    """
    Build the 10-byte params block that carries a single token byte.

    Used for SUBs 1E (EVENT_HEADER) and 1F (EVENT_ADVANCE).
    The token sits at params[7], which maps to payload[13]; every other
    params byte is zero.

    Confirmed from BOTH 3-31-26 and 4-3-26 BW TX captures:
        raw params bytes: 00 00 00 00 00 00 00 fe 00 00
        token is at index 7 (not 6 — that was wrong).

      - token=0x00: first-event read / browse mode (no download marking)
      - token=0xfe: download mode (causes 1F to skip partial bins and
        advance to the next full record)

    The device echoes the token at data[8] of the S3 response (payload[13]),
    distinct from the next-event key at data[11:15] (payload[16:20]).

    Args:
        token: single byte value (0..255) to place at params[7] / payload[13].

    Returns:
        10-byte params block with token at position [7].
    """
    # 7 zero bytes, the token, then 2 zero bytes → 10 bytes total.
    return bytes(7) + bytes([token]) + bytes(2)
def bulk_waveform_params(key4: bytes, counter: int, *, is_probe: bool = False) -> bytes:
    """
    Build the 11-byte params block for SUB 5A (BULK_WAVEFORM_STREAM) requests.

    Confirmed 2026-04-02 from 1-2-26 BW TX capture analysis:

    Probe / first request (is_probe=True, counter=0):
        params[0]   = 0x00
        params[1:5] = key4 (all 4 key bytes; counter overlaps key4[2:4] = 0x0000)
        params[5:]  = zeros

    Regular chunk requests (is_probe=False):
        params[0]   = 0x00
        params[1:3] = key4[0:2] (first 2 key bytes as session handle)
        params[3:5] = counter (BE uint16) (chunk position, increments by 0x0400)
        params[5:]  = zeros

    Termination request: DO NOT use this helper — see
    bulk_waveform_term_params().

    Args:
        key4:     4-byte waveform key from EVENT_HEADER (1E) response.
        counter:  Chunk position counter (uint16 BE). Pass 0 for probe; the
                  value is ignored when is_probe is True.
        is_probe: If True, embed full key4 (probe step only).

    Returns:
        11-byte params block. (BW confirmed: chunk frames carry 11 params
        bytes, not 10; the extra trailing 0x00 was confirmed from 1-2-26
        wire capture on 2026-04-02.)

    Raises:
        ValueError: if key4 is not exactly 4 bytes long.
    """
    if len(key4) != 4:
        raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")

    if is_probe:
        # Full key4; counter=0 is implied (overlaps key4[2:4], which must be 0x0000)
        slot_3_4 = (key4[2], key4[3])
    else:
        slot_3_4 = ((counter >> 8) & 0xFF, counter & 0xFF)

    # 5 meaningful bytes + 6 zero bytes = 11 bytes (confirmed from BW wire capture)
    return bytes([0x00, key4[0], key4[1], *slot_3_4]) + bytes(6)
def bulk_waveform_term_params(key4: bytes, counter: int) -> bytes:
    """
    Build the 10-byte params block for the SUB 5A termination request.

    The termination request uses offset=0x005A and a DIFFERENT params layout
    from regular chunks — the leading 0x00 byte is dropped, key4[0:2] shifts
    to params[0:2], and only the counter HIGH byte is sent, at params[2]:

        params[0]  = key4[0]
        params[1]  = key4[1]
        params[2]  = (counter >> 8) & 0xFF
        params[3:] = zeros

    Counter for the termination request = last_regular_counter + 0x0400.

    Confirmed from 1-2-26 BW TX capture: final request (frame 83) uses
    offset=0x005A, params[0:3] = key4[0:2] + term_counter_hi.

    Args:
        key4:    4-byte waveform key.
        counter: Termination counter (= last regular counter + 0x0400).

    Returns:
        10-byte params block.

    Raises:
        ValueError: if key4 is not exactly 4 bytes long.
    """
    if len(key4) != 4:
        raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
    counter_hi = (counter >> 8) & 0xFF
    # 3 meaningful bytes followed by 7 zero bytes.
    return bytes([key4[0], key4[1], counter_hi]) + bytes(7)
# ── Pre-built POLL frames ─────────────────────────────────────────────────────
#
# POLL (SUB 0x5B) follows the same two-step probe/data pattern as every other
# read: the hardcoded length 0x30 is carried in the offset field (payload[5]),
# exactly as build_bw_frame() lays it out.
POLL_PROBE = build_bw_frame(0x5B, offset=0x00)  # length-probe POLL (offset = 0)
POLL_DATA = build_bw_frame(0x5B, offset=0x30)   # data-request POLL (offset = 0x30)

# Session-reset signal (ACK + ETX, no STX/payload).
# Confirmed from 4-8-26 BW TX captures: Blastware sends this 2-byte sequence
# immediately before the first POLL probe, and again between the POLL probe
# and the POLL data request. Required to wake a unit that is actively
# monitoring — without it the unit does not respond to POLL over TCP.
# Harmless for idle units (they respond to POLL regardless).
SESSION_RESET = bytes([ACK, ETX])  # 0x41 0x03
# ── S3 response dataclass ─────────────────────────────────────────────────────
@dataclass
class S3Frame:
    """
    One de-stuffed S3→BW response frame, split into its header fields.

    The checksum byte is not part of ``data``; it is kept separately in
    ``chk_byte`` together with the ``checksum_valid`` verdict.
    """

    # Response SUB byte (e.g. 0xA4 = POLL_RESPONSE).
    sub: int
    # PAGE_HI from header (= data length on step-2 length response).
    page_hi: int
    # PAGE_LO from header.
    page_lo: int
    # Payload data section (payload[5:], checksum already stripped).
    data: bytes
    # True when the received checksum matched the computed one.
    checksum_valid: bool
    # Actual checksum byte received from wire (body[-1]).
    # Needed for N00 file reconstruction: when the last data byte is 0x10 and
    # chk_byte ∈ {0x02, 0x03, 0x04}, the DLE+chk pair must be included in the
    # DLE-strip operation to correctly reconstruct the Blastware binary body.
    chk_byte: int = 0

    @property
    def page_key(self) -> int:
        """Return the 16-bit page address / length: page_hi in the high byte, page_lo in the low byte."""
        high_part = self.page_hi << 8
        return high_part | self.page_lo
# ── Streaming S3 frame parser ─────────────────────────────────────────────────
class S3FrameParser:
    """
    Incremental byte-stream parser for S3→BW response frames.

    Feed incoming bytes with feed(). Every structurally complete frame is
    returned immediately and also accumulated in self.frames. Frames whose
    checksum does not match are still returned, flagged with
    checksum_valid=False; frames that are too short or do not start with
    S3_CMD are dropped silently.

    State machine:
        IDLE          — scanning for DLE (0x10)
        SEEN_DLE      — saw DLE, waiting for STX (0x02) to start a frame
        IN_FRAME      — collecting de-stuffed payload bytes; bare ETX ends
                        the frame
        IN_FRAME_DLE  — inside frame, saw DLE; DLE DLE is a stuffed literal
                        0x10; DLE+ETX is treated as literal data (NOT a
                        frame end), which lets inner-frame terminators pass
                        through intact

    Wire format confirmed from captures:
        [DLE=0x10] [STX=0x02] [stuffed payload+chk] [bare ETX=0x03]

    The ETX is NOT preceded by a DLE on the wire. DLE+ETX sequences that
    appear inside the payload are inner-frame terminators and must be
    treated as literal data.

    ACK (0x41) bytes and arbitrary non-DLE bytes in IDLE state are silently
    discarded (covers device boot string "Operating System" and keepalive
    ACKs).
    """

    _IDLE = 0
    _SEEN_DLE = 1
    _IN_FRAME = 2
    _IN_FRAME_DLE = 3

    def __init__(self) -> None:
        self._state = self._IDLE
        self._body = bytearray()  # accumulates de-stuffed frame bytes
        self.frames: list[S3Frame] = []
        self.bytes_fed: int = 0  # cumulative raw bytes fed since last reset

    def reset(self) -> None:
        """
        Return to IDLE, discard any partially collected frame and zero
        bytes_fed. Frames already stored in self.frames are left untouched.
        """
        self._state = self._IDLE
        self._body.clear()
        self.bytes_fed = 0

    def feed(self, data: bytes) -> list[S3Frame]:
        """
        Process a chunk of incoming bytes.

        Parser state persists between calls, so a frame may be split across
        any number of chunks.

        Returns a list of S3Frame objects completed during this call.
        All completed frames are also appended to self.frames.
        """
        self.bytes_fed += len(data)
        completed: list[S3Frame] = []
        for b in data:
            frame = self._step(b)
            if frame is not None:
                completed.append(frame)
                self.frames.append(frame)
        return completed

    def _step(self, b: int) -> Optional[S3Frame]:
        """Process one byte. Returns a completed S3Frame or None."""
        state = self._state

        if state == self._IDLE:
            # ACK, boot strings, garbage — silently ignored
            if b == DLE:
                self._state = self._SEEN_DLE
            return None

        if state == self._SEEN_DLE:
            if b == STX:
                self._body.clear()
                self._state = self._IN_FRAME
            else:
                # Stray DLE not followed by STX — back to idle.
                # NOTE(review): a second DLE here also drops to IDLE, so the
                # sequence DLE DLE STX does not start a frame — confirm this
                # is the intended resync behaviour.
                self._state = self._IDLE
            return None

        if state == self._IN_FRAME:
            if b == DLE:
                self._state = self._IN_FRAME_DLE
                return None
            if b == ETX:
                # Bare ETX = real frame terminator (confirmed from captures)
                frame = self._finalise()
                self._state = self._IDLE
                return frame
            self._body.append(b)
            return None

        if state == self._IN_FRAME_DLE:
            if b != DLE:
                # DLE+ETX is an inner-frame terminator, NOT the outer frame
                # end, and any other unexpected DLE+byte pair is handled the
                # same way: keep the DLE as literal data ahead of the byte.
                self._body.append(DLE)
            # For DLE DLE only this single append runs → one literal 0x10.
            self._body.append(b)
            self._state = self._IN_FRAME

        return None

    def _finalise(self) -> Optional[S3Frame]:
        """
        Called when the bare ETX frame terminator is seen. Validates the
        checksum and builds an S3Frame from the collected de-stuffed body.

        Returns None if the frame is too short or does not start with
        S3_CMD. The flags byte (payload[1]) is not checked. A checksum
        mismatch does NOT reject the frame; it is reported through
        checksum_valid.
        """
        body = bytes(self._body)

        # Minimum valid frame: 5-byte header + 1 checksum byte = 6.
        # Passing this check guarantees the payload below has at least the
        # 5 header bytes, so no further length check is needed.
        if len(body) < 6:
            return None

        raw_payload = body[:-1]  # everything except the trailing checksum byte
        chk_received = body[-1]

        # Validate CMD byte — we only accept S3→BW response frames here
        if raw_payload[0] != S3_CMD:
            return None

        return S3Frame(
            sub=raw_payload[2],
            page_hi=raw_payload[3],
            page_lo=raw_payload[4],
            data=raw_payload[5:],
            checksum_valid=(chk_received == checksum(raw_payload)),
            chk_byte=chk_received,
        )