""" framing.py — DLE frame codec for the Instantel MiniMate Plus RS-232 protocol. Wire format: BW→S3 (our requests): [ACK=0x41] [STX=0x02] [stuffed payload+chk] [ETX=0x03] S3→BW (device replies): [DLE=0x10] [STX=0x02] [stuffed payload+chk] [DLE=0x10] [ETX=0x03] The ACK 0x41 byte often precedes S3 frames too — it is silently discarded by the streaming parser. De-stuffed payload layout: BW→S3 request frame: [0] CMD 0x10 (BW request marker) [1] flags 0x00 [2] SUB command sub-byte [3] 0x00 always zero in captured frames [4] 0x00 always zero in captured frames [5] OFFSET two-step offset: 0x00 = length-probe, DATA_LEN = data-request [6-15] zero padding (total de-stuffed payload = 16 bytes) S3→BW response frame: [0] CMD 0x00 (S3 response marker) [1] flags 0x10 [2] SUB response sub-byte (= 0xFF - request SUB) [3] PAGE_HI high byte of page address (always 0x00 in observed frames) [4] PAGE_LO low byte (always 0x00 in observed frames) [5+] data payload data section (composite inner frames for large responses) DLE stuffing rule: any 0x10 byte in the payload is doubled on the wire (0x10 → 0x10 0x10). This applies to the checksum byte too. Confirmed from live captures (s3_parser.py validation + raw_bw.bin / raw_s3.bin). """ from __future__ import annotations from dataclasses import dataclass from typing import Optional # ── Protocol byte constants ─────────────────────────────────────────────────── DLE = 0x10 # Data Link Escape STX = 0x02 # Start of text ETX = 0x03 # End of text ACK = 0x41 # Acknowledgement / frame-start marker (BW side) BW_CMD = 0x10 # CMD byte value in BW→S3 frames S3_CMD = 0x00 # CMD byte value in S3→BW frames S3_FLAGS = 0x10 # flags byte value in S3→BW frames # BW read-command payload size: 5 header bytes + 11 padding bytes = 16 total. # Confirmed from captured raw_bw.bin: all read-command frames carry exactly 16 # de-stuffed bytes (excluding the appended checksum). 
_BW_PAYLOAD_SIZE = 16


# ── DLE stuffing / de-stuffing ────────────────────────────────────────────────

def dle_stuff(data: bytes) -> bytes:
    """
    Escape literal DLE bytes for the wire: 0x10 → 0x10 0x10.

    Args:
        data: De-stuffed bytes (payload plus checksum).

    Returns:
        A new bytes object in which every 0x10 is doubled; all other bytes
        are copied unchanged.
    """
    dle = bytes([DLE])
    # bytes() normalises bytearray / memoryview input so the result is
    # always an immutable bytes object.
    return bytes(data).replace(dle, dle + dle)


def dle_unstuff(data: bytes) -> bytes:
    """
    Remove DLE stuffing: 0x10 0x10 → 0x10.

    Pairs are collapsed left to right without overlapping, so a run of three
    DLE bytes yields two. A lone DLE (not followed by another DLE) is passed
    through unchanged; frame delimiters are not interpreted here.

    Args:
        data: Stuffed bytes as they appeared on the wire.

    Returns:
        The de-stuffed bytes.
    """
    dle = bytes([DLE])
    # bytes.replace scans left to right and never re-examines replaced
    # output, which is exactly the pair-collapsing rule described above.
    return bytes(data).replace(dle + dle, dle)


# ── Checksum ─────────────────────────────────────────────────────────────────

def checksum(payload: bytes) -> int:
    """SUM8: sum of all de-stuffed payload bytes, mod 256."""
    return sum(payload) & 0xFF


# ── BW→S3 frame builder ───────────────────────────────────────────────────────

def build_bw_frame(sub: int, offset: int = 0) -> bytes:
    """
    Build a BW→S3 read-command frame.

    The payload is always 16 de-stuffed bytes:
      [BW_CMD, 0x00, sub, 0x00, 0x00, offset, 0x00 × 10]

    Confirmed from BW capture analysis: payload[3] and payload[4] are always
    0x00 across all observed read commands. The two-step offset lives at
    payload[5]: 0x00 for the length-probe step, DATA_LEN for the data-fetch
    step.

    Wire output: [ACK] [STX] dle_stuff(payload + checksum) [ETX]

    Args:
        sub:    SUB command byte (e.g. 0x01 = FULL_CONFIG_READ)
        offset: Value placed at payload[5]. Pass 0 for the probe step;
                pass DATA_LENGTHS[sub] for the data step.

    Returns:
        Complete frame bytes ready to write to the serial port / socket.

    Raises:
        ValueError: If sub or offset does not fit in a single byte (0..255).
    """
    # Validate up front so the caller learns which argument is wrong instead
    # of the anonymous "bytes must be in range(0, 256)" raised by bytes().
    for arg_name, value in (("sub", sub), ("offset", offset)):
        if not 0 <= value <= 0xFF:
            raise ValueError(
                f"{arg_name} must fit in one byte (0..255), got {value!r}"
            )

    header = bytes([BW_CMD, 0x00, sub, 0x00, 0x00, offset])
    # Zero-pad the header up to the fixed de-stuffed payload size.
    payload = header + bytes(_BW_PAYLOAD_SIZE - len(header))
    chk = checksum(payload)
    # The checksum byte is stuffed together with the payload; the ACK/STX
    # prefix and ETX suffix are never stuffed.
    return bytes([ACK, STX]) + dle_stuff(payload + bytes([chk])) + bytes([ETX])


