""" client.py — MiniMateClient: the top-level public API for the library. Combines transport, protocol, and model decoding into a single easy-to-use class. This is the only layer that the SFM server (sfm/server.py) imports directly. Design: stateless per-call (connect → do work → disconnect). The client does not hold an open connection between calls. This keeps the first implementation simple and matches Blastware's observed behaviour. Persistent connections can be added later without changing the public API. Example (serial): from minimateplus import MiniMateClient with MiniMateClient("COM5") as device: info = device.connect() # POLL handshake + identity read events = device.get_events() # download all events Example (TCP / modem): from minimateplus import MiniMateClient from minimateplus.transport import TcpTransport transport = TcpTransport("203.0.113.5", port=12345) with MiniMateClient(transport=transport) as device: info = device.connect() """ from __future__ import annotations import logging import struct from typing import Optional from .framing import S3Frame from .models import ( ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp, ) from .protocol import MiniMateProtocol, ProtocolError from .protocol import ( SUB_SERIAL_NUMBER, SUB_FULL_CONFIG, ) from .transport import SerialTransport, BaseTransport log = logging.getLogger(__name__) # ── MiniMateClient ──────────────────────────────────────────────────────────── class MiniMateClient: """ High-level client for a single MiniMate Plus device. Args: port: Serial port name (e.g. "COM5", "/dev/ttyUSB0"). Not required when a pre-built transport is provided. baud: Baud rate (default 38400, ignored when transport is provided). timeout: Per-request receive timeout in seconds (default 15.0). transport: Pre-built transport (SerialTransport or TcpTransport). If None, a SerialTransport is constructed from port/baud. """ def __init__( self, port: str = "", baud: int = 38_400, timeout: float = 15.0, transport: Optional[BaseTransport] = None, ) -> None: self.port = port self.baud = baud self.timeout = timeout self._transport: Optional[BaseTransport] = transport self._proto: Optional[MiniMateProtocol] = None # ── Connection lifecycle ────────────────────────────────────────────────── def open(self) -> None: """Open the transport connection.""" if self._transport is None: self._transport = SerialTransport(self.port, self.baud) if not self._transport.is_connected: self._transport.connect() self._proto = MiniMateProtocol(self._transport, recv_timeout=self.timeout) def close(self) -> None: """Close the transport connection.""" if self._transport and self._transport.is_connected: self._transport.disconnect() self._proto = None @property def is_open(self) -> bool: return bool(self._transport and self._transport.is_connected) # ── Context manager ─────────────────────────────────────────────────────── def __enter__(self) -> "MiniMateClient": self.open() return self def __exit__(self, *_) -> None: self.close() # ── Public API ──────────────────────────────────────────────────────────── def connect(self) -> DeviceInfo: """ Perform the startup handshake and read device identity + compliance config. Opens the connection if not already open. Reads: 1. POLL handshake (startup) 2. SUB 15 — serial number 3. SUB 01 — full config block (firmware, model strings) 4. SUB 1A — compliance config (record time, trigger/alarm levels, project strings) Returns: Populated DeviceInfo with compliance_config cached. Raises: ProtocolError: on any communication failure. """ if not self.is_open: self.open() proto = self._require_proto() log.info("connect: POLL startup") proto.startup() log.info("connect: reading serial number (SUB 15)") sn_data = proto.read(SUB_SERIAL_NUMBER) device_info = _decode_serial_number(sn_data) log.info("connect: reading full config (SUB 01)") cfg_data = proto.read(SUB_FULL_CONFIG) _decode_full_config_into(cfg_data, device_info) log.info("connect: reading compliance config (SUB 1A)") try: cc_data = proto.read_compliance_config() _decode_compliance_config_into(cc_data, device_info) except ProtocolError as exc: log.warning("connect: compliance config read failed: %s — continuing", exc) log.info("connect: %s", device_info) return device_info def get_events(self, include_waveforms: bool = True, debug: bool = False) -> list[Event]: """ Download all stored events from the device using the confirmed 1E → 0A → 0C → 1F event-iterator protocol. Sequence (confirmed from 3-31-26 Blastware capture): 1. SUB 1E — get first waveform key 2. For each key until b'\\x00\\x00\\x00\\x00': a. SUB 0A — waveform header (first event only, to confirm full record) b. SUB 0C — full waveform record (peak values, project strings) c. SUB 1F — advance to next key (token=0xFE skips partial bins) Subsequent keys returned by 1F (token=0xFE) are guaranteed to be full records, so 0A is only called for the first event. This exactly matches Blastware's observed behaviour. Raw ADC waveform samples (SUB 5A bulk stream) are NOT downloaded here — they are large (several MB per event) and fetched separately. include_waveforms is reserved for a future call. Returns: List of Event objects, one per stored waveform record. Raises: ProtocolError: on unrecoverable communication failure. """ proto = self._require_proto() log.info("get_events: requesting first event (SUB 1E)") try: key4, _event_data8 = proto.read_event_first() except ProtocolError as exc: raise ProtocolError(f"get_events: 1E failed: {exc}") from exc if key4 == b"\x00\x00\x00\x00": log.info("get_events: device reports no stored events") return [] events: list[Event] = [] idx = 0 is_first = True while key4 != b"\x00\x00\x00\x00": log.info( "get_events: record %d key=%s", idx, key4.hex() ) ev = Event(index=idx) # First event: call 0A to verify it's a full record (0x30 length). # Subsequent keys come from 1F(0xFE) which guarantees full records, # so we skip 0A for those — exactly matching Blastware behaviour. proceed = True if is_first: try: _hdr, rec_len = proto.read_waveform_header(key4) if rec_len < 0x30: log.warning( "get_events: first key=%s is partial (len=0x%02X) — skipping", key4.hex(), rec_len, ) proceed = False except ProtocolError as exc: log.warning( "get_events: 0A failed for key=%s: %s — skipping 0C", key4.hex(), exc, ) proceed = False is_first = False if proceed: # SUB 0C — full waveform record (peak values, project strings) try: record = proto.read_waveform_record(key4) if debug: ev._raw_record = record _decode_waveform_record_into(record, ev) except ProtocolError as exc: log.warning( "get_events: 0C failed for key=%s: %s", key4.hex(), exc ) events.append(ev) idx += 1 # SUB 1F — advance to the next full waveform record key try: key4 = proto.advance_event() except ProtocolError as exc: log.warning("get_events: 1F failed: %s — stopping iteration", exc) break log.info("get_events: downloaded %d event(s)", len(events)) return events # ── Internal helpers ────────────────────────────────────────────────────── def _require_proto(self) -> MiniMateProtocol: if self._proto is None: raise RuntimeError("MiniMateClient is not connected. Call open() first.") return self._proto # ── Decoder functions ───────────────────────────────────────────────────────── # # Pure functions: bytes → model field population. # Kept here (not in models.py) to isolate protocol knowledge from data shapes. def _decode_serial_number(data: bytes) -> DeviceInfo: """ Decode SUB EA (SERIAL_NUMBER_RESPONSE) payload into a new DeviceInfo. Layout (10 bytes total per §7.2): bytes 0–7: serial string, null-terminated, null-padded ("BE18189\\x00") byte 8: unit-specific trailing byte (purpose unknown ❓) byte 9: firmware minor version (0x11 = 17) ✅ Returns: New DeviceInfo with serial, firmware_minor, serial_trail_0 populated. """ # data is data_rsp.data = payload[5:]. The 11-byte section header occupies # data[0..10]: [LENGTH_ECHO:1][00×4][KEY_ECHO:4][00×2]. # Actual serial payload starts at data[11]. actual = data[11:] if len(data) > 11 else data if len(actual) < 9: # Short payload — gracefully degrade serial = actual.rstrip(b"\x00").decode("ascii", errors="replace") return DeviceInfo(serial=serial, firmware_minor=0) serial = actual[:8].rstrip(b"\x00").decode("ascii", errors="replace") trail_0 = actual[8] if len(actual) > 8 else None fw_minor = actual[9] if len(actual) > 9 else 0 return DeviceInfo( serial=serial, firmware_minor=fw_minor, serial_trail_0=trail_0, ) def _decode_full_config_into(data: bytes, info: DeviceInfo) -> None: """ Decode SUB FE (FULL_CONFIG_RESPONSE) payload into an existing DeviceInfo. The FE response arrives as a composite S3 outer frame whose data section contains inner DLE-framed sub-frames. Because of this nesting the §7.3 fixed offsets (0x34, 0x3C, 0x44, 0x6D) are unreliable — they assume a clean non-nested payload starting at byte 0. Instead we search the whole byte array for known ASCII patterns. The strings are long enough to be unique in any reasonable payload. Modifies info in-place. """ def _extract(needle: bytes, max_len: int = 32) -> Optional[str]: """Return the null-terminated ASCII string that starts with *needle*.""" pos = data.find(needle) if pos < 0: return None end = pos while end < len(data) and data[end] != 0 and (end - pos) < max_len: end += 1 s = data[pos:end].decode("ascii", errors="replace").strip() return s or None # ── Manufacturer and model are straightforward literal matches ──────────── info.manufacturer = _extract(b"Instantel") info.model = _extract(b"MiniMate Plus") # ── Firmware version: "S3xx.xx" — scan for the 'S3' prefix ─────────────── for i in range(len(data) - 5): if data[i] == ord('S') and data[i + 1] == ord('3') and chr(data[i + 2]).isdigit(): end = i while end < len(data) and data[end] not in (0, 0x20) and (end - i) < 12: end += 1 candidate = data[i:end].decode("ascii", errors="replace").strip() if "." in candidate and len(candidate) >= 5: info.firmware_version = candidate break # ── DSP version: numeric "xx.xx" — search for known prefixes ───────────── for prefix in (b"10.", b"11.", b"12.", b"9.", b"8."): pos = data.find(prefix) if pos < 0: continue end = pos while end < len(data) and data[end] not in (0, 0x20) and (end - pos) < 8: end += 1 candidate = data[pos:end].decode("ascii", errors="replace").strip() # Accept only strings that look like "digits.digits" if "." in candidate and all(c in "0123456789." for c in candidate): info.dsp_version = candidate break def _decode_event_count(data: bytes) -> int: """ Extract stored event count from SUB F7 (EVENT_INDEX_RESPONSE) payload. Layout per §7.4 (offsets from data section start): +00: 00 58 09 — total index size or record count ❓ +03: 00 00 00 01 — possibly stored event count = 1 ❓ We use bytes +03..+06 interpreted as uint32 BE as the event count. This is inferred (🔶) — the exact meaning of the first 3 bytes is unclear. """ if len(data) < 7: log.warning("event index payload too short (%d bytes), assuming 0 events", len(data)) return 0 # Try the uint32 at +3 first count = struct.unpack_from(">I", data, 3)[0] # Sanity check: MiniMate Plus manual says max ~1000 events if count > 1000: log.warning( "event count %d looks unreasonably large — clamping to 0", count ) return 0 return count def _decode_event_header_into(data: bytes, event: Event) -> None: """ Decode SUB E1 (EVENT_HEADER_RESPONSE) raw data section into an Event. The waveform key is at data[11:15] (extracted separately in MiniMateProtocol.read_event_first). The remaining 4 bytes at data[15:19] are not yet decoded (❓ — possibly sample rate or flags). Date information (year/month/day) lives in the waveform record (SUB 0C), not in the 1E response. This function is a placeholder for any future metadata we decode from the 8-byte 1E data block. Modifies event in-place. """ # Nothing confirmed yet from the 8-byte data block beyond the key at [0:4]. # Leave event.timestamp as None — it will be populated from the 0C record. pass def _decode_waveform_record_into(data: bytes, event: Event) -> None: """ Decode a 210-byte SUB F3 (FULL_WAVEFORM_RECORD) record into an Event. The *data* argument is the raw record bytes returned by MiniMateProtocol.read_waveform_record() — i.e. data_rsp.data[11:11+0xD2]. Extracts (all ✅ confirmed 2026-04-01 against Blastware event report): - timestamp: 9-byte format at bytes [0:9] - record_type: sub_code at byte[1] (0x10 = "Waveform") - peak_values: label-based float32 at label+6 for Tran/Vert/Long/MicL - peak_vector_sum: IEEE 754 BE float at offset 87 - project_info: "Project:", "Client:", etc. string search Modifies event in-place. """ # ── Timestamp ───────────────────────────────────────────────────────────── # 9-byte format: [day][sub_code][month][year:2 BE][unknown][hour][min][sec] try: event.timestamp = Timestamp.from_waveform_record(data) except Exception as exc: log.warning("waveform record timestamp decode failed: %s", exc) # ── Record type ─────────────────────────────────────────────────────────── # Decoded from byte[1] (sub_code), not from ASCII string search try: event.record_type = _extract_record_type(data) except Exception as exc: log.warning("waveform record type decode failed: %s", exc) # ── Peak values (per-channel PPV + Peak Vector Sum) ─────────────────────── try: peak_values = _extract_peak_floats(data) if peak_values: event.peak_values = peak_values except Exception as exc: log.warning("waveform record peak decode failed: %s", exc) # ── Project strings ─────────────────────────────────────────────────────── try: project_info = _extract_project_strings(data) if project_info: event.project_info = project_info except Exception as exc: log.warning("waveform record project strings decode failed: %s", exc) def _extract_record_type(data: bytes) -> Optional[str]: """ Decode the recording mode from byte[1] of the 210-byte waveform record. Byte[1] is the sub-record code that immediately follows the day byte in the 9-byte timestamp header at the start of each waveform record: [day:1] [sub_code:1] [month:1] [year:2 BE] ... Confirmed codes (✅ 2026-04-01): 0x10 → "Waveform" (continuous / single-shot mode) Histogram mode code is not yet confirmed — a histogram event must be captured with debug=true to identify it. Returns None for unknown codes. """ if len(data) < 2: return None code = data[1] if code == 0x10: return "Waveform" # TODO: add histogram sub_code once a histogram event is captured with debug=true return None def _extract_peak_floats(data: bytes) -> Optional[PeakValues]: """ Locate per-channel peak particle velocity values in the 210-byte waveform record by searching for the embedded channel label strings ("Tran", "Vert", "Long", "MicL") and reading the IEEE 754 BE float at label_offset + 6. The floats are NOT 4-byte aligned in the record (confirmed from 3-31-26 capture), so the previous step-4 scan missed Tran, Long, and MicL entirely. Label-based lookup is the correct approach. Channel labels are separated by inner-frame bytes (0x10 0x03 = DLE ETX), which the S3FrameParser preserves as literal data. Searching for the 4-byte ASCII label strings is robust to this structure. Returns PeakValues if at least one channel label is found, else None. """ # (label_bytes, field_name) channels = ( (b"Tran", "tran"), (b"Vert", "vert"), (b"Long", "long_"), (b"MicL", "micl"), ) vals: dict[str, float] = {} for label_bytes, field in channels: pos = data.find(label_bytes) if pos < 0: continue float_off = pos + 6 if float_off + 4 > len(data): log.debug("peak float: label %s at %d but float runs past end", label_bytes, pos) continue try: val = struct.unpack_from(">f", data, float_off)[0] except struct.error: continue log.debug("peak float: %s at label+6 (%d) = %.6f", label_bytes.decode(), float_off, val) vals[field] = val if not vals: return None # ── Peak Vector Sum — fixed offset 87 (✅ confirmed 2026-04-01) ─────────── # = √(Tran² + Vert² + Long²) at the sample instant of maximum combined geo # motion, NOT the vector sum of the three per-channel peak values (which may # occur at different times). Matches Blastware "Peak Vector Sum" exactly. pvs: Optional[float] = None if len(data) > 91: try: pvs = struct.unpack_from(">f", data, 87)[0] except struct.error: pass return PeakValues( tran=vals.get("tran"), vert=vals.get("vert"), long=vals.get("long_"), micl=vals.get("micl"), peak_vector_sum=pvs, ) def _extract_project_strings(data: bytes) -> Optional[ProjectInfo]: """ Search the waveform record payload for known ASCII label strings ("Project:", "Client:", "User Name:", "Seis Loc:", "Extended Notes") and extract the associated value strings that follow them. Layout (per §7.5): each entry is [label ~16 bytes][value ~32 bytes], null-padded. We find the label, then read the next non-null chars. """ def _find_string_after(needle: bytes, max_value_len: int = 64) -> Optional[str]: pos = data.find(needle) if pos < 0: return None # Skip the label (including null padding) until we find a non-null value # The value starts at pos+len(needle), but may have a gap of null bytes value_start = pos + len(needle) # Skip nulls while value_start < len(data) and data[value_start] == 0: value_start += 1 if value_start >= len(data): return None # Read until null terminator or max_value_len end = value_start while end < len(data) and data[end] != 0 and (end - value_start) < max_value_len: end += 1 value = data[value_start:end].decode("ascii", errors="replace").strip() return value or None project = _find_string_after(b"Project:") client = _find_string_after(b"Client:") operator = _find_string_after(b"User Name:") location = _find_string_after(b"Seis Loc:") notes = _find_string_after(b"Extended Notes") if not any([project, client, operator, location, notes]): return None return ProjectInfo( project=project, client=client, operator=operator, sensor_location=location, notes=notes, ) def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None: """ Decode a 2090-byte SUB 1A (COMPLIANCE_CONFIG) response into a ComplianceConfig. The *data* argument is the raw bytes returned by read_compliance_config() (frames B+C+D concatenated, echo headers stripped). Confirmed field locations (BE11529 with 3-step read, 2126 bytes): - cfg[89] = setup_name: first long ASCII string in cfg[40:250] - cfg[??] = record_time: float32 BE — offset TBD (scan below) - cfg[??] = sample_rate: uint16 or float — offset TBD (scan below) - "Project:" needle → project string - "Client:" needle → client string - "User Name:" needle → operator string - "Seis Loc:" needle → sensor_location string - "Extended Notes" needle → notes string Modifies info.compliance_config in-place. """ if not data or len(data) < 40: log.warning("compliance config payload too short (%d bytes)", len(data)) return config = ComplianceConfig(raw=data) # ── Diagnostic: scan full cfg for strings and plausible floats ───────────── # Logged at WARNING so they appear without debug mode. Remove once field # offsets are confirmed for both BE11529 and BE18189. _cfg_diagnostic(data) # ── Setup name ──────────────────────────────────────────────────────────── # The setup_name IS the string itself — it is NOT a label followed by a value. # It appears as the first long (>=8 char) ASCII string in cfg[40:250]. # The preceding bytes vary by device (cfg[88]=0x01 on BE11529); the string # itself is null-terminated. try: setup_name = _find_first_string(data, start=40, end=250, min_len=8) config.setup_name = setup_name if setup_name: log.debug("compliance_config: setup_name = %r", setup_name) except Exception as exc: log.warning("compliance_config: setup_name extraction failed: %s", exc) # ── Record Time (⚠️ OFFSET UNCONFIRMED — waiting on diagnostic output) ── # Previous guess of 0x28 was wrong (that offset holds "(L)" string). # The correct offset will be clear once _cfg_diagnostic identifies it. # Temporarily disabled to avoid returning a garbage value. config.record_time = None # TODO: set once offset confirmed from diagnostic # ── Project strings ─────────────────────────────────────────────────────── try: def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]: pos = data.find(needle) if pos < 0: return None value_start = pos + len(needle) while value_start < len(data) and data[value_start] == 0: value_start += 1 if value_start >= len(data): return None end = value_start while end < len(data) and data[end] != 0 and (end - value_start) < max_len: end += 1 s = data[value_start:end].decode("ascii", errors="replace").strip() return s or None config.project = _find_string_after(b"Project:") config.client = _find_string_after(b"Client:") config.operator = _find_string_after(b"User Name:") config.sensor_location = _find_string_after(b"Seis Loc:") config.notes = _find_string_after(b"Extended Notes") if config.project: log.debug("compliance_config: project = %s", config.project) if config.client: log.debug("compliance_config: client = %s", config.client) except Exception as exc: log.warning("compliance_config: project string extraction failed: %s", exc) # ── Sample rate (⚠️ OFFSET UNCONFIRMED — waiting on diagnostic output) ──── config.sample_rate = None # TODO: set once offset confirmed from diagnostic info.compliance_config = config def _find_first_string(data: bytes, start: int, end: int, min_len: int) -> Optional[str]: """ Return the first null-terminated printable ASCII string of length >= min_len found in data[start:end]. """ i = start end = min(end, len(data)) while i < end: if 0x20 <= data[i] < 0x7F: j = i while j < len(data) and 0x20 <= data[j] < 0x7F: j += 1 if j - i >= min_len: return data[i:j].decode("ascii", errors="replace").strip() i = j + 1 else: i += 1 return None def _cfg_diagnostic(data: bytes) -> None: """ Log all strings and plausible float32 BE values found in the full compliance config data. Used to map field offsets for record_time and sample_rate. Remove once offsets are confirmed. """ import struct as _struct # ── All printable ASCII strings >= 4 chars ──────────────────────────────── strings_found = [] i = 0 while i < len(data): if 0x20 <= data[i] < 0x7F: j = i while j < len(data) and 0x20 <= data[j] < 0x7F: j += 1 if j - i >= 4: s = data[i:j].decode("ascii", errors="replace") strings_found.append((i, s)) i = j else: i += 1 log.warning("cfg_diag: %d strings found:", len(strings_found)) for off, s in strings_found[:40]: # cap at 40 to avoid log spam log.warning(" cfg[%04d/0x%04X] str %r", off, off, s[:60]) # ── Float32 BE values in plausible physical ranges ───────────────────────── log.warning("cfg_diag: float32_BE scan (plausible record_time / sample_rate / trigger):") for i in range(len(data) - 3): try: v = _struct.unpack_from(">f", data, i)[0] except Exception: continue if v != v or v == float("inf") or v == float("-inf"): continue tag = "" if 0.5 <= v <= 30.0: tag = "RECORD_TIME?" elif 200 <= v <= 8192: tag = "SAMPLE_RATE?" elif 0.003 <= v <= 5.0: tag = "TRIGGER/ALARM?" if tag: log.warning(" cfg[%04d/0x%04X] f32_BE=%10.4f %s", i, i, v, tag) # ── uint16 BE/LE scan for known sample rates ────────────────────────────── known_rates = {256, 512, 1024, 2048, 4096} log.warning("cfg_diag: uint16 scan for sample rates %s:", known_rates) for i in range(len(data) - 1): u16_le = _struct.unpack_from("H", data, i)[0] if u16_le in known_rates: log.warning(" cfg[%04d/0x%04X] uint16_LE=%d", i, i, u16_le) if u16_be in known_rates and u16_be != u16_le: log.warning(" cfg[%04d/0x%04X] uint16_BE=%d", i, i, u16_be)