feat: fetch event-time metadata from SUB 5A bulk waveform stream

Add read_bulk_waveform_stream() to MiniMateProtocol and wire it into
get_events() so each event gets authoritative client/operator/sensor_location
from the A5 frames recorded at event-time, not the current compliance config.

- framing.py: bulk_waveform_params() and bulk_waveform_term_params() helpers
  (probe/chunk params and termination params for SUB 5A, confirmed from
  1-2-26 BW TX capture)
- protocol.py: read_bulk_waveform_stream(key4, stop_after_metadata=True) —
  probe + chunk loop (counter += 0x0400), early stop when b"Project:" found
  (A5[7] of 9), then sends termination at offset=0x005A
- client.py: _decode_a5_metadata_into() needle-searches concatenated A5 frame
  data for Project/Client/User Name/Seis Loc/Extended Notes; get_events() now
  calls SUB 5A after each SUB 0C, overwriting project_info with event-time values

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Brian Harrison
2026-04-02 16:08:21 -04:00
parent 9b1ed1f3a8
commit 9bf20803c2
3 changed files with 305 additions and 15 deletions

View File

@@ -166,22 +166,26 @@ class MiniMateClient:
def get_events(self, include_waveforms: bool = True, debug: bool = False) -> list[Event]:
"""
Download all stored events from the device using the confirmed
1E → 0A → 0C → 1F event-iterator protocol.
1E → 0A → 0C → 5A → 1F event-iterator protocol.
Sequence (confirmed from 3-31-26 Blastware capture):
Sequence (confirmed from 3-31-26 and 1-2-26 Blastware captures):
1. SUB 1E — get first waveform key
2. For each key until b'\\x00\\x00\\x00\\x00':
a. SUB 0A — waveform header (first event only, to confirm full record)
b. SUB 0C — full waveform record (peak values, project strings)
c. SUB 1Fadvance to next key (token=0xFE skips partial bins)
a. SUB 0A — waveform header (first event only, confirm full record)
b. SUB 0C — full waveform record (peak values, record type, timestamp)
c. SUB 5Abulk waveform stream (event-time metadata; stops early
after "Project:" is found, so only ~8 frames are fetched)
d. SUB 1F — advance to next key (token=0xFE skips partial bins)
Subsequent keys returned by 1F (token=0xFE) are guaranteed to be full
records, so 0A is only called for the first event. This exactly
matches Blastware's observed behaviour.
The SUB 5A fetch provides the authoritative event-time metadata:
"Project:", "Client:", "User Name:", "Seis Loc:", and "Extended Notes"
as they were configured AT THE TIME the event was recorded. This is
distinct from the current device compliance config (SUB 1A), which only
reflects the CURRENT setup.
Raw ADC waveform samples (SUB 5A bulk stream) are NOT downloaded
here — they are large (several MB per event) and fetched separately.
include_waveforms is reserved for a future call.
Raw ADC waveform samples (full bulk waveform payload, several MB) are
NOT downloaded by default. include_waveforms is reserved for a future
endpoint that fetches and stores the raw ADC channel data.
Returns:
List of Event objects, one per stored waveform record.
@@ -206,9 +210,7 @@ class MiniMateClient:
is_first = True
while key4 != b"\x00\x00\x00\x00":
log.info(
"get_events: record %d key=%s", idx, key4.hex()
)
log.info("get_events: record %d key=%s", idx, key4.hex())
ev = Event(index=idx)
# First event: call 0A to verify it's a full record (0x30 length).
@@ -233,7 +235,7 @@ class MiniMateClient:
is_first = False
if proceed:
# SUB 0C — full waveform record (peak values, project strings)
# SUB 0C — full waveform record (peak values, timestamp, "Project:" string)
try:
record = proto.read_waveform_record(key4)
if debug:
@@ -244,6 +246,27 @@ class MiniMateClient:
"get_events: 0C failed for key=%s: %s", key4.hex(), exc
)
# SUB 5A — bulk waveform stream: event-time metadata
# Stops early after "Project:" is found (typically in A5[7] of 9)
# so we fetch only ~8 frames rather than the full multi-MB stream.
# This is the authoritative source for client/operator/seis_loc/notes.
try:
a5_frames = proto.read_bulk_waveform_stream(
key4, stop_after_metadata=True
)
if a5_frames:
_decode_a5_metadata_into(a5_frames, ev)
log.debug(
"get_events: 5A metadata client=%r operator=%r",
ev.project_info.client if ev.project_info else None,
ev.project_info.operator if ev.project_info else None,
)
except ProtocolError as exc:
log.warning(
"get_events: 5A failed for key=%s: %s — event-time metadata unavailable",
key4.hex(), exc,
)
events.append(ev)
idx += 1
@@ -452,6 +475,74 @@ def _decode_waveform_record_into(data: bytes, event: Event) -> None:
log.warning("waveform record project strings decode failed: %s", exc)
def _decode_a5_metadata_into(frames_data: list[bytes], event: Event) -> None:
"""
Search A5 (BULK_WAVEFORM_STREAM) frame data for event-time metadata strings
and populate event.project_info.
This is the authoritative source for event-time metadata — it reflects the
device setup AT THE TIME the event was recorded, not the current device
configuration. The metadata lives in a middle A5 frame (confirmed: A5[7]
of 9 large frames for the 1-2-26 capture):
Confirmed needle locations in A5[7].data (2026-04-02 from 1-2-26 capture):
b"Project:" at data[626]
b"Client:" at data[676]
b"User Name:" at data[703]
b"Seis Loc:" at data[735]
b"Extended Notes" at data[774]
All frames are concatenated for a single-pass needle search. Fields already
set from the 0C waveform record are overwritten — A5 data is more complete
(the 210-byte 0C record only carries "Project:", not client/operator/etc.).
Modifies event in-place.
"""
combined = b"".join(frames_data)
def _find_string_after(needle: bytes, max_len: int = 64) -> Optional[str]:
pos = combined.find(needle)
if pos < 0:
return None
value_start = pos + len(needle)
while value_start < len(combined) and combined[value_start] == 0:
value_start += 1
if value_start >= len(combined):
return None
end = value_start
while end < len(combined) and combined[end] != 0 and (end - value_start) < max_len:
end += 1
s = combined[value_start:end].decode("ascii", errors="replace").strip()
return s or None
project = _find_string_after(b"Project:")
client = _find_string_after(b"Client:")
operator = _find_string_after(b"User Name:")
location = _find_string_after(b"Seis Loc:")
notes = _find_string_after(b"Extended Notes")
if not any([project, client, operator, location, notes]):
log.debug("a5 metadata: no project strings found in %d frames", len(frames_data))
return
if event.project_info is None:
event.project_info = ProjectInfo()
pi = event.project_info
# Overwrite with A5 values — they are event-time authoritative.
# 0C waveform record only carried "Project:"; A5 carries the full set.
if project: pi.project = project
if client: pi.client = client
if operator: pi.operator = operator
if location: pi.sensor_location = location
if notes: pi.notes = notes
log.debug(
"a5 metadata: project=%r client=%r operator=%r location=%r",
pi.project, pi.client, pi.operator, pi.sensor_location,
)
def _extract_record_type(data: bytes) -> Optional[str]:
"""
Decode the recording mode from byte[1] of the 210-byte waveform record.

