Files
seismo-relay/diagnose_5a_frames.py
T
claude a46961c124 fix: waveform decode improved for accuracy.
feat: adds 5a diagnostic script to parse raw binary
2026-04-15 16:36:41 -04:00

329 lines
12 KiB
Python

#!/usr/bin/env python3
"""
diagnose_5a_frames.py -- Frame-by-frame diagnostic for SUB 5A waveform streams.
Usage:
python diagnose_5a_frames.py [--host HOST] [--port PORT] [--event INDEX]
Connects to the device, downloads the waveform for the specified event (default 0 =
most recently stored), and prints detailed per-frame info for every A5 response frame:
fi=N | db=NNN B w=NNN B | "Project:" in db=[offsets] in w=[offsets] <-- METADATA if detected
w[0:32] = <hex>
w[-8:] = <hex>
[waveform bytes or ASCII snippet]
Then shows:
- total non-metadata frames, total waveform bytes, total sample-sets decoded
- compliance-config expected vs decoded counts
- sample values at the flat-line onset region (~1700-1820)
- first near-zero run location (|T| < 20 for 10+ consecutive samples)
Run with: python diagnose_5a_frames.py 2>&1 | tee /tmp/diag_output.txt
"""
from __future__ import annotations
import argparse
import logging
import struct
import sys
# -- Setup logging -------------------------------------------------------------
logging.basicConfig(
level=logging.WARNING, # suppress library noise; we print our own output
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
stream=sys.stderr,
)
from minimateplus import MiniMateClient
from minimateplus.transport import TcpTransport
log = logging.getLogger("diagnose")
log.setLevel(logging.INFO)
def decode_int16_sets(wave: bytes, n: int = 8) -> list[tuple[int, int, int, int]]:
"""Decode up to n sample-sets from wave bytes as [T, V, L, M] int16 LE."""
sets = []
for i in range(min(n, len(wave) // 8)):
off = i * 8
t = struct.unpack_from("<h", wave, off)[0]
v = struct.unpack_from("<h", wave, off + 2)[0]
l = struct.unpack_from("<h", wave, off + 4)[0]
m = struct.unpack_from("<h", wave, off + 6)[0]
sets.append((t, v, l, m))
return sets
def find_all(data: bytes, needle: bytes) -> list[int]:
"""Return all offsets where needle appears in data."""
positions = []
start = 0
while True:
pos = data.find(needle, start)
if pos < 0:
break
positions.append(pos)
start = pos + 1
return positions
def sep(label: str = "") -> None:
width = 80
if label:
pad = max(0, (width - len(label) - 2) // 2)
print(f"\n{'-' * pad} {label} {'-' * max(0, width - pad - len(label) - 2)}")
else:
print("-" * width)
def diagnose(frames_data: list[bytes], compliance_config=None) -> None:
"""Analyse all A5 frames and print diagnostic info."""
sep("PER-FRAME ANALYSIS")
print(f"Total A5 frames received: {len(frames_data)}")
print()
all_chunks: list[tuple[int, bytes]] = [] # (fi, wave_bytes)
cumulative_wave_bytes = 0
for fi, db in enumerate(frames_data):
w = db[7:] # what _decode_a5_waveform sees (db[7:])
# Find "Project:" in both the full frame data and the w=db[7:] slice
proj_in_db = find_all(db, b"Project:")
proj_in_w = find_all(w, b"Project:")
# The live detector in client.py uses: b"Project:" in w
detected_as_metadata = bool(proj_in_w)
flag = " <-- METADATA (skipped)" if detected_as_metadata else ""
print(f"fi={fi:3d} db={len(db):5d}B w={len(w):5d}B "
f"Project: in db={proj_in_db} in w(db[7:])={proj_in_w}{flag}")
hex_head = w[:32].hex(' ')
hex_tail = w[-8:].hex(' ') if len(w) >= 8 else w.hex(' ')
print(f" w[0:32] = {hex_head}")
print(f" w[-8:] = {hex_tail}")
if fi == 0:
sp = w.find(b"STRT")
if sp >= 0:
strt = w[sp:sp + 21]
print(f" STRT at w[{sp}]: {strt.hex(' ')}")
wave = w[sp + 21:]
if wave:
sets = decode_int16_sets(wave, 4)
print(f" wave[sp+21:] first 4 sets (T,V,L,M): {sets}")
all_chunks.append((fi, wave))
cumulative_wave_bytes += len(wave)
print(f" cum_bytes={cumulative_wave_bytes} cum_sets={cumulative_wave_bytes // 8}")
else:
print(f" *** STRT NOT FOUND ***")
elif detected_as_metadata:
# Print the ASCII content to confirm this is the real metadata frame
try:
snippet = w.decode("ascii", errors="replace")
# Find the first 200 printable characters
printable = snippet[:200].replace("\x00", ".").replace("\r", "\n").replace("\n", "\n")
print(f" ASCII: {repr(printable[:140])}")
except Exception as e:
print(f" (decode error: {e})")
else:
# Regular chunk: strip 8-byte header
if len(w) >= 8:
wave = w[8:]
all_chunks.append((fi, wave))
cumulative_wave_bytes += len(wave)
sets = decode_int16_sets(wave, 4)
# Count near-zero Tran values
all_sets = decode_int16_sets(wave, len(wave) // 8)
nz = sum(1 for s in all_sets if abs(s[0]) < 20)
print(f" wave[8:] first 4 sets: {sets}")
print(f" cum_bytes={cumulative_wave_bytes} cum_sets={cumulative_wave_bytes // 8} "
f"near-zero(|T|<20): {nz}/{len(all_sets)}")
print()
# -- Waveform value analysis ------------------------------------------------
sep("WAVEFORM DECODE")
cc_sr = 1024
cc_rt = None
pretrig = 256
total_expected = 0
if compliance_config:
cc_sr = compliance_config.sample_rate or 1024
cc_rt = compliance_config.record_time
pretrig = int(round(0.25 * cc_sr))
if cc_rt:
total_expected = pretrig + int(round(cc_rt * cc_sr))
print(f"Compliance: sr={cc_sr} sps record_time={cc_rt} s "
f"pretrig={pretrig} total_expected={total_expected}")
else:
print("No compliance config -- using defaults: sr=1024, pretrig=256")
total_wave_bytes = sum(len(w) for _, w in all_chunks)
total_sets_raw = total_wave_bytes // 8
print(f"Non-metadata frames: {len(all_chunks)} "
f"Total wave bytes: {total_wave_bytes} "
f"Raw sample-sets: {total_sets_raw}")
# Alignment-corrected decode (matches _decode_a5_waveform exactly)
tran: list[int] = []
running_offset = 0
for fi, wave in all_chunks:
align = running_offset % 8
skip = (8 - align) % 8
if skip > 0 and skip < len(wave):
usable = wave[skip:]
elif align == 0:
usable = wave
else:
running_offset += len(wave)
continue
n_usable = len(usable) // 8
for i in range(n_usable):
tran.append(struct.unpack_from("<h", usable, i * 8)[0])
running_offset += len(wave)
n_decoded = len(tran)
print(f"Alignment-corrected decoded Tran samples: {n_decoded}")
if compliance_config and cc_rt:
print(f"Expected: {total_expected} Decoded: {n_decoded} "
f"Excess (tail): {max(0, n_decoded - total_expected)}")
print()
print(f"First 16 Tran: {tran[:16]}")
if n_decoded >= 32:
print(f"Last 16 Tran: {tran[-16:]}")
# -- Flat-line onset search -------------------------------------------------
sep("FLAT-LINE ONSET (first run of 10+ consecutive |Tran| < 20)")
run_start = None
run_len = 0
onset_found = False
for i, v in enumerate(tran):
if abs(v) < 20:
if run_start is None:
run_start = i
run_len += 1
else:
if run_len >= 10:
t_ms = (run_start - pretrig) * 1000.0 / cc_sr
print(f" First near-zero run: sample {run_start}-{run_start + run_len - 1} "
f"(t={t_ms:.1f}ms post-trigger) length={run_len}")
onset_found = True
break
run_start = None
run_len = 0
else:
if run_len >= 10 and run_start is not None:
t_ms = (run_start - pretrig) * 1000.0 / cc_sr
print(f" Near-zero run at end: sample {run_start}-{n_decoded - 1} "
f"(t={t_ms:.1f}ms post-trigger) length={run_len}")
onset_found = True
if not onset_found:
print(" No near-zero run of 10+ samples found (waveform looks active throughout)")
# Print samples around the expected flat-line onset (~1700-1820)
if n_decoded >= 1700:
print()
print("Tran samples [1700:1820] (10 per line):")
for row_start in range(1700, min(1820, n_decoded), 10):
row = tran[row_start:row_start + 10]
t_ms_row = (row_start - pretrig) * 1000.0 / cc_sr
print(f" [{row_start:4d}] (t={t_ms_row:6.1f}ms): {row}")
else:
print(f" Only {n_decoded} samples decoded -- range 1700-1820 not available")
def main() -> None:
parser = argparse.ArgumentParser(description="Diagnose A5 5A waveform frames")
parser.add_argument("--host", default="63.43.212.232", help="Device IP")
parser.add_argument("--port", type=int, default=9034, help="TCP port")
parser.add_argument("--event", type=int, default=0, help="Event index (0=first stored)")
args = parser.parse_args()
print(f"Connecting to {args.host}:{args.port} ...")
print(f"Target event index: {args.event}")
print()
transport = TcpTransport(args.host, port=args.port)
with MiniMateClient(transport=transport) as client:
info = client.connect()
print(f"Device: serial={info.serial} firmware={info.firmware_version}")
compliance_config = info.compliance_config
if compliance_config:
print(f"Compliance: sample_rate={compliance_config.sample_rate} "
f"record_time={compliance_config.record_time}")
print()
proto = client._proto
assert proto is not None
# -- Walk to the target event ------------------------------------------
log.info("Reading first event key (SUB 1E) ...")
first_key4, first_data8 = proto.read_event_first(token=0)
print(f"First event key: {first_key4.hex()}")
cur_key4 = first_key4
cur_data8 = first_data8
event_idx = 0
while event_idx < args.event:
# 0A required before each 1F to establish device context
proto.read_waveform_header(cur_key4)
next_key4, next_data8 = proto.advance_event(browse=True)
if next_data8[4:8] == b"\x00\x00\x00\x00":
print(f"Only {event_idx + 1} events available; cannot reach index {args.event}")
return
cur_key4 = next_key4
cur_data8 = next_data8
event_idx += 1
print(f" advanced to event {event_idx}: key={cur_key4.hex()}")
print(f"\nDownloading event {args.event}: key={cur_key4.hex()}")
# -- Full download sequence (matches get_events download-mode) ---------
log.info("0A: read_waveform_header ...")
proto.read_waveform_header(cur_key4)
log.info("1E(0xFE): arm device for 5A ...")
proto.read_event_first(token=0xFE)
log.info("0C: read_waveform_record ...")
wfm_raw = proto.read_waveform_record(cur_key4)
print(f"0C waveform record: {len(wfm_raw)} bytes")
log.info("1F(0xFE): arm 5A state machine ...")
arm_key4, _ = proto.advance_event(browse=False)
print(f"1F(arm) returned key: {arm_key4.hex()}")
log.info("POLLx3 ...")
for i in range(3):
proto.poll()
print(f" POLL {i+1}/3 OK")
print(f"\nStarting 5A bulk stream for key={cur_key4.hex()} ...")
frames_data = proto.read_bulk_waveform_stream(
cur_key4,
stop_after_metadata=False,
max_chunks=2048,
)
print(f"5A complete: {len(frames_data)} A5 frames")
print()
# -- Run the diagnostic ------------------------------------------------
diagnose(frames_data, compliance_config)
if __name__ == "__main__":
main()