#!/usr/bin/env python3 """ experiments.py — Protocol minimization experiments for MiniMate Plus. Goal: figure out which steps in Blastware's sequences are truly required vs. cargo-culted, so we can build a faster, smarter client. Each experiment is self-contained (opens its own TCP connection) and reports PASS / FAIL / INCONCLUSIVE with timing and notes. Usage: python experiments.py [--host IP] [--port PORT] [exp1 exp2 ...] Run all: python experiments.py Run specific: python experiments.py cold_status fast_event_count no_5a Available experiments --------------------- cold_status EXP1 Monitor status (1C) with NO prior POLL fast_event_count EXP2 Event count via POLL+08 only — skip identity reads no_5a EXP3 Event record (0C) without bulk waveform stream (5A) skip_1e EXP4 0A/0C directly with cached key — skip initial 1E fewer_polls EXP5 Only 1 POLL before 5A instead of Blastware's 3 compliance_only EXP6 Write compliance ONLY (71x3→72), skip event index+trigger+waveform """ from __future__ import annotations import argparse import logging import struct import sys import time from dataclasses import dataclass, field from typing import Optional logging.basicConfig( level=logging.WARNING, # experiment output is via print(); set DEBUG for wire trace format="%(asctime)s %(levelname)-7s %(name)-20s %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger("experiments") # ── Imports ─────────────────────────────────────────────────────────────────── from minimateplus.transport import TcpTransport from minimateplus.protocol import ( MiniMateProtocol, ProtocolError, TimeoutError as ProtoTimeout, SUB_MONITOR_STATUS, SUB_SERIAL_NUMBER, SUB_FULL_CONFIG, SUB_EVENT_INDEX, SUB_COMPLIANCE, SUB_WRITE_CONFIRM_A, SUB_WRITE_CONFIRM_B, ) from minimateplus.framing import build_bw_frame, SESSION_RESET from minimateplus.client import ( MiniMateClient, _decode_compliance_config_into, _encode_compliance_config, ) from minimateplus.models import DeviceInfo DEFAULT_HOST = "63.43.212.232" 
DEFAULT_PORT = 9034


# ── Result container ──────────────────────────────────────────────────────────

@dataclass
class Result:
    """Outcome of one experiment, as printed after each run.

    ``str(result)`` renders a multi-line report: a headline with symbol,
    outcome, name and elapsed seconds, then the notes line (if any), then one
    line per entry in ``details``.
    """

    name: str  # label shown in the report
    outcome: str  # "PASS" | "FAIL" | "INCONCLUSIVE"
    elapsed: float = 0.0  # run time in seconds
    notes: str = ""  # one-line explanation of the outcome
    details: dict = field(default_factory=dict)  # extra key/value facts to print

    def __str__(self) -> str:
        sym = {"PASS": "✅", "FAIL": "❌", "INCONCLUSIVE": "⚠️ "}.get(self.outcome, "?")
        lines = [f" {sym} {self.outcome:13s} {self.name} ({self.elapsed:.1f}s)"]
        if self.notes:
            lines.append(f" {self.notes}")
        lines.extend(f" {k}: {v}" for k, v in self.details.items())
        return "\n".join(lines)


# ── Connection helpers ────────────────────────────────────────────────────────

def connect_proto(host: str, port: int, timeout: float = 15.0) -> tuple[TcpTransport, MiniMateProtocol]:
    """Open a raw TCP connection and return (transport, proto) without any handshake.

    The caller owns the transport and must call ``disconnect()`` on it.
    """
    t = TcpTransport(host, port)
    t.connect()
    proto = MiniMateProtocol(t, recv_timeout=timeout)
    return t, proto


