#!/usr/bin/env python3
"""
diagnose_5a_frames.py -- Frame-by-frame diagnostic for SUB 5A waveform streams.

Usage:
    python diagnose_5a_frames.py [--host HOST] [--port PORT] [--event INDEX]

Connects to the device, downloads the waveform for the specified event
(default 0 = the first event returned by SUB 1E; earlier notes disagree on
whether that is the oldest or the most recently stored event -- TODO confirm),
and prints detailed per-frame info for every A5 response frame:

    fi=N | db=NNN B w=NNN B | "Project:" in db=[offsets] in w=[offsets]  <-- METADATA if detected
        w[0:32] = <hex>
        w[-8:]  = <hex>
        [waveform bytes or ASCII snippet]

Then shows:
  - total non-metadata frames, total waveform bytes, total sample-sets decoded
  - compliance-config expected vs decoded counts
  - sample values at the flat-line onset region (~1700-1820)
  - first near-zero run location (|T| < 20 for 10+ consecutive samples)

Run with:
    python diagnose_5a_frames.py 2>&1 | tee /tmp/diag_output.txt
"""

from __future__ import annotations

import argparse
import logging
import struct
import sys

# -- Setup logging -------------------------------------------------------------
logging.basicConfig(
    level=logging.WARNING,  # suppress library noise; we print our own output
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    stream=sys.stderr,
)

log = logging.getLogger("diagnose")
log.setLevel(logging.INFO)

# One sample-set is four little-endian int16 values (T, V, L, M) = 8 bytes.
_SAMPLE_SET = struct.Struct("<4h")
_INT16 = struct.Struct("<h")
_SET_BYTES = _SAMPLE_SET.size

# Thresholds used by the flat-line analysis.
_NEAR_ZERO_LIMIT = 20   # |Tran| below this counts as "near zero"
_MIN_RUN_LEN = 10       # consecutive near-zero samples that make a "run"
_WINDOW_START = 1700    # sample window printed around the expected onset
_WINDOW_END = 1820


