263 lines
9.7 KiB
Python
263 lines
9.7 KiB
Python
"""
|
||
framing.py — DLE frame codec for the Instantel MiniMate Plus RS-232 protocol.
|
||
|
||
Wire format:
|
||
BW→S3 (our requests): [ACK=0x41] [STX=0x02] [stuffed payload+chk] [ETX=0x03]
|
||
S3→BW (device replies): [DLE=0x10] [STX=0x02] [stuffed payload+chk] [DLE=0x10] [ETX=0x03]
|
||
|
||
The ACK 0x41 byte often precedes S3 frames too — it is silently discarded
|
||
by the streaming parser.
|
||
|
||
De-stuffed payload layout:
|
||
BW→S3 request frame:
|
||
[0] CMD 0x10 (BW request marker)
|
||
[1] flags 0x00
|
||
[2] SUB command sub-byte
|
||
[3] 0x00 always zero in captured frames
|
||
[4] 0x00 always zero in captured frames
|
||
[5] OFFSET two-step offset: 0x00 = length-probe, DATA_LEN = data-request
|
||
[6-15] zero padding (total de-stuffed payload = 16 bytes)
|
||
|
||
S3→BW response frame:
|
||
[0] CMD 0x00 (S3 response marker)
|
||
[1] flags 0x10
|
||
[2] SUB response sub-byte (= 0xFF - request SUB)
|
||
[3] PAGE_HI high byte of page address (always 0x00 in observed frames)
|
||
[4] PAGE_LO low byte (always 0x00 in observed frames)
|
||
[5+] data payload data section (composite inner frames for large responses)
|
||
|
||
DLE stuffing rule: any 0x10 byte in the payload is doubled on the wire (0x10 → 0x10 0x10).
|
||
This applies to the checksum byte too.
|
||
|
||
Confirmed from live captures (s3_parser.py validation + raw_bw.bin / raw_s3.bin).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass
|
||
from typing import Optional
|
||
|
||
# ── Protocol byte constants ───────────────────────────────────────────────────
|
||
|
||
DLE = 0x10 # Data Link Escape
|
||
STX = 0x02 # Start of text
|
||
ETX = 0x03 # End of text
|
||
ACK = 0x41 # Acknowledgement / frame-start marker (BW side)
|
||
|
||
BW_CMD = 0x10 # CMD byte value in BW→S3 frames
|
||
S3_CMD = 0x00 # CMD byte value in S3→BW frames
|
||
S3_FLAGS = 0x10 # flags byte value in S3→BW frames
|
||
|
||
# BW read-command payload size: 5 header bytes + 11 padding bytes = 16 total.
|
||
# Confirmed from captured raw_bw.bin: all read-command frames carry exactly 16
|
||
# de-stuffed bytes (excluding the appended checksum).
|
||
_BW_PAYLOAD_SIZE = 16
|
||
|
||
|
||
# ── DLE stuffing / de-stuffing ────────────────────────────────────────────────
|
||
|
||
def dle_stuff(data: bytes) -> bytes:
|
||
"""Escape literal 0x10 bytes: 0x10 → 0x10 0x10."""
|
||
out = bytearray()
|
||
for b in data:
|
||
if b == DLE:
|
||
out.append(DLE)
|
||
out.append(b)
|
||
return bytes(out)
|
||
|
||
|
||
def dle_unstuff(data: bytes) -> bytes:
|
||
"""Remove DLE stuffing: 0x10 0x10 → 0x10."""
|
||
out = bytearray()
|
||
i = 0
|
||
while i < len(data):
|
||
b = data[i]
|
||
if b == DLE and i + 1 < len(data) and data[i + 1] == DLE:
|
||
out.append(DLE)
|
||
i += 2
|
||
else:
|
||
out.append(b)
|
||
i += 1
|
||
return bytes(out)
|
||
|
||
|
||
# ── Checksum ─────────────────────────────────────────────────────────────────
|
||
|
||
def checksum(payload: bytes) -> int:
|
||
"""SUM8: sum of all de-stuffed payload bytes, mod 256."""
|
||
return sum(payload) & 0xFF
|
||
|
||
|
||
# ── BW→S3 frame builder ───────────────────────────────────────────────────────
|
||
|
||
def build_bw_frame(sub: int, offset: int = 0) -> bytes:
|
||
"""
|
||
Build a BW→S3 read-command frame.
|
||
|
||
The payload is always 16 de-stuffed bytes:
|
||
[BW_CMD, 0x00, sub, 0x00, 0x00, offset, 0x00 × 10]
|
||
|
||
Confirmed from BW capture analysis: payload[3] and payload[4] are always
|
||
0x00 across all observed read commands. The two-step offset lives at
|
||
payload[5]: 0x00 for the length-probe step, DATA_LEN for the data-fetch step.
|
||
|
||
Wire output: [ACK] [STX] dle_stuff(payload + checksum) [ETX]
|
||
|
||
Args:
|
||
sub: SUB command byte (e.g. 0x01 = FULL_CONFIG_READ)
|
||
offset: Value placed at payload[5].
|
||
Pass 0 for the probe step; pass DATA_LENGTHS[sub] for the data step.
|
||
|
||
Returns:
|
||
Complete frame bytes ready to write to the serial port / socket.
|
||
"""
|
||
payload = bytes([BW_CMD, 0x00, sub, 0x00, 0x00, offset]) + bytes(_BW_PAYLOAD_SIZE - 6)
|
||
chk = checksum(payload)
|
||
wire = bytes([ACK, STX]) + dle_stuff(payload + bytes([chk])) + bytes([ETX])
|
||
return wire
|
||
|
||
|
||
# ── Pre-built POLL frames ─────────────────────────────────────────────────────
|
||
#
|
||
# POLL (SUB 0x5B) uses the same two-step pattern as all other reads — the
|
||
# hardcoded length 0x30 lives at payload[5], exactly as in build_bw_frame().
|
||
|
||
POLL_PROBE = build_bw_frame(0x5B, 0x00) # length-probe POLL (offset = 0)
|
||
POLL_DATA = build_bw_frame(0x5B, 0x30) # data-request POLL (offset = 0x30)
|
||
|
||
|
||
# ── S3 response dataclass ─────────────────────────────────────────────────────
|
||
|
||
@dataclass
|
||
class S3Frame:
|
||
"""A fully parsed and de-stuffed S3→BW response frame."""
|
||
sub: int # response SUB byte (e.g. 0xA4 = POLL_RESPONSE)
|
||
page_hi: int # PAGE_HI from header (= data length on step-2 length response)
|
||
page_lo: int # PAGE_LO from header
|
||
data: bytes # payload data section (payload[5:], checksum already stripped)
|
||
checksum_valid: bool
|
||
|
||
@property
|
||
def page_key(self) -> int:
|
||
"""Combined 16-bit page address / length: (page_hi << 8) | page_lo."""
|
||
return (self.page_hi << 8) | self.page_lo
|
||
|
||
|
||
# ── Streaming S3 frame parser ─────────────────────────────────────────────────
|
||
|
||
class S3FrameParser:
|
||
"""
|
||
Incremental byte-stream parser for S3→BW response frames.
|
||
|
||
Feed incoming bytes with feed(). Complete, valid frames are returned
|
||
immediately and also accumulated in self.frames.
|
||
|
||
State machine:
|
||
IDLE — scanning for DLE (0x10)
|
||
SEEN_DLE — saw DLE, waiting for STX (0x02) to start a frame
|
||
IN_FRAME — collecting de-stuffed payload bytes
|
||
IN_FRAME_DLE — inside frame, saw DLE; ETX ends frame, DLE continues stuffing
|
||
|
||
ACK (0x41) bytes and arbitrary non-DLE bytes in IDLE state are silently
|
||
discarded (covers device boot string "Operating System" and keepalive ACKs).
|
||
"""
|
||
|
||
_IDLE = 0
|
||
_SEEN_DLE = 1
|
||
_IN_FRAME = 2
|
||
_IN_FRAME_DLE = 3
|
||
|
||
def __init__(self) -> None:
|
||
self._state = self._IDLE
|
||
self._body = bytearray() # accumulates de-stuffed frame bytes
|
||
self.frames: list[S3Frame] = []
|
||
|
||
def reset(self) -> None:
|
||
self._state = self._IDLE
|
||
self._body.clear()
|
||
|
||
def feed(self, data: bytes) -> list[S3Frame]:
|
||
"""
|
||
Process a chunk of incoming bytes.
|
||
|
||
Returns a list of S3Frame objects completed during this call.
|
||
All completed frames are also appended to self.frames.
|
||
"""
|
||
completed: list[S3Frame] = []
|
||
for b in data:
|
||
frame = self._step(b)
|
||
if frame is not None:
|
||
completed.append(frame)
|
||
self.frames.append(frame)
|
||
return completed
|
||
|
||
def _step(self, b: int) -> Optional[S3Frame]:
|
||
"""Process one byte. Returns a completed S3Frame or None."""
|
||
|
||
if self._state == self._IDLE:
|
||
if b == DLE:
|
||
self._state = self._SEEN_DLE
|
||
# ACK, boot strings, garbage — silently ignored
|
||
|
||
elif self._state == self._SEEN_DLE:
|
||
if b == STX:
|
||
self._body.clear()
|
||
self._state = self._IN_FRAME
|
||
else:
|
||
# Stray DLE not followed by STX — back to idle
|
||
self._state = self._IDLE
|
||
|
||
elif self._state == self._IN_FRAME:
|
||
if b == DLE:
|
||
self._state = self._IN_FRAME_DLE
|
||
else:
|
||
self._body.append(b)
|
||
|
||
elif self._state == self._IN_FRAME_DLE:
|
||
if b == DLE:
|
||
# DLE DLE → literal 0x10 in payload
|
||
self._body.append(DLE)
|
||
self._state = self._IN_FRAME
|
||
elif b == ETX:
|
||
# End of frame
|
||
frame = self._finalise()
|
||
self._state = self._IDLE
|
||
return frame
|
||
else:
|
||
# Unexpected DLE + byte — treat both as literal data and continue
|
||
self._body.append(DLE)
|
||
self._body.append(b)
|
||
self._state = self._IN_FRAME
|
||
|
||
return None
|
||
|
||
def _finalise(self) -> Optional[S3Frame]:
|
||
"""
|
||
Called when DLE+ETX is seen. Validates checksum and builds S3Frame.
|
||
Returns None if the frame is too short or structurally invalid.
|
||
"""
|
||
body = bytes(self._body)
|
||
|
||
# Minimum valid frame: 5-byte header + at least 1 checksum byte = 6
|
||
if len(body) < 6:
|
||
return None
|
||
|
||
raw_payload = body[:-1] # everything except the trailing checksum byte
|
||
chk_received = body[-1]
|
||
chk_computed = checksum(raw_payload)
|
||
|
||
if len(raw_payload) < 5:
|
||
return None
|
||
|
||
# Validate CMD byte — we only accept S3→BW response frames here
|
||
if raw_payload[0] != S3_CMD:
|
||
return None
|
||
|
||
return S3Frame(
|
||
sub = raw_payload[2],
|
||
page_hi = raw_payload[3],
|
||
page_lo = raw_payload[4],
|
||
data = raw_payload[5:],
|
||
checksum_valid = (chk_received == chk_computed),
|
||
)
|