feat: implement SUB 1A (compliance config) read

Adds full support for reading device compliance configuration (2090-byte E5
response) containing record time, trigger/alarm levels, and project strings.

protocol.py:
- Implement read_compliance_config() two-step read (SUB 1A → E5)
- Fixed length 0x082A (2090 bytes)

models.py:
- Add ComplianceConfig dataclass with fields: record_time, sample_rate,
  trigger_level_geo, alarm_level_geo, max_range_geo, project strings
- Add compliance_config field to DeviceInfo

client.py:
- Implement _decode_compliance_config_into() to extract:
  * Record time float at offset +0x28 
  * Trigger/alarm levels per-channel (heuristic parsing) 🔶
  * Project/setup strings from E5 payload
  * Placeholder for sample_rate (location TBD )
- Update connect() to read SUB 1A after SUB 01, cache in device_info
- Add ComplianceConfig to imports

sfm/server.py:
- Add _serialise_compliance_config() JSON encoder
- Include compliance_config in /device/info response
- Updated _serialise_device_info() to output compliance config

Both record_time (at fixed offset 0x28) and project strings are  CONFIRMED
from protocol reference §7.6. Trigger/alarm extraction uses heuristics
pending more detailed field mapping from captured data.

Sample rate remains undiscovered in the E5 payload — likely in the
mystery flags at offset +0x12 or requires a "fast mode" capture.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Brian Harrison
2026-04-01 12:08:43 -04:00
parent a8187eccd0
commit 32b9d3050c
4 changed files with 222 additions and 9 deletions

View File