def decode_int16_sets(wave: bytes, n: int = 8) -> list[tuple[int, int, int, int]]:
    """Decode up to n sample-sets from wave bytes as [T, V, L, M] int16 LE.

    Args:
        wave: Raw waveform bytes; trailing bytes that do not fill a whole
            8-byte sample-set are ignored.
        n: Maximum number of sample-sets to decode.

    Returns:
        A list of ``(T, V, L, M)`` tuples, at most ``n`` long.
    """
    # NOTE(review): the tail of this function was stripped from the file and
    # has been reconstructed from its docstring, return annotation and callers.
    count = min(n, len(wave) // _SET_BYTES)
    return [_SAMPLE_SET.unpack_from(wave, i * _SET_BYTES) for i in range(count)]


def find_all(data: bytes, needle: bytes) -> list[int]:
    """Return all offsets where needle appears in data.

    Overlapping matches are reported, because the search resumes one byte
    after each hit.
    """
    positions = []
    start = 0
    while True:
        pos = data.find(needle, start)
        if pos < 0:
            break
        positions.append(pos)
        start = pos + 1
    return positions


def sep(label: str = "") -> None:
    """Print an 80-column rule, optionally with a centred label."""
    width = 80
    if label:
        pad = max(0, (width - len(label) - 2) // 2)
        print(f"\n{'-' * pad} {label} {'-' * max(0, width - pad - len(label) - 2)}")
    else:
        print("-" * width)


def diagnose(frames_data: list[bytes], compliance_config=None) -> None:
    """Analyse all A5 frames and print diagnostic info.

    Args:
        frames_data: Payload of each A5 response frame, in arrival order.
        compliance_config: Optional object exposing ``sample_rate`` and
            ``record_time``; when falsy, 1024 sps and a 256-sample pre-trigger
            are assumed.

    Everything is written to stdout; nothing is returned.
    """
    sep("PER-FRAME ANALYSIS")
    print(f"Total A5 frames received: {len(frames_data)}")
    print()

    all_chunks: list[tuple[int, bytes]] = []  # (fi, wave_bytes)
    cumulative_wave_bytes = 0

    for fi, db in enumerate(frames_data):
        w = db[7:]  # what _decode_a5_waveform sees (db[7:])

        # Find "Project:" in both the full frame data and the w=db[7:] slice
        proj_in_db = find_all(db, b"Project:")
        proj_in_w = find_all(w, b"Project:")

        # The live detector in client.py uses: b"Project:" in w
        detected_as_metadata = bool(proj_in_w)
        flag = " <-- METADATA (skipped)" if detected_as_metadata else ""
        print(f"fi={fi:3d} db={len(db):5d}B w={len(w):5d}B "
              f"Project: in db={proj_in_db} in w(db[7:])={proj_in_w}{flag}")

        hex_head = w[:32].hex(" ")
        hex_tail = w[-8:].hex(" ")  # w[-8:] is all of w when it is shorter than 8
        print(f" w[0:32] = {hex_head}")
        print(f" w[-8:] = {hex_tail}")

        if fi == 0:
            # The first frame carries a 21-byte STRT record ahead of the samples.
            sp = w.find(b"STRT")
            if sp >= 0:
                strt = w[sp:sp + 21]
                print(f" STRT at w[{sp}]: {strt.hex(' ')}")
                wave = w[sp + 21:]
                if wave:
                    sets = decode_int16_sets(wave, 4)
                    print(f" wave[sp+21:] first 4 sets (T,V,L,M): {sets}")
                    all_chunks.append((fi, wave))
                    cumulative_wave_bytes += len(wave)
                    print(f" cum_bytes={cumulative_wave_bytes} "
                          f"cum_sets={cumulative_wave_bytes // 8}")
            else:
                print(" *** STRT NOT FOUND ***")
        elif detected_as_metadata:
            # Print the ASCII content to confirm this is the real metadata frame.
            # errors="replace" cannot raise, and every byte maps to exactly one
            # character, so decoding only the first 140 bytes is sufficient.
            printable = (
                w[:140].decode("ascii", errors="replace")
                .replace("\x00", ".")
                .replace("\r", "\n")
            )
            print(f" ASCII: {printable!r}")
        elif len(w) >= 8:
            # Regular chunk: strip 8-byte header
            wave = w[8:]
            all_chunks.append((fi, wave))
            cumulative_wave_bytes += len(wave)
            all_sets = decode_int16_sets(wave, len(wave) // 8)
            sets = all_sets[:4]
            # Count near-zero Tran values
            nz = sum(1 for s in all_sets if abs(s[0]) < _NEAR_ZERO_LIMIT)
            print(f" wave[8:] first 4 sets: {sets}")
            print(f" cum_bytes={cumulative_wave_bytes} "
                  f"cum_sets={cumulative_wave_bytes // 8} "
                  f"near-zero(|T|<{_NEAR_ZERO_LIMIT}): {nz}/{len(all_sets)}")
        print()

    # -- Waveform value analysis ------------------------------------------------
    sep("WAVEFORM DECODE")
    cc_sr = 1024
    cc_rt = None
    pretrig = 256
    total_expected = 0
    if compliance_config:
        cc_sr = compliance_config.sample_rate or 1024
        cc_rt = compliance_config.record_time
        pretrig = int(round(0.25 * cc_sr))
        if cc_rt:
            total_expected = pretrig + int(round(cc_rt * cc_sr))
        print(f"Compliance: sr={cc_sr} sps record_time={cc_rt} s "
              f"pretrig={pretrig} total_expected={total_expected}")
    else:
        print("No compliance config -- using defaults: sr=1024, pretrig=256")

    total_wave_bytes = sum(len(w) for _, w in all_chunks)
    total_sets_raw = total_wave_bytes // 8
    print(f"Non-metadata frames: {len(all_chunks)} "
          f"Total wave bytes: {total_wave_bytes} "
          f"Raw sample-sets: {total_sets_raw}")

    # Alignment-corrected decode (matches _decode_a5_waveform exactly)
    tran: list[int] = []
    running_offset = 0
    for fi, wave in all_chunks:
        align = running_offset % 8
        skip = (8 - align) % 8
        if 0 < skip < len(wave):
            usable = wave[skip:]
        elif align == 0:
            usable = wave
        else:
            # Chunk too short to reach the next 8-byte boundary: count it, skip it.
            running_offset += len(wave)
            continue
        tran.extend(
            _INT16.unpack_from(usable, i * 8)[0] for i in range(len(usable) // 8)
        )
        running_offset += len(wave)

    # NOTE(review): the original lines between the Tran unpack above and the
    # "n_decoded >= 32" guard below were stripped from the file.  The summary
    # prints here are a reconstruction based on the module docstring -- confirm
    # the exact wording against version control.  Runs of spaces inside other
    # printed strings may also have been collapsed by the same damage.
    n_decoded = len(tran)
    print(f"Decoded Tran samples: {n_decoded} (expected {total_expected})")
    print(f"First 16 Tran: {tran[:16]}")
    if n_decoded >= 32:
        print(f"Last 16 Tran: {tran[-16:]}")

    # -- Flat-line onset search -------------------------------------------------
    sep(f"FLAT-LINE ONSET (first run of {_MIN_RUN_LEN}+ consecutive "
        f"|Tran| < {_NEAR_ZERO_LIMIT})")
    run_start = None
    run_len = 0
    onset_found = False
    for i, v in enumerate(tran):
        if abs(v) < _NEAR_ZERO_LIMIT:
            if run_start is None:
                run_start = i
            run_len += 1
        else:
            if run_len >= _MIN_RUN_LEN:
                t_ms = (run_start - pretrig) * 1000.0 / cc_sr
                print(f" First near-zero run: sample {run_start}-{run_start + run_len - 1} "
                      f"(t={t_ms:.1f}ms post-trigger) length={run_len}")
                onset_found = True
                break
            run_start = None
            run_len = 0
    else:
        # Loop ended without a break: the data may finish inside a run.
        if run_len >= _MIN_RUN_LEN and run_start is not None:
            t_ms = (run_start - pretrig) * 1000.0 / cc_sr
            print(f" Near-zero run at end: sample {run_start}-{n_decoded - 1} "
                  f"(t={t_ms:.1f}ms post-trigger) length={run_len}")
            onset_found = True

    if not onset_found:
        print(f" No near-zero run of {_MIN_RUN_LEN}+ samples found "
              "(waveform looks active throughout)")

    # Print samples around the expected flat-line onset (~1700-1820)
    if n_decoded >= _WINDOW_START:
        print()
        print(f"Tran samples [{_WINDOW_START}:{_WINDOW_END}] (10 per line):")
        for row_start in range(_WINDOW_START, min(_WINDOW_END, n_decoded), 10):
            row = tran[row_start:row_start + 10]
            t_ms_row = (row_start - pretrig) * 1000.0 / cc_sr
            print(f" [{row_start:4d}] (t={t_ms_row:6.1f}ms): {row}")
    else:
        print(f" Only {n_decoded} samples decoded -- "
              f"range {_WINDOW_START}-{_WINDOW_END} not available")


def main() -> None:
    """Download one event's 5A waveform stream from the device and diagnose it."""
    parser = argparse.ArgumentParser(description="Diagnose A5 5A waveform frames")
    parser.add_argument("--host", default="63.43.212.232", help="Device IP")
    parser.add_argument("--port", type=int, default=9034, help="TCP port")
    parser.add_argument("--event", type=int, default=0, help="Event index (0=first stored)")
    args = parser.parse_args()
    if args.event < 0:
        # A negative index would otherwise silently download event 0.
        parser.error("--event must be >= 0")

    # Imported here rather than at module level so the offline helpers above
    # can be imported without the device library installed.  This still runs
    # after logging.basicConfig(), as the original import order did.
    from minimateplus import MiniMateClient
    from minimateplus.transport import TcpTransport

    print(f"Connecting to {args.host}:{args.port} ...")
    print(f"Target event index: {args.event}")
    print()

    transport = TcpTransport(args.host, port=args.port)
    with MiniMateClient(transport=transport) as client:
        info = client.connect()
        print(f"Device: serial={info.serial} firmware={info.firmware_version}")
        compliance_config = info.compliance_config
        if compliance_config:
            print(f"Compliance: sample_rate={compliance_config.sample_rate} "
                  f"record_time={compliance_config.record_time}")
        print()

        proto = client._proto
        if proto is None:
            # Explicit check instead of assert, which is stripped under -O.
            raise RuntimeError("client has no protocol handle after connect()")

        # -- Walk to the target event ------------------------------------------
        log.info("Reading first event key (SUB 1E) ...")
        cur_key4, _first_data8 = proto.read_event_first(token=0)
        print(f"First event key: {cur_key4.hex()}")

        for event_idx in range(args.event):
            # 0A required before each 1F to establish device context
            proto.read_waveform_header(cur_key4)
            next_key4, next_data8 = proto.advance_event(browse=True)
            if next_data8[4:8] == b"\x00\x00\x00\x00":
                print(f"Only {event_idx + 1} events available; "
                      f"cannot reach index {args.event}")
                return
            cur_key4 = next_key4
            print(f" advanced to event {event_idx + 1}: key={cur_key4.hex()}")

        print(f"\nDownloading event {args.event}: key={cur_key4.hex()}")

        # -- Full download sequence (matches get_events download-mode) ---------
        log.info("0A: read_waveform_header ...")
        proto.read_waveform_header(cur_key4)

        log.info("1E(0xFE): arm device for 5A ...")
        proto.read_event_first(token=0xFE)

        log.info("0C: read_waveform_record ...")
        wfm_raw = proto.read_waveform_record(cur_key4)
        print(f"0C waveform record: {len(wfm_raw)} bytes")

        log.info("1F(0xFE): arm 5A state machine ...")
        arm_key4, _ = proto.advance_event(browse=False)
        print(f"1F(arm) returned key: {arm_key4.hex()}")

        log.info("POLLx3 ...")
        for i in range(3):
            proto.poll()
            print(f" POLL {i+1}/3 OK")

        print(f"\nStarting 5A bulk stream for key={cur_key4.hex()} ...")
        frames_data = proto.read_bulk_waveform_stream(
            cur_key4,
            stop_after_metadata=False,
            max_chunks=2048,
        )
        print(f"5A complete: {len(frames_data)} A5 frames")
        print()

        # -- Run the diagnostic ------------------------------------------------
        diagnose(frames_data, compliance_config)


if __name__ == "__main__":
    main()