Files
seismo-relay/minimateplus/client.py
T
Claude 85f4bcfe86 codec: wire decode_waveform_v2 into production; add MicL dB helper
Replaces the broken legacy int16 LE decoder in client.py with the
verified multi-channel codec.  Three changes:

1. blastware_file.extract_body_bytes(a5_frames) — new helper that
   factors out the body-reconstruction logic from write_blastware_file
   so both writers (BW binary) and decoders (sample arrays) can use
   the same canonical bytes.

2. waveform_codec.decode_a5_frames(a5_frames) — production entry point.
   Returns the raw_samples dict consumers expect (Tran/Vert/Long as
   int16 ADC counts; MicL as native ADC counts).  Internally:
     A5 frames → extract_body_bytes → decode_waveform_v2
                → decoded_to_adc_counts (geos ×16; mic pass-through)

3. waveform_codec.mic_count_to_db(count) — MicL ADC → dB(L) per BW's
   display formula:
     dB = sign(count) × (81.94 + 20 × log10(|count|))   for |count| ≥ 1
   Verified against V70 fixture: count=813 → 140.14 dB (BW PSPL 140.1).

client.py:_decode_a5_waveform is reduced to a thin wrapper that calls
decode_a5_frames and populates event.raw_samples.  Original implementation
preserved as _decode_a5_waveform_LEGACY (dead code; reference only).

Also fixed a tail-end bug in decode_waveform_v2 where trailer-section
"40 02" markers (containing ASCII serial bytes, NOT real segment headers)
were being mis-interpreted, producing 2 spurious samples per channel at
the end of each event.  Added bytes [12:14] == "02 00" validation to
reject non-header markers.

7 new pytest tests cover the new helpers and dB conversion.  Total:
71 passing (up from 64).

Known limitation (carried over from before): the walker still stops
mid-event on the loudest fixtures (SP0/SS0/SV0/event-b) at some
mid-segment edge cases not yet characterized.  Every sample reached
is decoded correctly; the walker just doesn't reach all of them.
Loud events still yield 5,000–15,000 byte-exact samples each.
2026-05-20 17:28:54 +00:00

