Files
seismo-relay/minimateplus/client.py

1138 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
client.py — MiniMateClient: the top-level public API for the library.
Combines transport, protocol, and model decoding into a single easy-to-use
class. This is the only layer that the SFM server (sfm/server.py) imports
directly.
Design: stateless per-call (connect → do work → disconnect).
The client does not hold an open connection between calls. This keeps the
first implementation simple and matches Blastware's observed behaviour.
Persistent connections can be added later without changing the public API.
Example (serial):
from minimateplus import MiniMateClient
with MiniMateClient("COM5") as device:
info = device.connect() # POLL handshake + identity read
events = device.get_events() # download all events
Example (TCP / modem):
from minimateplus import MiniMateClient
from minimateplus.transport import TcpTransport
transport = TcpTransport("203.0.113.5", port=12345)
with MiniMateClient(transport=transport) as device:
info = device.connect()
"""
from __future__ import annotations
import logging
import struct
from typing import Optional
from .framing import S3Frame
from .models import (
ComplianceConfig,
DeviceInfo,
Event,
PeakValues,
ProjectInfo,
Timestamp,
)
from .protocol import MiniMateProtocol, ProtocolError
from .protocol import (
SUB_SERIAL_NUMBER,
SUB_FULL_CONFIG,
)
from .transport import SerialTransport, BaseTransport
log = logging.getLogger(__name__)
# ── MiniMateClient ────────────────────────────────────────────────────────────
class MiniMateClient:
    """
    High-level client for a single MiniMate Plus device.

    Args:
        port: Serial port name (e.g. "COM5", "/dev/ttyUSB0").
            Not required when a pre-built transport is provided.
        baud: Baud rate (default 38400, ignored when transport is provided).
        timeout: Per-request receive timeout in seconds (default 15.0).
        transport: Pre-built transport (SerialTransport or TcpTransport).
            If None, a SerialTransport is constructed from port/baud.
    """

    def __init__(
        self,
        port: str = "",
        baud: int = 38_400,
        timeout: float = 15.0,
        transport: Optional[BaseTransport] = None,
    ) -> None:
        self.port = port
        self.baud = baud
        self.timeout = timeout
        # Transport may be injected (TCP/modem case) or built lazily from
        # port/baud on the first open() (serial case).
        self._transport: Optional[BaseTransport] = transport
        # Protocol wrapper; created in open(), dropped in close().
        self._proto: Optional[MiniMateProtocol] = None

    # ── Connection lifecycle ──────────────────────────────────────────────────
    def open(self) -> None:
        """Open the transport connection and build the protocol layer."""
        if self._transport is None:
            self._transport = SerialTransport(self.port, self.baud)
        if not self._transport.is_connected:
            self._transport.connect()
        self._proto = MiniMateProtocol(self._transport, recv_timeout=self.timeout)

    def close(self) -> None:
        """Close the transport connection and discard the protocol layer."""
        if self._transport and self._transport.is_connected:
            self._transport.disconnect()
        self._proto = None

    @property
    def is_open(self) -> bool:
        """True when a transport exists and reports a live connection."""
        return bool(self._transport and self._transport.is_connected)

    # ── Context manager ───────────────────────────────────────────────────────
    def __enter__(self) -> "MiniMateClient":
        self.open()
        return self

    def __exit__(self, *_) -> None:
        # Always close, regardless of whether an exception is propagating.
        self.close()

    # ── Public API ────────────────────────────────────────────────────────────
    def connect(self) -> DeviceInfo:
        """
        Perform the startup handshake and read device identity + compliance config.
        Opens the connection if not already open.
        Reads:
            1. POLL handshake (startup)
            2. SUB 15 — serial number
            3. SUB 01 — full config block (firmware, model strings)
            4. SUB 1A — compliance config (record time, trigger/alarm levels, project strings)
            5. SUB 08 — event index (stored event count)
        Steps 4 and 5 are best-effort: a ProtocolError there is logged and
        skipped so a partially-responsive device still yields identity info.
        Returns:
            Populated DeviceInfo with compliance_config and event_count cached.
        Raises:
            ProtocolError: on any communication failure in steps 1-3.
        """
        if not self.is_open:
            self.open()
        proto = self._require_proto()
        log.info("connect: POLL startup")
        proto.startup()
        log.info("connect: reading serial number (SUB 15)")
        sn_data = proto.read(SUB_SERIAL_NUMBER)
        device_info = _decode_serial_number(sn_data)
        log.info("connect: reading full config (SUB 01)")
        cfg_data = proto.read(SUB_FULL_CONFIG)
        _decode_full_config_into(cfg_data, device_info)
        log.info("connect: reading compliance config (SUB 1A)")
        try:
            cc_data = proto.read_compliance_config()
            _decode_compliance_config_into(cc_data, device_info)
        except ProtocolError as exc:
            log.warning("connect: compliance config read failed: %s — continuing", exc)
        log.info("connect: reading event index (SUB 08)")
        try:
            idx_raw = proto.read_event_index()
            device_info.event_count = _decode_event_count(idx_raw)
            log.info("connect: device has %d stored event(s)", device_info.event_count)
        except ProtocolError as exc:
            log.warning("connect: event index read failed: %s — continuing", exc)
        log.info("connect: %s", device_info)
        return device_info

    def get_events(self, full_waveform: bool = False, debug: bool = False) -> list[Event]:
        """
        Download all stored events from the device using the confirmed
        1E → 0A → 0C → 5A → 1F event-iterator protocol.

        Args:
            full_waveform: When True, fetch the complete 5A stream for each
                event and decode raw ADC samples into event.raw_samples
                (a large download). When False (default), only the ~8
                metadata-bearing 5A frames are fetched per event.
            debug: When True, keep the raw 0C record bytes on
                event._raw_record for offline analysis.

        Sequence (confirmed from 3-31-26 and 1-2-26 Blastware captures):
            1. SUB 1E — get first waveform key
            2. For each key until b'\\x00\\x00\\x00\\x00':
                a. SUB 0A — waveform header (first event only, confirm full record)
                b. SUB 0C — full waveform record (peak values, record type, timestamp)
                c. SUB 5A — bulk waveform stream (event-time metadata; stops early
                   after "Project:" is found, so only ~8 frames are fetched)
                d. SUB 1F — advance to next key (token=0xFE skips partial bins)
        The SUB 5A fetch provides the authoritative event-time metadata:
        "Project:", "Client:", "User Name:", "Seis Loc:", and "Extended Notes"
        as they were configured AT THE TIME the event was recorded. This is
        distinct from the current device compliance config (SUB 1A), which only
        reflects the CURRENT setup.
        Raw ADC waveform samples (full bulk waveform payload, several MB) are
        NOT downloaded unless full_waveform=True; they can also be fetched
        afterwards per event via download_waveform().
        Returns:
            List of Event objects, one per stored waveform record.
        Raises:
            ProtocolError: on unrecoverable communication failure.
        """
        proto = self._require_proto()
        log.info("get_events: requesting first event (SUB 1E)")
        try:
            key4, _event_data8 = proto.read_event_first()
        except ProtocolError as exc:
            raise ProtocolError(f"get_events: 1E failed: {exc}") from exc
        if key4 == b"\x00\x00\x00\x00":
            log.info("get_events: device reports no stored events")
            return []
        events: list[Event] = []
        idx = 0
        is_first = True
        while key4 != b"\x00\x00\x00\x00":
            log.info("get_events: record %d key=%s", idx, key4.hex())
            ev = Event(index=idx)
            ev._waveform_key = key4  # stored so download_waveform() can re-use it
            # First event: call 0A to verify it's a full record (0x30 length).
            # Subsequent keys come from 1F(0xFE) which guarantees full records,
            # so we skip 0A for those — exactly matching Blastware behaviour.
            proceed = True
            if is_first:
                try:
                    _hdr, rec_len = proto.read_waveform_header(key4)
                    if rec_len < 0x30:
                        log.warning(
                            "get_events: first key=%s is partial (len=0x%02X) — skipping",
                            key4.hex(), rec_len,
                        )
                        proceed = False
                except ProtocolError as exc:
                    log.warning(
                        "get_events: 0A failed for key=%s: %s — skipping 0C",
                        key4.hex(), exc,
                    )
                    proceed = False
                is_first = False
            if proceed:
                # SUB 0C — full waveform record (peak values, timestamp, "Project:" string)
                try:
                    record = proto.read_waveform_record(key4)
                    if debug:
                        ev._raw_record = record
                    _decode_waveform_record_into(record, ev)
                except ProtocolError as exc:
                    log.warning(
                        "get_events: 0C failed for key=%s: %s", key4.hex(), exc
                    )
                # SUB 5A — bulk waveform stream.
                # By default (full_waveform=False): stop early after frame 7 ("Project:")
                # is found — fetches only ~8 frames for event-time metadata.
                # When full_waveform=True: fetch the complete stream (stop_after_metadata=False,
                # max_chunks=128) and decode raw ADC samples into ev.raw_samples.
                # The full waveform MUST be fetched here, inside the 1E→0A→0C→5A→1F loop.
                # Issuing 5A after 1F has advanced the event context will time out.
                try:
                    if full_waveform:
                        log.info(
                            "get_events: 5A full waveform download for key=%s", key4.hex()
                        )
                        a5_frames = proto.read_bulk_waveform_stream(
                            key4, stop_after_metadata=False, max_chunks=128
                        )
                        if a5_frames:
                            _decode_a5_metadata_into(a5_frames, ev)
                            _decode_a5_waveform(a5_frames, ev)
                            log.info(
                                "get_events: 5A decoded %d sample-sets",
                                len((ev.raw_samples or {}).get("Tran", [])),
                            )
                    else:
                        a5_frames = proto.read_bulk_waveform_stream(
                            key4, stop_after_metadata=True
                        )
                        if a5_frames:
                            _decode_a5_metadata_into(a5_frames, ev)
                            log.debug(
                                "get_events: 5A metadata client=%r operator=%r",
                                ev.project_info.client if ev.project_info else None,
                                ev.project_info.operator if ev.project_info else None,
                            )
                except ProtocolError as exc:
                    log.warning(
                        "get_events: 5A failed for key=%s: %s — event-time metadata unavailable",
                        key4.hex(), exc,
                    )
            events.append(ev)
            idx += 1
            # SUB 1F — advance to the next full waveform record key
            try:
                key4 = proto.advance_event()
            except ProtocolError as exc:
                log.warning("get_events: 1F failed: %s — stopping iteration", exc)
                break
        log.info("get_events: downloaded %d event(s)", len(events))
        return events

    def download_waveform(self, event: Event) -> None:
        """
        Download the full raw ADC waveform for a previously-retrieved event
        and populate event.raw_samples, event.total_samples,
        event.pretrig_samples, and event.rectime_seconds.
        This performs a complete SUB 5A (BULK_WAVEFORM_STREAM) download with
        stop_after_metadata=False, fetching all waveform frames (typically 9
        large A5 frames for a standard blast record). The download is large
        (up to several hundred KB for a 9-second, 4-channel, 1024-Hz record)
        and is intentionally not performed by get_events() by default.
        Args:
            event: An Event object returned by get_events(). Must have a
                waveform key embedded (stored on event._waveform_key by
                get_events()); the key is re-used directly for the 5A request.
        Raises:
            ValueError: if the event does not have a waveform key available.
            RuntimeError: if the client is not connected.
            ProtocolError: on communication failure.
        Confirmed format (4-2-26 blast capture, ✅):
            4-channel interleaved signed 16-bit LE, 8 bytes per sample-set.
            Total samples: 9306 (≈9.1 s at 1024 Hz), pretrig: 298 (≈0.29 s).
            Channel order: Tran, Vert, Long, Mic (Blastware convention).
        """
        proto = self._require_proto()
        if event._waveform_key is None:
            raise ValueError(
                f"Event#{event.index} has no waveform key — "
                "was it retrieved via get_events()?"
            )
        log.info(
            "download_waveform: starting full 5A download for event#%d (key=%s)",
            event.index, event._waveform_key.hex(),
        )
        a5_frames = proto.read_bulk_waveform_stream(
            event._waveform_key, stop_after_metadata=False, max_chunks=128
        )
        log.info(
            "download_waveform: received %d A5 frames; decoding waveform",
            len(a5_frames),
        )
        _decode_a5_waveform(a5_frames, event)
        if event.raw_samples is not None:
            n = len(event.raw_samples.get("Tran", []))
            log.info(
                "download_waveform: decoded %d sample-sets across 4 channels",
                n,
            )
        else:
            log.warning("download_waveform: waveform decode produced no samples")

    # ── Internal helpers ──────────────────────────────────────────────────────
    def _require_proto(self) -> MiniMateProtocol:
        """Return the live protocol object or raise if open() was never called."""
        if self._proto is None:
            raise RuntimeError("MiniMateClient is not connected. Call open() first.")
        return self._proto
# ── Decoder functions ─────────────────────────────────────────────────────────
#
# Pure functions: bytes → model field population.
# Kept here (not in models.py) to isolate protocol knowledge from data shapes.
def _decode_serial_number(data: bytes) -> DeviceInfo:
    """
    Build a fresh DeviceInfo from a SUB EA (SERIAL_NUMBER_RESPONSE) payload.

    The payload proper begins after an 11-byte echo header
    ([LENGTH_ECHO:1][00×4][KEY_ECHO:4][00×2]); within it:
        bytes 0-7: null-padded ASCII serial string (e.g. "BE18189\\x00")
        byte 8:    unit-specific trailing byte (meaning unknown ❓)
        byte 9:    firmware minor version (0x11 = 17) ✅

    Returns:
        DeviceInfo with serial, firmware_minor, serial_trail_0 populated.
        Payloads shorter than 9 bytes degrade to serial-only decoding.
    """
    # Strip the 11-byte section header when present; otherwise take the
    # payload as-is (short/odd responses).
    payload = data[11:] if len(data) > 11 else data
    if len(payload) < 9:
        # Too short for the trailing bytes — decode whatever serial we got.
        return DeviceInfo(
            serial=payload.rstrip(b"\x00").decode("ascii", errors="replace"),
            firmware_minor=0,
        )
    serial_str = payload[:8].rstrip(b"\x00").decode("ascii", errors="replace")
    return DeviceInfo(
        serial=serial_str,
        firmware_minor=payload[9] if len(payload) > 9 else 0,
        serial_trail_0=payload[8] if len(payload) > 8 else None,
    )
def _decode_full_config_into(data: bytes, info: DeviceInfo) -> None:
"""
Decode SUB FE (FULL_CONFIG_RESPONSE) payload into an existing DeviceInfo.
The FE response arrives as a composite S3 outer frame whose data section
contains inner DLE-framed sub-frames. Because of this nesting the §7.3
fixed offsets (0x34, 0x3C, 0x44, 0x6D) are unreliable — they assume a
clean non-nested payload starting at byte 0.
Instead we search the whole byte array for known ASCII patterns. The
strings are long enough to be unique in any reasonable payload.
Modifies info in-place.
"""
def _extract(needle: bytes, max_len: int = 32) -> Optional[str]:
"""Return the null-terminated ASCII string that starts with *needle*."""
pos = data.find(needle)
if pos < 0:
return None
end = pos
while end < len(data) and data[end] != 0 and (end - pos) < max_len:
end += 1
s = data[pos:end].decode("ascii", errors="replace").strip()
return s or None
# ── Manufacturer and model are straightforward literal matches ────────────
info.manufacturer = _extract(b"Instantel")
info.model = _extract(b"MiniMate Plus")
# ── Firmware version: "S3xx.xx" — scan for the 'S3' prefix ───────────────
for i in range(len(data) - 5):
if data[i] == ord('S') and data[i + 1] == ord('3') and chr(data[i + 2]).isdigit():
end = i
while end < len(data) and data[end] not in (0, 0x20) and (end - i) < 12:
end += 1
candidate = data[i:end].decode("ascii", errors="replace").strip()
if "." in candidate and len(candidate) >= 5:
info.firmware_version = candidate
break
# ── DSP version: numeric "xx.xx" — search for known prefixes ─────────────
for prefix in (b"10.", b"11.", b"12.", b"9.", b"8."):
pos = data.find(prefix)
if pos < 0:
continue
end = pos
while end < len(data) and data[end] not in (0, 0x20) and (end - pos) < 8:
end += 1
candidate = data[pos:end].decode("ascii", errors="replace").strip()
# Accept only strings that look like "digits.digits"
if "." in candidate and all(c in "0123456789." for c in candidate):
info.dsp_version = candidate
break
def _decode_event_count(data: bytes) -> int:
"""
Extract stored event count from SUB F7 (EVENT_INDEX_RESPONSE) payload.
Layout per §7.4 (offsets from data section start):
+00: 00 58 09 — total index size or record count ❓
+03: 00 00 00 01 — possibly stored event count = 1 ❓
We use bytes +03..+06 interpreted as uint32 BE as the event count.
This is inferred (🔶) — the exact meaning of the first 3 bytes is unclear.
"""
if len(data) < 7:
log.warning("event index payload too short (%d bytes), assuming 0 events", len(data))
return 0
# Try the uint32 at +3 first
count = struct.unpack_from(">I", data, 3)[0]
# Sanity check: MiniMate Plus manual says max ~1000 events
if count > 1000:
log.warning(
"event count %d looks unreasonably large — clamping to 0", count
)
return 0
return count
def _decode_event_header_into(data: bytes, event: Event) -> None:
    """
    Decode SUB E1 (EVENT_HEADER_RESPONSE) raw data section into an Event.

    Currently a deliberate no-op placeholder: nothing beyond the waveform key
    (extracted separately in MiniMateProtocol.read_event_first, at data[11:15])
    has been confirmed in the 8-byte 1E data block. The remaining 4 bytes at
    data[15:19] are not yet decoded (❓ — possibly sample rate or flags).

    Date information (year/month/day) lives in the waveform record (SUB 0C),
    not in the 1E response, so event.timestamp is intentionally left as None
    here and populated later from the 0C record.

    Modifies event in-place (once any field is confirmed and decoded).
    """
    # Nothing confirmed yet from the 8-byte data block beyond the key at [0:4].
    # Leave event.timestamp as None — it will be populated from the 0C record.
    pass
def _decode_waveform_record_into(data: bytes, event: Event) -> None:
    """
    Fill an Event from a 210-byte SUB F3 (FULL_WAVEFORM_RECORD) record.

    *data* is the raw record returned by
    MiniMateProtocol.read_waveform_record() — i.e. data_rsp.data[11:11+0xD2].

    Decoded fields (all ✅ confirmed 2026-04-01 against a Blastware report):
        - timestamp:        9-byte header at bytes [0:9]
        - record_type:      sub_code at byte[1] (0x10 = "Waveform")
        - peak_values:      label-based float32 at label+6 for Tran/Vert/Long/MicL
        - peak_vector_sum:  IEEE 754 BE float at offset 87
        - project_info:     "Project:", "Client:", etc. string search

    Each field is decoded independently; a failure in one is logged and does
    not prevent the others. Modifies event in-place.
    """
    # Timestamp: [day][sub_code][month][year:2 BE][unknown][hour][min][sec]
    try:
        event.timestamp = Timestamp.from_waveform_record(data)
    except Exception as exc:
        log.warning("waveform record timestamp decode failed: %s", exc)

    # Record type comes from the sub_code byte, not an ASCII search.
    try:
        event.record_type = _extract_record_type(data)
    except Exception as exc:
        log.warning("waveform record type decode failed: %s", exc)

    # Per-channel PPV + Peak Vector Sum; only assigned when something decoded.
    try:
        peaks = _extract_peak_floats(data)
    except Exception as exc:
        log.warning("waveform record peak decode failed: %s", exc)
    else:
        if peaks:
            event.peak_values = peaks

    # Project strings; only assigned when at least one label was found.
    try:
        strings = _extract_project_strings(data)
    except Exception as exc:
        log.warning("waveform record project strings decode failed: %s", exc)
    else:
        if strings:
            event.project_info = strings
def _decode_a5_metadata_into(frames_data: list[bytes], event: Event) -> None:
    """
    Populate event.project_info from event-time metadata strings found in
    SUB 5A (BULK_WAVEFORM_STREAM) frame payloads.

    This is the authoritative source for event-time metadata — it reflects
    the device setup AT THE TIME the event was recorded, not the current
    configuration. Confirmed needle locations in A5[7].data (2026-04-02,
    1-2-26 capture): "Project:" @626, "Client:" @676, "User Name:" @703,
    "Seis Loc:" @735, "Extended Notes" @774.

    All frames are joined into one buffer for a single-pass search. Values
    already present from the 0C record are overwritten — the A5 data is
    more complete (0C only carries "Project:"). Modifies event in-place.
    """
    blob = b"".join(frames_data)

    def _value_after(needle: bytes, max_len: int = 64) -> Optional[str]:
        """Return the null-padded value string following *needle*, or None."""
        at = blob.find(needle)
        if at < 0:
            return None
        start = at + len(needle)
        # Hop over the null padding between label and value.
        while start < len(blob) and blob[start] == 0:
            start += 1
        if start >= len(blob):
            return None
        stop = start
        while stop < len(blob) and blob[stop] != 0 and (stop - start) < max_len:
            stop += 1
        text = blob[start:stop].decode("ascii", errors="replace").strip()
        return text or None

    found = {
        "project": _value_after(b"Project:"),
        "client": _value_after(b"Client:"),
        "operator": _value_after(b"User Name:"),
        "sensor_location": _value_after(b"Seis Loc:"),
        "notes": _value_after(b"Extended Notes"),
    }
    if not any(found.values()):
        log.debug("a5 metadata: no project strings found in %d frames", len(frames_data))
        return
    if event.project_info is None:
        event.project_info = ProjectInfo()
    pi = event.project_info
    # Overwrite only with non-empty A5 values — they are event-time authoritative.
    for attr, value in found.items():
        if value:
            setattr(pi, attr, value)
    log.debug(
        "a5 metadata: project=%r client=%r operator=%r location=%r",
        pi.project, pi.client, pi.operator, pi.sensor_location,
    )
def _decode_a5_waveform(
    frames_data: list[bytes],
    event: Event,
) -> None:
    """
    Rebuild the raw 4-channel ADC waveform from a complete SUB 5A
    (BULK_WAVEFORM_STREAM) frame set and store it on *event*.

    Requires ALL A5 frames (stop_after_metadata=False). Populates
    event.total_samples, event.pretrig_samples, event.rectime_seconds
    (from the STRT record in A5[0]) and event.raw_samples (channel name →
    list of signed 16-bit samples; keys "Tran"/"Vert"/"Long"/"Mic").

    Format (confirmed from 4-2-26 blast capture): 4-channel interleaved
    signed 16-bit little-endian, 8 bytes per sample-set
        [T_lo T_hi V_lo V_hi L_lo L_hi M_lo M_hi] × N
    with the Blastware channel convention [Tran, Vert, Long, Mic].
    ⚠️ the physical ADC mux order is not independently verifiable from the
    saturating captures available — the convention matches Blastware labels.

    Frame layout:
        A5[0]: db[7:] = 11-byte header, 21-byte STRT record
               (+8 u16 BE total_samples, +16 u16 BE pretrig, +18 u8 rectime),
               6-byte preamble (00 00 then FF×4), then waveform bytes.
        A5[1..N]: db[7:] = 8-byte header (u16 LE counter + 6 zeros), waveform.
        A5[7] carries metadata strings; A5[9] is the terminator — neither
        contributes samples.

    Cross-frame alignment: chunk sizes are NOT multiples of 8, so a running
    byte cursor tracks the phase within the T,V,L,M cycle; misaligned
    leading bytes at each frame start are dropped rather than decoded into
    the wrong channel. Modifies event in-place.
    """
    if not frames_data:
        log.debug("_decode_a5_waveform: no frames provided")
        return

    # ── STRT record (A5[0]) ─────────────────────────────────────────────────
    head = frames_data[0][7:]  # db[7:] for A5[0]
    anchor = head.find(b"STRT")
    if anchor < 0:
        log.warning("_decode_a5_waveform: STRT record not found in A5[0]")
        return
    strt_rec = head[anchor : anchor + 21]
    if len(strt_rec) < 21:
        log.warning("_decode_a5_waveform: STRT record truncated (%dB)", len(strt_rec))
        return
    total_samples = struct.unpack_from(">H", strt_rec, 8)[0]
    pretrig_samples = struct.unpack_from(">H", strt_rec, 16)[0]
    rectime_seconds = strt_rec[18]
    event.total_samples = total_samples
    event.pretrig_samples = pretrig_samples
    event.rectime_seconds = rectime_seconds
    log.debug(
        "_decode_a5_waveform: STRT total_samples=%d pretrig=%d rectime=%ds",
        total_samples, pretrig_samples, rectime_seconds,
    )

    # ── Collect usable waveform bytes per frame ─────────────────────────────
    segments: list[tuple[int, bytes]] = []  # (frame_idx, waveform_bytes)
    byte_total = 0
    for fi, db in enumerate(frames_data):
        body = db[7:]
        if fi == 0:
            # Samples begin 27 bytes past 'STRT' (21B record + 6B preamble).
            sp = body.find(b"STRT")
            if sp < 0:
                continue
            seg = body[sp + 27 :]
        elif fi in (7, 9):
            # Frame 7: metadata strings only. Frame 9: terminator. No ADC data.
            continue
        else:
            # Strip the 8-byte per-frame header (ctr + 6 zero bytes).
            if len(body) < 8:
                continue
            seg = body[8:]
        if len(seg) < 2:
            continue
        segments.append((fi, seg))
        byte_total += len(seg)
    n_sets = byte_total // 8
    log.debug(
        "_decode_a5_waveform: %d chunks, %dB total → %d complete sample-sets "
        "(%d of %d expected; %.0f%%)",
        len(segments), byte_total, n_sets, n_sets, total_samples,
        100.0 * n_sets / total_samples if total_samples else 0,
    )
    if n_sets == 0:
        log.warning("_decode_a5_waveform: no complete sample-sets found")
        return

    # ── Decode with alignment correction at frame boundaries ────────────────
    # cursor is the cumulative byte offset; cursor % 8 is the byte phase
    # within the T,V,L,M cycle at each frame start. Leading bytes of a
    # misaligned frame complete a partial sample-set and are discarded so
    # channel assignment stays correct (a slightly smaller decoded set).
    tran: list[int] = []
    vert: list[int] = []
    long_: list[int] = []
    mic: list[int] = []
    cursor = 0
    for _fi, seg in segments:
        drop = (8 - cursor % 8) % 8  # bytes needed to reach the next T start
        if drop == 0:
            usable = seg
        elif drop < len(seg):
            usable = seg[drop:]
        else:
            # The whole frame is swallowed by one partial sample-set.
            cursor += len(seg)
            continue
        for base in range(0, len(usable) - len(usable) % 8, 8):
            t, v, l, m = struct.unpack_from("<4h", usable, base)
            tran.append(t)
            vert.append(v)
            long_.append(l)
            mic.append(m)
        cursor += len(seg)
    log.debug(
        "_decode_a5_waveform: decoded %d alignment-corrected sample-sets "
        "(skipped %d due to frame boundary misalignment)",
        len(tran), n_sets - len(tran),
    )
    event.raw_samples = {
        "Tran": tran,
        "Vert": vert,
        "Long": long_,
        "Mic": mic,
    }
def _extract_record_type(data: bytes) -> Optional[str]:
"""
Decode the recording mode from byte[1] of the 210-byte waveform record.
Byte[1] is the sub-record code that immediately follows the day byte in the
9-byte timestamp header at the start of each waveform record:
[day:1] [sub_code:1] [month:1] [year:2 BE] ...
Confirmed codes (✅ 2026-04-01):
0x10 → "Waveform" (continuous / single-shot mode)
Histogram mode code is not yet confirmed — a histogram event must be
captured with debug=true to identify it. Returns None for unknown codes.
"""
if len(data) < 2:
return None
code = data[1]
if code == 0x10:
return "Waveform"
# Unknown code — log it so we can identify histogram/noise sub_codes from real captures
log.warning("_extract_record_type: unknown sub_code=0x%02X — returning raw string", code)
return f"Unknown(0x{code:02X})"
return None
def _extract_peak_floats(data: bytes) -> Optional[PeakValues]:
"""
Locate per-channel peak particle velocity values in the 210-byte
waveform record by searching for the embedded channel label strings
("Tran", "Vert", "Long", "MicL") and reading the IEEE 754 BE float
at label_offset + 6.
The floats are NOT 4-byte aligned in the record (confirmed from
3-31-26 capture), so the previous step-4 scan missed Tran, Long, and
MicL entirely. Label-based lookup is the correct approach.
Channel labels are separated by inner-frame bytes (0x10 0x03 = DLE ETX),
which the S3FrameParser preserves as literal data. Searching for the
4-byte ASCII label strings is robust to this structure.
Returns PeakValues if at least one channel label is found, else None.
"""
# (label_bytes, field_name)
channels = (
(b"Tran", "tran"),
(b"Vert", "vert"),
(b"Long", "long_"),
(b"MicL", "micl"),
)
vals: dict[str, float] = {}
for label_bytes, field in channels:
pos = data.find(label_bytes)
if pos < 0:
continue
float_off = pos + 6
if float_off + 4 > len(data):
log.debug("peak float: label %s at %d but float runs past end", label_bytes, pos)
continue
try:
val = struct.unpack_from(">f", data, float_off)[0]
except struct.error:
continue
log.debug("peak float: %s at label+6 (%d) = %.6f", label_bytes.decode(), float_off, val)
vals[field] = val
if not vals:
return None
# ── Peak Vector Sum — fixed offset 87 (✅ confirmed 2026-04-01) ───────────
# = √(Tran² + Vert² + Long²) at the sample instant of maximum combined geo
# motion, NOT the vector sum of the three per-channel peak values (which may
# occur at different times). Matches Blastware "Peak Vector Sum" exactly.
pvs: Optional[float] = None
if len(data) > 91:
try:
pvs = struct.unpack_from(">f", data, 87)[0]
except struct.error:
pass
return PeakValues(
tran=vals.get("tran"),
vert=vals.get("vert"),
long=vals.get("long_"),
micl=vals.get("micl"),
peak_vector_sum=pvs,
)
def _extract_project_strings(data: bytes) -> Optional[ProjectInfo]:
"""
Search the waveform record payload for known ASCII label strings
("Project:", "Client:", "User Name:", "Seis Loc:", "Extended Notes")
and extract the associated value strings that follow them.
Layout (per §7.5): each entry is [label ~16 bytes][value ~32 bytes],
null-padded. We find the label, then read the next non-null chars.
"""
def _find_string_after(needle: bytes, max_value_len: int = 64) -> Optional[str]:
pos = data.find(needle)
if pos < 0:
return None
# Skip the label (including null padding) until we find a non-null value
# The value starts at pos+len(needle), but may have a gap of null bytes
value_start = pos + len(needle)
# Skip nulls
while value_start < len(data) and data[value_start] == 0:
value_start += 1
if value_start >= len(data):
return None
# Read until null terminator or max_value_len
end = value_start
while end < len(data) and data[end] != 0 and (end - value_start) < max_value_len:
end += 1
value = data[value_start:end].decode("ascii", errors="replace").strip()
return value or None
project = _find_string_after(b"Project:")
client = _find_string_after(b"Client:")
operator = _find_string_after(b"User Name:")
location = _find_string_after(b"Seis Loc:")
notes = _find_string_after(b"Extended Notes")
if not any([project, client, operator, location, notes]):
return None
return ProjectInfo(
project=project,
client=client,
operator=operator,
sensor_location=location,
notes=notes,
)
def _decode_compliance_config_into(data: bytes, info: DeviceInfo) -> None:
    """
    Decode a 2090-byte SUB 1A (COMPLIANCE_CONFIG) response into a ComplianceConfig.
    The *data* argument is the raw bytes returned by read_compliance_config()
    (frames B+C+D concatenated, echo headers stripped).
    Confirmed field locations (BE11529 with 3-step read, duplicate detection):
    - cfg[89] = setup_name: first long ASCII string in cfg[40:250] ✅
    - ANCHOR = b'\\x01\\x2c\\x00\\x00\\xbe\\x80\\x00\\x00\\x00\\x00' in cfg[40:100] ✅
    - anchor - 2 = sample_rate uint16_BE (1024 normal / 2048 fast / 4096 faster)
    - anchor + 10 = record_time float32_BE
    - "Project:" needle → project string
    - "Client:" needle → client string
    - "User Name:" needle → operator string
    - "Seis Loc:" needle → sensor_location string
    - "Extended Notes" needle → notes string
    Anchor approach is required because a DLE byte in the sample_rate field
    (4096 = 0x1000 → stored as 10 10 00 in raw S3 frame → unstuffed to 10 00,
    1 byte shorter than 04 00 or 08 00) causes frame C to be 1 byte shorter
    for "faster" mode, shifting all subsequent offsets by 1. The 10-byte
    anchor is stable across all modes.
    Channel block layout (✅ confirmed 2026-04-02 from 3-11-26 E5 frame 78
    and 1-2-26 A5 frame 77):
        "Tran" label at tran_pos
        tran_pos + 28 = max_range float32_BE (e.g. 6.206053 in/s)
        tran_pos + 34 = trigger_level float32_BE (e.g. 0.600000 in/s)
        tran_pos + 38 = "in.\\x00" (unit string anchor)
        tran_pos + 42 = alarm_level float32_BE (e.g. 1.250000 in/s)
        tran_pos + 46 = "/s\\x00\\x00" (unit string anchor)
    Modifies info.compliance_config in-place.

    Args:
        data: Raw concatenated config payload (echo headers already removed).
        info: Target DeviceInfo; its ``compliance_config`` attribute is
            replaced with the freshly decoded ComplianceConfig. If *data*
            is shorter than 40 bytes, a warning is logged and *info* is
            left untouched.

    Returns:
        None. Each field is decoded independently — a failure in one
        extraction step logs a warning and leaves that field unset rather
        than aborting the whole decode.
    """
    # Guard: nothing useful can be extracted from a truncated payload.
    if not data or len(data) < 40:
        log.warning("compliance config payload too short (%d bytes)", len(data))
        return
    config = ComplianceConfig(raw=data)
    # ── Setup name ────────────────────────────────────────────────────────────
    # The setup_name IS the string itself — it is NOT a label followed by a value.
    # It appears as the first long (>=8 char) ASCII string in cfg[40:250].
    # The preceding bytes vary by device (cfg[88]=0x01 on BE11529); the string
    # itself is null-terminated.
    try:
        setup_name = _find_first_string(data, start=40, end=250, min_len=8)
        config.setup_name = setup_name
        if setup_name:
            log.debug("compliance_config: setup_name = %r", setup_name)
    except Exception as exc:
        log.warning("compliance_config: setup_name extraction failed: %s", exc)
    # ── Record time + sample rate — anchor-relative ───────────────────────────
    # The 10-byte anchor sits between sample_rate and record_time in the cfg.
    # Absolute offsets are NOT reliable because sample_rate = 4096 (0x1000) is
    # DLE-escaped in the raw S3 frame (10 10 00 → 10 00 after unstuffing),
    # making frame C 1 byte shorter than for 1024/2048 and shifting everything.
    #     sample_rate: uint16_BE at anchor - 2
    #     record_time: float32_BE at anchor + 10
    _ANCHOR = b'\x01\x2c\x00\x00\xbe\x80\x00\x00\x00\x00'
    # NOTE(review): search window (0, 150) is wider than the documented
    # cfg[40:100] location of the anchor — presumably deliberate slack for
    # offset drift; confirm against more captures.
    _anchor = data.find(_ANCHOR, 0, 150)
    # Need 2 bytes before the anchor (sample_rate) and 4 bytes at +10
    # (record_time float), hence the `>= 2` and `+ 14` bounds.
    if _anchor >= 2 and _anchor + 14 <= len(data):
        try:
            config.sample_rate = struct.unpack_from(">H", data, _anchor - 2)[0]
            log.debug(
                "compliance_config: sample_rate = %d Sa/s (anchor@%d)", config.sample_rate, _anchor
            )
        except Exception as exc:
            log.warning("compliance_config: sample_rate extraction failed: %s", exc)
        try:
            config.record_time = struct.unpack_from(">f", data, _anchor + 10)[0]
            log.debug(
                "compliance_config: record_time = %.3f s (anchor@%d)", config.record_time, _anchor
            )
        except Exception as exc:
            log.warning("compliance_config: record_time extraction failed: %s", exc)
    else:
        log.warning(
            "compliance_config: anchor %s not found in cfg[40:100] (len=%d) "
            "— sample_rate and record_time will be None",
            _ANCHOR.hex(), len(data),
        )
    # ── Project strings ───────────────────────────────────────────────────────
    try:
        # Same [label][null padding][value][null] scheme as the waveform
        # record's project block (see _extract_project_strings).
        def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]:
            pos = data.find(needle)
            if pos < 0:
                return None
            value_start = pos + len(needle)
            while value_start < len(data) and data[value_start] == 0:
                value_start += 1
            if value_start >= len(data):
                return None
            end = value_start
            while end < len(data) and data[end] != 0 and (end - value_start) < max_len:
                end += 1
            s = data[value_start:end].decode("ascii", errors="replace").strip()
            return s or None
        config.project = _find_string_after(b"Project:")
        config.client = _find_string_after(b"Client:")
        config.operator = _find_string_after(b"User Name:")
        config.sensor_location = _find_string_after(b"Seis Loc:")
        config.notes = _find_string_after(b"Extended Notes")
        if config.project:
            log.debug("compliance_config: project = %s", config.project)
        if config.client:
            log.debug("compliance_config: client = %s", config.client)
    except Exception as exc:
        log.warning("compliance_config: project string extraction failed: %s", exc)
    # ── Channel block: trigger_level_geo, alarm_level_geo, max_range_geo ─────
    # The channel block is only present in the full cfg (frame D delivered,
    # ~2126 bytes). Layout confirmed 2026-04-02 from both E5 frame 78 of the
    # 3-11-26 compliance-config capture and A5 frame 77 of the 1-2-26 event
    # download capture:
    #
    #   "Tran"    label      at tran_pos (+0 to +3)
    #   max_range float32_BE at tran_pos + 28   (e.g. 6.206053 in/s)
    #   trigger   float32_BE at tran_pos + 34   (e.g. 0.600000 in/s)
    #   "in.\x00" unit string at tran_pos + 38  ✅ confirmed
    #   alarm     float32_BE at tran_pos + 42   (e.g. 1.250000 in/s)
    #   "/s\x00\x00" unit string at tran_pos + 46 ✅ confirmed
    #
    # Unit strings serve as layout anchors — if they match, the float offsets
    # are reliable. Skip "Tran2" (a later repeated label) via the +4 check.
    try:
        tran_pos = data.find(b"Tran", 44)
        if (
            tran_pos >= 0
            and data[tran_pos + 4 : tran_pos + 5] != b"2"  # not "Tran2"
            and tran_pos + 50 <= len(data)
            and data[tran_pos + 38 : tran_pos + 42] == b"in.\x00"
            and data[tran_pos + 46 : tran_pos + 50] == b"/s\x00\x00"
        ):
            config.max_range_geo = struct.unpack_from(">f", data, tran_pos + 28)[0]
            config.trigger_level_geo = struct.unpack_from(">f", data, tran_pos + 34)[0]
            config.alarm_level_geo = struct.unpack_from(">f", data, tran_pos + 42)[0]
            log.debug(
                "compliance_config: trigger=%.4f alarm=%.4f max_range=%.4f in/s",
                config.trigger_level_geo, config.alarm_level_geo, config.max_range_geo,
            )
        elif tran_pos >= 0:
            # Label found but the unit-string anchors don't match — offsets
            # would be unreliable, so log the raw bytes for diagnosis.
            log.warning(
                "compliance_config: 'Tran' at %d — unit string check failed: "
                "+38..+42=%s (want 696e2e00) +46..+50=%s (want 2f730000)",
                tran_pos,
                data[tran_pos + 38 : tran_pos + 42].hex() if tran_pos + 42 <= len(data) else "??",
                data[tran_pos + 46 : tran_pos + 50].hex() if tran_pos + 50 <= len(data) else "??",
            )
        else:
            log.debug("compliance_config: channel block not present in cfg (len=%d)", len(data))
    except Exception as exc:
        log.warning("compliance_config: channel block extraction failed: %s", exc)
    info.compliance_config = config
def _find_first_string(data: bytes, start: int, end: int, min_len: int) -> Optional[str]:
"""
Return the first null-terminated printable ASCII string of length >= min_len
found in data[start:end].
"""
i = start
end = min(end, len(data))
while i < end:
if 0x20 <= data[i] < 0x7F:
j = i
while j < len(data) and 0x20 <= data[j] < 0x7F:
j += 1
if j - i >= min_len:
return data[i:j].decode("ascii", errors="replace").strip()
i = j + 1
else:
i += 1
return None