View File

@@ -182,6 +182,83 @@ def token_params(token: int = 0) -> bytes:
return bytes(p)
def bulk_waveform_params(key4: bytes, counter: int, *, is_probe: bool = False) -> bytes:
"""
Build the 10-byte params block for SUB 5A (BULK_WAVEFORM_STREAM) requests.
Confirmed 2026-04-02 from 1-2-26 BW TX capture analysis:
Probe / first request (is_probe=True, counter=0):
params[0] = 0x00
params[1:5] = key4 (all 4 key bytes; counter overlaps key4[2:4] = 0x0000)
params[5:] = zeros
Regular chunk requests (is_probe=False):
params[0] = 0x00
params[1:3] = key4[0:2] (first 2 key bytes as session handle)
params[3:5] = counter (BE uint16) (chunk position, increments by 0x0400)
params[5:] = zeros
Termination request: DO NOT use this helper — see bulk_waveform_term_params().
Args:
key4: 4-byte waveform key from EVENT_HEADER (1E) response.
counter: Chunk position counter (uint16 BE). Pass 0 for probe.
is_probe: If True, embed full key4 (probe step only).
Returns:
10-byte params block.
"""
if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
p = bytearray(10)
p[0] = 0x00
p[1] = key4[0]
p[2] = key4[1]
if is_probe:
# Full key4; counter=0 is implied (overlaps with key4[2:4] which must be 0x0000)
p[3] = key4[2]
p[4] = key4[3]
else:
p[3] = (counter >> 8) & 0xFF
p[4] = counter & 0xFF
return bytes(p)
def bulk_waveform_term_params(key4: bytes, counter: int) -> bytes:
"""
Build the 10-byte params block for the SUB 5A termination request.
The termination request uses offset=0x005A and a DIFFERENT params layout —
the leading 0x00 byte is dropped, key4[0:2] shifts to params[0:2], and the
counter high byte is at params[2]:
params[0] = key4[0]
params[1] = key4[1]
params[2] = (counter >> 8) & 0xFF
params[3:] = zeros
Counter for the termination request = last_regular_counter + 0x0400.
Confirmed from 1-2-26 BW TX capture: final request (frame 83) uses
offset=0x005A, params[0:3] = key4[0:2] + term_counter_hi.
Args:
key4: 4-byte waveform key.
counter: Termination counter (= last regular counter + 0x0400).
Returns:
10-byte params block.
"""
if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
p = bytearray(10)
p[0] = key4[0]
p[1] = key4[1]
p[2] = (counter >> 8) & 0xFF
return bytes(p)
# ── Pre-built POLL frames ─────────────────────────────────────────────────────
#
# POLL (SUB 0x5B) uses the same two-step pattern as all other reads — the

