Files
seismo-relay/minimateplus/protocol.py
T

1474 lines
64 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
protocol.py — High-level MiniMate Plus request/response protocol.
Implements the request/response patterns documented in
docs/instantel_protocol_reference.md on top of:
- minimateplus.framing — DLE codec, frame builder, S3 streaming parser
- minimateplus.transport — byte I/O (SerialTransport / future TcpTransport)
This module knows nothing about pyserial or TCP — it only calls
transport.write() and transport.read_until_idle().
Key patterns implemented:
- POLL startup handshake (two-step, special payload[5] format)
- Generic two-step paged read (probe → get length → fetch data)
- Response timeout + checksum validation
- Boot-string drain (device sends "Operating System" ASCII before framing)
All public methods raise ProtocolError on timeout, bad checksum, or
unexpected response SUB.
"""
from __future__ import annotations
import logging
import time
from typing import Optional
from .framing import (
S3Frame,
S3FrameParser,
build_bw_frame,
build_5a_frame,
build_bw_write_frame,
waveform_key_params,
token_params,
bulk_waveform_params,
bulk_waveform_term_params,
POLL_PROBE,
POLL_DATA,
SESSION_RESET,
)
from .transport import BaseTransport
log = logging.getLogger(__name__)
# ── Constants ─────────────────────────────────────────────────────────────────
# Response SUB = 0xFF - Request SUB (confirmed pattern, no known exceptions
# among read commands; one write-path exception documented for SUB 1C→6E).
def _expected_rsp_sub(req_sub: int) -> int:
return (0xFF - req_sub) & 0xFF
# SUB byte constants (request side) — see protocol reference §5.1
SUB_POLL = 0x5B
SUB_SERIAL_NUMBER = 0x15
SUB_FULL_CONFIG = 0x01
SUB_EVENT_INDEX = 0x08
SUB_CHANNEL_CONFIG = 0x06 # Event storage range read (first/last key) ✅
SUB_MONITOR_STATUS = 0x1C # Monitoring status read (battery, memory, mode) ✅
SUB_EVENT_HEADER = 0x1E
SUB_EVENT_ADVANCE = 0x1F
SUB_WAVEFORM_HEADER = 0x0A
SUB_WAVEFORM_RECORD = 0x0C
SUB_BULK_WAVEFORM = 0x5A
SUB_COMPLIANCE = 0x1A
SUB_CALL_HOME = 0x2C # Call home config read → response 0xD3 ✅
SUB_UNKNOWN_2E = 0x2E
# Write command SUBs (= Read SUB + 0x60, confirmed from BW captures 3-11-26)
# Response SUB follows the standard 0xFF - Request SUB rule.
SUB_EVENT_INDEX_WRITE = 0x68 # Write event index (0x08 + 0x60) ✅
SUB_WAVEFORM_DATA_WRITE = 0x69 # Write waveform data (0x09 + 0x60) ✅
SUB_COMPLIANCE_WRITE = 0x71 # Write compliance cfg (0x11 + 0x60) ✅
SUB_WRITE_CONFIRM_A = 0x72 # Confirm A — sent after 71×3 and other writes ✅
SUB_WRITE_CONFIRM_B = 0x73 # Confirm B — sent after 68 ✅
SUB_WRITE_CONFIRM_C = 0x74 # Confirm C — sent after 69 ✅
SUB_TRIGGER_CONFIG_WRITE = 0x82 # Write trigger config (0x22 + 0x60) ✅
SUB_TRIGGER_CONFIRM = 0x83 # Confirm trigger write ✅
# Call home write SUBs (confirmed from 4-20-26 call home settings captures)
SUB_CALL_HOME_WRITE = 0x7E # Write call home config → response 0x81 ✅
SUB_CALL_HOME_CONFIRM = 0x7F # Confirm call home write → response 0x80 ✅
# Monitoring control SUBs (confirmed from 4-8-26/2ndtry BW TX capture)
SUB_START_MONITORING = 0x96 # Start monitoring → response 0x69 ✅
SUB_STOP_MONITORING = 0x97 # Stop monitoring → response 0x68 ✅
# Erase-all SUBs (confirmed from 4-11-26 MITM capture)
# Both use token=0xFE at params[7] and return minimal 11-byte acks.
# Standard response formula applies: 0xFF - SUB.
SUB_ERASE_ALL_BEGIN = 0xA3 # Begin erase all events → response 0x5C ✅
SUB_ERASE_ALL_CONFIRM = 0xA2 # Confirm erase all events → response 0x5D ✅
# Hardcoded data lengths for the two-step read protocol.
#
# The S3 probe response page_key is always 0x0000 — it does NOT carry the
# data length back to us. Instead, each SUB has a fixed known payload size
# confirmed from BW capture analysis (offset at payload[5] of the data-request
# frame).
#
# Key: request SUB byte. Value: offset/length byte sent in the data-request.
# Entries marked 🔶 are inferred from captured frames and may need adjustment.
DATA_LENGTHS: dict[int, int] = {
SUB_POLL: 0x30, # POLL startup data block ✅
SUB_SERIAL_NUMBER: 0x0A, # 10-byte serial number block ✅
SUB_FULL_CONFIG: 0x98, # 152-byte full config block ✅
SUB_EVENT_INDEX: 0x58, # 88-byte event index ✅
SUB_CHANNEL_CONFIG: 0x24, # 36-byte event storage range (first/last key) ✅
SUB_MONITOR_STATUS: 0x2C, # 44-byte monitor status block (idle) ✅
SUB_EVENT_HEADER: 0x08, # 8-byte event header (waveform key + event data) ✅
SUB_EVENT_ADVANCE: 0x08, # 8-byte next-key response ✅
# SUB_WAVEFORM_HEADER (0x0A) is VARIABLE — length read from probe response
# data[4]. Do NOT add it here; use read_waveform_header() instead. ✅
SUB_WAVEFORM_RECORD: 0xD2, # 210-byte waveform/histogram record ✅
SUB_CALL_HOME: 0x7C, # 124-byte call home config ✅ (confirmed 4-20-26)
SUB_UNKNOWN_2E: 0x1A, # 26 bytes, purpose TBD 🔶
0x09: 0xCA, # 202 bytes, purpose TBD 🔶
# SUB_COMPLIANCE (0x1A) uses a multi-step sequence with a 2090-byte total;
# NOT handled here — requires specialised read logic.
}
# SUB 5A (BULK_WAVEFORM_STREAM) protocol constants.
# Confirmed from 1-2-26 BW TX capture analysis (2026-04-02).
_BULK_CHUNK_OFFSET = 0x1004 # offset field for probe + all regular chunk requests ✅
_BULK_TERM_OFFSET = 0x005A # offset field for termination request ✅
_BULK_COUNTER_STEP = 0x0400 # chunk counter increment per chunk ✅
# Chunk counter formula: chunk_num * 0x0400 for ALL chunks including chunk 1.
# Earlier captures showed 0x1004 for chunk 1 — that was a Blastware artifact, not a
# protocol requirement. Confirmed 2026-04-06: 0x0400 for chunk 1 works; 0x1004
# causes a 120-second device timeout. Formula n * 0x0400 is used for all chunks.
# Default timeout values (seconds).
# MiniMate Plus is a slow device — keep these generous.
DEFAULT_RECV_TIMEOUT = 10.0
POLL_RECV_TIMEOUT = 10.0
# ── Exception ─────────────────────────────────────────────────────────────────
class ProtocolError(Exception):
"""Raised when the device violates the expected protocol."""
class TimeoutError(ProtocolError):
"""Raised when no response is received within the allowed time."""
class ChecksumError(ProtocolError):
"""Raised when a received frame has a bad checksum."""
class UnexpectedResponse(ProtocolError):
"""Raised when the response SUB doesn't match what we requested."""
# ── MiniMateProtocol ──────────────────────────────────────────────────────────
class MiniMateProtocol:
"""
Protocol state machine for one open connection to a MiniMate Plus device.
Does not own the transport — transport lifetime is managed by MiniMateClient.
Typical usage (via MiniMateClient — not directly):
proto = MiniMateProtocol(transport)
proto.startup() # POLL handshake, drain boot string
data = proto.read(SUB_FULL_CONFIG)
sn_data = proto.read(SUB_SERIAL_NUMBER)
"""
def __init__(
self,
transport: BaseTransport,
recv_timeout: float = DEFAULT_RECV_TIMEOUT,
) -> None:
self._transport = transport
self._recv_timeout = recv_timeout
self._parser = S3FrameParser()
# Extra frames buffered by _recv_one that arrived alongside the target frame.
# Used when reset_parser=False so we don't discard already-parsed frames.
self._pending_frames: list[S3Frame] = []
# ── Public API ────────────────────────────────────────────────────────────
def startup(self) -> S3Frame:
"""
Perform the POLL startup handshake and return the POLL data frame.
Steps (matching §6 Session Startup Sequence):
1. Drain any boot-string bytes ("Operating System" ASCII)
2. Send POLL_PROBE (SUB 5B, offset=0x00)
3. Receive probe ack (page_key is 0x0000; data length 0x30 is hardcoded)
4. Send POLL_DATA (SUB 5B, offset=0x30)
5. Receive data frame with "Instantel" + "MiniMate Plus" strings
Returns:
The data-phase POLL response S3Frame.
Raises:
ProtocolError: if either POLL step fails.
"""
log.debug("startup: draining boot string")
self._drain_boot_string()
# Send session-reset signal (ACK+ETX) before the first POLL probe.
# Confirmed from 4-8-26 BW TX captures: Blastware always sends this
# 2-byte signal at session start. Required to wake units that are
# actively monitoring — without it they don't respond to POLL over TCP.
log.debug("startup: session reset signal")
self._send(SESSION_RESET)
log.debug("startup: POLL probe")
self._send(POLL_PROBE)
probe_rsp = self._recv_one(
expected_sub=_expected_rsp_sub(SUB_POLL),
timeout=self._recv_timeout,
)
log.debug(
"startup: POLL probe response page_key=0x%04X", probe_rsp.page_key
)
# Send another session-reset between probe and data (matches BW behavior).
log.debug("startup: session reset signal (inter-frame)")
self._send(SESSION_RESET)
log.debug("startup: POLL data request")
self._send(POLL_DATA)
data_rsp = self._recv_one(
expected_sub=_expected_rsp_sub(SUB_POLL),
timeout=self._recv_timeout,
)
log.debug("startup: POLL data received, %d bytes", len(data_rsp.data))
return data_rsp
def read(self, sub: int) -> bytes:
"""
Execute a two-step paged read and return the data payload bytes.
Step 1: send probe frame (offset=0x00) → device sends a short ack
Step 2: send data-request (offset=DATA_LEN) → device sends the data block
The S3 probe response does NOT carry the data length — page_key is always
0x0000 in observed frames. DATA_LENGTHS holds the known fixed lengths
derived from BW capture analysis.
Args:
sub: Request SUB byte (e.g. SUB_FULL_CONFIG = 0x01).
Returns:
De-stuffed data payload bytes (payload[5:] of the response frame,
with the checksum already stripped by the parser).
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
KeyError: if sub is not in DATA_LENGTHS (caller should add it).
"""
rsp_sub = _expected_rsp_sub(sub)
# Step 1 — probe (offset = 0)
log.debug("read SUB=0x%02X: probe", sub)
self._send(build_bw_frame(sub, 0))
_probe = self._recv_one(expected_sub=rsp_sub) # ack; page_key always 0
# Look up the hardcoded data length for this SUB
if sub not in DATA_LENGTHS:
raise ProtocolError(
f"No known data length for SUB=0x{sub:02X}. "
"Add it to DATA_LENGTHS in protocol.py."
)
length = DATA_LENGTHS[sub]
log.debug("read SUB=0x%02X: data request offset=0x%02X", sub, length)
if length == 0:
log.warning("read SUB=0x%02X: DATA_LENGTHS entry is zero", sub)
return b""
# Step 2 — data-request (offset = length)
self._send(build_bw_frame(sub, length))
data_rsp = self._recv_one(expected_sub=rsp_sub)
log.debug("read SUB=0x%02X: received %d data bytes", sub, len(data_rsp.data))
return data_rsp.data
def poll(self) -> S3Frame:
"""
Send a single POLL (SUB 5B) probe+data cycle and return the data response.
This is a bare POLL cycle with no boot-string drain — use during an active
session (contrast with startup(), which drains the "Operating System" boot
string first).
Confirmed from 4-2-26 BW TX capture: BW sends exactly 3 of these POLL
cycles between the last 1F and the first 5A probe frame during every
waveform download. Without them the device ignores the 5A probe.
"""
self._send(POLL_PROBE)
self._recv_one(
expected_sub=_expected_rsp_sub(SUB_POLL),
timeout=self._recv_timeout,
)
self._send(POLL_DATA)
return self._recv_one(
expected_sub=_expected_rsp_sub(SUB_POLL),
timeout=self._recv_timeout,
)
def send_keepalive(self) -> None:
"""
Send a single POLL_PROBE keepalive without waiting for a response.
Blastware sends these every ~80ms during idle. Useful if you need to
hold the session open between real requests.
"""
self._send(POLL_PROBE)
# ── Event download API ────────────────────────────────────────────────────
def read_event_index(self) -> bytes:
"""
Send the SUB 08 (EVENT_INDEX) two-step read and return the raw 88-byte
(0x58) index block.
The index block contains:
+0x00 (3 bytes): total index size or record count — purpose partially
decoded; byte [3] may be a high byte of event count.
+0x03 (4 bytes): stored event count as uint32 BE ❓ (inferred from
captures; see §7.4 in protocol reference)
+0x07 onwards: 6-byte event timestamps (see §8), one per event
Caller is responsible for parsing the returned bytes.
Returns:
Raw 88-byte data section (data[11:11+0x58]).
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_EVENT_INDEX)
length = DATA_LENGTHS[SUB_EVENT_INDEX] # 0x58
log.debug("read_event_index: 08 probe")
self._send(build_bw_frame(SUB_EVENT_INDEX, 0))
self._recv_one(expected_sub=rsp_sub)
log.debug("read_event_index: 08 data request offset=0x%02X", length)
self._send(build_bw_frame(SUB_EVENT_INDEX, length))
data_rsp = self._recv_one(expected_sub=rsp_sub)
raw = data_rsp.data[11 : 11 + length]
log.debug("read_event_index: got %d bytes", len(raw))
return raw
def read_event_first(self, token: int = 0) -> tuple[bytes, bytes]:
"""
Send the SUB 1E (EVENT_HEADER) two-step read and return the first
waveform key and accompanying 8-byte event data block.
Args:
token: Token byte placed at params[7]. Use 0 (default) for the
initial browse-mode call that returns the first event key.
Use 0xFE for the second "download-arm" call that must be
sent between 0A and 0C when full_waveform=True; the device
will ignore 5A probe frames unless this arm step has been
issued. Confirmed from 4-2-26 and 4-3-26 BW TX captures.
Returns:
(key4, event_data8) where:
key4 — 4-byte opaque waveform record address (data[11:15])
event_data8 — full 8-byte data section (data[11:19])
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
Confirmed from 3-31-26 capture: 1E request uses all-zero params;
response data section layout is:
[LENGTH_ECHO:1][00×4][KEY_ECHO:4][00×2][KEY4:4][EXTRA:4] …
Actual data starts at data[11]; first 4 bytes are the waveform key.
"""
rsp_sub = _expected_rsp_sub(SUB_EVENT_HEADER)
length = DATA_LENGTHS[SUB_EVENT_HEADER] # 0x08
params = token_params(token)
log.debug("read_event_first: 1E probe (token=0x%02X)", token)
self._send(build_bw_frame(SUB_EVENT_HEADER, 0, params))
self._recv_one(expected_sub=rsp_sub)
log.debug("read_event_first: 1E data request offset=0x%02X (token=0x%02X)", length, token)
self._send(build_bw_frame(SUB_EVENT_HEADER, length, params))
data_rsp = self._recv_one(expected_sub=rsp_sub)
event_data8 = data_rsp.data[11:19]
key4 = data_rsp.data[11:15]
log.debug("read_event_first: key=%s", key4.hex())
return key4, event_data8
def read_waveform_header(self, key4: bytes) -> tuple[bytes, int]:
"""
Send the SUB 0A (WAVEFORM_HEADER) two-step read for *key4*.
The data length for 0A is VARIABLE and must be read from the probe
response at data[4]. Two confirmed values:
0x46 (70) — full triggered event (has 0C waveform record to follow)
0x2C (44) — partial / monitor-log entry (no 0C record; 0A header only)
Args:
key4: 4-byte waveform record address from 1E or 1F.
Returns:
(raw_data, record_length) where:
raw_data — complete data_rsp.data bytes (full response payload)
record_length — DATA_LENGTH read from probe (0x46 for full, 0x2C for partial)
The raw_data layout:
raw_data[0] = record type (0x46 = full triggered event, 0x2C = partial/monitor)
raw_data[1:5] = 0x00 × 4
raw_data[5:9] = event key (4 bytes)
raw_data[9:11] = 0x00 × 2
raw_data[11:] = timestamps + separator + serial + channel strings
(see MonitorLogEntry in models.py for full layout)
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
Confirmed from 4-11-26 MITM capture: 0A probe response data[4] carries
the variable length; data-request uses that length as the offset byte.
record_length == data[0] in virtually all cases (confirmed empirically).
"""
rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_HEADER)
params = waveform_key_params(key4)
log.debug("read_waveform_header: 0A probe key=%s", key4.hex())
self._send(build_bw_frame(SUB_WAVEFORM_HEADER, 0, params))
probe_rsp = self._recv_one(expected_sub=rsp_sub)
# Variable length — read from probe response data[4]
length = probe_rsp.data[4] if len(probe_rsp.data) > 4 else 0x46
log.debug("read_waveform_header: 0A data request offset=0x%02X", length)
if length == 0:
return b"", 0
self._send(build_bw_frame(SUB_WAVEFORM_HEADER, length, params))
data_rsp = self._recv_one(expected_sub=rsp_sub)
log.debug(
"read_waveform_header: key=%s length=0x%02X is_full=%s",
key4.hex(), length, length >= 0x40,
)
return data_rsp.data, length
def read_waveform_data_raw(self) -> bytes:
"""
Send the SUB 09 (WAVEFORM_DATA) two-step read and return the raw
202-byte (0xCA) waveform data block.
This is the "waveform data" block that Blastware reads from the device
before the write sequence (confirmed from 3-11-26 BW TX capture BW[80-81]).
The returned bytes are used verbatim as the ``waveform_data`` payload for
``write_waveform_data()`` / ``push_config_raw()``.
Returns:
Raw data section starting at data[11:], typically 204 bytes.
(data[11 : 11 + 0xCA] = 202 bytes on some firmware; the actual
length may be 204 depending on firmware version.)
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
"""
SUB_WAVEFORM_DATA = 0x09
rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_DATA) # 0xFF - 0x09 = 0xF6
length = DATA_LENGTHS[SUB_WAVEFORM_DATA] # 0xCA = 202
log.debug("read_waveform_data_raw: 09 probe")
self._send(build_bw_frame(SUB_WAVEFORM_DATA, 0))
self._recv_one(expected_sub=rsp_sub)
log.debug("read_waveform_data_raw: 09 data request offset=0x%02X", length)
self._send(build_bw_frame(SUB_WAVEFORM_DATA, length))
data_rsp = self._recv_one(expected_sub=rsp_sub)
raw = data_rsp.data[11:]
log.debug("read_waveform_data_raw: got %d bytes", len(raw))
return raw
def read_waveform_record(self, key4: bytes) -> bytes:
"""
Send the SUB 0C (WAVEFORM_RECORD / FULL_WAVEFORM_RECORD) two-step read.
Returns the 210-byte waveform/histogram record containing:
- Record type string ("Histogram" or "Waveform") at a variable offset
- Per-channel labels ("Tran", "Vert", "Long", "MicL") with PPV floats
at label_offset + 6
Args:
key4: 4-byte waveform record address.
Returns:
210-byte record bytes (data[11:11+0xD2]).
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
Confirmed from 3-31-26 capture: 0C always uses offset=0xD2 (210 bytes).
"""
rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_RECORD)
length = DATA_LENGTHS[SUB_WAVEFORM_RECORD] # 0xD2
params = waveform_key_params(key4)
log.debug("read_waveform_record: 0C probe key=%s", key4.hex())
self._send(build_bw_frame(SUB_WAVEFORM_RECORD, 0, params))
self._recv_one(expected_sub=rsp_sub)
log.debug("read_waveform_record: 0C data request offset=0x%02X", length)
self._send(build_bw_frame(SUB_WAVEFORM_RECORD, length, params))
data_rsp = self._recv_one(expected_sub=rsp_sub)
record = data_rsp.data[11:11 + length]
log.debug("read_waveform_record: received %d record bytes", len(record))
return record
def read_bulk_waveform_stream(
self,
key4: bytes,
*,
stop_after_metadata: bool = True,
max_chunks: int = 32,
include_terminator: bool = False,
extra_chunks_after_metadata: int = 1,
) -> list[S3Frame]:
"""
Download the SUB 5A (BULK_WAVEFORM_STREAM) A5 frames for one event.
The bulk waveform stream carries both raw ADC samples (large) and
event-time metadata strings ("Project:", "Client:", "User Name:",
"Seis Loc:", "Extended Notes") embedded in one of the middle frames
(confirmed: A5[7] of 9 for 1-2-26 capture).
Protocol is request-per-chunk, NOT a continuous stream:
1. Probe (offset=_BULK_CHUNK_OFFSET, is_probe=True, counter=0x0000)
2. Chunks (offset=_BULK_CHUNK_OFFSET, is_probe=False, counter+=0x0400)
3. Loop until metadata found (stop_after_metadata=True) or max_chunks
4. Termination (offset=_BULK_TERM_OFFSET, counter=last+_BULK_COUNTER_STEP)
Device responds with a final A5 frame (page_key=0x0000).
By default the termination frame (page_key=0x0000) is NOT included in the
returned list. Pass include_terminator=True to append it; the blastware_file
writer needs the terminator frame's body to reconstruct the waveform file footer.
Args:
key4: 4-byte waveform key from EVENT_HEADER (1E).
stop_after_metadata: If True (default), send termination as soon as
b"Project:" is found in a frame's data — avoids
downloading the full ADC waveform payload (several
hundred KB). Set False to download everything.
max_chunks: Safety cap on the number of chunk requests sent
(default 32; a typical event uses 9 large frames).
include_terminator: If True, append the terminator A5 frame
(page_key=0x0000) to the returned list. The
terminator carries the waveform file footer bytes.
Default False preserves existing caller behaviour.
Returns:
List of S3Frame objects from each A5 response frame. Frame indices
match the request sequence: index 0 = probe response, index 1 = first
chunk, etc. If include_terminator=True, the last element is the
terminator frame (page_key=0x0000).
Raises:
ProtocolError: on timeout, bad checksum, or unexpected SUB.
Confirmed from 1-2-26 BW TX/RX captures (2026-04-02):
- probe + 8 regular chunks + 1 termination = 10 TX frames
- 9 large A5 responses + 1 terminator A5 = 10 RX frames
- page_key=0x0010 on large frames; page_key=0x0000 on terminator ✅
- "Project:" metadata at A5[7].data[626] ✅
"""
if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
rsp_sub = _expected_rsp_sub(SUB_BULK_WAVEFORM) # 0xFF - 0x5A = 0xA5
frames_data: list[S3Frame] = []
counter = 0
# ── Step 1: probe ────────────────────────────────────────────────────
log.debug("5A probe key=%s", key4.hex())
params = bulk_waveform_params(key4, 0, is_probe=True)
self._send(build_5a_frame(_BULK_CHUNK_OFFSET, params))
self._parser.reset() # reset bytes_fed counter before probe recv
try:
rsp = self._recv_one(expected_sub=rsp_sub, reset_parser=False)
except TimeoutError:
log.warning(
"5A probe TIMED OUT for key=%s"
"%d raw bytes received (no complete A5 frame assembled)",
key4.hex(), self._parser.bytes_fed,
)
raise
frames_data.append(rsp)
log.debug("5A A5[0] page_key=0x%04X %d bytes", rsp.page_key, len(rsp.data))
# ── Step 2: chunk loop ───────────────────────────────────────────────
# Chunk counters are monotonic: chunk_num * 0x0400 for all chunks.
# The 4-2-26 BW TX capture showed 0x1004 for chunk 1, but this is a
# Blastware artifact — the device accepts any counter value and streams
# data regardless. Empirically confirmed 2026-04-06: 0x0400 for chunk 1
# works; 0x1004 causes the device to ignore the frame (timeout).
for chunk_num in range(1, max_chunks + 1):
counter = chunk_num * _BULK_COUNTER_STEP
params = bulk_waveform_params(key4, counter)
log.debug("5A chunk %d counter=0x%04X", chunk_num, counter)
self._send(build_5a_frame(_BULK_CHUNK_OFFSET, params))
self._parser.reset() # reset bytes_fed for accurate per-chunk count
try:
rsp = self._recv_one(expected_sub=rsp_sub, reset_parser=False, timeout=10.0)
except TimeoutError:
raw = self._parser.bytes_fed
log.warning(
"5A TIMEOUT chunk=%d counter=0x%04X raw_bytes=%d",
chunk_num, counter, raw,
)
if raw > 0 and frames_data:
# Device sent a partial byte (likely a bare DLE/ETX end-of-stream
# signal) but never completed a full frame. Treat as graceful
# stream end and fall through to the termination step.
log.warning(
"5A end-of-stream detected at chunk=%d (raw_bytes=%d, "
"frames_collected=%d) — proceeding to termination",
chunk_num, raw, len(frames_data),
)
break
raise
log.warning(
"5A RX chunk=%d page_key=0x%04X data_len=%d contains_Project=%s",
chunk_num, rsp.page_key, len(rsp.data), b"Project:" in rsp.data,
)
if rsp.page_key == 0x0000:
# Device unexpectedly terminated mid-stream (no termination needed).
log.debug("5A A5[%d] page_key=0x0000 — device terminated early", chunk_num)
if include_terminator:
frames_data.append(rsp)
return frames_data
frames_data.append(rsp)
if stop_after_metadata and b"Project:" in rsp.data:
# Download exactly one more chunk after finding metadata — this is
# what Blastware does. The extra chunk contains the tail ADC data
# and primes the device to return a valid footer in the termination
# response. Without it, termination returns an empty ack with no
# footer bytes (confirmed 2026-04-23 from HxD comparison).
# Download extra chunks until we hit post-event silence (all-FF
# ADC values) or the cap. The device returns the footer in the
# termination response only when we stop at the right point —
# right after the last real data chunk, before silence starts.
# Silence detection: >80% of payload bytes are 0xFF.
log.debug("5A A5[%d] metadata found — fetching extra chunks until silence",
chunk_num)
for _extra_n in range(extra_chunks_after_metadata):
chunk_num += 1
counter = chunk_num * _BULK_COUNTER_STEP
params = bulk_waveform_params(key4, counter)
self._send(build_5a_frame(_BULK_CHUNK_OFFSET, params))
try:
extra = self._recv_one(expected_sub=rsp_sub, timeout=10.0)
payload = extra.data[7:] # skip 7-byte frame header
ff_ratio = payload.count(0xFF) / max(len(payload), 1)
is_silence = ff_ratio > 0.8
log.debug(
"5A A5[%d] extra chunk page_key=0x%04X data_len=%d "
"ff_ratio=%.2f silence=%s",
chunk_num, extra.page_key, len(extra.data),
ff_ratio, is_silence,
)
if extra.page_key == 0x0000:
if include_terminator:
frames_data.append(extra)
return frames_data
if is_silence:
# Don't include the silence chunk — terminate here.
# The termination response will contain the footer.
log.debug("5A A5[%d] silence detected — stopping before this chunk",
chunk_num)
break
frames_data.append(extra)
except TimeoutError:
log.debug("5A extra chunk %d timed out — end of stream", _extra_n + 1)
break
break
else:
log.warning(
"5A reached max_chunks=%d without end-of-stream; sending termination",
max_chunks,
)
# ── Step 3: termination ──────────────────────────────────────────────
term_counter = counter + _BULK_COUNTER_STEP
term_params = bulk_waveform_term_params(key4, term_counter)
log.debug(
"5A termination term_counter=0x%04X offset=0x%04X",
term_counter, _BULK_TERM_OFFSET,
)
self._send(build_5a_frame(_BULK_TERM_OFFSET, term_params))
try:
term_rsp = self._recv_one(expected_sub=rsp_sub)
log.debug(
"5A termination response page_key=0x%04X %d bytes",
term_rsp.page_key, len(term_rsp.data),
)
if include_terminator:
frames_data.append(term_rsp)
except TimeoutError:
log.debug("5A no termination response — device may have already closed")
return frames_data
def advance_event(self, browse: bool = False) -> tuple[bytes, bytes]:
"""
Send the SUB 1F (EVENT_ADVANCE) two-step read and return the next
waveform key and the full 8-byte event data block.
browse=False (default, download mode): sends token=0xFE at params[7].
Used by get_events() — the token causes the device to skip partial
histogram bins and return the key of the next FULL record.
browse=True: sends all-zero params (no token). Matches Blastware's
confirmed browse-mode sequence: 0A → 1F(zeros) → 0A → 1F(zeros).
Used by count_events() where no 0C/5A download occurs.
IMPORTANT: A preceding 0A (read_waveform_header) call is REQUIRED in
both modes to establish device waveform context. Without it, 1F
returns the null sentinel regardless of how many events are stored.
Returns:
(key4, event_data8) where:
key4 — 4-byte opaque waveform record address (data[11:15]).
event_data8 — full 8-byte block (data[11:19]).
End-of-events sentinel: event_data8[4:8] == b'\\x00\\x00\\x00\\x00'.
DO NOT use key4 == b'\\x00\\x00\\x00\\x00' as the sentinel — key4 is
all-zeros for event 0 (the very first stored event) and will cause the
loop to terminate prematurely.
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_EVENT_ADVANCE)
length = DATA_LENGTHS[SUB_EVENT_ADVANCE] # 0x08
params = token_params(0) if browse else token_params(0xFE)
mode = "browse" if browse else "download"
log.debug("advance_event: 1F probe mode=%s params=%s", mode, params.hex())
self._send(build_bw_frame(SUB_EVENT_ADVANCE, 0, params))
self._recv_one(expected_sub=rsp_sub)
log.debug("advance_event: 1F data request offset=0x%02X", length)
self._send(build_bw_frame(SUB_EVENT_ADVANCE, length, params))
data_rsp = self._recv_one(expected_sub=rsp_sub)
event_data8 = data_rsp.data[11:19]
key4 = data_rsp.data[11:15]
is_done = event_data8[4:8] == b"\x00\x00\x00\x00"
log.debug(
"advance_event: next key=%s data8=%s done=%s",
key4.hex(), event_data8.hex(), is_done,
)
return key4, event_data8
def read_compliance_config(self) -> bytes:
"""
Send the SUB 1A (COMPLIANCE_CONFIG) multi-step read and accumulate
all E5 response frames into a single config byte string.
BE18189 sends the full config in one large E5 frame (~4245 cfg bytes).
BE11529 appears to chunk the response — each E5 frame carries ~44 bytes
of cfg data. This method loops until the expected 0x082A (2090) bytes
are accumulated or the inter-frame gap exceeds _INTER_FRAME_TIMEOUT.
Frame structure (confirmed from raw BW captures 3-11-26):
Probe (Frame A): byte[5]=0x00, params[7]=0x64
Data req (Frame D): byte[5]=0x2A, params[2]=0x08, params[7]=0x64
0x082A split: byte[5]=0x2A (offset low), params[2]=0x08 (length high)
params[7]=0x64 required in both probe and data-request.
Returns:
Accumulated compliance config bytes. First frame: data[11:] (skips
11-byte echo header). Subsequent frames: structure logged and
accumulated from data[11:] as well — adjust offset if structure differs.
Raises:
ProtocolError: if the very first E5 frame is not received (hard timeout).
"""
rsp_sub = _expected_rsp_sub(SUB_COMPLIANCE)
# Probe — params[7]=0x64 required (confirmed from BW capture)
_PROBE_PARAMS = bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00])
log.debug("read_compliance_config: 1A probe")
self._send(build_bw_frame(SUB_COMPLIANCE, 0, _PROBE_PARAMS))
self._recv_one(expected_sub=rsp_sub)
# Frame D params — offset=0x002A, params[2]=0x08, params[7]=0x64
_DATA_PARAMS = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00])
# ── Multi-request accumulation ────────────────────────────────────────
#
# Full BW sequence (confirmed from raw_bw captures 3-11-26):
#
# Frame B: offset=0x0400 params[2]=0x00 → requests cfg bytes 0..1023
# Frame C: offset=0x0400 params[2]=0x04 → requests cfg bytes 1024..2047
# Frame D: offset=0x002A params[2]=0x08 → requests cfg bytes 2048..2089
#
# Total: 0x0400 + 0x0400 + 0x002A = 0x082A = 2090 bytes.
#
# The "offset" field in B and C encodes the chunk length (0x0400 = 1024),
# not a byte offset into the config. params[2] tracks cumulative pages
# (0x00 → 0x04 → 0x08; each page = 256 bytes → 0x04 pages = 1024 bytes).
#
# Each request gets its own E5 response with an 11-byte echo header.
# Devices that send the full block in a single frame (BE18189) may return
# the entire config from the last request alone — we handle both cases by
# trying each step and concatenating whatever arrives.
_DATA_PARAMS_B = bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00])
_DATA_PARAMS_C = bytes([0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00])
# _DATA_PARAMS_D already built above as _DATA_PARAMS
_STEPS = [
("B", 0x0400, _DATA_PARAMS_B),
("C", 0x0400, _DATA_PARAMS_C),
("D", 0x002A, _DATA_PARAMS), # _DATA_PARAMS built above
]
config = bytearray()
for step_name, step_offset, step_params in _STEPS:
log.debug(
"read_compliance_config: sending frame %s offset=0x%04X params=%s",
step_name, step_offset, step_params.hex(),
)
self._send(build_bw_frame(SUB_COMPLIANCE, step_offset, step_params))
try:
data_rsp = self._recv_one(expected_sub=rsp_sub)
except TimeoutError:
log.warning(
"read_compliance_config: frame %s — no E5 response (timeout)",
step_name,
)
continue
chunk = data_rsp.data[11:]
log.warning(
"read_compliance_config: frame %s page=0x%04X data=%d cfg_chunk=%d running_total=%d",
step_name, data_rsp.page_key, len(data_rsp.data),
len(chunk), len(config) + len(chunk),
)
config.extend(chunk)
# Safety drain: catch any extra frame the device may buffer on slow links.
try:
tail_rsp = self._recv_one(expected_sub=rsp_sub, timeout=2.0)
tail_chunk = tail_rsp.data[11:]
log.warning(
"read_compliance_config: unexpected tail frame page=0x%04X "
"cfg_chunk=%d running_total=%d",
tail_rsp.page_key, len(tail_chunk), len(config) + len(tail_chunk),
)
config.extend(tail_chunk)
except TimeoutError:
pass
log.warning(
"read_compliance_config: done — %d cfg bytes total",
len(config),
)
# Hex dump first 128 bytes for field mapping
for row in range(0, min(len(config), 128), 16):
row_bytes = bytes(config[row:row + 16])
hex_part = ' '.join(f'{b:02x}' for b in row_bytes)
asc_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in row_bytes)
log.warning(" cfg[%04x]: %-48s %s", row, hex_part, asc_part)
return bytes(config)
# ── Write commands (SUBs 6883) ───────────────────────────────────────────
def recv_write_ack(
self,
expected_sub: int,
timeout: Optional[float] = None,
) -> S3Frame:
"""
Wait for a write-ack S3 frame.
All write ack responses are 17-byte frames (11-byte header + no data +
1 checksum byte) with SUB = 0xFF - request_SUB. The page_key and data
section carry zeros. Confirmed from 3-11-26 BW capture.
Args:
expected_sub: Expected response SUB byte (0xFF - write_request_SUB).
timeout: Seconds to wait; defaults to self._recv_timeout.
Returns:
The ack S3Frame.
Raises:
TimeoutError: if no frame arrives in time.
UnexpectedResponse: if the response SUB doesn't match.
"""
log.debug("recv_write_ack: waiting for SUB=0x%02X", expected_sub)
ack = self._recv_one(expected_sub=expected_sub, timeout=timeout)
log.debug(
"recv_write_ack: received SUB=0x%02X page=0x%04X data=%d bytes",
ack.sub, ack.page_key, len(ack.data),
)
return ack
def write_confirm(self, sub: int) -> S3Frame:
"""
Send a zero-data confirm frame and wait for the ack.
Confirm frames (SUBs 72, 73, 74, 83) carry no write data — they are
16-byte header-only frames (offset=0, params=zeros, data=b"") with the
DLE-aware large-frame checksum. The device acks with the complementary
RSP_SUB.
Args:
sub: Confirm SUB byte (SUB_WRITE_CONFIRM_A/B/C or SUB_TRIGGER_CONFIRM).
Returns:
The ack S3Frame.
Raises:
ProtocolError: on timeout or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(sub)
frame = build_bw_write_frame(sub, b"")
log.debug("write_confirm: SUB=0x%02X frame=%s", sub, frame.hex())
self._send(frame)
return self.recv_write_ack(expected_sub=rsp_sub)
def write_event_index(self, data: bytes) -> S3Frame:
"""
Send a SUB 68 (EVENT_INDEX_WRITE) frame and await the confirm ack (SUB 97).
Offset formula: data[1] + 2 — confirmed from 3-11-26 BW TX capture frame 102.
The write payload has a 2-byte header [0x00][length] where data[1] encodes
the length of the meaningful payload; offset = data[1] + 2.
Example from capture:
data[0:4] = 00 58 09 00 (data[1]=0x58=88 → offset=0x5A=90)
data length = 91, offset = 90
Write sequence fragment:
68 (data) → device acks with SUB 0x97
73 (confirm) → device acks with SUB 0x8C
Callers should call write_confirm(SUB_WRITE_CONFIRM_B) after this.
Args:
data: Raw event-index payload bytes to write to the device.
Must be at least 2 bytes. data[1] must contain the length field.
Returns:
The SUB 0x97 ack frame.
Raises:
ProtocolError: on timeout or wrong response SUB.
ValueError: if data is shorter than 2 bytes.
"""
if len(data) < 2:
raise ValueError(f"event index write data must be at least 2 bytes, got {len(data)}")
rsp_sub = _expected_rsp_sub(SUB_EVENT_INDEX_WRITE) # 0xFF - 0x68 = 0x97
offset = data[1] + 2
frame = build_bw_write_frame(SUB_EVENT_INDEX_WRITE, data, offset=offset)
log.debug(
"write_event_index: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X",
len(data), data[1], offset, rsp_sub,
)
self._send(frame)
return self.recv_write_ack(expected_sub=rsp_sub)
def write_waveform_data(self, data: bytes) -> S3Frame:
"""
Send a SUB 69 (WAVEFORM_DATA_WRITE) frame and await the confirm ack (SUB 96).
Offset formula: data[1] + 2 — same pattern as write_event_index().
Confirmed from 3-11-26 BW TX capture frame 110:
data[0:4] = 00 c8 08 00 (data[1]=0xC8=200 → offset=0xCA=202)
data length = 204, offset = 202
Write sequence fragment:
69 (data) → device acks with SUB 0x96
74 (confirm) → device acks with SUB 0x8B
72 (confirm) → device acks with SUB 0x8D
Callers should call write_confirm(SUB_WRITE_CONFIRM_C) then
write_confirm(SUB_WRITE_CONFIRM_A) after this.
Args:
data: Raw waveform-data payload bytes to write.
Must be at least 2 bytes. data[1] must contain the length field.
Returns:
The SUB 0x96 ack frame.
Raises:
ProtocolError: on timeout or wrong response SUB.
ValueError: if data is shorter than 2 bytes.
"""
if len(data) < 2:
raise ValueError(f"waveform data write payload must be at least 2 bytes, got {len(data)}")
rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_DATA_WRITE) # 0xFF - 0x69 = 0x96
offset = data[1] + 2
frame = build_bw_write_frame(SUB_WAVEFORM_DATA_WRITE, data, offset=offset)
log.debug(
"write_waveform_data: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X",
len(data), data[1], offset, rsp_sub,
)
self._send(frame)
return self.recv_write_ack(expected_sub=rsp_sub)
def write_compliance_config_raw(self, data: bytes) -> None:
"""
Send the SUB 71 (COMPLIANCE_WRITE) 3-chunk sequence and final confirm.
The full compliance config payload (~2128 bytes) is split into exactly 3
chunks with hardcoded boundaries and params confirmed from the 3-11-26 BW
TX capture (frames 104108):
Chunk 1 — first 1027 bytes:
offset=0x1004 params=bytes(10)
device acks SUB 0x8E
Chunk 2 — next 1055 bytes:
offset=0x1004 params=b'\\x00\\x00\\x00\\x00\\x10\\x04' + b'\\x00'*4
device acks SUB 0x8E
Chunk 3 — remaining bytes:
offset=0x002C params=b'\\x00\\x00\\x08' + b'\\x00'*7
device acks SUB 0x8E
Confirm — SUB 72 (zero data):
device acks SUB 0x8D
The total write payload should be at least 1027+1055=2082 bytes; chunk 3
carries everything after offset 2082 (typically ~46 bytes for a 2128-byte
config).
Args:
data: Raw compliance config bytes to write. Must be at least 2082 bytes.
Raises:
ValueError: if data is too short to fill chunks 1 and 2.
ProtocolError: on timeout or wrong response SUB from any chunk.
"""
_CHUNK1_SIZE = 1027
_CHUNK2_SIZE = 1055
_CHUNK1_OFFSET = 0x1004
_CHUNK2_OFFSET = 0x1004
_CHUNK3_OFFSET = 0x002C
_CHUNK1_PARAMS = bytes(10)
_CHUNK2_PARAMS = bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00])
_CHUNK3_PARAMS = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
min_size = _CHUNK1_SIZE + _CHUNK2_SIZE
if len(data) < min_size:
raise ValueError(
f"Compliance write data too short: {len(data)} bytes, "
f"need at least {min_size} (chunk1={_CHUNK1_SIZE} + chunk2={_CHUNK2_SIZE})"
)
rsp_sub = _expected_rsp_sub(SUB_COMPLIANCE_WRITE) # 0xFF - 0x71 = 0x8E
chunk1 = data[:_CHUNK1_SIZE]
chunk2 = data[_CHUNK1_SIZE : _CHUNK1_SIZE + _CHUNK2_SIZE]
chunk3 = data[_CHUNK1_SIZE + _CHUNK2_SIZE :]
chunks = [
(1, chunk1, _CHUNK1_OFFSET, _CHUNK1_PARAMS),
(2, chunk2, _CHUNK2_OFFSET, _CHUNK2_PARAMS),
(3, chunk3, _CHUNK3_OFFSET, _CHUNK3_PARAMS),
]
for chunk_num, chunk_data, chunk_offset, chunk_params in chunks:
frame = build_bw_write_frame(
SUB_COMPLIANCE_WRITE,
chunk_data,
offset=chunk_offset,
params=chunk_params,
)
log.debug(
"write_compliance_config_raw: chunk %d %d bytes "
"offset=0x%04X params=%s",
chunk_num, len(chunk_data), chunk_offset, chunk_params.hex(),
)
self._send(frame)
self.recv_write_ack(expected_sub=rsp_sub)
log.debug("write_compliance_config_raw: chunk %d acked", chunk_num)
# Final confirm (SUB 72)
log.debug("write_compliance_config_raw: sending confirm (SUB 0x72)")
self.write_confirm(SUB_WRITE_CONFIRM_A)
log.debug("write_compliance_config_raw: done")
def write_trigger_config(self, data: bytes) -> S3Frame:
"""
Send a SUB 82 (TRIGGER_CONFIG_WRITE) frame and await the confirm ack (SUB 7D).
Offset formula: data[1] + 2 — same pattern as write_event_index().
Confirmed from 3-11-26 BW TX capture frame 108:
data[0:4] = 00 1a d5 00 (data[1]=0x1A=26 → offset=0x1C=28)
data length = 29, offset = 28
Write sequence fragment:
82 (data) → device acks with SUB 0x7D
83 (confirm) → device acks with SUB 0x7C
Callers should call write_confirm(SUB_TRIGGER_CONFIRM) after this.
Args:
data: Raw trigger-config payload bytes to write.
Must be at least 2 bytes. data[1] must contain the length field.
Returns:
The SUB 0x7D ack frame.
Raises:
ProtocolError: on timeout or wrong response SUB.
ValueError: if data is shorter than 2 bytes.
"""
if len(data) < 2:
raise ValueError(f"trigger config write payload must be at least 2 bytes, got {len(data)}")
rsp_sub = _expected_rsp_sub(SUB_TRIGGER_CONFIG_WRITE) # 0xFF - 0x82 = 0x7D
offset = data[1] + 2
frame = build_bw_write_frame(SUB_TRIGGER_CONFIG_WRITE, data, offset=offset)
log.debug(
"write_trigger_config: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X",
len(data), data[1], offset, rsp_sub,
)
self._send(frame)
return self.recv_write_ack(expected_sub=rsp_sub)
# ── Call home config (SUBs 0x2C / 0x7E / 0x7F) ──────────────────────────
def read_call_home_config(self) -> bytes:
"""
Read the auto call home configuration (SUB 0x2C → response 0xD3).
Standard two-step read: probe (offset=0x00) then data (offset=0x7C=124).
Returns the raw 125-byte payload (data[11:] of the data response).
Confirmed from 4-20-26 call home settings capture:
- Probe response: data[4]=0x7C (confirms data length = 124)
- Data response: 136 bytes total (11-byte echo header + 125 bytes payload)
- Payload[0:3] = 0x00 0x7C 0xDC (header: zero, inner-length, constant)
- Payload[5] = auto_call_home_enabled
- Payload[6:46] = dial_string (40-byte null-padded ASCII "RADIO RING")
Returns:
Raw 125-byte call home config payload (data[11:]).
Suitable for round-trip write (append \\x00\\x00 → 127-byte write payload).
Raises:
ProtocolError: on timeout or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_CALL_HOME) # 0xFF - 0x2C = 0xD3
length = DATA_LENGTHS[SUB_CALL_HOME] # 0x7C = 124
log.debug("read_call_home_config: 0x2C probe")
self._send(build_bw_frame(SUB_CALL_HOME, 0))
self._recv_one(expected_sub=rsp_sub)
log.debug("read_call_home_config: 0x2C data request offset=0x%02X", length)
self._send(build_bw_frame(SUB_CALL_HOME, length))
data_rsp = self._recv_one(expected_sub=rsp_sub)
payload = data_rsp.data[11:]
log.debug("read_call_home_config: received %d payload bytes", len(payload))
return payload
def write_call_home_config(self, data: bytes) -> None:
"""
Write the auto call home configuration (SUB 0x7E → 0x7F confirm).
Write sequence (confirmed from 4-20-26 call home settings captures):
SUB 0x7E write 127-byte payload → device acks SUB 0x81
SUB 0x7F confirm (no data) → device acks SUB 0x80
The 127-byte write payload = 125-byte read payload + b'\\x00\\x00'.
The offset field = data[1] + 2 = 0x7C + 2 = 0x7E = 126.
Write frame format: build_bw_write_frame (minimal DLE stuffing — only
BW_CMD is doubled; all other bytes are RAW). The \\x10\\x03 sequence
within the payload is preserved as-is (device interprets DLE+ETX as the
literal value 0x03 per the inner-frame terminator convention).
Args:
data: 127-byte write payload (read payload + \\x00\\x00 footer).
Must start with [0x00][0x7C][...] (standard header).
Raises:
ValueError: if data is not exactly 127 bytes or lacks expected header.
ProtocolError: on timeout or wrong response SUB.
"""
if len(data) < 2:
raise ValueError(f"call home write payload must be at least 2 bytes, got {len(data)}")
rsp_sub_write = _expected_rsp_sub(SUB_CALL_HOME_WRITE) # 0xFF - 0x7E = 0x81
rsp_sub_confirm = _expected_rsp_sub(SUB_CALL_HOME_CONFIRM) # 0xFF - 0x7F = 0x80
# Offset formula: data[1] + 2 (same pattern as other single-chunk writes)
offset = data[1] + 2 # 0x7C + 2 = 0x7E = 126
frame = build_bw_write_frame(SUB_CALL_HOME_WRITE, data, offset=offset)
log.debug(
"write_call_home_config: %d bytes data[1]=0x%02X offset=0x%04X",
len(data), data[1], offset,
)
self._send(frame)
self.recv_write_ack(expected_sub=rsp_sub_write)
log.debug("write_call_home_config: write acked; sending confirm 0x7F")
confirm_frame = build_bw_write_frame(SUB_CALL_HOME_CONFIRM, b"")
self._send(confirm_frame)
self.recv_write_ack(expected_sub=rsp_sub_confirm)
log.debug("write_call_home_config: confirm acked — done")
# ── Monitoring ────────────────────────────────────────────────────────────
def read_monitor_status(self) -> S3Frame:
"""
Read monitoring status (SUB 0x1C → response 0xE3).
Two-step read: probe (offset=0x00) then data (offset=0x2C).
Returns:
S3Frame with 44 bytes of status data (idle state).
When unit is actively monitoring the payload is shorter (12 bytes);
callers should check frame data length to determine mode.
Raises:
ProtocolError: on timeout or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_MONITOR_STATUS) # 0xFF - 0x1C = 0xE3
log.debug("read_monitor_status: probe step rsp_sub=0x%02X", rsp_sub)
probe_frame = build_bw_frame(SUB_MONITOR_STATUS, offset=0x00)
self._send(probe_frame)
self._recv_one(expected_sub=rsp_sub)
log.debug("read_monitor_status: data step offset=0x%02X", DATA_LENGTHS[SUB_MONITOR_STATUS])
data_frame = build_bw_frame(SUB_MONITOR_STATUS, offset=DATA_LENGTHS[SUB_MONITOR_STATUS])
self._send(data_frame)
return self._recv_one(expected_sub=rsp_sub)
def start_monitoring(self) -> S3Frame:
"""
Send Start Monitoring command (SUB 0x96 → response 0x69).
Single write frame, no data payload. Confirmed from 4-8-26/2ndtry
BW TX capture frame 92.
Returns:
S3Frame ack from device.
Raises:
ProtocolError: on timeout or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_START_MONITORING) # 0xFF - 0x96 = 0x69
log.debug("start_monitoring: rsp_sub=0x%02X", rsp_sub)
frame = build_bw_write_frame(SUB_START_MONITORING, b"")
self._send(frame)
return self.recv_write_ack(expected_sub=rsp_sub)
def stop_monitoring(self) -> S3Frame:
"""
Send Stop Monitoring command (SUB 0x97 → response 0x68).
Single write frame, no data payload. Confirmed from 4-8-26/2ndtry
BW TX capture frame 305.
Returns:
S3Frame ack from device.
Raises:
ProtocolError: on timeout or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_STOP_MONITORING) # 0xFF - 0x97 = 0x68
log.debug("stop_monitoring: rsp_sub=0x%02X", rsp_sub)
frame = build_bw_write_frame(SUB_STOP_MONITORING, b"")
self._send(frame)
return self.recv_write_ack(expected_sub=rsp_sub)
def read_event_storage_range(self) -> S3Frame:
"""
Read event storage range (SUB 0x06 → response 0xF9).
Two-step read: probe (offset=0x00) then data (offset=0x24 = 36 bytes).
Uses token=0xFE at params[7] — same as the erase sequence.
The 36-byte response ends with two 4-byte event keys (first and last
stored event key). After a successful erase, both keys are 0x01110000
(device-empty sentinel). Confirmed from 4-11-26 MITM capture.
Returns:
S3Frame with 36 bytes of storage range data.
Raises:
ProtocolError: on timeout or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_CHANNEL_CONFIG) # 0xFF - 0x06 = 0xF9
params = token_params(0xFE)
log.debug("read_event_storage_range: probe step rsp_sub=0x%02X", rsp_sub)
self._send(build_bw_frame(SUB_CHANNEL_CONFIG, offset=0x00, params=params))
self._recv_one(expected_sub=rsp_sub)
log.debug(
"read_event_storage_range: data step offset=0x%02X",
DATA_LENGTHS[SUB_CHANNEL_CONFIG],
)
self._send(build_bw_frame(SUB_CHANNEL_CONFIG,
offset=DATA_LENGTHS[SUB_CHANNEL_CONFIG],
params=params))
return self._recv_one(expected_sub=rsp_sub)
def begin_erase_all(self) -> S3Frame:
"""
Send Begin-Erase-All command (SUB 0xA3 → response 0x5C).
Single frame with token=0xFE at params[7]. The device acknowledges with
a minimal ack and begins the erase process. Follow up with
read_monitor_status() + read_event_storage_range() + confirm_erase_all()
to complete the sequence. Confirmed from 4-11-26 MITM capture.
Returns:
S3Frame ack from device (SUB 0x5C).
Raises:
ProtocolError: on timeout or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_ERASE_ALL_BEGIN) # 0xFF - 0xA3 = 0x5C
log.debug("begin_erase_all: rsp_sub=0x%02X", rsp_sub)
self._send(build_bw_frame(SUB_ERASE_ALL_BEGIN, params=token_params(0xFE)))
return self._recv_one(expected_sub=rsp_sub)
def confirm_erase_all(self) -> S3Frame:
"""
Send Confirm-Erase-All command (SUB 0xA2 → response 0x5D).
Single frame with token=0xFE at params[7]. Must be preceded by
begin_erase_all() + read_monitor_status() + read_event_storage_range().
After this call the device memory is cleared. Confirmed from 4-11-26
MITM capture.
Returns:
S3Frame ack from device (SUB 0x5D).
Raises:
ProtocolError: on timeout or wrong response SUB.
"""
rsp_sub = _expected_rsp_sub(SUB_ERASE_ALL_CONFIRM) # 0xFF - 0xA2 = 0x5D
log.debug("confirm_erase_all: rsp_sub=0x%02X", rsp_sub)
self._send(build_bw_frame(SUB_ERASE_ALL_CONFIRM, params=token_params(0xFE)))
return self._recv_one(expected_sub=rsp_sub)
# ── Internal helpers ──────────────────────────────────────────────────────
def _send(self, frame: bytes) -> None:
"""Write a pre-built frame to the transport."""
log.debug("TX %d bytes: %s", len(frame), frame.hex())
self._transport.write(frame)
def _recv_one(
self,
expected_sub: Optional[int] = None,
timeout: Optional[float] = None,
reset_parser: bool = True,
) -> S3Frame:
"""
Read bytes from the transport until one complete S3 frame is parsed.
Feeds bytes through the streaming S3FrameParser. Keeps reading until
a frame arrives or the deadline expires.
Args:
expected_sub: If provided, raises UnexpectedResponse if the
received frame's SUB doesn't match.
timeout: Seconds to wait. Defaults to self._recv_timeout.
reset_parser: If True (default), reset the parser before reading.
Pass False when accumulating multiple frames from a
single device response (e.g. chunked E5 replies) so
that bytes already buffered between frames are not lost.
Returns:
The first complete S3Frame received.
Raises:
TimeoutError: if no frame arrives within the timeout.
ChecksumError: if the frame has an invalid checksum.
UnexpectedResponse: if expected_sub is set and doesn't match.
"""
deadline = time.monotonic() + (timeout or self._recv_timeout)
if reset_parser:
self._parser.reset()
self._pending_frames.clear()
# If a prior read() parsed more frames than it returned (e.g. two frames
# arrived in one TCP chunk), return the buffered one immediately.
if self._pending_frames:
frame = self._pending_frames.pop(0)
self._validate_frame(frame, expected_sub)
return frame
while time.monotonic() < deadline:
chunk = self._transport.read(256)
if chunk:
log.debug("RX %d bytes: %s", len(chunk), chunk.hex())
frames = self._parser.feed(chunk)
if frames:
# Stash any extras so subsequent calls with reset_parser=False see them
self._pending_frames.extend(frames[1:])
frame = frames[0]
self._validate_frame(frame, expected_sub)
return frame
else:
time.sleep(0.005)
raise TimeoutError(
f"No S3 frame received within {timeout or self._recv_timeout:.1f}s"
+ (f" (expected SUB 0x{expected_sub:02X})" if expected_sub is not None else "")
)
@staticmethod
def _validate_frame(frame: S3Frame, expected_sub: Optional[int]) -> None:
"""Validate SUB; log but do not raise on bad checksum.
S3 response checksums frequently fail SUM8 validation due to inner-frame
delimiter bytes being captured as the checksum byte. The original
s3_parser.py deliberately never validates S3 checksums for exactly this
reason. We log a warning and continue.
"""
if not frame.checksum_valid:
# S3 checksums frequently fail SUM8 due to inner-frame delimiter bytes
# landing in the checksum position. Treat as informational only.
log.debug("S3 frame SUB=0x%02X: checksum mismatch (ignoring)", frame.sub)
if expected_sub is not None and frame.sub != expected_sub:
raise UnexpectedResponse(
f"Expected SUB=0x{expected_sub:02X}, got 0x{frame.sub:02X}"
)
def _drain_boot_string(self, drain_ms: int = 200) -> None:
"""
Read and discard any boot-string bytes ("Operating System") the device
may send before entering binary protocol mode.
We simply read with a short timeout and throw the bytes away. The
S3FrameParser's IDLE state already handles non-frame bytes gracefully,
but it's cleaner to drain them explicitly before the first real frame.
"""
deadline = time.monotonic() + (drain_ms / 1000)
discarded = 0
while time.monotonic() < deadline:
chunk = self._transport.read(256)
if chunk:
discarded += len(chunk)
else:
time.sleep(0.005)
if discarded:
log.debug("drain_boot_string: discarded %d bytes", discarded)