def connect_client(host: str, port: int, timeout: float = 30.0) -> tuple[MiniMateClient, DeviceInfo]:
    """Open a MiniMateClient and run the full connect() handshake.

    Returns ``(client, info)``; the caller must ``close()`` the client.
    If the handshake raises, the client is closed here before the exception
    propagates, so a failed connect does not leave the opened client behind.
    """
    transport = TcpTransport(host, port)
    client = MiniMateClient(transport=transport, timeout=timeout)
    client.open()
    try:
        info = client.connect()
    except BaseException:
        # Handshake failed: the caller never receives `client`, so nobody
        # else can close it. Best-effort close, then re-raise the original.
        try:
            client.close()
        except Exception:
            log.debug("close() after failed handshake also failed", exc_info=True)
        raise
    return client, info


# ── Experiment runner ─────────────────────────────────────────────────────────

def run(name: str, fn, *args, **kwargs) -> Result:
    """Execute one experiment callable and wrap its outcome in a Result.

    ``fn(*args, **kwargs)`` is expected to return ``(outcome, notes, details)``.
    Any exception it raises is logged with a traceback and converted into a
    FAIL result, so one broken experiment does not abort the whole run.
    The result is printed before it is returned.
    """
    print(f"\n{'─'*60}")
    print(f" Running: {name}")
    print(f"{'─'*60}")
    # perf_counter is monotonic: the measured duration cannot be skewed by
    # wall-clock adjustments (NTP, DST) while an experiment is running.
    t0 = time.perf_counter()
    try:
        outcome, notes, details = fn(*args, **kwargs)
    except Exception as exc:
        outcome = "FAIL"
        notes = f"Uncaught exception: {exc}"
        details = {}
        log.exception("Experiment %s raised:", name)
    elapsed = time.perf_counter() - t0
    r = Result(name=name, outcome=outcome, elapsed=elapsed, notes=notes, details=details)
    print(str(r))
    return r


# ══════════════════════════════════════════════════════════════════════════════
# EXP1 — Monitor status (1C) with NO prior POLL
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware always does a full POLL handshake before any other command.
# We want to know: can we query SUB 1C (battery, memory, monitoring state)
# cold, with only a SESSION_RESET signal and no POLL at all?
#
# If PASS: status checks become near-instant (no ~1s POLL round-trip).
# If FAIL: we need POLL first, but maybe we can cache it.

def exp_cold_status(host: str, port: int) -> tuple[str, str, dict]:
    """Query SUB 1C cold: SESSION_RESET, then the 1C probe and 1C data frames, no POLL.

    Returns ``(outcome, notes, details)``. A timeout or protocol error is
    reported as FAIL; the transport is always disconnected on exit.
    """
    transport, proto = connect_proto(host, port)
    try:
        print(" Sending SESSION_RESET only (no POLL)")
        transport.write(SESSION_RESET)
        time.sleep(0.1)

        print(" Sending SUB 1C probe (no POLL first)…")
        reply_sub = (0xFF - SUB_MONITOR_STATUS) & 0xFF  # 0xE3
        transport.write(build_bw_frame(SUB_MONITOR_STATUS, 0x00))
        probe_frame = proto._recv_one(expected_sub=reply_sub, timeout=8.0)
        print(f" 1C probe OK page_key=0x{probe_frame.page_key:04X} data={probe_frame.data.hex()}")

        transport.write(build_bw_frame(SUB_MONITOR_STATUS, 0x2C))
        payload = proto._recv_one(expected_sub=reply_sub, timeout=8.0).data
        size = len(payload)
        print(f" 1C data OK {size} bytes hex: {payload.hex()}")

        details = {"raw_bytes": size}
        if size >= 10:
            # The last 10 bytes are read as one big-endian group:
            # uint16 battery, uint32 total memory, uint32 free memory.
            batt_raw, mem_total, mem_free = struct.unpack_from(">HII", payload, size - 10)
            is_monitoring = payload[1] == 0x10
            details.update(
                battery_v=f"{batt_raw / 100:.2f} V",
                memory_total=f"{mem_total:,} bytes",
                memory_free=f"{mem_free:,} bytes",
                monitoring=is_monitoring,
            )
            print(f" battery={batt_raw/100:.2f}V mem_free={mem_free:,} monitoring={is_monitoring}")

        return "PASS", "SUB 1C responded without any POLL — cold status read works!", details
    except ProtoTimeout:
        return "FAIL", "Device did not respond to 1C without POLL (timeout)", {}
    except ProtocolError as exc:
        return "FAIL", f"Protocol error: {exc}", {}
    finally:
        transport.disconnect()


