Files
serversdown 429c6ac87a feat(protocol): implement v0.14.0 SUB 5A protocol rewrite with enhanced chunk handling and new helpers
test: add regression tests for v0.14.x SUB 5A protocol fixes
refactor(logging): change warning logs to debug for less verbosity in write_blastware_file
2026-05-06 14:18:31 -04:00

1559 lines
66 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
protocol.py — High-level MiniMate Plus request/response protocol.
Implements the request/response patterns documented in
docs/instantel_protocol_reference.md on top of:
- minimateplus.framing — DLE codec, frame builder, S3 streaming parser
- minimateplus.transport — byte I/O (SerialTransport / future TcpTransport)
This module knows nothing about pyserial or TCP — it only calls
transport.write() and transport.read_until_idle().
Key patterns implemented:
- POLL startup handshake (two-step, special payload[5] format)
- Generic two-step paged read (probe → get length → fetch data)
- Response timeout + checksum validation
- Boot-string drain (device sends "Operating System" ASCII before framing)
All public methods raise ProtocolError on timeout, bad checksum, or
unexpected response SUB.
"""
from __future__ import annotations
import logging
import time
from typing import Optional
from .framing import (
S3Frame,
S3FrameParser,
build_bw_frame,
build_5a_frame,
build_bw_write_frame,
waveform_key_params,
token_params,
bulk_waveform_params,
bulk_waveform_term_params,
bulk_waveform_term_v2,
parse_strt_end_offset,
POLL_PROBE,
POLL_DATA,
SESSION_RESET,
)
from .transport import BaseTransport
log = logging.getLogger(__name__)
# ── Constants ─────────────────────────────────────────────────────────────────
# Response SUB = 0xFF - Request SUB (confirmed pattern, no known exceptions
# among read commands; one write-path exception documented for SUB 1C→6E).
def _expected_rsp_sub(req_sub: int) -> int:
return (0xFF - req_sub) & 0xFF
# SUB byte constants (request side) — see protocol reference §5.1
# Unless noted otherwise the response SUB is 0xFF - request SUB
# (see _expected_rsp_sub).
SUB_POLL = 0x5B # POLL probe/data handshake, also used as keepalive → response 0xA4
SUB_SERIAL_NUMBER = 0x15 # 10-byte serial number block
SUB_FULL_CONFIG = 0x01 # 152-byte full config block
SUB_EVENT_INDEX = 0x08 # 88-byte event index
# NOTE(review): the name says "channel config" but the confirmed payload is the
# event storage range — consider renaming.
SUB_CHANNEL_CONFIG = 0x06 # Event storage range read (first/last key) ✅
SUB_MONITOR_STATUS = 0x1C # Monitoring status read (battery, memory, mode) ✅
SUB_EVENT_HEADER = 0x1E # First waveform key + 8-byte event data block
SUB_EVENT_ADVANCE = 0x1F # Next waveform key + 8-byte event data block
SUB_WAVEFORM_HEADER = 0x0A # Waveform header; length is variable (read from the probe response)
SUB_WAVEFORM_RECORD = 0x0C # 210-byte waveform/histogram record
SUB_BULK_WAVEFORM = 0x5A # Bulk waveform stream → response 0xA5
SUB_COMPLIANCE = 0x1A # Compliance config, multi-step read → response 0xE5
SUB_CALL_HOME = 0x2C # Call home config read → response 0xD3 ✅
SUB_UNKNOWN_2E = 0x2E # 26-byte block, purpose TBD
# Write command SUBs (= Read SUB + 0x60, confirmed from BW captures 3-11-26)
# Response SUB follows the standard 0xFF - Request SUB rule.
SUB_EVENT_INDEX_WRITE = 0x68 # Write event index (0x08 + 0x60) ✅
SUB_WAVEFORM_DATA_WRITE = 0x69 # Write waveform data (0x09 + 0x60) ✅
SUB_COMPLIANCE_WRITE = 0x71 # Write compliance cfg (0x11 + 0x60) ✅
SUB_WRITE_CONFIRM_A = 0x72 # Confirm A — sent after 71×3 and other writes ✅
SUB_WRITE_CONFIRM_B = 0x73 # Confirm B — sent after 68 ✅
SUB_WRITE_CONFIRM_C = 0x74 # Confirm C — sent after 69 ✅
SUB_TRIGGER_CONFIG_WRITE = 0x82 # Write trigger config (0x22 + 0x60) ✅
SUB_TRIGGER_CONFIRM = 0x83 # Confirm trigger write ✅
# Call home write SUBs (confirmed from 4-20-26 call home settings captures)
SUB_CALL_HOME_WRITE = 0x7E # Write call home config → response 0x81 ✅
SUB_CALL_HOME_CONFIRM = 0x7F # Confirm call home write → response 0x80 ✅
# Monitoring control SUBs (confirmed from 4-8-26/2ndtry BW TX capture)
SUB_START_MONITORING = 0x96 # Start monitoring → response 0x69 ✅
SUB_STOP_MONITORING = 0x97 # Stop monitoring → response 0x68 ✅
# Erase-all SUBs (confirmed from 4-11-26 MITM capture)
# Both use token=0xFE at params[7] and return minimal 11-byte acks.
# Standard response formula applies: 0xFF - SUB.
SUB_ERASE_ALL_BEGIN = 0xA3 # Begin erase all events → response 0x5C ✅
SUB_ERASE_ALL_CONFIRM = 0xA2 # Confirm erase all events → response 0x5D ✅
# Hardcoded data lengths for the two-step read protocol.
#
# The S3 probe response page_key is always 0x0000 — it does NOT carry the
# data length back to us. Instead, each SUB has a fixed known payload size
# confirmed from BW capture analysis (offset at payload[5] of the data-request
# frame).
#
# Key: request SUB byte. Value: offset/length byte sent in the data-request.
# Entries marked 🔶 are inferred from captured frames and may need adjustment.
#
# MiniMateProtocol.read() raises ProtocolError for any SUB that has no entry
# here, so a new fixed-length read SUB must be added before it can be used.
DATA_LENGTHS: dict[int, int] = {
SUB_POLL: 0x30, # POLL startup data block ✅
SUB_SERIAL_NUMBER: 0x0A, # 10-byte serial number block ✅
SUB_FULL_CONFIG: 0x98, # 152-byte full config block ✅
SUB_EVENT_INDEX: 0x58, # 88-byte event index ✅
SUB_CHANNEL_CONFIG: 0x24, # 36-byte event storage range (first/last key) ✅
SUB_MONITOR_STATUS: 0x2C, # 44-byte monitor status block (idle) ✅
SUB_EVENT_HEADER: 0x08, # 8-byte event header (waveform key + event data) ✅
SUB_EVENT_ADVANCE: 0x08, # 8-byte next-key response ✅
# SUB_WAVEFORM_HEADER (0x0A) is VARIABLE — length read from probe response
# data[4]. Do NOT add it here; use read_waveform_header() instead. ✅
SUB_WAVEFORM_RECORD: 0xD2, # 210-byte waveform/histogram record ✅
SUB_CALL_HOME: 0x7C, # 124-byte call home config ✅ (confirmed 4-20-26)
SUB_UNKNOWN_2E: 0x1A, # 26 bytes, purpose TBD 🔶
# SUB 09 has no module-level SUB_* constant; read_waveform_data_raw() reads
# this block as the "waveform data" payload used by the write sequence.
0x09: 0xCA, # 202-byte waveform data block (length inferred) 🔶
# SUB_COMPLIANCE (0x1A) uses a multi-step sequence with a 2090-byte total;
# NOT handled here — requires specialised read logic.
}
# SUB 5A (BULK_WAVEFORM_STREAM) protocol constants.
#
# 2026-05-01: the chunk-counter walk is bounded by the event's `end_offset`,
# extracted from the STRT record in the probe response (see
# parse_strt_end_offset). Without this bound the loop kept asking for chunks
# past the event end and the device responded with post-event circular-buffer
# garbage, corrupting reconstructed Blastware files for events ≥ 2 sec.
#
# BW-exact protocol values (v0.14.0). Verified against 4-27-26 + 5-1-26 captures.
# The counter step is BW's 0x0200 (see §7.8.5 of the protocol reference).
#
# NOTE(review): an earlier revision of this comment said the OLD 0x0400 step
# was kept because blastware_file.py relied on the 0x0400-step frame structure.
# The value below is 0x0200, so that statement was stale — confirm the file
# builder was updated together with this constant.
_BULK_CHUNK_OFFSET = 0x1002 # offset_word for probe + all chunk requests (also used for the metadata pages)
_BULK_TERM_OFFSET = 0x005A # offset_word for the legacy terminator (fallback only)
_BULK_COUNTER_STEP = 0x0200 # chunk counter increment (matches chunk payload size)
# Default timeout values (seconds).
# MiniMate Plus is a slow device — keep these generous.
DEFAULT_RECV_TIMEOUT = 10.0 # default for MiniMateProtocol(recv_timeout=...)
# NOTE(review): startup() and poll() pass self._recv_timeout rather than this
# constant — confirm POLL_RECV_TIMEOUT is still used somewhere.
POLL_RECV_TIMEOUT = 10.0
# ── Exception ─────────────────────────────────────────────────────────────────
class ProtocolError(Exception):
    """Base class for protocol failures: the device broke the expected exchange."""


class TimeoutError(ProtocolError):
    """No response arrived within the allowed time.

    This class shadows the builtin ``TimeoutError`` inside this module.  It
    derives from ProtocolError (not OSError), so ``except ProtocolError``
    catches it.
    """


class ChecksumError(ProtocolError):
    """A received frame failed checksum validation."""


class UnexpectedResponse(ProtocolError):
    """The response SUB did not match the one expected for the request."""
# ── MiniMateProtocol ──────────────────────────────────────────────────────────
class MiniMateProtocol:
"""
Protocol state machine for one open connection to a MiniMate Plus device.
Does not own the transport — transport lifetime is managed by MiniMateClient.
Typical usage (via MiniMateClient — not directly):
proto = MiniMateProtocol(transport)
proto.startup() # POLL handshake, drain boot string
data = proto.read(SUB_FULL_CONFIG)
sn_data = proto.read(SUB_SERIAL_NUMBER)
"""
def __init__(
    self,
    transport: BaseTransport,
    recv_timeout: float = DEFAULT_RECV_TIMEOUT,
) -> None:
    """Bind the protocol to *transport* and set up fresh parser state.

    Args:
        transport: byte I/O channel used for all traffic.  It is only
            stored here; nothing is sent or received during construction.
        recv_timeout: timeout in seconds stored as the instance default
            (startup() and poll() pass it to every receive).
    """
    # Frames that _recv_one parsed in the same read as the frame it was
    # waiting for.  Kept for calls made with reset_parser=False so frames
    # that were already parsed are not thrown away.
    self._pending_frames: list[S3Frame] = []
    self._parser = S3FrameParser()
    self._recv_timeout = recv_timeout
    self._transport = transport
# ── Public API ────────────────────────────────────────────────────────────
def startup(self) -> S3Frame:
    """
    Run the POLL startup handshake and hand back the POLL data frame.

    Sequence (per §6 Session Startup Sequence):
      1. Drain any boot-string bytes ("Operating System" ASCII).
      2. Send the session-reset signal, then POLL_PROBE; wait for the ack.
      3. Send the session-reset signal again, then POLL_DATA; wait for the
         data frame ("Instantel" + "MiniMate Plus" strings per the
         protocol notes).

    Returns:
        The S3Frame received in the data phase.

    Raises:
        ProtocolError: if either POLL step fails.
    """
    poll_rsp_sub = _expected_rsp_sub(SUB_POLL)

    log.debug("startup: draining boot string")
    self._drain_boot_string()

    # Probe phase.  The 2-byte session reset (ACK+ETX) goes out first:
    # per the 4-8-26 BW TX captures Blastware always sends it at session
    # start, and units that are actively monitoring do not answer POLL
    # over TCP without it.
    log.debug("startup: session reset signal")
    self._send(SESSION_RESET)
    log.debug("startup: POLL probe")
    self._send(POLL_PROBE)
    probe_ack = self._recv_one(
        expected_sub=poll_rsp_sub,
        timeout=self._recv_timeout,
    )
    log.debug(
        "startup: POLL probe response page_key=0x%04X", probe_ack.page_key
    )

    # Data phase.  A second session reset between probe and data matches
    # Blastware's behaviour.
    log.debug("startup: session reset signal (inter-frame)")
    self._send(SESSION_RESET)
    log.debug("startup: POLL data request")
    self._send(POLL_DATA)
    poll_data = self._recv_one(
        expected_sub=poll_rsp_sub,
        timeout=self._recv_timeout,
    )
    log.debug("startup: POLL data received, %d bytes", len(poll_data.data))
    return poll_data
def read(self, sub: int) -> bytes:
    """
    Execute a two-step paged read and return the data payload bytes.

    Step 1: send probe frame (offset=0x00) → device sends a short ack.
    Step 2: send data-request (offset=DATA_LEN) → device sends the data block.

    The S3 probe response does NOT carry the data length — page_key is
    always 0x0000 in observed frames.  DATA_LENGTHS holds the known fixed
    lengths derived from BW capture analysis.

    The SUB is validated against DATA_LENGTHS *before* anything is sent, so
    an unsupported SUB never leaves a probe on the wire without its
    follow-up data request.

    Args:
        sub: Request SUB byte (e.g. SUB_FULL_CONFIG = 0x01).

    Returns:
        The ``data`` attribute of the data-phase response frame, unsliced
        (the read_* helpers in this class skip the first 11 bytes of that
        section; this method does not).  Returns b"" without issuing the
        data request when the DATA_LENGTHS entry is zero.

    Raises:
        ProtocolError: if *sub* has no entry in DATA_LENGTHS, or on
            timeout, bad checksum, or wrong response SUB.
    """
    # Fail fast: look the length up before touching the transport.
    try:
        length = DATA_LENGTHS[sub]
    except KeyError:
        raise ProtocolError(
            f"No known data length for SUB=0x{sub:02X}. "
            "Add it to DATA_LENGTHS in protocol.py."
        ) from None

    rsp_sub = _expected_rsp_sub(sub)

    # Step 1 — probe (offset = 0).  The ack carries nothing we need
    # (page_key is always 0), so it is received and discarded.
    log.debug("read SUB=0x%02X: probe", sub)
    self._send(build_bw_frame(sub, 0))
    self._recv_one(expected_sub=rsp_sub)

    log.debug("read SUB=0x%02X: data request offset=0x%02X", sub, length)
    if length == 0:
        # A zero offset would just repeat the probe; nothing to fetch.
        log.warning("read SUB=0x%02X: DATA_LENGTHS entry is zero", sub)
        return b""

    # Step 2 — data-request (offset = length)
    self._send(build_bw_frame(sub, length))
    data_rsp = self._recv_one(expected_sub=rsp_sub)
    log.debug("read SUB=0x%02X: received %d data bytes", sub, len(data_rsp.data))
    return data_rsp.data
def poll(self) -> S3Frame:
    """
    Run one bare POLL (SUB 5B) probe+data cycle and return the data frame.

    Unlike startup(), this neither drains the boot string nor sends the
    session-reset signal — it is meant for use inside an active session.

    Per the 4-2-26 BW TX capture, Blastware issues exactly 3 of these
    cycles between the last 1F and the first 5A probe of every waveform
    download; without them the device ignores the 5A probe.

    Returns:
        The response to the POLL_DATA request.
    """
    poll_rsp_sub = _expected_rsp_sub(SUB_POLL)
    frame = None
    # Probe first, then data; only the second response is returned.
    for request in (POLL_PROBE, POLL_DATA):
        self._send(request)
        frame = self._recv_one(
            expected_sub=poll_rsp_sub,
            timeout=self._recv_timeout,
        )
    return frame
def send_keepalive(self) -> None:
    """
    Fire a single POLL_PROBE as a keepalive; no response is read.

    Blastware sends these roughly every 80 ms while idle.  Use it to hold
    the session open between real requests.

    NOTE(review): the device's ack to this probe is left unread here —
    presumably a later receive copes with it; confirm against _recv_one.
    """
    keepalive_frame = POLL_PROBE
    self._send(keepalive_frame)
# ── Event download API ────────────────────────────────────────────────────
def read_event_index(self) -> bytes:
    """
    Perform the SUB 08 (EVENT_INDEX) two-step read and return the raw
    index block (up to 0x58 = 88 bytes).

    Layout of the block according to the protocol notes (partly decoded):
        +0x00 (3 bytes): total index size or record count; byte [3] may be
                         a high byte of the event count.
        +0x03 (4 bytes): stored event count as uint32 BE ❓ (inferred from
                         captures; see §7.4 in the protocol reference)
        +0x07 onwards:   6-byte event timestamps (see §8), one per event

    Parsing is left to the caller.

    Returns:
        data[11:11+0x58] of the data-phase response.  The slice is not
        length-checked, so a short response yields fewer than 88 bytes.

    Raises:
        ProtocolError: on timeout, bad checksum, or wrong response SUB.
    """
    ack_sub = _expected_rsp_sub(SUB_EVENT_INDEX)
    index_len = DATA_LENGTHS[SUB_EVENT_INDEX]  # 0x58

    # Step 1 — probe; the ack is discarded.
    log.debug("read_event_index: 08 probe")
    self._send(build_bw_frame(SUB_EVENT_INDEX, 0))
    self._recv_one(expected_sub=ack_sub)

    # Step 2 — data request with the fixed length as the offset byte.
    log.debug("read_event_index: 08 data request offset=0x%02X", index_len)
    self._send(build_bw_frame(SUB_EVENT_INDEX, index_len))
    payload = self._recv_one(expected_sub=ack_sub).data

    # Skip the 11 leading bytes, then cap at the expected block size.
    index_block = payload[11:][:index_len]
    log.debug("read_event_index: got %d bytes", len(index_block))
    return index_block
def read_event_first(self, token: int = 0) -> tuple[bytes, bytes]:
    """
    Perform the SUB 1E (EVENT_HEADER) two-step read and return the first
    waveform key together with the 8-byte event data block.

    Args:
        token: Token byte passed to token_params() (placed at params[7]
               per the protocol notes).  0 (default) is the initial
               browse-mode call that returns the first event key.  0xFE is
               the "download-arm" call that must be sent between 0A and 0C
               when full_waveform=True; the device ignores 5A probe frames
               unless this arm step was issued (confirmed from 4-2-26 and
               4-3-26 BW TX captures).

    Returns:
        (key4, event_data8) where:
          key4        — data[11:15], the 4-byte opaque waveform record address
          event_data8 — data[11:19], the full 8-byte data section

    Raises:
        ProtocolError: on timeout, bad checksum, or wrong response SUB.

    Response layout per the 3-31-26 capture:
        [LENGTH_ECHO:1][00×4][KEY_ECHO:4][00×2][KEY4:4][EXTRA:4] …
    so the payload of interest begins at data[11].
    """
    ack_sub = _expected_rsp_sub(SUB_EVENT_HEADER)
    block_len = DATA_LENGTHS[SUB_EVENT_HEADER]  # 0x08
    request_params = token_params(token)

    # Step 1 — probe; the ack is discarded.
    log.debug("read_event_first: 1E probe (token=0x%02X)", token)
    self._send(build_bw_frame(SUB_EVENT_HEADER, 0, request_params))
    self._recv_one(expected_sub=ack_sub)

    # Step 2 — data request; same params, offset = fixed block length.
    log.debug("read_event_first: 1E data request offset=0x%02X (token=0x%02X)", block_len, token)
    self._send(build_bw_frame(SUB_EVENT_HEADER, block_len, request_params))
    payload = self._recv_one(expected_sub=ack_sub).data

    event_data8 = payload[11:19]
    key4 = event_data8[:4]  # the first four bytes of the block are the key
    log.debug("read_event_first: key=%s", key4.hex())
    return key4, event_data8
def read_waveform_header(self, key4: bytes) -> tuple[bytes, int]:
    """
    Perform the SUB 0A (WAVEFORM_HEADER) two-step read for *key4*.

    The 0A data length is VARIABLE: it is taken from data[4] of the probe
    response and then used as the offset byte of the data request.  If the
    probe response has 5 bytes or fewer... no length byte is available and
    0x46 is assumed.  Two values are confirmed by captures:
        0x46 (70) — full triggered event (a 0C waveform record follows)
        0x2C (44) — partial / monitor-log entry (0A header only)

    Args:
        key4: 4-byte waveform record address from 1E or 1F.

    Returns:
        (raw_data, record_length):
          raw_data      — the complete data section of the data response
          record_length — the length byte used for the data request
        When the probe reports a length of 0, no data request is sent and
        (b"", 0) is returned.

        raw_data layout per the protocol notes:
          raw_data[0]     record type (0x46 full event, 0x2C partial/monitor)
          raw_data[1:5]   0x00 × 4
          raw_data[5:9]   event key (4 bytes)
          raw_data[9:11]  0x00 × 2
          raw_data[11:]   timestamps + separator + serial + channel strings
                          (see MonitorLogEntry in models.py)

    Raises:
        ProtocolError: on timeout, bad checksum, or wrong response SUB.
    """
    ack_sub = _expected_rsp_sub(SUB_WAVEFORM_HEADER)
    key_params = waveform_key_params(key4)

    # Step 1 — probe; its response tells us how long the header is.
    log.debug("read_waveform_header: 0A probe key=%s", key4.hex())
    self._send(build_bw_frame(SUB_WAVEFORM_HEADER, 0, key_params))
    probe_data = self._recv_one(expected_sub=ack_sub).data

    if len(probe_data) > 4:
        length = probe_data[4]
    else:
        # Probe too short to carry a length byte: assume a full event.
        length = 0x46
    log.debug("read_waveform_header: 0A data request offset=0x%02X", length)
    if length == 0:
        return b"", 0

    # Step 2 — data request using the probed length as the offset.
    self._send(build_bw_frame(SUB_WAVEFORM_HEADER, length, key_params))
    header_rsp = self._recv_one(expected_sub=ack_sub)
    log.debug(
        "read_waveform_header: key=%s length=0x%02X is_full=%s",
        key4.hex(), length, length >= 0x40,
    )
    return header_rsp.data, length
def read_waveform_data_raw(self) -> bytes:
    """
    Perform the SUB 09 (WAVEFORM_DATA) two-step read and return the raw
    waveform data block.

    Blastware reads this block from the device before its write sequence
    (3-11-26 BW TX capture, BW[80-81]).  The bytes are intended to be
    passed on verbatim as the ``waveform_data`` payload of
    ``write_waveform_data()`` / ``push_config_raw()``.

    Returns:
        Everything from data[11] to the end of the data response — the
        slice is NOT capped at the requested 0xCA (202) bytes.  Per the
        original notes the result is typically 204 bytes, 202 on some
        firmware.

    Raises:
        ProtocolError: on timeout, bad checksum, or wrong response SUB.
    """
    sub_waveform_data = 0x09
    ack_sub = _expected_rsp_sub(sub_waveform_data)  # 0xFF - 0x09 = 0xF6
    block_len = DATA_LENGTHS[sub_waveform_data]  # 0xCA = 202

    # Step 1 — probe; the ack is discarded.
    log.debug("read_waveform_data_raw: 09 probe")
    self._send(build_bw_frame(sub_waveform_data, 0))
    self._recv_one(expected_sub=ack_sub)

    # Step 2 — data request.
    log.debug("read_waveform_data_raw: 09 data request offset=0x%02X", block_len)
    self._send(build_bw_frame(sub_waveform_data, block_len))
    payload = self._recv_one(expected_sub=ack_sub).data

    waveform_block = payload[11:]
    log.debug("read_waveform_data_raw: got %d bytes", len(waveform_block))
    return waveform_block
def read_waveform_record(self, key4: bytes) -> bytes:
    """
    Perform the SUB 0C (WAVEFORM_RECORD / FULL_WAVEFORM_RECORD) two-step read.

    Per the protocol notes the 210-byte record holds:
      - a record type string ("Histogram" or "Waveform") at a variable offset
      - per-channel labels ("Tran", "Vert", "Long", "MicL") with PPV floats
        at label_offset + 6

    Args:
        key4: 4-byte waveform record address.

    Returns:
        data[11:11+0xD2] of the data response (210 bytes when the device
        sends a full record; the slice is not length-checked).

    Raises:
        ProtocolError: on timeout, bad checksum, or wrong response SUB.

    Confirmed from 3-31-26 capture: 0C always uses offset=0xD2 (210 bytes).
    """
    ack_sub = _expected_rsp_sub(SUB_WAVEFORM_RECORD)
    record_len = DATA_LENGTHS[SUB_WAVEFORM_RECORD]  # 0xD2
    key_params = waveform_key_params(key4)

    # Step 1 — probe; the ack is discarded.
    log.debug("read_waveform_record: 0C probe key=%s", key4.hex())
    self._send(build_bw_frame(SUB_WAVEFORM_RECORD, 0, key_params))
    self._recv_one(expected_sub=ack_sub)

    # Step 2 — data request for the fixed-size record.
    log.debug("read_waveform_record: 0C data request offset=0x%02X", record_len)
    self._send(build_bw_frame(SUB_WAVEFORM_RECORD, record_len, key_params))
    payload = self._recv_one(expected_sub=ack_sub).data

    record = payload[11:][:record_len]
    log.debug("read_waveform_record: received %d record bytes", len(record))
    return record
def read_bulk_waveform_stream(
    self,
    key4: bytes,
    *,
    stop_after_metadata: bool = True,  # DEPRECATED — no-op under BW-exact walk
    max_chunks: int = 256,  # safety cap only; loop is bounded by end_offset
    include_terminator: bool = False,
    extra_chunks_after_metadata: int = 1,  # DEPRECATED — no-op
) -> list[S3Frame]:
    """
    Download the SUB 5A (BULK_WAVEFORM_STREAM) A5 frames for one event using
    Blastware's exact protocol.  REWRITTEN 2026-05-02 (v0.14.0).

    Algorithm (matches BW captures across 2-sec / 3-sec / event-2):
      1. Probe
         - key4[2:4] == 0x0000 (first event after erase / wrap): probe at
           counter=0x0000 with the full key in params.
         - key4[2:4] != 0 (continuation event): probe at counter = key4[2:4].
           The key returned by 1F already includes the +0x46 offset, so
           nothing is added.  This request doubles as the first sample
           chunk and its response carries STRT.
      2. Parse end_offset from the STRT record in the probe response
         (parse_strt_end_offset; data[23:27] per the protocol notes).
      3. Event 1 only: read the two fixed metadata pages at counter=0x1002
         and counter=0x1004 (Project / Client / User Name / Seis Loc /
         Extended Notes).  A timeout on a metadata page is logged and
         skipped.  Continuation events do not read them.
      4. Walk sample chunks in 0x0200 steps, starting at 0x0600 for event 1
         or probe_counter + 0x0200 for continuation events.  Stop when
         `counter + 0x0200 > end_offset` or after max_chunks chunks.
      5. Send the TERM frame built by
         `bulk_waveform_term_v2(key4, end_offset, last_chunk_counter)`.
         When no STRT was found or no chunk counter is available, the
         legacy TERM (offset_word 0x005A) is sent instead.  Per the
         protocol notes the TERM response carries the partial last chunk
         including the 26-byte file footer.

    Args:
        key4: 4-byte waveform key; bytes 2-3 are the event's start offset.
        stop_after_metadata: deprecated, ignored.
        max_chunks: upper bound on sample chunks fetched in step 4.
        include_terminator: when True the TERM response (or a mid-stream
            page_key=0x0000 frame) is appended to the result.
        extra_chunks_after_metadata: deprecated, ignored.

    Returns:
        List of S3Frame objects in receive order: probe, metadata pages,
        sample chunks, and optionally the TERM response.  If a sample
        chunk arrives with page_key == 0x0000 the method returns
        immediately WITHOUT sending TERM.

    Raises:
        ValueError: if key4 is not exactly 4 bytes.
        ProtocolError: on probe timeout, on a chunk timeout where no raw
            bytes were received, on bad checksum or unexpected SUB.
            A timeout of the TERM response is logged, not raised.
    """
    if len(key4) != 4:
        raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")

    # Deprecated kwargs are accepted; non-default values are only noted at
    # debug level.
    if not stop_after_metadata:
        log.debug("5A: stop_after_metadata=False is no-op under BW-exact walk")
    if extra_chunks_after_metadata not in (0, 1):
        log.debug(
            "5A: extra_chunks_after_metadata=%d is no-op under BW-exact walk",
            extra_chunks_after_metadata,
        )

    rsp_sub = _expected_rsp_sub(SUB_BULK_WAVEFORM)  # 0xA5
    frames_data: list[S3Frame] = []
    start_offset = (key4[2] << 8) | key4[3]
    is_event_1 = start_offset == 0

    # ── Step 1: probe / first chunk ──────────────────────────────────────
    if is_event_1:
        probe_counter = 0
        probe_params = bulk_waveform_params(key4, 0, is_probe=True)
        log.debug("5A probe (event-1) key=%s counter=0x0000", key4.hex())
    else:
        # Continuation events: the first 5A request lands at counter =
        # key[2:4] (the address of the off=0x46 WAVEHDR record returned by
        # 1F).  The probe response carries STRT with end_offset.
        #
        # Confirmed 2026-05-04 from the 5-1-26 "copy 2nd address" capture
        # (BW probes counter=0x2238 with key=01112238, STRT@17 end=0x417E)
        # and the 5-4-26 BW captures (2-sec event probes counter=0x2238).
        #
        # An earlier "+0x46" formula treated start_key as the BOUNDARY
        # (off=0x2C) key, but the iteration walk uses 1F's off=0x46 key,
        # which already includes that offset.  Adding it again made the
        # probe overshoot, miss STRT, and run uncapped.
        probe_counter = start_offset
        probe_params = bulk_waveform_params(key4, probe_counter)
        log.debug(
            "5A probe (event-N) key=%s counter=0x%04X",
            key4.hex(), probe_counter,
        )

    self._send(build_5a_frame(_BULK_CHUNK_OFFSET, probe_params))
    self._parser.reset()
    try:
        rsp = self._recv_one(expected_sub=rsp_sub, reset_parser=False)
    except TimeoutError:
        # Fixed format: the key and the byte count used to run together
        # ("key=%s%d").
        log.warning(
            "5A probe TIMED OUT for key=%s — %d raw bytes received",
            key4.hex(), self._parser.bytes_fed,
        )
        raise
    frames_data.append(rsp)
    log.debug("5A A5[0] (probe) page_key=0x%04X %d bytes",
              rsp.page_key, len(rsp.data))

    # ── Step 2: parse STRT end_offset from probe response ────────────────
    end_offset = parse_strt_end_offset(rsp.data)
    # Track "no STRT" explicitly instead of comparing against the 0xFFFF
    # placeholder later, so a genuine end_offset of 0xFFFF is not mistaken
    # for a missing record.
    strt_found = end_offset is not None
    if not strt_found:
        log.warning(
            "5A probe response did not contain a STRT record; "
            "cannot bound chunk loop — falling back to max_chunks=%d cap",
            max_chunks,
        )
        # Placeholder bound: the loop is then limited by max_chunks or by
        # this 16-bit ceiling, whichever is hit first.
        end_offset = 0xFFFF
    else:
        log.info(
            "5A STRT start_offset=0x%04X end_offset=0x%04X size=0x%04X",
            start_offset, end_offset, end_offset - start_offset,
        )

    # ── Step 3: metadata pages 0x1002 + 0x1004 (event 1 only) ────────────
    # Confirmed from BW captures: BW reads these two fixed device-buffer
    # pages immediately after the probe for events at start_key[2:4]=0.
    # Continuation events skip them (BW caches across the session).
    # Their content is global compliance-setup metadata: Project, Client,
    # User Name, Seis Loc, Extended Notes.
    if is_event_1:
        for meta_counter in (0x1002, 0x1004):
            # Metadata page params have an extra trailing 0x00 byte
            # (12-byte params instead of 11) — empirical from BW captures.
            # Checksum-neutral but matches BW byte-for-byte.
            meta_params = bytes([
                0x00,
                key4[0], key4[1],
                (meta_counter >> 8) & 0xFF,
                meta_counter & 0xFF,
                0, 0, 0, 0, 0, 0, 0,
            ])
            log.debug("5A metadata page counter=0x%04X", meta_counter)
            self._send(build_5a_frame(_BULK_CHUNK_OFFSET, meta_params))
            self._parser.reset()
            try:
                meta_rsp = self._recv_one(
                    expected_sub=rsp_sub, reset_parser=False, timeout=10.0,
                )
            except TimeoutError:
                # Best effort: a missing metadata page does not abort the
                # download.
                log.warning(
                    "5A metadata page 0x%04X TIMED OUT — continuing",
                    meta_counter,
                )
                continue
            frames_data.append(meta_rsp)
            log.debug(
                "5A meta@0x%04X page_key=0x%04X %d bytes",
                meta_counter, meta_rsp.page_key, len(meta_rsp.data),
            )

    # ── Step 4: sample chunk loop, bounded by end_offset ─────────────────
    # Sample chunks start at:
    #   event 1:      counter = 0x0600
    #   event N (>0): counter = probe_counter + 0x0200
    #                 (the probe was the first sample chunk)
    if is_event_1:
        counter = 0x0600
    else:
        counter = probe_counter + _BULK_COUNTER_STEP
    # For continuation events the probe already counts as a fetched chunk.
    last_chunk_counter: Optional[int] = (
        probe_counter if not is_event_1 else None
    )
    chunks_fetched = 0
    while chunks_fetched < max_chunks:
        # Stop when the next chunk would straddle the event end.
        if counter + _BULK_COUNTER_STEP > end_offset:
            log.debug(
                "5A chunk loop done at counter=0x%04X (end=0x%04X); "
                "%d chunks fetched",
                counter, end_offset, chunks_fetched,
            )
            break
        params = bulk_waveform_params(key4, counter)
        log.debug("5A chunk #%d counter=0x%04X", chunks_fetched + 1, counter)
        self._send(build_5a_frame(_BULK_CHUNK_OFFSET, params))
        self._parser.reset()
        try:
            rsp = self._recv_one(
                expected_sub=rsp_sub, reset_parser=False, timeout=10.0,
            )
        except TimeoutError:
            raw = self._parser.bytes_fed
            log.warning(
                "5A TIMEOUT chunk=%d counter=0x%04X raw_bytes=%d",
                chunks_fetched + 1, counter, raw,
            )
            if raw > 0 and frames_data:
                # Some bytes arrived but no complete frame: treat it as the
                # end of the stream and still send TERM.
                log.warning(
                    "5A unexpected end-of-stream — proceeding to TERM",
                )
                break
            raise
        log.debug(
            "5A RX chunk=%d page_key=0x%04X data_len=%d",
            chunks_fetched + 1, rsp.page_key, len(rsp.data),
        )
        if rsp.page_key == 0x0000:
            # Device terminated mid-stream unexpectedly; return what we
            # have without sending TERM.
            log.warning(
                "5A unexpected page_key=0x0000 mid-stream at counter=0x%04X",
                counter,
            )
            if include_terminator:
                frames_data.append(rsp)
            return frames_data
        frames_data.append(rsp)
        last_chunk_counter = counter
        counter += _BULK_COUNTER_STEP
        chunks_fetched += 1
    else:
        # Runs only when the loop condition failed (cap reached), not on break.
        log.warning(
            "5A reached max_chunks=%d at counter=0x%04X (end=0x%04X)",
            max_chunks, counter, end_offset,
        )

    # ── Step 5: TERM with proper end_offset-derived formula ──────────────
    if last_chunk_counter is None or not strt_found:
        # No STRT or no chunks fetched — fall back to legacy TERM.
        log.warning(
            "5A using legacy TERM (offset_word=0x005A); "
            "end_offset unavailable or no chunks fetched",
        )
        legacy_counter = (last_chunk_counter or probe_counter) + _BULK_COUNTER_STEP
        term_offset_word = _BULK_TERM_OFFSET  # 0x005A
        term_params = bulk_waveform_term_params(key4, legacy_counter)
        log.debug(
            "5A legacy TERM offset_word=0x%04X counter=0x%04X",
            term_offset_word, legacy_counter,
        )
    else:
        term_offset_word, term_params = bulk_waveform_term_v2(
            key4, end_offset, last_chunk_counter,
        )
        # Logged only on this branch: last_chunk_counter is an int here,
        # so the %04X conversion cannot fail.
        log.debug(
            "5A TERM offset_word=0x%04X params[2:4]=%s end=0x%04X "
            "last_chunk=0x%04X",
            term_offset_word, term_params[2:4].hex(),
            end_offset, last_chunk_counter,
        )

    self._send(build_5a_frame(term_offset_word, term_params))
    try:
        term_rsp = self._recv_one(expected_sub=rsp_sub, timeout=10.0)
    except TimeoutError:
        # A missing TERM response is tolerated; the chunks are still returned.
        log.warning("5A no TERM response (timeout)")
    else:
        log.info(
            "5A TERM response page_key=0x%04X %d bytes",
            term_rsp.page_key, len(term_rsp.data),
        )
        if include_terminator:
            frames_data.append(term_rsp)
    return frames_data
def advance_event(self, browse: bool = False) -> tuple[bytes, bytes]:
    """
    Perform the SUB 1F (EVENT_ADVANCE) two-step read and return the next
    waveform key plus the full 8-byte event data block.

    Modes:
      browse=False (default, download mode): params carry token 0xFE.
        Used by get_events(); per the protocol notes the token makes the
        device skip partial histogram bins and return the key of the next
        FULL record.
      browse=True: params carry token 0 (all-zero params).  Matches
        Blastware's browse sequence 0A → 1F(zeros) → 0A → 1F(zeros); used
        by count_events(), where no 0C/5A download takes place.

    IMPORTANT: a preceding 0A (read_waveform_header) call is REQUIRED in
    both modes to establish the device's waveform context.  Without it 1F
    returns the null sentinel no matter how many events are stored.

    Returns:
        (key4, event_data8) where:
          key4        — data[11:15], the 4-byte opaque waveform record address
          event_data8 — data[11:19], the full 8-byte block

        End of events is signalled by event_data8[4:8] being four zero
        bytes.  Do NOT test key4 for all-zeros instead: key4 is all-zeros
        for event 0 (the very first stored event), which would end the
        loop prematurely.

    Raises:
        ProtocolError: on timeout, bad checksum, or wrong response SUB.
    """
    ack_sub = _expected_rsp_sub(SUB_EVENT_ADVANCE)
    block_len = DATA_LENGTHS[SUB_EVENT_ADVANCE]  # 0x08
    if browse:
        request_params = token_params(0)
        mode = "browse"
    else:
        request_params = token_params(0xFE)
        mode = "download"

    # Step 1 — probe; the ack is discarded.
    log.debug("advance_event: 1F probe mode=%s params=%s", mode, request_params.hex())
    self._send(build_bw_frame(SUB_EVENT_ADVANCE, 0, request_params))
    self._recv_one(expected_sub=ack_sub)

    # Step 2 — data request with the same params.
    log.debug("advance_event: 1F data request offset=0x%02X", block_len)
    self._send(build_bw_frame(SUB_EVENT_ADVANCE, block_len, request_params))
    payload = self._recv_one(expected_sub=ack_sub).data

    event_data8 = payload[11:19]
    key4 = event_data8[:4]
    # Computed for the log line only; callers apply the sentinel test themselves.
    is_done = event_data8[4:8] == bytes(4)
    log.debug(
        "advance_event: next key=%s data8=%s done=%s",
        key4.hex(), event_data8.hex(), is_done,
    )
    return key4, event_data8
def read_compliance_config(self) -> bytes:
    """
    Read the compliance config with the SUB 1A (COMPLIANCE_CONFIG) sequence
    and return the concatenated config bytes from all E5 responses.

    Sequence (confirmed from raw BW captures 3-11-26):

        Probe   (A): offset=0x0000  params[2]=0x00  params[7]=0x64
        Frame B    : offset=0x0400  params[2]=0x00  → cfg bytes 0..1023
        Frame C    : offset=0x0400  params[2]=0x04  → cfg bytes 1024..2047
        Frame D    : offset=0x002A  params[2]=0x08  → cfg bytes 2048..2089

    Total requested: 0x0400 + 0x0400 + 0x002A = 0x082A = 2090 bytes.

    The "offset" field in B/C/D encodes the chunk length, not a byte offset
    into the config. params[2] tracks cumulative 256-byte pages
    (0x00 → 0x04 → 0x08). params[7]=0x64 is required in every frame.

    Exactly one response is read per request; its data[11:] (everything after
    the 11-byte echo header) is appended to the result. A step that times out
    is logged and skipped, so devices that answer only some of the requests
    (e.g. BE18189, which may return the whole block from a single request)
    are handled by concatenating whatever arrives. After step D, one extra
    2-second read picks up any additional E5 frame the device sent.

    Returns:
        Accumulated compliance config bytes. May be shorter than 2090 bytes
        (even empty) if data steps timed out.

    Raises:
        ProtocolError: if the probe response is not received, or if a data
            step returns a frame with the wrong SUB.
    """
    rsp_sub = _expected_rsp_sub(SUB_COMPLIANCE)

    def _page_params(page: int) -> bytes:
        # 10-byte params block: page counter at [2], required 0x64 at [7].
        return bytes([0x00, 0x00, page, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00])

    log.debug("read_compliance_config: 1A probe")
    self._send(build_bw_frame(SUB_COMPLIANCE, 0, _page_params(0x00)))
    self._recv_one(expected_sub=rsp_sub)

    # (label, offset field = chunk length, params) for each data request.
    steps = [
        ("B", 0x0400, _page_params(0x00)),
        ("C", 0x0400, _page_params(0x04)),
        ("D", 0x002A, _page_params(0x08)),
    ]

    config = bytearray()
    for step_name, step_offset, step_params in steps:
        log.debug(
            "read_compliance_config: sending frame %s offset=0x%04X params=%s",
            step_name, step_offset, step_params.hex(),
        )
        self._send(build_bw_frame(SUB_COMPLIANCE, step_offset, step_params))
        try:
            data_rsp = self._recv_one(expected_sub=rsp_sub)
        except TimeoutError:
            log.warning(
                "read_compliance_config: frame %s — no E5 response (timeout)",
                step_name,
            )
            continue
        chunk = data_rsp.data[11:]  # skip the 11-byte echo header
        log.debug(
            "read_compliance_config: frame %s page=0x%04X data=%d cfg_chunk=%d running_total=%d",
            step_name, data_rsp.page_key, len(data_rsp.data),
            len(chunk), len(config) + len(chunk),
        )
        config.extend(chunk)

    # Safety drain: catch any extra frame the device may buffer on slow links.
    # reset_parser=False is essential here — the default would reset the parser
    # and clear _pending_frames, discarding exactly the tail frame (or partial
    # bytes) that arrived together with the step-D response.
    try:
        tail_rsp = self._recv_one(
            expected_sub=rsp_sub, timeout=2.0, reset_parser=False,
        )
    except TimeoutError:
        pass  # normal case: nothing further was sent
    except UnexpectedResponse as exc:
        # The drain is best-effort; an unrelated frame must not void the
        # config that was already read.
        log.warning("read_compliance_config: ignoring stray tail frame (%s)", exc)
    else:
        tail_chunk = tail_rsp.data[11:]
        log.warning(
            "read_compliance_config: unexpected tail frame page=0x%04X "
            "cfg_chunk=%d running_total=%d",
            tail_rsp.page_key, len(tail_chunk), len(config) + len(tail_chunk),
        )
        config.extend(tail_chunk)

    log.info(
        "read_compliance_config: done — %d cfg bytes total",
        len(config),
    )

    # Hex dump first 128 bytes — useful only for field-mapping work, not normal operation.
    if log.isEnabledFor(logging.DEBUG):
        for row in range(0, min(len(config), 128), 16):
            row_bytes = bytes(config[row:row + 16])
            hex_part = ' '.join(f'{b:02x}' for b in row_bytes)
            asc_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in row_bytes)
            log.debug(" cfg[%04x]: %-48s %s", row, hex_part, asc_part)

    return bytes(config)
# ── Write commands (SUBs 68–83) ───────────────────────────────────────────
def recv_write_ack(
    self,
    expected_sub: int,
    timeout: Optional[float] = None,
) -> S3Frame:
    """
    Block until the device acknowledges a write and return that ack frame.

    Per the 3-11-26 BW capture, write acks are short frames whose SUB is
    0xFF - request_SUB and whose page_key and data section carry zeros.
    This method only enforces the SUB (via _recv_one); the received SUB,
    page key and data length are logged at DEBUG for inspection.

    Args:
        expected_sub: Expected response SUB byte (0xFF - write_request_SUB).
        timeout:      Seconds to wait; None defers to _recv_one's default.

    Returns:
        The ack S3Frame.

    Raises:
        TimeoutError:       if no frame arrives in time.
        UnexpectedResponse: if the response SUB doesn't match.
    """
    log.debug("recv_write_ack: waiting for SUB=0x%02X", expected_sub)
    reply = self._recv_one(expected_sub=expected_sub, timeout=timeout)
    summary = (reply.sub, reply.page_key, len(reply.data))
    log.debug(
        "recv_write_ack: received SUB=0x%02X page=0x%04X data=%d bytes",
        *summary,
    )
    return reply
def write_confirm(self, sub: int) -> S3Frame:
    """
    Transmit a data-less confirm frame for ``sub`` and return the device's ack.

    The frame is built with build_bw_write_frame(sub, b"") — i.e. the
    builder's default offset and params with an empty payload. Per the
    protocol notes this is the header-only confirm used by SUBs 72, 73, 74
    and 83. The device answers with the complementary response SUB.

    Args:
        sub: Confirm SUB byte (SUB_WRITE_CONFIRM_A/B/C or SUB_TRIGGER_CONFIRM).

    Returns:
        The ack S3Frame.

    Raises:
        ProtocolError: on timeout or wrong response SUB.
    """
    ack_sub = _expected_rsp_sub(sub)
    confirm_frame = build_bw_write_frame(sub, b"")
    log.debug("write_confirm: SUB=0x%02X frame=%s", sub, confirm_frame.hex())
    self._send(confirm_frame)
    ack = self.recv_write_ack(expected_sub=ack_sub)
    return ack
def write_event_index(self, data: bytes) -> S3Frame:
    """
    Write the event index (SUB 68, EVENT_INDEX_WRITE) and return its ack (SUB 97).

    The frame's offset field is derived from the payload itself:
    offset = data[1] + 2. The payload starts with a 2-byte header
    [0x00][length], where data[1] is the length of the meaningful payload.
    Confirmed from 3-11-26 BW TX capture frame 102:
        data[0:4] = 00 58 09 00  (data[1]=0x58=88 → offset=0x5A=90)
        data length = 91, offset = 90

    Position in the write sequence:
        68 (data)    → device acks with SUB 0x97   (this method)
        73 (confirm) → device acks with SUB 0x8C   (caller's job)
    Callers should call write_confirm(SUB_WRITE_CONFIRM_B) after this.

    Args:
        data: Raw event-index payload bytes to write to the device.
              Must be at least 2 bytes; data[1] must hold the length field.

    Returns:
        The SUB 0x97 ack frame.

    Raises:
        ProtocolError: on timeout or wrong response SUB.
        ValueError: if data is shorter than 2 bytes.
    """
    size = len(data)
    if size < 2:
        raise ValueError(f"event index write data must be at least 2 bytes, got {size}")

    ack_sub = _expected_rsp_sub(SUB_EVENT_INDEX_WRITE)  # 0xFF - 0x68 = 0x97
    length_byte = data[1]
    frame_offset = length_byte + 2
    out_frame = build_bw_write_frame(SUB_EVENT_INDEX_WRITE, data, offset=frame_offset)
    log.debug(
        "write_event_index: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X",
        size, length_byte, frame_offset, ack_sub,
    )
    self._send(out_frame)
    return self.recv_write_ack(expected_sub=ack_sub)
def write_waveform_data(self, data: bytes) -> S3Frame:
    """
    Write waveform data (SUB 69, WAVEFORM_DATA_WRITE) and return its ack (SUB 96).

    The frame's offset field is data[1] + 2 — the same rule as
    write_event_index(). Confirmed from 3-11-26 BW TX capture frame 110:
        data[0:4] = 00 c8 08 00  (data[1]=0xC8=200 → offset=0xCA=202)
        data length = 204, offset = 202

    Position in the write sequence:
        69 (data)    → device acks with SUB 0x96   (this method)
        74 (confirm) → device acks with SUB 0x8B   (caller's job)
        72 (confirm) → device acks with SUB 0x8D   (caller's job)
    Callers should call write_confirm(SUB_WRITE_CONFIRM_C) then
    write_confirm(SUB_WRITE_CONFIRM_A) after this.

    Args:
        data: Raw waveform-data payload bytes to write.
              Must be at least 2 bytes; data[1] must hold the length field.

    Returns:
        The SUB 0x96 ack frame.

    Raises:
        ProtocolError: on timeout or wrong response SUB.
        ValueError: if data is shorter than 2 bytes.
    """
    size = len(data)
    if size < 2:
        raise ValueError(f"waveform data write payload must be at least 2 bytes, got {size}")

    ack_sub = _expected_rsp_sub(SUB_WAVEFORM_DATA_WRITE)  # 0xFF - 0x69 = 0x96
    length_byte = data[1]
    frame_offset = length_byte + 2
    out_frame = build_bw_write_frame(SUB_WAVEFORM_DATA_WRITE, data, offset=frame_offset)
    log.debug(
        "write_waveform_data: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X",
        size, length_byte, frame_offset, ack_sub,
    )
    self._send(out_frame)
    return self.recv_write_ack(expected_sub=ack_sub)
def write_compliance_config_raw(self, data: bytes) -> None:
    """
    Write the compliance config as three SUB 71 (COMPLIANCE_WRITE) chunks
    followed by a SUB 72 confirm.

    The payload (~2128 bytes) is cut at fixed boundaries; sizes, offset
    fields and params come from the 3-11-26 BW TX capture (frames 104–108):

        Chunk 1 — data[0:1027]      offset=0x1004
                  params = 00 00 00 00 00 00 00 00 00 00
        Chunk 2 — data[1027:2082]   offset=0x1004
                  params = 00 00 00 10 04 00 00 00 00 00
        Chunk 3 — data[2082:]       offset=0x002C
                  params = 00 00 08 00 00 00 00 00 00 00
        Confirm — SUB 72, no data   device acks SUB 0x8D

    Each chunk is sent and its SUB 0x8E ack awaited before the next one goes
    out. Chunk 3 carries whatever remains after byte 2082 (typically ~46
    bytes for a 2128-byte config); it is still sent when that remainder is
    empty.

    NOTE(review): earlier documentation of this method listed the chunk 2
    params with 0x10 0x04 at params[4:6]; the code places them at
    params[3:5]. The code is unchanged here — confirm against the capture.

    Args:
        data: Raw compliance config bytes to write. Must be at least 2082 bytes.

    Raises:
        ValueError: if data is too short to fill chunks 1 and 2.
        ProtocolError: on timeout or wrong response SUB from any chunk.
    """
    first_len = 1027
    second_len = 1055
    required = first_len + second_len

    if len(data) < required:
        raise ValueError(
            f"Compliance write data too short: {len(data)} bytes, "
            f"need at least {required} (chunk1={first_len} + chunk2={second_len})"
        )

    ack_sub = _expected_rsp_sub(SUB_COMPLIANCE_WRITE)  # 0xFF - 0x71 = 0x8E

    # (payload slice, offset field, 10-byte params) for chunks 1..3, in send order.
    layout = (
        (slice(0, first_len), 0x1004, bytes(10)),
        (slice(first_len, required), 0x1004, bytes.fromhex("00000010040000000000")),
        (slice(required, None), 0x002C, bytes.fromhex("00000800000000000000")),
    )

    for index, (span, offset_field, params_block) in enumerate(layout, start=1):
        piece = data[span]
        out_frame = build_bw_write_frame(
            SUB_COMPLIANCE_WRITE,
            piece,
            offset=offset_field,
            params=params_block,
        )
        log.debug(
            "write_compliance_config_raw: chunk %d %d bytes "
            "offset=0x%04X params=%s",
            index, len(piece), offset_field, params_block.hex(),
        )
        self._send(out_frame)
        self.recv_write_ack(expected_sub=ack_sub)
        log.debug("write_compliance_config_raw: chunk %d acked", index)

    # Final confirm (SUB 72)
    log.debug("write_compliance_config_raw: sending confirm (SUB 0x72)")
    self.write_confirm(SUB_WRITE_CONFIRM_A)
    log.debug("write_compliance_config_raw: done")
def write_trigger_config(self, data: bytes) -> S3Frame:
    """
    Write the trigger config (SUB 82, TRIGGER_CONFIG_WRITE) and return its
    ack (SUB 7D).

    The frame's offset field is data[1] + 2 — the same rule as
    write_event_index(). Confirmed from 3-11-26 BW TX capture frame 108:
        data[0:4] = 00 1a d5 00  (data[1]=0x1A=26 → offset=0x1C=28)
        data length = 29, offset = 28

    Position in the write sequence:
        82 (data)    → device acks with SUB 0x7D   (this method)
        83 (confirm) → device acks with SUB 0x7C   (caller's job)
    Callers should call write_confirm(SUB_TRIGGER_CONFIRM) after this.

    Args:
        data: Raw trigger-config payload bytes to write.
              Must be at least 2 bytes; data[1] must hold the length field.

    Returns:
        The SUB 0x7D ack frame.

    Raises:
        ProtocolError: on timeout or wrong response SUB.
        ValueError: if data is shorter than 2 bytes.
    """
    size = len(data)
    if size < 2:
        raise ValueError(f"trigger config write payload must be at least 2 bytes, got {size}")

    ack_sub = _expected_rsp_sub(SUB_TRIGGER_CONFIG_WRITE)  # 0xFF - 0x82 = 0x7D
    length_byte = data[1]
    frame_offset = length_byte + 2
    out_frame = build_bw_write_frame(SUB_TRIGGER_CONFIG_WRITE, data, offset=frame_offset)
    log.debug(
        "write_trigger_config: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X",
        size, length_byte, frame_offset, ack_sub,
    )
    self._send(out_frame)
    return self.recv_write_ack(expected_sub=ack_sub)
# ── Call home config (SUBs 0x2C / 0x7E / 0x7F) ──────────────────────────
def read_call_home_config(self) -> bytes:
    """
    Fetch the auto call home configuration (SUB 0x2C → response 0xD3).

    Standard two-step read: a probe at offset 0, then a data request whose
    offset field is DATA_LENGTHS[SUB_CALL_HOME] (documented as 0x7C = 124).
    The result is everything after the 11-byte echo header of the data
    response.

    Layout notes from the 4-20-26 call home settings capture:
      - Probe response: data[4]=0x7C (confirms data length = 124)
      - Data response:  136 bytes total (11-byte echo header + 125 bytes payload)
      - Payload[0:3]  = 0x00 0x7C 0xDC  (header: zero, inner-length, constant)
      - Payload[5]    = auto_call_home_enabled
      - Payload[6:46] = dial_string (40-byte null-padded ASCII "RADIO RING")

    Returns:
        Raw call home config payload (data[11:]), 125 bytes per the capture.
        Suitable for round-trip write (append \\x00\\x00 → 127-byte write payload).

    Raises:
        ProtocolError: on timeout or wrong response SUB.
    """
    reply_sub = _expected_rsp_sub(SUB_CALL_HOME)  # 0xFF - 0x2C = 0xD3
    data_len = DATA_LENGTHS[SUB_CALL_HOME]

    # Step 1 — probe; the reply is only checked for the right SUB.
    log.debug("read_call_home_config: 0x2C probe")
    self._send(build_bw_frame(SUB_CALL_HOME, 0))
    self._recv_one(expected_sub=reply_sub)

    # Step 2 — data request; the offset field carries the data length.
    log.debug("read_call_home_config: 0x2C data request offset=0x%02X", data_len)
    self._send(build_bw_frame(SUB_CALL_HOME, data_len))
    reply = self._recv_one(expected_sub=reply_sub)

    body = reply.data[11:]
    log.debug("read_call_home_config: received %d payload bytes", len(body))
    return body
def write_call_home_config(self, data: bytes) -> None:
    """
    Write the auto call home configuration (SUB 0x7E, then SUB 0x7F confirm).

    Write sequence (confirmed from 4-20-26 call home settings captures):
        SUB 0x7E  write payload       → device acks SUB 0x81
        SUB 0x7F  confirm (no data)   → device acks SUB 0x80

    The expected payload is 127 bytes: the 125-byte read payload from
    read_call_home_config() plus b'\\x00\\x00'. The frame's offset field is
    data[1] + 2 (0x7C + 2 = 0x7E = 126 for the standard payload), the same
    rule as the other single-chunk writes.

    Write frame format: build_bw_write_frame (minimal DLE stuffing — only
    BW_CMD is doubled; all other bytes are RAW). A \\x10\\x03 sequence within
    the payload is preserved as-is (device interprets DLE+ETX as the literal
    value 0x03 per the inner-frame terminator convention).

    The confirm step goes through write_confirm(), the same helper used for
    the other confirm SUBs, so all confirms are built and acked the same way.

    Args:
        data: Write payload (read payload + \\x00\\x00 footer), normally 127
              bytes starting with [0x00][0x7C]. Only the minimum length of
              2 bytes is enforced here; the overall length and header
              contents are the caller's responsibility.

    Raises:
        ValueError: if data is shorter than 2 bytes.
        ProtocolError: on timeout or wrong response SUB.
    """
    if len(data) < 2:
        raise ValueError(f"call home write payload must be at least 2 bytes, got {len(data)}")

    rsp_sub_write = _expected_rsp_sub(SUB_CALL_HOME_WRITE)  # 0xFF - 0x7E = 0x81

    # Offset formula: data[1] + 2 (same pattern as other single-chunk writes)
    offset = data[1] + 2
    frame = build_bw_write_frame(SUB_CALL_HOME_WRITE, data, offset=offset)
    log.debug(
        "write_call_home_config: %d bytes data[1]=0x%02X offset=0x%04X",
        len(data), data[1], offset,
    )
    self._send(frame)
    self.recv_write_ack(expected_sub=rsp_sub_write)

    log.debug("write_call_home_config: write acked; sending confirm 0x7F")
    # write_confirm sends the empty-payload frame and waits for
    # 0xFF - 0x7F = 0x80, exactly what was previously hand-built here.
    self.write_confirm(SUB_CALL_HOME_CONFIRM)
    log.debug("write_call_home_config: confirm acked — done")
# ── Monitoring ────────────────────────────────────────────────────────────
def read_monitor_status(self) -> S3Frame:
    """
    Query the monitoring status (SUB 0x1C → response 0xE3).

    Two-step read: a probe at offset 0x00, then a data request whose offset
    field is DATA_LENGTHS[SUB_MONITOR_STATUS] (documented as 0x2C).

    Returns:
        The data-step S3Frame. Per the protocol notes it holds 44 bytes of
        status data when the unit is idle and a shorter payload (12 bytes)
        while it is actively monitoring; callers should check the frame's
        data length to determine the mode.

    Raises:
        ProtocolError: on timeout or wrong response SUB.
    """
    reply_sub = _expected_rsp_sub(SUB_MONITOR_STATUS)  # 0xFF - 0x1C = 0xE3

    # Step 1 — probe; the reply is only checked for the right SUB.
    log.debug("read_monitor_status: probe step rsp_sub=0x%02X", reply_sub)
    self._send(build_bw_frame(SUB_MONITOR_STATUS, offset=0x00))
    self._recv_one(expected_sub=reply_sub)

    # Step 2 — data request; the offset field carries the data length.
    data_len = DATA_LENGTHS[SUB_MONITOR_STATUS]
    log.debug("read_monitor_status: data step offset=0x%02X", data_len)
    self._send(build_bw_frame(SUB_MONITOR_STATUS, offset=data_len))
    status = self._recv_one(expected_sub=reply_sub)
    return status
def start_monitoring(self) -> S3Frame:
    """
    Tell the device to start monitoring (SUB 0x96 → response 0x69).

    A single write frame with an empty payload is sent and its ack awaited.
    Confirmed from 4-8-26/2ndtry BW TX capture frame 92.

    Returns:
        S3Frame ack from device.

    Raises:
        ProtocolError: on timeout or wrong response SUB.
    """
    ack_sub = _expected_rsp_sub(SUB_START_MONITORING)  # 0xFF - 0x96 = 0x69
    log.debug("start_monitoring: rsp_sub=0x%02X", ack_sub)
    command = build_bw_write_frame(SUB_START_MONITORING, b"")
    self._send(command)
    ack = self.recv_write_ack(expected_sub=ack_sub)
    return ack
def stop_monitoring(self) -> S3Frame:
    """
    Tell the device to stop monitoring (SUB 0x97 → response 0x68).

    A single write frame with an empty payload is sent and its ack awaited.
    Confirmed from 4-8-26/2ndtry BW TX capture frame 305.

    Returns:
        S3Frame ack from device.

    Raises:
        ProtocolError: on timeout or wrong response SUB.
    """
    ack_sub = _expected_rsp_sub(SUB_STOP_MONITORING)  # 0xFF - 0x97 = 0x68
    log.debug("stop_monitoring: rsp_sub=0x%02X", ack_sub)
    command = build_bw_write_frame(SUB_STOP_MONITORING, b"")
    self._send(command)
    ack = self.recv_write_ack(expected_sub=ack_sub)
    return ack
def read_event_storage_range(self) -> S3Frame:
    """
    Fetch the event storage range (SUB 0x06 → response 0xF9).

    Two-step read using SUB_CHANNEL_CONFIG: a probe at offset 0x00, then a
    data request whose offset field is DATA_LENGTHS[SUB_CHANNEL_CONFIG]
    (documented as 0x24 = 36 bytes). Both frames carry token_params(0xFE) —
    the same token as the erase sequence.

    Per the 4-11-26 MITM capture, the 36-byte response ends with two 4-byte
    event keys (first and last stored event). After a successful erase both
    keys are 0x01110000 (device-empty sentinel).

    Returns:
        S3Frame with 36 bytes of storage range data.

    Raises:
        ProtocolError: on timeout or wrong response SUB.
    """
    reply_sub = _expected_rsp_sub(SUB_CHANNEL_CONFIG)  # 0xFF - 0x06 = 0xF9
    token = token_params(0xFE)

    # Step 1 — probe; the reply is only checked for the right SUB.
    log.debug("read_event_storage_range: probe step rsp_sub=0x%02X", reply_sub)
    probe = build_bw_frame(SUB_CHANNEL_CONFIG, offset=0x00, params=token)
    self._send(probe)
    self._recv_one(expected_sub=reply_sub)

    # Step 2 — data request; the offset field carries the data length.
    log.debug(
        "read_event_storage_range: data step offset=0x%02X",
        DATA_LENGTHS[SUB_CHANNEL_CONFIG],
    )
    request = build_bw_frame(
        SUB_CHANNEL_CONFIG,
        offset=DATA_LENGTHS[SUB_CHANNEL_CONFIG],
        params=token,
    )
    self._send(request)
    return self._recv_one(expected_sub=reply_sub)
def begin_erase_all(self) -> S3Frame:
    """
    Issue the Begin-Erase-All command (SUB 0xA3 → response 0x5C).

    One frame carrying token_params(0xFE) is sent and a single response
    read. Per the 4-11-26 MITM capture the device replies with a minimal ack
    and begins the erase process; complete the sequence with
    read_monitor_status() + read_event_storage_range() + confirm_erase_all().

    Returns:
        S3Frame ack from device (SUB 0x5C).

    Raises:
        ProtocolError: on timeout or wrong response SUB.
    """
    ack_sub = _expected_rsp_sub(SUB_ERASE_ALL_BEGIN)  # 0xFF - 0xA3 = 0x5C
    log.debug("begin_erase_all: rsp_sub=0x%02X", ack_sub)
    command = build_bw_frame(SUB_ERASE_ALL_BEGIN, params=token_params(0xFE))
    self._send(command)
    ack = self._recv_one(expected_sub=ack_sub)
    return ack
def confirm_erase_all(self) -> S3Frame:
    """
    Issue the Confirm-Erase-All command (SUB 0xA2 → response 0x5D).

    One frame carrying token_params(0xFE) is sent and a single response
    read. Must be preceded by begin_erase_all() + read_monitor_status() +
    read_event_storage_range(). Per the 4-11-26 MITM capture the device
    memory is cleared after this call.

    Returns:
        S3Frame ack from device (SUB 0x5D).

    Raises:
        ProtocolError: on timeout or wrong response SUB.
    """
    ack_sub = _expected_rsp_sub(SUB_ERASE_ALL_CONFIRM)  # 0xFF - 0xA2 = 0x5D
    log.debug("confirm_erase_all: rsp_sub=0x%02X", ack_sub)
    command = build_bw_frame(SUB_ERASE_ALL_CONFIRM, params=token_params(0xFE))
    self._send(command)
    ack = self._recv_one(expected_sub=ack_sub)
    return ack
# ── Internal helpers ──────────────────────────────────────────────────────
def _send(self, frame: bytes) -> None:
    """Log a pre-built frame (length and hex dump, DEBUG) and write it to the transport."""
    byte_count, hex_dump = len(frame), frame.hex()
    log.debug("TX %d bytes: %s", byte_count, hex_dump)
    self._transport.write(frame)
def _recv_one(
    self,
    expected_sub: Optional[int] = None,
    timeout: Optional[float] = None,
    reset_parser: bool = True,
) -> S3Frame:
    """
    Read bytes from the transport until one complete S3 frame is parsed.

    Bytes are read in chunks of up to 256 and fed through the streaming
    S3FrameParser. Reading continues until a frame arrives or the deadline
    expires. At least one read is always attempted, so ``timeout=0`` acts as
    a single non-waiting poll.

    Args:
        expected_sub: If provided, raises UnexpectedResponse if the
                      received frame's SUB doesn't match.
        timeout:      Seconds to wait. None (the default) means
                      self._recv_timeout; an explicit 0 is honoured and is
                      NOT replaced by the default.
        reset_parser: If True (default), reset the parser and drop any
                      frames stashed by a previous call before reading.
                      Pass False when accumulating multiple frames from a
                      single device response (e.g. chunked E5 replies) so
                      that bytes and frames already buffered are not lost.

    Returns:
        The first complete S3Frame received (or the oldest stashed frame
        when reset_parser is False and one is pending).

    Raises:
        TimeoutError:       if no frame arrives within the timeout.
        UnexpectedResponse: if expected_sub is set and doesn't match.
    """
    # "timeout or default" would silently turn an explicit 0 into the full
    # default wait, so test for None specifically.
    wait_s = self._recv_timeout if timeout is None else timeout
    deadline = time.monotonic() + wait_s

    if reset_parser:
        self._parser.reset()
        self._pending_frames.clear()

    # If a prior read parsed more frames than it returned (e.g. two frames
    # arrived in one TCP chunk), return the buffered one immediately.
    if self._pending_frames:
        frame = self._pending_frames.pop(0)
        self._validate_frame(frame, expected_sub)
        return frame

    while True:
        chunk = self._transport.read(256)
        if chunk:
            log.debug("RX %d bytes: %s", len(chunk), chunk.hex())
            frames = self._parser.feed(chunk)
            if frames:
                # Stash any extras so subsequent calls with reset_parser=False see them
                self._pending_frames.extend(frames[1:])
                frame = frames[0]
                self._validate_frame(frame, expected_sub)
                return frame
        else:
            # Nothing available yet — back off briefly instead of spinning.
            time.sleep(0.005)
        if time.monotonic() >= deadline:
            break

    raise TimeoutError(
        f"No S3 frame received within {wait_s:.1f}s"
        + (f" (expected SUB 0x{expected_sub:02X})" if expected_sub is not None else "")
    )
@staticmethod
def _validate_frame(frame: S3Frame, expected_sub: Optional[int]) -> None:
    """Enforce the expected SUB; a bad checksum is logged at DEBUG, never raised.

    S3 response checksums frequently fail SUM8 validation because inner-frame
    delimiter bytes get captured as the checksum byte. The original
    s3_parser.py deliberately never validates S3 checksums for exactly this
    reason, so a mismatch here is informational only.

    Args:
        frame:        The parsed frame to check.
        expected_sub: Required SUB value, or None to skip the SUB check.

    Raises:
        UnexpectedResponse: if expected_sub is given and differs from frame.sub.
    """
    if not frame.checksum_valid:
        # Informational only — see the docstring for why this is not an error.
        log.debug("S3 frame SUB=0x%02X: checksum mismatch (ignoring)", frame.sub)

    if expected_sub is None or frame.sub == expected_sub:
        return

    raise UnexpectedResponse(
        f"Expected SUB=0x{expected_sub:02X}, got 0x{frame.sub:02X}"
    )
def _drain_boot_string(self, drain_ms: int = 200) -> None:
    """
    Swallow any boot-string bytes ("Operating System") the device may emit
    before it enters binary protocol mode.

    For ``drain_ms`` milliseconds the transport is read in chunks of up to
    256 bytes and everything received is thrown away. The S3FrameParser's
    IDLE state already copes with non-frame bytes, but draining them
    explicitly before the first real frame is cleaner. The number of
    discarded bytes is logged at DEBUG when non-zero.

    Args:
        drain_ms: How long to keep draining, in milliseconds.
    """
    stop_at = time.monotonic() + (drain_ms / 1000)
    total = 0
    while time.monotonic() < stop_at:
        junk = self._transport.read(256)
        if not junk:
            # Nothing pending — pause briefly instead of busy-looping.
            time.sleep(0.005)
            continue
        total += len(junk)
    if total:
        log.debug("drain_boot_string: discarded %d bytes", total)