From ab14328c8b4fbb9c31bb77132c47968983adf18c Mon Sep 17 00:00:00 2001 From: Brian Harrison Date: Fri, 10 Apr 2026 00:58:54 -0400 Subject: [PATCH] feat: enhance logging messages in ach_server.py and add experiments.py for protocol minimization --- bridges/ach_server.py | 24 +- docs/instantel_protocol_reference.md | 5 +- experiments.py | 634 +++++++++++++++++++++++++++ 3 files changed, 649 insertions(+), 14 deletions(-) create mode 100644 experiments.py diff --git a/bridges/ach_server.py b/bridges/ach_server.py index 1c37f86..69050b7 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -185,9 +185,9 @@ class AchSession: from minimateplus.protocol import MiniMateProtocol proto = MiniMateProtocol(transport, recv_timeout=self.timeout) proto.startup() - log.info(" ✓ Startup OK — pull protocol confirmed") + log.info(" [OK] Startup OK -- pull protocol confirmed") except Exception as exc: - log.error(" ✗ Startup failed: %s", exc) + log.error(" [FAIL] Startup failed: %s", exc) return # ── Step 2: device info ─────────────────────────────────────────── @@ -196,16 +196,16 @@ class AchSession: log.info("Step 2/3: reading device info") try: device_info = client.connect() - serial = device_info.serial_number + serial = device_info.serial _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) log.info( - " ✓ Device: serial=%s firmware=%s calibration=%s", + " [OK] Device: serial=%s firmware=%s calibration=%s", serial, device_info.firmware_version, device_info.calibration_date, ) except Exception as exc: - log.error(" ✗ Device info failed: %s", exc) + log.error(" [FAIL] Device info failed: %s", exc) else: log.info("Step 2/3: skipping device info (--events-only)") @@ -221,12 +221,12 @@ class AchSession: log.info(" Unit has %d stored event(s); last downloaded count: %d", current_count, last_count) except Exception as exc: - log.error(" ✗ count_events failed: %s", exc) + log.error(" [FAIL] count_events failed: %s", exc) return if current_count <= last_count: - log.info(" ✓ No new events since last call-home — nothing to download") - log.info("Session complete (no new events) → %s", session_dir) + log.info(" [OK] No new events since last call-home -- nothing to download") + log.info("Session complete (no new events) -> %s", session_dir) return new_event_count = current_count - last_count @@ -252,7 +252,7 @@ class AchSession: ) # Only the events beyond last_count are genuinely new new_events = all_events[last_count:] - log.info(" ✓ Downloaded %d total event(s), %d new", + log.info(" [OK] Downloaded %d total event(s), %d new", len(all_events), len(new_events)) _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) @@ -280,13 +280,13 @@ class AchSession: _save_state(self.state_path, state) except Exception as exc: - log.error(" ✗ Event download failed: %s", exc, exc_info=True) + log.error(" [FAIL] Event download failed: %s", exc, exc_info=True) finally: raw_fh.close() client.close() # closes transport / socket cleanly - log.info("Session complete → %s", session_dir) + log.info("Session complete -> %s", session_dir) log.info("="*60) @@ -300,7 +300,7 @@ def _save_json(path: Path, obj: object) -> None: def _device_info_to_dict(d: DeviceInfo) -> dict: return { - "serial_number": d.serial_number, + "serial": d.serial, "firmware_version": d.firmware_version, "calibration_date": str(d.calibration_date) if d.calibration_date else None, "aux_trigger": d.aux_trigger, diff --git a/docs/instantel_protocol_reference.md b/docs/instantel_protocol_reference.md index 57be0f3..e20c15f 100644 --- a/docs/instantel_protocol_reference.md +++ b/docs/instantel_protocol_reference.md @@ -93,7 +93,8 @@ | 2026-04-06 | §7.8.4 | **NEW — 5A chunk timing and count (empirical, BE11529 at 1024 sps).** Each chunk response arrives within ~1 second over TCP/cellular. A 9,306-sample event (≈9.1 s at 1024 sps) produces **35 chunks** before end-of-stream. Chunks 1–16 have varying data lengths (1036–1123 bytes); chunks 17–35 are uniformly 1036 bytes each (post-event silence, all-zero ADC samples). Safe recv timeout for chunk loop: **10 s** (10× typical response time). Default transport timeout (120 s) results in a ~2-minute stall per event at end-of-stream. | | 2026-04-06 | §7.8.3 | **KNOWN ISSUE — `_decode_a5_waveform` hardcoded fi==9 skip.** The decoder contains `elif fi == 9: continue` which was written for the 9-frame original blast capture where frame 9 was a device terminator. For streams with >9 frames (current device produces 35+), frame index 9 is live waveform data — this skip discards ~1,070 bytes (~133 sample-sets) per event. The terminator is now detected via `page_key == 0x0000`, not by frame index. The fi==9 skip should be removed. | | 2026-04-06 | §7.8 | **CONFIRMED — ADC count-to-physical-unit conversion.** Raw waveform samples are signed 16-bit integers (counts). Conversion: `value = counts × (range / 32767)`. For geo channels: range = 10.000 in/s (from the device's compliance config geo range field). For the mic channel: range is in psi (device-specific). Near-full-scale counts (≈32,700) on all four channels simultaneously indicate ADC saturation (clipping) from a high-amplitude event. | -| 2026-04-08 | §5.1, §7.10, §12 | **NEW — Monitoring commands confirmed.** SUB 0x1C (monitor status), 0x96 (start monitoring), 0x97 (stop monitoring) all confirmed from 4-8-26/2ndtry capture. SESSION_RESET (`41 03`) required before POLL to wake a monitoring unit. `section[6] == 0x10` is the monitoring flag (CORRECTED 2026-04-08 — was wrongly `section[1]`). Battery/memory at relative-from-end offsets: `section[-11:-9]` (battery×100), `section[-9:-5]` (memory_total), `section[-5:-1]` (memory_free) — stable across all payload size variants (52–55 bytes). | +| 2026-04-08 | §5.1, §7.10, §12 | **NEW — Monitoring commands confirmed.** SUB 0x1C (monitor status), 0x96 (start monitoring), 0x97 (stop monitoring) all confirmed from 4-8-26/2ndtry capture. SESSION_RESET (`41 03`) required before POLL to wake a monitoring unit. | +| 2026-04-09 | §7.10 | **CORRECTED — monitoring flag and battery/memory offsets.** `section[1] == 0x10` is the monitoring flag (100% accurate across 144 data frames in 2ndtry capture). Previous note claiming `section[6]` was wrong — section[6] has device-specific non-binary values (0xea/0x07). Battery/memory offsets corrected: `section[-10:-8]` (battery×100), `section[-8:-4]` (memory_total), `section[-4:]` (memory_free). NOTE: `frame.data` has checksum stripped by parser — earlier offsets of `[-11:-9]`/`[-9:-5]`/`[-5:-1]` were wrong because they assumed a trailing checksum byte that isn't there. | | 2026-04-08 | §7.10 | **NEW — SUBs 0x0E (channel sensor data) and 0x98 (trigger test) observed** in 4-8-26/sensor-check capture (Blastware "Unit Channel Test" comms check). SUB 0x0E: 2-step read with channel selector in `params[6:8]`, data length 0x0A per channel, RSP SUB = 0xF1. SUB 0x98: single probe frame with `params[0] = 0xFF`, RSP SUB = 0x67; sent twice per test cycle. Not yet implemented in SFM. | | 2026-04-08 | §7.10 | **NEW — SUBs 0x15 and 0x01 observed in sensor-check capture.** SUB 0x15 (serial number short form, data length 0x0A, RSP 0xEA) and SUB 0x01 (device info block, data length 0x98 = 152 bytes, RSP 0xFE) seen in Blastware's "Unit Channel Test" init sequence. Note: SUB 0x01 response SUB 0xFE collides with the existing SUB 0xFE → RSP 0x01 naming convention — they are inverse commands. | | 2026-04-08 | §12 | **CONFIRMED — Unit partially reachable during on-device sensor check.** 4-8-26/sensor-check capture shows: POLL responds normally throughout; SUB 0x0E channel reads partially served (channels 0–4 responded), then ~40s silent gap while sensor check ran, then channels 5–7 responded. On-device sensor check duration ≈ 40 s. SFM `_pollMonitorConfirm()` polls status every 5 s for up to 60 s after start_monitoring. | @@ -256,7 +257,7 @@ Step 4 — Device sends actual data payload: | `2E` | **UNKNOWN READ B** | Read command, response (`D1`) returns 0x1A (26) bytes. Purpose unknown. | 🔶 INFERRED | | `0E` | **CHANNEL SENSOR DATA** | Real-time sensor reading for one channel. Two-step read, data length 0x0A (10 bytes). Channel selector in params[6:8] (0x0000–0x0007 for 8 channels). Response (F1) carries amplitude, frequency, overswing data for that channel. Used by Blastware "Unit Channel Test" comms check. | ✅ CONFIRMED 2026-04-08 | | `98` | **TRIGGER TEST** | Trigger-test command. Single probe frame; `params[0] = 0xFF`. Response (0x67) is all-zero data. Sent twice per Blastware comms-check cycle. Not a full POLL, no monitor state change. | ✅ CONFIRMED 2026-04-08 | -| `1C` | **MONITOR STATUS READ** | Two-step read, data offset 0x2C (44 bytes). `section[6] == 0x10` → monitoring; `0x00` → idle (CORRECTED 2026-04-08 — was wrongly documented as section[1]). Payload length varies (52–55 bytes) but battery/memory block is always the last 10 bytes before checksum: `section[-11:-9]` = battery×100 (uint16 BE), `section[-9:-5]` = memory_total (uint32 BE), `section[-5:-1]` = memory_free (uint32 BE). Confirmed from 2ndtry 4-8-26 full byte diff across 3 payload size variants. | ✅ CONFIRMED 2026-04-08 | +| `1C` | **MONITOR STATUS READ** | Two-step read, data offset 0x2C (44 bytes). `section[1] == 0x10` → monitoring; `0x00` → idle (CONFIRMED 2026-04-09, 100% accuracy on 144 frames). Payload length: 46–47 bytes IDLE, 48–49 bytes MONITORING. `frame.data` has checksum stripped — no trailing byte to skip. Battery/memory at end: `section[-10:-8]` = battery×100 (uint16 BE), `section[-8:-4]` = memory_total (uint32 BE), `section[-4:]` = memory_free (uint32 BE). | ✅ CONFIRMED 2026-04-09 | | `96` | **START MONITORING** | Single write frame, no data payload. Transitions unit from idle to monitoring mode (after optional on-device sensor check ~40 s). | ✅ CONFIRMED 2026-04-08 | | `97` | **STOP MONITORING** | Single write frame, no data payload. Stops monitoring, unit returns to idle. | ✅ CONFIRMED 2026-04-08 | diff --git a/experiments.py b/experiments.py new file mode 100644 index 0000000..7c5bff3 --- /dev/null +++ b/experiments.py @@ -0,0 +1,634 @@ +#!/usr/bin/env python3 +""" +experiments.py — Protocol minimization experiments for MiniMate Plus. + +Goal: figure out which steps in Blastware's sequences are truly required vs. +cargo-culted, so we can build a faster, smarter client. + +Each experiment is self-contained (opens its own TCP connection) and reports +PASS / FAIL / INCONCLUSIVE with timing and notes. + +Usage: + python experiments.py [--host IP] [--port PORT] [exp1 exp2 ...] + + Run all: python experiments.py + Run specific: python experiments.py cold_status fast_event_count no_5a + +Available experiments +--------------------- + cold_status EXP1 Monitor status (1C) with NO prior POLL + fast_event_count EXP2 Event count via POLL+08 only — skip identity reads + no_5a EXP3 Event record (0C) without bulk waveform stream (5A) + skip_1e EXP4 0A/0C directly with cached key — skip initial 1E + fewer_polls EXP5 Only 1 POLL before 5A instead of Blastware's 3 + compliance_only EXP6 Write compliance ONLY (71x3→72), skip event index+trigger+waveform +""" + +from __future__ import annotations + +import argparse +import logging +import struct +import sys +import time +from dataclasses import dataclass, field +from typing import Optional + +logging.basicConfig( + level=logging.WARNING, # experiment output is via print(); set DEBUG for wire trace + format="%(asctime)s %(levelname)-7s %(name)-20s %(message)s", + datefmt="%H:%M:%S", +) +log = logging.getLogger("experiments") + +# ── Imports ─────────────────────────────────────────────────────────────────── + +from minimateplus.transport import TcpTransport +from minimateplus.protocol import ( + MiniMateProtocol, + ProtocolError, + TimeoutError as ProtoTimeout, + SUB_MONITOR_STATUS, + SUB_SERIAL_NUMBER, + SUB_FULL_CONFIG, + SUB_EVENT_INDEX, + SUB_COMPLIANCE, + SUB_WRITE_CONFIRM_A, + SUB_WRITE_CONFIRM_B, +) +from minimateplus.framing import build_bw_frame, SESSION_RESET +from minimateplus.client import ( + MiniMateClient, + _decode_compliance_config_into, + _encode_compliance_config, +) +from minimateplus.models import DeviceInfo + + +DEFAULT_HOST = "63.43.212.232" +DEFAULT_PORT = 9034 + + +# ── Result container ────────────────────────────────────────────────────────── + +@dataclass +class Result: + name: str + outcome: str # "PASS" | "FAIL" | "INCONCLUSIVE" + elapsed: float = 0.0 + notes: str = "" + details: dict = field(default_factory=dict) + + def __str__(self) -> str: + sym = {"PASS": "✅", "FAIL": "❌", "INCONCLUSIVE": "⚠️ "}.get(self.outcome, "?") + lines = [f" {sym} {self.outcome:13s} {self.name} ({self.elapsed:.1f}s)"] + if self.notes: + lines.append(f" {self.notes}") + for k, v in self.details.items(): + lines.append(f" {k}: {v}") + return "\n".join(lines) + + +# ── Connection helpers ──────────────────────────────────────────────────────── + +def connect_proto(host: str, port: int, timeout: float = 15.0) -> tuple[TcpTransport, MiniMateProtocol]: + """Open a raw TCP connection and return (transport, proto) without any handshake.""" + t = TcpTransport(host, port) + t.connect() + proto = MiniMateProtocol(t, recv_timeout=timeout) + return t, proto + + +def connect_client(host: str, port: int, timeout: float = 30.0) -> tuple[MiniMateClient, DeviceInfo]: + """Open a MiniMateClient and run the full connect() handshake.""" + transport = TcpTransport(host, port) + client = MiniMateClient(transport=transport, timeout=timeout) + client.open() + info = client.connect() + return client, info + + +# ── Experiment runner ───────────────────────────────────────────────────────── + +def run(name: str, fn, *args, **kwargs) -> Result: + print(f"\n{'─'*60}") + print(f" Running: {name}") + print(f"{'─'*60}") + t0 = time.time() + try: + outcome, notes, details = fn(*args, **kwargs) + except Exception as exc: + outcome = "FAIL" + notes = f"Uncaught exception: {exc}" + details = {} + log.exception("Experiment %s raised:", name) + elapsed = time.time() - t0 + r = Result(name=name, outcome=outcome, elapsed=elapsed, notes=notes, details=details) + print(str(r)) + return r + + +# ══════════════════════════════════════════════════════════════════════════════ +# EXP1 — Monitor status (1C) with NO prior POLL +# ══════════════════════════════════════════════════════════════════════════════ +# +# Blastware always does a full POLL handshake before any other command. +# We want to know: can we query SUB 1C (battery, memory, monitoring state) +# cold, with only a SESSION_RESET signal and no POLL at all? +# +# If PASS: status checks become near-instant (no ~1s POLL round-trip). +# If FAIL: we need POLL first, but maybe we can cache it. + +def exp_cold_status(host: str, port: int) -> tuple[str, str, dict]: + """SUB 1C without any POLL — just SESSION_RESET + 1C probe + 1C data.""" + t, proto = connect_proto(host, port) + try: + print(" Sending SESSION_RESET only (no POLL)") + t.write(SESSION_RESET) + time.sleep(0.1) + + print(" Sending SUB 1C probe (no POLL first)…") + rsp_sub = (0xFF - SUB_MONITOR_STATUS) & 0xFF # 0xE3 + t.write(build_bw_frame(SUB_MONITOR_STATUS, 0x00)) + probe = proto._recv_one(expected_sub=rsp_sub, timeout=8.0) + print(f" 1C probe OK page_key=0x{probe.page_key:04X} data={probe.data.hex()}") + + t.write(build_bw_frame(SUB_MONITOR_STATUS, 0x2C)) + data_rsp = proto._recv_one(expected_sub=rsp_sub, timeout=8.0) + + section = data_rsp.data + print(f" 1C data OK {len(section)} bytes hex: {section.hex()}") + + # Decode battery + memory from the end of the section + details = {"raw_bytes": len(section)} + if len(section) >= 10: + batt_raw = struct.unpack_from(">H", section, len(section) - 10)[0] + mem_total = struct.unpack_from(">I", section, len(section) - 8)[0] + mem_free = struct.unpack_from(">I", section, len(section) - 4)[0] + is_monitoring = (section[1] == 0x10) + details["battery_v"] = f"{batt_raw / 100:.2f} V" + details["memory_total"] = f"{mem_total:,} bytes" + details["memory_free"] = f"{mem_free:,} bytes" + details["monitoring"] = is_monitoring + print(f" battery={batt_raw/100:.2f}V mem_free={mem_free:,} monitoring={is_monitoring}") + + return "PASS", "SUB 1C responded without any POLL — cold status read works!", details + + except ProtoTimeout: + return "FAIL", "Device did not respond to 1C without POLL (timeout)", {} + except ProtocolError as exc: + return "FAIL", f"Protocol error: {exc}", {} + finally: + t.disconnect() + + +# ══════════════════════════════════════════════════════════════════════════════ +# EXP2 — Fast event count: POLL + SUB 08 only (skip identity reads) +# ══════════════════════════════════════════════════════════════════════════════ +# +# Blastware's connect() does: POLL → 15 → 01 → 1A → 08 +# We want to know: can we skip 15/01/1A and go straight from POLL to 08? +# +# Reading identity (15, 01) and full compliance (1A, ~2126 bytes over TCP) +# takes several seconds each connect. If we only need event count, skipping +# them would be a huge win. +# +# If PASS: fast status poll = POLL + 08 only (~2 round trips vs ~8+). + +def exp_fast_event_count(host: str, port: int) -> tuple[str, str, dict]: + """POLL startup → SUB 08 only, skip serial/config/compliance reads.""" + t, proto = connect_proto(host, port) + try: + print(" Running startup (POLL only)…") + proto.startup() + print(" POLL OK — now reading SUB 08 (event index) directly…") + + idx_raw = proto.read_event_index() + print(f" SUB 08 OK {len(idx_raw)} bytes") + + # Try to decode event count from SUB 08 payload + # The raw block is 88 bytes; bytes [3:7] may be a count (uint32 BE) + details = {"idx_raw_len": len(idx_raw)} + if len(idx_raw) >= 7: + count_candidate = struct.unpack_from(">I", idx_raw, 3)[0] + details["count_candidate"] = count_candidate + print(f" idx[3:7] as uint32 BE = {count_candidate} (may or may not be event count)") + + # Also verify we can read 1E without the identity reads having been done + print(" Reading 1E (event header) to confirm event access works…") + key4, data8 = proto.read_event_first() + is_empty = data8[4:8] == b"\x00\x00\x00\x00" + details["first_key"] = key4.hex() + details["is_empty"] = is_empty + print(f" 1E OK key={key4.hex()} empty={is_empty}") + + return "PASS", "POLL+08+1E all work without identity reads (15/01/1A skipped)", details + + except ProtocolError as exc: + return "FAIL", f"Protocol error: {exc}", {} + finally: + t.disconnect() + + +# ══════════════════════════════════════════════════════════════════════════════ +# EXP3 — Get event record (0C) without bulk waveform stream (5A) +# ══════════════════════════════════════════════════════════════════════════════ +# +# Blastware's event download = 1E → 0A → 1E-arm → 0C → 1F(dl) → POLL×3 → 5A → 1F(browse) +# +# The 5A bulk stream is the slow part (several large frames, ~1s+ per event). +# We only need 5A for: client, operator, seis_loc, notes (not in 0C). +# If you don't need those fields, can we do: 1E → 0A → 0C → 1F(browse) ? +# +# Two variants tested: +# 3a: Skip 1E-arm AND 5A — just 0A → 0C → 1F(browse) +# 3b: Include 1E-arm but skip 5A+POLL — 0A → 1E-arm → 0C → 1F(browse) +# +# If PASS: event peak values available without the slow bulk stream. +# If FAIL on 3a but PASS on 3b: 1E-arm required even without 5A. + +def exp_no_5a(host: str, port: int) -> tuple[str, str, dict]: + """Event record via 0A→0C without 5A or POLL×3. Tests both with and without 1E-arm.""" + t, proto = connect_proto(host, port) + try: + print(" Startup (POLL)…") + proto.startup() + + # Get the first event key via 1E + key4, data8 = proto.read_event_first() + if data8[4:8] == b"\x00\x00\x00\x00": + return "INCONCLUSIVE", "Device has no stored events — cannot test", {} + print(f" First event key: {key4.hex()}") + + details: dict = {"key": key4.hex()} + + # ── Variant 3a: 0A → 0C → 1F(browse), no 1E-arm ───────────────────── + print("\n [3a] 0A → 0C → 1F(browse) (NO 1E-arm, NO 5A)") + try: + _hdr, rec_len = proto.read_waveform_header(key4) + print(f" 0A OK rec_len=0x{rec_len:02X}") + record_3a = proto.read_waveform_record(key4) + print(f" 0C OK {len(record_3a)} bytes") + # Check for recognizable content + has_tran = b"Tran" in record_3a + has_vert = b"Vert" in record_3a + has_long = b"Long" in record_3a + print(f" 0C content check: Tran={has_tran} Vert={has_vert} Long={has_long}") + details["3a_0c_bytes"] = len(record_3a) + details["3a_has_peaks"] = has_tran and has_vert and has_long + + # Now try browse 1F without any 5A + key4_next, data8_next = proto.advance_event(browse=True) + null_sentinel = data8_next[4:8] == b"\x00\x00\x00\x00" + print(f" 1F(browse) → key={key4_next.hex()} null={null_sentinel}") + details["3a_1f_ok"] = True + details["3a_outcome"] = "PASS" + except ProtocolError as exc: + print(f" 3a FAILED: {exc}") + details["3a_outcome"] = f"FAIL: {exc}" + # Try to recover by reconnecting for 3b + t.disconnect() + t2, proto2 = connect_proto(host, port) + proto2.startup() + key4, data8 = proto2.read_event_first() + if data8[4:8] == b"\x00\x00\x00\x00": + return "FAIL", f"3a failed and device empty on retry: {exc}", details + t, proto = t2, proto2 + + # ── Variant 3b: 0A → 1E-arm → 0C → 1F(browse), no 5A ─────────────── + print("\n [3b] 0A → 1E-arm(0xFE) → 0C → 1F(browse) (NO POLL×3, NO 5A)") + try: + _hdr, rec_len = proto.read_waveform_header(key4) + print(f" 0A OK rec_len=0x{rec_len:02X}") + + # 1E download-arm (token=0xFE) between 0A and 0C + proto.read_event_first(token=0xFE) + print(" 1E-arm OK") + + record_3b = proto.read_waveform_record(key4) + print(f" 0C OK {len(record_3b)} bytes") + has_tran = b"Tran" in record_3b + print(f" 0C content check: Tran={has_tran} Vert={b'Vert' in record_3b}") + details["3b_0c_bytes"] = len(record_3b) + details["3b_has_peaks"] = has_tran + + # Browse 1F without 5A / POLL×3 + key4_next2, data8_next2 = proto.advance_event(browse=True) + null_sentinel2 = data8_next2[4:8] == b"\x00\x00\x00\x00" + print(f" 1F(browse) → key={key4_next2.hex()} null={null_sentinel2}") + details["3b_1f_ok"] = True + details["3b_outcome"] = "PASS" + except ProtocolError as exc: + print(f" 3b FAILED: {exc}") + details["3b_outcome"] = f"FAIL: {exc}" + + # Summarize + a_ok = details.get("3a_outcome") == "PASS" + b_ok = details.get("3b_outcome") == "PASS" + if a_ok: + return "PASS", "3a: 0A→0C works with NO 1E-arm and NO 5A. Huge speedup possible!", details + elif b_ok: + return "PASS", "3b: 0A→1E-arm→0C works without 5A (1E-arm still needed before 0C)", details + else: + return "FAIL", "Both 3a and 3b failed — 5A may be required for device state", details + + except ProtocolError as exc: + return "FAIL", f"Protocol error during setup: {exc}", {} + finally: + try: + t.disconnect() + except Exception: + pass + + +# ══════════════════════════════════════════════════════════════════════════════ +# EXP4 — Skip initial 1E if we already know the event key +# ══════════════════════════════════════════════════════════════════════════════ +# +# In Blastware, every session starts with 1E to discover the first key. +# But if we already fetched and cached the event keys from a previous session, +# can we skip 1E entirely and go straight to 0A(cached_key)? +# +# Practical use case: we poll the device every N minutes. We already know +# all the event keys from last time. On re-connect, can we go direct to 0A? +# +# If PASS: subsequent polls that don't add new events can skip 1E discovery. + +def exp_skip_1e(host: str, port: int) -> tuple[str, str, dict]: + """Get the first event key, disconnect, reconnect, go straight to 0A (skip 1E).""" + # Phase 1: get the key + t, proto = connect_proto(host, port) + try: + proto.startup() + key4, data8 = proto.read_event_first() + if data8[4:8] == b"\x00\x00\x00\x00": + return "INCONCLUSIVE", "No events stored — cannot test", {} + print(f" Phase 1: got event key = {key4.hex()}") + finally: + t.disconnect() + time.sleep(0.5) + + # Phase 2: fresh connection, skip 1E, go straight to 0A with cached key + t2, proto2 = connect_proto(host, port) + try: + print(" Phase 2: fresh connection — startup + 0A directly (no 1E)") + proto2.startup() + + _hdr, rec_len = proto2.read_waveform_header(key4) + print(f" 0A OK rec_len=0x{rec_len:02X}") + + record = proto2.read_waveform_record(key4) + has_peaks = b"Tran" in record + print(f" 0C OK {len(record)} bytes has_peaks={has_peaks}") + + details = { + "cached_key": key4.hex(), + "0c_bytes": len(record), + "has_peaks": has_peaks, + } + return "PASS", "0A works with cached key — 1E discovery can be skipped on known sessions", details + + except ProtocolError as exc: + return "FAIL", f"0A failed with cached key (device needs 1E first?): {exc}", {"key": key4.hex()} + finally: + t2.disconnect() + + +# ══════════════════════════════════════════════════════════════════════════════ +# EXP5 — Fewer POLLs before 5A (try POLL×1 instead of Blastware's POLL×3) +# ══════════════════════════════════════════════════════════════════════════════ +# +# Blastware always sends 3 full POLL probe+data cycles between 1F and 5A. +# Each POLL is a round trip. Can we get away with just 1? +# +# WARNING: If POLL×1 fails, the device may be in a bad state. We try to +# recover with an extra POLL×2 and a fresh 5A attempt. Even on failure we +# try to leave the device in a usable state. +# +# Strategy: run the full event sequence up to 1F(download), then try 5A +# with only 1 POLL. If 5A responds → PASS. If timeout → try 2 more POLLs +# and check if the device recovers. + +def exp_fewer_polls(host: str, port: int) -> tuple[str, str, dict]: + """Full sequence to 1F, then only 1 POLL before 5A (Blastware does 3).""" + t, proto = connect_proto(host, port) + try: + proto.startup() + + key4, data8 = proto.read_event_first() + if data8[4:8] == b"\x00\x00\x00\x00": + return "INCONCLUSIVE", "No events stored — cannot test", {} + print(f" Event key: {key4.hex()}") + + # Full setup: 0A → 1E-arm → 0C → 1F(download) + _hdr, rec_len = proto.read_waveform_header(key4) + print(f" 0A OK rec_len=0x{rec_len:02X}") + proto.read_event_first(token=0xFE) # 1E-arm + print(" 1E-arm OK") + proto.read_waveform_record(key4) + print(" 0C OK") + arm_key4, _ = proto.advance_event(browse=False) # 1F(download) — arms 5A + print(f" 1F(download) OK arm_key={arm_key4.hex()}") + + # Only 1 POLL (Blastware does 3) + print(" Sending 1 POLL (instead of 3)…") + proto.poll() + print(" POLL ok — now probing 5A…") + + try: + frames = proto.read_bulk_waveform_stream(key4, stop_after_metadata=True, max_chunks=12) + print(f" 5A OK after 1 POLL — {len(frames)} frames received") + details = {"poll_count": 1, "frames": len(frames)} + return "PASS", "5A works with only 1 POLL (saved 2 round-trips per event)!", details + + except ProtoTimeout: + print(" 5A timed out after 1 POLL — device needs more POLLs") + # Attempt recovery: send 2 more POLLs and see if 5A then works + print(" Attempting recovery: 2 more POLLs…") + try: + proto.poll() + proto.poll() + frames2 = proto.read_bulk_waveform_stream(key4, stop_after_metadata=True, max_chunks=12) + print(f" 5A worked after total 3 POLLs ({len(frames2)} frames)") + return "FAIL", "5A needs 3 POLLs — 1 is not enough (recovery confirmed 3 still works)", { + "poll_count_tried": 1, "recovery_polls": 3, "recovery_frames": len(frames2) + } + except ProtocolError as exc2: + return "FAIL", f"5A failed even after 3 total POLLs — device may need reconnect: {exc2}", {} + + except ProtocolError as exc: + return "FAIL", f"Setup failed: {exc}", {} + finally: + t.disconnect() + + +# ══════════════════════════════════════════════════════════════════════════════ +# EXP6 — Compliance-only write (71×3→72), skip event index + trigger + waveform +# ══════════════════════════════════════════════════════════════════════════════ +# +# Blastware's full write sequence: 68→73 | 71×3→72 | 82→83 | 69→74→72 +# We want to know: can we write ONLY the compliance block (71×3→72)? +# +# Test procedure: +# 1. Read current compliance config (SUB 1A) +# 2. Patch the "notes" field to a test marker +# 3. Write ONLY 71×3→72 (skip 68, 73, 82, 83, 69, 74, final 72) +# 4. Read back (SUB 1A) and verify the change was written +# 5. Restore original value +# +# If PASS: we can push individual config fields without touching event index, +# trigger config, or waveform data — huge simplification. +# If FAIL: the device needs the full write sequence (may reject partial write). +# +# SAFETY: We restore original data in a finally block. If the restore write +# fails, the device will have the test marker in "notes" — harmless but visible. + +_EXP6_MARKER = "[exp6-test]" + +def exp_compliance_only(host: str, port: int) -> tuple[str, str, dict]: + """Write compliance block alone (71×3→72), verify, and restore.""" + client, info = connect_client(host, port) + original_raw: Optional[bytes] = None + try: + proto = client._proto + if proto is None: + return "FAIL", "Could not get protocol handle from client", {} + + # 1. Read current compliance + print(" Reading current compliance config (SUB 1A)…") + original_raw = proto.read_compliance_config() + print(f" Got {len(original_raw)} bytes of compliance config") + + # Find current notes value for display + info_obj = DeviceInfo() + _decode_compliance_config_into(original_raw, info_obj) + cc = info_obj.compliance_config + orig_notes = cc.notes if cc else "(unknown)" + print(f" Current notes field: {orig_notes!r}") + + # 2. Build modified payload with test marker in notes + test_notes = _EXP6_MARKER + modified_raw = _encode_compliance_config( + original_raw, + notes=test_notes, + ) + print(f" Encoded modified compliance payload ({len(modified_raw)} bytes)") + print(f" Patching notes: {orig_notes!r} → {test_notes!r}") + + # 3. Write ONLY the compliance block: 71×3 → 72 + print(" Writing compliance ONLY (71×3→72) — skipping 68/73/82/83/69/74…") + proto.write_compliance_config_raw(modified_raw) + print(" Write complete — device acked 71×3→72") + + # 4. Read back and verify + print(" Reading back compliance config to verify…") + readback_raw = proto.read_compliance_config() + readback_info = DeviceInfo() + _decode_compliance_config_into(readback_raw, readback_info) + rb_cc = readback_info.compliance_config + readback_notes = rb_cc.notes if rb_cc else "(decode failed)" + print(f" Read-back notes: {readback_notes!r}") + + write_worked = (readback_notes == test_notes) + print(f" Write verified: {write_worked}") + + details = { + "original_notes": orig_notes, + "written_notes": test_notes, + "readback_notes": readback_notes, + "write_verified": write_worked, + } + + if write_worked: + return "PASS", "Compliance-only write works! No event index or trigger writes needed.", details + else: + return "FAIL", f"Write was not reflected in read-back (got {readback_notes!r})", details + + except ProtocolError as exc: + return "FAIL", f"Protocol error: {exc}", {} + + finally: + # Restore original compliance data regardless of outcome + if original_raw is not None: + print(" Restoring original compliance config…") + try: + proto2 = client._proto + if proto2: + proto2.write_compliance_config_raw( + _encode_compliance_config(original_raw) # no-op patch = verbatim + ) + print(" Restore complete") + else: + print(" WARNING: protocol handle gone — could not restore") + except Exception as exc_r: + print(f" WARNING: restore failed: {exc_r}") + client.close() + + +# ══════════════════════════════════════════════════════════════════════════════ +# Registry + main +# ══════════════════════════════════════════════════════════════════════════════ + +EXPERIMENTS = { + "cold_status": ("EXP1", exp_cold_status, "Monitor status (1C) with no POLL"), + "fast_event_count": ("EXP2", exp_fast_event_count, "Event count via POLL+08, skip identity reads"), + "no_5a": ("EXP3", exp_no_5a, "Event record (0C) without bulk waveform (5A)"), + "skip_1e": ("EXP4", exp_skip_1e, "0A/0C with cached key — skip initial 1E"), + "fewer_polls": ("EXP5", exp_fewer_polls, "1 POLL before 5A instead of Blastware's 3"), + "compliance_only": ("EXP6", exp_compliance_only, "Compliance-only write (71×3→72), no other blocks"), +} + + +def main() -> None: + ap = argparse.ArgumentParser(description="MiniMate Plus protocol minimization experiments") + ap.add_argument("--host", default=DEFAULT_HOST) + ap.add_argument("--port", type=int, default=DEFAULT_PORT) + ap.add_argument("--debug", action="store_true", help="Enable DEBUG wire logging") + ap.add_argument("experiments", nargs="*", + help=f"Which to run (default: all). Choices: {', '.join(EXPERIMENTS)}") + args = ap.parse_args() + + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) + + which = args.experiments or list(EXPERIMENTS.keys()) + unknown = [e for e in which if e not in EXPERIMENTS] + if unknown: + print(f"Unknown experiments: {unknown}") + print(f"Available: {', '.join(EXPERIMENTS)}") + sys.exit(1) + + print(f"\n{'═'*60}") + print(f" MiniMate Plus Protocol Minimization Experiments") + print(f" Target: {args.host}:{args.port}") + print(f" Running: {', '.join(which)}") + print(f"{'═'*60}") + + results: list[Result] = [] + for key in which: + tag, fn, desc = EXPERIMENTS[key] + label = f"{tag}: {desc}" + r = run(label, fn, args.host, args.port) + results.append(r) + time.sleep(1.5) # brief pause between experiments — let device settle + + print(f"\n\n{'═'*60}") + print(" SUMMARY") + print(f"{'═'*60}") + for r in results: + sym = {"PASS": "✅", "FAIL": "❌", "INCONCLUSIVE": "⚠️ "}.get(r.outcome, "?") + print(f" {sym} {r.outcome:13s} {r.name}") + print(f"{'═'*60}") + + passed = sum(1 for r in results if r.outcome == "PASS") + failed = sum(1 for r in results if r.outcome == "FAIL") + skipped = sum(1 for r in results if r.outcome == "INCONCLUSIVE") + print(f" {passed} passed {failed} failed {skipped} inconclusive") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nInterrupted.") + sys.exit(0)