# ══════════════════════════════════════════════════════════════════════════════
# EXP2 — Fast event count: POLL + SUB 08 only (skip identity reads)
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware's connect() does: POLL → 15 → 01 → 1A → 08
# We want to know: can we skip 15/01/1A and go straight from POLL to 08?
#
# Reading identity (15, 01) and full compliance (1A, ~2126 bytes over TCP)
# takes several seconds each connect. If we only need event count, skipping
# them would be a huge win.
#
# If PASS: fast status poll = POLL + 08 only (~2 round trips vs ~8+).

def exp_fast_event_count(host: str, port: int) -> tuple[str, str, dict]:
    """Run POLL startup, then read SUB 08 and 1E without the 15/01/1A identity reads.

    Returns ``(outcome, notes, details)``; a protocol error is reported as
    FAIL. The transport is always disconnected on exit.
    """
    transport, proto = connect_proto(host, port)
    try:
        print(" Running startup (POLL only)…")
        proto.startup()

        print(" POLL OK — now reading SUB 08 (event index) directly…")
        index_block = proto.read_event_index()
        block_len = len(index_block)
        print(f" SUB 08 OK {block_len} bytes")

        # Try to decode event count from SUB 08 payload
        # The raw block is 88 bytes; bytes [3:7] may be a count (uint32 BE)
        details = {"idx_raw_len": block_len}
        if block_len >= 7:
            (count_candidate,) = struct.unpack_from(">I", index_block, 3)
            details["count_candidate"] = count_candidate
            print(f" idx[3:7] as uint32 BE = {count_candidate} (may or may not be event count)")

        # Also verify we can read 1E without the identity reads having been done
        print(" Reading 1E (event header) to confirm event access works…")
        first_key, header8 = proto.read_event_first()
        is_empty = header8[4:8] == b"\x00\x00\x00\x00"
        details["first_key"] = first_key.hex()
        details["is_empty"] = is_empty
        print(f" 1E OK key={first_key.hex()} empty={is_empty}")

        return "PASS", "POLL+08+1E all work without identity reads (15/01/1A skipped)", details
    except ProtocolError as exc:
        return "FAIL", f"Protocol error: {exc}", {}
    finally:
        transport.disconnect()
# ══════════════════════════════════════════════════════════════════════════════
# EXP3 — Get event record (0C) without bulk waveform stream (5A)
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware's event download = 1E → 0A → 1E-arm → 0C → 1F(dl) → POLL×3 → 5A → 1F(browse)
#
# The 5A bulk stream is the slow part (several large frames, ~1s+ per event).
# We only need 5A for: client, operator, seis_loc, notes (not in 0C).
# If you don't need those fields, can we do: 1E → 0A → 0C → 1F(browse) ?
#
# Two variants tested:
#   3a: Skip 1E-arm AND 5A — just 0A → 0C → 1F(browse)
#   3b: Include 1E-arm but skip 5A+POLL — 0A → 1E-arm → 0C → 1F(browse)
#
# If PASS: event peak values available without the slow bulk stream.
# If FAIL on 3a but PASS on 3b: 1E-arm required even without 5A.