# ── Pre-built POLL frames ─────────────────────────────────────────────────────
#
# POLL (SUB 0x5B) uses the same two-step pattern as all other reads — the
# hardcoded length 0x30 lives at payload[5], exactly as in build_bw_frame().
POLL_PROBE = build_bw_frame(0x5B, 0x00)   # length-probe POLL (offset = 0)
POLL_DATA  = build_bw_frame(0x5B, 0x30)   # data-request POLL (offset = 0x30)


# ── S3 response dataclass ─────────────────────────────────────────────────────

@dataclass
class S3Frame:
    """A fully parsed and de-stuffed S3→BW response frame."""
    sub: int              # response SUB byte (e.g. 0xA4 = POLL_RESPONSE)
    page_hi: int          # PAGE_HI from header (= data length on step-2 length response)
    page_lo: int          # PAGE_LO from header
    data: bytes           # payload data section (payload[5:], checksum already stripped)
    checksum_valid: bool  # True when the received SUM8 matches the computed one

    @property
    def page_key(self) -> int:
        """Combined 16-bit page address / length: (page_hi << 8) | page_lo."""
        return (self.page_hi << 8) | self.page_lo


# ── Streaming S3 frame parser ─────────────────────────────────────────────────

class S3FrameParser:
    """
    Incremental byte-stream parser for S3→BW response frames.

    Feed incoming bytes with feed(). Every frame that is terminated by
    DLE+ETX, is at least 6 de-stuffed bytes long and starts with the S3 CMD
    byte is returned immediately and also accumulated in self.frames. This
    includes frames whose checksum does not match: callers must inspect
    S3Frame.checksum_valid before trusting the data.

    State machine:
      IDLE          — scanning for DLE (0x10)
      SEEN_DLE      — saw DLE, waiting for STX (0x02) to start a frame
      IN_FRAME      — collecting de-stuffed payload bytes
      IN_FRAME_DLE  — inside frame, saw DLE; ETX ends frame, DLE continues stuffing

    ACK (0x41) bytes and arbitrary non-DLE bytes in IDLE state are silently
    discarded (covers device boot string "Operating System" and keepalive ACKs).

    Note: self.frames grows for as long as the parser is used; reset() only
    resets the state machine and does not clear it.
    """

    _IDLE         = 0
    _SEEN_DLE     = 1
    _IN_FRAME     = 2
    _IN_FRAME_DLE = 3

    # 5 header bytes (CMD, flags, SUB, PAGE_HI, PAGE_LO) + 1 checksum byte.
    _MIN_BODY_LEN = 6

    def __init__(self) -> None:
        self._state = self._IDLE
        self._body = bytearray()   # accumulates de-stuffed frame bytes
        self.frames: list[S3Frame] = []

    def reset(self) -> None:
        """Drop any partially received frame and return to IDLE.

        Frames already collected in self.frames are kept.
        """
        self._state = self._IDLE
        self._body.clear()

    def feed(self, data: bytes) -> list[S3Frame]:
        """
        Process a chunk of incoming bytes.

        A frame may be split across any number of feed() calls; parser state
        is carried over between calls.

        Args:
            data: Raw bytes as received from the device.

        Returns:
            The S3Frame objects completed during this call, in arrival order.
            All completed frames are also appended to self.frames.
        """
        completed: list[S3Frame] = []
        for b in data:
            frame = self._step(b)
            if frame is not None:
                completed.append(frame)
                self.frames.append(frame)
        return completed

    def _step(self, b: int) -> Optional[S3Frame]:
        """Process one byte. Returns a completed S3Frame or None."""
        state = self._state

        if state == self._IDLE:
            # ACK, boot strings, garbage — silently ignored until a DLE shows up.
            if b == DLE:
                self._state = self._SEEN_DLE
            return None

        if state == self._SEEN_DLE:
            if b == STX:
                # DLE STX — start of a new frame.
                self._body.clear()
                self._state = self._IN_FRAME
            else:
                # Stray DLE not followed by STX — back to idle.
                self._state = self._IDLE
            return None

        if state == self._IN_FRAME:
            if b == DLE:
                # Could be stuffing (DLE DLE) or the terminator (DLE ETX);
                # the next byte decides.
                self._state = self._IN_FRAME_DLE
            else:
                self._body.append(b)
            return None

        if state == self._IN_FRAME_DLE:
            if b == DLE:
                # DLE DLE → literal 0x10 in payload.
                self._body.append(DLE)
                self._state = self._IN_FRAME
            elif b == ETX:
                # DLE ETX → end of frame; go idle whether or not it parses.
                frame = self._finalise()
                self._state = self._IDLE
                return frame
            else:
                # Unexpected DLE + byte — treat both as literal data and continue.
                self._body.append(DLE)
                self._body.append(b)
                self._state = self._IN_FRAME

        return None

    def _finalise(self) -> Optional[S3Frame]:
        """
        Called when DLE+ETX is seen. Validates checksum and builds S3Frame.

        Returns:
            None if the frame is too short or its CMD byte is not S3_CMD.
            Otherwise an S3Frame; a checksum mismatch does not reject the
            frame, it is reported through checksum_valid.
        """
        body = bytes(self._body)

        # Too short to hold the 5-byte header plus the checksum byte.
        if len(body) < self._MIN_BODY_LEN:
            return None

        raw_payload = body[:-1]   # everything except the trailing checksum byte
        chk_received = body[-1]

        # Validate CMD byte — we only accept S3→BW response frames here.
        if raw_payload[0] != S3_CMD:
            return None

        return S3Frame(
            sub=raw_payload[2],
            page_hi=raw_payload[3],
            page_lo=raw_payload[4],
            data=raw_payload[5:],
            checksum_valid=(chk_received == checksum(raw_payload)),
        )