""" protocol.py — High-level MiniMate Plus request/response protocol. Implements the request/response patterns documented in docs/instantel_protocol_reference.md on top of: - minimateplus.framing — DLE codec, frame builder, S3 streaming parser - minimateplus.transport — byte I/O (SerialTransport / future TcpTransport) This module knows nothing about pyserial or TCP — it only calls transport.write() and transport.read_until_idle(). Key patterns implemented: - POLL startup handshake (two-step, special payload[5] format) - Generic two-step paged read (probe → get length → fetch data) - Response timeout + checksum validation - Boot-string drain (device sends "Operating System" ASCII before framing) All public methods raise ProtocolError on timeout, bad checksum, or unexpected response SUB. """ from __future__ import annotations import logging import time from typing import Optional from .framing import ( S3Frame, S3FrameParser, build_bw_frame, build_5a_frame, build_bw_write_frame, waveform_key_params, token_params, bulk_waveform_params, bulk_waveform_term_params, bulk_waveform_term_v2, parse_strt_end_offset, POLL_PROBE, POLL_DATA, SESSION_RESET, ) from .transport import BaseTransport log = logging.getLogger(__name__) # ── Constants ───────────────────────────────────────────────────────────────── # Response SUB = 0xFF - Request SUB (confirmed pattern, no known exceptions # among read commands; one write-path exception documented for SUB 1C→6E). def _expected_rsp_sub(req_sub: int) -> int: return (0xFF - req_sub) & 0xFF # SUB byte constants (request side) — see protocol reference §5.1 SUB_POLL = 0x5B SUB_SERIAL_NUMBER = 0x15 SUB_FULL_CONFIG = 0x01 SUB_EVENT_INDEX = 0x08 SUB_CHANNEL_CONFIG = 0x06 # Event storage range read (first/last key) ✅ SUB_MONITOR_STATUS = 0x1C # Monitoring status read (battery, memory, mode) ✅ SUB_EVENT_HEADER = 0x1E SUB_EVENT_ADVANCE = 0x1F SUB_WAVEFORM_HEADER = 0x0A SUB_WAVEFORM_RECORD = 0x0C SUB_BULK_WAVEFORM = 0x5A SUB_COMPLIANCE = 0x1A SUB_CALL_HOME = 0x2C # Call home config read → response 0xD3 ✅ SUB_UNKNOWN_2E = 0x2E # Write command SUBs (= Read SUB + 0x60, confirmed from BW captures 3-11-26) # Response SUB follows the standard 0xFF - Request SUB rule. SUB_EVENT_INDEX_WRITE = 0x68 # Write event index (0x08 + 0x60) ✅ SUB_WAVEFORM_DATA_WRITE = 0x69 # Write waveform data (0x09 + 0x60) ✅ SUB_COMPLIANCE_WRITE = 0x71 # Write compliance cfg (0x11 + 0x60) ✅ SUB_WRITE_CONFIRM_A = 0x72 # Confirm A — sent after 71×3 and other writes ✅ SUB_WRITE_CONFIRM_B = 0x73 # Confirm B — sent after 68 ✅ SUB_WRITE_CONFIRM_C = 0x74 # Confirm C — sent after 69 ✅ SUB_TRIGGER_CONFIG_WRITE = 0x82 # Write trigger config (0x22 + 0x60) ✅ SUB_TRIGGER_CONFIRM = 0x83 # Confirm trigger write ✅ # Call home write SUBs (confirmed from 4-20-26 call home settings captures) SUB_CALL_HOME_WRITE = 0x7E # Write call home config → response 0x81 ✅ SUB_CALL_HOME_CONFIRM = 0x7F # Confirm call home write → response 0x80 ✅ # Monitoring control SUBs (confirmed from 4-8-26/2ndtry BW TX capture) SUB_START_MONITORING = 0x96 # Start monitoring → response 0x69 ✅ SUB_STOP_MONITORING = 0x97 # Stop monitoring → response 0x68 ✅ # Erase-all SUBs (confirmed from 4-11-26 MITM capture) # Both use token=0xFE at params[7] and return minimal 11-byte acks. # Standard response formula applies: 0xFF - SUB. SUB_ERASE_ALL_BEGIN = 0xA3 # Begin erase all events → response 0x5C ✅ SUB_ERASE_ALL_CONFIRM = 0xA2 # Confirm erase all events → response 0x5D ✅ # Hardcoded data lengths for the two-step read protocol. # # The S3 probe response page_key is always 0x0000 — it does NOT carry the # data length back to us. Instead, each SUB has a fixed known payload size # confirmed from BW capture analysis (offset at payload[5] of the data-request # frame). # # Key: request SUB byte. Value: offset/length byte sent in the data-request. # Entries marked 🔶 are inferred from captured frames and may need adjustment. DATA_LENGTHS: dict[int, int] = { SUB_POLL: 0x30, # POLL startup data block ✅ SUB_SERIAL_NUMBER: 0x0A, # 10-byte serial number block ✅ SUB_FULL_CONFIG: 0x98, # 152-byte full config block ✅ SUB_EVENT_INDEX: 0x58, # 88-byte event index ✅ SUB_CHANNEL_CONFIG: 0x24, # 36-byte event storage range (first/last key) ✅ SUB_MONITOR_STATUS: 0x2C, # 44-byte monitor status block (idle) ✅ SUB_EVENT_HEADER: 0x08, # 8-byte event header (waveform key + event data) ✅ SUB_EVENT_ADVANCE: 0x08, # 8-byte next-key response ✅ # SUB_WAVEFORM_HEADER (0x0A) is VARIABLE — length read from probe response # data[4]. Do NOT add it here; use read_waveform_header() instead. ✅ SUB_WAVEFORM_RECORD: 0xD2, # 210-byte waveform/histogram record ✅ SUB_CALL_HOME: 0x7C, # 124-byte call home config ✅ (confirmed 4-20-26) SUB_UNKNOWN_2E: 0x1A, # 26 bytes, purpose TBD 🔶 0x09: 0xCA, # 202 bytes, purpose TBD 🔶 # SUB_COMPLIANCE (0x1A) uses a multi-step sequence with a 2090-byte total; # NOT handled here — requires specialised read logic. } # SUB 5A (BULK_WAVEFORM_STREAM) protocol constants. # # 2026-05-01 minimal-fix: the chunk-counter walk is now bounded by the event's # `end_offset` extracted from the STRT record at data[23:27] of the probe # response. Without this bound the loop kept asking for chunks past the event # end and the device responded with post-event circular-buffer garbage, # corrupting reconstructed Blastware files for events ≥ 2 sec. # # We keep the OLD 0x0400 chunk step here (BW actually uses 0x0200 — see §7.8.5 # of the protocol reference for the corrected understanding) because the # existing blastware_file.py builder relies on the 0x0400-step frame structure # to produce valid files. Switching to BW's 0x0200 step is a separate task # that also requires updating the file builder. # BW-exact protocol values (v0.14.0). Verified against 4-27-26 + 5-1-26 captures. _BULK_CHUNK_OFFSET = 0x1002 # offset_word for probe + all chunk requests _BULK_TERM_OFFSET = 0x005A # offset_word for the legacy terminator (fallback only) _BULK_COUNTER_STEP = 0x0200 # chunk counter increment (matches chunk payload size) # Default timeout values (seconds). # MiniMate Plus is a slow device — keep these generous. DEFAULT_RECV_TIMEOUT = 10.0 POLL_RECV_TIMEOUT = 10.0 # ── Exception ───────────────────────────────────────────────────────────────── class ProtocolError(Exception): """Raised when the device violates the expected protocol.""" class TimeoutError(ProtocolError): """Raised when no response is received within the allowed time.""" class ChecksumError(ProtocolError): """Raised when a received frame has a bad checksum.""" class UnexpectedResponse(ProtocolError): """Raised when the response SUB doesn't match what we requested.""" # ── MiniMateProtocol ────────────────────────────────────────────────────────── class MiniMateProtocol: """ Protocol state machine for one open connection to a MiniMate Plus device. Does not own the transport — transport lifetime is managed by MiniMateClient. Typical usage (via MiniMateClient — not directly): proto = MiniMateProtocol(transport) proto.startup() # POLL handshake, drain boot string data = proto.read(SUB_FULL_CONFIG) sn_data = proto.read(SUB_SERIAL_NUMBER) """ def __init__( self, transport: BaseTransport, recv_timeout: float = DEFAULT_RECV_TIMEOUT, ) -> None: self._transport = transport self._recv_timeout = recv_timeout self._parser = S3FrameParser() # Extra frames buffered by _recv_one that arrived alongside the target frame. # Used when reset_parser=False so we don't discard already-parsed frames. self._pending_frames: list[S3Frame] = [] # ── Public API ──────────────────────────────────────────────────────────── def startup(self) -> S3Frame: """ Perform the POLL startup handshake and return the POLL data frame. Steps (matching §6 Session Startup Sequence): 1. Drain any boot-string bytes ("Operating System" ASCII) 2. Send POLL_PROBE (SUB 5B, offset=0x00) 3. Receive probe ack (page_key is 0x0000; data length 0x30 is hardcoded) 4. Send POLL_DATA (SUB 5B, offset=0x30) 5. Receive data frame with "Instantel" + "MiniMate Plus" strings Returns: The data-phase POLL response S3Frame. Raises: ProtocolError: if either POLL step fails. """ log.debug("startup: draining boot string") self._drain_boot_string() # Send session-reset signal (ACK+ETX) before the first POLL probe. # Confirmed from 4-8-26 BW TX captures: Blastware always sends this # 2-byte signal at session start. Required to wake units that are # actively monitoring — without it they don't respond to POLL over TCP. log.debug("startup: session reset signal") self._send(SESSION_RESET) log.debug("startup: POLL probe") self._send(POLL_PROBE) probe_rsp = self._recv_one( expected_sub=_expected_rsp_sub(SUB_POLL), timeout=self._recv_timeout, ) log.debug( "startup: POLL probe response page_key=0x%04X", probe_rsp.page_key ) # Send another session-reset between probe and data (matches BW behavior). log.debug("startup: session reset signal (inter-frame)") self._send(SESSION_RESET) log.debug("startup: POLL data request") self._send(POLL_DATA) data_rsp = self._recv_one( expected_sub=_expected_rsp_sub(SUB_POLL), timeout=self._recv_timeout, ) log.debug("startup: POLL data received, %d bytes", len(data_rsp.data)) return data_rsp def read(self, sub: int) -> bytes: """ Execute a two-step paged read and return the data payload bytes. Step 1: send probe frame (offset=0x00) → device sends a short ack Step 2: send data-request (offset=DATA_LEN) → device sends the data block The S3 probe response does NOT carry the data length — page_key is always 0x0000 in observed frames. DATA_LENGTHS holds the known fixed lengths derived from BW capture analysis. Args: sub: Request SUB byte (e.g. SUB_FULL_CONFIG = 0x01). Returns: De-stuffed data payload bytes (payload[5:] of the response frame, with the checksum already stripped by the parser). Raises: ProtocolError: on timeout, bad checksum, or wrong response SUB. KeyError: if sub is not in DATA_LENGTHS (caller should add it). """ rsp_sub = _expected_rsp_sub(sub) # Step 1 — probe (offset = 0) log.debug("read SUB=0x%02X: probe", sub) self._send(build_bw_frame(sub, 0)) _probe = self._recv_one(expected_sub=rsp_sub) # ack; page_key always 0 # Look up the hardcoded data length for this SUB if sub not in DATA_LENGTHS: raise ProtocolError( f"No known data length for SUB=0x{sub:02X}. " "Add it to DATA_LENGTHS in protocol.py." ) length = DATA_LENGTHS[sub] log.debug("read SUB=0x%02X: data request offset=0x%02X", sub, length) if length == 0: log.warning("read SUB=0x%02X: DATA_LENGTHS entry is zero", sub) return b"" # Step 2 — data-request (offset = length) self._send(build_bw_frame(sub, length)) data_rsp = self._recv_one(expected_sub=rsp_sub) log.debug("read SUB=0x%02X: received %d data bytes", sub, len(data_rsp.data)) return data_rsp.data def poll(self) -> S3Frame: """ Send a single POLL (SUB 5B) probe+data cycle and return the data response. This is a bare POLL cycle with no boot-string drain — use during an active session (contrast with startup(), which drains the "Operating System" boot string first). Confirmed from 4-2-26 BW TX capture: BW sends exactly 3 of these POLL cycles between the last 1F and the first 5A probe frame during every waveform download. Without them the device ignores the 5A probe. """ self._send(POLL_PROBE) self._recv_one( expected_sub=_expected_rsp_sub(SUB_POLL), timeout=self._recv_timeout, ) self._send(POLL_DATA) return self._recv_one( expected_sub=_expected_rsp_sub(SUB_POLL), timeout=self._recv_timeout, ) def send_keepalive(self) -> None: """ Send a single POLL_PROBE keepalive without waiting for a response. Blastware sends these every ~80ms during idle. Useful if you need to hold the session open between real requests. """ self._send(POLL_PROBE) # ── Event download API ──────────────────────────────────────────────────── def read_event_index(self) -> bytes: """ Send the SUB 08 (EVENT_INDEX) two-step read and return the raw 88-byte (0x58) index block. The index block contains: +0x00 (3 bytes): total index size or record count — purpose partially decoded; byte [3] may be a high byte of event count. +0x03 (4 bytes): stored event count as uint32 BE ❓ (inferred from captures; see §7.4 in protocol reference) +0x07 onwards: 6-byte event timestamps (see §8), one per event Caller is responsible for parsing the returned bytes. Returns: Raw 88-byte data section (data[11:11+0x58]). Raises: ProtocolError: on timeout, bad checksum, or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_EVENT_INDEX) length = DATA_LENGTHS[SUB_EVENT_INDEX] # 0x58 log.debug("read_event_index: 08 probe") self._send(build_bw_frame(SUB_EVENT_INDEX, 0)) self._recv_one(expected_sub=rsp_sub) log.debug("read_event_index: 08 data request offset=0x%02X", length) self._send(build_bw_frame(SUB_EVENT_INDEX, length)) data_rsp = self._recv_one(expected_sub=rsp_sub) raw = data_rsp.data[11 : 11 + length] log.debug("read_event_index: got %d bytes", len(raw)) return raw def read_event_first(self, token: int = 0) -> tuple[bytes, bytes]: """ Send the SUB 1E (EVENT_HEADER) two-step read and return the first waveform key and accompanying 8-byte event data block. Args: token: Token byte placed at params[7]. Use 0 (default) for the initial browse-mode call that returns the first event key. Use 0xFE for the second "download-arm" call that must be sent between 0A and 0C when full_waveform=True; the device will ignore 5A probe frames unless this arm step has been issued. Confirmed from 4-2-26 and 4-3-26 BW TX captures. Returns: (key4, event_data8) where: key4 — 4-byte opaque waveform record address (data[11:15]) event_data8 — full 8-byte data section (data[11:19]) Raises: ProtocolError: on timeout, bad checksum, or wrong response SUB. Confirmed from 3-31-26 capture: 1E request uses all-zero params; response data section layout is: [LENGTH_ECHO:1][00×4][KEY_ECHO:4][00×2][KEY4:4][EXTRA:4] … Actual data starts at data[11]; first 4 bytes are the waveform key. """ rsp_sub = _expected_rsp_sub(SUB_EVENT_HEADER) length = DATA_LENGTHS[SUB_EVENT_HEADER] # 0x08 params = token_params(token) log.debug("read_event_first: 1E probe (token=0x%02X)", token) self._send(build_bw_frame(SUB_EVENT_HEADER, 0, params)) self._recv_one(expected_sub=rsp_sub) log.debug("read_event_first: 1E data request offset=0x%02X (token=0x%02X)", length, token) self._send(build_bw_frame(SUB_EVENT_HEADER, length, params)) data_rsp = self._recv_one(expected_sub=rsp_sub) event_data8 = data_rsp.data[11:19] key4 = data_rsp.data[11:15] log.debug("read_event_first: key=%s", key4.hex()) return key4, event_data8 def read_waveform_header(self, key4: bytes) -> tuple[bytes, int]: """ Send the SUB 0A (WAVEFORM_HEADER) two-step read for *key4*. The data length for 0A is VARIABLE and must be read from the probe response at data[4]. Two confirmed values: 0x46 (70) — full triggered event (has 0C waveform record to follow) 0x2C (44) — partial / monitor-log entry (no 0C record; 0A header only) Args: key4: 4-byte waveform record address from 1E or 1F. Returns: (raw_data, record_length) where: raw_data — complete data_rsp.data bytes (full response payload) record_length — DATA_LENGTH read from probe (0x46 for full, 0x2C for partial) The raw_data layout: raw_data[0] = record type (0x46 = full triggered event, 0x2C = partial/monitor) raw_data[1:5] = 0x00 × 4 raw_data[5:9] = event key (4 bytes) raw_data[9:11] = 0x00 × 2 raw_data[11:] = timestamps + separator + serial + channel strings (see MonitorLogEntry in models.py for full layout) Raises: ProtocolError: on timeout, bad checksum, or wrong response SUB. Confirmed from 4-11-26 MITM capture: 0A probe response data[4] carries the variable length; data-request uses that length as the offset byte. record_length == data[0] in virtually all cases (confirmed empirically). """ rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_HEADER) params = waveform_key_params(key4) log.debug("read_waveform_header: 0A probe key=%s", key4.hex()) self._send(build_bw_frame(SUB_WAVEFORM_HEADER, 0, params)) probe_rsp = self._recv_one(expected_sub=rsp_sub) # Variable length — read from probe response data[4] length = probe_rsp.data[4] if len(probe_rsp.data) > 4 else 0x46 log.debug("read_waveform_header: 0A data request offset=0x%02X", length) if length == 0: return b"", 0 self._send(build_bw_frame(SUB_WAVEFORM_HEADER, length, params)) data_rsp = self._recv_one(expected_sub=rsp_sub) log.debug( "read_waveform_header: key=%s length=0x%02X is_full=%s", key4.hex(), length, length >= 0x40, ) return data_rsp.data, length def read_waveform_data_raw(self) -> bytes: """ Send the SUB 09 (WAVEFORM_DATA) two-step read and return the raw 202-byte (0xCA) waveform data block. This is the "waveform data" block that Blastware reads from the device before the write sequence (confirmed from 3-11-26 BW TX capture BW[80-81]). The returned bytes are used verbatim as the ``waveform_data`` payload for ``write_waveform_data()`` / ``push_config_raw()``. Returns: Raw data section starting at data[11:], typically 204 bytes. (data[11 : 11 + 0xCA] = 202 bytes on some firmware; the actual length may be 204 depending on firmware version.) Raises: ProtocolError: on timeout, bad checksum, or wrong response SUB. """ SUB_WAVEFORM_DATA = 0x09 rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_DATA) # 0xFF - 0x09 = 0xF6 length = DATA_LENGTHS[SUB_WAVEFORM_DATA] # 0xCA = 202 log.debug("read_waveform_data_raw: 09 probe") self._send(build_bw_frame(SUB_WAVEFORM_DATA, 0)) self._recv_one(expected_sub=rsp_sub) log.debug("read_waveform_data_raw: 09 data request offset=0x%02X", length) self._send(build_bw_frame(SUB_WAVEFORM_DATA, length)) data_rsp = self._recv_one(expected_sub=rsp_sub) raw = data_rsp.data[11:] log.debug("read_waveform_data_raw: got %d bytes", len(raw)) return raw def read_waveform_record(self, key4: bytes) -> bytes: """ Send the SUB 0C (WAVEFORM_RECORD / FULL_WAVEFORM_RECORD) two-step read. Returns the 210-byte waveform/histogram record containing: - Record type string ("Histogram" or "Waveform") at a variable offset - Per-channel labels ("Tran", "Vert", "Long", "MicL") with PPV floats at label_offset + 6 Args: key4: 4-byte waveform record address. Returns: 210-byte record bytes (data[11:11+0xD2]). Raises: ProtocolError: on timeout, bad checksum, or wrong response SUB. Confirmed from 3-31-26 capture: 0C always uses offset=0xD2 (210 bytes). """ rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_RECORD) length = DATA_LENGTHS[SUB_WAVEFORM_RECORD] # 0xD2 params = waveform_key_params(key4) log.debug("read_waveform_record: 0C probe key=%s", key4.hex()) self._send(build_bw_frame(SUB_WAVEFORM_RECORD, 0, params)) self._recv_one(expected_sub=rsp_sub) log.debug("read_waveform_record: 0C data request offset=0x%02X", length) self._send(build_bw_frame(SUB_WAVEFORM_RECORD, length, params)) data_rsp = self._recv_one(expected_sub=rsp_sub) record = data_rsp.data[11:11 + length] log.debug("read_waveform_record: received %d record bytes", len(record)) return record def read_bulk_waveform_stream( self, key4: bytes, *, stop_after_metadata: bool = True, # DEPRECATED — no-op under BW-exact walk max_chunks: int = 256, # safety cap only; loop is bounded by end_offset include_terminator: bool = False, extra_chunks_after_metadata: int = 1, # DEPRECATED — no-op ) -> list[S3Frame]: """ Download the SUB 5A (BULK_WAVEFORM_STREAM) A5 frames for one event using Blastware's exact protocol. REWRITTEN 2026-05-02 (v0.14.0). Algorithm (matches BW captures across 2-sec / 3-sec / event-2): 1. Probe - For events at start_key[2:4] = 0x0000 (first event after erase / wrap): probe at counter=0x0000 with full key in params. - For continuation events (start_key[2:4] != 0): first chunk at counter = start_key[2:4] + 0x0046; acts as both probe and first sample chunk; response carries STRT. 2. Parse end_offset from STRT record at data[23:27] of the probe response. 3. Read two fixed metadata pages at counter=0x1002 and counter=0x1004 — global session metadata (Project / Client / User Name / Seis Loc / Extended Notes ASCII strings). Event 1 only; continuation events skip these (BW caches them across the session). 4. Walk sample chunks at 0x0200 increments, starting from 0x0600 for event 1 or `start + 0x0046 + 0x0200` for continuation events. Stop when `next_chunk + 0x0200 > end_offset`. 5. Send TERM frame with offset_word and params computed by `bulk_waveform_term_v2(key4, end_offset, last_chunk_counter)`. The TERM response contains the partial last chunk (residual = end_offset - next_boundary) including the 26-byte 0e 08 file footer. Returns: List of S3Frame objects from each A5 response (probe, metadata pages, sample chunks, optional TERM response). Caller passes `include_terminator=True` (e.g. write_blastware_file) to keep the TERM response in the list — it's required to reconstruct the file footer. Deprecated kwargs: stop_after_metadata: legacy "Project:"-string-based stop condition. No-op under the BW-exact walk; the loop is deterministically bounded by end_offset from STRT. Accepted for backward compat. extra_chunks_after_metadata: same. Raises: ProtocolError: on timeout / bad checksum / unexpected SUB. """ if len(key4) != 4: raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}") # Quietly accept and warn on deprecated kwargs. if not stop_after_metadata: log.debug("5A: stop_after_metadata=False is no-op under BW-exact walk") if extra_chunks_after_metadata not in (0, 1): log.debug("5A: extra_chunks_after_metadata=%d is no-op under BW-exact walk", extra_chunks_after_metadata) rsp_sub = _expected_rsp_sub(SUB_BULK_WAVEFORM) # 0xA5 frames_data: list[S3Frame] = [] start_offset = (key4[2] << 8) | key4[3] is_event_1 = (start_offset == 0) # ── Step 1: probe / first chunk ────────────────────────────────────── if is_event_1: probe_counter = 0 probe_params = bulk_waveform_params(key4, 0, is_probe=True) log.debug("5A probe (event-1) key=%s counter=0x0000", key4.hex()) else: # Continuation events: first 5A request lands at start+0x0046, # acting as both probe and first sample chunk. Confirmed from # 5-1-26 "copy 2nd address event" capture. probe_counter = start_offset + 0x0046 probe_params = bulk_waveform_params(key4, probe_counter) log.debug( "5A probe (event-N) key=%s counter=0x%04X (start+0x46)", key4.hex(), probe_counter, ) self._send(build_5a_frame(_BULK_CHUNK_OFFSET, probe_params)) self._parser.reset() try: rsp = self._recv_one(expected_sub=rsp_sub, reset_parser=False) except TimeoutError: log.warning( "5A probe TIMED OUT for key=%s — %d raw bytes received", key4.hex(), self._parser.bytes_fed, ) raise frames_data.append(rsp) log.debug("5A A5[0] (probe) page_key=0x%04X %d bytes", rsp.page_key, len(rsp.data)) # ── Step 2: parse STRT end_offset from probe response ──────────────── end_offset = parse_strt_end_offset(rsp.data) if end_offset is None: log.warning( "5A probe response did not contain a STRT record; " "cannot bound chunk loop — falling back to max_chunks=%d cap", max_chunks, ) end_offset = 0xFFFF # impossible value → loop runs to max_chunks else: log.info( "5A STRT start_offset=0x%04X end_offset=0x%04X size=0x%04X", start_offset, end_offset, end_offset - start_offset, ) # ── Step 3: metadata pages 0x1002 + 0x1004 (event 1 only) ──────────── # Confirmed from BW captures: BW reads these two fixed device-buffer # pages immediately after the probe for events at start_key[2:4]=0. # Continuation events skip them (BW caches across the session). # Their content is global compliance-setup metadata: Project, Client, # User Name, Seis Loc, Extended Notes. if is_event_1: for meta_counter in (0x1002, 0x1004): # Metadata page params have an extra trailing 0x00 byte # (12-byte params instead of 11) — empirical from BW captures. # Checksum-neutral but matches BW byte-for-byte. meta_params = bytes([ 0x00, key4[0], key4[1], (meta_counter >> 8) & 0xFF, meta_counter & 0xFF, 0, 0, 0, 0, 0, 0, 0, ]) log.debug("5A metadata page counter=0x%04X", meta_counter) self._send(build_5a_frame(_BULK_CHUNK_OFFSET, meta_params)) self._parser.reset() try: meta_rsp = self._recv_one( expected_sub=rsp_sub, reset_parser=False, timeout=10.0, ) except TimeoutError: log.warning( "5A metadata page 0x%04X TIMED OUT — continuing", meta_counter, ) continue frames_data.append(meta_rsp) log.debug( "5A meta@0x%04X page_key=0x%04X %d bytes", meta_counter, meta_rsp.page_key, len(meta_rsp.data), ) # ── Step 4: sample chunk loop, bounded by end_offset ───────────────── # Sample chunks start at: # event 1: counter = 0x0600 # event N (>0): counter = probe_counter + 0x0200 # (probe was the first sample chunk) if is_event_1: counter = 0x0600 else: counter = probe_counter + _BULK_COUNTER_STEP last_chunk_counter: Optional[int] = ( probe_counter if not is_event_1 else None ) chunks_fetched = 0 while chunks_fetched < max_chunks: # Stop when next chunk would straddle the event end. if counter + _BULK_COUNTER_STEP > end_offset: log.debug( "5A chunk loop done at counter=0x%04X (end=0x%04X); " "%d chunks fetched", counter, end_offset, chunks_fetched, ) break params = bulk_waveform_params(key4, counter) log.debug("5A chunk #%d counter=0x%04X", chunks_fetched + 1, counter) self._send(build_5a_frame(_BULK_CHUNK_OFFSET, params)) self._parser.reset() try: rsp = self._recv_one( expected_sub=rsp_sub, reset_parser=False, timeout=10.0, ) except TimeoutError: raw = self._parser.bytes_fed log.warning( "5A TIMEOUT chunk=%d counter=0x%04X raw_bytes=%d", chunks_fetched + 1, counter, raw, ) if raw > 0 and frames_data: log.warning( "5A unexpected end-of-stream — proceeding to TERM", ) break raise log.debug( "5A RX chunk=%d page_key=0x%04X data_len=%d", chunks_fetched + 1, rsp.page_key, len(rsp.data), ) if rsp.page_key == 0x0000: # Device terminated mid-stream unexpectedly. log.warning( "5A unexpected page_key=0x0000 mid-stream at counter=0x%04X", counter, ) if include_terminator: frames_data.append(rsp) return frames_data frames_data.append(rsp) last_chunk_counter = counter counter += _BULK_COUNTER_STEP chunks_fetched += 1 else: log.warning( "5A reached max_chunks=%d at counter=0x%04X (end=0x%04X)", max_chunks, counter, end_offset, ) # ── Step 5: TERM with proper end_offset-derived formula ────────────── if last_chunk_counter is None or end_offset == 0xFFFF: # No STRT or no chunks fetched — fall back to legacy TERM. log.warning( "5A using legacy TERM (offset_word=0x005A); " "end_offset unavailable or no chunks fetched", ) legacy_counter = (last_chunk_counter or probe_counter) + _BULK_COUNTER_STEP term_offset_word = _BULK_TERM_OFFSET # 0x005A term_params = bulk_waveform_term_params(key4, legacy_counter) else: term_offset_word, term_params = bulk_waveform_term_v2( key4, end_offset, last_chunk_counter, ) log.debug( "5A TERM offset_word=0x%04X params[2:4]=%s end=0x%04X " "last_chunk=0x%04X", term_offset_word, term_params[2:4].hex(), end_offset, last_chunk_counter, ) self._send(build_5a_frame(term_offset_word, term_params)) try: term_rsp = self._recv_one(expected_sub=rsp_sub, timeout=10.0) log.info( "5A TERM response page_key=0x%04X %d bytes", term_rsp.page_key, len(term_rsp.data), ) if include_terminator: frames_data.append(term_rsp) except TimeoutError: log.warning("5A no TERM response (timeout)") return frames_data def advance_event(self, browse: bool = False) -> tuple[bytes, bytes]: """ Send the SUB 1F (EVENT_ADVANCE) two-step read and return the next waveform key and the full 8-byte event data block. browse=False (default, download mode): sends token=0xFE at params[7]. Used by get_events() — the token causes the device to skip partial histogram bins and return the key of the next FULL record. browse=True: sends all-zero params (no token). Matches Blastware's confirmed browse-mode sequence: 0A → 1F(zeros) → 0A → 1F(zeros). Used by count_events() where no 0C/5A download occurs. IMPORTANT: A preceding 0A (read_waveform_header) call is REQUIRED in both modes to establish device waveform context. Without it, 1F returns the null sentinel regardless of how many events are stored. Returns: (key4, event_data8) where: key4 — 4-byte opaque waveform record address (data[11:15]). event_data8 — full 8-byte block (data[11:19]). End-of-events sentinel: event_data8[4:8] == b'\\x00\\x00\\x00\\x00'. DO NOT use key4 == b'\\x00\\x00\\x00\\x00' as the sentinel — key4 is all-zeros for event 0 (the very first stored event) and will cause the loop to terminate prematurely. Raises: ProtocolError: on timeout, bad checksum, or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_EVENT_ADVANCE) length = DATA_LENGTHS[SUB_EVENT_ADVANCE] # 0x08 params = token_params(0) if browse else token_params(0xFE) mode = "browse" if browse else "download" log.debug("advance_event: 1F probe mode=%s params=%s", mode, params.hex()) self._send(build_bw_frame(SUB_EVENT_ADVANCE, 0, params)) self._recv_one(expected_sub=rsp_sub) log.debug("advance_event: 1F data request offset=0x%02X", length) self._send(build_bw_frame(SUB_EVENT_ADVANCE, length, params)) data_rsp = self._recv_one(expected_sub=rsp_sub) event_data8 = data_rsp.data[11:19] key4 = data_rsp.data[11:15] is_done = event_data8[4:8] == b"\x00\x00\x00\x00" log.debug( "advance_event: next key=%s data8=%s done=%s", key4.hex(), event_data8.hex(), is_done, ) return key4, event_data8 def read_compliance_config(self) -> bytes: """ Send the SUB 1A (COMPLIANCE_CONFIG) multi-step read and accumulate all E5 response frames into a single config byte string. BE18189 sends the full config in one large E5 frame (~4245 cfg bytes). BE11529 appears to chunk the response — each E5 frame carries ~44 bytes of cfg data. This method loops until the expected 0x082A (2090) bytes are accumulated or the inter-frame gap exceeds _INTER_FRAME_TIMEOUT. Frame structure (confirmed from raw BW captures 3-11-26): Probe (Frame A): byte[5]=0x00, params[7]=0x64 Data req (Frame D): byte[5]=0x2A, params[2]=0x08, params[7]=0x64 0x082A split: byte[5]=0x2A (offset low), params[2]=0x08 (length high) params[7]=0x64 required in both probe and data-request. Returns: Accumulated compliance config bytes. First frame: data[11:] (skips 11-byte echo header). Subsequent frames: structure logged and accumulated from data[11:] as well — adjust offset if structure differs. Raises: ProtocolError: if the very first E5 frame is not received (hard timeout). """ rsp_sub = _expected_rsp_sub(SUB_COMPLIANCE) # Probe — params[7]=0x64 required (confirmed from BW capture) _PROBE_PARAMS = bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00]) log.debug("read_compliance_config: 1A probe") self._send(build_bw_frame(SUB_COMPLIANCE, 0, _PROBE_PARAMS)) self._recv_one(expected_sub=rsp_sub) # Frame D params — offset=0x002A, params[2]=0x08, params[7]=0x64 _DATA_PARAMS = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00]) # ── Multi-request accumulation ──────────────────────────────────────── # # Full BW sequence (confirmed from raw_bw captures 3-11-26): # # Frame B: offset=0x0400 params[2]=0x00 → requests cfg bytes 0..1023 # Frame C: offset=0x0400 params[2]=0x04 → requests cfg bytes 1024..2047 # Frame D: offset=0x002A params[2]=0x08 → requests cfg bytes 2048..2089 # # Total: 0x0400 + 0x0400 + 0x002A = 0x082A = 2090 bytes. # # The "offset" field in B and C encodes the chunk length (0x0400 = 1024), # not a byte offset into the config. params[2] tracks cumulative pages # (0x00 → 0x04 → 0x08; each page = 256 bytes → 0x04 pages = 1024 bytes). # # Each request gets its own E5 response with an 11-byte echo header. # Devices that send the full block in a single frame (BE18189) may return # the entire config from the last request alone — we handle both cases by # trying each step and concatenating whatever arrives. _DATA_PARAMS_B = bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00]) _DATA_PARAMS_C = bytes([0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00]) # _DATA_PARAMS_D already built above as _DATA_PARAMS _STEPS = [ ("B", 0x0400, _DATA_PARAMS_B), ("C", 0x0400, _DATA_PARAMS_C), ("D", 0x002A, _DATA_PARAMS), # _DATA_PARAMS built above ] config = bytearray() for step_name, step_offset, step_params in _STEPS: log.debug( "read_compliance_config: sending frame %s offset=0x%04X params=%s", step_name, step_offset, step_params.hex(), ) self._send(build_bw_frame(SUB_COMPLIANCE, step_offset, step_params)) try: data_rsp = self._recv_one(expected_sub=rsp_sub) except TimeoutError: log.warning( "read_compliance_config: frame %s — no E5 response (timeout)", step_name, ) continue chunk = data_rsp.data[11:] log.warning( "read_compliance_config: frame %s page=0x%04X data=%d cfg_chunk=%d running_total=%d", step_name, data_rsp.page_key, len(data_rsp.data), len(chunk), len(config) + len(chunk), ) config.extend(chunk) # Safety drain: catch any extra frame the device may buffer on slow links. try: tail_rsp = self._recv_one(expected_sub=rsp_sub, timeout=2.0) tail_chunk = tail_rsp.data[11:] log.warning( "read_compliance_config: unexpected tail frame page=0x%04X " "cfg_chunk=%d running_total=%d", tail_rsp.page_key, len(tail_chunk), len(config) + len(tail_chunk), ) config.extend(tail_chunk) except TimeoutError: pass log.warning( "read_compliance_config: done — %d cfg bytes total", len(config), ) # Hex dump first 128 bytes for field mapping for row in range(0, min(len(config), 128), 16): row_bytes = bytes(config[row:row + 16]) hex_part = ' '.join(f'{b:02x}' for b in row_bytes) asc_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in row_bytes) log.warning(" cfg[%04x]: %-48s %s", row, hex_part, asc_part) return bytes(config) # ── Write commands (SUBs 68–83) ─────────────────────────────────────────── def recv_write_ack( self, expected_sub: int, timeout: Optional[float] = None, ) -> S3Frame: """ Wait for a write-ack S3 frame. All write ack responses are 17-byte frames (11-byte header + no data + 1 checksum byte) with SUB = 0xFF - request_SUB. The page_key and data section carry zeros. Confirmed from 3-11-26 BW capture. Args: expected_sub: Expected response SUB byte (0xFF - write_request_SUB). timeout: Seconds to wait; defaults to self._recv_timeout. Returns: The ack S3Frame. Raises: TimeoutError: if no frame arrives in time. UnexpectedResponse: if the response SUB doesn't match. """ log.debug("recv_write_ack: waiting for SUB=0x%02X", expected_sub) ack = self._recv_one(expected_sub=expected_sub, timeout=timeout) log.debug( "recv_write_ack: received SUB=0x%02X page=0x%04X data=%d bytes", ack.sub, ack.page_key, len(ack.data), ) return ack def write_confirm(self, sub: int) -> S3Frame: """ Send a zero-data confirm frame and wait for the ack. Confirm frames (SUBs 72, 73, 74, 83) carry no write data — they are 16-byte header-only frames (offset=0, params=zeros, data=b"") with the DLE-aware large-frame checksum. The device acks with the complementary RSP_SUB. Args: sub: Confirm SUB byte (SUB_WRITE_CONFIRM_A/B/C or SUB_TRIGGER_CONFIRM). Returns: The ack S3Frame. Raises: ProtocolError: on timeout or wrong response SUB. """ rsp_sub = _expected_rsp_sub(sub) frame = build_bw_write_frame(sub, b"") log.debug("write_confirm: SUB=0x%02X frame=%s", sub, frame.hex()) self._send(frame) return self.recv_write_ack(expected_sub=rsp_sub) def write_event_index(self, data: bytes) -> S3Frame: """ Send a SUB 68 (EVENT_INDEX_WRITE) frame and await the confirm ack (SUB 97). Offset formula: data[1] + 2 — confirmed from 3-11-26 BW TX capture frame 102. The write payload has a 2-byte header [0x00][length] where data[1] encodes the length of the meaningful payload; offset = data[1] + 2. Example from capture: data[0:4] = 00 58 09 00 (data[1]=0x58=88 → offset=0x5A=90) data length = 91, offset = 90 Write sequence fragment: 68 (data) → device acks with SUB 0x97 73 (confirm) → device acks with SUB 0x8C Callers should call write_confirm(SUB_WRITE_CONFIRM_B) after this. Args: data: Raw event-index payload bytes to write to the device. Must be at least 2 bytes. data[1] must contain the length field. Returns: The SUB 0x97 ack frame. Raises: ProtocolError: on timeout or wrong response SUB. ValueError: if data is shorter than 2 bytes. """ if len(data) < 2: raise ValueError(f"event index write data must be at least 2 bytes, got {len(data)}") rsp_sub = _expected_rsp_sub(SUB_EVENT_INDEX_WRITE) # 0xFF - 0x68 = 0x97 offset = data[1] + 2 frame = build_bw_write_frame(SUB_EVENT_INDEX_WRITE, data, offset=offset) log.debug( "write_event_index: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X", len(data), data[1], offset, rsp_sub, ) self._send(frame) return self.recv_write_ack(expected_sub=rsp_sub) def write_waveform_data(self, data: bytes) -> S3Frame: """ Send a SUB 69 (WAVEFORM_DATA_WRITE) frame and await the confirm ack (SUB 96). Offset formula: data[1] + 2 — same pattern as write_event_index(). Confirmed from 3-11-26 BW TX capture frame 110: data[0:4] = 00 c8 08 00 (data[1]=0xC8=200 → offset=0xCA=202) data length = 204, offset = 202 Write sequence fragment: 69 (data) → device acks with SUB 0x96 74 (confirm) → device acks with SUB 0x8B 72 (confirm) → device acks with SUB 0x8D Callers should call write_confirm(SUB_WRITE_CONFIRM_C) then write_confirm(SUB_WRITE_CONFIRM_A) after this. Args: data: Raw waveform-data payload bytes to write. Must be at least 2 bytes. data[1] must contain the length field. Returns: The SUB 0x96 ack frame. Raises: ProtocolError: on timeout or wrong response SUB. ValueError: if data is shorter than 2 bytes. """ if len(data) < 2: raise ValueError(f"waveform data write payload must be at least 2 bytes, got {len(data)}") rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_DATA_WRITE) # 0xFF - 0x69 = 0x96 offset = data[1] + 2 frame = build_bw_write_frame(SUB_WAVEFORM_DATA_WRITE, data, offset=offset) log.debug( "write_waveform_data: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X", len(data), data[1], offset, rsp_sub, ) self._send(frame) return self.recv_write_ack(expected_sub=rsp_sub) def write_compliance_config_raw(self, data: bytes) -> None: """ Send the SUB 71 (COMPLIANCE_WRITE) 3-chunk sequence and final confirm. The full compliance config payload (~2128 bytes) is split into exactly 3 chunks with hardcoded boundaries and params confirmed from the 3-11-26 BW TX capture (frames 104–108): Chunk 1 — first 1027 bytes: offset=0x1004 params=bytes(10) device acks SUB 0x8E Chunk 2 — next 1055 bytes: offset=0x1004 params=b'\\x00\\x00\\x00\\x00\\x10\\x04' + b'\\x00'*4 device acks SUB 0x8E Chunk 3 — remaining bytes: offset=0x002C params=b'\\x00\\x00\\x08' + b'\\x00'*7 device acks SUB 0x8E Confirm — SUB 72 (zero data): device acks SUB 0x8D The total write payload should be at least 1027+1055=2082 bytes; chunk 3 carries everything after offset 2082 (typically ~46 bytes for a 2128-byte config). Args: data: Raw compliance config bytes to write. Must be at least 2082 bytes. Raises: ValueError: if data is too short to fill chunks 1 and 2. ProtocolError: on timeout or wrong response SUB from any chunk. """ _CHUNK1_SIZE = 1027 _CHUNK2_SIZE = 1055 _CHUNK1_OFFSET = 0x1004 _CHUNK2_OFFSET = 0x1004 _CHUNK3_OFFSET = 0x002C _CHUNK1_PARAMS = bytes(10) _CHUNK2_PARAMS = bytes([0x00, 0x00, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) _CHUNK3_PARAMS = bytes([0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) min_size = _CHUNK1_SIZE + _CHUNK2_SIZE if len(data) < min_size: raise ValueError( f"Compliance write data too short: {len(data)} bytes, " f"need at least {min_size} (chunk1={_CHUNK1_SIZE} + chunk2={_CHUNK2_SIZE})" ) rsp_sub = _expected_rsp_sub(SUB_COMPLIANCE_WRITE) # 0xFF - 0x71 = 0x8E chunk1 = data[:_CHUNK1_SIZE] chunk2 = data[_CHUNK1_SIZE : _CHUNK1_SIZE + _CHUNK2_SIZE] chunk3 = data[_CHUNK1_SIZE + _CHUNK2_SIZE :] chunks = [ (1, chunk1, _CHUNK1_OFFSET, _CHUNK1_PARAMS), (2, chunk2, _CHUNK2_OFFSET, _CHUNK2_PARAMS), (3, chunk3, _CHUNK3_OFFSET, _CHUNK3_PARAMS), ] for chunk_num, chunk_data, chunk_offset, chunk_params in chunks: frame = build_bw_write_frame( SUB_COMPLIANCE_WRITE, chunk_data, offset=chunk_offset, params=chunk_params, ) log.debug( "write_compliance_config_raw: chunk %d %d bytes " "offset=0x%04X params=%s", chunk_num, len(chunk_data), chunk_offset, chunk_params.hex(), ) self._send(frame) self.recv_write_ack(expected_sub=rsp_sub) log.debug("write_compliance_config_raw: chunk %d acked", chunk_num) # Final confirm (SUB 72) log.debug("write_compliance_config_raw: sending confirm (SUB 0x72)") self.write_confirm(SUB_WRITE_CONFIRM_A) log.debug("write_compliance_config_raw: done") def write_trigger_config(self, data: bytes) -> S3Frame: """ Send a SUB 82 (TRIGGER_CONFIG_WRITE) frame and await the confirm ack (SUB 7D). Offset formula: data[1] + 2 — same pattern as write_event_index(). Confirmed from 3-11-26 BW TX capture frame 108: data[0:4] = 00 1a d5 00 (data[1]=0x1A=26 → offset=0x1C=28) data length = 29, offset = 28 Write sequence fragment: 82 (data) → device acks with SUB 0x7D 83 (confirm) → device acks with SUB 0x7C Callers should call write_confirm(SUB_TRIGGER_CONFIRM) after this. Args: data: Raw trigger-config payload bytes to write. Must be at least 2 bytes. data[1] must contain the length field. Returns: The SUB 0x7D ack frame. Raises: ProtocolError: on timeout or wrong response SUB. ValueError: if data is shorter than 2 bytes. """ if len(data) < 2: raise ValueError(f"trigger config write payload must be at least 2 bytes, got {len(data)}") rsp_sub = _expected_rsp_sub(SUB_TRIGGER_CONFIG_WRITE) # 0xFF - 0x82 = 0x7D offset = data[1] + 2 frame = build_bw_write_frame(SUB_TRIGGER_CONFIG_WRITE, data, offset=offset) log.debug( "write_trigger_config: %d bytes data[1]=0x%02X offset=0x%04X rsp_sub=0x%02X", len(data), data[1], offset, rsp_sub, ) self._send(frame) return self.recv_write_ack(expected_sub=rsp_sub) # ── Call home config (SUBs 0x2C / 0x7E / 0x7F) ────────────────────────── def read_call_home_config(self) -> bytes: """ Read the auto call home configuration (SUB 0x2C → response 0xD3). Standard two-step read: probe (offset=0x00) then data (offset=0x7C=124). Returns the raw 125-byte payload (data[11:] of the data response). Confirmed from 4-20-26 call home settings capture: - Probe response: data[4]=0x7C (confirms data length = 124) - Data response: 136 bytes total (11-byte echo header + 125 bytes payload) - Payload[0:3] = 0x00 0x7C 0xDC (header: zero, inner-length, constant) - Payload[5] = auto_call_home_enabled - Payload[6:46] = dial_string (40-byte null-padded ASCII "RADIO RING") Returns: Raw 125-byte call home config payload (data[11:]). Suitable for round-trip write (append \\x00\\x00 → 127-byte write payload). Raises: ProtocolError: on timeout or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_CALL_HOME) # 0xFF - 0x2C = 0xD3 length = DATA_LENGTHS[SUB_CALL_HOME] # 0x7C = 124 log.debug("read_call_home_config: 0x2C probe") self._send(build_bw_frame(SUB_CALL_HOME, 0)) self._recv_one(expected_sub=rsp_sub) log.debug("read_call_home_config: 0x2C data request offset=0x%02X", length) self._send(build_bw_frame(SUB_CALL_HOME, length)) data_rsp = self._recv_one(expected_sub=rsp_sub) payload = data_rsp.data[11:] log.debug("read_call_home_config: received %d payload bytes", len(payload)) return payload def write_call_home_config(self, data: bytes) -> None: """ Write the auto call home configuration (SUB 0x7E → 0x7F confirm). Write sequence (confirmed from 4-20-26 call home settings captures): SUB 0x7E write 127-byte payload → device acks SUB 0x81 SUB 0x7F confirm (no data) → device acks SUB 0x80 The 127-byte write payload = 125-byte read payload + b'\\x00\\x00'. The offset field = data[1] + 2 = 0x7C + 2 = 0x7E = 126. Write frame format: build_bw_write_frame (minimal DLE stuffing — only BW_CMD is doubled; all other bytes are RAW). The \\x10\\x03 sequence within the payload is preserved as-is (device interprets DLE+ETX as the literal value 0x03 per the inner-frame terminator convention). Args: data: 127-byte write payload (read payload + \\x00\\x00 footer). Must start with [0x00][0x7C][...] (standard header). Raises: ValueError: if data is not exactly 127 bytes or lacks expected header. ProtocolError: on timeout or wrong response SUB. """ if len(data) < 2: raise ValueError(f"call home write payload must be at least 2 bytes, got {len(data)}") rsp_sub_write = _expected_rsp_sub(SUB_CALL_HOME_WRITE) # 0xFF - 0x7E = 0x81 rsp_sub_confirm = _expected_rsp_sub(SUB_CALL_HOME_CONFIRM) # 0xFF - 0x7F = 0x80 # Offset formula: data[1] + 2 (same pattern as other single-chunk writes) offset = data[1] + 2 # 0x7C + 2 = 0x7E = 126 frame = build_bw_write_frame(SUB_CALL_HOME_WRITE, data, offset=offset) log.debug( "write_call_home_config: %d bytes data[1]=0x%02X offset=0x%04X", len(data), data[1], offset, ) self._send(frame) self.recv_write_ack(expected_sub=rsp_sub_write) log.debug("write_call_home_config: write acked; sending confirm 0x7F") confirm_frame = build_bw_write_frame(SUB_CALL_HOME_CONFIRM, b"") self._send(confirm_frame) self.recv_write_ack(expected_sub=rsp_sub_confirm) log.debug("write_call_home_config: confirm acked — done") # ── Monitoring ──────────────────────────────────────────────────────────── def read_monitor_status(self) -> S3Frame: """ Read monitoring status (SUB 0x1C → response 0xE3). Two-step read: probe (offset=0x00) then data (offset=0x2C). Returns: S3Frame with 44 bytes of status data (idle state). When unit is actively monitoring the payload is shorter (12 bytes); callers should check frame data length to determine mode. Raises: ProtocolError: on timeout or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_MONITOR_STATUS) # 0xFF - 0x1C = 0xE3 log.debug("read_monitor_status: probe step rsp_sub=0x%02X", rsp_sub) probe_frame = build_bw_frame(SUB_MONITOR_STATUS, offset=0x00) self._send(probe_frame) self._recv_one(expected_sub=rsp_sub) log.debug("read_monitor_status: data step offset=0x%02X", DATA_LENGTHS[SUB_MONITOR_STATUS]) data_frame = build_bw_frame(SUB_MONITOR_STATUS, offset=DATA_LENGTHS[SUB_MONITOR_STATUS]) self._send(data_frame) return self._recv_one(expected_sub=rsp_sub) def start_monitoring(self) -> S3Frame: """ Send Start Monitoring command (SUB 0x96 → response 0x69). Single write frame, no data payload. Confirmed from 4-8-26/2ndtry BW TX capture frame 92. Returns: S3Frame ack from device. Raises: ProtocolError: on timeout or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_START_MONITORING) # 0xFF - 0x96 = 0x69 log.debug("start_monitoring: rsp_sub=0x%02X", rsp_sub) frame = build_bw_write_frame(SUB_START_MONITORING, b"") self._send(frame) return self.recv_write_ack(expected_sub=rsp_sub) def stop_monitoring(self) -> S3Frame: """ Send Stop Monitoring command (SUB 0x97 → response 0x68). Single write frame, no data payload. Confirmed from 4-8-26/2ndtry BW TX capture frame 305. Returns: S3Frame ack from device. Raises: ProtocolError: on timeout or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_STOP_MONITORING) # 0xFF - 0x97 = 0x68 log.debug("stop_monitoring: rsp_sub=0x%02X", rsp_sub) frame = build_bw_write_frame(SUB_STOP_MONITORING, b"") self._send(frame) return self.recv_write_ack(expected_sub=rsp_sub) def read_event_storage_range(self) -> S3Frame: """ Read event storage range (SUB 0x06 → response 0xF9). Two-step read: probe (offset=0x00) then data (offset=0x24 = 36 bytes). Uses token=0xFE at params[7] — same as the erase sequence. The 36-byte response ends with two 4-byte event keys (first and last stored event key). After a successful erase, both keys are 0x01110000 (device-empty sentinel). Confirmed from 4-11-26 MITM capture. Returns: S3Frame with 36 bytes of storage range data. Raises: ProtocolError: on timeout or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_CHANNEL_CONFIG) # 0xFF - 0x06 = 0xF9 params = token_params(0xFE) log.debug("read_event_storage_range: probe step rsp_sub=0x%02X", rsp_sub) self._send(build_bw_frame(SUB_CHANNEL_CONFIG, offset=0x00, params=params)) self._recv_one(expected_sub=rsp_sub) log.debug( "read_event_storage_range: data step offset=0x%02X", DATA_LENGTHS[SUB_CHANNEL_CONFIG], ) self._send(build_bw_frame(SUB_CHANNEL_CONFIG, offset=DATA_LENGTHS[SUB_CHANNEL_CONFIG], params=params)) return self._recv_one(expected_sub=rsp_sub) def begin_erase_all(self) -> S3Frame: """ Send Begin-Erase-All command (SUB 0xA3 → response 0x5C). Single frame with token=0xFE at params[7]. The device acknowledges with a minimal ack and begins the erase process. Follow up with read_monitor_status() + read_event_storage_range() + confirm_erase_all() to complete the sequence. Confirmed from 4-11-26 MITM capture. Returns: S3Frame ack from device (SUB 0x5C). Raises: ProtocolError: on timeout or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_ERASE_ALL_BEGIN) # 0xFF - 0xA3 = 0x5C log.debug("begin_erase_all: rsp_sub=0x%02X", rsp_sub) self._send(build_bw_frame(SUB_ERASE_ALL_BEGIN, params=token_params(0xFE))) return self._recv_one(expected_sub=rsp_sub) def confirm_erase_all(self) -> S3Frame: """ Send Confirm-Erase-All command (SUB 0xA2 → response 0x5D). Single frame with token=0xFE at params[7]. Must be preceded by begin_erase_all() + read_monitor_status() + read_event_storage_range(). After this call the device memory is cleared. Confirmed from 4-11-26 MITM capture. Returns: S3Frame ack from device (SUB 0x5D). Raises: ProtocolError: on timeout or wrong response SUB. """ rsp_sub = _expected_rsp_sub(SUB_ERASE_ALL_CONFIRM) # 0xFF - 0xA2 = 0x5D log.debug("confirm_erase_all: rsp_sub=0x%02X", rsp_sub) self._send(build_bw_frame(SUB_ERASE_ALL_CONFIRM, params=token_params(0xFE))) return self._recv_one(expected_sub=rsp_sub) # ── Internal helpers ────────────────────────────────────────────────────── def _send(self, frame: bytes) -> None: """Write a pre-built frame to the transport.""" log.debug("TX %d bytes: %s", len(frame), frame.hex()) self._transport.write(frame) def _recv_one( self, expected_sub: Optional[int] = None, timeout: Optional[float] = None, reset_parser: bool = True, ) -> S3Frame: """ Read bytes from the transport until one complete S3 frame is parsed. Feeds bytes through the streaming S3FrameParser. Keeps reading until a frame arrives or the deadline expires. Args: expected_sub: If provided, raises UnexpectedResponse if the received frame's SUB doesn't match. timeout: Seconds to wait. Defaults to self._recv_timeout. reset_parser: If True (default), reset the parser before reading. Pass False when accumulating multiple frames from a single device response (e.g. chunked E5 replies) so that bytes already buffered between frames are not lost. Returns: The first complete S3Frame received. Raises: TimeoutError: if no frame arrives within the timeout. ChecksumError: if the frame has an invalid checksum. UnexpectedResponse: if expected_sub is set and doesn't match. """ deadline = time.monotonic() + (timeout or self._recv_timeout) if reset_parser: self._parser.reset() self._pending_frames.clear() # If a prior read() parsed more frames than it returned (e.g. two frames # arrived in one TCP chunk), return the buffered one immediately. if self._pending_frames: frame = self._pending_frames.pop(0) self._validate_frame(frame, expected_sub) return frame while time.monotonic() < deadline: chunk = self._transport.read(256) if chunk: log.debug("RX %d bytes: %s", len(chunk), chunk.hex()) frames = self._parser.feed(chunk) if frames: # Stash any extras so subsequent calls with reset_parser=False see them self._pending_frames.extend(frames[1:]) frame = frames[0] self._validate_frame(frame, expected_sub) return frame else: time.sleep(0.005) raise TimeoutError( f"No S3 frame received within {timeout or self._recv_timeout:.1f}s" + (f" (expected SUB 0x{expected_sub:02X})" if expected_sub is not None else "") ) @staticmethod def _validate_frame(frame: S3Frame, expected_sub: Optional[int]) -> None: """Validate SUB; log but do not raise on bad checksum. S3 response checksums frequently fail SUM8 validation due to inner-frame delimiter bytes being captured as the checksum byte. The original s3_parser.py deliberately never validates S3 checksums for exactly this reason. We log a warning and continue. """ if not frame.checksum_valid: # S3 checksums frequently fail SUM8 due to inner-frame delimiter bytes # landing in the checksum position. Treat as informational only. log.debug("S3 frame SUB=0x%02X: checksum mismatch (ignoring)", frame.sub) if expected_sub is not None and frame.sub != expected_sub: raise UnexpectedResponse( f"Expected SUB=0x{expected_sub:02X}, got 0x{frame.sub:02X}" ) def _drain_boot_string(self, drain_ms: int = 200) -> None: """ Read and discard any boot-string bytes ("Operating System") the device may send before entering binary protocol mode. We simply read with a short timeout and throw the bytes away. The S3FrameParser's IDLE state already handles non-frame bytes gracefully, but it's cleaner to drain them explicitly before the first real frame. """ deadline = time.monotonic() + (drain_ms / 1000) discarded = 0 while time.monotonic() < deadline: chunk = self._transport.read(256) if chunk: discarded += len(chunk) else: time.sleep(0.005) if discarded: log.debug("drain_boot_string: discarded %d bytes", discarded)