def exp_no_5a(host: str, port: int) -> tuple[str, str, dict]:
    """Event record via 0A→0C without 5A or POLL×3. Tests both with and without 1E-arm.

    Variant 3a runs first. If it fails, the connection is dropped and a fresh
    one is opened before variant 3b runs. Returns ``(outcome, notes, details)``:
    PASS when either variant succeeds, INCONCLUSIVE when the device holds no
    events, FAIL otherwise. Whichever transport is current when the function
    exits is disconnected.
    """
    t, proto = connect_proto(host, port)
    try:
        print(" Startup (POLL)…")
        proto.startup()

        # Get the first event key via 1E
        key4, data8 = proto.read_event_first()
        if data8[4:8] == b"\x00\x00\x00\x00":
            return "INCONCLUSIVE", "Device has no stored events — cannot test", {}
        print(f" First event key: {key4.hex()}")
        details: dict = {"key": key4.hex()}

        # ── Variant 3a: 0A → 0C → 1F(browse), no 1E-arm ─────────────────────
        print("\n [3a] 0A → 0C → 1F(browse) (NO 1E-arm, NO 5A)")
        try:
            _hdr, rec_len = proto.read_waveform_header(key4)
            print(f" 0A OK rec_len=0x{rec_len:02X}")
            record_3a = proto.read_waveform_record(key4)
            print(f" 0C OK {len(record_3a)} bytes")

            # Check for recognizable content
            has_tran = b"Tran" in record_3a
            has_vert = b"Vert" in record_3a
            has_long = b"Long" in record_3a
            print(f" 0C content check: Tran={has_tran} Vert={has_vert} Long={has_long}")
            details["3a_0c_bytes"] = len(record_3a)
            details["3a_has_peaks"] = has_tran and has_vert and has_long

            # Now try browse 1F without any 5A
            key4_next, data8_next = proto.advance_event(browse=True)
            null_sentinel = data8_next[4:8] == b"\x00\x00\x00\x00"
            print(f" 1F(browse) → key={key4_next.hex()} null={null_sentinel}")
            details["3a_1f_ok"] = True
            details["3a_outcome"] = "PASS"
        except ProtocolError as exc:
            print(f" 3a FAILED: {exc}")
            details["3a_outcome"] = f"FAIL: {exc}"
            # Try to recover by reconnecting for 3b
            t.disconnect()
            # Rebind `t` right away: if startup() or the empty-device return
            # below exits early, the outer `finally` must close the NEW
            # socket, not the one that was just disconnected.
            t, proto = connect_proto(host, port)
            proto.startup()
            key4, data8 = proto.read_event_first()
            if data8[4:8] == b"\x00\x00\x00\x00":
                return "FAIL", f"3a failed and device empty on retry: {exc}", details

        # ── Variant 3b: 0A → 1E-arm → 0C → 1F(browse), no 5A ───────────────
        print("\n [3b] 0A → 1E-arm(0xFE) → 0C → 1F(browse) (NO POLL×3, NO 5A)")
        try:
            _hdr, rec_len = proto.read_waveform_header(key4)
            print(f" 0A OK rec_len=0x{rec_len:02X}")

            # 1E download-arm (token=0xFE) between 0A and 0C
            proto.read_event_first(token=0xFE)
            print(" 1E-arm OK")

            record_3b = proto.read_waveform_record(key4)
            print(f" 0C OK {len(record_3b)} bytes")
            has_tran = b"Tran" in record_3b
            print(f" 0C content check: Tran={has_tran} Vert={b'Vert' in record_3b}")
            details["3b_0c_bytes"] = len(record_3b)
            details["3b_has_peaks"] = has_tran

            # Browse 1F without 5A / POLL×3
            key4_next2, data8_next2 = proto.advance_event(browse=True)
            null_sentinel2 = data8_next2[4:8] == b"\x00\x00\x00\x00"
            print(f" 1F(browse) → key={key4_next2.hex()} null={null_sentinel2}")
            details["3b_1f_ok"] = True
            details["3b_outcome"] = "PASS"
        except ProtocolError as exc:
            print(f" 3b FAILED: {exc}")
            details["3b_outcome"] = f"FAIL: {exc}"

        # Summarize
        a_ok = details.get("3a_outcome") == "PASS"
        b_ok = details.get("3b_outcome") == "PASS"
        if a_ok:
            return "PASS", "3a: 0A→0C works with NO 1E-arm and NO 5A. Huge speedup possible!", details
        elif b_ok:
            return "PASS", "3b: 0A→1E-arm→0C works without 5A (1E-arm still needed before 0C)", details
        else:
            return "FAIL", "Both 3a and 3b failed — 5A may be required for device state", details
    except ProtocolError as exc:
        return "FAIL", f"Protocol error during setup: {exc}", {}
    finally:
        # Best-effort cleanup: the transport may already be disconnected
        # (failed reconnect in the 3a recovery path). Never mask the result,
        # but leave a trace for --debug runs.
        try:
            t.disconnect()
        except Exception:
            log.debug("disconnect during exp_no_5a cleanup failed", exc_info=True)


# ══════════════════════════════════════════════════════════════════════════════
# EXP4 — Skip initial 1E if we already know the event key
# ══════════════════════════════════════════════════════════════════════════════
#
# In Blastware, every session starts with 1E to discover the first key.
# But if we already fetched and cached the event keys from a previous session,
# can we skip 1E entirely and go straight to 0A(cached_key)?
#
# Practical use case: we poll the device every N minutes. We already know
# all the event keys from last time. On re-connect, can we go direct to 0A?
#
# If PASS: subsequent polls that don't add new events can skip 1E discovery.
def exp_skip_1e(host: str, port: int) -> tuple[str, str, dict]:
    """Get the first event key, disconnect, reconnect, go straight to 0A (skip 1E).

    Returns ``(outcome, notes, details)``. Failures that happen before the
    0A request is sent (phase 1 key discovery, phase 2 startup) are reported
    as setup failures, so that only a real 0A/0C rejection is blamed on the
    cached key. Both transports are disconnected before returning.
    """
    # Phase 1: get the key
    t, proto = connect_proto(host, port)
    try:
        proto.startup()
        key4, data8 = proto.read_event_first()
        if data8[4:8] == b"\x00\x00\x00\x00":
            return "INCONCLUSIVE", "No events stored — cannot test", {}
        print(f" Phase 1: got event key = {key4.hex()}")
    except ProtocolError as exc:
        # Same reporting convention as the other experiments' setup failures,
        # instead of surfacing as an uncaught exception in run().
        return "FAIL", f"Setup failed (phase 1, key discovery): {exc}", {}
    finally:
        t.disconnect()

    time.sleep(0.5)

    # Phase 2: fresh connection, skip 1E, go straight to 0A with cached key
    t2, proto2 = connect_proto(host, port)
    try:
        print(" Phase 2: fresh connection — startup + 0A directly (no 1E)")
        try:
            proto2.startup()
        except ProtocolError as exc:
            # A failed POLL says nothing about whether 0A needs 1E first.
            return "FAIL", f"Setup failed (phase 2 startup, before 0A): {exc}", {"key": key4.hex()}

        _hdr, rec_len = proto2.read_waveform_header(key4)
        print(f" 0A OK rec_len=0x{rec_len:02X}")
        record = proto2.read_waveform_record(key4)
        has_peaks = b"Tran" in record
        print(f" 0C OK {len(record)} bytes has_peaks={has_peaks}")
        details = {
            "cached_key": key4.hex(),
            "0c_bytes": len(record),
            "has_peaks": has_peaks,
        }
        return "PASS", "0A works with cached key — 1E discovery can be skipped on known sessions", details
    except ProtocolError as exc:
        return "FAIL", f"0A failed with cached key (device needs 1E first?): {exc}", {"key": key4.hex()}
    finally:
        t2.disconnect()


# ══════════════════════════════════════════════════════════════════════════════
# EXP5 — Fewer POLLs before 5A (try POLL×1 instead of Blastware's POLL×3)
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware always sends 3 full POLL probe+data cycles between 1F and 5A.
# Each POLL is a round trip. Can we get away with just 1?
#
# WARNING: If POLL×1 fails, the device may be in a bad state. We try to
# recover with an extra POLL×2 and a fresh 5A attempt. Even on failure we
# try to leave the device in a usable state.
#
# Strategy: run the full event sequence up to 1F(download), then try 5A
# with only 1 POLL. If 5A responds → PASS. If timeout → try 2 more POLLs
# and check if the device recovers.
def exp_fewer_polls(host: str, port: int) -> tuple[str, str, dict]:
    """Run the event sequence up to 1F(download), then try 5A after a single POLL.

    Blastware sends three POLLs at this point. If 5A times out after one,
    two more POLLs are sent and 5A is retried to see whether the device
    recovers. Returns ``(outcome, notes, details)``; the transport is always
    disconnected on exit.
    """
    transport, proto = connect_proto(host, port)

    def probe_5a():
        # Same bounded 5A read for the first attempt and the recovery attempt.
        return proto.read_bulk_waveform_stream(key4, stop_after_metadata=True, max_chunks=12)

    try:
        proto.startup()
        key4, header8 = proto.read_event_first()
        if header8[4:8] == b"\x00\x00\x00\x00":
            return "INCONCLUSIVE", "No events stored — cannot test", {}
        print(f" Event key: {key4.hex()}")

        # Full setup: 0A → 1E-arm → 0C → 1F(download)
        _hdr, rec_len = proto.read_waveform_header(key4)
        print(f" 0A OK rec_len=0x{rec_len:02X}")

        proto.read_event_first(token=0xFE)  # 1E-arm
        print(" 1E-arm OK")

        proto.read_waveform_record(key4)
        print(" 0C OK")

        armed_key, _unused = proto.advance_event(browse=False)  # 1F(download) — arms 5A
        print(f" 1F(download) OK arm_key={armed_key.hex()}")

        # Only 1 POLL (Blastware does 3)
        print(" Sending 1 POLL (instead of 3)…")
        proto.poll()
        print(" POLL ok — now probing 5A…")

        try:
            first_try = probe_5a()
            print(f" 5A OK after 1 POLL — {len(first_try)} frames received")
            return (
                "PASS",
                "5A works with only 1 POLL (saved 2 round-trips per event)!",
                {"poll_count": 1, "frames": len(first_try)},
            )
        except ProtoTimeout:
            print(" 5A timed out after 1 POLL — device needs more POLLs")
            # Attempt recovery: send 2 more POLLs and see if 5A then works
            print(" Attempting recovery: 2 more POLLs…")
            try:
                for _ in range(2):
                    proto.poll()
                second_try = probe_5a()
                print(f" 5A worked after total 3 POLLs ({len(second_try)} frames)")
                recovery_details = {
                    "poll_count_tried": 1,
                    "recovery_polls": 3,
                    "recovery_frames": len(second_try),
                }
                return (
                    "FAIL",
                    "5A needs 3 POLLs — 1 is not enough (recovery confirmed 3 still works)",
                    recovery_details,
                )
            except ProtocolError as exc2:
                return "FAIL", f"5A failed even after 3 total POLLs — device may need reconnect: {exc2}", {}
    except ProtocolError as exc:
        return "FAIL", f"Setup failed: {exc}", {}
    finally:
        transport.disconnect()
# ══════════════════════════════════════════════════════════════════════════════
# EXP6 — Compliance-only write (71×3→72), skip event index + trigger + waveform
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware's full write sequence: 68→73 | 71×3→72 | 82→83 | 69→74→72
# We want to know: can we write ONLY the compliance block (71×3→72)?
#
# Test procedure:
#   1. Read current compliance config (SUB 1A)
#   2. Patch the "notes" field to a test marker
#   3. Write ONLY 71×3→72 (skip 68, 73, 82, 83, 69, 74, final 72)
#   4. Read back (SUB 1A) and verify the change was written
#   5. Restore original value
#
# If PASS: we can push individual config fields without touching event index,
#          trigger config, or waveform data — huge simplification.
# If FAIL: the device needs the full write sequence (may reject partial write).
#
# SAFETY: We restore original data in a finally block. If the restore write
# fails, the device will have the test marker in "notes" — harmless but visible.

_EXP6_MARKER = "[exp6-test]"


def exp_compliance_only(host: str, port: int) -> tuple[str, str, dict]:
    """Write compliance block alone (71×3→72), verify, and restore.

    Returns ``(outcome, notes, details)``:

    * PASS: the read-back notes field equals the test marker.
    * FAIL: the read-back differs, or a protocol error occurred.
    * INCONCLUSIVE: the notes field already held the test marker before the
      write, so a successful write could not be told apart from a no-op.

    The original payload is written back only if a write was attempted; the
    client is always closed.
    """
    client, _info = connect_client(host, port)
    original_raw: Optional[bytes] = None
    write_attempted = False
    try:
        proto = client._proto
        if proto is None:
            return "FAIL", "Could not get protocol handle from client", {}

        # 1. Read current compliance
        print(" Reading current compliance config (SUB 1A)…")
        original_raw = proto.read_compliance_config()
        print(f" Got {len(original_raw)} bytes of compliance config")

        # Find current notes value for display
        info_obj = DeviceInfo()
        _decode_compliance_config_into(original_raw, info_obj)
        cc = info_obj.compliance_config
        orig_notes = cc.notes if cc else "(unknown)"
        print(f" Current notes field: {orig_notes!r}")

        # Guard against a vacuous PASS: if the marker is already there (for
        # example left behind by an earlier run whose restore failed), the
        # read-back would match even if the device ignored the write.
        if orig_notes == _EXP6_MARKER:
            return (
                "INCONCLUSIVE",
                f"notes field already equals the test marker {_EXP6_MARKER!r} "
                "(stale from an earlier failed restore?) — a write could not be "
                "distinguished from a no-op",
                {"original_notes": orig_notes},
            )

        # 2. Build modified payload with test marker in notes
        test_notes = _EXP6_MARKER
        modified_raw = _encode_compliance_config(
            original_raw,
            notes=test_notes,
        )
        print(f" Encoded modified compliance payload ({len(modified_raw)} bytes)")
        print(f" Patching notes: {orig_notes!r} → {test_notes!r}")

        # 3. Write ONLY the compliance block: 71×3 → 72
        print(" Writing compliance ONLY (71×3→72) — skipping 68/73/82/83/69/74…")
        # Set before the call: a write that fails midway may still have
        # changed the device, so it must trigger the restore too.
        write_attempted = True
        proto.write_compliance_config_raw(modified_raw)
        print(" Write complete — device acked 71×3→72")

        # 4. Read back and verify
        print(" Reading back compliance config to verify…")
        readback_raw = proto.read_compliance_config()
        readback_info = DeviceInfo()
        _decode_compliance_config_into(readback_raw, readback_info)
        rb_cc = readback_info.compliance_config
        readback_notes = rb_cc.notes if rb_cc else "(decode failed)"
        print(f" Read-back notes: {readback_notes!r}")

        write_worked = (readback_notes == test_notes)
        print(f" Write verified: {write_worked}")

        details = {
            "original_notes": orig_notes,
            "written_notes": test_notes,
            "readback_notes": readback_notes,
            "write_verified": write_worked,
        }
        if write_worked:
            return "PASS", "Compliance-only write works! No event index or trigger writes needed.", details
        else:
            return "FAIL", f"Write was not reflected in read-back (got {readback_notes!r})", details
    except ProtocolError as exc:
        return "FAIL", f"Protocol error: {exc}", {}
    finally:
        # Restore original compliance data, but only if this run may have
        # changed it. No device write happens when nothing was modified.
        if write_attempted and original_raw is not None:
            print(" Restoring original compliance config…")
            try:
                proto2 = client._proto
                if proto2:
                    proto2.write_compliance_config_raw(
                        _encode_compliance_config(original_raw)  # no-op patch = verbatim
                    )
                    print(" Restore complete")
                else:
                    print(" WARNING: protocol handle gone — could not restore")
            except Exception as exc_r:
                # Best effort: report, but still close the client below.
                print(f" WARNING: restore failed: {exc_r}")
        client.close()


# ══════════════════════════════════════════════════════════════════════════════
# Registry + main
# ══════════════════════════════════════════════════════════════════════════════

# CLI name → (tag, experiment function, one-line description)
EXPERIMENTS = {
    "cold_status": ("EXP1", exp_cold_status, "Monitor status (1C) with no POLL"),
    "fast_event_count": ("EXP2", exp_fast_event_count, "Event count via POLL+08, skip identity reads"),
    "no_5a": ("EXP3", exp_no_5a, "Event record (0C) without bulk waveform (5A)"),
    "skip_1e": ("EXP4", exp_skip_1e, "0A/0C with cached key — skip initial 1E"),
    "fewer_polls": ("EXP5", exp_fewer_polls, "1 POLL before 5A instead of Blastware's 3"),
    "compliance_only": ("EXP6", exp_compliance_only, "Compliance-only write (71×3→72), no other blocks"),
}


def main() -> None:
    """Parse the command line, run the selected experiments, print a summary.

    With no experiment names given, every entry of ``EXPERIMENTS`` runs in
    registry order. Unknown names abort with exit status 1 before anything
    is run.
    """
    ap = argparse.ArgumentParser(description="MiniMate Plus protocol minimization experiments")
    ap.add_argument("--host", default=DEFAULT_HOST)
    ap.add_argument("--port", type=int, default=DEFAULT_PORT)
    ap.add_argument("--debug", action="store_true", help="Enable DEBUG wire logging")
    ap.add_argument("experiments", nargs="*",
                    help=f"Which to run (default: all). Choices: {', '.join(EXPERIMENTS)}")
    args = ap.parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    which = args.experiments or list(EXPERIMENTS)
    unknown = [e for e in which if e not in EXPERIMENTS]
    if unknown:
        print(f"Unknown experiments: {unknown}")
        print(f"Available: {', '.join(EXPERIMENTS)}")
        sys.exit(1)

    print(f"\n{'═'*60}")
    print(" MiniMate Plus Protocol Minimization Experiments")
    print(f" Target: {args.host}:{args.port}")
    print(f" Running: {', '.join(which)}")
    print(f"{'═'*60}")

    results: list[Result] = []
    for position, key in enumerate(which):
        if position:
            time.sleep(1.5)  # brief pause between experiments — let device settle
        tag, fn, desc = EXPERIMENTS[key]
        label = f"{tag}: {desc}"
        results.append(run(label, fn, args.host, args.port))

    print(f"\n\n{'═'*60}")
    print(" SUMMARY")
    print(f"{'═'*60}")
    for r in results:
        sym = {"PASS": "✅", "FAIL": "❌", "INCONCLUSIVE": "⚠️ "}.get(r.outcome, "?")
        print(f" {sym} {r.outcome:13s} {r.name}")
    print(f"{'═'*60}")

    passed = sum(1 for r in results if r.outcome == "PASS")
    failed = sum(1 for r in results if r.outcome == "FAIL")
    skipped = sum(1 for r in results if r.outcome == "INCONCLUSIVE")
    print(f" {passed} passed {failed} failed {skipped} inconclusive")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.")
        sys.exit(0)