""" client.py — MiniMateClient: the top-level public API for the library. Combines transport, protocol, and model decoding into a single easy-to-use class. This is the only layer that the SFM server (sfm/server.py) imports directly. Design: stateless per-call (connect → do work → disconnect). The client does not hold an open connection between calls. This keeps the first implementation simple and matches Blastware's observed behaviour. Persistent connections can be added later without changing the public API. Example (serial): from minimateplus import MiniMateClient with MiniMateClient("COM5") as device: info = device.connect() # POLL handshake + identity read events = device.get_events() # download all events Example (TCP / modem): from minimateplus import MiniMateClient from minimateplus.transport import TcpTransport transport = TcpTransport("203.0.113.5", port=12345) with MiniMateClient(transport=transport) as device: info = device.connect() """ from __future__ import annotations import logging import struct from typing import Optional from .framing import S3Frame from .models import ( ComplianceConfig, DeviceInfo, Event, MonitorStatus, PeakValues, ProjectInfo, Timestamp, ) from .protocol import MiniMateProtocol, ProtocolError from .protocol import ( SUB_SERIAL_NUMBER, SUB_FULL_CONFIG, SUB_WRITE_CONFIRM_A, SUB_WRITE_CONFIRM_B, SUB_WRITE_CONFIRM_C, SUB_TRIGGER_CONFIRM, SUB_MONITOR_STATUS, ) from .transport import SerialTransport, BaseTransport log = logging.getLogger(__name__) # ── Module-level constants ──────────────────────────────────────────────────── # Trigger config payload hardcoded from 3-11-26 BW TX capture (BW frame 108). # No SUB 0x22 read exists in the capture — Blastware writes this fixed blob. # 29 bytes: [00][1A][D5][00][00][10][03][08][0A] + 18×FF + [00][00] _TRIGGER_DATA_HARDCODED: bytes = bytes.fromhex( "001ad500001003080a" "ffffffffffffffffffffffffffffffffffffff" "0000" ) # Compliance ASCII slot format (confirmed from 3-11-26 capture + label search): # Each label occupies a 64-byte slot. # Value starts at slot_start + 22, max 42 bytes, null-padded. _COMPLIANCE_SLOT_SIZE = 64 _COMPLIANCE_VALUE_OFFSET = 22 _COMPLIANCE_VALUE_MAX = _COMPLIANCE_SLOT_SIZE - _COMPLIANCE_VALUE_OFFSET # 42 # ── MiniMateClient ──────────────────────────────────────────────────────────── class MiniMateClient: """ High-level client for a single MiniMate Plus device. Args: port: Serial port name (e.g. "COM5", "/dev/ttyUSB0"). Not required when a pre-built transport is provided. baud: Baud rate (default 38400, ignored when transport is provided). timeout: Per-request receive timeout in seconds (default 15.0). transport: Pre-built transport (SerialTransport or TcpTransport). If None, a SerialTransport is constructed from port/baud. """ def __init__( self, port: str = "", baud: int = 38_400, timeout: float = 15.0, transport: Optional[BaseTransport] = None, ) -> None: self.port = port self.baud = baud self.timeout = timeout self._transport: Optional[BaseTransport] = transport self._proto: Optional[MiniMateProtocol] = None # ── Connection lifecycle ────────────────────────────────────────────────── def open(self) -> None: """Open the transport connection.""" if self._transport is None: self._transport = SerialTransport(self.port, self.baud) if not self._transport.is_connected: self._transport.connect() self._proto = MiniMateProtocol(self._transport, recv_timeout=self.timeout) def close(self) -> None: """Close the transport connection.""" if self._transport and self._transport.is_connected: self._transport.disconnect() self._proto = None @property def is_open(self) -> bool: return bool(self._transport and self._transport.is_connected) # ── Context manager ─────────────────────────────────────────────────────── def __enter__(self) -> "MiniMateClient": self.open() return self def __exit__(self, *_) -> None: self.close() # ── Public API ──────────────────────────────────────────────────────────── def connect(self) -> DeviceInfo: """ Perform the startup handshake and read device identity + compliance config. Opens the connection if not already open. Reads: 1. POLL handshake (startup) 2. SUB 15 — serial number 3. SUB 01 — full config block (firmware, model strings) 4. SUB 1A — compliance config (record time, trigger/alarm levels, project strings) 5. SUB 08 — event index (stored event count) Returns: Populated DeviceInfo with compliance_config and event_count cached. Raises: ProtocolError: on any communication failure. """ if not self.is_open: self.open() proto = self._require_proto() log.info("connect: POLL startup") proto.startup() log.info("connect: reading serial number (SUB 15)") sn_data = proto.read(SUB_SERIAL_NUMBER) device_info = _decode_serial_number(sn_data) log.info("connect: reading full config (SUB 01)") cfg_data = proto.read(SUB_FULL_CONFIG) _decode_full_config_into(cfg_data, device_info) log.info("connect: reading compliance config (SUB 1A)") try: cc_data = proto.read_compliance_config() _decode_compliance_config_into(cc_data, device_info) except ProtocolError as exc: log.warning("connect: compliance config read failed: %s — continuing", exc) log.info("connect: reading event index (SUB 08)") try: idx_raw = proto.read_event_index() device_info.event_count = _decode_event_count(idx_raw) log.info("connect: device has %d stored event(s)", device_info.event_count) except ProtocolError as exc: log.warning("connect: event index read failed: %s — continuing", exc) log.info("connect: %s", device_info) return device_info def count_events(self) -> int: """ Count stored events by iterating the 1E → 1F key chain. This is the only reliable way to get the true event count. The SUB 08 event index payload has a field that was assumed to be an event count (uint32 BE at offset +3) but empirically always returns 1 regardless of how many events are stored — Blastware appears to download one event per TCP session, so the index may reflect session-scoped state rather than device-wide storage. This method issues 1E (first key) then 1F repeatedly until the null sentinel, counting as it goes. No 0A/0C/5A reads are performed, so it is much faster than get_events(). Null sentinel: event_data8[4:8] == b'\\x00\\x00\\x00\\x00'. DO NOT check key4 — key4 is all-zeros for event 0 and would falsely signal end-of-events on the very first iteration. Returns: Number of stored waveform events (0 if device is empty). """ proto = self._require_proto() try: key4, data8 = proto.read_event_first() except ProtocolError as exc: log.warning("count_events: 1E failed: %s — returning 0", exc) return 0 log.warning( "count_events: 1E → key=%s data8=%s trailing=%s", key4.hex(), data8.hex(), data8[4:8].hex(), ) if data8[4:8] == b"\x00\x00\x00\x00": log.info("count_events: 1E returned null sentinel — device is empty") return 0 # Iterate via 0A → 1F. Each 0A call establishes waveform context so that # 1F (EVENT_ADVANCE) returns the actual next-event key. Without the 0A # call, 1F immediately returns the null sentinel regardless of how many # events are stored. Confirmed from 4-3-26 two-event BW/S3 capture: # browse-mode sequence is 0A(keyN) → 1F → 0A(keyN+1) → 1F → … → null. count = 0 while data8[4:8] != b"\x00\x00\x00\x00": count += 1 try: # 0A establishes device context for this key before 1F advances. proto.read_waveform_header(key4) except ProtocolError as exc: log.warning("count_events: 0A failed for key=%s: %s", key4.hex(), exc) break try: key4, data8 = proto.advance_event(browse=True) log.warning( "count_events: 1F [iter %d] → key=%s data8=%s trailing=%s", count, key4.hex(), data8.hex(), data8[4:8].hex(), ) except ProtocolError as exc: log.warning("count_events: 1F failed after %d events: %s", count, exc) break log.info("count_events: %d event(s) found via 1E/1F chain", count) return count def get_events(self, full_waveform: bool = False, debug: bool = False, stop_after_index: Optional[int] = None) -> list[Event]: """ Download all stored events from the device using the confirmed 1E → 0A → 0C → 5A → 1F event-iterator protocol. Sequence (confirmed from 3-31-26 and 1-2-26 Blastware captures): 1. SUB 1E — get first waveform key 2. For each key until b'\\x00\\x00\\x00\\x00': a. SUB 0A — waveform header (first event only, confirm full record) b. SUB 0C — full waveform record (peak values, record type, timestamp) c. SUB 5A — bulk waveform stream (event-time metadata; stops early after "Project:" is found, so only ~8 frames are fetched) d. SUB 1F — advance to next key (token=0xFE skips partial bins) The SUB 5A fetch provides the authoritative event-time metadata: "Project:", "Client:", "User Name:", "Seis Loc:", and "Extended Notes" as they were configured AT THE TIME the event was recorded. This is distinct from the current device compliance config (SUB 1A), which only reflects the CURRENT setup. Raw ADC waveform samples (full bulk waveform payload, several MB) are NOT downloaded by default. include_waveforms is reserved for a future endpoint that fetches and stores the raw ADC channel data. Returns: List of Event objects, one per stored waveform record. Raises: ProtocolError: on unrecoverable communication failure. """ proto = self._require_proto() log.info("get_events: requesting first event (SUB 1E)") try: key4, data8 = proto.read_event_first() except ProtocolError as exc: raise ProtocolError(f"get_events: 1E failed: {exc}") from exc # Null sentinel: trailing 4 bytes of the 8-byte event data block are # all zero. DO NOT use key4 == b"\x00\x00\x00\x00" — event 0 has # key4=00000000 which would falsely signal an empty device. if data8[4:8] == b"\x00\x00\x00\x00": log.info("get_events: device reports no stored events") return [] events: list[Event] = [] idx = 0 while data8[4:8] != b"\x00\x00\x00\x00": cur_key = key4 # key for this event's 0A/1E-arm/0C/5A calls log.info("get_events: record %d key=%s", idx, cur_key.hex()) ev = Event(index=idx) ev._waveform_key = cur_key # SUB 0A — MUST be called first to establish device waveform context. # Required before 0C, 1E-arm, and 1F. proceed = True try: _hdr, rec_len = proto.read_waveform_header(cur_key) if rec_len < 0x30: log.warning( "get_events: key=%s is partial (len=0x%02X) — skipping", cur_key.hex(), rec_len, ) proceed = False except ProtocolError as exc: log.warning( "get_events: 0A failed for key=%s: %s — skipping", cur_key.hex(), exc, ) proceed = False if proceed: # SUB 1E (download-arm) — MUST be sent between 0A and 0C. # Device ignores 5A probe frames without this second 1E(token=0xFE). # Confirmed from both 4-2-26 and 4-3-26 BW TX captures (2026-04-06). log.info("get_events: 1E download-arm (token=0xFE) for key=%s", cur_key.hex()) try: proto.read_event_first(token=0xFE) log.info("get_events: 1E download-arm OK") except ProtocolError as exc: log.warning( "get_events: 1E download-arm failed for key=%s: %s", cur_key.hex(), exc, ) # SUB 0C — full waveform record (peak values, timestamp, project string) try: record = proto.read_waveform_record(cur_key) if debug: ev._raw_record = record _decode_waveform_record_into(record, ev) except ProtocolError as exc: log.warning( "get_events: 0C failed for key=%s: %s", cur_key.hex(), exc ) # SUB 1F (download-arm) — send token=0xFE BEFORE POLL+5A to arm the # device's bulk stream state machine. Cache the returned key as a # fallback for loop iteration when 5A fails (see iteration block below). # Confirmed from 4-2-26 capture frames 66-67 (1F before frames 68-73 POLL). arm_key4: Optional[bytes] = None try: arm_key4, _ = proto.advance_event(browse=False) # arm 5A log.info("get_events: 1F(download) — 5A armed, arm_key=%s", arm_key4.hex()) except ProtocolError as exc: log.warning("get_events: 1F(download) arm failed: %s", exc) # POLL × 3 — BW sends 3 full POLL cycles between 1F and 5A. # Confirmed from 4-2-26 BW TX capture (frames 68-73 before 5A at 74). log.info("get_events: POLL × 3 before 5A") for _p in range(3): try: proto.poll() except ProtocolError as exc: log.warning("get_events: POLL %d failed: %s", _p, exc) # SUB 5A — bulk waveform stream (uses cur_key, the event set up by 0A+1E+0C). # By default (full_waveform=False): stop after frame 7 for metadata only. # When full_waveform=True: fetch all chunks and decode raw ADC samples. a5_ok = False try: if full_waveform: log.info( "get_events: 5A full waveform download for key=%s", cur_key.hex() ) a5_frames = proto.read_bulk_waveform_stream( cur_key, stop_after_metadata=False, max_chunks=128 ) if a5_frames: a5_ok = True _decode_a5_metadata_into(a5_frames, ev) _decode_a5_waveform(a5_frames, ev) log.info( "get_events: 5A decoded %d sample-sets", len((ev.raw_samples or {}).get("Tran", [])), ) else: log.info( "get_events: 5A metadata-only download for key=%s", cur_key.hex() ) a5_frames = proto.read_bulk_waveform_stream( cur_key, stop_after_metadata=True ) if a5_frames: a5_ok = True _decode_a5_metadata_into(a5_frames, ev) log.debug( "get_events: 5A metadata client=%r operator=%r", ev.project_info.client if ev.project_info else None, ev.project_info.operator if ev.project_info else None, ) except ProtocolError as exc: log.warning( "get_events: 5A failed for key=%s: %s — metadata unavailable", cur_key.hex(), exc, ) # SUB 1F — loop iteration. # # IMPORTANT: browse 1F (all-zero params) is ONLY called when 5A # succeeded. If 5A timed out or failed, calling browse 1F disrupts # the device's internal state and causes the NEXT event's 5A to also # fail. In the failure path, use the key cached from 1F(download) # above as a best-effort fallback for iteration. # # Confirmed from 4-3-26 browse-mode captures: browse=True params # are correct for multi-event iteration. Conditional logic added # 2026-04-06 to avoid post-failure state disruption. if a5_ok: # 5A succeeded — use browse 1F for reliable key advancement. try: key4, data8 = proto.advance_event(browse=True) log.info( "get_events: 1F(browse) → key=%s trailing=%s", key4.hex(), data8[4:8].hex(), ) except ProtocolError as exc: log.warning("get_events: 1F(browse) failed: %s — stopping", exc) key4 = b"\x00\x00\x00\x00" data8 = b"\x00\x00\x00\x00\x00\x00\x00\x00" else: # 5A failed — skip browse 1F to avoid further state disruption. # Use the arm_key4 returned by 1F(download) as the next-key hint. if arm_key4 is None or arm_key4 == cur_key: # 1F(download) returned no valid next key (or same key = stuck). # Stop iteration to prevent infinite loop. log.warning( "get_events: 5A failed and 1F(download) returned no valid " "next key (arm_key=%s, cur_key=%s) — stopping iteration", arm_key4.hex() if arm_key4 else "None", cur_key.hex(), ) key4 = b"\x00\x00\x00\x00" data8 = b"\x00\x00\x00\x00\x00\x00\x00\x00" else: # arm_key4 is a valid non-stuck next key — use it. # Construct a synthetic data8 with non-null trailing so the # loop continues (the real trailing is unknown but non-null # since we have a valid arm_key4). key4 = arm_key4 data8 = arm_key4 + b"\x00\x00\x00\x01" log.warning( "get_events: 5A failed — advancing via arm_key=%s " "(browse 1F skipped to preserve device state)", key4.hex(), ) events.append(ev) idx += 1 # Early exit: if the caller only wants events up to a specific # index, stop iterating once we've collected it. if stop_after_index is not None and idx > stop_after_index: log.info( "get_events: reached stop_after_index=%d — stopping early", stop_after_index, ) break else: # Partial/failed record — skip 5A, just advance with 1F. log.info( "get_events: key=%s — skipping partial/failed record", cur_key.hex(), ) try: key4, data8 = proto.advance_event(browse=True) log.info( "get_events: 1F → key=%s trailing=%s", key4.hex(), data8[4:8].hex(), ) except ProtocolError as exc: log.warning("get_events: 1F failed: %s — stopping iteration", exc) break log.info("get_events: downloaded %d event(s)", len(events)) return events def download_waveform(self, event: Event) -> None: """ Download the full raw ADC waveform for a previously-retrieved event and populate event.raw_samples, event.total_samples, event.pretrig_samples, and event.rectime_seconds. This performs a complete SUB 5A (BULK_WAVEFORM_STREAM) download with stop_after_metadata=False, fetching all waveform frames (typically 9 large A5 frames for a standard blast record). The download is large (up to several hundred KB for a 9-second, 4-channel, 1024-Hz record) and is intentionally not performed by get_events() by default. Args: event: An Event object returned by get_events(). Must have a waveform key embedded; the key is reconstructed from the event's timestamp and index via the 1E/1F protocol. Raises: ValueError: if the event does not have a waveform key available. RuntimeError: if the client is not connected. ProtocolError: on communication failure. Confirmed format (4-2-26 blast capture, ✅): 4-channel interleaved signed 16-bit LE, 8 bytes per sample-set. Total samples: 9306 (≈9.1 s at 1024 Hz), pretrig: 298 (≈0.29 s). Channel order: Tran, Vert, Long, Mic (Blastware convention). """ proto = self._require_proto() if event._waveform_key is None: raise ValueError( f"Event#{event.index} has no waveform key — " "was it retrieved via get_events()?" ) log.info( "download_waveform: starting full 5A download for event#%d (key=%s)", event.index, event._waveform_key.hex(), ) a5_frames = proto.read_bulk_waveform_stream( event._waveform_key, stop_after_metadata=False, max_chunks=128 ) log.info( "download_waveform: received %d A5 frames; decoding waveform", len(a5_frames), ) _decode_a5_waveform(a5_frames, event) if event.raw_samples is not None: n = len(event.raw_samples.get("Tran", [])) log.info( "download_waveform: decoded %d sample-sets across 4 channels", n, ) else: log.warning("download_waveform: waveform decode produced no samples") # ── Write commands ──────────────────────────────────────────────────────── def push_config_raw( self, event_index_data: bytes, compliance_data: bytes, trigger_data: bytes, waveform_data: bytes, ) -> None: """ Push a complete config update to the device using the confirmed write sequence from the 3-11-26 BW TX capture. This is the raw-bytes interface — callers supply pre-encoded payloads for each write block. A higher-level method that encodes from ComplianceConfig and re-reads the current payloads first can be built on top of this. Full write sequence (confirmed from 3-11-26 BW TX capture frames 102–112): SUB 68 → event index write → ack SUB 0x97 SUB 73 → confirm B → ack SUB 0x8C SUB 71 (×3 chunks) → compliance write → each ack SUB 0x8E SUB 72 → confirm A → ack SUB 0x8D SUB 82 → trigger config write → ack SUB 0x7D SUB 83 → trigger confirm → ack SUB 0x7C SUB 69 → waveform data write → ack SUB 0x96 SUB 74 → confirm C → ack SUB 0x8B SUB 72 → confirm A → ack SUB 0x8D Args: event_index_data: Raw bytes for SUB 68 write (88-byte event index). compliance_data: Raw bytes for SUB 71 write (≥2082 bytes, 3 chunks). trigger_data: Raw bytes for SUB 82 write (44-byte trigger config). waveform_data: Raw bytes for SUB 69 write. Raises: RuntimeError: if the client is not connected. ProtocolError: if any write step fails (timeout, bad ack SUB). ValueError: if compliance_data is too short for the 3-chunk split. """ proto = self._require_proto() # 68 → 73 log.info("push_config_raw: write event index (SUB 68)") proto.write_event_index(event_index_data) log.info("push_config_raw: confirm B (SUB 73)") proto.write_confirm(SUB_WRITE_CONFIRM_B) # 71×3 → 72 (handled internally by write_compliance_config_raw) log.info("push_config_raw: write compliance config (SUB 71 ×3 + confirm 72)") proto.write_compliance_config_raw(compliance_data) # 82 → 83 log.info("push_config_raw: write trigger config (SUB 82)") proto.write_trigger_config(trigger_data) log.info("push_config_raw: trigger confirm (SUB 83)") proto.write_confirm(SUB_TRIGGER_CONFIRM) # 69 → 74 → 72 log.info("push_config_raw: write waveform data (SUB 69)") proto.write_waveform_data(waveform_data) log.info("push_config_raw: confirm C (SUB 74)") proto.write_confirm(SUB_WRITE_CONFIRM_C) log.info("push_config_raw: confirm A (SUB 72)") proto.write_confirm(SUB_WRITE_CONFIRM_A) log.info("push_config_raw: complete") def apply_config( self, *, # Recording parameters sample_rate: Optional[int] = None, record_time: Optional[float] = None, # Threshold parameters (geo channels, in/s) trigger_level_geo: Optional[float] = None, alarm_level_geo: Optional[float] = None, max_range_geo: Optional[float] = None, # Project / operator strings project: Optional[str] = None, client_name: Optional[str] = None, operator: Optional[str] = None, seis_loc: Optional[str] = None, notes: Optional[str] = None, ) -> None: """ Read the current device config, apply any supplied changes to the compliance block, and write the full config back to the device. Only non-None arguments are modified; all other bytes are round-tripped verbatim from the device. Configurable fields ------------------- Recording parameters: sample_rate : int — samples/sec; valid values: 1024, 2048, 4096 record_time : float — record duration in seconds (e.g. 2.0, 3.0) Trigger/alarm thresholds (geo channels, in/s): trigger_level_geo : float — trigger threshold (e.g. 0.5) alarm_level_geo : float — alarm threshold (e.g. 1.0) max_range_geo : float — full-scale calibration constant (e.g. 6.206) rarely changed — only set if you know what you're doing Project / operator strings (max 41 ASCII characters each): project : str client_name : str operator : str seis_loc : str — sensor location notes : str — extended notes Write sequence (confirmed from 3-11-26 BW TX capture): 68→73 | 71×3→72 | 82→83 | 69→74→72 Write payloads: event_index_data : 88 bytes — read live via SUB 08 compliance_data : 2128 bytes — read live via SUB 1A (2126 bytes) + \\x00\\x00 footer trigger_data : 29 bytes — hardcoded from 3-11-26 capture waveform_data : 204 bytes — read live via SUB 09 Raises: RuntimeError: if not connected. ProtocolError: if any read or write step fails. ValueError: if compliance buffer is not the expected 2126 bytes. """ proto = self._require_proto() # 1. Read current payloads from the device log.info("apply_config: reading event index (SUB 08)") event_index_data = proto.read_event_index() log.info("apply_config: reading compliance config (SUB 1A)") compliance_raw = proto.read_compliance_config() # 2126 bytes log.info("apply_config: reading waveform data (SUB 09)") waveform_data = proto.read_waveform_data_raw() # 204 bytes trigger_data = _TRIGGER_DATA_HARDCODED # 29 bytes # 2. Patch the compliance buffer and build the 2128-byte write payload compliance_data = _encode_compliance_config( compliance_raw, sample_rate=sample_rate, record_time=record_time, trigger_level_geo=trigger_level_geo, alarm_level_geo=alarm_level_geo, max_range_geo=max_range_geo, project=project, client_name=client_name, operator=operator, seis_loc=seis_loc, notes=notes, ) log.info("apply_config: compliance payload ready (%d bytes)", len(compliance_data)) # 3. Push the full write sequence to the device self.push_config_raw(event_index_data, compliance_data, trigger_data, waveform_data) log.info("apply_config: complete") def set_project_info( self, project: Optional[str] = None, client_name: Optional[str] = None, operator: Optional[str] = None, seis_loc: Optional[str] = None, notes: Optional[str] = None, ) -> None: """Backwards-compat shim — delegates to apply_config().""" self.apply_config( project=project, client_name=client_name, operator=operator, seis_loc=seis_loc, notes=notes, ) def poll(self) -> None: """ Perform just the POLL startup handshake — no config reads. Opens the connection if not already open. Used by the monitoring endpoints which need to communicate with the device quickly without spending 10-15 seconds reading compliance config and event index. The POLL establishes the DLE-framed session with the device. After poll(), the protocol is ready for any command (read_monitor_status, start_monitoring, stop_monitoring, etc.). """ if not self.is_open: self.open() proto = self._require_proto() log.debug("poll: startup handshake") proto.startup() # ── Monitoring ──────────────────────────────────────────────────────────── def get_monitor_status(self) -> MonitorStatus: """ Read the current monitoring state, battery voltage, and memory usage. Wraps protocol.read_monitor_status() and decodes the raw payload into a MonitorStatus object. The device payload length indicates mode: - 44 bytes (0x2C): unit is idle (full status block present) - 12 bytes : unit is actively monitoring (abbreviated block) Confirmed field offsets (relative to data[11], the start of the S3 data section after the 11-byte frame header): [0x2F:0x31] battery voltage × 100 uint16 BE e.g. 0x02A8 = 680 → 6.80 V [0x31:0x35] memory total (bytes) uint32 BE e.g. 0x000F0000 = 983040 bytes [0x35:0x39] memory free (bytes) uint32 BE Returns: MonitorStatus with is_monitoring, battery_v, memory_total, memory_free. Raises: RuntimeError: if not connected. ProtocolError: on timeout or wrong response SUB. """ proto = self._require_proto() frame = proto.read_monitor_status() return _decode_monitor_status(frame.data) def start_monitoring(self) -> None: """ Command the device to begin monitoring (recording triggered events). Sends SUB 0x96; device responds with a 17-byte zero-data ack (SUB 0x69). Confirmed from 4-8-26/2ndtry BW TX capture frame 92. Raises: RuntimeError: if not connected. ProtocolError: on timeout or wrong response. """ proto = self._require_proto() proto.start_monitoring() log.info("start_monitoring: device is now monitoring") def stop_monitoring(self) -> None: """ Command the device to stop monitoring. Sends SUB 0x97; device responds with a 17-byte zero-data ack (SUB 0x68). Confirmed from 4-8-26/2ndtry BW TX capture frame 305. Raises: RuntimeError: if not connected. ProtocolError: on timeout or wrong response. """ proto = self._require_proto() proto.stop_monitoring() log.info("stop_monitoring: device stopped monitoring") # ── Internal helpers ────────────────────────────────────────────────────── def _require_proto(self) -> MiniMateProtocol: if self._proto is None: raise RuntimeError("MiniMateClient is not connected. Call open() first.") return self._proto # ── Decoder functions ───────────────────────────────────────────────────────── # # Pure functions: bytes → model field population. # Kept here (not in models.py) to isolate protocol knowledge from data shapes. def _decode_serial_number(data: bytes) -> DeviceInfo: """ Decode SUB EA (SERIAL_NUMBER_RESPONSE) payload into a new DeviceInfo. Layout (10 bytes total per §7.2): bytes 0–7: serial string, null-terminated, null-padded ("BE18189\\x00") byte 8: unit-specific trailing byte (purpose unknown ❓) byte 9: firmware minor version (0x11 = 17) ✅ Returns: New DeviceInfo with serial, firmware_minor, serial_trail_0 populated. """ # data is data_rsp.data = payload[5:]. The 11-byte section header occupies # data[0..10]: [LENGTH_ECHO:1][00×4][KEY_ECHO:4][00×2]. # Actual serial payload starts at data[11]. actual = data[11:] if len(data) > 11 else data if len(actual) < 9: # Short payload — gracefully degrade serial = actual.rstrip(b"\x00").decode("ascii", errors="replace") return DeviceInfo(serial=serial, firmware_minor=0) serial = actual[:8].rstrip(b"\x00").decode("ascii", errors="replace") trail_0 = actual[8] if len(actual) > 8 else None fw_minor = actual[9] if len(actual) > 9 else 0 return DeviceInfo( serial=serial, firmware_minor=fw_minor, serial_trail_0=trail_0, ) def _decode_full_config_into(data: bytes, info: DeviceInfo) -> None: """ Decode SUB FE (FULL_CONFIG_RESPONSE) payload into an existing DeviceInfo. The FE response arrives as a composite S3 outer frame whose data section contains inner DLE-framed sub-frames. Because of this nesting the §7.3 fixed offsets (0x34, 0x3C, 0x44, 0x6D) are unreliable — they assume a clean non-nested payload starting at byte 0. Instead we search the whole byte array for known ASCII patterns. The strings are long enough to be unique in any reasonable payload. Modifies info in-place. """ def _extract(needle: bytes, max_len: int = 32) -> Optional[str]: """Return the null-terminated ASCII string that starts with *needle*.""" pos = data.find(needle) if pos < 0: return None end = pos while end < len(data) and data[end] != 0 and (end - pos) < max_len: end += 1 s = data[pos:end].decode("ascii", errors="replace").strip() return s or None # ── Manufacturer and model are straightforward literal matches ──────────── info.manufacturer = _extract(b"Instantel") info.model = _extract(b"MiniMate Plus") # ── Firmware version: "S3xx.xx" — scan for the 'S3' prefix ─────────────── for i in range(len(data) - 5): if data[i] == ord('S') and data[i + 1] == ord('3') and chr(data[i + 2]).isdigit(): end = i while end < len(data) and data[end] not in (0, 0x20) and (end - i) < 12: end += 1 candidate = data[i:end].decode("ascii", errors="replace").strip() if "." in candidate and len(candidate) >= 5: info.firmware_version = candidate break # ── DSP version: numeric "xx.xx" — search for known prefixes ───────────── for prefix in (b"10.", b"11.", b"12.", b"9.", b"8."): pos = data.find(prefix) if pos < 0: continue end = pos while end < len(data) and data[end] not in (0, 0x20) and (end - pos) < 8: end += 1 candidate = data[pos:end].decode("ascii", errors="replace").strip() # Accept only strings that look like "digits.digits" if "." in candidate and all(c in "0123456789." for c in candidate): info.dsp_version = candidate break def _decode_event_count(data: bytes) -> int: """ Extract stored event count from SUB F7 (EVENT_INDEX_RESPONSE) payload. Layout per §7.4 (offsets from data section start): +00: 00 58 09 — total index size or record count ❓ +03: 00 00 00 01 — possibly stored event count = 1 ❓ We use bytes +03..+06 interpreted as uint32 BE as the event count. This is inferred (🔶) — the exact meaning of the first 3 bytes is unclear. """ if len(data) < 7: log.warning("event index payload too short (%d bytes), assuming 0 events", len(data)) return 0 # Log the full payload so we can reverse-engineer the format log.warning("event_index raw (%d bytes total):", len(data)) for off in range(0, len(data), 16): chunk = data[off:off+16] hex_part = " ".join(f"{b:02x}" for b in chunk) asc_part = "".join(chr(b) if 0x20 <= b < 0x7f else "." for b in chunk) log.warning(" [%04x]: %-47s %s", off, hex_part, asc_part) # Try the uint32 at +3 first count = struct.unpack_from(">I", data, 3)[0] # Sanity check: MiniMate Plus manual says max ~1000 events if count > 1000: log.warning( "event count %d looks unreasonably large — clamping to 0", count ) return 0 log.warning("event_index decoded count=%d (uint32 BE at offset +3)", count) return count def _decode_event_header_into(data: bytes, event: Event) -> None: """ Decode SUB E1 (EVENT_HEADER_RESPONSE) raw data section into an Event. The waveform key is at data[11:15] (extracted separately in MiniMateProtocol.read_event_first). The remaining 4 bytes at data[15:19] are not yet decoded (❓ — possibly sample rate or flags). Date information (year/month/day) lives in the waveform record (SUB 0C), not in the 1E response. This function is a placeholder for any future metadata we decode from the 8-byte 1E data block. Modifies event in-place. """ # Nothing confirmed yet from the 8-byte data block beyond the key at [0:4]. # Leave event.timestamp as None — it will be populated from the 0C record. pass def _decode_waveform_record_into(data: bytes, event: Event) -> None: """ Decode a 210-byte SUB F3 (FULL_WAVEFORM_RECORD) record into an Event. The *data* argument is the raw record bytes returned by MiniMateProtocol.read_waveform_record() — i.e. data_rsp.data[11:11+0xD2]. Extracts (all ✅ confirmed 2026-04-01 against Blastware event report): - timestamp: 9-byte format at bytes [0:9] - record_type: sub_code at byte[1] (0x10 = "Waveform") - peak_values: label-based float32 at label+6 for Tran/Vert/Long/MicL - peak_vector_sum: IEEE 754 BE float at offset 87 - project_info: "Project:", "Client:", etc. string search Modifies event in-place. """ # ── Record type ─────────────────────────────────────────────────────────── # Decoded from byte[1] (sub_code) first so we can gate timestamp parsing. try: event.record_type = _extract_record_type(data) except Exception as exc: log.warning("waveform record type decode failed: %s", exc) # ── Timestamp ───────────────────────────────────────────────────────────── # 9-byte format for sub_code=0x10 Waveform records: # [day][sub_code][month][year:2 BE][unknown][hour][min][sec] # sub_code=0x10 and sub_code=0x03 have different timestamp byte layouts. # Both confirmed against Blastware event reports (BE11529, 2026-04-01 and 2026-04-03). if event.record_type == "Waveform": try: event.timestamp = Timestamp.from_waveform_record(data) except Exception as exc: log.warning("waveform record timestamp decode failed: %s", exc) elif event.record_type == "Waveform (Continuous)": try: event.timestamp = Timestamp.from_continuous_record(data) except Exception as exc: log.warning("continuous record timestamp decode failed: %s", exc) # ── Peak values (per-channel PPV + Peak Vector Sum) ─────────────────────── try: peak_values = _extract_peak_floats(data) if peak_values: event.peak_values = peak_values except Exception as exc: log.warning("waveform record peak decode failed: %s", exc) # ── Project strings ─────────────────────────────────────────────────────── try: project_info = _extract_project_strings(data) if project_info: event.project_info = project_info except Exception as exc: log.warning("waveform record project strings decode failed: %s", exc) def _decode_a5_metadata_into(frames_data: list[bytes], event: Event) -> None: """ Search A5 (BULK_WAVEFORM_STREAM) frame data for event-time metadata strings and populate event.project_info. This is the authoritative source for event-time metadata — it reflects the device setup AT THE TIME the event was recorded, not the current device configuration. The metadata lives in a middle A5 frame (confirmed: A5[7] of 9 large frames for the 1-2-26 capture): Confirmed needle locations in A5[7].data (2026-04-02 from 1-2-26 capture): b"Project:" at data[626] b"Client:" at data[676] b"User Name:" at data[703] b"Seis Loc:" at data[735] b"Extended Notes" at data[774] All frames are concatenated for a single-pass needle search. NOTE: 5A appears to return the compliance config from when the *monitoring session first started*, not per-event config. This means: - "Project:" from 5A must NOT overwrite a value already set from the 0C record, because 0C carries the correct per-event project name. - "Client:", "User Name:", "Seis Loc:", "Extended Notes" are NOT present in the 210-byte 0C record at all, so 5A remains the sole source for those fields. Modifies event in-place. """ combined = b"".join(frames_data) def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]: pos = combined.find(needle) if pos < 0: return None value_start = pos + len(needle) while value_start < len(combined) and combined[value_start] == 0: value_start += 1 if value_start >= len(combined): return None end = value_start while end < len(combined) and combined[end] != 0 and (end - value_start) < max_len: end += 1 s = combined[value_start:end].decode("ascii", errors="replace").strip() return s or None project = _find_string_after(b"Project:") client = _find_string_after(b"Client:") operator = _find_string_after(b"User Name:") location = _find_string_after(b"Seis Loc:") notes = _find_string_after(b"Extended Notes") if not any([project, client, operator, location, notes]): log.debug("a5 metadata: no project strings found in %d frames", len(frames_data)) return if event.project_info is None: event.project_info = ProjectInfo() pi = event.project_info # "project" comes from 0C (per-event, set during _decode_waveform_record_into). # 5A returns session-start compliance config — its "project" value is NOT # per-event authoritative. Only use the 5A project as a fallback if 0C # didn't supply one. # client / operator / sensor_location / notes are NOT in the 0C record at all # (confirmed from CLAUDE.md §SUB 5A), so 5A is the sole source for those. if project and not pi.project: pi.project = project if client: pi.client = client if operator: pi.operator = operator if location: pi.sensor_location = location if notes: pi.notes = notes log.debug( "a5 metadata: project=%r client=%r operator=%r location=%r", pi.project, pi.client, pi.operator, pi.sensor_location, ) def _decode_a5_waveform( frames_data: list[bytes], event: Event, ) -> None: """ Decode the raw 4-channel ADC waveform from a complete set of SUB 5A (BULK_WAVEFORM_STREAM) frame payloads and populate event.raw_samples, event.total_samples, event.pretrig_samples, and event.rectime_seconds. This requires ALL A5 frames (stop_after_metadata=False), not just the metadata-bearing subset. ── Waveform format (confirmed from 4-2-26 blast capture) ─────────────────── The blast waveform is 4-channel interleaved signed 16-bit little-endian, 8 bytes per sample-set: [T_lo T_hi V_lo V_hi L_lo L_hi M_lo M_hi] × N where T=Tran, V=Vert, L=Long, M=Mic. Channel ordering follows the Blastware convention [Tran, Vert, Long, Mic] = [ch0, ch1, ch2, ch3]. ⚠️ Channel ordering is a confirmed CONVENTION — the physical ordering on the ADC mux is not independently verifiable from the saturating blast captures we have. The convention is consistent with Blastware labeling (Tran is always the first channel field in the A5 STRT+waveform stream). ── Frame structure ────────────────────────────────────────────────────────── A5[0] (probe response): db[7:] = [11-byte header] [21-byte STRT record] [6-byte preamble] [waveform ...] STRT: b'STRT' at offset 11, total 21 bytes +8 uint16 BE: total_samples (expected full-record sample-sets) +16 uint16 BE: pretrig_samples (pre-trigger sample count) +18 uint8: rectime_seconds (record duration) Preamble: 6 bytes after the STRT record (confirmed from 4-2-26 blast capture): bytes 21-22: 0x00 0x00 (null padding) bytes 23-26: 0xFF × 4 (sync sentinel / alignment marker) Waveform starts at strt_pos + 27 within db[7:]. A5[1..N] (chunk responses): db[7:] = [8-byte per-frame header] [waveform bytes ...] Header: [ctr LE uint16, 0x00 × 6] — frame sequence counter Waveform starts at byte 8 of db[7:]. ── Cross-frame alignment ──────────────────────────────────────────────────── Frame waveform chunk sizes are NOT multiples of 8. Naive concatenation scrambles channel assignments at frame boundaries. Fix: track the cumulative global byte offset; at each new frame, the starting alignment within the T,V,L,M cycle is (global_offset % 8). Confirmed sizes from 4-2-26 (A5[0..8], skipping A5[7] metadata frame and A5[9] terminator): Frame 0: 934B Frame 1: 963B Frame 2: 946B Frame 3: 960B Frame 4: 952B Frame 5: 946B Frame 6: 941B Frame 8: 992B — none are multiples of 8. ── Modifies event in-place. ───────────────────────────────────────────────── """ if not frames_data: log.debug("_decode_a5_waveform: no frames provided") return # ── Parse STRT record from A5[0] ──────────────────────────────────────── w0 = frames_data[0][7:] # db[7:] for A5[0] strt_pos = w0.find(b"STRT") if strt_pos < 0: log.warning("_decode_a5_waveform: STRT record not found in A5[0]") return # STRT record layout (21 bytes, offsets relative to b'STRT'): # +0..3 magic b'STRT' # +8..9 uint16 BE total_samples (full-record expected sample-set count) # +16..17 uint16 BE pretrig_samples # +18 uint8 rectime_seconds strt = w0[strt_pos : strt_pos + 21] if len(strt) < 21: log.warning("_decode_a5_waveform: STRT record truncated (%dB)", len(strt)) return total_samples = struct.unpack_from(">H", strt, 8)[0] pretrig_samples = struct.unpack_from(">H", strt, 16)[0] rectime_seconds = strt[18] event.total_samples = total_samples event.pretrig_samples = pretrig_samples event.rectime_seconds = rectime_seconds log.debug( "_decode_a5_waveform: STRT total_samples=%d pretrig=%d rectime=%ds", total_samples, pretrig_samples, rectime_seconds, ) # ── Collect per-frame waveform bytes with global offset tracking ───────── # global_offset is the cumulative byte count across all frames, used to # compute the channel alignment at each frame boundary. chunks: list[tuple[int, bytes]] = [] # (frame_idx, waveform_bytes) global_offset = 0 for fi, db in enumerate(frames_data): w = db[7:] # A5[0]: waveform begins after the 21-byte STRT record and 6-byte preamble. # Layout: STRT(21B) + null-pad(2B) + 0xFF sentinel(4B) = 27 bytes total. if fi == 0: sp = w.find(b"STRT") if sp < 0: continue wave = w[sp + 27 :] # Frame 7 carries event-time metadata strings ("Project:", "Client:", …) # and no waveform ADC data. elif fi == 7: continue # Terminator frames have page_key=0x0000 and are excluded upstream # (read_bulk_waveform_stream returns early on page_key==0). # No hardcoded frame-index skip here — all non-metadata frames are data. else: # Strip the 8-byte per-frame header (ctr + 6 zero bytes) if len(w) < 8: continue wave = w[8:] if len(wave) < 2: continue chunks.append((fi, wave)) global_offset += len(wave) total_bytes = global_offset n_sets = total_bytes // 8 log.debug( "_decode_a5_waveform: %d chunks, %dB total → %d complete sample-sets " "(%d of %d expected; %.0f%%)", len(chunks), total_bytes, n_sets, n_sets, total_samples, 100.0 * n_sets / total_samples if total_samples else 0, ) if n_sets == 0: log.warning("_decode_a5_waveform: no complete sample-sets found") return # ── Concatenate into one stream and decode ─────────────────────────────── # Rather than concatenating and then fixing up, we reconstruct the correct # channel-aligned stream by skipping misaligned partial sample-sets at each # frame start. # # At global byte offset G, the byte position within the T,V,L,M cycle is # G % 8. When a frame starts with align = G % 8 ≠ 0, the first # (8 - align) bytes of that frame complete a partial sample-set that # cannot be decoded cleanly, so we skip them and start from the next full # T-boundary. # # This produces a slightly smaller decoded set but preserves correct # channel alignment throughout. tran: list[int] = [] vert: list[int] = [] long_: list[int] = [] mic: list[int] = [] running_offset = 0 for fi, wave in chunks: align = running_offset % 8 # byte position within T,V,L,M cycle skip = (8 - align) % 8 # bytes to discard to reach next T start if skip > 0 and skip < len(wave): usable = wave[skip:] elif align == 0: usable = wave else: running_offset += len(wave) continue # entire frame is a partial sample-set n_usable = len(usable) // 8 for i in range(n_usable): off = i * 8 tran.append( struct.unpack_from(" Optional[str]: """ Decode the recording mode from byte[1] of the 210-byte waveform record. Byte[1] is the sub-record code that immediately follows the day byte in the 9-byte timestamp header at the start of each waveform record: [day:1] [sub_code:1] [month:1] [year:2 BE] ... Confirmed codes (✅ 2026-04-01): 0x10 → "Waveform" (continuous / single-shot mode) Histogram mode code is not yet confirmed — a histogram event must be captured with debug=true to identify it. Returns None for unknown codes. """ if len(data) < 2: return None code = data[1] if code == 0x10: return "Waveform" if code == 0x03: # Continuous mode waveform record (confirmed by user — NOT a monitor log). # The byte layout differs from 0x10 single-shot records: the timestamp # fields decode as garbage under the 0x10 waveform layout. # TODO: confirm correct timestamp layout for 0x03 records from a known-time event. return "Waveform (Continuous)" log.warning("_extract_record_type: unknown sub_code=0x%02X", code) return f"Unknown(0x{code:02X})" def _extract_peak_floats(data: bytes) -> Optional[PeakValues]: """ Locate per-channel peak particle velocity values in the 210-byte waveform record by searching for the embedded channel label strings ("Tran", "Vert", "Long", "MicL") and reading the IEEE 754 BE float at label_offset + 6. The floats are NOT 4-byte aligned in the record (confirmed from 3-31-26 capture), so the previous step-4 scan missed Tran, Long, and MicL entirely. Label-based lookup is the correct approach. Channel labels are separated by inner-frame bytes (0x10 0x03 = DLE ETX), which the S3FrameParser preserves as literal data. Searching for the 4-byte ASCII label strings is robust to this structure. Returns PeakValues if at least one channel label is found, else None. """ # (label_bytes, field_name) channels = ( (b"Tran", "tran"), (b"Vert", "vert"), (b"Long", "long_"), (b"MicL", "micl"), ) vals: dict[str, float] = {} for label_bytes, field in channels: pos = data.find(label_bytes) if pos < 0: continue float_off = pos + 6 if float_off + 4 > len(data): log.debug("peak float: label %s at %d but float runs past end", label_bytes, pos) continue try: val = struct.unpack_from(">f", data, float_off)[0] except struct.error: continue log.debug("peak float: %s at label+6 (%d) = %.6f", label_bytes.decode(), float_off, val) vals[field] = val if not vals: return None # ── Peak Vector Sum — label-relative offset ────────────────────────────── # = √(Tran² + Vert² + Long²) at the sample instant of maximum combined geo # motion, NOT the vector sum of the three per-channel peak values (which may # occur at different times). Matches Blastware "Peak Vector Sum" exactly. # # PVS lives at tran_label_pos - 12 for both 0x10 and 0x03 record types. # Confirmed from raw bytes of two events (2026-04-01 and 2026-04-03): # 0x10: Tran at byte 98, PVS float at bytes 86–89 (98 - 12 = 86) ✅ # 0x03: Tran at byte 104, PVS float at bytes 92–95 (104 - 12 = 92) ✅ # Using a fixed absolute offset (87) breaks for 0x03 records because their # timestamp header is 10 bytes instead of 9, shifting all subsequent fields. pvs: Optional[float] = None tran_pos = data.find(b"Tran") if tran_pos >= 12: try: pvs = struct.unpack_from(">f", data, tran_pos - 12)[0] except struct.error: pass return PeakValues( tran=vals.get("tran"), vert=vals.get("vert"), long=vals.get("long_"), micl=vals.get("micl"), peak_vector_sum=pvs, ) def _extract_project_strings(data: bytes) -> Optional[ProjectInfo]: """ Search the waveform record payload for known ASCII label strings ("Project:", "Client:", "User Name:", "Seis Loc:", "Extended Notes") and extract the associated value strings that follow them. Layout (per §7.5): each entry is [label ~16 bytes][value ~32 bytes], null-padded. We find the label, then read the next non-null chars. """ def _find_string_after(needle: bytes, max_value_len: int = 64) -> Optional[str]: pos = data.find(needle) if pos < 0: return None # Skip the label (including null padding) until we find a non-null value # The value starts at pos+len(needle), but may have a gap of null bytes value_start = pos + len(needle) # Skip nulls while value_start < len(data) and data[value_start] == 0: value_start += 1 if value_start >= len(data): return None # Read until null terminator or max_value_len end = value_start while end < len(data) and data[end] != 0 and (end - value_start) < max_value_len: end += 1 value = data[value_start:end].decode("ascii", errors="replace").strip() return value or None project = _find_string_after(b"Project:") client = _find_string_after(b"Client:") operator = _find_string_after(b"User Name:") location = _find_string_after(b"Seis Loc:") notes = _find_string_after(b"Extended Notes") if not any([project, client, operator, location, notes]): return None return ProjectInfo( project=project, client=client, operator=operator, sensor_location=location, notes=notes, ) def _encode_compliance_config( raw: bytes, *, sample_rate: Optional[int] = None, record_time: Optional[float] = None, trigger_level_geo: Optional[float] = None, alarm_level_geo: Optional[float] = None, max_range_geo: Optional[float] = None, project: Optional[str] = None, client_name: Optional[str] = None, operator: Optional[str] = None, seis_loc: Optional[str] = None, notes: Optional[str] = None, ) -> bytes: """ Patch a live 2126-byte compliance buffer (read from the device) with any supplied field values and return the 2128-byte write payload. Only non-None arguments are modified; everything else is round-tripped verbatim. Numeric field locations (all anchor-relative or label-relative — immune to DLE-jitter shifts): Anchor: b'\\xbe\\x80\\x00\\x00\\x00\\x00' (confirmed stable, both BE11529 and BE18189) sample_rate → uint16 BE at anchor_pos - 6 record_time → float32 BE at anchor_pos + 6 Channel block (anchored on b"Tran" with unit-string guard): max_range_geo → float32 BE at tran_pos + 28 trigger_level_geo → float32 BE at tran_pos + 34 alarm_level_geo → float32 BE at tran_pos + 42 String field locations (64-byte slots, label+22 format): b"Project:" → value at label_pos + 22, max 41 chars + null b"Client:" → value at label_pos + 22, max 41 chars + null b"User Name:" → value at label_pos + 22, max 41 chars + null b"Seis Loc:" → value at label_pos + 22, max 41 chars + null b"Extended Notes"→ value at label_pos + 22, max 41 chars + null Returns: 2128-byte write payload: patched 2126-byte buffer + 2-byte footer \\x00\\x00. (Checksum formula for the footer is unknown; \\x00\\x00 confirmed accepted by the device in POC test 2026-04-07.) Raises: ValueError: if raw is not exactly 2126 bytes. """ if len(raw) != 2126: raise ValueError(f"_encode_compliance_config: expected 2126 bytes, got {len(raw)}") buf = bytearray(raw) # ── Numeric: sample_rate + record_time (anchor-relative) ───────────────── _ANC = b'\xbe\x80\x00\x00\x00\x00' _anc = buf.find(_ANC, 0, 150) if sample_rate is not None: if _anc < 6: log.warning("_encode_compliance_config: anchor not found — cannot write sample_rate") else: struct.pack_into(">H", buf, _anc - 6, sample_rate) log.debug("_encode_compliance_config: sample_rate=%d → offset %d", sample_rate, _anc - 6) if record_time is not None: if _anc < 0 or _anc + 10 > len(buf): log.warning("_encode_compliance_config: anchor not found — cannot write record_time") else: struct.pack_into(">f", buf, _anc + 6, record_time) log.debug("_encode_compliance_config: record_time=%.3f → offset %d", record_time, _anc + 6) # ── Numeric: channel block (Tran label + unit-string guard) ─────────────── _needs_channel = any( v is not None for v in (trigger_level_geo, alarm_level_geo, max_range_geo) ) if _needs_channel: _tran = buf.find(b"Tran", 44) _valid = ( _tran >= 0 and buf[_tran + 4 : _tran + 5] != b"2" and _tran + 50 <= len(buf) and buf[_tran + 38 : _tran + 42] == b"in.\x00" and buf[_tran + 46 : _tran + 50] == b"/s\x00\x00" ) if not _valid: log.warning( "_encode_compliance_config: 'Tran' channel block not found or unit " "guard failed — trigger/alarm/max_range will not be written" ) else: if max_range_geo is not None: struct.pack_into(">f", buf, _tran + 28, max_range_geo) log.debug("_encode_compliance_config: max_range_geo=%.4f → offset %d", max_range_geo, _tran + 28) if trigger_level_geo is not None: struct.pack_into(">f", buf, _tran + 34, trigger_level_geo) log.debug("_encode_compliance_config: trigger_level_geo=%.4f → offset %d", trigger_level_geo, _tran + 34) if alarm_level_geo is not None: struct.pack_into(">f", buf, _tran + 42, alarm_level_geo) log.debug("_encode_compliance_config: alarm_level_geo=%.4f → offset %d", alarm_level_geo, _tran + 42) # ── ASCII strings (64-byte slot, value at label_pos+22) ─────────────────── def _set_string(label: bytes, value: Optional[str]) -> None: if value is None: return idx = buf.find(label) if idx < 0: log.warning("_encode_compliance_config: label %r not found", label) return val_bytes = value.encode("ascii", errors="replace")[:_COMPLIANCE_VALUE_MAX - 1] padded = val_bytes + b"\x00" * (_COMPLIANCE_VALUE_MAX - len(val_bytes)) buf[idx + _COMPLIANCE_VALUE_OFFSET : idx + _COMPLIANCE_SLOT_SIZE] = padded log.debug("_encode_compliance_config: %r → %r", label, value) _set_string(b"Project:", project) _set_string(b"Client:", client_name) _set_string(b"User Name:", operator) _set_string(b"Seis Loc:", seis_loc) _set_string(b"Extended Notes", notes) # 2-byte footer — checksum formula unknown; \x00\x00 confirmed accepted by device return bytes(buf) + b"\x00\x00" def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None: """ Decode a 2090-byte SUB 1A (COMPLIANCE_CONFIG) response into a ComplianceConfig. The *data* argument is the raw bytes returned by read_compliance_config() (frames B+C+D concatenated, echo headers stripped). Confirmed field locations (BE11529 with 3-step read, duplicate detection): - cfg[89] = setup_name: first long ASCII string in cfg[40:250] ✅ - ANCHOR = b'\\xbe\\x80\\x00\\x00\\x00\\x00' in cfg[0:150] ✅ (revised 2026-04-07) - anchor - 6 = sample_rate uint16_BE (1024 normal / 2048 fast / 4096 faster) - anchor + 6 = record_time float32_BE - "Project:" needle → project string - "Client:" needle → client string - "User Name:" needle → operator string - "Seis Loc:" needle → sensor_location string - "Extended Notes" needle → notes string Anchor approach is required because a DLE byte in the sample_rate field (4096 = 0x1000 → stored as 10 10 00 in raw S3 frame → unstuffed to 10 00, 1 byte shorter than 04 00 or 08 00) causes frame C to be 1 byte shorter for "faster" mode, shifting all subsequent offsets by 1. The 10-byte anchor is stable across all modes. Channel block layout (✅ confirmed 2026-04-02 from 3-11-26 E5 frame 78 and 1-2-26 A5 frame 77): "Tran" label at tran_pos tran_pos + 28 = max_range float32_BE (e.g. 6.206053 in/s) tran_pos + 34 = trigger_level float32_BE (e.g. 0.600000 in/s) tran_pos + 38 = "in.\\x00" (unit string anchor) tran_pos + 42 = alarm_level float32_BE (e.g. 1.250000 in/s) tran_pos + 46 = "/s\\x00\\x00" (unit string anchor) Modifies info.compliance_config in-place. """ if not data or len(data) < 40: log.warning("compliance config payload too short (%d bytes)", len(data)) return config = ComplianceConfig(raw=data) # ── Setup name ──────────────────────────────────────────────────────────── # The setup_name IS the string itself — it is NOT a label followed by a value. # It appears as the first long (>=8 char) ASCII string in cfg[40:250]. # The preceding bytes vary by device (cfg[88]=0x01 on BE11529); the string # itself is null-terminated. try: setup_name = _find_first_string(data, start=40, end=250, min_len=8) config.setup_name = setup_name if setup_name: log.debug("compliance_config: setup_name = %r", setup_name) except Exception as exc: log.warning("compliance_config: setup_name extraction failed: %s", exc) # ── Record time + sample rate — anchor-relative ─────────────────────────── # The 10-byte anchor sits between sample_rate and record_time in the cfg. # Absolute offsets are NOT reliable because sample_rate = 4096 (0x1000) is # DLE-escaped in the raw S3 frame (10 10 00 → 10 00 after unstuffing), # making frame C 1 byte shorter than for 1024/2048 and shifting everything. # sample_rate: uint16_BE at anchor - 2 # record_time: float32_BE at anchor + 10 # 6-byte suffix anchor — confirmed stable across BE11529 and bench unit (BE18189). # The preceding 4 bytes (old anchor prefix 01 2c / 00 3c) vary by unit config; # only be 80 00 00 00 00 is constant. # sample_rate : uint16 BE at anchor_pos - 6 # record_time : float32 BE at anchor_pos + 6 _ANCHOR = b'\xbe\x80\x00\x00\x00\x00' _anchor = data.find(_ANCHOR, 0, 150) if _anchor >= 6 and _anchor + 10 <= len(data): try: config.sample_rate = struct.unpack_from(">H", data, _anchor - 6)[0] log.debug( "compliance_config: sample_rate = %d Sa/s (anchor@%d)", config.sample_rate, _anchor ) except Exception as exc: log.warning("compliance_config: sample_rate extraction failed: %s", exc) try: config.record_time = struct.unpack_from(">f", data, _anchor + 6)[0] log.debug( "compliance_config: record_time = %.3f s (anchor@%d)", config.record_time, _anchor ) except Exception as exc: log.warning("compliance_config: record_time extraction failed: %s", exc) else: log.warning( "compliance_config: anchor %s not found in cfg[0:150] (len=%d) " "— sample_rate and record_time will be None", _ANCHOR.hex(), len(data), ) # ── Project strings ─────────────────────────────────────────────────────── try: def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]: pos = data.find(needle) if pos < 0: return None value_start = pos + len(needle) while value_start < len(data) and data[value_start] == 0: value_start += 1 if value_start >= len(data): return None end = value_start while end < len(data) and data[end] != 0 and (end - value_start) < max_len: end += 1 s = data[value_start:end].decode("ascii", errors="replace").strip() return s or None config.project = _find_string_after(b"Project:") config.client = _find_string_after(b"Client:") config.operator = _find_string_after(b"User Name:") config.sensor_location = _find_string_after(b"Seis Loc:") config.notes = _find_string_after(b"Extended Notes") if config.project: log.debug("compliance_config: project = %s", config.project) if config.client: log.debug("compliance_config: client = %s", config.client) except Exception as exc: log.warning("compliance_config: project string extraction failed: %s", exc) # ── Channel block: trigger_level_geo, alarm_level_geo, max_range_geo ───── # The channel block is only present in the full cfg (frame D delivered, # ~2126 bytes). Layout confirmed 2026-04-02 from both E5 frame 78 of the # 3-11-26 compliance-config capture and A5 frame 77 of the 1-2-26 event # download capture: # # "Tran" label at tran_pos (+0 to +3) # max_range float32_BE at tran_pos + 28 (e.g. 6.206053 in/s) # trigger float32_BE at tran_pos + 34 (e.g. 0.600000 in/s) # "in.\x00" unit string at tran_pos + 38 ✅ confirmed # alarm float32_BE at tran_pos + 42 (e.g. 1.250000 in/s) # "/s\x00\x00" unit string at tran_pos + 46 ✅ confirmed # # Unit strings serve as layout anchors — if they match, the float offsets # are reliable. Skip "Tran2" (a later repeated label) via the +4 check. try: tran_pos = data.find(b"Tran", 44) if ( tran_pos >= 0 and data[tran_pos + 4 : tran_pos + 5] != b"2" # not "Tran2" and tran_pos + 50 <= len(data) and data[tran_pos + 38 : tran_pos + 42] == b"in.\x00" and data[tran_pos + 46 : tran_pos + 50] == b"/s\x00\x00" ): config.max_range_geo = struct.unpack_from(">f", data, tran_pos + 28)[0] config.trigger_level_geo = struct.unpack_from(">f", data, tran_pos + 34)[0] config.alarm_level_geo = struct.unpack_from(">f", data, tran_pos + 42)[0] log.debug( "compliance_config: trigger=%.4f alarm=%.4f max_range=%.4f in/s", config.trigger_level_geo, config.alarm_level_geo, config.max_range_geo, ) elif tran_pos >= 0: log.warning( "compliance_config: 'Tran' at %d — unit string check failed: " "+38..+42=%s (want 696e2e00) +46..+50=%s (want 2f730000)", tran_pos, data[tran_pos + 38 : tran_pos + 42].hex() if tran_pos + 42 <= len(data) else "??", data[tran_pos + 46 : tran_pos + 50].hex() if tran_pos + 50 <= len(data) else "??", ) else: log.debug("compliance_config: channel block not present in cfg (len=%d)", len(data)) except Exception as exc: log.warning("compliance_config: channel block extraction failed: %s", exc) info.compliance_config = config def _find_first_string(data: bytes, start: int, end: int, min_len: int) -> Optional[str]: """ Return the first null-terminated printable ASCII string of length >= min_len found in data[start:end]. """ i = start end = min(end, len(data)) while i < end: if 0x20 <= data[i] < 0x7F: j = i while j < len(data) and 0x20 <= data[j] < 0x7F: j += 1 if j - i >= min_len: return data[i:j].decode("ascii", errors="replace").strip() i = j + 1 else: i += 1 return None def _decode_monitor_status(data: bytes) -> MonitorStatus: """ Decode SUB 0x1C response payload into a MonitorStatus object. data is the raw S3 frame .data attribute (includes the 11-byte section header, so field offsets below are relative to data[11]). Payload length indicates mode: 44 bytes (0x2C): idle — full status block with battery + memory fields 12 bytes : actively monitoring — abbreviated, no battery/memory Field offsets (idle mode, confirmed 4-8-26/2ndtry): data[11 + 0x2F : 11 + 0x31] battery × 100 uint16 BE data[11 + 0x31 : 11 + 0x35] memory_total uint32 BE bytes data[11 + 0x35 : 11 + 0x39] memory_free uint32 BE bytes """ # The data section starts at offset 11 (after the S3 section header). section = data[11:] if len(data) > 11 else data # Log the raw payload at WARNING level so we can see it in the server logs # and confirm the field offsets and is_monitoring detection are correct. log.warning( "_decode_monitor_status: total data=%d bytes section=%d bytes hex=%s", len(data), len(section), section.hex(), ) # Mode: idle payload is 44 bytes; monitoring is shorter (12 bytes observed) is_monitoring = len(section) < 20 battery_v = None memory_total = None memory_free = None if not is_monitoring and len(section) >= 0x39: batt_raw = struct.unpack(">H", section[0x2F:0x31])[0] battery_v = batt_raw / 100.0 memory_total = struct.unpack(">I", section[0x31:0x35])[0] memory_free = struct.unpack(">I", section[0x35:0x39])[0] return MonitorStatus( is_monitoring=is_monitoring, battery_v=battery_v, memory_total=memory_total, memory_free=memory_free, )