Files
seismo-relay/minimateplus/client.py
2026-03-30 23:46:34 -04:00

478 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
client.py — MiniMateClient: the top-level public API for the library.
Combines transport, protocol, and model decoding into a single easy-to-use
class. This is the only layer that the SFM server (sfm/server.py) imports
directly.
Design: stateless per-call (connect → do work → disconnect).
The client does not hold an open connection between calls. This keeps the
first implementation simple and matches Blastware's observed behaviour.
Persistent connections can be added later without changing the public API.
Example:
    from minimateplus import MiniMateClient

    with MiniMateClient("COM5") as device:
        info = device.connect()       # POLL handshake + identity read
        events = device.get_events()  # download all events
        print(info)
        for ev in events:
            print(ev)
"""
from __future__ import annotations
import logging
import struct
from typing import Optional
from .framing import S3Frame
from .models import (
DeviceInfo,
Event,
PeakValues,
ProjectInfo,
Timestamp,
)
from .protocol import MiniMateProtocol, ProtocolError
from .protocol import (
SUB_SERIAL_NUMBER,
SUB_FULL_CONFIG,
SUB_EVENT_INDEX,
SUB_EVENT_HEADER,
SUB_WAVEFORM_RECORD,
)
from .transport import SerialTransport, BaseTransport
log = logging.getLogger(__name__)
# ── MiniMateClient ────────────────────────────────────────────────────────────
class MiniMateClient:
    """
    High-level client for a single MiniMate Plus device.

    Args:
        port:      Serial port name (e.g. "COM5", "/dev/ttyUSB0").
        baud:      Baud rate (default 38400).
        timeout:   Per-request receive timeout in seconds (default 15.0).
        transport: Optional pre-built transport (for testing / TCP future use).
                   If None, a SerialTransport is constructed from port/baud.
    """

    def __init__(
        self,
        port: str,
        baud: int = 38_400,
        timeout: float = 15.0,
        transport: Optional[BaseTransport] = None,
    ) -> None:
        self.port = port
        self.baud = baud
        self.timeout = timeout
        self._transport: Optional[BaseTransport] = transport
        # Created by open(), dropped by close().
        self._proto: Optional[MiniMateProtocol] = None

    # ── Connection lifecycle ──────────────────────────────────────────────────

    def open(self) -> None:
        """Open the transport connection and build the protocol layer on it."""
        if self._transport is None:
            self._transport = SerialTransport(self.port, self.baud)
        if not self._transport.is_connected:
            self._transport.connect()
        self._proto = MiniMateProtocol(self._transport, recv_timeout=self.timeout)

    def close(self) -> None:
        """
        Close the transport connection.

        The protocol object is always dropped, even when the transport's
        disconnect() raises, so a failed close never leaves a stale protocol
        bound to a dead transport.
        """
        try:
            if self._transport and self._transport.is_connected:
                self._transport.disconnect()
        finally:
            self._proto = None

    @property
    def is_open(self) -> bool:
        """True when a transport exists and reports itself connected."""
        return bool(self._transport and self._transport.is_connected)

    # ── Context manager ───────────────────────────────────────────────────────

    def __enter__(self) -> "MiniMateClient":
        self.open()
        return self

    def __exit__(self, *_) -> None:
        self.close()

    # ── Public API ────────────────────────────────────────────────────────────

    def connect(self) -> DeviceInfo:
        """
        Perform the startup handshake and read device identity.

        Opens the connection if not already open.

        Reads:
            1. POLL handshake (startup)
            2. SUB 15 — serial number
            3. SUB 01 — full config block (firmware, model strings)

        Returns:
            Populated DeviceInfo.

        Raises:
            ProtocolError: on any communication failure.
        """
        if not self.is_open:
            self.open()
        proto = self._require_proto()

        log.info("connect: POLL startup")
        proto.startup()

        log.info("connect: reading serial number (SUB 15)")
        sn_data = proto.read(SUB_SERIAL_NUMBER)
        device_info = _decode_serial_number(sn_data)

        log.info("connect: reading full config (SUB 01)")
        cfg_data = proto.read(SUB_FULL_CONFIG)
        _decode_full_config_into(cfg_data, device_info)

        log.info("connect: %s", device_info)
        return device_info

    def get_events(self, include_waveforms: bool = True) -> list[Event]:
        """
        Download all stored events from the device.

        For each event in the index:
            1. SUB 1E — event header (timestamp, sample rate)
            2. SUB 0C — full waveform record (peak values, project strings)

        Raw ADC waveform samples (SUB 5A bulk stream) are NOT downloaded
        here — they can be large, and that download is not implemented yet.

        Args:
            include_waveforms: Reserved for the future raw-sample download.
                Currently ignored, whatever value is passed.

        Returns:
            List of Event objects, one per stored record on the device.

        Raises:
            RuntimeError: if the client has not been opened.
            ProtocolError: on a failure while reading the event index.
                Per-event read failures are logged and yield a partial Event.
        """
        proto = self._require_proto()

        log.info("get_events: reading event index (SUB 08)")
        index_data = proto.read(SUB_EVENT_INDEX)
        event_count = _decode_event_count(index_data)
        log.info("get_events: %d event(s) found", event_count)

        events: list[Event] = []
        for i in range(event_count):
            log.info("get_events: downloading event %d/%d", i + 1, event_count)
            ev = self._download_event(proto, i)
            if ev:
                events.append(ev)
        return events

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _require_proto(self) -> MiniMateProtocol:
        """Return the active protocol object, or raise if open() was not called."""
        if self._proto is None:
            raise RuntimeError("MiniMateClient is not connected. Call open() first.")
        return self._proto

    def _download_event(
        self, proto: MiniMateProtocol, index: int
    ) -> Optional[Event]:
        """
        Download header + waveform record for one event by index.

        Both reads are best-effort: a ProtocolError is logged and whatever was
        decoded so far is returned as a partial Event.
        """
        from .framing import build_bw_frame
        from .protocol import _expected_rsp_sub

        ev = Event(index=index)

        # SUB 1E — event header (timestamp, sample rate).
        #
        # The two-step event-header read passes the event index at payload[5]
        # of the data-request frame (consistent with all other reads).
        # This limits addressing to events 0–255 without a multi-byte scheme;
        # the MiniMate Plus stores up to ~1000 events, so high indices may need
        # a revised approach once we have captured event-download frames.
        try:
            header_rsp_sub = _expected_rsp_sub(SUB_EVENT_HEADER)

            # Step 1 — probe (offset=0); the probe response carries nothing we
            # decode, so it is read only to keep the exchange in step.
            proto._send(build_bw_frame(SUB_EVENT_HEADER, 0))
            proto._recv_one(expected_sub=header_rsp_sub)

            # Step 2 — data request (offset = event index, clamped to 0xFF)
            event_offset = min(index, 0xFF)
            if event_offset != index:
                # Make the clamp visible: the header decoded below is the one
                # stored at offset 0xFF, not necessarily this event's own.
                log.warning(
                    "event %d: index exceeds single-byte offset, header read "
                    "clamped to %d — header may belong to another event",
                    index,
                    event_offset,
                )
            proto._send(build_bw_frame(SUB_EVENT_HEADER, event_offset))
            data_rsp = proto._recv_one(expected_sub=header_rsp_sub)
            _decode_event_header_into(data_rsp.data, ev)
        except ProtocolError as exc:
            log.warning("event %d: header read failed: %s", index, exc)
            return ev  # Return partial event rather than losing it entirely

        # SUB 0C — full waveform record (peak values, project strings).
        try:
            wf_data = proto.read(SUB_WAVEFORM_RECORD)
            _decode_waveform_record_into(wf_data, ev)
        except ProtocolError as exc:
            log.warning("event %d: waveform record read failed: %s", index, exc)

        return ev
# ── Decoder functions ─────────────────────────────────────────────────────────
#
# Pure functions: bytes → model field population.
# Kept here (not in models.py) to isolate protocol knowledge from data shapes.
def _decode_serial_number(data: bytes) -> DeviceInfo:
    """
    Build a new DeviceInfo from a SUB EA (SERIAL_NUMBER_RESPONSE) payload.

    Layout (10 bytes total per §7.2):
        bytes 0–7: serial string, null-terminated, null-padded ("BE18189\\x00")
        byte  8:   unit-specific trailing byte (purpose unknown)
        byte  9:   firmware minor version (0x11 = 17)

    A payload shorter than 9 bytes degrades gracefully: whatever is present
    is treated as the serial and the firmware minor version is reported as 0.
    A 9-byte payload yields the trailing byte but a firmware minor of 0.

    Returns:
        New DeviceInfo with serial, firmware_minor and (for payloads of at
        least 9 bytes) serial_trail_0 populated.
    """
    size = len(data)
    # data[:8] equals data itself whenever the payload is 8 bytes or fewer,
    # so one decode serves both the short and the full-length case.
    serial_text = data[:8].rstrip(b"\x00").decode("ascii", errors="replace")

    if size >= 9:
        return DeviceInfo(
            serial=serial_text,
            firmware_minor=data[9] if size > 9 else 0,
            serial_trail_0=data[8],
        )

    # Short payload — no trailing byte or firmware minor to report.
    return DeviceInfo(serial=serial_text, firmware_minor=0)
def _decode_full_config_into(data: bytes, info: DeviceInfo) -> None:
    """
    Decode SUB FE (FULL_CONFIG_RESPONSE) payload into an existing DeviceInfo.

    The FE response arrives as a composite S3 outer frame whose data section
    contains inner DLE-framed sub-frames. Because of this nesting the §7.3
    fixed offsets (0x34, 0x3C, 0x44, 0x6D) are unreliable — they assume a
    clean non-nested payload starting at byte 0.

    Instead we search the whole byte array for known ASCII patterns.

    Fields written on ``info``:
        manufacturer, model: always assigned (None when the literal is absent).
        firmware_version:    assigned only when an "S3<digit>…" token
                             containing a "." and at least 5 chars is found.
        dsp_version:         assigned only when a standalone "digits.digits"
                             token with a known major prefix is found.

    Modifies info in-place.
    """

    def _extract(needle: bytes, max_len: int = 32) -> Optional[str]:
        """Return the null-terminated ASCII string that starts with *needle*."""
        pos = data.find(needle)
        if pos < 0:
            return None
        end = pos
        while end < len(data) and data[end] != 0 and (end - pos) < max_len:
            end += 1
        s = data[pos:end].decode("ascii", errors="replace").strip()
        return s or None

    def _is_ascii_digit(byte: int) -> bool:
        """True for the bytes '0'..'9' only.

        chr(byte).isdigit() is not usable here: it is also true for bytes
        such as 0xB2/0xB3/0xB9 (superscript digits in Latin-1).
        """
        return 0x30 <= byte <= 0x39

    def _token(start: int, limit: int) -> str:
        """Decode the run from *start* up to a NUL, a space, or *limit* bytes."""
        end = start
        while end < len(data) and data[end] not in (0, 0x20) and (end - start) < limit:
            end += 1
        return data[start:end].decode("ascii", errors="replace").strip()

    def _find_dsp_version() -> Optional[str]:
        """Return the first standalone "digits.digits" token, by prefix priority."""
        for prefix in (b"10.", b"11.", b"12.", b"9.", b"8."):
            pos = data.find(prefix)
            while pos >= 0:
                # A match that continues a longer number (e.g. the "8.17"
                # inside firmware "S338.17") is not a version of its own.
                mid_number = pos > 0 and (
                    _is_ascii_digit(data[pos - 1]) or data[pos - 1] == ord(".")
                )
                if not mid_number:
                    candidate = _token(pos, 8)
                    # Accept only strings that look like "digits.digits"
                    if "." in candidate and all(c in "0123456789." for c in candidate):
                        return candidate
                # Try later occurrences of the same prefix before moving on.
                pos = data.find(prefix, pos + 1)
        return None

    # ── Manufacturer and model are straightforward literal matches ────────────
    info.manufacturer = _extract(b"Instantel")
    info.model = _extract(b"MiniMate Plus")

    # ── Firmware version: "S3xx.xx" — scan for the 'S3' prefix ───────────────
    pos = data.find(b"S3")
    # The bound keeps data[pos + 2] in range and leaves room for >= 5 chars.
    while 0 <= pos < len(data) - 5:
        if _is_ascii_digit(data[pos + 2]):
            candidate = _token(pos, 12)
            if "." in candidate and len(candidate) >= 5:
                info.firmware_version = candidate
                break
        pos = data.find(b"S3", pos + 1)

    # ── DSP version: numeric "xx.xx" — search for known prefixes ─────────────
    dsp_version = _find_dsp_version()
    if dsp_version is not None:
        info.dsp_version = dsp_version
def _decode_event_count(data: bytes) -> int:
    """
    Read the stored-event count out of a SUB F7 (EVENT_INDEX_RESPONSE) payload.

    Layout per §7.4 (offsets from data section start):
        +00: 00 58 09      — total index size or record count (unconfirmed)
        +03: 00 00 00 01   — possibly stored event count = 1 (unconfirmed)

    Bytes +03..+06 are read as a big-endian uint32 and used as the count.
    This interpretation is inferred; the meaning of the first three bytes
    is still unclear.

    Returns:
        The decoded count, or 0 when the payload is shorter than 7 bytes or
        the decoded value exceeds 1000 (both cases log a warning).
    """
    size = len(data)
    if size < 7:
        log.warning("event index payload too short (%d bytes), assuming 0 events", size)
        return 0

    # Big-endian uint32 at offset +3
    count = int.from_bytes(data[3:7], "big")

    # Sanity check: MiniMate Plus manual says max ~1000 events
    if count <= 1000:
        return count

    log.warning(
        "event count %d looks unreasonably large — clamping to 0", count
    )
    return 0
def _decode_event_header_into(data: bytes, event: Event) -> None:
    """
    Populate an existing Event from a SUB E1 (EVENT_HEADER_RESPONSE) payload.

    Only the timestamp is decoded: it occupies the first 6 bytes of the
    payload. The sample-rate location is not yet confirmed, so that field
    is not touched here.

    A payload shorter than 6 bytes, or a timestamp that fails to decode with
    ValueError, is logged as a warning and leaves the event unchanged.

    Modifies event in-place.
    """
    size = len(data)
    if size < 6:
        log.warning("event header payload too short (%d bytes)", size)
        return

    raw_stamp = data[:6]
    try:
        event.timestamp = Timestamp.from_bytes(raw_stamp)
    except ValueError as exc:
        log.warning("event header timestamp decode failed: %s", exc)
def _decode_waveform_record_into(data: bytes, event: Event) -> None:
    """
    Populate an existing Event from SUB F3 (FULL_WAVEFORM_RECORD) data.

    Two independent, best-effort decodes are attempted:
        1. peak values   → event.peak_values  (via _extract_peak_floats)
        2. project text  → event.project_info (via _extract_project_strings)

    Each attribute is assigned only when its extractor returns a truthy
    result. Any exception raised by one decode is logged as a warning and
    does not prevent the other from running.

    Modifies event in-place.
    """

    def _attempt(extractor, attribute: str, failure_message: str) -> None:
        # Deliberately broad: the field layout is only partially confirmed,
        # so a decoding surprise must not abort the whole event download.
        try:
            found = extractor(data)
            if found:
                setattr(event, attribute, found)
        except Exception as exc:
            log.warning(failure_message, exc)

    # Peak values — heuristic scan for four consecutive big-endian floats
    _attempt(
        _extract_peak_floats,
        "peak_values",
        "waveform record peak decode failed: %s",
    )

    # Project strings — search for known ASCII labels
    _attempt(
        _extract_project_strings,
        "project_info",
        "waveform record project strings decode failed: %s",
    )
def _extract_peak_floats(data: bytes) -> Optional[PeakValues]:
    """
    Heuristically locate the Tran, Vert, Long, MicL peak values in a
    waveform record payload.

    The exact offset is not confirmed, so the payload is scanned in 4-byte
    steps from offset 0. At each step 16 bytes are decoded as four
    big-endian float32 values; the first window is accepted where

        * every value satisfies 0 <= v < 100, and
        * at least two of the four values are strictly positive.

    Returns:
        PeakValues for the first accepted window, or None when the payload
        is shorter than 16 bytes or no window qualifies.
    """
    total = len(data)
    # Four float32 values need 16 bytes
    if total < 16:
        return None

    # The range bound guarantees 16 readable bytes at every offset.
    for offset in range(0, total - 15, 4):
        quad = struct.unpack_from(">4f", data, offset)

        # Plausible PPV range; NaN fails this comparison and is rejected too.
        if not all(0.0 <= v < 100.0 for v in quad):
            continue

        # Simple sanity: at least two non-zero values
        positive = sum(1 for v in quad if v > 0)
        if positive < 2:
            continue

        tran, vert, long_, micl = quad
        log.debug(
            "peak floats at offset %d: T=%.4f V=%.4f L=%.4f M=%.6f",
            offset, tran, vert, long_, micl
        )
        return PeakValues(tran=tran, vert=vert, long=long_, micl=micl)

    return None
def _extract_project_strings(data: bytes) -> Optional[ProjectInfo]:
    """
    Search the waveform record payload for known ASCII label strings
    ("Project:", "Client:", "User Name:", "Seis Loc:", "Extended Notes")
    and extract the associated value strings that follow them.

    Layout (per §7.5): each entry is [label ~16 bytes][value ~32 bytes],
    null-padded. We find the label, skip the null padding, then read the
    next run of non-null bytes as the value.

    An empty (all-null) value is reported as None. Without that check the
    null-skip would run on into the following entry and return its label
    text (e.g. project == "Client:") as if it were the value.

    Returns:
        ProjectInfo with every field that was found, or None when no field
        yields a value.
    """
    labels = (
        b"Project:",
        b"Client:",
        b"User Name:",
        b"Seis Loc:",
        b"Extended Notes",
    )

    def _find_string_after(needle: bytes, max_value_len: int = 64) -> Optional[str]:
        pos = data.find(needle)
        if pos < 0:
            return None

        # The value starts after the label, possibly behind a gap of nulls.
        value_start = pos + len(needle)
        while value_start < len(data) and data[value_start] == 0:
            value_start += 1
        if value_start >= len(data):
            return None

        # Landing on another label means this field was empty.
        if data.startswith(labels, value_start):
            return None

        # Read until null terminator or max_value_len
        end = value_start
        while end < len(data) and data[end] != 0 and (end - value_start) < max_value_len:
            end += 1
        value = data[value_start:end].decode("ascii", errors="replace").strip()
        return value or None

    project = _find_string_after(b"Project:")
    client = _find_string_after(b"Client:")
    operator = _find_string_after(b"User Name:")
    location = _find_string_after(b"Seis Loc:")
    notes = _find_string_after(b"Extended Notes")

    if not any([project, client, operator, location, notes]):
        return None

    return ProjectInfo(
        project=project,
        client=client,
        operator=operator,
        sensor_location=location,
        notes=notes,
    )