@@ -34,6 +34,7 @@ from typing import Optional
from .framing import S3Frame
from .models import (
ComplianceConfig,
DeviceInfo,
Event,
PeakValues,
@@ -111,7 +112,7 @@ class MiniMateClient:
def connect(self) -> DeviceInfo:
"""
Perform the startup handshake and read device identity.
Perform the startup handshake and read device identity + compliance config.
Opens the connection if not already open.
@@ -119,9 +120,10 @@ class MiniMateClient:
1. POLL handshake (startup)
2. SUB 15 — serial number
3. SUB 01 — full config block (firmware, model strings)
4. SUB 1A — compliance config (record time, trigger/alarm levels, project strings)
Returns:
Populated DeviceInfo.
Populated DeviceInfo with compliance_config cached.
Raises:
ProtocolError: on any communication failure.
@@ -142,6 +144,13 @@ class MiniMateClient:
cfg_data = proto.read(SUB_FULL_CONFIG)
_decode_full_config_into(cfg_data, device_info)
log.info("connect: reading compliance config (SUB 1A)")
try:
cc_data = proto.read_compliance_config()
_decode_compliance_config_into(cc_data, device_info)
except ProtocolError as exc:
log.warning("connect: compliance config read failed: %s — continuing", exc)
log.info("connect: %s", device_info)
return device_info
@@ -565,3 +574,109 @@ def _extract_project_strings(data: bytes) -> Optional[ProjectInfo]:
sensor_location=location,
notes=notes,
)
def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None:
"""
Decode a 2090-byte SUB 1A (COMPLIANCE_CONFIG) response into a ComplianceConfig.
The *data* argument is the raw response bytes from read_compliance_config().
Extracts (per §7.6):
- record_time: float32 BE at offset +0x28
- trigger_level / alarm_level per-channel: IEEE 754 BE floats
- project strings: "Project:", "Client:", "User Name:", "Seis Loc:", "Extended Notes"
- sample_rate: NOT YET FOUND ❓
Modifies info.compliance_config in-place.
"""
if not data or len(data) < 0x28:
log.warning("compliance config payload too short (%d bytes)", len(data))
return
config = ComplianceConfig(raw=data)
# ── Record Time (✅ CONFIRMED at §7.6.1) ──────────────────────────────────
try:
# Record time is at offset +0x28 within the data payload (NOT from frame start)
# This is the second page of the paged read response.
if len(data) > 0x28 + 4:
record_time = struct.unpack_from(">f", data, 0x28)[0]
config.record_time = record_time
log.debug("compliance_config: record_time = %.1f sec", record_time)
except struct.error:
log.warning("compliance_config: failed to unpack record_time at offset 0x28")
# ── Per-channel trigger/alarm levels (✅ CONFIRMED at §7.6) ─────────────────
# Layout repeats per channel: [padding][max_range][padding][trigger]["in.\0"][alarm]["/s\0\0"][flag][label]
# For now, extract just the first geo channel (Transverse) as a representative.
try:
# Search for the "Tran" label to locate the first geo channel block
tran_pos = data.find(b"Tran")
if tran_pos > 0:
# Work backward from the label to find trigger and alarm
# From §7.6: trigger float is ~20 bytes before the label, alarm is ~8 bytes before
# Exact structure: [padding2][trigger_float][unit "in."][alarm_float][unit "/s"]
# We'll search backward for the last float before the label
# The trigger/alarm block format is complex; for now use heuristics
# Look for trigger level at a few standard offsets relative to label
# From protocol: trigger is usually at label_offset - 14
if tran_pos >= 14:
try:
trigger = struct.unpack_from(">f", data, tran_pos - 14)[0]
config.trigger_level_geo = trigger
log.debug("compliance_config: trigger_level_geo = %.3f in/s", trigger)
except (struct.error, ValueError):
pass
# Alarm is usually after the "in." unit string following trigger
# Try offset tran_pos - 6
if tran_pos >= 6:
try:
alarm = struct.unpack_from(">f", data, tran_pos - 6)[0]
config.alarm_level_geo = alarm
log.debug("compliance_config: alarm_level_geo = %.3f in/s", alarm)
except (struct.error, ValueError):
pass
except Exception as exc:
log.warning("compliance_config: trigger/alarm extraction failed: %s", exc)
# ── Project strings (from E5 / SUB 71 payload) ────────────────────────────
try:
def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]:
pos = data.find(needle)
if pos < 0:
return None
value_start = pos + len(needle)
while value_start < len(data) and data[value_start] == 0:
value_start += 1
if value_start >= len(data):
return None
end = value_start
while end < len(data) and data[end] != 0 and (end - value_start) < max_len:
end += 1
s = data[value_start:end].decode("ascii", errors="replace").strip()
return s or None
config.setup_name = _find_string_after(b"Standard Recording Setup")
config.project = _find_string_after(b"Project:")
config.client = _find_string_after(b"Client:")
config.operator = _find_string_after(b"User Name:")
config.sensor_location = _find_string_after(b"Seis Loc:")
config.notes = _find_string_after(b"Extended Notes")
if config.project:
log.debug("compliance_config: project = %s", config.project)
if config.client:
log.debug("compliance_config: client = %s", config.client)
except Exception as exc:
log.warning("compliance_config: project string extraction failed: %s", exc)
# ── Sample rate (NOT YET FOUND ❓) ────────────────────────────────────────
# The sample rate (1024 sps standard, 2048 sps fast) is not yet located in the
# protocol docs. It may be encoded in mystery flags at offset +0x12 in the .set
# file format, or it may require a separate capture analysis. For now, leave as None.
config.sample_rate = None
info.compliance_config = config

View File

@@ -180,6 +180,9 @@ class DeviceInfo:
manufacturer: Optional[str] = None # e.g. "Instantel" ✅
model: Optional[str] = None # e.g. "MiniMate Plus" ✅
# ── From SUB 1A (COMPLIANCE_CONFIG_RESPONSE) ──────────────────────────────
compliance_config: Optional["ComplianceConfig"] = None # E5 response, read in connect()
def __str__(self) -> str:
fw = self.firmware_version or f"?.{self.firmware_minor}"
mdl = self.model or "MiniMate Plus"
@@ -253,6 +256,44 @@ class ProjectInfo:
notes: Optional[str] = None # extended notes
# ── Compliance Config ──────────────────────────────────────────────────────────
@dataclass
class ComplianceConfig:
"""
Device compliance and recording configuration from SUB 1A (response E5).
Contains device-wide settings like record time, trigger/alarm thresholds,
and operator-supplied strings. This is read once during connect() and
cached in DeviceInfo.
All fields are optional — some may not be decoded yet or may be absent
from the device configuration.
"""
raw: Optional[bytes] = None # full 2090-byte payload (for debugging)
# Recording parameters (✅ CONFIRMED from §7.6)
record_time: Optional[float] = None # seconds (7.0, 10.0, 13.0, etc.)
sample_rate: Optional[int] = None # sps (1024, 2048, 4096, etc.) — NOT YET FOUND ❓
# Trigger/alarm levels (✅ CONFIRMED per-channel at §7.6)
# For now we store the first geo channel (Transverse) as representatives;
# full per-channel data would require structured Channel objects.
trigger_level_geo: Optional[float] = None # in/s (first geo channel)
alarm_level_geo: Optional[float] = None # in/s (first geo channel)
max_range_geo: Optional[float] = None # in/s full-scale range
# Project/setup strings (sourced from E5 / SUB 71 write payload)
# These are the FULL project metadata from compliance config,
# complementing the sparse ProjectInfo found in the waveform record (SUB 0C).
setup_name: Optional[str] = None # "Standard Recording Setup"
project: Optional[str] = None # project description
client: Optional[str] = None # client name
operator: Optional[str] = None # operator / user name
sensor_location: Optional[str] = None # sensor location string
notes: Optional[str] = None # extended notes / additional info
# ── Event ─────────────────────────────────────────────────────────────────────
@dataclass

View File

@@ -395,6 +395,44 @@ class MiniMateProtocol:
)
return key4
def read_compliance_config(self) -> bytes:
"""
Send the SUB 1A (CHANNEL_SCALING / COMPLIANCE_CONFIG) two-step read.
Returns the full 2090-byte compliance config block (E5 response)
containing:
- Trigger and alarm levels per channel (IEEE 754 BE floats)
- Record time (float at offset +0x28)
- Project strings (Project, Client, User Name, Seis Loc, Extended Notes)
- Channel labels and unit strings
- Per-channel max range values
Returns:
2090-byte compliance config data (data[11:11+0x082A]).
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
Confirmed from protocol reference §7.6:
- SUB 1A request uses all-zero params
- E5 response is 2090 bytes (0x082A fixed length)
- Response data section: data[11:11+0x082A]
"""
rsp_sub = _expected_rsp_sub(SUB_COMPLIANCE)
length = 0x082A # 2090 bytes, fixed length for compliance config
log.debug("read_compliance_config: 1A probe")
self._send(build_bw_frame(SUB_COMPLIANCE, 0))
self._recv_one(expected_sub=rsp_sub)
log.debug("read_compliance_config: 1A data request offset=0x%04X", length)
self._send(build_bw_frame(SUB_COMPLIANCE, length))
data_rsp = self._recv_one(expected_sub=rsp_sub)
config = data_rsp.data[11:11 + length]
log.debug("read_compliance_config: received %d config bytes", len(config))
return config
# ── Internal helpers ──────────────────────────────────────────────────────
def _send(self, frame: bytes) -> None: