record_time (float32_BE) and sample_rate (uint16_BE) both validated against live device across normal / fast / faster modes and multiple record time settings. Diagnostic scaffolding no longer needed.
707 lines
28 KiB
Python
707 lines
28 KiB
Python
"""
|
||
client.py — MiniMateClient: the top-level public API for the library.

Combines transport, protocol, and model decoding into a single easy-to-use
class. This is the only layer that the SFM server (sfm/server.py) imports
directly.

Design: stateless per-call (connect → do work → disconnect).
    The client does not hold an open connection between calls. This keeps the
    first implementation simple and matches Blastware's observed behaviour.
    Persistent connections can be added later without changing the public API.

Example (serial):
    from minimateplus import MiniMateClient

    with MiniMateClient("COM5") as device:
        info = device.connect()       # POLL handshake + identity read
        events = device.get_events()  # download all events

Example (TCP / modem):
    from minimateplus import MiniMateClient
    from minimateplus.transport import TcpTransport

    transport = TcpTransport("203.0.113.5", port=12345)
    with MiniMateClient(transport=transport) as device:
        info = device.connect()
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import struct
|
||
from typing import Optional
|
||
|
||
from .framing import S3Frame
|
||
from .models import (
|
||
ComplianceConfig,
|
||
DeviceInfo,
|
||
Event,
|
||
PeakValues,
|
||
ProjectInfo,
|
||
Timestamp,
|
||
)
|
||
from .protocol import MiniMateProtocol, ProtocolError
|
||
from .protocol import (
|
||
SUB_SERIAL_NUMBER,
|
||
SUB_FULL_CONFIG,
|
||
)
|
||
from .transport import SerialTransport, BaseTransport
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
# ── MiniMateClient ────────────────────────────────────────────────────────────
|
||
|
||
class MiniMateClient:
    """
    High-level client for a single MiniMate Plus device.

    Args:
        port:      Serial port name (e.g. "COM5", "/dev/ttyUSB0").
                   Not required when a pre-built transport is provided.
        baud:      Baud rate (default 38400, ignored when transport is provided).
        timeout:   Per-request receive timeout in seconds (default 15.0).
        transport: Pre-built transport (SerialTransport or TcpTransport).
                   If None, a SerialTransport is constructed from port/baud.
    """

    def __init__(
        self,
        port: str = "",
        baud: int = 38_400,
        timeout: float = 15.0,
        transport: Optional[BaseTransport] = None,
    ) -> None:
        self.port = port
        self.baud = baud
        self.timeout = timeout
        self._transport: Optional[BaseTransport] = transport
        # Created by open(); None means "not connected" (see _require_proto).
        self._proto: Optional[MiniMateProtocol] = None

    # ── Connection lifecycle ──────────────────────────────────────────────────

    def open(self) -> None:
        """Open the transport connection and create the protocol handle.

        A SerialTransport is built from port/baud when no transport was
        supplied. An already-connected transport is reused as-is.
        """
        if self._transport is None:
            self._transport = SerialTransport(self.port, self.baud)
        if not self._transport.is_connected:
            self._transport.connect()
        self._proto = MiniMateProtocol(self._transport, recv_timeout=self.timeout)

    def close(self) -> None:
        """Close the transport connection and drop the protocol handle.

        The protocol handle is cleared even when the transport's
        disconnect() raises, so a failed close never leaves a stale protocol
        object that _require_proto() would hand out for a dead link.
        """
        try:
            if self._transport and self._transport.is_connected:
                self._transport.disconnect()
        finally:
            self._proto = None

    @property
    def is_open(self) -> bool:
        """True when a transport exists and reports itself connected."""
        return bool(self._transport and self._transport.is_connected)

    # ── Context manager ───────────────────────────────────────────────────────

    def __enter__(self) -> "MiniMateClient":
        self.open()
        return self

    def __exit__(self, *_) -> None:
        self.close()

    # ── Public API ────────────────────────────────────────────────────────────

    def connect(self) -> DeviceInfo:
        """
        Perform the startup handshake and read device identity + compliance config.

        Opens the connection if not already open.

        Reads:
            1. POLL handshake (startup)
            2. SUB 15 — serial number
            3. SUB 01 — full config block (firmware, model strings)
            4. SUB 1A — compliance config (record time, trigger/alarm levels, project strings)

        A failed compliance-config read is logged and tolerated; the returned
        DeviceInfo then simply lacks a compliance_config.

        Returns:
            Populated DeviceInfo with compliance_config cached.

        Raises:
            ProtocolError: on any communication failure in steps 1–3.
        """
        if not self.is_open:
            self.open()

        proto = self._require_proto()

        log.info("connect: POLL startup")
        proto.startup()

        log.info("connect: reading serial number (SUB 15)")
        sn_data = proto.read(SUB_SERIAL_NUMBER)
        device_info = _decode_serial_number(sn_data)

        log.info("connect: reading full config (SUB 01)")
        cfg_data = proto.read(SUB_FULL_CONFIG)
        _decode_full_config_into(cfg_data, device_info)

        log.info("connect: reading compliance config (SUB 1A)")
        try:
            cc_data = proto.read_compliance_config()
            _decode_compliance_config_into(cc_data, device_info)
        except ProtocolError as exc:
            log.warning("connect: compliance config read failed: %s — continuing", exc)

        log.info("connect: %s", device_info)
        return device_info

    def get_events(self, include_waveforms: bool = True, debug: bool = False) -> list[Event]:
        """
        Download all stored events from the device using the confirmed
        1E → 0A → 0C → 1F event-iterator protocol.

        Sequence (confirmed from 3-31-26 Blastware capture):
            1. SUB 1E — get first waveform key
            2. For each key until b'\\x00\\x00\\x00\\x00':
               a. SUB 0A — waveform header (first event only, to confirm full record)
               b. SUB 0C — full waveform record (peak values, project strings)
               c. SUB 1F — advance to next key (token=0xFE skips partial bins)

        Subsequent keys returned by 1F (token=0xFE) are guaranteed to be full
        records, so 0A is only called for the first event. This exactly
        matches Blastware's observed behaviour.

        Raw ADC waveform samples (SUB 5A bulk stream) are NOT downloaded
        here — they are large (several MB per event) and fetched separately.

        Args:
            include_waveforms: Reserved for a future call; currently unused.
            debug: When True, the raw 0C record bytes are attached to each
                   event as ``_raw_record``.

        Returns:
            List of Event objects, one per key visited. An event whose 0A/0C
            read failed is still returned, with only its index populated.

        Raises:
            ProtocolError: when the initial 1E request fails.
            RuntimeError: when the client has not been opened.
        """
        proto = self._require_proto()
        end_of_events = b"\x00\x00\x00\x00"

        log.info("get_events: requesting first event (SUB 1E)")
        try:
            key4, _event_data8 = proto.read_event_first()
        except ProtocolError as exc:
            raise ProtocolError(f"get_events: 1E failed: {exc}") from exc

        if key4 == end_of_events:
            log.info("get_events: device reports no stored events")
            return []

        events: list[Event] = []
        # Keys already visited. A device that hands back a key it has already
        # returned would otherwise keep this loop running forever.
        seen_keys: set[bytes] = set()
        idx = 0
        is_first = True

        while key4 != end_of_events:
            key_id = bytes(key4)
            if key_id in seen_keys:
                log.warning(
                    "get_events: key=%s returned twice — stopping iteration",
                    key4.hex(),
                )
                break
            seen_keys.add(key_id)

            log.info("get_events: record %d key=%s", idx, key4.hex())
            ev = Event(index=idx)

            # First event: call 0A to verify it's a full record (0x30 length).
            # Subsequent keys come from 1F(0xFE) which guarantees full records,
            # so we skip 0A for those — exactly matching Blastware behaviour.
            proceed = True
            if is_first:
                try:
                    _hdr, rec_len = proto.read_waveform_header(key4)
                    if rec_len < 0x30:
                        log.warning(
                            "get_events: first key=%s is partial (len=0x%02X) — skipping",
                            key4.hex(), rec_len,
                        )
                        proceed = False
                except ProtocolError as exc:
                    log.warning(
                        "get_events: 0A failed for key=%s: %s — skipping 0C",
                        key4.hex(), exc,
                    )
                    proceed = False
                is_first = False

            if proceed:
                # SUB 0C — full waveform record (peak values, project strings)
                try:
                    record = proto.read_waveform_record(key4)
                    if debug:
                        ev._raw_record = record
                    _decode_waveform_record_into(record, ev)
                except ProtocolError as exc:
                    log.warning(
                        "get_events: 0C failed for key=%s: %s", key4.hex(), exc
                    )

            events.append(ev)
            idx += 1

            # SUB 1F — advance to the next full waveform record key
            try:
                key4 = proto.advance_event()
            except ProtocolError as exc:
                log.warning("get_events: 1F failed: %s — stopping iteration", exc)
                break

        log.info("get_events: downloaded %d event(s)", len(events))
        return events

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _require_proto(self) -> MiniMateProtocol:
        """Return the active protocol handle, or raise RuntimeError if not open."""
        if self._proto is None:
            raise RuntimeError("MiniMateClient is not connected. Call open() first.")
        return self._proto
|
||
|
||
|
||
# ── Decoder functions ─────────────────────────────────────────────────────────
#
# Pure functions: bytes → model field population.
# Kept here (not in models.py) to isolate protocol knowledge from data shapes.
|
||
|
||
def _decode_serial_number(data: bytes) -> DeviceInfo:
    """
    Decode SUB EA (SERIAL_NUMBER_RESPONSE) payload into a new DeviceInfo.

    Layout (10 bytes total per §7.2):
        bytes 0–7:  serial string, null-terminated, null-padded ("BE18189\\x00")
        byte  8:    unit-specific trailing byte (purpose unknown ❓)
        byte  9:    firmware minor version (0x11 = 17) ✅

    The serial is cut at its first NUL (it is a null-terminated field), so
    any bytes after the terminator never leak into the decoded string.

    Returns:
        New DeviceInfo with serial, firmware_minor, serial_trail_0 populated.
        When fewer than 9 payload bytes are available only serial is decoded
        and firmware_minor is reported as 0.
    """
    # data is data_rsp.data = payload[5:]. The 11-byte section header occupies
    # data[0..10]: [LENGTH_ECHO:1][00×4][KEY_ECHO:4][00×2].
    # Actual serial payload starts at data[11].
    actual = data[11:] if len(data) > 11 else data

    def _c_string(raw: bytes) -> str:
        """Decode *raw* up to (not including) its first NUL byte."""
        return raw.split(b"\x00", 1)[0].decode("ascii", errors="replace")

    if len(actual) < 9:
        # Short payload — gracefully degrade
        return DeviceInfo(serial=_c_string(actual), firmware_minor=0)

    serial = _c_string(actual[:8])
    trail_0 = actual[8]  # always present here: len(actual) >= 9
    fw_minor = actual[9] if len(actual) > 9 else 0

    return DeviceInfo(
        serial=serial,
        firmware_minor=fw_minor,
        serial_trail_0=trail_0,
    )
|
||
|
||
|
||
def _decode_full_config_into(data: bytes, info: DeviceInfo) -> None:
|
||
"""
|
||
Decode SUB FE (FULL_CONFIG_RESPONSE) payload into an existing DeviceInfo.
|
||
|
||
The FE response arrives as a composite S3 outer frame whose data section
|
||
contains inner DLE-framed sub-frames. Because of this nesting the §7.3
|
||
fixed offsets (0x34, 0x3C, 0x44, 0x6D) are unreliable — they assume a
|
||
clean non-nested payload starting at byte 0.
|
||
|
||
Instead we search the whole byte array for known ASCII patterns. The
|
||
strings are long enough to be unique in any reasonable payload.
|
||
|
||
Modifies info in-place.
|
||
"""
|
||
def _extract(needle: bytes, max_len: int = 32) -> Optional[str]:
|
||
"""Return the null-terminated ASCII string that starts with *needle*."""
|
||
pos = data.find(needle)
|
||
if pos < 0:
|
||
return None
|
||
end = pos
|
||
while end < len(data) and data[end] != 0 and (end - pos) < max_len:
|
||
end += 1
|
||
s = data[pos:end].decode("ascii", errors="replace").strip()
|
||
return s or None
|
||
|
||
# ── Manufacturer and model are straightforward literal matches ────────────
|
||
info.manufacturer = _extract(b"Instantel")
|
||
info.model = _extract(b"MiniMate Plus")
|
||
|
||
# ── Firmware version: "S3xx.xx" — scan for the 'S3' prefix ───────────────
|
||
for i in range(len(data) - 5):
|
||
if data[i] == ord('S') and data[i + 1] == ord('3') and chr(data[i + 2]).isdigit():
|
||
end = i
|
||
while end < len(data) and data[end] not in (0, 0x20) and (end - i) < 12:
|
||
end += 1
|
||
candidate = data[i:end].decode("ascii", errors="replace").strip()
|
||
if "." in candidate and len(candidate) >= 5:
|
||
info.firmware_version = candidate
|
||
break
|
||
|
||
# ── DSP version: numeric "xx.xx" — search for known prefixes ─────────────
|
||
for prefix in (b"10.", b"11.", b"12.", b"9.", b"8."):
|
||
pos = data.find(prefix)
|
||
if pos < 0:
|
||
continue
|
||
end = pos
|
||
while end < len(data) and data[end] not in (0, 0x20) and (end - pos) < 8:
|
||
end += 1
|
||
candidate = data[pos:end].decode("ascii", errors="replace").strip()
|
||
# Accept only strings that look like "digits.digits"
|
||
if "." in candidate and all(c in "0123456789." for c in candidate):
|
||
info.dsp_version = candidate
|
||
break
|
||
|
||
|
||
def _decode_event_count(data: bytes) -> int:
|
||
"""
|
||
Extract stored event count from SUB F7 (EVENT_INDEX_RESPONSE) payload.
|
||
|
||
Layout per §7.4 (offsets from data section start):
|
||
+00: 00 58 09 — total index size or record count ❓
|
||
+03: 00 00 00 01 — possibly stored event count = 1 ❓
|
||
|
||
We use bytes +03..+06 interpreted as uint32 BE as the event count.
|
||
This is inferred (🔶) — the exact meaning of the first 3 bytes is unclear.
|
||
"""
|
||
if len(data) < 7:
|
||
log.warning("event index payload too short (%d bytes), assuming 0 events", len(data))
|
||
return 0
|
||
|
||
# Try the uint32 at +3 first
|
||
count = struct.unpack_from(">I", data, 3)[0]
|
||
|
||
# Sanity check: MiniMate Plus manual says max ~1000 events
|
||
if count > 1000:
|
||
log.warning(
|
||
"event count %d looks unreasonably large — clamping to 0", count
|
||
)
|
||
return 0
|
||
|
||
return count
|
||
|
||
|
||
def _decode_event_header_into(data: bytes, event: Event) -> None:
|
||
"""
|
||
Decode SUB E1 (EVENT_HEADER_RESPONSE) raw data section into an Event.
|
||
|
||
The waveform key is at data[11:15] (extracted separately in
|
||
MiniMateProtocol.read_event_first). The remaining 4 bytes at
|
||
data[15:19] are not yet decoded (❓ — possibly sample rate or flags).
|
||
|
||
Date information (year/month/day) lives in the waveform record (SUB 0C),
|
||
not in the 1E response. This function is a placeholder for any future
|
||
metadata we decode from the 8-byte 1E data block.
|
||
|
||
Modifies event in-place.
|
||
"""
|
||
# Nothing confirmed yet from the 8-byte data block beyond the key at [0:4].
|
||
# Leave event.timestamp as None — it will be populated from the 0C record.
|
||
pass
|
||
|
||
|
||
def _decode_waveform_record_into(data: bytes, event: Event) -> None:
    """
    Populate an Event from a 210-byte SUB F3 (FULL_WAVEFORM_RECORD) record.

    *data* is the raw record returned by
    MiniMateProtocol.read_waveform_record() — i.e. data_rsp.data[11:11+0xD2].

    Four independent, best-effort stages run in order. A failure in one is
    logged as a warning and does not stop the others:

        timestamp     — Timestamp.from_waveform_record(data); always stored
        record_type   — _extract_record_type(data); always stored
        peak_values   — _extract_peak_floats(data); stored only when truthy
        project_info  — _extract_project_strings(data); stored only when truthy

    Modifies event in-place.
    """
    # (producer, Event attribute, store even when falsy?, warning on failure)
    # Producers are lambdas so that every name lookup happens inside the
    # try block below, exactly as if each call were written out inline.
    stages = (
        (
            lambda: Timestamp.from_waveform_record(data),
            "timestamp",
            True,
            "waveform record timestamp decode failed: %s",
        ),
        (
            lambda: _extract_record_type(data),
            "record_type",
            True,
            "waveform record type decode failed: %s",
        ),
        (
            lambda: _extract_peak_floats(data),
            "peak_values",
            False,
            "waveform record peak decode failed: %s",
        ),
        (
            lambda: _extract_project_strings(data),
            "project_info",
            False,
            "waveform record project strings decode failed: %s",
        ),
    )

    for produce, attr_name, store_falsy, failure_msg in stages:
        try:
            decoded = produce()
            if store_falsy or decoded:
                setattr(event, attr_name, decoded)
        except Exception as exc:
            log.warning(failure_msg, exc)
|
||
|
||
|
||
def _extract_record_type(data: bytes) -> Optional[str]:
|
||
"""
|
||
Decode the recording mode from byte[1] of the 210-byte waveform record.
|
||
|
||
Byte[1] is the sub-record code that immediately follows the day byte in the
|
||
9-byte timestamp header at the start of each waveform record:
|
||
[day:1] [sub_code:1] [month:1] [year:2 BE] ...
|
||
|
||
Confirmed codes (✅ 2026-04-01):
|
||
0x10 → "Waveform" (continuous / single-shot mode)
|
||
|
||
Histogram mode code is not yet confirmed — a histogram event must be
|
||
captured with debug=true to identify it. Returns None for unknown codes.
|
||
"""
|
||
if len(data) < 2:
|
||
return None
|
||
code = data[1]
|
||
if code == 0x10:
|
||
return "Waveform"
|
||
# TODO: add histogram sub_code once a histogram event is captured with debug=true
|
||
return None
|
||
|
||
|
||
def _extract_peak_floats(data: bytes) -> Optional[PeakValues]:
|
||
"""
|
||
Locate per-channel peak particle velocity values in the 210-byte
|
||
waveform record by searching for the embedded channel label strings
|
||
("Tran", "Vert", "Long", "MicL") and reading the IEEE 754 BE float
|
||
at label_offset + 6.
|
||
|
||
The floats are NOT 4-byte aligned in the record (confirmed from
|
||
3-31-26 capture), so the previous step-4 scan missed Tran, Long, and
|
||
MicL entirely. Label-based lookup is the correct approach.
|
||
|
||
Channel labels are separated by inner-frame bytes (0x10 0x03 = DLE ETX),
|
||
which the S3FrameParser preserves as literal data. Searching for the
|
||
4-byte ASCII label strings is robust to this structure.
|
||
|
||
Returns PeakValues if at least one channel label is found, else None.
|
||
"""
|
||
# (label_bytes, field_name)
|
||
channels = (
|
||
(b"Tran", "tran"),
|
||
(b"Vert", "vert"),
|
||
(b"Long", "long_"),
|
||
(b"MicL", "micl"),
|
||
)
|
||
vals: dict[str, float] = {}
|
||
|
||
for label_bytes, field in channels:
|
||
pos = data.find(label_bytes)
|
||
if pos < 0:
|
||
continue
|
||
float_off = pos + 6
|
||
if float_off + 4 > len(data):
|
||
log.debug("peak float: label %s at %d but float runs past end", label_bytes, pos)
|
||
continue
|
||
try:
|
||
val = struct.unpack_from(">f", data, float_off)[0]
|
||
except struct.error:
|
||
continue
|
||
log.debug("peak float: %s at label+6 (%d) = %.6f", label_bytes.decode(), float_off, val)
|
||
vals[field] = val
|
||
|
||
if not vals:
|
||
return None
|
||
|
||
# ── Peak Vector Sum — fixed offset 87 (✅ confirmed 2026-04-01) ───────────
|
||
# = √(Tran² + Vert² + Long²) at the sample instant of maximum combined geo
|
||
# motion, NOT the vector sum of the three per-channel peak values (which may
|
||
# occur at different times). Matches Blastware "Peak Vector Sum" exactly.
|
||
pvs: Optional[float] = None
|
||
if len(data) > 91:
|
||
try:
|
||
pvs = struct.unpack_from(">f", data, 87)[0]
|
||
except struct.error:
|
||
pass
|
||
|
||
return PeakValues(
|
||
tran=vals.get("tran"),
|
||
vert=vals.get("vert"),
|
||
long=vals.get("long_"),
|
||
micl=vals.get("micl"),
|
||
peak_vector_sum=pvs,
|
||
)
|
||
|
||
|
||
def _extract_project_strings(data: bytes) -> Optional[ProjectInfo]:
|
||
"""
|
||
Search the waveform record payload for known ASCII label strings
|
||
("Project:", "Client:", "User Name:", "Seis Loc:", "Extended Notes")
|
||
and extract the associated value strings that follow them.
|
||
|
||
Layout (per §7.5): each entry is [label ~16 bytes][value ~32 bytes],
|
||
null-padded. We find the label, then read the next non-null chars.
|
||
"""
|
||
def _find_string_after(needle: bytes, max_value_len: int = 64) -> Optional[str]:
|
||
pos = data.find(needle)
|
||
if pos < 0:
|
||
return None
|
||
# Skip the label (including null padding) until we find a non-null value
|
||
# The value starts at pos+len(needle), but may have a gap of null bytes
|
||
value_start = pos + len(needle)
|
||
# Skip nulls
|
||
while value_start < len(data) and data[value_start] == 0:
|
||
value_start += 1
|
||
if value_start >= len(data):
|
||
return None
|
||
# Read until null terminator or max_value_len
|
||
end = value_start
|
||
while end < len(data) and data[end] != 0 and (end - value_start) < max_value_len:
|
||
end += 1
|
||
value = data[value_start:end].decode("ascii", errors="replace").strip()
|
||
return value or None
|
||
|
||
project = _find_string_after(b"Project:")
|
||
client = _find_string_after(b"Client:")
|
||
operator = _find_string_after(b"User Name:")
|
||
location = _find_string_after(b"Seis Loc:")
|
||
notes = _find_string_after(b"Extended Notes")
|
||
|
||
if not any([project, client, operator, location, notes]):
|
||
return None
|
||
|
||
return ProjectInfo(
|
||
project=project,
|
||
client=client,
|
||
operator=operator,
|
||
sensor_location=location,
|
||
notes=notes,
|
||
)
|
||
|
||
|
||
def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None:
    """
    Decode a 2090-byte SUB 1A (COMPLIANCE_CONFIG) response into a ComplianceConfig.

    The *data* argument is the raw bytes returned by read_compliance_config()
    (frames B+C+D concatenated, echo headers stripped).

    Confirmed field locations (BE11529 with 3-step read, duplicate detection):
        - cfg[89]      = setup_name: first long ASCII string in cfg[40:250]  ✅
        - ANCHOR       = b'\\x01\\x2c\\x00\\x00\\xbe\\x80\\x00\\x00\\x00\\x00' in cfg[40:100]  ✅
        - anchor - 2   = sample_rate uint16_BE (1024 normal / 2048 fast / 4096 faster)
        - anchor + 10  = record_time float32_BE
        - "Project:"   needle → project string
        - "Client:"    needle → client string
        - "User Name:" needle → operator string
        - "Seis Loc:"  needle → sensor_location string
        - "Extended Notes" needle → notes string

    Anchor approach is required because a DLE byte in the sample_rate field
    (4096 = 0x1000 → stored as 10 10 00 in raw S3 frame → unstuffed to 10 00,
    1 byte shorter than 04 00 or 08 00) causes frame C to be 1 byte shorter
    for "faster" mode, shifting all subsequent offsets by 1. The 10-byte
    anchor is stable across all modes.

    A missing (None/empty) or shorter-than-40-byte payload is logged and
    leaves info untouched. Otherwise every field is extracted best-effort and
    the resulting ComplianceConfig is stored on info.compliance_config.
    """
    if not data or len(data) < 40:
        # len() is guarded: *data* may be None here, not just short.
        log.warning(
            "compliance config payload too short (%d bytes)",
            len(data) if data else 0,
        )
        return

    config = ComplianceConfig(raw=data)

    # ── Setup name ────────────────────────────────────────────────────────────
    # The setup_name IS the string itself — it is NOT a label followed by a value.
    # It appears as the first long (>=8 char) ASCII string in cfg[40:250].
    # The preceding bytes vary by device (cfg[88]=0x01 on BE11529); the string
    # itself is null-terminated.
    try:
        setup_name = _find_first_string(data, start=40, end=250, min_len=8)
        config.setup_name = setup_name
        if setup_name:
            log.debug("compliance_config: setup_name = %r", setup_name)
    except Exception as exc:
        log.warning("compliance_config: setup_name extraction failed: %s", exc)

    # ── Record time + sample rate — anchor-relative ───────────────────────────
    # The 10-byte anchor sits between sample_rate and record_time in the cfg.
    # Absolute offsets are NOT reliable because sample_rate = 4096 (0x1000) is
    # DLE-escaped in the raw S3 frame (10 10 00 → 10 00 after unstuffing),
    # making frame C 1 byte shorter than for 1024/2048 and shifting everything.
    #   sample_rate: uint16_BE at anchor - 2
    #   record_time: float32_BE at anchor + 10
    _ANCHOR = b'\x01\x2c\x00\x00\xbe\x80\x00\x00\x00\x00'
    _anchor = data.find(_ANCHOR, 40, 100)
    if _anchor >= 2 and _anchor + 14 <= len(data):
        try:
            config.sample_rate = struct.unpack_from(">H", data, _anchor - 2)[0]
            log.debug(
                "compliance_config: sample_rate = %d Sa/s (anchor@%d)", config.sample_rate, _anchor
            )
        except Exception as exc:
            log.warning("compliance_config: sample_rate extraction failed: %s", exc)
        try:
            config.record_time = struct.unpack_from(">f", data, _anchor + 10)[0]
            log.debug(
                "compliance_config: record_time = %.3f s (anchor@%d)", config.record_time, _anchor
            )
        except Exception as exc:
            log.warning("compliance_config: record_time extraction failed: %s", exc)
    else:
        log.warning(
            "compliance_config: anchor %s not found in cfg[40:100] (len=%d) "
            "— sample_rate and record_time will be None",
            _ANCHOR.hex(), len(data),
        )

    # ── Project strings ───────────────────────────────────────────────────────
    known_labels = (
        b"Project:",
        b"Client:",
        b"User Name:",
        b"Seis Loc:",
        b"Extended Notes",
    )

    def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]:
        pos = data.find(needle)
        if pos < 0:
            return None
        value_start = pos + len(needle)
        while value_start < len(data) and data[value_start] == 0:
            value_start += 1
        if value_start >= len(data):
            return None
        # An empty value is all NULs, so the skip above can run into the next
        # label; that label text is not this field's value.
        if data.startswith(known_labels, value_start):
            return None
        end = value_start
        while end < len(data) and data[end] != 0 and (end - value_start) < max_len:
            end += 1
        s = data[value_start:end].decode("ascii", errors="replace").strip()
        return s or None

    try:
        config.project = _find_string_after(b"Project:")
        config.client = _find_string_after(b"Client:")
        config.operator = _find_string_after(b"User Name:")
        config.sensor_location = _find_string_after(b"Seis Loc:")
        config.notes = _find_string_after(b"Extended Notes")

        if config.project:
            log.debug("compliance_config: project = %s", config.project)
        if config.client:
            log.debug("compliance_config: client = %s", config.client)
    except Exception as exc:
        log.warning("compliance_config: project string extraction failed: %s", exc)

    info.compliance_config = config
|
||
|
||
|
||
def _find_first_string(data: bytes, start: int, end: int, min_len: int) -> Optional[str]:
|
||
"""
|
||
Return the first null-terminated printable ASCII string of length >= min_len
|
||
found in data[start:end].
|
||
"""
|
||
i = start
|
||
end = min(end, len(data))
|
||
while i < end:
|
||
if 0x20 <= data[i] < 0x7F:
|
||
j = i
|
||
while j < len(data) and 0x20 <= data[j] < 0x7F:
|
||
j += 1
|
||
if j - i >= min_len:
|
||
return data[i:j].decode("ascii", errors="replace").strip()
|
||
i = j + 1
|
||
else:
|
||
i += 1
|
||
return None
|
||
|