feat: add config API endpoint and JSON schema draft

This commit is contained in:
2026-04-07 17:26:24 -04:00
parent 7005ae766d
commit a7ab6eaf7c
3 changed files with 438 additions and 113 deletions
+233 -84
View File
@@ -617,6 +617,97 @@ class MiniMateClient:
log.info("push_config_raw: complete")
def apply_config(
self,
*,
# Recording parameters
sample_rate: Optional[int] = None,
record_time: Optional[float] = None,
# Threshold parameters (geo channels, in/s)
trigger_level_geo: Optional[float] = None,
alarm_level_geo: Optional[float] = None,
max_range_geo: Optional[float] = None,
# Project / operator strings
project: Optional[str] = None,
client_name: Optional[str] = None,
operator: Optional[str] = None,
seis_loc: Optional[str] = None,
notes: Optional[str] = None,
) -> None:
"""
Read the current device config, apply any supplied changes to the
compliance block, and write the full config back to the device.
Only non-None arguments are modified; all other bytes are round-tripped
verbatim from the device.
Configurable fields
-------------------
Recording parameters:
sample_rate : int — samples/sec; valid values: 1024, 2048, 4096
record_time : float — record duration in seconds (e.g. 2.0, 3.0)
Trigger/alarm thresholds (geo channels, in/s):
trigger_level_geo : float — trigger threshold (e.g. 0.5)
alarm_level_geo : float — alarm threshold (e.g. 1.0)
max_range_geo : float — full-scale calibration constant (e.g. 6.206)
rarely changed — only set if you know what you're doing
Project / operator strings (max 41 ASCII characters each):
project : str
client_name : str
operator : str
seis_loc : str — sensor location
notes : str — extended notes
Write sequence (confirmed from 3-11-26 BW TX capture):
68→73 | 71×3→72 | 82→83 | 69→74→72
Write payloads:
event_index_data : 88 bytes — read live via SUB 08
compliance_data : 2128 bytes — read live via SUB 1A (2126 bytes) + \\x00\\x00 footer
trigger_data : 29 bytes — hardcoded from 3-11-26 capture
waveform_data : 204 bytes — read live via SUB 09
Raises:
RuntimeError: if not connected.
ProtocolError: if any read or write step fails.
ValueError: if compliance buffer is not the expected 2126 bytes.
"""
proto = self._require_proto()
# 1. Read current payloads from the device
log.info("apply_config: reading event index (SUB 08)")
event_index_data = proto.read_event_index()
log.info("apply_config: reading compliance config (SUB 1A)")
compliance_raw = proto.read_compliance_config() # 2126 bytes
log.info("apply_config: reading waveform data (SUB 09)")
waveform_data = proto.read_waveform_data_raw() # 204 bytes
trigger_data = _TRIGGER_DATA_HARDCODED # 29 bytes
# 2. Patch the compliance buffer and build the 2128-byte write payload
compliance_data = _encode_compliance_config(
compliance_raw,
sample_rate=sample_rate,
record_time=record_time,
trigger_level_geo=trigger_level_geo,
alarm_level_geo=alarm_level_geo,
max_range_geo=max_range_geo,
project=project,
client_name=client_name,
operator=operator,
seis_loc=seis_loc,
notes=notes,
)
log.info("apply_config: compliance payload ready (%d bytes)", len(compliance_data))
# 3. Push the full write sequence to the device
self.push_config_raw(event_index_data, compliance_data, trigger_data, waveform_data)
log.info("apply_config: complete")
def set_project_info(
self,
project: Optional[str] = None,
@@ -625,84 +716,15 @@ class MiniMateClient:
seis_loc: Optional[str] = None,
notes: Optional[str] = None,
) -> None:
"""
POC: Read the current config from the device, patch ASCII project fields
in the compliance block, and write the modified config back.
Only fields passed as non-None are modified. All other bytes are
round-tripped verbatim.
Label slot format confirmed from 3-11-26 BW TX capture:
- Compliance buffer is 2126 bytes (three E5 data frames concatenated).
- Each label ("Project:", "Client:", etc.) anchors a 64-byte slot.
- ASCII value starts at slot_start + 22, max 42 bytes, null-padded.
Known labels and their confirmed positions in the 2126-byte buffer:
b"Project:" → value at 125 + 22 = 147
b"Client:" → value at 189 + 22 = 211
b"User Name:" → value at 253 + 22 = 275
b"Seis Loc:" → value at 317 + 22 = 339
b"Extended Notes" → value at 381 + 22 = 403
Write payloads:
event_index_data : 88 bytes — read live from device via SUB 08
compliance_data : 2128 bytes — 2126 read bytes + 2-byte footer \\x00\\x00
(checksum formula unknown for POC; device may ignore it)
trigger_data : 29 bytes — hardcoded from 3-11-26 capture
waveform_data : 204 bytes — read live from device via SUB 09
Raises:
RuntimeError: if not connected.
ProtocolError: if any read or write step fails.
"""
proto = self._require_proto()
# 1. Read current payloads from the device
log.info("set_project_info: reading event index (SUB 08)")
event_index_data = proto.read_event_index() # 88 bytes
log.info("set_project_info: reading compliance config (SUB 1A)")
compliance_raw = bytearray(proto.read_compliance_config()) # 2126 bytes
log.info("set_project_info: reading waveform data (SUB 09)")
waveform_data = proto.read_waveform_data_raw() # 204 bytes
trigger_data = _TRIGGER_DATA_HARDCODED # 29 bytes
# 2. Patch ASCII fields inside the compliance buffer
def _set_field(label: bytes, value: Optional[str]) -> None:
if value is None:
return
idx = compliance_raw.find(label)
if idx < 0:
log.warning(
"set_project_info: label %r not found in compliance data", label
)
return
val_bytes = value.encode("ascii", errors="replace")[: _COMPLIANCE_VALUE_MAX - 1]
padded = val_bytes + b"\x00" * (_COMPLIANCE_VALUE_MAX - len(val_bytes))
compliance_raw[idx + _COMPLIANCE_VALUE_OFFSET : idx + _COMPLIANCE_SLOT_SIZE] = padded
log.debug(
"set_project_info: set %r%r (at offset %d)", label, value, idx
)
_set_field(b"Project:", project)
_set_field(b"Client:", client_name)
_set_field(b"User Name:", operator)
_set_field(b"Seis Loc:", seis_loc)
_set_field(b"Extended Notes", notes)
# 3. Append 2-byte footer (checksum formula unknown; \x00\x00 for POC)
compliance_data = bytes(compliance_raw) + b"\x00\x00" # 2128 bytes
log.info(
"set_project_info: compliance payload ready (%d bytes)", len(compliance_data)
"""Backwards-compat shim — delegates to apply_config()."""
self.apply_config(
project=project,
client_name=client_name,
operator=operator,
seis_loc=seis_loc,
notes=notes,
)
# 4. Push the full write sequence to the device
self.push_config_raw(event_index_data, compliance_data, trigger_data, waveform_data)
log.info("set_project_info: complete")
# ── Internal helpers ──────────────────────────────────────────────────────
def _require_proto(self) -> MiniMateProtocol:
@@ -1336,6 +1358,128 @@ def _extract_project_strings(data: bytes) -> Optional[ProjectInfo]:
)
def _encode_compliance_config(
raw: bytes,
*,
sample_rate: Optional[int] = None,
record_time: Optional[float] = None,
trigger_level_geo: Optional[float] = None,
alarm_level_geo: Optional[float] = None,
max_range_geo: Optional[float] = None,
project: Optional[str] = None,
client_name: Optional[str] = None,
operator: Optional[str] = None,
seis_loc: Optional[str] = None,
notes: Optional[str] = None,
) -> bytes:
"""
Patch a live 2126-byte compliance buffer (read from the device) with any
supplied field values and return the 2128-byte write payload.
Only non-None arguments are modified; everything else is round-tripped verbatim.
Numeric field locations (all anchor-relative or label-relative — immune to
DLE-jitter shifts):
Anchor: b'\\xbe\\x80\\x00\\x00\\x00\\x00' (confirmed stable, both BE11529 and BE18189)
sample_rate → uint16 BE at anchor_pos - 6
record_time → float32 BE at anchor_pos + 6
Channel block (anchored on b"Tran" with unit-string guard):
max_range_geo → float32 BE at tran_pos + 28
trigger_level_geo → float32 BE at tran_pos + 34
alarm_level_geo → float32 BE at tran_pos + 42
String field locations (64-byte slots, label+22 format):
b"Project:" → value at label_pos + 22, max 41 chars + null
b"Client:" → value at label_pos + 22, max 41 chars + null
b"User Name:" → value at label_pos + 22, max 41 chars + null
b"Seis Loc:" → value at label_pos + 22, max 41 chars + null
b"Extended Notes"→ value at label_pos + 22, max 41 chars + null
Returns:
2128-byte write payload: patched 2126-byte buffer + 2-byte footer \\x00\\x00.
(Checksum formula for the footer is unknown; \\x00\\x00 confirmed accepted
by the device in POC test 2026-04-07.)
Raises:
ValueError: if raw is not exactly 2126 bytes.
"""
if len(raw) != 2126:
raise ValueError(f"_encode_compliance_config: expected 2126 bytes, got {len(raw)}")
buf = bytearray(raw)
# ── Numeric: sample_rate + record_time (anchor-relative) ─────────────────
_ANC = b'\xbe\x80\x00\x00\x00\x00'
_anc = buf.find(_ANC, 0, 150)
if sample_rate is not None:
if _anc < 6:
log.warning("_encode_compliance_config: anchor not found — cannot write sample_rate")
else:
struct.pack_into(">H", buf, _anc - 6, sample_rate)
log.debug("_encode_compliance_config: sample_rate=%d → offset %d", sample_rate, _anc - 6)
if record_time is not None:
if _anc < 0 or _anc + 10 > len(buf):
log.warning("_encode_compliance_config: anchor not found — cannot write record_time")
else:
struct.pack_into(">f", buf, _anc + 6, record_time)
log.debug("_encode_compliance_config: record_time=%.3f → offset %d", record_time, _anc + 6)
# ── Numeric: channel block (Tran label + unit-string guard) ───────────────
_needs_channel = any(
v is not None for v in (trigger_level_geo, alarm_level_geo, max_range_geo)
)
if _needs_channel:
_tran = buf.find(b"Tran", 44)
_valid = (
_tran >= 0
and buf[_tran + 4 : _tran + 5] != b"2"
and _tran + 50 <= len(buf)
and buf[_tran + 38 : _tran + 42] == b"in.\x00"
and buf[_tran + 46 : _tran + 50] == b"/s\x00\x00"
)
if not _valid:
log.warning(
"_encode_compliance_config: 'Tran' channel block not found or unit "
"guard failed — trigger/alarm/max_range will not be written"
)
else:
if max_range_geo is not None:
struct.pack_into(">f", buf, _tran + 28, max_range_geo)
log.debug("_encode_compliance_config: max_range_geo=%.4f → offset %d", max_range_geo, _tran + 28)
if trigger_level_geo is not None:
struct.pack_into(">f", buf, _tran + 34, trigger_level_geo)
log.debug("_encode_compliance_config: trigger_level_geo=%.4f → offset %d", trigger_level_geo, _tran + 34)
if alarm_level_geo is not None:
struct.pack_into(">f", buf, _tran + 42, alarm_level_geo)
log.debug("_encode_compliance_config: alarm_level_geo=%.4f → offset %d", alarm_level_geo, _tran + 42)
# ── ASCII strings (64-byte slot, value at label_pos+22) ───────────────────
def _set_string(label: bytes, value: Optional[str]) -> None:
if value is None:
return
idx = buf.find(label)
if idx < 0:
log.warning("_encode_compliance_config: label %r not found", label)
return
val_bytes = value.encode("ascii", errors="replace")[:_COMPLIANCE_VALUE_MAX - 1]
padded = val_bytes + b"\x00" * (_COMPLIANCE_VALUE_MAX - len(val_bytes))
buf[idx + _COMPLIANCE_VALUE_OFFSET : idx + _COMPLIANCE_SLOT_SIZE] = padded
log.debug("_encode_compliance_config: %r%r", label, value)
_set_string(b"Project:", project)
_set_string(b"Client:", client_name)
_set_string(b"User Name:", operator)
_set_string(b"Seis Loc:", seis_loc)
_set_string(b"Extended Notes", notes)
# 2-byte footer — checksum formula unknown; \x00\x00 confirmed accepted by device
return bytes(buf) + b"\x00\x00"
def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None:
"""
Decode a 2090-byte SUB 1A (COMPLIANCE_CONFIG) response into a ComplianceConfig.
@@ -1345,9 +1489,9 @@ def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None:
Confirmed field locations (BE11529 with 3-step read, duplicate detection):
- cfg[89] = setup_name: first long ASCII string in cfg[40:250] ✅
- ANCHOR = b'\\x01\\x2c\\x00\\x00\\xbe\\x80\\x00\\x00\\x00\\x00' in cfg[40:100] ✅
- anchor - 2 = sample_rate uint16_BE (1024 normal / 2048 fast / 4096 faster)
- anchor + 10 = record_time float32_BE
- ANCHOR = b'\\xbe\\x80\\x00\\x00\\x00\\x00' in cfg[0:150] ✅ (revised 2026-04-07)
- anchor - 6 = sample_rate uint16_BE (1024 normal / 2048 fast / 4096 faster)
- anchor + 6 = record_time float32_BE
- "Project:" needle → project string
- "Client:" needle → client string
- "User Name:" needle → operator string
@@ -1397,18 +1541,23 @@ def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None:
# making frame C 1 byte shorter than for 1024/2048 and shifting everything.
# sample_rate: uint16_BE at anchor - 2
# record_time: float32_BE at anchor + 10
_ANCHOR = b'\x01\x2c\x00\x00\xbe\x80\x00\x00\x00\x00'
# 6-byte suffix anchor — confirmed stable across BE11529 and bench unit (BE18189).
# The preceding 4 bytes (old anchor prefix 01 2c / 00 3c) vary by unit config;
# only be 80 00 00 00 00 is constant.
# sample_rate : uint16 BE at anchor_pos - 6
# record_time : float32 BE at anchor_pos + 6
_ANCHOR = b'\xbe\x80\x00\x00\x00\x00'
_anchor = data.find(_ANCHOR, 0, 150)
if _anchor >= 2 and _anchor + 14 <= len(data):
if _anchor >= 6 and _anchor + 10 <= len(data):
try:
config.sample_rate = struct.unpack_from(">H", data, _anchor - 2)[0]
config.sample_rate = struct.unpack_from(">H", data, _anchor - 6)[0]
log.debug(
"compliance_config: sample_rate = %d Sa/s (anchor@%d)", config.sample_rate, _anchor
)
except Exception as exc:
log.warning("compliance_config: sample_rate extraction failed: %s", exc)
try:
config.record_time = struct.unpack_from(">f", data, _anchor + 10)[0]
config.record_time = struct.unpack_from(">f", data, _anchor + 6)[0]
log.debug(
"compliance_config: record_time = %.3f s (anchor@%d)", config.record_time, _anchor
)
@@ -1416,7 +1565,7 @@ def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None:
log.warning("compliance_config: record_time extraction failed: %s", exc)
else:
log.warning(
"compliance_config: anchor %s not found in cfg[40:100] (len=%d) "
"compliance_config: anchor %s not found in cfg[0:150] (len=%d) "
"— sample_rate and record_time will be None",
_ANCHOR.hex(), len(data),
)