View File

@@ -31,6 +31,8 @@ from .framing import (
build_bw_frame,
waveform_key_params,
token_params,
bulk_waveform_params,
bulk_waveform_term_params,
POLL_PROBE,
POLL_DATA,
)
@@ -88,6 +90,17 @@ DATA_LENGTHS: dict[int, int] = {
# NOT handled here — requires specialised read logic.
}
# SUB 5A (BULK_WAVEFORM_STREAM) protocol constants.
# Confirmed from 1-2-26 BW TX capture analysis (2026-04-02).
_BULK_CHUNK_OFFSET = 0x1004 # offset field for probe + all regular chunk requests ✅
_BULK_TERM_OFFSET = 0x005A # offset field for termination request ✅
_BULK_COUNTER_STEP = 0x0400 # chunk counter increment per request ✅
# Note: BW's second chunk used counter=0x1004 rather than the expected 0x0400.
# This appears to be a waveform-specific pre-trigger byte offset unique to BW's
# implementation. All subsequent chunks incremented by 0x0400 as expected.
# 🔶 INFERRED: device echoes the counter back but may not validate it.
# Confirm empirically on first live test.
# Default timeout values (seconds).
# MiniMate Plus is a slow device — keep these generous.
DEFAULT_RECV_TIMEOUT = 10.0
@@ -395,6 +408,115 @@ class MiniMateProtocol:
log.debug("read_waveform_record: received %d record bytes", len(record))
return record
def read_bulk_waveform_stream(
self,
key4: bytes,
*,
stop_after_metadata: bool = True,
max_chunks: int = 32,
) -> list[bytes]:
"""
Download the SUB 5A (BULK_WAVEFORM_STREAM) A5 frames for one event.
The bulk waveform stream carries both raw ADC samples (large) and
event-time metadata strings ("Project:", "Client:", "User Name:",
"Seis Loc:", "Extended Notes") embedded in one of the middle frames
(confirmed: A5[7] of 9 for 1-2-26 capture).
Protocol is request-per-chunk, NOT a continuous stream:
1. Probe (offset=_BULK_CHUNK_OFFSET, is_probe=True, counter=0x0000)
2. Chunks (offset=_BULK_CHUNK_OFFSET, is_probe=False, counter+=0x0400)
3. Loop until metadata found (stop_after_metadata=True) or max_chunks
4. Termination (offset=_BULK_TERM_OFFSET, counter=last+_BULK_COUNTER_STEP)
Device responds with a final A5 frame (page_key=0x0000).
The termination frame (page_key=0x0000) is NOT included in the returned list.
Args:
key4: 4-byte waveform key from EVENT_HEADER (1E).
stop_after_metadata: If True (default), send termination as soon as
b"Project:" is found in a frame's data — avoids
downloading the full ADC waveform payload (several
hundred KB). Set False to download everything.
max_chunks: Safety cap on the number of chunk requests sent
(default 32; a typical event uses 9 large frames).
Returns:
List of raw data bytes from each A5 response frame (not including
the terminator frame). Frame indices match the request sequence:
index 0 = probe response, index 1 = first chunk, etc.
Raises:
ProtocolError: on timeout, bad checksum, or unexpected SUB.
Confirmed from 1-2-26 BW TX/RX captures (2026-04-02):
- probe + 8 regular chunks + 1 termination = 10 TX frames
- 9 large A5 responses + 1 terminator A5 = 10 RX frames
- page_key=0x0010 on large frames; page_key=0x0000 on terminator ✅
- "Project:" metadata at A5[7].data[626] ✅
"""
if len(key4) != 4:
raise ValueError(f"waveform key must be 4 bytes, got {len(key4)}")
rsp_sub = _expected_rsp_sub(SUB_BULK_WAVEFORM) # 0xFF - 0x5A = 0xA5
frames_data: list[bytes] = []
counter = 0
# ── Step 1: probe ────────────────────────────────────────────────────
log.debug("5A probe key=%s", key4.hex())
params = bulk_waveform_params(key4, 0, is_probe=True)
self._send(build_bw_frame(SUB_BULK_WAVEFORM, _BULK_CHUNK_OFFSET, params))
rsp = self._recv_one(expected_sub=rsp_sub)
frames_data.append(rsp.data)
log.debug("5A A5[0] page_key=0x%04X %d bytes", rsp.page_key, len(rsp.data))
# ── Step 2: chunk loop ───────────────────────────────────────────────
for chunk_num in range(1, max_chunks + 1):
counter = chunk_num * _BULK_COUNTER_STEP
params = bulk_waveform_params(key4, counter)
log.debug("5A chunk %d counter=0x%04X", chunk_num, counter)
self._send(build_bw_frame(SUB_BULK_WAVEFORM, _BULK_CHUNK_OFFSET, params))
rsp = self._recv_one(expected_sub=rsp_sub)
if rsp.page_key == 0x0000:
# Device unexpectedly terminated mid-stream (no termination needed).
log.debug("5A A5[%d] page_key=0x0000 — device terminated early", chunk_num)
return frames_data
frames_data.append(rsp.data)
log.debug(
"5A A5[%d] page_key=0x%04X %d bytes",
chunk_num, rsp.page_key, len(rsp.data),
)
if stop_after_metadata and b"Project:" in rsp.data:
log.debug("5A A5[%d] metadata found — stopping early", chunk_num)
break
else:
log.warning(
"5A reached max_chunks=%d without end-of-stream; sending termination",
max_chunks,
)
# ── Step 3: termination ──────────────────────────────────────────────
term_counter = counter + _BULK_COUNTER_STEP
term_params = bulk_waveform_term_params(key4, term_counter)
log.debug(
"5A termination term_counter=0x%04X offset=0x%04X",
term_counter, _BULK_TERM_OFFSET,
)
self._send(build_bw_frame(SUB_BULK_WAVEFORM, _BULK_TERM_OFFSET, term_params))
try:
term_rsp = self._recv_one(expected_sub=rsp_sub)
log.debug(
"5A termination response page_key=0x%04X %d bytes",
term_rsp.page_key, len(term_rsp.data),
)
except TimeoutError:
log.debug("5A no termination response — device may have already closed")
return frames_data
def advance_event(self) -> bytes:
"""
Send the SUB 1F (EVENT_ADVANCE) two-step read with download-mode token