"""
framing.py — DLE frame codec for the Instantel MiniMate Plus RS-232 protocol.

Wire format:
    BW→S3 (our requests):    [ACK=0x41] [STX=0x02] [stuffed payload+chk] [ETX=0x03]
    S3→BW (device replies):  [DLE=0x10] [STX=0x02] [stuffed payload+chk] [ETX=0x03]

    The S3→BW terminator is a bare ETX.  DLE+ETX pairs that appear inside a
    device reply are inner-frame terminators and are kept as payload data
    (this is what S3FrameParser implements).

    The ACK 0x41 byte often precedes S3 frames too — it is silently discarded
    by the streaming parser.

De-stuffed payload layout:
    BW→S3 request frame:
        [0]     CMD      0x10 (BW request marker)
        [1]     flags    0x00
        [2]     SUB      command sub-byte
        [3]     0x00     always zero in captured frames
        [4]     0x00     always zero in captured frames
        [5]     OFFSET   two-step offset: 0x00 = length-probe,
                         DATA_LEN = data-request
        [6-15]  params   zero padding for standard reads; keyed / token reads
                         place their arguments here (see build_bw_frame)
                         (total de-stuffed payload = 16 bytes)

    S3→BW response frame:
        [0]     CMD      0x00 (S3 response marker)
        [1]     flags    0x10
        [2]     SUB      response sub-byte (= 0xFF - request SUB)
        [3]     PAGE_HI  high byte of page address (always 0x00 in observed frames)
        [4]     PAGE_LO  low byte (always 0x00 in observed frames)
        [5+]    data     payload data section (composite inner frames for
                         large responses)

DLE stuffing rule: any 0x10 byte in the payload is doubled on the wire
(0x10 → 0x10 0x10).  This applies to the checksum byte too.

Confirmed from live captures (s3_parser.py validation + raw_bw.bin / raw_s3.bin).
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

# ── Protocol byte constants ───────────────────────────────────────────────────

DLE = 0x10  # Data Link Escape
STX = 0x02  # Start of text
ETX = 0x03  # End of text
ACK = 0x41  # Acknowledgement / frame-start marker (BW side)

BW_CMD = 0x10    # CMD byte value in BW→S3 frames
S3_CMD = 0x00    # CMD byte value in S3→BW frames
S3_FLAGS = 0x10  # flags byte value in S3→BW frames

# BW read-command payload size: 6 header bytes (CMD, flags, SUB, two zero
# bytes, OFFSET) + 10 parameter bytes = 16 total.
# Confirmed from captured raw_bw.bin: all read-command frames carry exactly 16
# de-stuffed bytes (excluding the appended checksum).
_BW_PAYLOAD_SIZE = 16
_BW_HEADER_SIZE = 6
_BW_PARAMS_SIZE = _BW_PAYLOAD_SIZE - _BW_HEADER_SIZE  # = 10

# Pre-built byte strings used by the stuffing helpers.
_DLE_BYTE = bytes([DLE])
_DLE_PAIR = _DLE_BYTE * 2


def _require_byte(name: str, value: int) -> None:
    """Raise ValueError unless *value* fits in a single unsigned byte."""
    if not 0 <= value <= 0xFF:
        raise ValueError(f"{name} must be in range 0x00..0xFF, got {value!r}")


# ── DLE stuffing / de-stuffing ────────────────────────────────────────────────

def dle_stuff(data: bytes) -> bytes:
    """Escape literal 0x10 bytes: 0x10 → 0x10 0x10."""
    return bytes(data).replace(_DLE_BYTE, _DLE_PAIR)


def dle_unstuff(data: bytes) -> bytes:
    """
    Remove DLE stuffing: 0x10 0x10 → 0x10.

    Pairs are consumed greedily from left to right.  A lone DLE (one that is
    not followed by a second DLE) is passed through unchanged.
    """
    return bytes(data).replace(_DLE_PAIR, _DLE_BYTE)


# ── Checksum ─────────────────────────────────────────────────────────────────

def checksum(payload: bytes) -> int:
    """SUM8: sum of all de-stuffed payload bytes, mod 256."""
    return sum(payload) & 0xFF


# ── BW→S3 frame builder ───────────────────────────────────────────────────────

def build_bw_frame(
    sub: int,
    offset: int = 0,
    params: bytes = bytes(_BW_PARAMS_SIZE),
) -> bytes:
    """
    Build a BW→S3 read-command frame.

    The payload is always 16 de-stuffed bytes:
        [BW_CMD, 0x00, sub, 0x00, 0x00, offset] + params(10 bytes)

    Confirmed from BW capture analysis: payload[3] and payload[4] are always
    0x00 across all observed read commands.  The two-step offset lives at
    payload[5]: 0x00 for the length-probe step, DATA_LEN for the data-fetch
    step.

    The 10 params bytes (payload[6..15]) are zero for standard reads.  For
    keyed reads (SUBs 0A, 0C) the 4-byte waveform key lives at params[4..7]
    (= payload[10..13]).  For token-based reads (SUBs 1E, 1F) a single token
    byte lives at params[6] (= payload[12]).  Use waveform_key_params() and
    token_params() helpers to build these safely.

    Wire output: [ACK] [STX] dle_stuff(payload + checksum) [ETX]

    Args:
        sub:    SUB command byte (e.g. 0x01 = FULL_CONFIG_READ).
        offset: Value placed at payload[5].  Pass 0 for the probe step;
                pass DATA_LENGTHS[sub] for the data step.
        params: 10 bytes placed at payload[6..15].  Default: all zeros.

    Returns:
        Complete frame bytes ready to write to the serial port / socket.

    Raises:
        ValueError: if params is not exactly 10 bytes long, or if sub or
            offset does not fit in one byte.
    """
    if len(params) != _BW_PARAMS_SIZE:
        raise ValueError(
            f"params must be exactly {_BW_PARAMS_SIZE} bytes, got {len(params)}"
        )
    _require_byte("sub", sub)
    _require_byte("offset", offset)

    payload = bytes([BW_CMD, 0x00, sub, 0x00, 0x00, offset]) + bytes(params)
    # The checksum is computed over the de-stuffed payload and is itself
    # subject to DLE stuffing, so it is appended before stuffing.
    stuffed = dle_stuff(payload + bytes([checksum(payload)]))
    return bytes([ACK, STX]) + stuffed + bytes([ETX])


def waveform_key_params(key4: bytes) -> bytes:
    """
    Build the 10-byte params block that carries a 4-byte waveform key.

    Used for SUBs 0A (WAVEFORM_HEADER) and 0C (WAVEFORM_RECORD).  The key goes
    at params[4..7], which maps to payload[10..13].

    Confirmed from 3-31-26 capture: 0A and 0C request frames carry the 4-byte
    record address at payload[10..13].  Probe and data-fetch steps carry the
    same key in both frames.

    Args:
        key4: exactly 4 bytes — the opaque waveform record address returned by
              the EVENT_HEADER (1E) or EVENT_ADVANCE (1F) response.

    Returns:
        10-byte params block with key embedded at positions [4..7].

    Raises:
        ValueError: if key4 is not exactly 4 bytes long.
    """
    if len(key4) != 4:
        raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
    block = bytearray(_BW_PARAMS_SIZE)
    block[4:8] = key4
    return bytes(block)


def token_params(token: int = 0) -> bytes:
    """
    Build the 10-byte params block that carries a single token byte.

    Used for SUBs 1E (EVENT_HEADER) and 1F (EVENT_ADVANCE).  The token goes at
    params[6], which maps to payload[12].

    Confirmed from 3-31-26 capture:
      - token=0x00: first-event read / browse mode (no download marking)
      - token=0xfe: download mode (causes 1F to skip partial bins and
                    advance to the next full record)

    Args:
        token: single byte to place at params[6] / payload[12].

    Returns:
        10-byte params block with token at position [6].

    Raises:
        ValueError: if token does not fit in one byte.
    """
    _require_byte("token", token)
    block = bytearray(_BW_PARAMS_SIZE)
    block[6] = token
    return bytes(block)


# ── Pre-built POLL frames ─────────────────────────────────────────────────────
#
# POLL (SUB 0x5B) uses the same two-step pattern as all other reads — the
# hardcoded length 0x30 lives at payload[5], exactly as in build_bw_frame().

POLL_PROBE = build_bw_frame(0x5B, 0x00)  # length-probe POLL (offset = 0)
POLL_DATA = build_bw_frame(0x5B, 0x30)   # data-request POLL (offset = 0x30)


# ── S3 response dataclass ─────────────────────────────────────────────────────

@dataclass
class S3Frame:
    """A fully parsed and de-stuffed S3→BW response frame."""

    sub: int              # response SUB byte (e.g. 0xA4 = POLL_RESPONSE)
    page_hi: int          # PAGE_HI from header (= data length on step-2 length response)
    page_lo: int          # PAGE_LO from header
    data: bytes           # payload data section (payload[5:], checksum already stripped)
    checksum_valid: bool  # True when the received SUM8 matches the computed one

    @property
    def page_key(self) -> int:
        """Combined 16-bit page address / length: (page_hi << 8) | page_lo."""
        return (self.page_hi << 8) | self.page_lo


# ── Streaming S3 frame parser ─────────────────────────────────────────────────

class S3FrameParser:
    """
    Incremental byte-stream parser for S3→BW response frames.

    Feed incoming bytes with feed().  Completed frames are returned
    immediately and also accumulated in self.frames.  Frames whose checksum
    does not match are still returned, with checksum_valid=False; frames that
    are too short or whose CMD byte is not S3_CMD are dropped.

    self.frames is never trimmed by the parser (reset() leaves it untouched);
    long-running callers should clear it themselves.

    State machine:
        IDLE          — scanning for DLE (0x10)
        SEEN_DLE      — saw DLE, waiting for STX (0x02) to start a frame;
                        further DLE bytes keep the parser in this state
        IN_FRAME      — collecting de-stuffed payload bytes; bare ETX ends frame
        IN_FRAME_DLE  — inside frame, saw DLE; DLE continues stuffing;
                        DLE+ETX is treated as literal data (NOT a frame end),
                        which lets inner-frame terminators pass through intact

    Wire format confirmed from captures:
        [DLE=0x10] [STX=0x02] [stuffed payload+chk] [bare ETX=0x03]
    The ETX is NOT preceded by a DLE on the wire.  DLE+ETX sequences that
    appear inside the payload are inner-frame terminators and must be treated
    as literal data.

    ACK (0x41) bytes and arbitrary non-DLE bytes in IDLE state are silently
    discarded (covers device boot string "Operating System" and keepalive
    ACKs).
    """

    _IDLE = 0
    _SEEN_DLE = 1
    _IN_FRAME = 2
    _IN_FRAME_DLE = 3

    # Minimum valid body: 5-byte header + 1 checksum byte.
    _MIN_BODY_LEN = 6

    def __init__(self) -> None:
        self._state = self._IDLE
        self._body = bytearray()  # accumulates de-stuffed frame bytes
        self.frames: list[S3Frame] = []

    def reset(self) -> None:
        """Abandon any partially received frame and return to IDLE."""
        self._state = self._IDLE
        self._body.clear()

    def feed(self, data: bytes) -> list[S3Frame]:
        """
        Process a chunk of incoming bytes.

        Chunks may split a frame at any byte boundary; parser state is kept
        between calls.

        Returns:
            The S3Frame objects completed during this call.  All completed
            frames are also appended to self.frames.
        """
        completed: list[S3Frame] = []
        for byte in data:
            frame = self._step(byte)
            if frame is not None:
                completed.append(frame)
        self.frames.extend(completed)
        return completed

    def _step(self, b: int) -> Optional[S3Frame]:
        """Process one byte.  Returns a completed S3Frame or None."""
        state = self._state

        if state == self._IDLE:
            if b == DLE:
                self._state = self._SEEN_DLE
            # ACK, boot strings, garbage — silently ignored

        elif state == self._SEEN_DLE:
            if b == STX:
                self._body.clear()
                self._state = self._IN_FRAME
            elif b != DLE:
                # Stray DLE not followed by STX — back to idle
                self._state = self._IDLE
            # b == DLE: stay in SEEN_DLE.  This DLE may itself be the start
            # of a real DLE+STX header, so a stray DLE directly in front of
            # a frame must not cause that frame to be lost.

        elif state == self._IN_FRAME:
            if b == DLE:
                self._state = self._IN_FRAME_DLE
            elif b == ETX:
                # Bare ETX = real frame terminator (confirmed from captures).
                # NOTE(review): 0x03 is not escaped by the stuffing rule, so a
                # payload or checksum byte equal to 0x03 would also end the
                # frame here — confirm against captures that this cannot occur.
                self._state = self._IDLE
                return self._finalise()
            else:
                self._body.append(b)

        elif state == self._IN_FRAME_DLE:
            if b != DLE:
                # DLE+ETX inside a frame is an inner-frame terminator, NOT the
                # outer frame end; any other DLE+byte pair is unexpected.  In
                # both cases keep the DLE and the byte as literal data.
                self._body.append(DLE)
            # For DLE DLE only the single literal 0x10 (== b) is stored.
            self._body.append(b)
            self._state = self._IN_FRAME

        return None

    def _finalise(self) -> Optional[S3Frame]:
        """
        Build an S3Frame from the accumulated body once a bare ETX is seen.

        The last body byte is the received checksum; everything before it is
        the de-stuffed payload.

        Returns:
            The parsed frame (checksum_valid reports the SUM8 comparison), or
            None if the body is shorter than header + checksum or its CMD
            byte is not S3_CMD.
        """
        body = bytes(self._body)
        if len(body) < self._MIN_BODY_LEN:
            return None

        raw_payload, chk_received = body[:-1], body[-1]

        # Validate CMD byte — we only accept S3→BW response frames here
        if raw_payload[0] != S3_CMD:
            return None

        return S3Frame(
            sub=raw_payload[2],
            page_hi=raw_payload[3],
            page_lo=raw_payload[4],
            data=raw_payload[5:],
            checksum_valid=(chk_received == checksum(raw_payload)),
        )