2792 lines
122 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
client.py — MiniMateClient: the top-level public API for the library.
Combines transport, protocol, and model decoding into a single easy-to-use
class. This is the only layer that the SFM server (sfm/server.py) imports
directly.
Design: stateless per-call (connect → do work → disconnect).
The client does not hold an open connection between calls. This keeps the
first implementation simple and matches Blastware's observed behaviour.
Persistent connections can be added later without changing the public API.
Example (serial):
from minimateplus import MiniMateClient
with MiniMateClient("COM5") as device:
info = device.connect() # POLL handshake + identity read
events = device.get_events() # download all events
Example (TCP / modem):
from minimateplus import MiniMateClient
from minimateplus.transport import TcpTransport
transport = TcpTransport("203.0.113.5", port=12345)
with MiniMateClient(transport=transport) as device:
info = device.connect()
"""
from __future__ import annotations
import datetime
import logging
import struct
from typing import Optional
from .framing import S3Frame
from .models import (
CallHomeConfig,
ComplianceConfig,
DeviceInfo,
Event,
MonitorLogEntry,
MonitorStatus,
PeakValues,
ProjectInfo,
Timestamp,
)
from .protocol import MiniMateProtocol, ProtocolError
from .protocol import (
SUB_SERIAL_NUMBER,
SUB_FULL_CONFIG,
SUB_WRITE_CONFIRM_A,
SUB_WRITE_CONFIRM_B,
SUB_WRITE_CONFIRM_C,
SUB_TRIGGER_CONFIRM,
SUB_MONITOR_STATUS,
)
from .transport import SerialTransport, BaseTransport
log = logging.getLogger(__name__)
# ── Module-level constants ────────────────────────────────────────────────────
# Trigger config payload hardcoded from 3-11-26 BW TX capture (BW frame 108).
# No SUB 0x22 read exists in the capture — Blastware writes this fixed blob.
# 29 bytes: [00][1A][D5][00][00][10][03][08][0A] + 18×FF + [00][00]
_TRIGGER_DATA_HARDCODED: bytes = bytes.fromhex(
"001ad500001003080a"
"ffffffffffffffffffffffffffffffffffffff"
"0000"
)
# Compliance ASCII slot format (confirmed from 3-11-26 capture + label search):
# Each label occupies a 64-byte slot.
# Value starts at slot_start + 22, max 42 bytes, null-padded.
_COMPLIANCE_SLOT_SIZE = 64
_COMPLIANCE_VALUE_OFFSET = 22
_COMPLIANCE_VALUE_MAX = _COMPLIANCE_SLOT_SIZE - _COMPLIANCE_VALUE_OFFSET # 42
# ── MiniMateClient ────────────────────────────────────────────────────────────
class MiniMateClient:
"""
High-level client for a single MiniMate Plus device.
Args:
port: Serial port name (e.g. "COM5", "/dev/ttyUSB0").
Not required when a pre-built transport is provided.
baud: Baud rate (default 38400, ignored when transport is provided).
timeout: Per-request receive timeout in seconds (default 15.0).
transport: Pre-built transport (SerialTransport or TcpTransport).
If None, a SerialTransport is constructed from port/baud.
"""
def __init__(
self,
port: str = "",
baud: int = 38_400,
timeout: float = 15.0,
transport: Optional[BaseTransport] = None,
) -> None:
self.port = port
self.baud = baud
self.timeout = timeout
self._transport: Optional[BaseTransport] = transport
self._proto: Optional[MiniMateProtocol] = None
# ── Connection lifecycle ──────────────────────────────────────────────────
def open(self) -> None:
"""Open the transport connection."""
if self._transport is None:
self._transport = SerialTransport(self.port, self.baud)
if not self._transport.is_connected:
self._transport.connect()
self._proto = MiniMateProtocol(self._transport, recv_timeout=self.timeout)
def close(self) -> None:
"""Close the transport connection."""
if self._transport and self._transport.is_connected:
self._transport.disconnect()
self._proto = None
@property
def is_open(self) -> bool:
return bool(self._transport and self._transport.is_connected)
# ── Context manager ───────────────────────────────────────────────────────
def __enter__(self) -> "MiniMateClient":
self.open()
return self
def __exit__(self, *_) -> None:
self.close()
# ── Public API ────────────────────────────────────────────────────────────
def connect(self) -> DeviceInfo:
"""
Perform the startup handshake and read device identity + compliance config.
Opens the connection if not already open.
Reads:
1. POLL handshake (startup)
2. SUB 15 — serial number
3. SUB 01 — full config block (firmware, model strings)
4. SUB 1A — compliance config (record time, trigger/alarm levels, project strings)
5. SUB 08 — event index (stored event count)
Returns:
Populated DeviceInfo with compliance_config and event_count cached.
Raises:
ProtocolError: on any communication failure.
"""
if not self.is_open:
self.open()
proto = self._require_proto()
log.info("connect: POLL startup")
proto.startup()
log.info("connect: reading serial number (SUB 15)")
sn_data = proto.read(SUB_SERIAL_NUMBER)
device_info = _decode_serial_number(sn_data)
log.info("connect: reading full config (SUB 01)")
cfg_data = proto.read(SUB_FULL_CONFIG)
_decode_full_config_into(cfg_data, device_info)
log.info("connect: reading compliance config (SUB 1A)")
try:
cc_data = proto.read_compliance_config()
_decode_compliance_config_into(cc_data, device_info)
except ProtocolError as exc:
log.warning("connect: compliance config read failed: %s — continuing", exc)
log.info("connect: reading event index (SUB 08)")
try:
idx_raw = proto.read_event_index()
# NOTE: _decode_event_count reads data[10:12] from the SUB 08 payload,
# which was believed to be the stored event count. Empirically it turns
# out to be a monotonically-increasing "total events ever recorded" counter
# that does NOT decrement when events are erased — confirmed 2026-04-13:
# device reported 6 via SUB 08 while list_event_keys() returned 0 (empty).
# We preserve the raw read here for the index data but do NOT use this
# count for logic; ach_server uses list_event_keys() as the authoritative
# source instead.
_raw_idx_count = _decode_event_count(idx_raw)
log.info(
"connect: SUB 08 index count=%d (lifetime counter, not current storage)",
_raw_idx_count,
)
# Leave device_info.event_count as None — callers should use
# list_event_keys() to get the actual current event count.
except ProtocolError as exc:
log.warning("connect: event index read failed: %s — continuing", exc)
log.info("connect: %s", device_info)
return device_info
def count_events(self) -> int:
"""
Count stored events by iterating the 1E → 1F key chain.
This is the only reliable way to get the true event count. The SUB 08
event index payload has a field that was assumed to be an event count
(uint32 BE at offset +3) but empirically always returns 1 regardless of
how many events are stored — Blastware appears to download one event per
TCP session, so the index may reflect session-scoped state rather than
device-wide storage.
This method issues 1E (first key) then 1F repeatedly until the null
sentinel, counting as it goes. No 0A/0C/5A reads are performed, so it
is much faster than get_events().
Null sentinel: event_data8[4:8] == b'\\x00\\x00\\x00\\x00'.
DO NOT check key4 — key4 is all-zeros for event 0 and would falsely
signal end-of-events on the very first iteration.
Returns:
Number of stored waveform events (0 if device is empty).
"""
proto = self._require_proto()
try:
key4, data8 = proto.read_event_first()
except ProtocolError as exc:
log.warning("count_events: 1E failed: %s — returning 0", exc)
return 0
log.debug(
"count_events: 1E -> key=%s data8=%s trailing=%s",
key4.hex(), data8.hex(), data8[4:8].hex(),
)
if data8[4:8] == b"\x00\x00\x00\x00":
log.info("count_events: 1E returned null sentinel — device is empty")
return 0
# Iterate via 0A → 1F. Each 0A call establishes waveform context so that
# 1F (EVENT_ADVANCE) returns the actual next-event key. Without the 0A
# call, 1F immediately returns the null sentinel regardless of how many
# events are stored. Confirmed from 4-3-26 two-event BW/S3 capture:
# browse-mode sequence is 0A(keyN) → 1F → 0A(keyN+1) → 1F → … → null.
count = 0
while data8[4:8] != b"\x00\x00\x00\x00":
count += 1
try:
# 0A establishes device context for this key before 1F advances.
proto.read_waveform_header(key4)
except ProtocolError as exc:
log.warning("count_events: 0A failed for key=%s: %s", key4.hex(), exc)
break
try:
key4, data8 = proto.advance_event(browse=True)
log.debug(
"count_events: 1F [iter %d] -> key=%s data8=%s trailing=%s",
count, key4.hex(), data8.hex(), data8[4:8].hex(),
)
except ProtocolError as exc:
log.warning("count_events: 1F failed after %d events: %s", count, exc)
break
log.info("count_events: %d event(s) found via 1E/1F chain", count)
return count
def list_event_keys(self) -> list[str]:
"""
Return the hex key strings for all stored events without downloading
any waveform data. Uses the same browse-mode 1E -> 0A -> 1F walk as
count_events() but collects the key at each step.
Returns:
List of 8-char lowercase hex strings, e.g. ["01110000", "0111245a"].
Empty list if device has no stored events or 1E fails.
"""
proto = self._require_proto()
try:
key4, data8 = proto.read_event_first()
except ProtocolError as exc:
log.warning("list_event_keys: 1E failed: %s -- returning []", exc)
return []
if data8[4:8] == b"\x00\x00\x00\x00":
log.info("list_event_keys: device is empty")
return []
keys: list[str] = []
while data8[4:8] != b"\x00\x00\x00\x00":
keys.append(key4.hex())
try:
proto.read_waveform_header(key4)
except ProtocolError as exc:
log.warning(
"list_event_keys: 0A failed for key=%s: %s -- stopping",
key4.hex(), exc,
)
break
try:
key4, data8 = proto.advance_event(browse=True)
log.debug(
"list_event_keys: 1F -> key=%s trailing=%s",
key4.hex(), data8[4:8].hex(),
)
except ProtocolError as exc:
log.warning(
"list_event_keys: 1F failed after %d event(s): %s -- stopping",
len(keys), exc,
)
break
log.info("list_event_keys: %d key(s): %s", len(keys), keys)
return keys
def get_monitor_log_entries(
self,
skip_keys: Optional[set] = None,
) -> list[MonitorLogEntry]:
"""
Collect all monitor log entries (partial records, type 0x2C) from the
device using the browse-mode 1E → 0A → 1F walk.
This is the fast path for monitor log data. No 0C or 5A commands are
issued — all available monitor log information is in the 0x0A response
header alone.
Full triggered events (0x0A response type 0x46) are silently skipped.
Only partial records (type 0x2C) are returned as MonitorLogEntry objects.
Confirmed from 4-11-26 MITM capture: Blastware's ACH mode performs a
full browse walk (Phase 3: 0x0A + 1F × all records) AFTER the triggered-
event download phase. The partial records encountered in this walk are
the monitor log entries.
Args:
skip_keys: optional set of 8-hex key strings to skip (already seen).
Keys in this set still advance the walk (0A + 1F) but are
not decoded or returned.
Returns:
List of MonitorLogEntry objects in device storage order.
Raises:
ProtocolError: on unrecoverable communication failure.
"""
proto = self._require_proto()
try:
key4, data8 = proto.read_event_first()
except ProtocolError as exc:
log.warning("get_monitor_log_entries: 1E failed: %s -- returning []", exc)
return []
if data8[4:8] == b"\x00\x00\x00\x00":
log.info("get_monitor_log_entries: device is empty")
return []
entries: list[MonitorLogEntry] = []
idx = 0
while data8[4:8] != b"\x00\x00\x00\x00":
cur_key = key4
key_hex = cur_key.hex()
try:
raw_data, rec_len = proto.read_waveform_header(cur_key)
except ProtocolError as exc:
log.warning(
"get_monitor_log_entries: 0A failed for key=%s: %s -- stopping",
key_hex, exc,
)
break
# Only decode partial records (0x2C); full records (0x46) are silently skipped.
if rec_len < 0x40 and raw_data and (not skip_keys or key_hex not in skip_keys):
entry = _decode_0a_partial_header(raw_data, idx, cur_key)
if entry is not None:
entries.append(entry)
log.debug(
"get_monitor_log_entries: [%d] key=%s %s%s",
idx, key_hex, entry.start_time, entry.stop_time,
)
else:
log.debug(
"get_monitor_log_entries: [%d] key=%s type=0x%02X %s",
idx, key_hex, rec_len,
"skip (already seen)" if skip_keys and key_hex in skip_keys else "skip (full record)",
)
try:
key4, data8 = proto.advance_event(browse=True)
except ProtocolError as exc:
log.warning(
"get_monitor_log_entries: 1F failed after %d record(s): %s -- stopping",
idx, exc,
)
break
idx += 1
log.info(
"get_monitor_log_entries: walked %d record(s), found %d monitor log entry(s)",
idx, len(entries),
)
return entries
def delete_all_events(self) -> None:
"""
Erase all stored events from the device memory.
This performs the complete erase sequence confirmed from the 4-11-26
MITM capture of a Blastware ACH session:
1. SUB 0xA3 (begin_erase_all) — initiate erase, token=0xFE
2. SUB 0x1C (read_monitor_status) — status read between erase commands
3. SUB 0x06 (read_event_storage_range) — verify storage state, token=0xFE
4. SUB 0xA2 (confirm_erase_all) — commit erase, token=0xFE
After this call the device's event memory is empty. The unit returns to
its normal operating state automatically (no restart-monitoring call needed).
Raises:
ProtocolError: on timeout or unexpected device response.
"""
proto = self._require_proto()
log.info("delete_all_events: step 1/4 — begin erase (SUB 0xA3)")
proto.begin_erase_all()
log.debug("delete_all_events: 0xA3 ack received")
log.info("delete_all_events: step 2/4 — monitor status read (SUB 0x1C)")
proto.read_monitor_status()
log.debug("delete_all_events: 0x1C read complete")
log.info("delete_all_events: step 3/4 — event storage range read (SUB 0x06)")
rng = proto.read_event_storage_range()
if len(rng.data) >= 8:
first_key = rng.data[-8:-4].hex()
last_key = rng.data[-4:].hex()
log.info(
"delete_all_events: storage range — first=%s last=%s",
first_key, last_key,
)
log.debug("delete_all_events: 0x06 read complete")
log.info("delete_all_events: step 4/4 — confirm erase (SUB 0xA2)")
proto.confirm_erase_all()
log.info("delete_all_events: erase confirmed — device memory cleared")
def get_events(self, full_waveform: bool = False, debug: bool = False, stop_after_index: Optional[int] = None, skip_waveform_for_keys: Optional[set] = None, skip_waveform_for_events: Optional[dict] = None, extra_chunks_after_metadata: int = 1) -> list[Event]:
"""
Download all stored events from the device using the confirmed
1E → 0A → 0C → 5A → 1F event-iterator protocol.
Sequence (confirmed from 3-31-26 and 1-2-26 Blastware captures):
1. SUB 1E — get first waveform key
2. For each key until b'\\x00\\x00\\x00\\x00':
a. SUB 0A — waveform header (first event only, confirm full record)
b. SUB 0C — full waveform record (peak values, record type, timestamp)
c. SUB 5A — bulk waveform stream (event-time metadata; stops early
after "Project:" is found, so only ~8 frames are fetched)
d. SUB 1F — advance to next key (token=0xFE skips partial bins)
The SUB 5A fetch provides the authoritative event-time metadata:
"Project:", "Client:", "User Name:", "Seis Loc:", and "Extended Notes"
as they were configured AT THE TIME the event was recorded. This is
distinct from the current device compliance config (SUB 1A), which only
reflects the CURRENT setup.
Raw ADC waveform samples (full bulk waveform payload, several MB) are
NOT downloaded by default. include_waveforms is reserved for a future
endpoint that fetches and stores the raw ADC channel data.
Returns:
List of Event objects, one per stored waveform record.
Raises:
ProtocolError: on unrecoverable communication failure.
"""
proto = self._require_proto()
log.info("get_events: requesting first event (SUB 1E)")
try:
key4, data8 = proto.read_event_first()
except ProtocolError as exc:
raise ProtocolError(f"get_events: 1E failed: {exc}") from exc
# Null sentinel: trailing 4 bytes of the 8-byte event data block are
# all zero. DO NOT use key4 == b"\x00\x00\x00\x00" — event 0 has
# key4=00000000 which would falsely signal an empty device.
if data8[4:8] == b"\x00\x00\x00\x00":
log.info("get_events: device reports no stored events")
return []
events: list[Event] = []
idx = 0
# Legacy bare-key skip set is deprecated: the device's key counter
# resets to 0x01110000 after every memory erase, so a key in this set
# cannot be trusted to identify the same physical event across erases.
# If a caller still passes it, log a warning and ignore — full
# downloads will run for every event so the bug never silently bites.
if skip_waveform_for_keys:
log.warning(
"get_events: skip_waveform_for_keys is deprecated and unsafe "
"(post-erase key reuse); ignoring %d entries. Use "
"skip_waveform_for_events={key: timestamp_iso} instead.",
len(skip_waveform_for_keys),
)
skip_evts: dict[str, str] = dict(skip_waveform_for_events or {})
while data8[4:8] != b"\x00\x00\x00\x00":
cur_key = key4 # key for this event's 0A/1E-arm/0C/5A calls
log.info("get_events: record %d key=%s", idx, cur_key.hex())
ev = Event(index=idx)
ev._waveform_key = cur_key
# SUB 0A — MUST be called first to establish device waveform context.
# Required before 0C, 1E-arm, and 1F.
proceed = True
try:
_hdr, rec_len = proto.read_waveform_header(cur_key)
if rec_len < 0x30:
log.warning(
"get_events: key=%s is partial (len=0x%02X) — skipping",
cur_key.hex(), rec_len,
)
proceed = False
except ProtocolError as exc:
log.warning(
"get_events: 0A failed for key=%s: %s — skipping",
cur_key.hex(), exc,
)
proceed = False
if proceed:
# SUB 1E (download-arm) — MUST be sent between 0A and 0C.
# Device ignores 5A probe frames without this second 1E(token=0xFE).
# Confirmed from both 4-2-26 and 4-3-26 BW TX captures (2026-04-06).
log.info("get_events: 1E download-arm (token=0xFE) for key=%s", cur_key.hex())
try:
proto.read_event_first(token=0xFE)
log.info("get_events: 1E download-arm OK")
except ProtocolError as exc:
log.warning(
"get_events: 1E download-arm failed for key=%s: %s",
cur_key.hex(), exc,
)
# SUB 0C — full waveform record (peak values, timestamp, project string)
try:
record = proto.read_waveform_record(cur_key)
if debug:
ev._raw_record = record
_decode_waveform_record_into(record, ev)
except ProtocolError as exc:
log.warning(
"get_events: 0C failed for key=%s: %s", cur_key.hex(), exc
)
# ── Skip-5A decision based on (key, timestamp) match ──────
# If skip_waveform_for_events maps cur_key.hex() to a non-empty
# ISO timestamp matching what we just read from 0C, this is
# the same physical event we already have on disk — bypass
# the 1F(arm)+POLL+5A bulk download. Otherwise (no entry, or
# timestamp mismatch indicating post-erase reuse) fall through
# to the full download.
expected_ts = skip_evts.get(cur_key.hex(), "")
actual_ts = _event_timestamp_iso(ev)
skip_5a = bool(expected_ts and actual_ts and expected_ts == actual_ts)
if skip_5a:
log.info(
"get_events: key=%s (key, ts=%s) match — skipping 5A bulk download",
cur_key.hex(), actual_ts,
)
arm_key4: Optional[bytes] = None
a5_ok = False
if not skip_5a:
# SUB 1F (download-arm) — send token=0xFE BEFORE POLL+5A to arm the
# device's bulk stream state machine. Cache the returned key as a
# fallback for loop iteration when 5A fails (see iteration block below).
# Confirmed from 4-2-26 capture frames 66-67 (1F before frames 68-73 POLL).
try:
arm_key4, _ = proto.advance_event(browse=False) # arm 5A
log.info("get_events: 1F(download) — 5A armed, arm_key=%s", arm_key4.hex())
except ProtocolError as exc:
log.warning("get_events: 1F(download) arm failed: %s", exc)
# POLL × 3 — BW sends 3 full POLL cycles between 1F and 5A.
# Confirmed from 4-2-26 BW TX capture (frames 68-73 before 5A at 74).
log.info("get_events: POLL × 3 before 5A")
for _p in range(3):
try:
proto.poll()
except ProtocolError as exc:
log.warning("get_events: POLL %d failed: %s", _p, exc)
# SUB 5A — bulk waveform stream (uses cur_key, the event set up by 0A+1E+0C).
# By default (full_waveform=False): stop after frame 7 for metadata only.
# When full_waveform=True: fetch all chunks and decode raw ADC samples.
#
# Bypassed when skip_5a is True — the event is left with
# _a5_frames=None, which signals to the caller (e.g.
# ach_server.py) that this event was matched by (key, ts) and
# already has a stored .file in the persistent waveform store.
if not skip_5a:
try:
if full_waveform:
log.info(
"get_events: 5A full waveform download for key=%s", cur_key.hex()
)
a5_frames = proto.read_bulk_waveform_stream(
cur_key, stop_after_metadata=False, max_chunks=128,
include_terminator=True,
)
if a5_frames:
a5_ok = True
ev._a5_frames = a5_frames # store for write_blastware_file
_decode_a5_metadata_into(a5_frames, ev)
_decode_a5_waveform(a5_frames, ev)
log.info(
"get_events: 5A decoded %d sample-sets",
len((ev.raw_samples or {}).get("Tran", [])),
)
else:
log.info(
"get_events: 5A metadata-only download for key=%s", cur_key.hex()
)
a5_frames = proto.read_bulk_waveform_stream(
cur_key, stop_after_metadata=True,
include_terminator=True,
extra_chunks_after_metadata=extra_chunks_after_metadata,
max_chunks=128,
)
if a5_frames:
a5_ok = True
ev._a5_frames = a5_frames # store for write_blastware_file
_decode_a5_metadata_into(a5_frames, ev)
log.debug(
"get_events: 5A metadata client=%r operator=%r",
ev.project_info.client if ev.project_info else None,
ev.project_info.operator if ev.project_info else None,
)
except ProtocolError as exc:
log.warning(
"get_events: 5A failed for key=%s: %s — metadata unavailable",
cur_key.hex(), exc,
)
# SUB 1F — loop iteration.
#
# IMPORTANT: browse 1F (all-zero params) is ONLY called when 5A
# succeeded. If 5A timed out or failed, calling browse 1F disrupts
# the device's internal state and causes the NEXT event's 5A to also
# fail. In the failure path, use the key cached from 1F(download)
# above as a best-effort fallback for iteration.
#
# Confirmed from 4-3-26 browse-mode captures: browse=True params
# are correct for multi-event iteration. Conditional logic added
# 2026-04-06 to avoid post-failure state disruption.
#
# NEW 2026-05-06: when skip_5a=True we never entered the 5A
# state at all (we read 0A+1E(arm)+0C and chose to bypass).
# 1F(browse) is safe in this scenario — the device's iteration
# pointer is independent of the bulk-stream state machine, and
# we never put it into the half-attempted 5A state that the
# earlier "post-failure 1F disruption" warning is about.
if skip_5a or a5_ok:
# 5A succeeded — use browse 1F for reliable key advancement.
try:
key4, data8 = proto.advance_event(browse=True)
log.info(
"get_events: 1F(browse) -> key=%s trailing=%s",
key4.hex(), data8[4:8].hex(),
)
except ProtocolError as exc:
log.warning("get_events: 1F(browse) failed: %s — stopping", exc)
key4 = b"\x00\x00\x00\x00"
data8 = b"\x00\x00\x00\x00\x00\x00\x00\x00"
else:
# 5A failed — skip browse 1F to avoid further state disruption.
# Use the arm_key4 returned by 1F(download) as the next-key hint.
if arm_key4 is None or arm_key4 == cur_key:
# 1F(download) returned no valid next key (or same key = stuck).
# Stop iteration to prevent infinite loop.
log.warning(
"get_events: 5A failed and 1F(download) returned no valid "
"next key (arm_key=%s, cur_key=%s) — stopping iteration",
arm_key4.hex() if arm_key4 else "None",
cur_key.hex(),
)
key4 = b"\x00\x00\x00\x00"
data8 = b"\x00\x00\x00\x00\x00\x00\x00\x00"
else:
# arm_key4 is a valid non-stuck next key — use it.
# Construct a synthetic data8 with non-null trailing so the
# loop continues (the real trailing is unknown but non-null
# since we have a valid arm_key4).
key4 = arm_key4
data8 = arm_key4 + b"\x00\x00\x00\x01"
log.warning(
"get_events: 5A failed — advancing via arm_key=%s "
"(browse 1F skipped to preserve device state)",
key4.hex(),
)
events.append(ev)
idx += 1
# Early exit: if the caller only wants events up to a specific
# index, stop iterating once we've collected it.
if stop_after_index is not None and idx > stop_after_index:
log.info(
"get_events: reached stop_after_index=%d — stopping early",
stop_after_index,
)
break
else:
# Partial/failed record — skip 5A, just advance with 1F.
log.info(
"get_events: key=%s — skipping partial/failed record",
cur_key.hex(),
)
try:
key4, data8 = proto.advance_event(browse=True)
log.info(
"get_events: 1F -> key=%s trailing=%s",
key4.hex(), data8[4:8].hex(),
)
except ProtocolError as exc:
log.warning("get_events: 1F failed: %s — stopping iteration", exc)
break
log.info("get_events: downloaded %d event(s)", len(events))
return events
def download_waveform(self, event: Event) -> None:
"""
Download the full raw ADC waveform for a previously-retrieved event
and populate event.raw_samples, event.total_samples,
event.pretrig_samples, and event.rectime_seconds.
This performs a complete SUB 5A (BULK_WAVEFORM_STREAM) download with
stop_after_metadata=False, fetching all waveform frames (typically 9
large A5 frames for a standard blast record). The download is large
(up to several hundred KB for a 9-second, 4-channel, 1024-Hz record)
and is intentionally not performed by get_events() by default.
Args:
event: An Event object returned by get_events(). Must have a
waveform key embedded; the key is reconstructed from the
event's timestamp and index via the 1E/1F protocol.
Raises:
ValueError: if the event does not have a waveform key available.
RuntimeError: if the client is not connected.
ProtocolError: on communication failure.
Confirmed format (4-2-26 blast capture, ✅):
4-channel interleaved signed 16-bit LE, 8 bytes per sample-set.
Total samples: 9306 (≈9.1 s at 1024 Hz), pretrig: 298 (≈0.29 s).
Channel order: Tran, Vert, Long, Mic (Blastware convention).
"""
proto = self._require_proto()
if event._waveform_key is None:
raise ValueError(
f"Event#{event.index} has no waveform key — "
"was it retrieved via get_events()?"
)
log.info(
"download_waveform: starting full 5A download for event#%d (key=%s)",
event.index, event._waveform_key.hex(),
)
a5_frames = proto.read_bulk_waveform_stream(
event._waveform_key, stop_after_metadata=False, max_chunks=128
)
log.info(
"download_waveform: received %d A5 frames; decoding waveform",
len(a5_frames),
)
_decode_a5_waveform(a5_frames, event)
if event.raw_samples is not None:
n = len(event.raw_samples.get("Tran", []))
log.info(
"download_waveform: decoded %d sample-sets across 4 channels",
n,
)
else:
log.warning("download_waveform: waveform decode produced no samples")
return a5_frames
def save_blastware_file(self, event: "Event", path: "Union[str, Path]", serial: str) -> None:
"""
Download the full waveform for *event* and save it as a Blastware-
compatible Blastware waveform file at *path*.
This is a convenience wrapper that calls download_waveform() (which
performs the complete SUB 5A BULK_WAVEFORM_STREAM download) and then
calls write_blastware_file() from blastware_file.py to encode the result.
Args:
event: Event object with waveform key populated (from get_events()).
path: Destination file path. Caller should use blastware_filename()
to pick the correct extension via blastware_filename().
serial: Device serial number (e.g. "BE11529") — passed to
blastware_filename() for reference, but the caller supplies
the final path.
"""
from pathlib import Path as _Path
from .blastware_file import write_blastware_file as _write_blastware_file
a5_frames = self.download_waveform(event)
if not a5_frames:
raise RuntimeError(
f"save_blastware_file: no A5 frames received for event#{event.index}"
)
_write_blastware_file(event, a5_frames, path)
log.info(
"save_blastware_file: wrote %s (%d A5 frames)",
path, len(a5_frames),
)
# ── Write commands ────────────────────────────────────────────────────────
def push_config_raw(
self,
event_index_data: bytes,
compliance_data: bytes,
trigger_data: bytes,
waveform_data: bytes,
) -> None:
"""
Push a complete config update to the device using the confirmed write
sequence from the 3-11-26 BW TX capture.
This is the raw-bytes interface — callers supply pre-encoded payloads for
each write block. A higher-level method that encodes from ComplianceConfig
and re-reads the current payloads first can be built on top of this.
Full write sequence (confirmed from 3-11-26 BW TX capture frames 102112):
SUB 68 → event index write → ack SUB 0x97
SUB 73 → confirm B → ack SUB 0x8C
SUB 71 (×3 chunks) → compliance write → each ack SUB 0x8E
SUB 72 → confirm A → ack SUB 0x8D
SUB 82 → trigger config write → ack SUB 0x7D
SUB 83 → trigger confirm → ack SUB 0x7C
SUB 69 → waveform data write → ack SUB 0x96
SUB 74 → confirm C → ack SUB 0x8B
SUB 72 → confirm A → ack SUB 0x8D
Args:
event_index_data: Raw bytes for SUB 68 write (88-byte event index).
compliance_data: Raw bytes for SUB 71 write (≥2082 bytes, 3 chunks).
trigger_data: Raw bytes for SUB 82 write (44-byte trigger config).
waveform_data: Raw bytes for SUB 69 write.
Raises:
RuntimeError: if the client is not connected.
ProtocolError: if any write step fails (timeout, bad ack SUB).
ValueError: if compliance_data is too short for the 3-chunk split.
"""
proto = self._require_proto()
# 68 → 73
log.info("push_config_raw: write event index (SUB 68)")
proto.write_event_index(event_index_data)
log.info("push_config_raw: confirm B (SUB 73)")
proto.write_confirm(SUB_WRITE_CONFIRM_B)
# 71×3 → 72 (handled internally by write_compliance_config_raw)
log.info("push_config_raw: write compliance config (SUB 71 ×3 + confirm 72)")
proto.write_compliance_config_raw(compliance_data)
# 82 → 83
log.info("push_config_raw: write trigger config (SUB 82)")
proto.write_trigger_config(trigger_data)
log.info("push_config_raw: trigger confirm (SUB 83)")
proto.write_confirm(SUB_TRIGGER_CONFIRM)
# 69 → 74 → 72
log.info("push_config_raw: write waveform data (SUB 69)")
proto.write_waveform_data(waveform_data)
log.info("push_config_raw: confirm C (SUB 74)")
proto.write_confirm(SUB_WRITE_CONFIRM_C)
log.info("push_config_raw: confirm A (SUB 72)")
proto.write_confirm(SUB_WRITE_CONFIRM_A)
log.info("push_config_raw: complete")
def apply_config(
self,
*,
# Recording parameters
recording_mode: Optional[int] = None,
sample_rate: Optional[int] = None,
record_time: Optional[float] = None,
histogram_interval_sec: Optional[int] = None,
# Threshold parameters (geo channels, in/s)
trigger_level_geo: Optional[float] = None,
alarm_level_geo: Optional[float] = None,
geo_range: Optional[int] = None, # 0x00=Normal 10in/s, 0x01=Sensitive 1.25in/s
# Project / operator strings
project: Optional[str] = None,
client_name: Optional[str] = None,
operator: Optional[str] = None,
seis_loc: Optional[str] = None,
notes: Optional[str] = None,
) -> None:
"""
Read the current device config, apply any supplied changes to the
compliance block, and write the full config back to the device.
Only non-None arguments are modified; all other bytes are round-tripped
verbatim from the device.
Configurable fields
-------------------
Recording parameters:
recording_mode : int — 0x00=Single Shot, 0x01=Continuous, 0x03=Histogram, 0x04=Histogram+Continuous
sample_rate : int — samples/sec; valid values: 1024, 2048, 4096
record_time : float — record duration in seconds (e.g. 2.0, 3.0)
Trigger/alarm thresholds and range (geo channels):
trigger_level_geo : float — trigger threshold in/s (e.g. 0.5)
alarm_level_geo : float — alarm threshold in/s (e.g. 1.0)
geo_range : int — 0x00=Normal 10.000 in/s, 0x01=Sensitive 1.250 in/s
(written to Tran/Vert/Long channel blocks)
Project / operator strings (max 41 ASCII characters each):
project : str
client_name : str
operator : str
seis_loc : str — sensor location
notes : str — extended notes
Write sequence (confirmed from 3-11-26 BW TX capture):
68→73 | 71×3→72 | 82→83 | 69→74→72
Write payloads:
event_index_data : 88 bytes — read live via SUB 08
compliance_data : ~2128 bytes — read live via SUB 1A (~2126 bytes, varies ±1-2) + \\x00\\x00 footer
trigger_data : 29 bytes — hardcoded from 3-11-26 capture
waveform_data : 204 bytes — read live via SUB 09
Raises:
RuntimeError: if not connected.
ProtocolError: if any read or write step fails.
ValueError: if compliance buffer is shorter than the 2082-byte write minimum.
"""
proto = self._require_proto()
# 1. Read current payloads from the device
log.info("apply_config: reading event index (SUB 08)")
event_index_data = proto.read_event_index()
log.info("apply_config: reading compliance config (SUB 1A)")
compliance_raw = proto.read_compliance_config() # ~2126 bytes (varies ±1-2 by DLE jitter)
log.info("apply_config: reading waveform data (SUB 09)")
waveform_data = proto.read_waveform_data_raw() # 204 bytes
trigger_data = _TRIGGER_DATA_HARDCODED # 29 bytes
# 2. Patch the compliance buffer and build the 2128-byte write payload
compliance_data = _encode_compliance_config(
compliance_raw,
recording_mode=recording_mode,
sample_rate=sample_rate,
record_time=record_time,
histogram_interval_sec=histogram_interval_sec,
trigger_level_geo=trigger_level_geo,
alarm_level_geo=alarm_level_geo,
geo_range=geo_range,
project=project,
client_name=client_name,
operator=operator,
seis_loc=seis_loc,
notes=notes,
)
log.info("apply_config: compliance payload ready (%d bytes)", len(compliance_data))
# 3. Push the full write sequence to the device
self.push_config_raw(event_index_data, compliance_data, trigger_data, waveform_data)
log.info("apply_config: complete")
def set_project_info(
self,
project: Optional[str] = None,
client_name: Optional[str] = None,
operator: Optional[str] = None,
seis_loc: Optional[str] = None,
notes: Optional[str] = None,
) -> None:
"""Backwards-compat shim — delegates to apply_config()."""
self.apply_config(
project=project,
client_name=client_name,
operator=operator,
seis_loc=seis_loc,
notes=notes,
)
# ── Call home config ──────────────────────────────────────────────────────
def get_call_home_config(self) -> CallHomeConfig:
"""
Read the auto call home (ACH) configuration from the device.
Sends SUB 0x2C (two-step read) and decodes the raw 125-byte payload
into a CallHomeConfig object.
Returns:
CallHomeConfig with all confirmed fields populated.
Raises:
RuntimeError: if not connected.
ProtocolError: on timeout or wrong response SUB.
"""
proto = self._require_proto()
raw = proto.read_call_home_config()
return _decode_call_home_config(raw)
def set_call_home_config(
self,
*,
auto_call_home_enabled: Optional[bool] = None,
after_event_recorded: Optional[bool] = None,
at_specified_times: Optional[bool] = None,
time1_enabled: Optional[bool] = None,
time1_hour: Optional[int] = None,
time1_min: Optional[int] = None,
time2_enabled: Optional[bool] = None,
time2_hour: Optional[int] = None,
time2_min: Optional[int] = None,
) -> None:
"""
Read the current call home config, apply any supplied changes, and
write the updated config back to the device.
Only non-None arguments are modified. All other bytes are round-tripped
verbatim from the device.
Configurable fields
-------------------
auto_call_home_enabled : bool — master enable for ACH
after_event_recorded : bool — call home after each triggered event
at_specified_times : bool — call home at scheduled times
time1_enabled : bool — enable time slot 1
time1_hour : int — hour for time slot 1 (0-23)
time1_min : int — minute for time slot 1 (0-59)
time2_enabled : bool — enable time slot 2
time2_hour : int — hour for time slot 2 (0-23)
time2_min : int — minute for time slot 2 (0-59)
Write sequence (confirmed from 4-20-26 call home settings captures):
SUB 0x2C (read, 2-step) → 125-byte raw payload
patch fields in-place
SUB 0x7E (write, 127-byte payload) → ack 0x81
SUB 0x7F (confirm) → ack 0x80
Raises:
RuntimeError: if not connected.
ProtocolError: if any read or write step fails.
"""
proto = self._require_proto()
# 1. Read current config
log.info("set_call_home_config: reading current config (SUB 0x2C)")
raw = proto.read_call_home_config()
# 2. Patch fields and build write payload
write_data = _encode_call_home_config(
raw,
auto_call_home_enabled=auto_call_home_enabled,
after_event_recorded=after_event_recorded,
at_specified_times=at_specified_times,
time1_enabled=time1_enabled,
time1_hour=time1_hour,
time1_min=time1_min,
time2_enabled=time2_enabled,
time2_hour=time2_hour,
time2_min=time2_min,
)
# 3. Write back
log.info("set_call_home_config: writing updated config (SUB 0x7E + 0x7F)")
proto.write_call_home_config(write_data)
log.info("set_call_home_config: complete")
def poll(self) -> None:
"""
Perform just the POLL startup handshake — no config reads.
Opens the connection if not already open. Used by the monitoring
endpoints which need to communicate with the device quickly without
spending 10-15 seconds reading compliance config and event index.
The POLL establishes the DLE-framed session with the device.
After poll(), the protocol is ready for any command (read_monitor_status,
start_monitoring, stop_monitoring, etc.).
"""
if not self.is_open:
self.open()
proto = self._require_proto()
log.debug("poll: startup handshake")
proto.startup()
# ── Monitoring ────────────────────────────────────────────────────────────
def get_monitor_status(self) -> MonitorStatus:
"""
Read the current monitoring state, battery voltage, and memory usage.
Wraps protocol.read_monitor_status() and decodes the raw payload into
a MonitorStatus object.
The device payload length indicates mode:
- 44 bytes (0x2C): unit is idle (full status block present)
- 12 bytes : unit is actively monitoring (abbreviated block)
Confirmed field offsets (relative to data[11], the start of the S3
data section after the 11-byte frame header):
[0x2F:0x31] battery voltage × 100 uint16 BE e.g. 0x02A8 = 680 → 6.80 V
[0x31:0x35] memory total (bytes) uint32 BE e.g. 0x000F0000 = 983040 bytes
[0x35:0x39] memory free (bytes) uint32 BE
Returns:
MonitorStatus with is_monitoring, battery_v, memory_total, memory_free.
Raises:
RuntimeError: if not connected.
ProtocolError: on timeout or wrong response SUB.
"""
proto = self._require_proto()
frame = proto.read_monitor_status()
return _decode_monitor_status(frame.data)
def start_monitoring(self) -> None:
"""
Command the device to begin monitoring (recording triggered events).
Sends SUB 0x96; device responds with a 17-byte zero-data ack (SUB 0x69).
Confirmed from 4-8-26/2ndtry BW TX capture frame 92.
Raises:
RuntimeError: if not connected.
ProtocolError: on timeout or wrong response.
"""
proto = self._require_proto()
proto.start_monitoring()
log.info("start_monitoring: device is now monitoring")
def stop_monitoring(self) -> None:
"""
Command the device to stop monitoring.
Sends SUB 0x97; device responds with a 17-byte zero-data ack (SUB 0x68).
Confirmed from 4-8-26/2ndtry BW TX capture frame 305.
Raises:
RuntimeError: if not connected.
ProtocolError: on timeout or wrong response.
"""
proto = self._require_proto()
proto.stop_monitoring()
log.info("stop_monitoring: device stopped monitoring")
# ── Internal helpers ──────────────────────────────────────────────────────
def _require_proto(self) -> MiniMateProtocol:
if self._proto is None:
raise RuntimeError("MiniMateClient is not connected. Call open() first.")
return self._proto
# ── Decoder functions ─────────────────────────────────────────────────────────
#
# Pure functions: bytes → model field population.
# Kept here (not in models.py) to isolate protocol knowledge from data shapes.
def _event_timestamp_iso(event: Event) -> str:
"""
Return a stable ISO-8601 string for the event's 0C-derived timestamp,
or "" if the event has no timestamp populated.
The format intentionally matches what `bridges/ach_server.py` writes
into `ach_state.json:downloaded_events[*]` so the (key, ts) compare
in get_events()'s skip path is a simple string equality.
"""
ts = getattr(event, "timestamp", None)
if ts is None:
return ""
try:
return datetime.datetime(
ts.year, ts.month, ts.day,
ts.hour or 0, ts.minute or 0, ts.second or 0,
).isoformat()
except Exception:
return str(ts)
def _decode_serial_number(data: bytes) -> DeviceInfo:
"""
Decode SUB EA (SERIAL_NUMBER_RESPONSE) payload into a new DeviceInfo.
Layout (10 bytes total per §7.2):
bytes 07: serial string, null-terminated, null-padded ("BE18189\\x00")
byte 8: unit-specific trailing byte (purpose unknown ❓)
byte 9: firmware minor version (0x11 = 17) ✅
Returns:
New DeviceInfo with serial, firmware_minor, serial_trail_0 populated.
"""
# data is data_rsp.data = payload[5:]. The 11-byte section header occupies
# data[0..10]: [LENGTH_ECHO:1][00×4][KEY_ECHO:4][00×2].
# Actual serial payload starts at data[11].
actual = data[11:] if len(data) > 11 else data
if len(actual) < 9:
# Short payload — gracefully degrade
serial = actual.rstrip(b"\x00").decode("ascii", errors="replace")
return DeviceInfo(serial=serial, firmware_minor=0)
serial = actual[:8].rstrip(b"\x00").decode("ascii", errors="replace")
trail_0 = actual[8] if len(actual) > 8 else None
fw_minor = actual[9] if len(actual) > 9 else 0
return DeviceInfo(
serial=serial,
firmware_minor=fw_minor,
serial_trail_0=trail_0,
)
def _decode_full_config_into(data: bytes, info: DeviceInfo) -> None:
"""
Decode SUB FE (FULL_CONFIG_RESPONSE) payload into an existing DeviceInfo.
The FE response arrives as a composite S3 outer frame whose data section
contains inner DLE-framed sub-frames. Because of this nesting the §7.3
fixed offsets (0x34, 0x3C, 0x44, 0x6D) are unreliable — they assume a
clean non-nested payload starting at byte 0.
Instead we search the whole byte array for known ASCII patterns. The
strings are long enough to be unique in any reasonable payload.
Modifies info in-place.
"""
def _extract(needle: bytes, max_len: int = 32) -> Optional[str]:
"""Return the null-terminated ASCII string that starts with *needle*."""
pos = data.find(needle)
if pos < 0:
return None
end = pos
while end < len(data) and data[end] != 0 and (end - pos) < max_len:
end += 1
s = data[pos:end].decode("ascii", errors="replace").strip()
return s or None
# ── Manufacturer and model are straightforward literal matches ────────────
info.manufacturer = _extract(b"Instantel")
info.model = _extract(b"MiniMate Plus")
# ── Firmware version: "S3xx.xx" — scan for the 'S3' prefix ───────────────
for i in range(len(data) - 5):
if data[i] == ord('S') and data[i + 1] == ord('3') and chr(data[i + 2]).isdigit():
end = i
while end < len(data) and data[end] not in (0, 0x20) and (end - i) < 12:
end += 1
candidate = data[i:end].decode("ascii", errors="replace").strip()
if "." in candidate and len(candidate) >= 5:
info.firmware_version = candidate
break
# ── DSP version: numeric "xx.xx" — search for known prefixes ─────────────
for prefix in (b"10.", b"11.", b"12.", b"9.", b"8."):
pos = data.find(prefix)
if pos < 0:
continue
end = pos
while end < len(data) and data[end] not in (0, 0x20) and (end - pos) < 8:
end += 1
candidate = data[pos:end].decode("ascii", errors="replace").strip()
# Accept only strings that look like "digits.digits"
if "." in candidate and all(c in "0123456789." for c in candidate):
info.dsp_version = candidate
break
def _decode_event_count(data: bytes) -> int:
"""
Extract stored event count from SUB F7 (EVENT_INDEX_RESPONSE) payload.
Confirmed 2026-04-10 from live BE11529 event index (88 bytes):
data[10:12] uint16 BE = stored event count (confirmed: 0x0006 = 6, matches LCD)
data[3:7] uint32 BE = 0x00000001 (NOT the count — meaning TBD)
Previous implementation read uint32 at offset 3, which returned 1 regardless
of how many events were stored.
"""
if len(data) < 12:
log.warning("event index payload too short (%d bytes), assuming 0 events", len(data))
return 0
count = struct.unpack_from(">H", data, 10)[0]
# Sanity check: MiniMate Plus max storage is ~1000 events
if count > 1000:
log.warning("event count %d looks unreasonably large — clamping to 0", count)
return 0
log.debug("event_index decoded count=%d (uint16 BE at offset 10)", count)
return count
def _decode_event_header_into(data: bytes, event: Event) -> None:
"""
Decode SUB E1 (EVENT_HEADER_RESPONSE) raw data section into an Event.
The waveform key is at data[11:15] (extracted separately in
MiniMateProtocol.read_event_first). The remaining 4 bytes at
data[15:19] are not yet decoded (❓ — possibly sample rate or flags).
Date information (year/month/day) lives in the waveform record (SUB 0C),
not in the 1E response. This function is a placeholder for any future
metadata we decode from the 8-byte 1E data block.
Modifies event in-place.
"""
# Nothing confirmed yet from the 8-byte data block beyond the key at [0:4].
# Leave event.timestamp as None — it will be populated from the 0C record.
pass
def _decode_waveform_record_into(data: bytes, event: Event) -> None:
"""
Decode a 210-byte SUB F3 (FULL_WAVEFORM_RECORD) record into an Event.
The *data* argument is the raw record bytes returned by
MiniMateProtocol.read_waveform_record() — i.e. data_rsp.data[11:11+0xD2].
Extracts (all ✅ confirmed 2026-04-01 against Blastware event report):
- timestamp: 9-byte format at bytes [0:9]
- record_type: sub_code at byte[1] (0x10 = "Waveform")
- peak_values: label-based float32 at label+6 for Tran/Vert/Long/MicL
- peak_vector_sum: IEEE 754 BE float at offset 87
- project_info: "Project:", "Client:", etc. string search
Modifies event in-place.
"""
# ── Record type + format detection ────────────────────────────────────────
# `record_type` is the user-facing label ("Waveform" for any triggered
# event regardless of timestamp-header layout). `fmt` is the internal
# format code used to pick the right Timestamp parser; it stays
# internal and doesn't leak to the API / sidecar / UI.
try:
event.record_type = _extract_record_type(data)
except Exception as exc:
log.warning("waveform record type decode failed: %s", exc)
fmt = _detect_record_format(data)
# ── Timestamp ─────────────────────────────────────────────────────────────
# Three timestamp-header layouts have been observed across BE11529
# firmware S338.17 — each picks a different Timestamp parser:
# "single_shot": 9-byte [day][0x10][month][year:2][unk][h][m][s]
# "continuous": 10-byte [0x10][day][0x10][month][year:2][unk][h][m][s]
# "short": 8-byte [day][month][year:2][unk][h][m][s]
# All decoded into the same Timestamp dataclass — only the byte
# offsets differ.
if fmt == "single_shot":
try:
event.timestamp = Timestamp.from_waveform_record(data)
except Exception as exc:
log.warning("single_shot record timestamp decode failed: %s", exc)
elif fmt == "continuous":
try:
event.timestamp = Timestamp.from_continuous_record(data)
except Exception as exc:
log.warning("continuous record timestamp decode failed: %s", exc)
elif fmt == "short":
try:
event.timestamp = Timestamp.from_short_record(data)
except Exception as exc:
log.warning("short record timestamp decode failed: %s", exc)
# ── Peak values (per-channel PPV + Peak Vector Sum) ───────────────────────
try:
peak_values = _extract_peak_floats(data)
if peak_values:
event.peak_values = peak_values
except Exception as exc:
log.warning("waveform record peak decode failed: %s", exc)
# ── Project strings ───────────────────────────────────────────────────────
try:
project_info = _extract_project_strings(data)
if project_info:
event.project_info = project_info
except Exception as exc:
log.warning("waveform record project strings decode failed: %s", exc)
def _decode_a5_metadata_into(frames_data: list[S3Frame], event: Event) -> None:
"""
Search A5 (BULK_WAVEFORM_STREAM) frame data for event-time metadata strings
and populate event.project_info.
This is the authoritative source for event-time metadata — it reflects the
device setup AT THE TIME the event was recorded, not the current device
configuration. The metadata lives in a middle A5 frame (confirmed: A5[7]
of 9 large frames for the 1-2-26 capture):
Confirmed needle locations in A5[7].data (2026-04-02 from 1-2-26 capture):
b"Project:" at data[626]
b"Client:" at data[676]
b"User Name:" at data[703]
b"Seis Loc:" at data[735]
b"Extended Notes" at data[774]
All frames are concatenated for a single-pass needle search.
NOTE: 5A appears to return the compliance config from when the *monitoring
session first started*, not per-event config. This means:
- "Project:" from 5A must NOT overwrite a value already set from the 0C record,
because 0C carries the correct per-event project name.
- "Client:", "User Name:", "Seis Loc:", "Extended Notes" are NOT present in the
210-byte 0C record at all, so 5A remains the sole source for those fields.
Modifies event in-place.
"""
combined = b"".join(f.data for f in frames_data)
def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]:
pos = combined.find(needle)
if pos < 0:
return None
value_start = pos + len(needle)
while value_start < len(combined) and combined[value_start] == 0:
value_start += 1
if value_start >= len(combined):
return None
end = value_start
while end < len(combined) and combined[end] != 0 and (end - value_start) < max_len:
end += 1
s = combined[value_start:end].decode("ascii", errors="replace").strip()
return s or None
project = _find_string_after(b"Project:")
client = _find_string_after(b"Client:")
operator = _find_string_after(b"User Name:")
location = _find_string_after(b"Seis Loc:")
notes = _find_string_after(b"Extended Notes")
if not any([project, client, operator, location, notes]):
log.debug("a5 metadata: no project strings found in %d frames (%d bytes)", len(frames_data), len(combined))
return
if event.project_info is None:
event.project_info = ProjectInfo()
pi = event.project_info
# "project" comes from 0C (per-event, set during _decode_waveform_record_into).
# 5A returns session-start compliance config — its "project" value is NOT
# per-event authoritative. Only use the 5A project as a fallback if 0C
# didn't supply one.
# client / operator / sensor_location / notes are NOT in the 0C record at all
# (confirmed from CLAUDE.md §SUB 5A), so 5A is the sole source for those.
if project and not pi.project: pi.project = project
if client: pi.client = client
if operator: pi.operator = operator
if location: pi.sensor_location = location
if notes: pi.notes = notes
log.debug(
"a5 metadata: project=%r client=%r operator=%r location=%r",
pi.project, pi.client, pi.operator, pi.sensor_location,
)
def _decode_a5_waveform(
frames_data: list[S3Frame],
event: Event,
) -> None:
"""
Decode the raw 4-channel ADC waveform from a complete set of SUB 5A
(BULK_WAVEFORM_STREAM) frame payloads and populate event.raw_samples,
event.total_samples, event.pretrig_samples, and event.rectime_seconds.
Wired up 2026-05-11 to the verified ``decode_waveform_v2`` codec (see
``minimateplus/waveform_codec.py`` and ``docs/waveform_codec_re_status.md``).
Replaces the legacy int16 LE decoder, which produced full-scale ±32K
noise on every event because the body bytes are encoded, not raw
samples.
Output convention (preserved from the legacy decoder):
``event.raw_samples`` is a dict with keys "Tran", "Vert", "Long",
"MicL" mapping to lists of **int16 ADC counts**. Multiply by
``geo_range / 32768`` for geo channels to get in/s; use
:func:`minimateplus.waveform_codec.mic_count_to_db` for mic dB(L).
``total_samples`` / ``pretrig_samples`` / ``rectime_seconds`` are set
to ``None`` so the caller backfills from compliance_config (the
authoritative source — STRT fields aren't reliable).
"""
from .waveform_codec import decode_a5_frames
event.total_samples = None
event.pretrig_samples = None
event.rectime_seconds = None
if not frames_data:
log.debug("_decode_a5_waveform: no frames provided")
return
decoded = decode_a5_frames(frames_data)
if decoded is None:
log.warning("_decode_a5_waveform: codec returned no samples")
return
event.raw_samples = decoded
log.debug(
"_decode_a5_waveform: decoded %d/%d/%d/%d samples (T/V/L/M)",
len(decoded.get("Tran", [])),
len(decoded.get("Vert", [])),
len(decoded.get("Long", [])),
len(decoded.get("MicL", [])),
)
def _decode_a5_waveform_LEGACY(
frames_data: list[S3Frame],
event: Event,
) -> None:
"""
LEGACY decoder — kept for reference only. DO NOT CALL.
This is the int16 LE decoder that produced full-scale ±32K noise
on every event. Retracted 2026-05-08; replaced 2026-05-11 with
the verified codec in :mod:`minimateplus.waveform_codec`. See
``docs/instantel_protocol_reference.md §7.6.1`` for the full history.
── Waveform format (LEGACY — WRONG) ────────────────────────────────
Claimed 4-channel interleaved signed 16-bit little-endian, 8 bytes
per sample-set:
[T_lo T_hi V_lo V_hi L_lo L_hi M_lo M_hi] × N
where T=Tran, V=Vert, L=Long, M=Mic.
The body bytes are actually a tagged delta+RLE stream — this
interpretation was wrong.
── Frame structure ──────────────────────────────────────────────────────────
A5[0] (probe response):
db[7:] = [11-byte header] [21-byte STRT record] [6-byte preamble] [waveform ...]
STRT: b'STRT' at offset 11, total 21 bytes
+8 uint16 BE: total_samples (expected full-record sample-sets)
+16 uint16 BE: pretrig_samples (pre-trigger sample count)
+18 uint8: rectime_seconds (record duration)
Preamble: 6 bytes after the STRT record (confirmed from 4-2-26 blast capture):
bytes 21-22: 0x00 0x00 (null padding)
bytes 23-26: 0xFF × 4 (sync sentinel / alignment marker)
Waveform starts at strt_pos + 27 within db[7:].
A5[1..N] (chunk responses):
db[7:] = [8-byte per-frame header] [waveform bytes ...]
Header: [ctr LE uint16, 0x00 × 6] — frame sequence counter
Waveform starts at byte 8 of db[7:].
── Cross-frame alignment ────────────────────────────────────────────────────
Frame waveform chunk sizes are NOT multiples of 8. Naive concatenation
scrambles channel assignments at frame boundaries. Fix: track the
cumulative global byte offset; at each new frame, the starting alignment
within the T,V,L,M cycle is (global_offset % 8).
Confirmed sizes from 4-2-26 (A5[0..8], skipping A5[7] metadata frame
and A5[9] terminator):
Frame 0: 934B Frame 1: 963B Frame 2: 946B Frame 3: 960B
Frame 4: 952B Frame 5: 946B Frame 6: 941B Frame 8: 992B
— none are multiples of 8.
── Modifies event in-place. ─────────────────────────────────────────────────
"""
if not frames_data:
log.debug("_decode_a5_waveform: no frames provided")
return
# ── Parse STRT record from A5[0] ────────────────────────────────────────
w0 = frames_data[0].data[7:] # frame.data[7:] for A5[0]
strt_pos = w0.find(b"STRT")
if strt_pos < 0:
log.warning("_decode_a5_waveform: STRT record not found in A5[0]")
return
# STRT record layout (21 bytes, offsets relative to b'STRT'):
# +0..3 magic b'STRT'
# +8..9 uint16 BE total_samples (full-record expected sample-set count)
# +16..17 uint16 BE pretrig_samples
# +18 uint8 rectime_seconds
strt = w0[strt_pos : strt_pos + 21]
if len(strt) < 21:
log.warning("_decode_a5_waveform: STRT record truncated (%dB)", len(strt))
return
# STRT byte layout (21 bytes; verified against M529LIY6 reference files
# and re-confirmed against live BE11529 captures, 2026-05-08):
# [0:4] b'STRT'
# [4:6] 0xff 0xfe sentinel
# [6:10] end_key 4-byte BE flash address where event ends
# [10:14] start_key 4-byte BE flash address where event starts
# [14:18] device-specific (semantics not pinned; values vary across events
# and don't hold authoritative total_samples / pretrig)
# [18] 0x46 record-type marker (NOT rectime)
# [19] device-specific
# [20] sometimes rectime, sometimes 0 — not reliable
#
# AUTHORITATIVE values must come from compliance_config (sample_rate,
# record_time) and from end_offset - start_offset arithmetic (event size).
# Earlier code claimed STRT[8:10]=total_samples and STRT[16:18]=pretrig;
# those positions actually overlap end_key low-word and dev-specific bytes
# respectively. We surface the address-derived event size so consumers
# can sanity-check chunk-loop bounds, but `total_samples` per channel must
# be derived externally (sample_rate × record_time, or computed from the
# decoded sample count below).
end_key = strt[6:10]
start_key = strt[10:14]
end_offset_in_strt = (end_key[2] << 8) | end_key[3]
start_offset_in_strt = (start_key[2] << 8) | start_key[3]
is_event_1 = (start_offset_in_strt == 0x0000)
# Don't trust STRT for these — leave them as None so the caller can
# backfill from compliance_config (the authoritative source).
event.total_samples = None
event.pretrig_samples = None
event.rectime_seconds = None
log.debug(
"_decode_a5_waveform: STRT start_key=%s end_key=%s "
"start_off=0x%04X end_off=0x%04X is_event_1=%s "
"dev-specific[14:18]=%s strt[20]=0x%02X",
start_key.hex(), end_key.hex(),
start_offset_in_strt, end_offset_in_strt, is_event_1,
strt[14:18].hex(), strt[20],
)
# ── Collect per-frame waveform bytes with global offset tracking ─────────
# global_offset is the cumulative byte count across all frames, used to
# compute the channel alignment at each frame boundary.
#
# Frame layout under the v0.14.0+ walk:
# frames_data[0] = probe response (page_addr 0x0000;
# contains STRT + post-STRT data)
# frames_data[1..2] = (event 1 only) metadata pages
# page_addr = 0x1002 / 0x1004
# frames_data[mid] = sample chunks at flash addresses
# 0x0600, 0x0800, … (page_addr in
# {0x0600..0x1FFE})
# frames_data[last] = TERM response (page_key=0x0000)
#
# We identify metadata pages by their PAGE ADDRESS at db.data[4:6] (the
# 2-byte counter the device echoes back), NOT by content scan. An earlier
# needle-based detection (b"Project:", b"Client:", etc.) was the wrong
# layer of abstraction:
# • The actual metadata pages 0x1002 / 0x1004 do NOT contain ASCII
# project strings on this firmware (S338.17 / BE11529).
# • The strings physically live at flash address 0x1600 — which falls
# inside the sample-chunk address range. Skipping that frame would
# drop a real sample chunk.
# BW handles the "samples region happens to contain string bytes" case
# by just rendering the bytes verbatim; we do the same.
_METADATA_PAGES = (b"\x10\x02", b"\x10\x04")
chunks: list[tuple[int, bytes]] = [] # (frame_idx, waveform_bytes)
global_offset = 0
for fi, db in enumerate(frames_data):
page_addr = db.data[4:6] if len(db.data) >= 6 else b""
w = db.data[7:] # frame.data[7:]
# A5[0]: probe response. Two cases:
# - Event 1 (start_offset_in_strt == 0x0000): the bytes after STRT
# are the device's *pre-event reserved area* (flash 0x0046 to
# 0x0600), NOT samples. We must skip them; samples begin at
# the first dedicated chunk frame at counter=0x0600.
# - Event N (continuation, start_offset != 0x0000): the bytes after
# the STRT record ARE the first slice of real samples for the
# event (BW's chunk loop addresses the probe as a sample chunk).
if fi == 0:
sp = w.find(b"STRT")
if sp < 0:
continue
if is_event_1:
# No usable samples in the probe — pre-event reserved bytes.
continue
# Layout: STRT(21B) + null-pad(2B) + 0xFF sentinel(4B) = 27 bytes total.
wave = w[sp + 27 :]
# Skip the dedicated metadata pages (event 1 only): page_addr 0x1002 / 0x1004.
elif page_addr in _METADATA_PAGES:
log.debug(
"_decode_a5_waveform: skipping metadata page fi=%d page_addr=%s",
fi, page_addr.hex(),
)
continue
# Sample chunk (or TERM): strip the 8-byte per-frame header.
else:
if len(w) < 8:
continue
wave = w[8:]
if len(wave) < 2:
continue
chunks.append((fi, wave))
global_offset += len(wave)
total_bytes = global_offset
n_sets = total_bytes // 8
log.debug(
"_decode_a5_waveform: %d chunks, %dB total → %d complete sample-sets",
len(chunks), total_bytes, n_sets,
)
if n_sets == 0:
log.warning("_decode_a5_waveform: no complete sample-sets found")
return
# ── Concatenate into one stream and decode ───────────────────────────────
# Rather than concatenating and then fixing up, we reconstruct the correct
# channel-aligned stream by skipping misaligned partial sample-sets at each
# frame start.
#
# At global byte offset G, the byte position within the T,V,L,M cycle is
# G % 8. When a frame starts with align = G % 8 ≠ 0, the first
# (8 - align) bytes of that frame complete a partial sample-set that
# cannot be decoded cleanly, so we skip them and start from the next full
# T-boundary.
#
# This produces a slightly smaller decoded set but preserves correct
# channel alignment throughout.
tran: list[int] = []
vert: list[int] = []
long_: list[int] = []
mic: list[int] = []
running_offset = 0
for fi, wave in chunks:
align = running_offset % 8 # byte position within T,V,L,M cycle
skip = (8 - align) % 8 # bytes to discard to reach next T start
if skip > 0 and skip < len(wave):
usable = wave[skip:]
elif align == 0:
usable = wave
else:
running_offset += len(wave)
continue # entire frame is a partial sample-set
n_usable = len(usable) // 8
for i in range(n_usable):
off = i * 8
tran.append( struct.unpack_from("<h", usable, off)[0])
vert.append( struct.unpack_from("<h", usable, off + 2)[0])
long_.append(struct.unpack_from("<h", usable, off + 4)[0])
mic.append( struct.unpack_from("<h", usable, off + 6)[0])
running_offset += len(wave)
log.debug(
"_decode_a5_waveform: decoded %d alignment-corrected sample-sets "
"(skipped %d due to frame boundary misalignment)",
len(tran), n_sets - len(tran),
)
event.raw_samples = {
"Tran": tran,
"Vert": vert,
"Long": long_,
"MicL": mic,
}
def _detect_record_format(data: bytes) -> Optional[str]:
"""
Detect which timestamp-header format a 210-byte 0C waveform record uses.
THREE formats observed on BE11529 firmware S338.17:
"single_shot" — 9-byte header:
[day] [0x10] [month] [year_BE:2] [unknown] [hour] [min] [sec]
sub_code=0x10 at byte [1]. Year at [3:5].
"continuous" — 10-byte header:
[0x10] [day] [0x10] [month] [year_BE:2] [unknown] [hour] [min] [sec]
marker 0x10 at byte [0] AND byte [2]. Year at [4:6].
"short" — 8-byte header (NEW 2026-05-01):
[day] [month] [year_BE:2] [unknown] [hour] [min] [sec]
No marker bytes. Year at [2:4].
Each format has the year (uint16 BE) at a UNIQUE byte position, so we can
disambiguate by scanning each candidate position and picking the one
where the year falls in a sane range (2015..2050).
Returns "single_shot" / "continuous" / "short" or None if no format matches.
"""
if len(data) < 8:
return None
def _sane_year(hi: int, lo: int) -> bool:
y = (hi << 8) | lo
return 2015 <= y <= 2050
# Order matters: prefer formats with stronger marker-byte evidence first.
if data[1] == 0x10 and len(data) >= 9 and _sane_year(data[3], data[4]):
return "single_shot"
if (data[0] == 0x10 and data[2] == 0x10
and len(data) >= 10 and _sane_year(data[4], data[5])):
return "continuous"
if _sane_year(data[2], data[3]):
return "short"
return None
def _extract_record_type(data: bytes) -> Optional[str]:
"""
Return a user-facing name for a waveform record. All three internal
timestamp-header layouts represent the *same* user concept — a
triggered seismic event — so they all surface as just "Waveform".
The internal format code is preserved for parsing logic (timestamp
decoder selection) but doesn't leak into the API / UI / sidecar.
Callers that need the raw layout can call `_detect_record_format`
directly.
Background: across BE11529 firmware S338.17 we've observed three
different byte layouts for the timestamp header at the start of the
0C record (8 / 9 / 10 bytes, distinguished by the position of the
BE-encoded year and the presence of `0x10` marker bytes). An older
revision of this code labelled them "Waveform" / "Waveform
(Continuous)" / "Waveform (Short)", which created the false
impression that there were three distinct event "types" the user
could configure. In reality the user only ever picks Single Shot
vs Continuous vs Histogram in the compliance config — the byte
layout is a firmware-internal detail that doesn't always correlate
with that choice.
"""
fmt = _detect_record_format(data)
if fmt in ("single_shot", "continuous", "short"):
return "Waveform"
if len(data) >= 3:
log.warning(
"_extract_record_type: unrecognized header: data[0:3]=%02X %02X %02X",
data[0], data[1], data[2],
)
return f"Unknown({data[0]:02X}.{data[1]:02X}.{data[2]:02X})"
return None
def _extract_peak_floats(data: bytes) -> Optional[PeakValues]:
"""
Locate per-channel peak particle velocity values in the 210-byte
waveform record by searching for the embedded channel label strings
("Tran", "Vert", "Long", "MicL") and reading the IEEE 754 BE float
at label_offset + 6.
The floats are NOT 4-byte aligned in the record (confirmed from
3-31-26 capture), so the previous step-4 scan missed Tran, Long, and
MicL entirely. Label-based lookup is the correct approach.
Channel labels are separated by inner-frame bytes (0x10 0x03 = DLE ETX),
which the S3FrameParser preserves as literal data. Searching for the
4-byte ASCII label strings is robust to this structure.
Returns PeakValues if at least one channel label is found, else None.
"""
# (label_bytes, field_name)
channels = (
(b"Tran", "tran"),
(b"Vert", "vert"),
(b"Long", "long_"),
(b"MicL", "micl"),
)
vals: dict[str, float] = {}
for label_bytes, field in channels:
pos = data.find(label_bytes)
if pos < 0:
continue
float_off = pos + 6
if float_off + 4 > len(data):
log.debug("peak float: label %s at %d but float runs past end", label_bytes, pos)
continue
try:
val = struct.unpack_from(">f", data, float_off)[0]
except struct.error:
continue
log.debug("peak float: %s at label+6 (%d) = %.6f", label_bytes.decode(), float_off, val)
vals[field] = val
if not vals:
return None
# ── Peak Vector Sum — label-relative offset ──────────────────────────────
# = √(Tran² + Vert² + Long²) at the sample instant of maximum combined geo
# motion, NOT the vector sum of the three per-channel peak values (which may
# occur at different times). Matches Blastware "Peak Vector Sum" exactly.
#
# PVS lives at tran_label_pos - 12 for both 0x10 and 0x03 record types.
# Confirmed from raw bytes of two events (2026-04-01 and 2026-04-03):
# 0x10: Tran at byte 98, PVS float at bytes 8689 (98 - 12 = 86) ✅
# 0x03: Tran at byte 104, PVS float at bytes 9295 (104 - 12 = 92) ✅
# Using a fixed absolute offset (87) breaks for 0x03 records because their
# timestamp header is 10 bytes instead of 9, shifting all subsequent fields.
pvs: Optional[float] = None
tran_pos = data.find(b"Tran")
if tran_pos >= 12:
try:
pvs = struct.unpack_from(">f", data, tran_pos - 12)[0]
except struct.error:
pass
return PeakValues(
tran=vals.get("tran"),
vert=vals.get("vert"),
long=vals.get("long_"),
micl=vals.get("micl"),
peak_vector_sum=pvs,
)
def _extract_project_strings(data: bytes) -> Optional[ProjectInfo]:
"""
Search the waveform record payload for known ASCII label strings
("Project:", "Client:", "User Name:", "Seis Loc:", "Extended Notes")
and extract the associated value strings that follow them.
Layout (per §7.5): each entry is [label ~16 bytes][value ~32 bytes],
null-padded. We find the label, then read the next non-null chars.
"""
def _find_string_after(needle: bytes, max_value_len: int = 64) -> Optional[str]:
pos = data.find(needle)
if pos < 0:
return None
# Skip the label (including null padding) until we find a non-null value
# The value starts at pos+len(needle), but may have a gap of null bytes
value_start = pos + len(needle)
# Skip nulls
while value_start < len(data) and data[value_start] == 0:
value_start += 1
if value_start >= len(data):
return None
# Read until null terminator or max_value_len
end = value_start
while end < len(data) and data[end] != 0 and (end - value_start) < max_value_len:
end += 1
value = data[value_start:end].decode("ascii", errors="replace").strip()
return value or None
project = _find_string_after(b"Project:")
client = _find_string_after(b"Client:")
operator = _find_string_after(b"User Name:")
location = _find_string_after(b"Seis Loc:")
notes = _find_string_after(b"Extended Notes")
if not any([project, client, operator, location, notes]):
return None
return ProjectInfo(
project=project,
client=client,
operator=operator,
sensor_location=location,
notes=notes,
)
def _encode_compliance_config(
raw: bytes,
*,
recording_mode: Optional[int] = None,
sample_rate: Optional[int] = None,
record_time: Optional[float] = None,
trigger_level_geo: Optional[float] = None,
alarm_level_geo: Optional[float] = None,
geo_range: Optional[int] = None, # 0x00=Normal 10in/s, 0x01=Sensitive 1.25in/s
histogram_interval_sec: Optional[int] = None,
project: Optional[str] = None,
client_name: Optional[str] = None,
operator: Optional[str] = None,
seis_loc: Optional[str] = None,
notes: Optional[str] = None,
) -> bytes:
"""
Patch a live 2126-byte compliance buffer (read from the device) with any
supplied field values and return the 2128-byte write payload.
Only non-None arguments are modified; everything else is round-tripped verbatim.
Numeric field locations (all anchor-relative or label-relative — immune to
DLE-jitter shifts):
Anchor: b'\\xbe\\x80\\x00\\x00\\x00\\x00' (confirmed stable, both BE11529 and BE18189)
recording_mode → uint8 at anchor_pos - 8 (BOTH read and write)
Values: 0x00=Single Shot, 0x01=Continuous, 0x03=Histogram, 0x04=Histogram+Continuous
NOTE: The byte at anchor_pos - 7 is always 0x10 (a DLE marker regenerated by
device firmware in every E5 response). It must NOT be overwritten during
write — doing so causes anchor drift (+1 per write cycle).
CORRECTION 2026-04-21: previous doc stated anchor-7 for write; empirically
confirmed wrong — writing to anchor-7 shifts the anchor by 1 on every cycle.
sample_rate → uint16 BE at anchor_pos - 6
histogram_interval_sec → uint16 BE at anchor_pos - 4 (seconds; mode-gated to Histogram/Histogram+Continuous)
Valid values: 2, 5, 15, 60, 300, 900 (= 2s, 5s, 15s, 1m, 5m, 15m)
record_time → float32 BE at anchor_pos + 6
Channel block (anchored on b"Tran" with unit-string guard):
geo_range → uint8 at tran_pos + 33 (confirmed 2026-04-20)
0x00 = Normal 10.000 in/s, 0x01 = Sensitive 1.250 in/s
Written to Tran, Vert, Long channel blocks (all three).
adc_scale_factor → float32 BE at tran_pos + 28 (= 6.206053; do NOT write)
trigger_level_geo → float32 BE at tran_pos + 34
"in.\\x00" → unit string at tran_pos + 38 (layout guard)
alarm_level_geo → float32 BE at tran_pos + 42
"/s\\x00\\x00" → unit string at tran_pos + 46 (layout guard)
NOTE: tran_pos+28 (float32 = 6.206053) is the ADC-to-velocity scale factor
(= 1/sensitivity, (in/s)/V — Interface Handbook §4.5: 1.61133 V × 6.206053 = 10.000 in/s).
This is a hardware/firmware constant common to all MiniMate Plus S3 units.
It must NOT be written — do not add it back as a parameter.
String field locations (64-byte slots, label+22 format):
b"Project:" → value at label_pos + 22, max 41 chars + null
b"Client:" → value at label_pos + 22, max 41 chars + null
b"User Name:" → value at label_pos + 22, max 41 chars + null
b"Seis Loc:" → value at label_pos + 22, max 41 chars + null
b"Extended Notes"→ value at label_pos + 22, max 41 chars + null
Returns:
2128-byte write payload: patched 2126-byte buffer + 2-byte footer \\x00\\x00.
(Checksum formula for the footer is unknown; \\x00\\x00 confirmed accepted
by the device in POC test 2026-04-07.)
Raises:
ValueError: if raw is shorter than the minimum needed for the 3-chunk write.
"""
# Total size is nominally ~2126 bytes but varies by ±1-2 bytes depending on
# DLE jitter in the E5 read response (0x10 bytes in the config data cause
# 1-byte expansions per occurrence during DLE stuffing/unstuffing). The
# anchor-based field access and the chunk splitter (fixed chunk1=1027,
# chunk2=1055, chunk3=remainder) both handle variable length correctly.
# Only enforce a minimum — must have at least chunk1+chunk2 bytes of content.
_MIN_COMPLIANCE_LEN = 1027 + 1055 # = 2082
if len(raw) < _MIN_COMPLIANCE_LEN:
raise ValueError(
f"_encode_compliance_config: compliance buffer too short "
f"({len(raw)} bytes, need at least {_MIN_COMPLIANCE_LEN})"
)
if len(raw) not in range(2124, 2132):
log.warning(
"_encode_compliance_config: unusual compliance buffer length %d "
"(expected ~2126); proceeding with anchor-based access",
len(raw),
)
buf = bytearray(raw)
# ── Numeric: recording_mode + sample_rate + record_time (anchor-relative) ──
_ANC = b'\xbe\x80\x00\x00\x00\x00'
_anc = buf.find(_ANC, 0, 150)
# Log anchor position every time so we can detect unexpected shifts due to
# DLE jitter or firmware differences. Expected position is ~15.
if _anc < 0:
log.warning(
"_encode_compliance_config: anchor NOT FOUND in cfg[0:150] "
"(buf len=%d) — all anchor-relative writes will be skipped",
len(buf),
)
else:
log.info(
"_encode_compliance_config: anchor at cfg[%d] buf_len=%d "
"(recording_mode@%d DLE_marker@%d sample_rate@%d:%d "
"histogram_interval@%d:%d record_time@%d:%d)",
_anc, len(buf),
_anc - 8,
_anc - 7,
_anc - 6, _anc - 4,
_anc - 4, _anc - 2,
_anc + 6, _anc + 10,
)
if recording_mode is not None:
if _anc < 8:
log.warning("_encode_compliance_config: anchor not found — cannot write recording_mode")
else:
# Write to anchor-8, same physical position as the E5 read format.
# The byte at anchor-7 is a DLE marker (0x10) that the device firmware
# regenerates in every E5 response — it must NOT be overwritten.
# Writing to anchor-7 causes the device to add an extra byte on the
# next read-back, drifting the anchor by +1 on every write cycle.
# (CLAUDE.md "anchor-7 write" was incorrect — confirmed 2026-04-21)
buf[_anc - 8] = recording_mode & 0xFF
log.debug("_encode_compliance_config: recording_mode=0x%02X -> offset %d",
recording_mode, _anc - 8)
if sample_rate is not None:
if _anc < 6:
log.warning("_encode_compliance_config: anchor not found — cannot write sample_rate")
else:
struct.pack_into(">H", buf, _anc - 6, sample_rate)
log.debug("_encode_compliance_config: sample_rate=%d -> offset %d", sample_rate, _anc - 6)
if histogram_interval_sec is not None:
if _anc < 4:
log.warning("_encode_compliance_config: anchor not found — cannot write histogram_interval")
else:
struct.pack_into(">H", buf, _anc - 4, histogram_interval_sec)
log.debug("_encode_compliance_config: histogram_interval=%ds -> offset %d",
histogram_interval_sec, _anc - 4)
if record_time is not None:
if _anc < 0 or _anc + 10 > len(buf):
log.warning("_encode_compliance_config: anchor not found — cannot write record_time")
else:
struct.pack_into(">f", buf, _anc + 6, record_time)
log.debug("_encode_compliance_config: record_time=%.3f -> offset %d", record_time, _anc + 6)
# ── Numeric: channel block (Tran label + unit-string guard) ───────────────
# NOTE: tran_pos+24 (write format) or tran_pos+28 (E5 read format) is the
# ADC-to-velocity scale factor (6.206053, hardware constant — never written).
# geo_range is written to ALL THREE geo channel blocks (Tran, Vert, Long),
# confirmed from 4-20-26 captures showing the byte at label+29 in each block.
_needs_channel = any(v is not None for v in (trigger_level_geo, alarm_level_geo, geo_range))
if _needs_channel:
_tran = buf.find(b"Tran", 44)
_valid = (
_tran >= 0
and buf[_tran + 4 : _tran + 5] != b"2"
and _tran + 50 <= len(buf)
and buf[_tran + 38 : _tran + 42] == b"in.\x00"
and buf[_tran + 46 : _tran + 50] == b"/s\x00\x00"
)
if not _valid:
log.warning(
"_encode_compliance_config: 'Tran' channel block not found or unit "
"guard failed — trigger/alarm/geo_range will not be written"
)
else:
if trigger_level_geo is not None:
struct.pack_into(">f", buf, _tran + 34, trigger_level_geo)
log.debug("_encode_compliance_config: trigger_level_geo=%.4f -> offset %d", trigger_level_geo, _tran + 34)
if alarm_level_geo is not None:
struct.pack_into(">f", buf, _tran + 42, alarm_level_geo)
log.debug("_encode_compliance_config: alarm_level_geo=%.4f -> offset %d", alarm_level_geo, _tran + 42)
if geo_range is not None:
# Write geo_range to all three geo channel blocks (Tran, Vert, Long).
# Field at label+33 in the E5-format compliance bytes (same in read and write
# since the 2126-byte payload is round-tripped verbatim).
# 0x00 = Normal 10.000 in/s, 0x01 = Sensitive 1.250 in/s.
for _ch_label in (b"Tran", b"Vert", b"Long"):
_ch = buf.find(_ch_label, 44)
if _ch >= 0 and buf[_ch + 4 : _ch + 5] != b"2" and _ch + 34 <= len(buf):
buf[_ch + 33] = geo_range & 0xFF
log.debug(
"_encode_compliance_config: geo_range=0x%02X -> %s+33 offset %d",
geo_range, _ch_label.decode(), _ch + 33,
)
# ── ASCII strings (64-byte slot, value at label_pos+22) ───────────────────
def _set_string(label: bytes, value: Optional[str]) -> None:
if value is None:
return
idx = buf.find(label)
if idx < 0:
log.warning("_encode_compliance_config: label %r not found", label)
return
val_bytes = value.encode("ascii", errors="replace")[:_COMPLIANCE_VALUE_MAX - 1]
padded = val_bytes + b"\x00" * (_COMPLIANCE_VALUE_MAX - len(val_bytes))
buf[idx + _COMPLIANCE_VALUE_OFFSET : idx + _COMPLIANCE_SLOT_SIZE] = padded
log.debug("_encode_compliance_config: %r -> %r", label, value)
_set_string(b"Project:", project)
_set_string(b"Client:", client_name)
_set_string(b"User Name:", operator)
_set_string(b"Seis Loc:", seis_loc)
_set_string(b"Extended Notes", notes)
# 2-byte footer — checksum formula unknown; \x00\x00 confirmed accepted by device
return bytes(buf) + b"\x00\x00"
def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None:
"""
Decode a 2090-byte SUB 1A (COMPLIANCE_CONFIG) response into a ComplianceConfig.
The *data* argument is the raw bytes returned by read_compliance_config()
(frames B+C+D concatenated, echo headers stripped).
Confirmed field locations (BE11529 with 3-step read, duplicate detection):
- cfg[89] = setup_name: first long ASCII string in cfg[40:250] ✅
- ANCHOR = b'\\xbe\\x80\\x00\\x00\\x00\\x00' in cfg[0:150] ✅ (revised 2026-04-07)
- anchor - 6 = sample_rate uint16_BE (1024 normal / 2048 fast / 4096 faster)
- anchor + 6 = record_time float32_BE
- "Project:" needle → project string
- "Client:" needle → client string
- "User Name:" needle → operator string
- "Seis Loc:" needle → sensor_location string
- "Extended Notes" needle → notes string
Anchor approach is required because a DLE byte in the sample_rate field
(4096 = 0x1000 → stored as 10 10 00 in raw S3 frame → unstuffed to 10 00,
1 byte shorter than 04 00 or 08 00) causes frame C to be 1 byte shorter
for "faster" mode, shifting all subsequent offsets by 1. The 10-byte
anchor is stable across all modes.
Channel block layout (✅ confirmed 2026-04-02 from 3-11-26 E5 frame 78
and 1-2-26 A5 frame 77):
"Tran" label at tran_pos
tran_pos + 28 = scale_factor float32_BE (= 1/sensitivity = 6.206053 (in/s)/V — ADC scale; NOT a UI setting)
tran_pos + 34 = trigger_level float32_BE (e.g. 0.600000 in/s)
tran_pos + 38 = "in.\\x00" (unit string anchor)
tran_pos + 42 = alarm_level float32_BE (e.g. 1.250000 in/s)
tran_pos + 46 = "/s\\x00\\x00" (unit string anchor)
Modifies info.compliance_config in-place.
"""
if not data or len(data) < 40:
log.warning("compliance config payload too short (%d bytes)", len(data))
return
config = ComplianceConfig(raw=data)
# ── Setup name ────────────────────────────────────────────────────────────
# The setup_name IS the string itself — it is NOT a label followed by a value.
# It appears as the first long (>=8 char) ASCII string in cfg[40:250].
# The preceding bytes vary by device (cfg[88]=0x01 on BE11529); the string
# itself is null-terminated.
try:
setup_name = _find_first_string(data, start=40, end=250, min_len=8)
config.setup_name = setup_name
if setup_name:
log.debug("compliance_config: setup_name = %r", setup_name)
except Exception as exc:
log.warning("compliance_config: setup_name extraction failed: %s", exc)
# ── recording_mode / sample_rate / histogram_interval / record_time ─────────
# 6-byte stable anchor: b'\xbe\x80\x00\x00\x00\x00' — confirmed across BE11529
# and BE18189. The 4 bytes immediately before the anchor are NOT constant:
# bytes -4:-2 are the histogram_interval (uint16 BE, seconds), and bytes -2:0
# are zero padding. The old "10-byte anchor" (\x01\x2c\x00\x00 prefix) was
# only constant when the histogram interval happened to be 5 min (0x012C = 300).
#
# E5 read format layout relative to 6-byte anchor:
# _anchor - 8 : recording_mode (uint8)
# _anchor - 7 : 0x10 (extra byte E5 read only; absent in SUB 71 write)
# _anchor - 6 : sample_rate_hi (uint16 BE, MSB)
# _anchor - 5 : sample_rate_lo (uint16 BE, LSB)
# _anchor - 4 : histogram_interval_hi (uint16 BE, seconds, MSB)
# _anchor - 3 : histogram_interval_lo (uint16 BE, seconds, LSB)
# _anchor - 2 : 0x00 (padding)
# _anchor - 1 : 0x00 (padding)
# _anchor : \xbe\x80\x00\x00\x00\x00 (6-byte anchor)
# _anchor + 6 : record_time (float32 BE)
_ANCHOR = b'\xbe\x80\x00\x00\x00\x00'
_anchor = data.find(_ANCHOR, 0, 150)
# Log anchor position on every decode so we can compare read vs write and
# catch unexpected shifts from DLE jitter or firmware differences.
# Expected position is ~15 for the E5 read payload (anchor - 8 = recording_mode).
if _anchor < 0:
log.warning(
"_decode_compliance_config_into: anchor NOT FOUND in data[0:150] (len=%d)",
len(data),
)
else:
log.info(
"_decode_compliance_config_into: anchor at data[%d] data_len=%d "
"(expected ~15; recording_mode@%d sample_rate@%d:%d "
"histogram_interval@%d:%d record_time@%d:%d)",
_anchor, len(data),
_anchor - 8,
_anchor - 6, _anchor - 4,
_anchor - 4, _anchor - 2,
_anchor + 6, _anchor + 10,
)
if _anchor >= 8 and _anchor + 10 <= len(data):
try:
config.recording_mode = data[_anchor - 8]
log.debug(
"compliance_config: recording_mode = 0x%02X (anchor@%d)", config.recording_mode, _anchor
)
except Exception as exc:
log.warning("compliance_config: recording_mode extraction failed: %s", exc)
try:
config.sample_rate = struct.unpack_from(">H", data, _anchor - 6)[0]
log.debug(
"compliance_config: sample_rate = %d Sa/s (anchor@%d)", config.sample_rate, _anchor
)
except Exception as exc:
log.warning("compliance_config: sample_rate extraction failed: %s", exc)
try:
config.histogram_interval_sec = struct.unpack_from(">H", data, _anchor - 4)[0]
log.debug(
"compliance_config: histogram_interval = %d s (anchor@%d)",
config.histogram_interval_sec, _anchor
)
except Exception as exc:
log.warning("compliance_config: histogram_interval extraction failed: %s", exc)
try:
config.record_time = struct.unpack_from(">f", data, _anchor + 6)[0]
log.debug(
"compliance_config: record_time = %.3f s (anchor@%d)", config.record_time, _anchor
)
except Exception as exc:
log.warning("compliance_config: record_time extraction failed: %s", exc)
elif _anchor >= 6 and _anchor + 10 <= len(data):
# Fallback: anchor found but not enough bytes before it for recording_mode
log.warning("compliance_config: anchor too close to start (anchor@%d) — skipping recording_mode", _anchor)
try:
config.sample_rate = struct.unpack_from(">H", data, _anchor - 6)[0]
except Exception:
pass
try:
config.record_time = struct.unpack_from(">f", data, _anchor + 6)[0]
except Exception:
pass
else:
log.warning(
"compliance_config: anchor %s not found in cfg[0:150] (len=%d) "
"— sample_rate, record_time and recording_mode will be None",
_ANCHOR.hex(), len(data),
)
# ── Project strings ───────────────────────────────────────────────────────
try:
def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]:
pos = data.find(needle)
if pos < 0:
return None
value_start = pos + len(needle)
while value_start < len(data) and data[value_start] == 0:
value_start += 1
if value_start >= len(data):
return None
end = value_start
while end < len(data) and data[end] != 0 and (end - value_start) < max_len:
end += 1
s = data[value_start:end].decode("ascii", errors="replace").strip()
return s or None
config.project = _find_string_after(b"Project:")
config.client = _find_string_after(b"Client:")
config.operator = _find_string_after(b"User Name:")
config.sensor_location = _find_string_after(b"Seis Loc:")
config.notes = _find_string_after(b"Extended Notes")
if config.project:
log.debug("compliance_config: project = %s", config.project)
if config.client:
log.debug("compliance_config: client = %s", config.client)
except Exception as exc:
log.warning("compliance_config: project string extraction failed: %s", exc)
# ── Channel block: trigger_level_geo, alarm_level_geo, geo_range, geo_adc_scale ──
# The channel block is only present in the full cfg (frame D delivered,
# ~2126 bytes). Layout confirmed 2026-04-02 from both E5 frame 78 of the
# 3-11-26 compliance-config capture and A5 frame 77 of the 1-2-26 event
# download capture. Cross-checked 2026-04-17 across both BE11529 and BE18189.
#
# "Tran" label at tran_pos (+0 to +3)
# adc_scale float32_BE at tran_pos + 28 (= 1/sensitivity = 6.206053 (in/s)/V; hardware constant — do NOT write)
# geo_range uint8 at tran_pos + 33 CONFIRMED 2026-04-20
# 0x00 = Normal 10.000 in/s, 0x01 = Sensitive 1.250 in/s
# Same offset in E5 read and SUB 71 write (bytes are round-tripped verbatim).
# NOTE: tran_pos+20 reads 0x01 on ALL captures — constant flag, NOT range field.
# trigger float32_BE at tran_pos + 34 (e.g. 0.600000 in/s) ✅
# "in.\x00" unit string at tran_pos + 38 ✅ confirmed (layout guard)
# alarm float32_BE at tran_pos + 42 (e.g. 1.250000 in/s) ✅
# "/s\x00\x00" unit string at tran_pos + 46 ✅ confirmed (layout guard)
#
# Unit strings serve as layout anchors — if they match, the float offsets
# are reliable. Skip "Tran2" (a later repeated label) via the +4 check.
try:
tran_pos = data.find(b"Tran", 44)
if (
tran_pos >= 0
and data[tran_pos + 4 : tran_pos + 5] != b"2" # not "Tran2"
and tran_pos + 50 <= len(data)
and data[tran_pos + 38 : tran_pos + 42] == b"in.\x00"
and data[tran_pos + 46 : tran_pos + 50] == b"/s\x00\x00"
):
config.geo_range = data[tran_pos + 33] # CONFIRMED 2026-04-20: 0x00=Normal 10in/s, 0x01=Sensitive 1.25in/s
config.geo_adc_scale = struct.unpack_from(">f", data, tran_pos + 28)[0] # hw scale factor (in/s)/V — do NOT write
config.trigger_level_geo = struct.unpack_from(">f", data, tran_pos + 34)[0]
config.alarm_level_geo = struct.unpack_from(">f", data, tran_pos + 42)[0]
log.debug(
"compliance_config: trigger=%.4f alarm=%.4f geo_range=0x%02X geo_adc_scale=%.6f",
config.trigger_level_geo, config.alarm_level_geo,
config.geo_range, config.geo_adc_scale,
)
elif tran_pos >= 0:
log.warning(
"compliance_config: 'Tran' at %d — unit string check failed: "
"+38..+42=%s (want 696e2e00) +46..+50=%s (want 2f730000)",
tran_pos,
data[tran_pos + 38 : tran_pos + 42].hex() if tran_pos + 42 <= len(data) else "??",
data[tran_pos + 46 : tran_pos + 50].hex() if tran_pos + 50 <= len(data) else "??",
)
else:
log.debug("compliance_config: channel block not present in cfg (len=%d)", len(data))
except Exception as exc:
log.warning("compliance_config: channel block extraction failed: %s", exc)
info.compliance_config = config
def _find_first_string(data: bytes, start: int, end: int, min_len: int) -> Optional[str]:
"""
Return the first null-terminated printable ASCII string of length >= min_len
found in data[start:end].
"""
i = start
end = min(end, len(data))
while i < end:
if 0x20 <= data[i] < 0x7F:
j = i
while j < len(data) and 0x20 <= data[j] < 0x7F:
j += 1
if j - i >= min_len:
return data[i:j].decode("ascii", errors="replace").strip()
i = j + 1
else:
i += 1
return None
def _decode_0a_partial_header(raw_data: bytes, index: int, key4: bytes) -> Optional[MonitorLogEntry]:
"""
Decode a SUB 0x0A response for a partial (monitor log) record into a
MonitorLogEntry.
Called when read_waveform_header() returns rec_len < 0x40 (i.e. 0x2C = 44).
raw_data is the complete data_rsp.data from the protocol layer.
Layout of raw_data:
[0] = 0x2C (partial record type)
[1:5] = 0x00 × 4
[5:9] = event key (big-endian)
[9:11] = 0x00 × 2
[11:] = timestamp_start + timestamp_stop + sep + serial + geo_string
Timestamp format detection (auto):
raw_data[11] == 0x10 → 10-byte sub_code=0x03 continuous format
raw_data[12] == 0x10 → 9-byte sub_code=0x10 single-shot format
Both timestamps use the same format (detected from the first byte).
A 1-byte gap can appear between ts1 and ts2 for certain timestamps
(observed empirically when both timestamps share the same minute:second).
The parser handles this by trying ts2 immediately after ts1, then with
a 1-byte skip if that fails.
Returns:
MonitorLogEntry if decoding succeeds, None on error.
"""
if len(raw_data) < 20 or raw_data[0] != 0x2C:
return None
key_hex = key4.hex()
def try_ts9(b: bytes):
"""9-byte sub_code=0x10 format. Returns datetime or None."""
if len(b) < 9 or b[1] != 0x10:
return None
day = b[0]; month = b[2]; year = (b[3] << 8) | b[4]
hr = b[6]; mn = b[7]; sec = b[8]
if not (1 <= day <= 31 and 1 <= month <= 12 and 2000 <= year <= 2050
and hr <= 23 and mn <= 59 and sec <= 59):
return None
try:
return datetime.datetime(year, month, day, hr, mn, sec)
except ValueError:
return None
def try_ts10(b: bytes):
"""10-byte sub_code=0x03 format. Returns datetime or None."""
if len(b) < 10 or b[0] != 0x10 or b[2] != 0x10:
return None
day = b[1]; month = b[3]; year = (b[4] << 8) | b[5]
hr = b[7]; mn = b[8]; sec = b[9]
if not (1 <= day <= 31 and 1 <= month <= 12 and 2000 <= year <= 2050
and hr <= 23 and mn <= 59 and sec <= 59):
return None
try:
return datetime.datetime(year, month, day, hr, mn, sec)
except ValueError:
return None
ts_offset = 11
if len(raw_data) <= ts_offset:
return MonitorLogEntry(index=index, key=key_hex, raw_header=raw_data)
# Detect timestamp format.
if raw_data[ts_offset] == 0x10:
ts_size = 10
try_ts = try_ts10
else:
ts_size = 9
try_ts = try_ts9
# Parse ts1.
ts1 = try_ts(raw_data[ts_offset:ts_offset + ts_size])
ts1_end = ts_offset + ts_size
# Parse ts2 immediately after ts1, then with 1-byte skip if needed.
ts2 = try_ts(raw_data[ts1_end:ts1_end + ts_size])
if ts2 is None:
ts2 = try_ts(raw_data[ts1_end + 1:ts1_end + 1 + ts_size])
# Extract serial and geo threshold from "BE11529\0" and "Geo: X.XXX in/s\0".
serial: Optional[str] = None
geo_ips: Optional[float] = None
serial_pos = raw_data.find(b"BE")
if serial_pos >= 0:
# Read null-terminated serial starting at serial_pos.
null_pos = raw_data.find(b"\x00", serial_pos)
if null_pos > serial_pos:
serial = raw_data[serial_pos:null_pos].decode("ascii", errors="replace")
# Geo string follows the null byte.
geo_start = (null_pos + 1) if null_pos > serial_pos else serial_pos + 7
geo_bytes = raw_data[geo_start:]
# "Geo: X.XXX in/s\0" — extract float after "Geo: ".
geo_str_pos = geo_bytes.find(b"Geo: ")
if geo_str_pos >= 0:
geo_val_bytes = geo_bytes[geo_str_pos + 5:] # after "Geo: "
geo_val_end = geo_val_bytes.find(b" ") # before " in/s"
if geo_val_end > 0:
try:
geo_ips = float(geo_val_bytes[:geo_val_end].decode("ascii"))
except ValueError:
pass
return MonitorLogEntry(
index=index,
key=key_hex,
start_time=ts1,
stop_time=ts2,
serial=serial,
geo_threshold_ips=geo_ips,
raw_header=raw_data,
)
def _decode_monitor_status(data: bytes) -> MonitorStatus:
"""
Decode SUB 0x1C response payload into a MonitorStatus object.
data is the raw S3 frame .data attribute (includes the 11-byte section
header, so field offsets below are relative to data[11]).
NOTE: frame.data has the checksum byte already stripped by S3FrameParser
(_finalise returns raw_payload[5:] where raw_payload = body[:-1]).
There is NO trailing checksum byte in section.
Monitoring flag (confirmed 4-8-26/2ndtry, byte diff of all 144 data frames):
section[1] == 0x00 → idle
section[1] == 0x10 → monitoring
The payload length varies (4649 bytes) — IDLE is 46-47, MONITORING is 48-49.
The battery/memory block is always the last 10 bytes of section (no checksum):
section[-10:-8] battery × 100 uint16 BE (0x02A8 = 6.80 V)
section[-8 :-4] memory_total uint32 BE bytes
section[-4:] memory_free uint32 BE bytes
Values confirmed from 4-8-26/2ndtry capture (BE11529):
battery 0x02A8 = 680 → 6.80 V
mem_total 0x000EFFF2 = 983026 bytes ≈ 960 KB
mem_free 0x000E9E52 = 958034 bytes ≈ 935 KB
"""
# The data section starts at offset 11 (after the S3 section header).
section = data[11:] if len(data) > 11 else data
log.debug(
"_decode_monitor_status: total data=%d bytes section=%d bytes hex=%s",
len(data), len(section), section.hex(),
)
# Monitoring flag: section[1] == 0x10.
# Confirmed from byte diff of all 144 0xE3 data frames in 4-8-26/2ndtry capture:
# section[1] = 0x00 in all IDLE frames, 0x10 in all MONITORING frames.
# (section[6] also changes but has non-binary values 0xea/0x07 — device-specific.)
is_monitoring = len(section) > 1 and section[1] == 0x10
battery_v = None
memory_total = None
memory_free = None
# Battery and memory at relative-from-end offsets.
# Payload length varies (4649 bytes) but the battery/memory block is always
# the last 10 bytes. No checksum byte — it was stripped by S3FrameParser.
#
# section[-10:-8] battery × 100 uint16 BE 0x02A8 = 6.80 V
# section[-8 :-4] memory_total uint32 BE ≈ 960 KB on BE11529
# section[-4:] memory_free uint32 BE decreases as events fill
#
# Confirmed stable across IDLE (46b), MONITORING (48-49b) variants.
if len(section) >= 10:
batt_raw = struct.unpack(">H", section[-10:-8])[0]
battery_v = batt_raw / 100.0
memory_total = struct.unpack(">I", section[-8:-4])[0]
memory_free = struct.unpack(">I", section[-4:])[0]
return MonitorStatus(
is_monitoring=is_monitoring,
battery_v=battery_v,
memory_total=memory_total,
memory_free=memory_free,
)
def _decode_call_home_config(raw: bytes) -> CallHomeConfig:
"""
Decode the raw 125-byte call home config payload into a CallHomeConfig.
*raw* is data[11:] from the SUB 0xD3 data response frame.
Field offsets (confirmed from 4-20-26 captures, all 11 BW+S3 pairs):
[5] auto_call_home_enabled (0x00=off, 0x01=on)
[6:46] dial_string 40-byte null-padded ASCII
[87] after_event_recorded (0x01=on, 0x00=off)
[91] at_specified_times (0x01=on, 0x00=off)
[93] time1_enabled (0x01=on, 0x00=off)
[95] time2_enabled (0x01=on, 0x00=off)
[101] time1_hour uint8 decimal 0-23
[102] time1_min uint8 decimal 0-59
[105] time2_hour uint8 decimal 0-23
[106] time2_min uint8 decimal 0-59
[117:119] 10 03 = DLE-escaped num_retries=3 (logical value = 0x03)
[120] time_between_retries_sec (shifted +1 from logical by DLE prefix)
[122] wait_for_connection_sec
[124] warm_up_time_sec
The DLE-escaped ETX at raw[117:119] = b'\\x10\\x03' means the logical value
0x03 (3 retries) is stored there. The S3FrameParser keeps both bytes verbatim.
Subsequent fields are at logical_offset + 1 in the raw byte array.
"""
cfg = CallHomeConfig(raw=raw)
if len(raw) < 10:
return cfg
# Simple boolean and string fields — direct reads, no DLE complications
if len(raw) > 5:
cfg.auto_call_home_enabled = bool(raw[5])
if len(raw) >= 46:
ds = raw[6:46]
cfg.dial_string = ds.split(b"\x00", 1)[0].decode("ascii", errors="replace") or None
if len(raw) > 87:
cfg.after_event_recorded = bool(raw[87])
if len(raw) > 91:
cfg.at_specified_times = bool(raw[91])
if len(raw) > 93:
cfg.time1_enabled = bool(raw[93])
if len(raw) > 95:
cfg.time2_enabled = bool(raw[95])
if len(raw) > 102:
cfg.time1_hour = raw[101]
cfg.time1_min = raw[102]
if len(raw) > 106:
cfg.time2_hour = raw[105]
cfg.time2_min = raw[106]
# num_retries: raw[117]=0x10 (DLE prefix), raw[118]=0x03 (value)
# Subsequent fields shift by +1 from logical positions.
if len(raw) > 118 and raw[117] == 0x10:
cfg.num_retries = raw[118] # 0x03 = 3
if len(raw) > 120:
cfg.time_between_retries_sec = raw[120] # logical 119, shifted to 120
if len(raw) > 122:
cfg.wait_for_connection_sec = raw[122] # logical 121, shifted to 122
if len(raw) > 124:
cfg.warm_up_time_sec = raw[124] # logical 123, shifted to 124
elif len(raw) > 117:
# Fallback: no DLE prefix (num_retries is not 0x03)
cfg.num_retries = raw[117]
if len(raw) > 119:
cfg.time_between_retries_sec = raw[119]
if len(raw) > 121:
cfg.wait_for_connection_sec = raw[121]
if len(raw) > 123:
cfg.warm_up_time_sec = raw[123]
log.debug(
"_decode_call_home_config: enabled=%s dial=%r after_event=%s at_times=%s "
"t1=%s %02d:%02d t2=%s %02d:%02d retries=%s gap=%s wait=%s warmup=%s",
cfg.auto_call_home_enabled, cfg.dial_string,
cfg.after_event_recorded, cfg.at_specified_times,
cfg.time1_enabled, cfg.time1_hour or 0, cfg.time1_min or 0,
cfg.time2_enabled, cfg.time2_hour or 0, cfg.time2_min or 0,
cfg.num_retries, cfg.time_between_retries_sec,
cfg.wait_for_connection_sec, cfg.warm_up_time_sec,
)
return cfg
def _encode_call_home_config(
raw: bytes,
*,
auto_call_home_enabled: Optional[bool] = None,
after_event_recorded: Optional[bool] = None,
at_specified_times: Optional[bool] = None,
time1_enabled: Optional[bool] = None,
time1_hour: Optional[int] = None,
time1_min: Optional[int] = None,
time2_enabled: Optional[bool] = None,
time2_hour: Optional[int] = None,
time2_min: Optional[int] = None,
) -> bytes:
"""
Patch specific fields in the 125-byte raw call home payload and return
the 127-byte write payload (raw + b'\\x00\\x00' footer).
Only non-None arguments are modified. All other bytes including the
DLE-escaped \\x10\\x03 at [117:119] are preserved verbatim for round-trip.
The write payload footer (2 trailing zeros) matches Blastware's confirmed
write frame format from the 4-20-26 captures.
CAUTION: hour and minute values must not equal 0x03 (3) — such values would
require DLE-escaping that this encoder does not implement. Values 0x03 in
hour/minute slots are rejected with ValueError.
"""
if len(raw) < 107:
raise ValueError(
f"call home raw payload too short: {len(raw)} bytes (need ≥107)"
)
buf = bytearray(raw) # 125 bytes
def _set_bool(offset: int, value: Optional[bool]) -> None:
if value is not None:
buf[offset] = 0x01 if value else 0x00
def _set_uint8(offset: int, value: Optional[int], name: str) -> None:
if value is None:
return
if value == 0x03:
raise ValueError(
f"{name}={value} (0x03) requires DLE escaping — "
"not supported by this encoder; avoid using 3 for hour/minute fields"
)
buf[offset] = value & 0xFF
_set_bool(5, auto_call_home_enabled)
_set_bool(87, after_event_recorded)
_set_bool(91, at_specified_times)
_set_bool(93, time1_enabled)
_set_bool(95, time2_enabled)
_set_uint8(101, time1_hour, "time1_hour")
_set_uint8(102, time1_min, "time1_min")
_set_uint8(105, time2_hour, "time2_hour")
_set_uint8(106, time2_min, "time2_min")
# num_retries, time_between_retries_sec, wait_for_connection_sec, warm_up_time_sec
# are not writable via this encoder — they're preserved verbatim including the
# DLE-escaped 0x03 at [117:119].
log.debug(
"_encode_call_home_config: patched fields: "
"enabled=%s after_event=%s at_times=%s "
"t1=%s %s:%s t2=%s %s:%s",
auto_call_home_enabled, after_event_recorded, at_specified_times,
time1_enabled, time1_hour, time1_min,
time2_enabled, time2_hour, time2_min,
)
return bytes(buf) + b"\x00\x00" # append 2-byte footer (confirmed BW pattern)