5 Commits

Author SHA1 Message Date
claude ab14328c8b feat: enhance logging messages in ach_server.py and add experiments.py for protocol minimization 2026-04-10 00:58:54 -04:00
claude 0baf343bf5 feat: add high-water mark state tracking to ach_server + fix monitoring flag
ach_server.py:
- Add ach_state.json per-unit state tracking (keyed by serial number)
- count_events() before any download; skip session if no new events since last call-home
- Download only events beyond the previous high-water mark (all_events[last_count:])
- --max-events N safety cap for first-run units with many stored events
- state_path and max_events wired through AchSession constructor and serve()

client.py (_decode_monitor_status):
- Revert monitoring flag to section[1] == 0x10 (was incorrectly changed to section[6])
- Fix battery/memory offsets to section[-10:-8], [-8:-4], [-4:] (no trailing checksum byte)
- Both confirmed by full byte diff of all 144 0xE3 data frames in 4-8-26/2ndtry capture

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 14:38:44 -04:00
claude 05421764a5 feat: add SocketTransport and ach_server.py inbound ACH server
minimateplus/transport.py:
- Add SocketTransport(TcpTransport) — wraps an already-accepted inbound
  socket; connect() is a no-op; everything else inherited from TcpTransport.
  Enables the ACH server to reuse all existing protocol/client code without
  any changes.

bridges/ach_server.py:
- Minimal inbound ACH server — listens on port 12345, accepts call-home
  connections from MiniMate Plus units, runs the full BW protocol:
  startup handshake → get_device_info → get_events(full_waveform=True)
- Saves device_info.json + events.json + raw_rx_<ts>.bin + session log
  per connection to bridges/captures/ach_inbound_<ts>/
- raw_rx.bin is byte-compatible with existing Analyzer tooling
- Taps transport.read() to capture raw S3 bytes alongside parsed output
- Each connection runs in its own daemon thread
- Clearly distinguishes push vs pull protocol in the startup log

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 12:34:27 -04:00
claude 74233d7e31 feat: add splitter mode to ach_bridge.py (--mirror HOST:PORT)
Adds a production-safe headphone-splitter mode:
- Device bytes tee'd to both --upstream (primary/prod) and --mirror (new server)
- Only primary server responses are returned to the device
- Mirror connect/write failures are non-fatal and logged; prod is unaffected
- New raw_mirror_<ts>.bin capture file alongside raw_client/raw_server

Three modes: standalone (capture only), bridge (one upstream), splitter (two).
Default listen port changed to 12345 to match project ACH setup.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 12:17:57 -04:00
claude 46a86939b7 feat: add ACH TCP bridge, serial tap tool, and Serial Watch tab
- bridges/ach_bridge.py: transparent TCP bridge that MITMs the MiniMate Plus
  call-home connection — forwards to real ACH server while logging all frames
  to raw_client/raw_server .bin files compatible with parse_capture.py;
  standalone capture mode for lab use without a real server

- bridges/serial_watch.py: RS-232 serial monitor with live S3 frame parsing;
  taps the line between MiniMate and modem (RV50/RV55); captures raw bytes,
  .log and .jsonl; --ack-ok mode auto-replies to AT commands; fixed fatal
  indentation bug in the original that silently prevented any data capture

- seismo_lab.py: new "Serial Watch" fourth tab (SerialWatchPanel) wrapping
  serial_watch.py functionality; COM port picker with refresh, baud config,
  ack-ok toggle, colour-coded live frame log (teal frames / yellow ctrl /
  blue AT), raw .bin capture auto-fed into Analyzer tab on stop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 12:10:52 -04:00
10 changed files with 2668 additions and 72 deletions
+19 -15
View File
@@ -582,28 +582,32 @@ All confirmed from 4-8-26/2ndtry BW TX/S3 capture (clean start → 30s monitor
Standard two-step read (probe at offset 0x00, data at offset 0x2C). Standard two-step read (probe at offset 0x00, data at offset 0x2C).
Response SUB = 0xFF 0x1C = **0xE3** (standard formula — no exception). Response SUB = 0xFF 0x1C = **0xE3** (standard formula — no exception).
**Payload length is ~4649 bytes in BOTH idle and monitoring states** — length alone **Payload length is 4647 bytes IDLE, 4849 bytes MONITORING** — not a reliable sole
is NOT a reliable mode indicator. Earlier note claiming "12 bytes when monitoring" indicator due to 1-byte jitter overlap at the boundary.
was wrong (confirmed 2026-04-08 from 4-8-26/mid-monitor captures).
**Monitoring flag (CORRECTED 2026-04-08 full byte diff of 2ndtry capture):** **Monitoring flag (CONFIRMED 2026-04-09 — byte diff of all 144 data frames, 2ndtry capture):**
- `section[6] == 0x00` → unit is **idle** - `section[1] == 0x00` → unit is **idle**
- `section[6] == 0x10` → unit is **monitoring** - `section[1] == 0x10` → unit is **monitoring**
Earlier note claiming `section[1]` was the flag was WRONG — section[1] is always 0x00 in both states. The correction was found by diffing all 0xE3 data frames across the start/stop transitions: `section[6]` is the only byte that flips cleanly at frame #36 (start) and #132 (stop) within the 2ndtry 0xE3 frame sequence. This is `data[12]` (= `frame.data[12]`). The flag is 0x00 in all 36 IDLE_BEFORE frames,
0x10 in all 98 MONITORING frames, and 0x00 in all 10 IDLE_AFTER frames — 100% accurate.
Battery and memory fields are present in **both** states, but the payload grows by **3 bytes** when monitoring is active (section goes from ~52 to ~55 bytes), shifting subsequent fields by +3. **HISTORY OF THIS FIELD (do not re-derive):** The original implementation used `section[1]`.
A re-analysis in the prior session incorrectly concluded `section[1]` is always 0x00 and
"corrected" the flag to `section[6]`, which has non-binary values (0xea idle, 0x07 monitoring)
and is device-specific. The 2026-04-09 re-analysis confirms `section[1]` was right.
**Field offsets (relative to `data[11:]` = section):** **IMPORTANT — `frame.data` has checksum already stripped** by `S3FrameParser._finalise()`
(`raw_payload = body[:-1]`; `data = raw_payload[5:]`). There is NO trailing checksum byte in
`section`. All relative-from-end offsets must account for this.
Battery and memory are at **relative offsets from the end** — the payload can vary by ±13 bytes due to counter jitter and monitoring-mode expansion, but these 10 bytes are always anchored at the end: Battery and memory fields are present in **both** states:
| Offset (relative to end) | Field | Type | Notes | | Offset (relative to end) | Field | Type | Notes |
|---|---|---|---| |---|---|---|---|
| `section[-11:-9]` | battery voltage × 100 | uint16 BE | `0x02A8` = 680 → 6.80 V | | `section[-10:-8]` | battery voltage × 100 | uint16 BE | `0x02A8` = 680 → 6.80 V |
| `section[-9:-5]` | memory total (bytes) | uint32 BE | e.g. 983026 ≈ 960 KB | | `section[-8:-4]` | memory total (bytes) | uint32 BE | e.g. 983026 ≈ 960 KB |
| `section[-5:-1]` | memory free (bytes) | uint32 BE | decreases as events are stored | | `section[-4:]` | memory free (bytes) | uint32 BE | decreases as events are stored |
| `section[-1]` | frame checksum | — | last byte, skip |
### SESSION_RESET signal (`41 03`) — required for monitoring units ### SESSION_RESET signal (`41 03`) — required for monitoring units
@@ -657,7 +661,7 @@ Key findings:
**SFM behavior after `POST /device/monitor/start`:** `_pollMonitorConfirm()` polls **SFM behavior after `POST /device/monitor/start`:** `_pollMonitorConfirm()` polls
`/device/monitor/status` every 5 s for up to 60 s, updating the badge on each poll. `/device/monitor/status` every 5 s for up to 60 s, updating the badge on each poll.
Status will show MONITORING once `section[6]` flips to `0x10`. Status will show MONITORING once `section[1]` flips to `0x10`.
### SUBs known from sensor-check capture (4-8-26) — NOT YET IMPLEMENTED ### SUBs known from sensor-check capture (4-8-26) — NOT YET IMPLEMENTED
+627
View File
@@ -0,0 +1,627 @@
#!/usr/bin/env python3
"""
ach_bridge.py — Transparent TCP bridge / splitter for Instantel MiniMate Plus
call-home (ACH) traffic.
Modes
-----
standalone Accept connection, capture frames, do NOT forward anywhere.
Good for initial discovery with a test unit.
bridge Forward to one upstream server while capturing.
Use this for the initial discovery phase with your test server.
splitter Forward to the PRIMARY upstream (production ACH server) AND
mirror a copy to a SECONDARY server simultaneously.
The device never knows — it talks to the primary the whole time.
If the mirror fails, the primary connection is unaffected.
Think of it like a headphone splitter: one input, two outputs.
Primary → authoritative responses back to device.
Mirror → gets all device bytes, its responses are discarded.
Usage
-----
# Standalone capture (test/discovery — no forwarding)
python bridges/ach_bridge.py --standalone [--port 12345]
# Bridge mode (forward to one server, e.g. your test server)
python bridges/ach_bridge.py --upstream HOST:PORT [--port 12345]
# Splitter mode (production: forward to prod + mirror to your server)
python bridges/ach_bridge.py --upstream PROD_HOST:PORT --mirror MY_HOST:PORT [--port 12345]
Setup for discovery (test server, don't touch prod)
----------------------------------------------------
1. Stand up your test ACH server, note its IP and port (e.g. 192.168.1.50:12345).
2. Take ONE test unit. In ACEmanager → Call Home, point it at:
<this machine's LAN IP> : <--port>
3. Run: python bridges/ach_bridge.py --upstream TEST_SERVER:12345 --port 12345
4. Trigger the unit. Raw frames are saved to bridges/captures/ach_<ts>/.
5. Revert the unit's ACEmanager setting when done.
Setup for production splitter (when you're ready)
-------------------------------------------------
This does NOT touch the units. Instead you re-route traffic at the network
layer so that call-home packets arrive at a machine running this script first.
Typical approach: update the DNS entry / host record your prod ACH server is
registered under to point at this machine. The units keep their existing
ACEmanager settings.
python bridges/ach_bridge.py \\
--upstream PROD_ACH_HOST:12345 \\
--mirror MY_NEW_SERVER:12345 \\
--port 12345
Output (each connection gets its own timestamped sub-directory)
------
bridges/captures/ach_<ts>/
raw_client_<ts>.bin — raw bytes from the device (S3 side)
raw_server_<ts>.bin — raw bytes from the primary upstream (BW side)
raw_mirror_<ts>.bin — raw bytes from the mirror upstream (splitter mode only)
session_<ts>.log — human-readable frame parse log
session_<ts>.jsonl — JSON-lines frame log
raw_client / raw_server are byte-for-byte compatible with parse_capture.py.
"""
from __future__ import annotations
import argparse
import asyncio
import datetime
import json
import logging
import os
import sys
from pathlib import Path
from typing import List, Optional
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.framing import S3FrameParser, S3Frame
log = logging.getLogger("ach_bridge")
# ── Frame label helpers ──────────────────────────────────────────────────────
_KNOWN_RSP_SUBS = {
0xA4: "POLL_RSP",
0xA5: "BULK_WAVEFORM_RSP",
0xE0: "ADVANCE_EVENT_RSP",
0xE1: "EVENT_INDEX_FIRST_RSP",
0xE3: "MONITOR_STATUS_RSP",
0xEA: "SERIAL_NUM_RSP",
0xF3: "WAVEFORM_RECORD_RSP",
0xF5: "WAVEFORM_HEADER_RSP",
0xF7: "EVENT_INDEX_RSP",
0xF9: "UNK_06_RSP",
0xFE: "DEVICE_INFO_RSP",
# Write acks
0x97: "EVT_IDX_WRITE_ACK",
0x8C: "CONFIRM_B_ACK",
0x8E: "COMPLIANCE_WRITE_ACK",
0x8D: "CONFIRM_A_ACK",
0x7D: "TRIGGER_WRITE_ACK",
0x7C: "TRIGGER_CONFIRM_ACK",
0x96: "WAVEFORM_WRITE_ACK",
0x8B: "CONFIRM_C_ACK",
0x69: "START_MONITOR_ACK",
0x68: "STOP_MONITOR_ACK",
}
_KNOWN_REQ_SUBS = {
0x5B: "POLL",
0x5A: "BULK_WAVEFORM",
0x1F: "ADVANCE_EVENT",
0x1E: "EVENT_INDEX_FIRST",
0x1C: "MONITOR_STATUS",
0x15: "SERIAL_NUM",
0x0C: "WAVEFORM_RECORD",
0x0A: "WAVEFORM_HEADER",
0x08: "EVENT_INDEX",
0x06: "UNK_06",
0x01: "DEVICE_INFO",
# Write commands
0x68: "EVT_IDX_WRITE",
0x73: "CONFIRM_B",
0x71: "COMPLIANCE_WRITE",
0x72: "CONFIRM_A",
0x82: "TRIGGER_WRITE",
0x83: "TRIGGER_CONFIRM",
0x69: "WAVEFORM_WRITE",
0x74: "CONFIRM_C",
0x96: "START_MONITOR",
0x97: "STOP_MONITOR",
}
def _label_s3_frame(frame: S3Frame) -> str:
name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
chk = "" if frame.checksum_valid else "✗CHK"
return (
f"S3→ SUB=0x{frame.sub:02X} ({name}) "
f"page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}"
)
def _label_bw_frame(data: bytes, prefix: str = " →BW") -> str:
"""Best-effort label for a raw BW request frame (wire bytes)."""
# Wire layout: 41 02 10 10 00 sub ...
if len(data) < 6:
return f"{prefix} (short {len(data)}B)"
sub = data[5]
name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}")
return f"{prefix} SUB=0x{sub:02X} ({name}) {len(data)}B"
# ── Per-session capture writer ─────────────────────────────────────────────────
class CaptureSession:
"""Writes raw bytes + parsed log for one TCP connection."""
def __init__(self, capture_dir: Path, peer: str, *, has_mirror: bool = False):
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
self.dir = capture_dir / f"ach_{ts}"
self.dir.mkdir(parents=True, exist_ok=True)
self.peer = peer
self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb")
self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb")
self._raw_mirror = (
open(self.dir / f"raw_mirror_{ts}.bin", "wb") if has_mirror else None
)
self._log_fh = open(self.dir / f"session_{ts}.log", "w")
self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w")
self._s3_parser = S3FrameParser()
self._frame_count = 0
self._byte_count_client = 0
self._byte_count_server = 0
self._byte_count_mirror = 0
self._log(
f"# ACH capture — peer={peer} "
f"mirror={'yes' if has_mirror else 'no'} "
f"started={datetime.datetime.now().isoformat()}"
)
self._log(f"# Output dir: {self.dir}")
log.info("Capture session opened: %s (peer=%s)", self.dir, peer)
# ── public API ────────────────────────────────────────────────────────────
def feed_client(self, data: bytes) -> None:
"""Bytes FROM the device (S3 response frames)."""
self._raw_client.write(data)
self._raw_client.flush()
self._byte_count_client += len(data)
for byte in data:
frame = self._s3_parser.feed(bytes([byte]))
if frame:
frames = frame if isinstance(frame, list) else [frame]
for f in frames:
self._frame_count += 1
label = _label_s3_frame(f)
self._log(f"[{self._frame_count:04d}] {label}")
self._log(
f" hex: {f.data[:64].hex()}"
+ (" ..." if len(f.data) > 64 else "")
)
self._emit_json("s3", f)
def feed_server(self, data: bytes) -> None:
"""Bytes FROM the primary upstream server (BW request frames)."""
self._raw_server.write(data)
self._raw_server.flush()
self._byte_count_server += len(data)
label = _label_bw_frame(data, prefix=" →BW[primary]")
self._log(f" {label}")
def feed_mirror(self, data: bytes) -> None:
"""Bytes FROM the mirror server (logged, not forwarded to device)."""
if self._raw_mirror:
self._raw_mirror.write(data)
self._raw_mirror.flush()
self._byte_count_mirror += len(data)
label = _label_bw_frame(data, prefix=" →BW[mirror] ")
self._log(f" {label} [MIRROR — not sent to device]")
def close(self, reason: str = "connection closed") -> None:
self._log(f"# Session ended: {reason}")
self._log(
f"# Totals — client={self._byte_count_client}B "
f"server={self._byte_count_server}B "
f"mirror={self._byte_count_mirror}B "
f"s3_frames={self._frame_count}"
)
handles = [self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh]
if self._raw_mirror:
handles.append(self._raw_mirror)
for fh in handles:
try:
fh.close()
except Exception:
pass
log.info(
"Session closed (%s): %dB client, %dB server, %dB mirror, %d S3 frames → %s",
reason,
self._byte_count_client, self._byte_count_server,
self._byte_count_mirror, self._frame_count,
self.dir,
)
# ── internals ─────────────────────────────────────────────────────────────
def _log(self, msg: str) -> None:
print(msg, file=self._log_fh, flush=True)
print(msg)
def _emit_json(self, direction: str, frame: S3Frame) -> None:
record = {
"dir": direction,
"sub": frame.sub,
"page_key": frame.page_key,
"data_len": len(frame.data),
"data_hex": frame.data.hex(),
"checksum_valid": frame.checksum_valid,
}
print(json.dumps(record), file=self._jsonl_fh, flush=True)
# ── Bridge / splitter connection handler ──────────────────────────────────────
class BridgeHandler:
"""
Handles inbound device connections.
Modes (determined by which upstreams are configured):
standalone — no upstream_host / no mirror_host
bridge — upstream_host set, no mirror_host
splitter — upstream_host AND mirror_host both set
"""
def __init__(
self,
capture_dir: Path,
upstream_host: Optional[str],
upstream_port: Optional[int],
mirror_host: Optional[str] = None,
mirror_port: Optional[int] = None,
):
self.capture_dir = capture_dir
self.upstream_host = upstream_host
self.upstream_port = upstream_port
self.mirror_host = mirror_host
self.mirror_port = mirror_port
async def handle(
self,
client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter,
) -> None:
peer = client_writer.get_extra_info("peername", ("?", 0))
peer_str = f"{peer[0]}:{peer[1]}"
log.info("Inbound connection from %s", peer_str)
has_mirror = bool(self.mirror_host)
session = CaptureSession(self.capture_dir, peer_str, has_mirror=has_mirror)
if not self.upstream_host:
# ── Standalone mode ──────────────────────────────────────────────
log.info("Standalone mode — recording inbound traffic only")
try:
while True:
data = await client_reader.read(4096)
if not data:
break
session.feed_client(data)
except asyncio.CancelledError:
pass
except Exception as exc:
log.warning("Standalone read error: %s", exc)
finally:
session.close("standalone capture ended")
try:
client_writer.close()
await client_writer.wait_closed()
except Exception:
pass
return
# ── Bridge / splitter mode ───────────────────────────────────────────
# Connect to primary upstream (required)
try:
up_reader, up_writer = await asyncio.open_connection(
self.upstream_host, self.upstream_port
)
log.info("Connected to primary %s:%s", self.upstream_host, self.upstream_port)
except Exception as exc:
log.error("Failed to connect to primary upstream: %s", exc)
session.close(f"primary connect failed: {exc}")
client_writer.close()
return
# Connect to mirror upstream (optional — failure is non-fatal)
mir_reader: Optional[asyncio.StreamReader] = None
mir_writer: Optional[asyncio.StreamWriter] = None
if self.mirror_host:
try:
mir_reader, mir_writer = await asyncio.open_connection(
self.mirror_host, self.mirror_port
)
log.info("Connected to mirror %s:%s", self.mirror_host, self.mirror_port)
except Exception as exc:
log.warning(
"Mirror connect failed — continuing without mirror: %s", exc
)
session._log(f"# WARNING: mirror connect failed: {exc}")
# Build relay tasks
#
# ┌──────────┐ device bytes ┌─────────────┐
# │ Device │ ─────────────► │ PRIMARY │ responses ──► device
# └──────────┘ └─────────────┘
# │
# │ device bytes (copy)
# ▼
# ┌─────────────┐
# │ MIRROR │ responses discarded (logged only)
# └─────────────┘
#
tasks = [
asyncio.create_task(
self._relay_device(client_reader, up_writer, mir_writer, session),
name="device→upstreams",
),
asyncio.create_task(
self._relay_simple(up_reader, client_writer, session, "server"),
name="primary→device",
),
]
if mir_reader is not None:
tasks.append(asyncio.create_task(
self._relay_drain(mir_reader, session),
name="mirror→drain",
))
try:
# Wait for the device-to-upstreams relay to exit first (device
# disconnected or primary dropped). Then cancel the rest.
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
try:
await t
except (asyncio.CancelledError, Exception):
pass
except Exception as exc:
log.warning("Bridge relay error: %s", exc)
finally:
session.close("relay ended")
for writer in filter(None, [client_writer, up_writer, mir_writer]):
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
# ── Relay helpers ─────────────────────────────────────────────────────────
async def _relay_device(
self,
reader: asyncio.StreamReader,
primary_writer: asyncio.StreamWriter,
mirror_writer: Optional[asyncio.StreamWriter],
session: CaptureSession,
) -> None:
"""
Read bytes from the device, write to the primary server, and also
write a copy to the mirror server (if connected). Mirror write
failures are non-fatal — we log and continue.
"""
try:
while True:
data = await reader.read(4096)
if not data:
break
session.feed_client(data)
# Primary write — failure IS fatal (lose primary = lose prod)
primary_writer.write(data)
await primary_writer.drain()
# Mirror write — failure is non-fatal
if mirror_writer is not None:
try:
mirror_writer.write(data)
await mirror_writer.drain()
except Exception as exc:
log.warning("Mirror write failed (non-fatal): %s", exc)
session._log(f"# WARNING: mirror write failed: {exc}")
mirror_writer = None # stop trying
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
pass
async def _relay_simple(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
session: CaptureSession,
direction: str,
) -> None:
"""Standard single-pipe relay (primary→device or vice-versa)."""
try:
while True:
data = await reader.read(4096)
if not data:
break
if direction == "server":
session.feed_server(data)
else:
session.feed_client(data)
writer.write(data)
await writer.drain()
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
pass
async def _relay_drain(
self,
reader: asyncio.StreamReader,
session: CaptureSession,
) -> None:
"""
Read mirror server responses, log them to session, do NOT forward to
device. The device only ever sees primary server responses.
"""
try:
while True:
data = await reader.read(4096)
if not data:
break
session.feed_mirror(data)
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
pass
# ── Main ───────────────────────────────────────────────────────────────────────
async def main(args: argparse.Namespace) -> None:
capture_dir = Path(__file__).parent / "captures"
capture_dir.mkdir(parents=True, exist_ok=True)
upstream_host: Optional[str] = None
upstream_port: Optional[int] = None
mirror_host: Optional[str] = None
mirror_port: Optional[int] = None
if not args.standalone:
if not args.upstream:
print("ERROR: --upstream HOST:PORT is required unless --standalone is set.")
sys.exit(1)
parts = args.upstream.rsplit(":", 1)
if len(parts) != 2:
print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:12345)")
sys.exit(1)
upstream_host = parts[0]
upstream_port = int(parts[1])
if args.mirror:
parts = args.mirror.rsplit(":", 1)
if len(parts) != 2:
print("ERROR: --mirror must be HOST:PORT (e.g. 192.168.1.50:12345)")
sys.exit(1)
mirror_host = parts[0]
mirror_port = int(parts[1])
handler = BridgeHandler(
capture_dir,
upstream_host, upstream_port,
mirror_host, mirror_port,
)
server = await asyncio.start_server(
handler.handle,
host="0.0.0.0",
port=args.port,
)
# ── Startup banner ────────────────────────────────────────────────────────
if args.standalone:
mode = "STANDALONE capture (no forwarding)"
elif mirror_host:
mode = f"SPLITTER primary={upstream_host}:{upstream_port} mirror={mirror_host}:{mirror_port}"
else:
mode = f"BRIDGE → {upstream_host}:{upstream_port}"
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
print(f"\n{'='*70}")
print(f" ACH bridge/splitter listening on {addrs}")
print(f" Mode: {mode}")
print(f" Captures: {capture_dir}/ach_<timestamp>/")
print(f"{'='*70}")
if upstream_host and not mirror_host:
print(f"\n DISCOVERY PHASE")
print(f" Point your TEST unit's ACEmanager call-home destination to:")
print(f" <this machine's LAN IP> : {args.port}")
print(f" All traffic will be forwarded to {upstream_host}:{upstream_port}")
elif mirror_host:
print(f"\n SPLITTER MODE — PRODUCTION SAFE")
print(f" Units connect as normal. Every byte is forwarded to:")
print(f" PRIMARY (authoritative): {upstream_host}:{upstream_port}")
print(f" MIRROR (your server): {mirror_host}:{mirror_port}")
print(f" Only PRIMARY responses reach the device.")
print(f" Mirror failures are logged and do not affect the device.")
else:
print(f"\n STANDALONE MODE — capture only, nothing forwarded")
print(f" Point a unit at <this machine's LAN IP> : {args.port}")
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
async with server:
await server.serve_forever()
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description=(
"Transparent TCP bridge / splitter for Instantel MiniMate Plus "
"call-home (ACH) traffic."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
p.add_argument(
"--upstream", "-u",
metavar="HOST:PORT",
help=(
"Primary upstream ACH server to forward to "
"(e.g. 203.0.113.5:12345). "
"Omit with --standalone for capture-only mode."
),
)
p.add_argument(
"--mirror", "-m",
metavar="HOST:PORT",
help=(
"Mirror / secondary server to receive a copy of all device bytes "
"(splitter mode). Mirror responses are logged but NOT forwarded "
"to the device. Mirror failures are non-fatal."
),
)
p.add_argument(
"--port", "-p",
type=int,
default=12345,
help="Local port to listen on (default: 12345).",
)
p.add_argument(
"--standalone", "-s",
action="store_true",
help="Capture-only mode: accept connection, do not forward anywhere.",
)
p.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable debug logging.",
)
return p.parse_args()
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
)
try:
asyncio.run(main(args))
except KeyboardInterrupt:
print("\nStopped.")
+454
View File
@@ -0,0 +1,454 @@
#!/usr/bin/env python3
"""
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.
This IS your test server. Run it on any machine on the same network, point a
unit's ACEmanager call-home destination at it, and it will speak the full BW
protocol to the device: handshake, pull device info, download all events, save
everything as JSON.
The key thing this script tells you that no amount of packet sniffing can:
- Does the device speak first (push) or wait for us to send POLL (pull)?
If startup() completes normally → it's pull protocol, same as Blastware.
If startup() times out → the device sent something first; check raw_rx.bin.
Usage
-----
python bridges/ach_server.py [--port 12345] [--output bridges/captures/]
Setup
-----
1. Run this script on a machine on your local network.
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
find the Call Home / ACH settings. Set:
Remote Host: <this machine's LAN IP>
Remote Port: 12345
3. Trigger the unit (wait for a vibration event, or use the manual call-home
button if your firmware version has one).
4. The unit connects. This script handshakes, downloads all events,
and saves a timestamped session directory.
Output per session
------------------
bridges/captures/ach_inbound_<ts>/
device_info.json — serial number, firmware version, calibration date, etc.
events.json — all events: timestamp, PPV per channel, peaks, metadata
raw_rx_<ts>.bin — raw bytes from the device (S3 side) for Analyzer
session_<ts>.log — detailed protocol log
What to look for
----------------
Push vs pull: Check session_<ts>.log. If the first line after "Connected"
shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL
gets a clean response, it's pull.
Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry
bulk waveform data — if frequency is sent pre-computed there will be float32
values before the ADC sample blocks.
ACH-specific framing: Does the unit send anything extra before the DLE+STX
framing starts? raw_rx.bin will show raw bytes including any preamble.
"""
from __future__ import annotations
import argparse
import datetime
import json
import logging
import socket
import sys
import threading
from pathlib import Path
from typing import Optional
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.transport import SocketTransport
from minimateplus.client import MiniMateClient
from minimateplus.models import DeviceInfo, Event
log = logging.getLogger("ach_server")
# ── Per-unit state (high-water mark) ──────────────────────────────────────────
# Persisted as <output_dir>/ach_state.json
# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... }
_state_lock = threading.Lock()
def _load_state(state_path: Path) -> dict:
if state_path.exists():
try:
with open(state_path) as f:
return json.load(f)
except Exception:
pass
return {}
def _save_state(state_path: Path, state: dict) -> None:
with _state_lock:
with open(state_path, "w") as f:
json.dump(state, f, indent=2)
# ── Per-session handler ────────────────────────────────────────────────────────
class AchSession:
"""
Handles one inbound unit connection in its own thread.
Wraps the socket in a SocketTransport → MiniMateClient, then runs the
standard connect → get_device_info → get_events sequence.
State tracking (ach_state.json in output_dir):
On each successful download we record how many events the unit had.
On the next call-home we compare: if count hasn't grown, there's nothing
new and we close cleanly without downloading. If it has grown, we
download all events up to the new count and save only the new ones.
"""
def __init__(
self,
sock: socket.socket,
peer: str,
output_dir: Path,
timeout: float,
events_only: bool,
max_events: Optional[int],
state_path: Path,
) -> None:
self.sock = sock
self.peer = peer
self.output_dir = output_dir
self.timeout = timeout
self.events_only = events_only
self.max_events = max_events
self.state_path = state_path
def run(self) -> None:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
session_dir = self.output_dir / f"ach_inbound_{ts}"
session_dir.mkdir(parents=True, exist_ok=True)
log_path = session_dir / f"session_{ts}.log"
raw_path = session_dir / f"raw_rx_{ts}.bin"
# Wire up a file handler so every protocol log line goes to the session log
fh = logging.FileHandler(log_path)
fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
root_logger = logging.getLogger()
root_logger.addHandler(fh)
try:
self._run_inner(session_dir, raw_path, ts)
except Exception as exc:
log.error("Session failed: %s", exc, exc_info=True)
finally:
root_logger.removeHandler(fh)
fh.close()
try:
self.sock.close()
except Exception:
pass
def _run_inner(self, session_dir: Path, raw_path: Path, ts: str) -> None:
log.info("="*60)
log.info("Inbound connection from %s", self.peer)
log.info("Session dir: %s", session_dir)
transport = SocketTransport(self.sock, peer=self.peer)
# Tap the transport: save every raw byte received from the device.
raw_fh = open(raw_path, "wb")
_orig_read = transport.read
def tapped_read(n: int) -> bytes:
data = _orig_read(n)
if data:
raw_fh.write(data)
raw_fh.flush()
return data
transport.read = tapped_read # type: ignore[method-assign]
serial: Optional[str] = None
try:
client = MiniMateClient(transport=transport, timeout=self.timeout)
client.open()
# ── Step 1: startup handshake ─────────────────────────────────────
log.info("Step 1/3: startup handshake (POLL / SUB 5B)")
try:
from minimateplus.protocol import MiniMateProtocol
proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
proto.startup()
log.info(" [OK] Startup OK -- pull protocol confirmed")
except Exception as exc:
log.error(" [FAIL] Startup failed: %s", exc)
return
# ── Step 2: device info ───────────────────────────────────────────
device_info = None
if not self.events_only:
log.info("Step 2/3: reading device info")
try:
device_info = client.connect()
serial = device_info.serial
_save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
log.info(
" [OK] Device: serial=%s firmware=%s calibration=%s",
serial,
device_info.firmware_version,
device_info.calibration_date,
)
except Exception as exc:
log.error(" [FAIL] Device info failed: %s", exc)
else:
log.info("Step 2/3: skipping device info (--events-only)")
# ── Step 3: check for new events via high-water mark ───────────────
log.info("Step 3/3: checking for new events")
state = _load_state(self.state_path)
unit_key = serial or self.peer # fall back to IP if no serial
last_count = state.get(unit_key, {}).get("event_count", 0)
try:
current_count = client.count_events()
log.info(" Unit has %d stored event(s); last downloaded count: %d",
current_count, last_count)
except Exception as exc:
log.error(" [FAIL] count_events failed: %s", exc)
return
if current_count <= last_count:
log.info(" [OK] No new events since last call-home -- nothing to download")
log.info("Session complete (no new events) -> %s", session_dir)
return
new_event_count = current_count - last_count
log.info(" %d new event(s) to download", new_event_count)
# Download all events up to current_count, apply max_events cap.
# We re-download old events too (get_events always starts from 0),
# but we only SAVE the new ones (the last new_event_count of the list).
stop_idx = current_count - 1
if self.max_events is not None:
stop_idx = min(stop_idx, self.max_events - 1)
if self.max_events < current_count:
log.warning(
" max_events=%d cap: will download events 0%d only "
"(unit has %d total)",
self.max_events, stop_idx, current_count,
)
try:
all_events = client.get_events(
full_waveform=True,
stop_after_index=stop_idx,
)
# Only the events beyond last_count are genuinely new
new_events = all_events[last_count:]
log.info(" [OK] Downloaded %d total event(s), %d new",
len(all_events), len(new_events))
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
if last_count > 0 and len(all_events) > len(new_events):
log.info(" (skipped %d already-seen event(s))", last_count)
for i, ev in enumerate(new_events):
log.info(
" NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f",
last_count + i,
ev.timestamp.isoformat() if ev.timestamp else "?",
ev.peaks.transverse if ev.peaks else 0,
ev.peaks.vertical if ev.peaks else 0,
ev.peaks.longitudinal if ev.peaks else 0,
ev.peaks.vector_sum if ev.peaks else 0,
)
# Update high-water mark
state[unit_key] = {
"event_count": current_count,
"last_seen": datetime.datetime.now().isoformat(),
"serial": serial,
"peer": self.peer,
}
_save_state(self.state_path, state)
except Exception as exc:
log.error(" [FAIL] Event download failed: %s", exc, exc_info=True)
finally:
raw_fh.close()
client.close() # closes transport / socket cleanly
log.info("Session complete -> %s", session_dir)
log.info("="*60)
# ── JSON helpers ───────────────────────────────────────────────────────────────
def _save_json(path: Path, obj: object) -> None:
with open(path, "w") as f:
json.dump(obj, f, indent=2, default=str)
log.debug("Saved %s", path)
def _device_info_to_dict(d: DeviceInfo) -> dict:
return {
"serial": d.serial,
"firmware_version": d.firmware_version,
"calibration_date": str(d.calibration_date) if d.calibration_date else None,
"aux_trigger": d.aux_trigger,
"setup_name": d.setup_name,
"sample_rate": d.sample_rate,
"record_time": d.record_time,
"trigger_level_geo": d.trigger_level_geo,
"alarm_level_geo": d.alarm_level_geo,
"max_range_geo": d.max_range_geo,
"project": d.project,
"client": d.client,
"operator": d.operator,
"sensor_location": d.sensor_location,
}
def _event_to_dict(e: Event) -> dict:
peaks = {}
if e.peaks:
peaks = {
"transverse": e.peaks.transverse,
"vertical": e.peaks.vertical,
"longitudinal": e.peaks.longitudinal,
"vector_sum": e.peaks.vector_sum,
"mic": e.peaks.mic,
}
samples = {}
if e.raw_samples:
samples = {
ch: vals[:20] # first 20 sample-sets to keep the file sane
for ch, vals in e.raw_samples.items()
}
samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"
return {
"timestamp": e.timestamp.isoformat() if e.timestamp else None,
"project": e.project,
"client": e.client,
"operator": e.operator,
"sensor_location": e.sensor_location,
"peaks": peaks,
"raw_samples_preview": samples,
}
# ── Main server loop ───────────────────────────────────────────────────────────
def serve(args: argparse.Namespace) -> None:
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
state_path = output_dir / "ach_state.json"
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind(("0.0.0.0", args.port))
server_sock.listen(5)
max_ev = args.max_events
print(f"\n{'='*60}")
print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
print(f" State file: {state_path}")
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
print(f"{'='*60}")
print(f"\n Point your test unit's ACEmanager call-home settings to:")
print(f" Remote Host: <this machine's LAN IP>")
print(f" Remote Port: {args.port}")
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
try:
while True:
try:
client_sock, addr = server_sock.accept()
peer = f"{addr[0]}:{addr[1]}"
log.info("Accepted connection from %s", peer)
session = AchSession(
sock=client_sock,
peer=peer,
output_dir=output_dir,
timeout=args.timeout,
events_only=args.events_only,
max_events=max_ev,
state_path=state_path,
)
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
t.start()
except KeyboardInterrupt:
raise
except Exception as exc:
log.error("Accept error: %s", exc)
finally:
server_sock.close()
print("\nServer stopped.")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
p.add_argument(
"--port", "-p",
type=int,
default=12345,
help="Port to listen on (default: 12345).",
)
p.add_argument(
"--output", "-o",
default=str(Path(__file__).parent / "captures"),
metavar="DIR",
help="Directory to write session captures (default: bridges/captures/).",
)
p.add_argument(
"--timeout", "-t",
type=float,
default=30.0,
help="Protocol receive timeout in seconds (default: 30.0).",
)
p.add_argument(
"--events-only",
action="store_true",
help="Skip the device-info step and go straight to event download.",
)
p.add_argument(
"--max-events",
type=int,
default=None,
metavar="N",
help=(
"Safety cap: download at most N events per session (default: unlimited). "
"Useful if a unit has many old events stored — prevents a very long first run."
),
)
p.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable debug logging.",
)
return p.parse_args()
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
)
try:
serve(args)
except KeyboardInterrupt:
print("\nStopped.")
+435
View File
@@ -0,0 +1,435 @@
#!/usr/bin/env python3
"""
serial_watch.py — Instantel Series-3 serial monitor with S3 frame parsing.
Taps the RS-232 line between the MiniMate Plus and its modem (RV50/RV55).
Saves raw binary captures compatible with the rest of the analysis toolchain,
plus a human-readable frame log.
Usage
-----
python bridges/serial_watch.py # interactive COM picker
python bridges/serial_watch.py --port COM3 # specify port
python bridges/serial_watch.py --port COM3 --ack-ok # reply OK to AT commands
# (useful if modem is absent
# and you want the device to
# proceed past AT negotiation)
python bridges/serial_watch.py --list # list available ports
Output
------
bridges/captures/serial_<ISO-timestamp>/
raw_s3_<ts>.bin — raw bytes from device (feeds directly into S3FrameParser)
session_<ts>.log — human-readable frame + control-line log
session_<ts>.jsonl — JSON-lines frame log
The raw_s3_*.bin file is byte-for-byte compatible with the existing capture
format used by bridges/parse_capture.py and all analysis scripts.
What to look for in a call-home capture
----------------------------------------
1. Does the device talk first after CONNECT, or does it wait?
- If raw_s3_*.bin has bytes before any AT/POLL exchange → PUSH protocol
- If it stays silent → PULL protocol (same as Blastware manual download)
2. Look for "Operating System" ASCII at the start — the device sends this 16-byte
boot string on cold start before entering DLE-framed mode.
3. RING/CONNECT from the modem appear as ASCII before the DLE frames — the parser
handles these automatically (scans forward to DLE+STX).
"""
from __future__ import annotations
import argparse
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
try:
import serial
from serial.tools import list_ports
except ModuleNotFoundError:
print(
"pyserial not found. Install with:\n python -m pip install pyserial",
file=sys.stderr,
)
sys.exit(1)
# Add project root so we can import the frame parser
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.framing import S3FrameParser, S3Frame
import json
# ── Helpers ───────────────────────────────────────────────────────────────────
def _ts() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
def _hexdump(b: bytes) -> str:
return " ".join(f"{x:02X}" for x in b)
def _printable(b: bytes) -> str:
return b.decode("latin1", errors="replace")
_KNOWN_SUBS = {
0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADVANCE_EVENT_RSP",
0xE1: "EVENT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP",
0xF3: "WAVEFORM_RECORD_RSP", 0xF5: "WAVEFORM_HEADER_RSP", 0xF7: "EVENT_INDEX_RSP",
0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP",
0x69: "START_MONITOR_ACK", 0x68: "STOP_MONITOR_ACK",
0x97: "EVT_IDX_WRITE_ACK", 0x8C: "CONFIRM_B_ACK", 0x8E: "COMPLIANCE_WRITE_ACK",
0x8D: "CONFIRM_A_ACK", 0x7D: "TRIGGER_WRITE_ACK", 0x7C: "TRIGGER_CONFIRM_ACK",
0x96: "WAVEFORM_WRITE_ACK", 0x8B: "CONFIRM_C_ACK",
}
def _label_frame(frame: S3Frame) -> str:
name = _KNOWN_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
chk = "" if frame.checksum_valid else "✗ BAD_CHK"
peek = frame.data[:24].hex() + ("" if len(frame.data) > 24 else "")
return (
f"S3 SUB=0x{frame.sub:02X} ({name:<22}) "
f"page=0x{frame.page_key:04X} data={len(frame.data):4d}B {chk} {peek}"
)
# ── Logger ────────────────────────────────────────────────────────────────────
class Logger:
def __init__(self, log_path: Path, jsonl_path: Path, raw_path: Path) -> None:
self._log = log_path.open("a", encoding="utf-8", newline="")
self._jl = jsonl_path.open("a", encoding="utf-8", newline="")
self._raw = raw_path.open("ab")
self._lock = threading.Lock()
self._frame_count = 0
def info(self, msg: str) -> None:
line = f"[{_ts()}] INFO | {msg}"
with self._lock:
print(line)
print(line, file=self._log, flush=True)
def ctrl(self, msg: str) -> None:
line = f"[{_ts()}] CTRL | {msg}"
with self._lock:
print(line)
print(line, file=self._log, flush=True)
def data_hex(self, msg: str) -> None:
line = f"[{_ts()}] HEX | {msg}"
with self._lock:
print(line)
print(line, file=self._log, flush=True)
def data_ascii(self, msg: str) -> None:
line = f"[{_ts()}] DATA | {msg}"
with self._lock:
print(line)
print(line, file=self._log, flush=True)
def frame(self, f: S3Frame) -> None:
with self._lock:
self._frame_count += 1
label = f"[{_ts()}] FRAME | #{self._frame_count:04d} {_label_frame(f)}"
print(label)
print(label, file=self._log, flush=True)
record = {
"frame": self._frame_count,
"sub": f.sub,
"page_key": f.page_key,
"data_len": len(f.data),
"data_hex": f.data.hex(),
"checksum_valid": f.checksum_valid,
}
print(json.dumps(record), file=self._jl, flush=True)
def write_raw(self, data: bytes) -> None:
with self._lock:
self._raw.write(data)
self._raw.flush()
def close(self) -> None:
with self._lock:
for fh in (self._log, self._jl, self._raw):
try:
fh.flush()
fh.close()
except Exception:
pass
# ── Control-line monitor thread ───────────────────────────────────────────────
def _monitor_control_lines(
ser: serial.Serial,
logger: Logger,
stop: threading.Event,
interval: float,
) -> None:
prev = dict(CTS=None, DSR=None, DCD=None, RI=None)
try:
prev.update(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd)
try:
prev["RI"] = ser.ri
except Exception:
pass
except Exception as exc:
logger.ctrl(f"Init error: {exc}")
return
logger.ctrl(
f"Initial: CTS={prev['CTS']} DSR={prev['DSR']} DCD={prev['DCD']} RI={prev['RI']}"
)
while not stop.is_set():
try:
cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None)
try:
cur["RI"] = ser.ri
except Exception:
pass
for name, val in cur.items():
if val != prev[name]:
logger.ctrl(f"{name}{val}")
prev[name] = val
except serial.SerialException as exc:
logger.ctrl(f"Poll error: {exc}")
break
stop.wait(interval)
# ── Serial open ───────────────────────────────────────────────────────────────
_PARITY = {
"N": serial.PARITY_NONE, "E": serial.PARITY_EVEN, "O": serial.PARITY_ODD,
"M": serial.PARITY_MARK, "S": serial.PARITY_SPACE,
}
_STOPBITS = {
1: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, 2: serial.STOPBITS_TWO,
}
def _open_serial(args: argparse.Namespace, logger: Logger) -> serial.Serial | None:
for attempt in range(1, args.open_retries + 2):
logger.info(
f"Opening {args.port} @ {args.baud},{args.bytesize}{args.parity}{args.stopbits} "
f"rtscts={args.rtscts} xonxoff={args.xonxoff} dsrdtr={args.dsrdtr} "
f"(attempt {attempt})"
)
try:
ser = serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=args.bytesize,
parity=_PARITY[args.parity],
stopbits=_STOPBITS[args.stopbits],
timeout=args.timeout,
xonxoff=args.xonxoff,
rtscts=args.rtscts,
dsrdtr=args.dsrdtr,
write_timeout=0,
)
try:
ser.setDTR(args.dtr == "on")
ser.setRTS(args.rts == "on")
logger.ctrl(f"Set DTR={args.dtr} RTS={args.rts}")
except Exception as exc:
logger.ctrl(f"DTR/RTS set failed: {exc}")
if args.send_break > 0:
try:
ser.break_condition = True
time.sleep(args.send_break / 1000.0)
ser.break_condition = False
logger.ctrl(f"BREAK held {args.send_break} ms")
except Exception as exc:
logger.ctrl(f"BREAK failed: {exc}")
return ser
except serial.SerialException as exc:
logger.info(f"Open failed: {exc}")
if attempt <= args.open_retries:
time.sleep(args.open_retry_delay)
return None
# ── Port picker ───────────────────────────────────────────────────────────────
def _list_ports() -> list:
ports = list(list_ports.comports())
if not ports:
print("No serial ports found.")
return []
print("Available serial ports:")
for i, p in enumerate(ports, 1):
print(f" {i:2d}) {p.device:<12} {p.description or ''}")
return ports
def _pick_port() -> str:
ports = _list_ports()
if not ports:
sys.exit(1)
if len(ports) == 1:
print(f"Auto-selecting: {ports[0].device}")
return ports[0].device
while True:
sel = input("Select port (number or name, e.g. COM3): ").strip()
if sel.isdigit() and 1 <= int(sel) <= len(ports):
return ports[int(sel) - 1].device
for p in ports:
if p.device.upper() == sel.upper():
return p.device
print("Not recognised. Enter list number or exact port name.")
# ── Main loop ─────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(
description="Monitor Instantel Series-3 serial traffic with S3 frame parsing."
)
ap.add_argument("--port", "-p",
help="COM port (e.g. COM3). Omit to be prompted.")
ap.add_argument("--baud", "-b", type=int, default=38400)
ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8)
ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N")
ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1)
ap.add_argument("--rtscts", action="store_true")
ap.add_argument("--xonxoff", action="store_true")
ap.add_argument("--dsrdtr", action="store_true")
ap.add_argument("--dtr", choices=["on", "off"], default="on")
ap.add_argument("--rts", choices=["on", "off"], default="on")
ap.add_argument("--send-break", type=int, default=0,
help="Hold BREAK for N ms after open.")
ap.add_argument("--show", choices=["ascii", "hex", "both", "frames"],
default="frames",
help="'frames' (default) shows only parsed S3 frames. "
"'ascii'/'hex'/'both' also show raw bytes.")
ap.add_argument("--encoding", default="latin1")
ap.add_argument("--read-chunk", type=int, default=4096)
ap.add_argument("--timeout", type=float, default=0.05)
ap.add_argument("--poll-lines-interval", type=float, default=0.2)
ap.add_argument("--open-retries", type=int, default=0)
ap.add_argument("--open-retry-delay", type=float, default=0.8)
ap.add_argument("--ack-ok", action="store_true",
help="Auto-reply OK to AT* commands (except ATDT). "
"Useful for testing without a real modem.")
ap.add_argument("--list", action="store_true",
help="List available serial ports and exit.")
args = ap.parse_args()
if args.list:
_list_ports()
return
args.port = args.port or _pick_port()
# Build output paths
ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
out_dir = Path(__file__).parent / "captures" / f"serial_{ts_str}"
out_dir.mkdir(parents=True, exist_ok=True)
log_path = out_dir / f"session_{ts_str}.log"
jsonl_path = out_dir / f"session_{ts_str}.jsonl"
raw_path = out_dir / f"raw_s3_{ts_str}.bin"
logger = Logger(log_path, jsonl_path, raw_path)
logger.info(f"Output directory: {out_dir}")
logger.info(f"raw_s3 → {raw_path.name} (compatible with parse_capture.py)")
ser = _open_serial(args, logger)
if ser is None:
logger.info("Could not open serial port. Exiting.")
logger.close()
sys.exit(1)
s3_parser = S3FrameParser()
rx_buf = bytearray()
stop_evt = threading.Event()
ctrl_thread = threading.Thread(
target=_monitor_control_lines,
args=(ser, logger, stop_evt, args.poll_lines_interval),
daemon=True,
)
ctrl_thread.start()
logger.info("Monitoring started. Waiting for call-home. Press Ctrl+C to stop.")
try:
while True:
try:
data = ser.read(args.read_chunk)
except serial.SerialException as exc:
logger.info(f"Read error: {exc}")
break
if not data:
continue
# 1. Save raw bytes
logger.write_raw(data)
# 2. Optional raw display
if args.show in ("ascii", "both"):
txt = _printable(data)
for line in txt.splitlines():
logger.data_ascii(line)
if args.show in ("hex", "both"):
logger.data_hex(_hexdump(data))
# 3. Parse S3 frames
for byte in data:
result = s3_parser.feed(bytes([byte]))
if result:
frames = result if isinstance(result, list) else [result]
for f in frames:
logger.frame(f)
# 4. AT command handling for --ack-ok
if args.ack_ok:
rx_buf.extend(data)
while b"\r" in rx_buf or b"\n" in rx_buf:
for sep in (b"\r", b"\n"):
idx = rx_buf.find(sep)
if idx != -1:
line_bytes = bytes(rx_buf[:idx])
del rx_buf[:idx + 1]
break
else:
break
line_str = line_bytes.decode("latin1", errors="ignore").strip().upper()
if line_str.startswith("AT") and not line_str.startswith("ATDT"):
try:
ser.write(b"\r\nOK\r\n")
ser.flush()
logger.info(f"AT ack: {line_str!r} → OK")
except Exception as exc:
logger.info(f"AT ack write failed: {exc}")
except KeyboardInterrupt:
logger.info("Ctrl+C — stopping.")
finally:
stop_evt.set()
try:
ser.close()
except Exception:
pass
ctrl_thread.join(timeout=1.0)
logger.info(f"Capture saved to: {out_dir}")
logger.close()
if __name__ == "__main__":
main()
+3 -2
View File
@@ -93,7 +93,8 @@
| 2026-04-06 | §7.8.4 | **NEW — 5A chunk timing and count (empirical, BE11529 at 1024 sps).** Each chunk response arrives within ~1 second over TCP/cellular. A 9,306-sample event (≈9.1 s at 1024 sps) produces **35 chunks** before end-of-stream. Chunks 116 have varying data lengths (10361123 bytes); chunks 1735 are uniformly 1036 bytes each (post-event silence, all-zero ADC samples). Safe recv timeout for chunk loop: **10 s** (10× typical response time). Default transport timeout (120 s) results in a ~2-minute stall per event at end-of-stream. | | 2026-04-06 | §7.8.4 | **NEW — 5A chunk timing and count (empirical, BE11529 at 1024 sps).** Each chunk response arrives within ~1 second over TCP/cellular. A 9,306-sample event (≈9.1 s at 1024 sps) produces **35 chunks** before end-of-stream. Chunks 116 have varying data lengths (10361123 bytes); chunks 1735 are uniformly 1036 bytes each (post-event silence, all-zero ADC samples). Safe recv timeout for chunk loop: **10 s** (10× typical response time). Default transport timeout (120 s) results in a ~2-minute stall per event at end-of-stream. |
| 2026-04-06 | §7.8.3 | **KNOWN ISSUE — `_decode_a5_waveform` hardcoded fi==9 skip.** The decoder contains `elif fi == 9: continue` which was written for the 9-frame original blast capture where frame 9 was a device terminator. For streams with >9 frames (current device produces 35+), frame index 9 is live waveform data — this skip discards ~1,070 bytes (~133 sample-sets) per event. The terminator is now detected via `page_key == 0x0000`, not by frame index. The fi==9 skip should be removed. | | 2026-04-06 | §7.8.3 | **KNOWN ISSUE — `_decode_a5_waveform` hardcoded fi==9 skip.** The decoder contains `elif fi == 9: continue` which was written for the 9-frame original blast capture where frame 9 was a device terminator. For streams with >9 frames (current device produces 35+), frame index 9 is live waveform data — this skip discards ~1,070 bytes (~133 sample-sets) per event. The terminator is now detected via `page_key == 0x0000`, not by frame index. The fi==9 skip should be removed. |
| 2026-04-06 | §7.8 | **CONFIRMED — ADC count-to-physical-unit conversion.** Raw waveform samples are signed 16-bit integers (counts). Conversion: `value = counts × (range / 32767)`. For geo channels: range = 10.000 in/s (from the device's compliance config geo range field). For the mic channel: range is in psi (device-specific). Near-full-scale counts (≈32,700) on all four channels simultaneously indicate ADC saturation (clipping) from a high-amplitude event. | | 2026-04-06 | §7.8 | **CONFIRMED — ADC count-to-physical-unit conversion.** Raw waveform samples are signed 16-bit integers (counts). Conversion: `value = counts × (range / 32767)`. For geo channels: range = 10.000 in/s (from the device's compliance config geo range field). For the mic channel: range is in psi (device-specific). Near-full-scale counts (≈32,700) on all four channels simultaneously indicate ADC saturation (clipping) from a high-amplitude event. |
| 2026-04-08 | §5.1, §7.10, §12 | **NEW — Monitoring commands confirmed.** SUB 0x1C (monitor status), 0x96 (start monitoring), 0x97 (stop monitoring) all confirmed from 4-8-26/2ndtry capture. SESSION_RESET (`41 03`) required before POLL to wake a monitoring unit. `section[6] == 0x10` is the monitoring flag (CORRECTED 2026-04-08 — was wrongly `section[1]`). Battery/memory at relative-from-end offsets: `section[-11:-9]` (battery×100), `section[-9:-5]` (memory_total), `section[-5:-1]` (memory_free) — stable across all payload size variants (5255 bytes). | | 2026-04-08 | §5.1, §7.10, §12 | **NEW — Monitoring commands confirmed.** SUB 0x1C (monitor status), 0x96 (start monitoring), 0x97 (stop monitoring) all confirmed from 4-8-26/2ndtry capture. SESSION_RESET (`41 03`) required before POLL to wake a monitoring unit. |
| 2026-04-09 | §7.10 | **CORRECTED — monitoring flag and battery/memory offsets.** `section[1] == 0x10` is the monitoring flag (100% accurate across 144 data frames in 2ndtry capture). Previous note claiming `section[6]` was wrong — section[6] has device-specific non-binary values (0xea/0x07). Battery/memory offsets corrected: `section[-10:-8]` (battery×100), `section[-8:-4]` (memory_total), `section[-4:]` (memory_free). NOTE: `frame.data` has checksum stripped by parser — earlier offsets of `[-11:-9]`/`[-9:-5]`/`[-5:-1]` were wrong because they assumed a trailing checksum byte that isn't there. |
| 2026-04-08 | §7.10 | **NEW — SUBs 0x0E (channel sensor data) and 0x98 (trigger test) observed** in 4-8-26/sensor-check capture (Blastware "Unit Channel Test" comms check). SUB 0x0E: 2-step read with channel selector in `params[6:8]`, data length 0x0A per channel, RSP SUB = 0xF1. SUB 0x98: single probe frame with `params[0] = 0xFF`, RSP SUB = 0x67; sent twice per test cycle. Not yet implemented in SFM. | | 2026-04-08 | §7.10 | **NEW — SUBs 0x0E (channel sensor data) and 0x98 (trigger test) observed** in 4-8-26/sensor-check capture (Blastware "Unit Channel Test" comms check). SUB 0x0E: 2-step read with channel selector in `params[6:8]`, data length 0x0A per channel, RSP SUB = 0xF1. SUB 0x98: single probe frame with `params[0] = 0xFF`, RSP SUB = 0x67; sent twice per test cycle. Not yet implemented in SFM. |
| 2026-04-08 | §7.10 | **NEW — SUBs 0x15 and 0x01 observed in sensor-check capture.** SUB 0x15 (serial number short form, data length 0x0A, RSP 0xEA) and SUB 0x01 (device info block, data length 0x98 = 152 bytes, RSP 0xFE) seen in Blastware's "Unit Channel Test" init sequence. Note: SUB 0x01 response SUB 0xFE collides with the existing SUB 0xFE → RSP 0x01 naming convention — they are inverse commands. | | 2026-04-08 | §7.10 | **NEW — SUBs 0x15 and 0x01 observed in sensor-check capture.** SUB 0x15 (serial number short form, data length 0x0A, RSP 0xEA) and SUB 0x01 (device info block, data length 0x98 = 152 bytes, RSP 0xFE) seen in Blastware's "Unit Channel Test" init sequence. Note: SUB 0x01 response SUB 0xFE collides with the existing SUB 0xFE → RSP 0x01 naming convention — they are inverse commands. |
| 2026-04-08 | §12 | **CONFIRMED — Unit partially reachable during on-device sensor check.** 4-8-26/sensor-check capture shows: POLL responds normally throughout; SUB 0x0E channel reads partially served (channels 04 responded), then ~40s silent gap while sensor check ran, then channels 57 responded. On-device sensor check duration ≈ 40 s. SFM `_pollMonitorConfirm()` polls status every 5 s for up to 60 s after start_monitoring. | | 2026-04-08 | §12 | **CONFIRMED — Unit partially reachable during on-device sensor check.** 4-8-26/sensor-check capture shows: POLL responds normally throughout; SUB 0x0E channel reads partially served (channels 04 responded), then ~40s silent gap while sensor check ran, then channels 57 responded. On-device sensor check duration ≈ 40 s. SFM `_pollMonitorConfirm()` polls status every 5 s for up to 60 s after start_monitoring. |
@@ -256,7 +257,7 @@ Step 4 — Device sends actual data payload:
| `2E` | **UNKNOWN READ B** | Read command, response (`D1`) returns 0x1A (26) bytes. Purpose unknown. | 🔶 INFERRED | | `2E` | **UNKNOWN READ B** | Read command, response (`D1`) returns 0x1A (26) bytes. Purpose unknown. | 🔶 INFERRED |
| `0E` | **CHANNEL SENSOR DATA** | Real-time sensor reading for one channel. Two-step read, data length 0x0A (10 bytes). Channel selector in params[6:8] (0x00000x0007 for 8 channels). Response (F1) carries amplitude, frequency, overswing data for that channel. Used by Blastware "Unit Channel Test" comms check. | ✅ CONFIRMED 2026-04-08 | | `0E` | **CHANNEL SENSOR DATA** | Real-time sensor reading for one channel. Two-step read, data length 0x0A (10 bytes). Channel selector in params[6:8] (0x00000x0007 for 8 channels). Response (F1) carries amplitude, frequency, overswing data for that channel. Used by Blastware "Unit Channel Test" comms check. | ✅ CONFIRMED 2026-04-08 |
| `98` | **TRIGGER TEST** | Trigger-test command. Single probe frame; `params[0] = 0xFF`. Response (0x67) is all-zero data. Sent twice per Blastware comms-check cycle. Not a full POLL, no monitor state change. | ✅ CONFIRMED 2026-04-08 | | `98` | **TRIGGER TEST** | Trigger-test command. Single probe frame; `params[0] = 0xFF`. Response (0x67) is all-zero data. Sent twice per Blastware comms-check cycle. Not a full POLL, no monitor state change. | ✅ CONFIRMED 2026-04-08 |
| `1C` | **MONITOR STATUS READ** | Two-step read, data offset 0x2C (44 bytes). `section[6] == 0x10` → monitoring; `0x00` → idle (CORRECTED 2026-04-08 — was wrongly documented as section[1]). Payload length varies (5255 bytes) but battery/memory block is always the last 10 bytes before checksum: `section[-11:-9]` = battery×100 (uint16 BE), `section[-9:-5]` = memory_total (uint32 BE), `section[-5:-1]` = memory_free (uint32 BE). Confirmed from 2ndtry 4-8-26 full byte diff across 3 payload size variants. | ✅ CONFIRMED 2026-04-08 | | `1C` | **MONITOR STATUS READ** | Two-step read, data offset 0x2C (44 bytes). `section[1] == 0x10` → monitoring; `0x00` → idle (CONFIRMED 2026-04-09, 100% accuracy on 144 frames). Payload length: 4647 bytes IDLE, 4849 bytes MONITORING. `frame.data` has checksum stripped — no trailing byte to skip. Battery/memory at end: `section[-10:-8]` = battery×100 (uint16 BE), `section[-8:-4]` = memory_total (uint32 BE), `section[-4:]` = memory_free (uint32 BE). | ✅ CONFIRMED 2026-04-09 |
| `96` | **START MONITORING** | Single write frame, no data payload. Transitions unit from idle to monitoring mode (after optional on-device sensor check ~40 s). | ✅ CONFIRMED 2026-04-08 | | `96` | **START MONITORING** | Single write frame, no data payload. Transitions unit from idle to monitoring mode (after optional on-device sensor check ~40 s). | ✅ CONFIRMED 2026-04-08 |
| `97` | **STOP MONITORING** | Single write frame, no data payload. Stops monitoring, unit returns to idle. | ✅ CONFIRMED 2026-04-08 | | `97` | **STOP MONITORING** | Single write frame, no data payload. Stops monitoring, unit returns to idle. | ✅ CONFIRMED 2026-04-08 |
+634
View File
@@ -0,0 +1,634 @@
#!/usr/bin/env python3
"""
experiments.py Protocol minimization experiments for MiniMate Plus.
Goal: figure out which steps in Blastware's sequences are truly required vs.
cargo-culted, so we can build a faster, smarter client.
Each experiment is self-contained (opens its own TCP connection) and reports
PASS / FAIL / INCONCLUSIVE with timing and notes.
Usage:
python experiments.py [--host IP] [--port PORT] [exp1 exp2 ...]
Run all: python experiments.py
Run specific: python experiments.py cold_status fast_event_count no_5a
Available experiments
---------------------
cold_status EXP1 Monitor status (1C) with NO prior POLL
fast_event_count EXP2 Event count via POLL+08 only skip identity reads
no_5a EXP3 Event record (0C) without bulk waveform stream (5A)
skip_1e EXP4 0A/0C directly with cached key skip initial 1E
fewer_polls EXP5 Only 1 POLL before 5A instead of Blastware's 3
compliance_only EXP6 Write compliance ONLY (71x372), skip event index+trigger+waveform
"""
from __future__ import annotations
import argparse
import logging
import struct
import sys
import time
from dataclasses import dataclass, field
from typing import Optional
logging.basicConfig(
level=logging.WARNING, # experiment output is via print(); set DEBUG for wire trace
format="%(asctime)s %(levelname)-7s %(name)-20s %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("experiments")
# ── Imports ───────────────────────────────────────────────────────────────────
from minimateplus.transport import TcpTransport
from minimateplus.protocol import (
MiniMateProtocol,
ProtocolError,
TimeoutError as ProtoTimeout,
SUB_MONITOR_STATUS,
SUB_SERIAL_NUMBER,
SUB_FULL_CONFIG,
SUB_EVENT_INDEX,
SUB_COMPLIANCE,
SUB_WRITE_CONFIRM_A,
SUB_WRITE_CONFIRM_B,
)
from minimateplus.framing import build_bw_frame, SESSION_RESET
from minimateplus.client import (
MiniMateClient,
_decode_compliance_config_into,
_encode_compliance_config,
)
from minimateplus.models import DeviceInfo
DEFAULT_HOST = "63.43.212.232"
DEFAULT_PORT = 9034
# ── Result container ──────────────────────────────────────────────────────────
@dataclass
class Result:
name: str
outcome: str # "PASS" | "FAIL" | "INCONCLUSIVE"
elapsed: float = 0.0
notes: str = ""
details: dict = field(default_factory=dict)
def __str__(self) -> str:
sym = {"PASS": "", "FAIL": "", "INCONCLUSIVE": "⚠️ "}.get(self.outcome, "?")
lines = [f" {sym} {self.outcome:13s} {self.name} ({self.elapsed:.1f}s)"]
if self.notes:
lines.append(f" {self.notes}")
for k, v in self.details.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
# ── Connection helpers ────────────────────────────────────────────────────────
def connect_proto(host: str, port: int, timeout: float = 15.0) -> tuple[TcpTransport, MiniMateProtocol]:
"""Open a raw TCP connection and return (transport, proto) without any handshake."""
t = TcpTransport(host, port)
t.connect()
proto = MiniMateProtocol(t, recv_timeout=timeout)
return t, proto
def connect_client(host: str, port: int, timeout: float = 30.0) -> tuple[MiniMateClient, DeviceInfo]:
"""Open a MiniMateClient and run the full connect() handshake."""
transport = TcpTransport(host, port)
client = MiniMateClient(transport=transport, timeout=timeout)
client.open()
info = client.connect()
return client, info
# ── Experiment runner ─────────────────────────────────────────────────────────
def run(name: str, fn, *args, **kwargs) -> Result:
print(f"\n{''*60}")
print(f" Running: {name}")
print(f"{''*60}")
t0 = time.time()
try:
outcome, notes, details = fn(*args, **kwargs)
except Exception as exc:
outcome = "FAIL"
notes = f"Uncaught exception: {exc}"
details = {}
log.exception("Experiment %s raised:", name)
elapsed = time.time() - t0
r = Result(name=name, outcome=outcome, elapsed=elapsed, notes=notes, details=details)
print(str(r))
return r
# ══════════════════════════════════════════════════════════════════════════════
# EXP1 — Monitor status (1C) with NO prior POLL
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware always does a full POLL handshake before any other command.
# We want to know: can we query SUB 1C (battery, memory, monitoring state)
# cold, with only a SESSION_RESET signal and no POLL at all?
#
# If PASS: status checks become near-instant (no ~1s POLL round-trip).
# If FAIL: we need POLL first, but maybe we can cache it.
def exp_cold_status(host: str, port: int) -> tuple[str, str, dict]:
"""SUB 1C without any POLL — just SESSION_RESET + 1C probe + 1C data."""
t, proto = connect_proto(host, port)
try:
print(" Sending SESSION_RESET only (no POLL)")
t.write(SESSION_RESET)
time.sleep(0.1)
print(" Sending SUB 1C probe (no POLL first)…")
rsp_sub = (0xFF - SUB_MONITOR_STATUS) & 0xFF # 0xE3
t.write(build_bw_frame(SUB_MONITOR_STATUS, 0x00))
probe = proto._recv_one(expected_sub=rsp_sub, timeout=8.0)
print(f" 1C probe OK page_key=0x{probe.page_key:04X} data={probe.data.hex()}")
t.write(build_bw_frame(SUB_MONITOR_STATUS, 0x2C))
data_rsp = proto._recv_one(expected_sub=rsp_sub, timeout=8.0)
section = data_rsp.data
print(f" 1C data OK {len(section)} bytes hex: {section.hex()}")
# Decode battery + memory from the end of the section
details = {"raw_bytes": len(section)}
if len(section) >= 10:
batt_raw = struct.unpack_from(">H", section, len(section) - 10)[0]
mem_total = struct.unpack_from(">I", section, len(section) - 8)[0]
mem_free = struct.unpack_from(">I", section, len(section) - 4)[0]
is_monitoring = (section[1] == 0x10)
details["battery_v"] = f"{batt_raw / 100:.2f} V"
details["memory_total"] = f"{mem_total:,} bytes"
details["memory_free"] = f"{mem_free:,} bytes"
details["monitoring"] = is_monitoring
print(f" battery={batt_raw/100:.2f}V mem_free={mem_free:,} monitoring={is_monitoring}")
return "PASS", "SUB 1C responded without any POLL — cold status read works!", details
except ProtoTimeout:
return "FAIL", "Device did not respond to 1C without POLL (timeout)", {}
except ProtocolError as exc:
return "FAIL", f"Protocol error: {exc}", {}
finally:
t.disconnect()
# ══════════════════════════════════════════════════════════════════════════════
# EXP2 — Fast event count: POLL + SUB 08 only (skip identity reads)
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware's connect() does: POLL → 15 → 01 → 1A → 08
# We want to know: can we skip 15/01/1A and go straight from POLL to 08?
#
# Reading identity (15, 01) and full compliance (1A, ~2126 bytes over TCP)
# takes several seconds each connect. If we only need event count, skipping
# them would be a huge win.
#
# If PASS: fast status poll = POLL + 08 only (~2 round trips vs ~8+).
def exp_fast_event_count(host: str, port: int) -> tuple[str, str, dict]:
"""POLL startup → SUB 08 only, skip serial/config/compliance reads."""
t, proto = connect_proto(host, port)
try:
print(" Running startup (POLL only)…")
proto.startup()
print(" POLL OK — now reading SUB 08 (event index) directly…")
idx_raw = proto.read_event_index()
print(f" SUB 08 OK {len(idx_raw)} bytes")
# Try to decode event count from SUB 08 payload
# The raw block is 88 bytes; bytes [3:7] may be a count (uint32 BE)
details = {"idx_raw_len": len(idx_raw)}
if len(idx_raw) >= 7:
count_candidate = struct.unpack_from(">I", idx_raw, 3)[0]
details["count_candidate"] = count_candidate
print(f" idx[3:7] as uint32 BE = {count_candidate} (may or may not be event count)")
# Also verify we can read 1E without the identity reads having been done
print(" Reading 1E (event header) to confirm event access works…")
key4, data8 = proto.read_event_first()
is_empty = data8[4:8] == b"\x00\x00\x00\x00"
details["first_key"] = key4.hex()
details["is_empty"] = is_empty
print(f" 1E OK key={key4.hex()} empty={is_empty}")
return "PASS", "POLL+08+1E all work without identity reads (15/01/1A skipped)", details
except ProtocolError as exc:
return "FAIL", f"Protocol error: {exc}", {}
finally:
t.disconnect()
# ══════════════════════════════════════════════════════════════════════════════
# EXP3 — Get event record (0C) without bulk waveform stream (5A)
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware's event download = 1E → 0A → 1E-arm → 0C → 1F(dl) → POLL×3 → 5A → 1F(browse)
#
# The 5A bulk stream is the slow part (several large frames, ~1s+ per event).
# We only need 5A for: client, operator, seis_loc, notes (not in 0C).
# If you don't need those fields, can we do: 1E → 0A → 0C → 1F(browse) ?
#
# Two variants tested:
# 3a: Skip 1E-arm AND 5A — just 0A → 0C → 1F(browse)
# 3b: Include 1E-arm but skip 5A+POLL — 0A → 1E-arm → 0C → 1F(browse)
#
# If PASS: event peak values available without the slow bulk stream.
# If FAIL on 3a but PASS on 3b: 1E-arm required even without 5A.
def exp_no_5a(host: str, port: int) -> tuple[str, str, dict]:
"""Event record via 0A→0C without 5A or POLL×3. Tests both with and without 1E-arm."""
t, proto = connect_proto(host, port)
try:
print(" Startup (POLL)…")
proto.startup()
# Get the first event key via 1E
key4, data8 = proto.read_event_first()
if data8[4:8] == b"\x00\x00\x00\x00":
return "INCONCLUSIVE", "Device has no stored events — cannot test", {}
print(f" First event key: {key4.hex()}")
details: dict = {"key": key4.hex()}
# ── Variant 3a: 0A → 0C → 1F(browse), no 1E-arm ─────────────────────
print("\n [3a] 0A → 0C → 1F(browse) (NO 1E-arm, NO 5A)")
try:
_hdr, rec_len = proto.read_waveform_header(key4)
print(f" 0A OK rec_len=0x{rec_len:02X}")
record_3a = proto.read_waveform_record(key4)
print(f" 0C OK {len(record_3a)} bytes")
# Check for recognizable content
has_tran = b"Tran" in record_3a
has_vert = b"Vert" in record_3a
has_long = b"Long" in record_3a
print(f" 0C content check: Tran={has_tran} Vert={has_vert} Long={has_long}")
details["3a_0c_bytes"] = len(record_3a)
details["3a_has_peaks"] = has_tran and has_vert and has_long
# Now try browse 1F without any 5A
key4_next, data8_next = proto.advance_event(browse=True)
null_sentinel = data8_next[4:8] == b"\x00\x00\x00\x00"
print(f" 1F(browse) → key={key4_next.hex()} null={null_sentinel}")
details["3a_1f_ok"] = True
details["3a_outcome"] = "PASS"
except ProtocolError as exc:
print(f" 3a FAILED: {exc}")
details["3a_outcome"] = f"FAIL: {exc}"
# Try to recover by reconnecting for 3b
t.disconnect()
t2, proto2 = connect_proto(host, port)
proto2.startup()
key4, data8 = proto2.read_event_first()
if data8[4:8] == b"\x00\x00\x00\x00":
return "FAIL", f"3a failed and device empty on retry: {exc}", details
t, proto = t2, proto2
# ── Variant 3b: 0A → 1E-arm → 0C → 1F(browse), no 5A ───────────────
print("\n [3b] 0A → 1E-arm(0xFE) → 0C → 1F(browse) (NO POLL×3, NO 5A)")
try:
_hdr, rec_len = proto.read_waveform_header(key4)
print(f" 0A OK rec_len=0x{rec_len:02X}")
# 1E download-arm (token=0xFE) between 0A and 0C
proto.read_event_first(token=0xFE)
print(" 1E-arm OK")
record_3b = proto.read_waveform_record(key4)
print(f" 0C OK {len(record_3b)} bytes")
has_tran = b"Tran" in record_3b
print(f" 0C content check: Tran={has_tran} Vert={b'Vert' in record_3b}")
details["3b_0c_bytes"] = len(record_3b)
details["3b_has_peaks"] = has_tran
# Browse 1F without 5A / POLL×3
key4_next2, data8_next2 = proto.advance_event(browse=True)
null_sentinel2 = data8_next2[4:8] == b"\x00\x00\x00\x00"
print(f" 1F(browse) → key={key4_next2.hex()} null={null_sentinel2}")
details["3b_1f_ok"] = True
details["3b_outcome"] = "PASS"
except ProtocolError as exc:
print(f" 3b FAILED: {exc}")
details["3b_outcome"] = f"FAIL: {exc}"
# Summarize
a_ok = details.get("3a_outcome") == "PASS"
b_ok = details.get("3b_outcome") == "PASS"
if a_ok:
return "PASS", "3a: 0A→0C works with NO 1E-arm and NO 5A. Huge speedup possible!", details
elif b_ok:
return "PASS", "3b: 0A→1E-arm→0C works without 5A (1E-arm still needed before 0C)", details
else:
return "FAIL", "Both 3a and 3b failed — 5A may be required for device state", details
except ProtocolError as exc:
return "FAIL", f"Protocol error during setup: {exc}", {}
finally:
try:
t.disconnect()
except Exception:
pass
# ══════════════════════════════════════════════════════════════════════════════
# EXP4 — Skip initial 1E if we already know the event key
# ══════════════════════════════════════════════════════════════════════════════
#
# In Blastware, every session starts with 1E to discover the first key.
# But if we already fetched and cached the event keys from a previous session,
# can we skip 1E entirely and go straight to 0A(cached_key)?
#
# Practical use case: we poll the device every N minutes. We already know
# all the event keys from last time. On re-connect, can we go direct to 0A?
#
# If PASS: subsequent polls that don't add new events can skip 1E discovery.
def exp_skip_1e(host: str, port: int) -> tuple[str, str, dict]:
"""Get the first event key, disconnect, reconnect, go straight to 0A (skip 1E)."""
# Phase 1: get the key
t, proto = connect_proto(host, port)
try:
proto.startup()
key4, data8 = proto.read_event_first()
if data8[4:8] == b"\x00\x00\x00\x00":
return "INCONCLUSIVE", "No events stored — cannot test", {}
print(f" Phase 1: got event key = {key4.hex()}")
finally:
t.disconnect()
time.sleep(0.5)
# Phase 2: fresh connection, skip 1E, go straight to 0A with cached key
t2, proto2 = connect_proto(host, port)
try:
print(" Phase 2: fresh connection — startup + 0A directly (no 1E)")
proto2.startup()
_hdr, rec_len = proto2.read_waveform_header(key4)
print(f" 0A OK rec_len=0x{rec_len:02X}")
record = proto2.read_waveform_record(key4)
has_peaks = b"Tran" in record
print(f" 0C OK {len(record)} bytes has_peaks={has_peaks}")
details = {
"cached_key": key4.hex(),
"0c_bytes": len(record),
"has_peaks": has_peaks,
}
return "PASS", "0A works with cached key — 1E discovery can be skipped on known sessions", details
except ProtocolError as exc:
return "FAIL", f"0A failed with cached key (device needs 1E first?): {exc}", {"key": key4.hex()}
finally:
t2.disconnect()
# ══════════════════════════════════════════════════════════════════════════════
# EXP5 — Fewer POLLs before 5A (try POLL×1 instead of Blastware's POLL×3)
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware always sends 3 full POLL probe+data cycles between 1F and 5A.
# Each POLL is a round trip. Can we get away with just 1?
#
# WARNING: If POLL×1 fails, the device may be in a bad state. We try to
# recover with an extra POLL×2 and a fresh 5A attempt. Even on failure we
# try to leave the device in a usable state.
#
# Strategy: run the full event sequence up to 1F(download), then try 5A
# with only 1 POLL. If 5A responds → PASS. If timeout → try 2 more POLLs
# and check if the device recovers.
def exp_fewer_polls(host: str, port: int) -> tuple[str, str, dict]:
"""Full sequence to 1F, then only 1 POLL before 5A (Blastware does 3)."""
t, proto = connect_proto(host, port)
try:
proto.startup()
key4, data8 = proto.read_event_first()
if data8[4:8] == b"\x00\x00\x00\x00":
return "INCONCLUSIVE", "No events stored — cannot test", {}
print(f" Event key: {key4.hex()}")
# Full setup: 0A → 1E-arm → 0C → 1F(download)
_hdr, rec_len = proto.read_waveform_header(key4)
print(f" 0A OK rec_len=0x{rec_len:02X}")
proto.read_event_first(token=0xFE) # 1E-arm
print(" 1E-arm OK")
proto.read_waveform_record(key4)
print(" 0C OK")
arm_key4, _ = proto.advance_event(browse=False) # 1F(download) — arms 5A
print(f" 1F(download) OK arm_key={arm_key4.hex()}")
# Only 1 POLL (Blastware does 3)
print(" Sending 1 POLL (instead of 3)…")
proto.poll()
print(" POLL ok — now probing 5A…")
try:
frames = proto.read_bulk_waveform_stream(key4, stop_after_metadata=True, max_chunks=12)
print(f" 5A OK after 1 POLL — {len(frames)} frames received")
details = {"poll_count": 1, "frames": len(frames)}
return "PASS", "5A works with only 1 POLL (saved 2 round-trips per event)!", details
except ProtoTimeout:
print(" 5A timed out after 1 POLL — device needs more POLLs")
# Attempt recovery: send 2 more POLLs and see if 5A then works
print(" Attempting recovery: 2 more POLLs…")
try:
proto.poll()
proto.poll()
frames2 = proto.read_bulk_waveform_stream(key4, stop_after_metadata=True, max_chunks=12)
print(f" 5A worked after total 3 POLLs ({len(frames2)} frames)")
return "FAIL", "5A needs 3 POLLs — 1 is not enough (recovery confirmed 3 still works)", {
"poll_count_tried": 1, "recovery_polls": 3, "recovery_frames": len(frames2)
}
except ProtocolError as exc2:
return "FAIL", f"5A failed even after 3 total POLLs — device may need reconnect: {exc2}", {}
except ProtocolError as exc:
return "FAIL", f"Setup failed: {exc}", {}
finally:
t.disconnect()
# ══════════════════════════════════════════════════════════════════════════════
# EXP6 — Compliance-only write (71×3→72), skip event index + trigger + waveform
# ══════════════════════════════════════════════════════════════════════════════
#
# Blastware's full write sequence: 68→73 | 71×3→72 | 82→83 | 69→74→72
# We want to know: can we write ONLY the compliance block (71×3→72)?
#
# Test procedure:
# 1. Read current compliance config (SUB 1A)
# 2. Patch the "notes" field to a test marker
# 3. Write ONLY 71×3→72 (skip 68, 73, 82, 83, 69, 74, final 72)
# 4. Read back (SUB 1A) and verify the change was written
# 5. Restore original value
#
# If PASS: we can push individual config fields without touching event index,
# trigger config, or waveform data — huge simplification.
# If FAIL: the device needs the full write sequence (may reject partial write).
#
# SAFETY: We restore original data in a finally block. If the restore write
# fails, the device will have the test marker in "notes" — harmless but visible.
_EXP6_MARKER = "[exp6-test]"
def exp_compliance_only(host: str, port: int) -> tuple[str, str, dict]:
"""Write compliance block alone (71×3→72), verify, and restore."""
client, info = connect_client(host, port)
original_raw: Optional[bytes] = None
try:
proto = client._proto
if proto is None:
return "FAIL", "Could not get protocol handle from client", {}
# 1. Read current compliance
print(" Reading current compliance config (SUB 1A)…")
original_raw = proto.read_compliance_config()
print(f" Got {len(original_raw)} bytes of compliance config")
# Find current notes value for display
info_obj = DeviceInfo()
_decode_compliance_config_into(original_raw, info_obj)
cc = info_obj.compliance_config
orig_notes = cc.notes if cc else "(unknown)"
print(f" Current notes field: {orig_notes!r}")
# 2. Build modified payload with test marker in notes
test_notes = _EXP6_MARKER
modified_raw = _encode_compliance_config(
original_raw,
notes=test_notes,
)
print(f" Encoded modified compliance payload ({len(modified_raw)} bytes)")
print(f" Patching notes: {orig_notes!r}{test_notes!r}")
# 3. Write ONLY the compliance block: 71×3 → 72
print(" Writing compliance ONLY (71×3→72) — skipping 68/73/82/83/69/74…")
proto.write_compliance_config_raw(modified_raw)
print(" Write complete — device acked 71×3→72")
# 4. Read back and verify
print(" Reading back compliance config to verify…")
readback_raw = proto.read_compliance_config()
readback_info = DeviceInfo()
_decode_compliance_config_into(readback_raw, readback_info)
rb_cc = readback_info.compliance_config
readback_notes = rb_cc.notes if rb_cc else "(decode failed)"
print(f" Read-back notes: {readback_notes!r}")
write_worked = (readback_notes == test_notes)
print(f" Write verified: {write_worked}")
details = {
"original_notes": orig_notes,
"written_notes": test_notes,
"readback_notes": readback_notes,
"write_verified": write_worked,
}
if write_worked:
return "PASS", "Compliance-only write works! No event index or trigger writes needed.", details
else:
return "FAIL", f"Write was not reflected in read-back (got {readback_notes!r})", details
except ProtocolError as exc:
return "FAIL", f"Protocol error: {exc}", {}
finally:
# Restore original compliance data regardless of outcome
if original_raw is not None:
print(" Restoring original compliance config…")
try:
proto2 = client._proto
if proto2:
proto2.write_compliance_config_raw(
_encode_compliance_config(original_raw) # no-op patch = verbatim
)
print(" Restore complete")
else:
print(" WARNING: protocol handle gone — could not restore")
except Exception as exc_r:
print(f" WARNING: restore failed: {exc_r}")
client.close()
# ══════════════════════════════════════════════════════════════════════════════
# Registry + main
# ══════════════════════════════════════════════════════════════════════════════
EXPERIMENTS = {
"cold_status": ("EXP1", exp_cold_status, "Monitor status (1C) with no POLL"),
"fast_event_count": ("EXP2", exp_fast_event_count, "Event count via POLL+08, skip identity reads"),
"no_5a": ("EXP3", exp_no_5a, "Event record (0C) without bulk waveform (5A)"),
"skip_1e": ("EXP4", exp_skip_1e, "0A/0C with cached key — skip initial 1E"),
"fewer_polls": ("EXP5", exp_fewer_polls, "1 POLL before 5A instead of Blastware's 3"),
"compliance_only": ("EXP6", exp_compliance_only, "Compliance-only write (71×3→72), no other blocks"),
}
def main() -> None:
ap = argparse.ArgumentParser(description="MiniMate Plus protocol minimization experiments")
ap.add_argument("--host", default=DEFAULT_HOST)
ap.add_argument("--port", type=int, default=DEFAULT_PORT)
ap.add_argument("--debug", action="store_true", help="Enable DEBUG wire logging")
ap.add_argument("experiments", nargs="*",
help=f"Which to run (default: all). Choices: {', '.join(EXPERIMENTS)}")
args = ap.parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
which = args.experiments or list(EXPERIMENTS.keys())
unknown = [e for e in which if e not in EXPERIMENTS]
if unknown:
print(f"Unknown experiments: {unknown}")
print(f"Available: {', '.join(EXPERIMENTS)}")
sys.exit(1)
print(f"\n{''*60}")
print(f" MiniMate Plus Protocol Minimization Experiments")
print(f" Target: {args.host}:{args.port}")
print(f" Running: {', '.join(which)}")
print(f"{''*60}")
results: list[Result] = []
for key in which:
tag, fn, desc = EXPERIMENTS[key]
label = f"{tag}: {desc}"
r = run(label, fn, args.host, args.port)
results.append(r)
time.sleep(1.5) # brief pause between experiments — let device settle
print(f"\n\n{''*60}")
print(" SUMMARY")
print(f"{''*60}")
for r in results:
sym = {"PASS": "", "FAIL": "", "INCONCLUSIVE": "⚠️ "}.get(r.outcome, "?")
print(f" {sym} {r.outcome:13s} {r.name}")
print(f"{''*60}")
passed = sum(1 for r in results if r.outcome == "PASS")
failed = sum(1 for r in results if r.outcome == "FAIL")
skipped = sum(1 for r in results if r.outcome == "INCONCLUSIVE")
print(f" {passed} passed {failed} failed {skipped} inconclusive")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nInterrupted.")
sys.exit(0)
+28 -27
View File
@@ -1755,17 +1755,20 @@ def _decode_monitor_status(data: bytes) -> MonitorStatus:
data is the raw S3 frame .data attribute (includes the 11-byte section data is the raw S3 frame .data attribute (includes the 11-byte section
header, so field offsets below are relative to data[11]). header, so field offsets below are relative to data[11]).
Monitoring flag (confirmed 4-8-26/2ndtry, full byte diff analysis): NOTE: frame.data has the checksum byte already stripped by S3FrameParser
section[6] == 0x00 idle (_finalise returns raw_payload[5:] where raw_payload = body[:-1]).
section[6] == 0x10 monitoring There is NO trailing checksum byte in section.
The payload size varies (5255+ bytes) but the battery/memory block is Monitoring flag (confirmed 4-8-26/2ndtry, byte diff of all 144 data frames):
always the last 10 bytes before the trailing checksum byte: section[1] == 0x00 idle
section[1] == 0x10 monitoring
section[-11:-9] battery × 100 uint16 BE (0x02A8 = 6.80 V) The payload length varies (4649 bytes) IDLE is 46-47, MONITORING is 48-49.
section[-9 :-5] memory_total uint32 BE bytes The battery/memory block is always the last 10 bytes of section (no checksum):
section[-5 :-1] memory_free uint32 BE bytes
section[-1] checksum (not data) section[-10:-8] battery × 100 uint16 BE (0x02A8 = 6.80 V)
section[-8 :-4] memory_total uint32 BE bytes
section[-4:] memory_free uint32 BE bytes
Values confirmed from 4-8-26/2ndtry capture (BE11529): Values confirmed from 4-8-26/2ndtry capture (BE11529):
battery 0x02A8 = 680 6.80 V battery 0x02A8 = 680 6.80 V
@@ -1780,32 +1783,30 @@ def _decode_monitor_status(data: bytes) -> MonitorStatus:
len(data), len(section), section.hex(), len(data), len(section), section.hex(),
) )
# Monitoring flag: section[6] (CORRECTED 2026-04-08 — was wrongly section[1]). # Monitoring flag: section[1] == 0x10.
# Byte diff of 2ndtry BW-S3 captures confirms section[6] flips 0x00↔0x10 # Confirmed from byte diff of all 144 0xE3 data frames in 4-8-26/2ndtry capture:
# exactly at the start/stop monitoring transitions (0xE3 frame #36 / #132). # section[1] = 0x00 in all IDLE frames, 0x10 in all MONITORING frames.
is_monitoring = len(section) > 6 and section[6] == 0x10 # (section[6] also changes but has non-binary values 0xea/0x07 — device-specific.)
is_monitoring = len(section) > 1 and section[1] == 0x10
battery_v = None battery_v = None
memory_total = None memory_total = None
memory_free = None memory_free = None
# Battery and memory offsets are RELATIVE TO THE END of the section. # Battery and memory at relative-from-end offsets.
# The payload length varies (5255+ bytes) depending on monitoring state and # Payload length varies (4649 bytes) but the battery/memory block is always
# internal counters, but the battery/memory block is always the last 10 bytes # the last 10 bytes. No checksum byte — it was stripped by S3FrameParser.
# before the checksum (section[-1]).
# #
# section[-11:-9] battery × 100 uint16 BE 0x02A8 = 6.80 V # section[-10:-8] battery × 100 uint16 BE 0x02A8 = 6.80 V
# section[-9 :-5] memory_total uint32 BE ≈ 960 KB on BE11529 # section[-8 :-4] memory_total uint32 BE ≈ 960 KB on BE11529
# section[-5 :-1] memory_free uint32 BE decreases as events fill # section[-4:] memory_free uint32 BE decreases as events fill
# section[-1] frame checksum (not data)
# #
# Confirmed stable across IDLE (52b), MONITORING (55b), and counter-jitter # Confirmed stable across IDLE (46b), MONITORING (48-49b) variants.
# IDLE variants (53b) from 4-8-26/2ndtry full capture analysis. if len(section) >= 10:
if len(section) >= 11: batt_raw = struct.unpack(">H", section[-10:-8])[0]
batt_raw = struct.unpack(">H", section[-11:-9])[0]
battery_v = batt_raw / 100.0 battery_v = batt_raw / 100.0
memory_total = struct.unpack(">I", section[-9:-5])[0] memory_total = struct.unpack(">I", section[-8:-4])[0]
memory_free = struct.unpack(">I", section[-5:-1])[0] memory_free = struct.unpack(">I", section[-4:])[0]
return MonitorStatus( return MonitorStatus(
is_monitoring=is_monitoring, is_monitoring=is_monitoring,
+36
View File
@@ -418,3 +418,39 @@ class TcpTransport(BaseTransport):
def __repr__(self) -> str: def __repr__(self) -> str:
state = "connected" if self.is_connected else "disconnected" state = "connected" if self.is_connected else "disconnected"
return f"TcpTransport({self.host!r}, port={self.port}, {state})" return f"TcpTransport({self.host!r}, port={self.port}, {state})"
# ── Inbound / accepted-socket transport ───────────────────────────────────────
class SocketTransport(TcpTransport):
"""
Like TcpTransport but wraps an already-accepted inbound socket.
Used by the ACH inbound server (bridges/ach_server.py) the device dials
IN to us, so by the time we create this transport the socket is already live.
connect() is a no-op; everything else (read, write, read_until_idle, ) is
inherited unchanged from TcpTransport.
Args:
sock: An already-connected socket.socket returned by server_socket.accept().
peer: Human-readable peer label for repr / logging (e.g. "203.0.113.5:54321").
"""
def __init__(self, sock: socket.socket, peer: str = "inbound") -> None:
# Bypass TcpTransport.__init__ — we already have a live socket.
self.host = peer
self.port = 0
self.connect_timeout = 0.0
self._sock = sock
sock.settimeout(self._RECV_TIMEOUT)
def connect(self) -> None:
"""No-op — socket was already accepted inbound."""
pass # Already have a live socket; nothing to open.
@property
def is_connected(self) -> bool:
return self._sock is not None
def __repr__(self) -> str:
return f"SocketTransport(peer={self.host!r})"
+404
View File
@@ -1071,6 +1071,398 @@ class AnalyzerPanel(tk.Frame):
# ───────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────────────────────────────────────
# Serial Watch panel — tap the RS-232 line between device and modem
# ─────────────────────────────────────────────────────────────────────────────
try:
import serial as _serial
from serial.tools import list_ports as _list_ports
_SERIAL_OK = True
except ImportError:
_SERIAL_OK = False
from minimateplus.framing import S3FrameParser as _S3FrameParser # noqa: E402
_SW_KNOWN_SUBS = {
0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADV_EVENT_RSP",
0xE1: "EVT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP",
0xF3: "WAVEFORM_REC_RSP", 0xF5: "WAVEFORM_HDR_RSP", 0xF7: "EVENT_INDEX_RSP",
0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP",
0x69: "START_MON_ACK", 0x68: "STOP_MON_ACK",
}
class SerialWatchPanel(tk.Frame):
"""
Tap the RS-232 line between the MiniMate Plus and its modem (RV50/RV55).
Runs the serial reader in a background thread; surfaces parsed S3 frames
live in the log view. Writes raw_s3_<ts>.bin compatible with Analyzer.
Typical use for call-home capture:
1. Connect a USB-to-serial tap to the RS-232 line.
2. Pick that COM port here, click Start.
3. Wait for the unit to trigger / call home.
4. Click Stop, then 'Open in Analyzer' to inspect the frames.
"""
_COL_FRAME = "#4ec9b0" # teal — parsed S3 frame
_COL_CTRL = "#dcdcaa" # yellow — control-line change
_COL_AT = "#9cdcfe" # blue — AT command / ASCII noise
_COL_ERR = "#f44747" # red — error
def __init__(self, parent: tk.Widget, on_capture_ready=None, **kw):
"""
on_capture_ready(raw_s3_path: str) called when capture stops,
so the parent can inject the file into the Analyzer.
"""
super().__init__(parent, bg=BG2, **kw)
self._on_capture_ready = on_capture_ready
self._serial: Optional[object] = None # serial.Serial instance
self._reader_thread: Optional[threading.Thread] = None
self._stop_evt = threading.Event()
self._log_q: queue.Queue[tuple[str, str]] = queue.Queue() # (text, colour)
self._raw_fh = None # open binary file handle
self._raw_path: Optional[str] = None
self._frame_count = 0
self._build()
self._poll_log_queue()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
pad = {"padx": 6, "pady": 4}
cfg = tk.Frame(self, bg=BG2)
cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4)
# Row 0 — port picker
tk.Label(cfg, text="COM port:", bg=BG2, fg=FG, font=MONO
).grid(row=0, column=0, sticky="e", **pad)
self._port_var = tk.StringVar()
self._port_cb = ttk.Combobox(cfg, textvariable=self._port_var,
width=12, font=MONO, state="normal")
self._port_cb.grid(row=0, column=1, sticky="w", **pad)
tk.Button(cfg, text="", bg=BG3, fg=FG, relief="flat", cursor="hand2",
font=MONO, command=self._refresh_ports
).grid(row=0, column=2, **pad)
tk.Label(cfg, text=" Baud:", bg=BG2, fg=FG, font=MONO
).grid(row=0, column=3, sticky="e", **pad)
self._baud_var = tk.StringVar(value="38400")
tk.Entry(cfg, textvariable=self._baud_var, width=8,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO
).grid(row=0, column=4, sticky="w", **pad)
self._ack_ok_var = tk.BooleanVar(value=False)
tk.Checkbutton(cfg, text="Ack OK to AT commands",
variable=self._ack_ok_var,
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
font=MONO).grid(row=0, column=5, sticky="w", **pad)
# Row 1 — capture dir
tk.Label(cfg, text="Save to:", bg=BG2, fg=FG, font=MONO
).grid(row=1, column=0, sticky="e", **pad)
self._dir_var = tk.StringVar(
value=str(SCRIPT_DIR / "bridges" / "captures"))
tk.Entry(cfg, textvariable=self._dir_var, width=40,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO
).grid(row=1, column=1, columnspan=4, sticky="we", **pad)
tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat",
cursor="hand2", font=MONO, command=self._choose_dir
).grid(row=1, column=5, **pad)
# Button row
btn_row = tk.Frame(self, bg=BG2)
btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2)
self._start_btn = tk.Button(
btn_row, text="Start Watch", bg=GREEN, fg="#000000",
relief="flat", padx=12, cursor="hand2", font=MONO_B,
command=self._start)
self._start_btn.pack(side=tk.LEFT, padx=6)
self._stop_btn = tk.Button(
btn_row, text="Stop", bg=BG3, fg=FG,
relief="flat", padx=12, cursor="hand2", font=MONO,
command=self._stop, state="disabled")
self._stop_btn.pack(side=tk.LEFT, padx=4)
self._analyzer_btn = tk.Button(
btn_row, text="Open in Analyzer", bg=BG3, fg=FG,
relief="flat", padx=10, cursor="hand2", font=MONO,
command=self._send_to_analyzer, state="disabled")
self._analyzer_btn.pack(side=tk.LEFT, padx=4)
tk.Button(btn_row, text="Clear", bg=BG3, fg=FG,
relief="flat", padx=8, cursor="hand2", font=MONO,
command=self._clear_log).pack(side=tk.LEFT, padx=4)
self._status_var = tk.StringVar(value="Idle")
tk.Label(btn_row, textvariable=self._status_var,
bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10)
# Log view
self._log = scrolledtext.ScrolledText(
self, height=24, font=MONO_SM,
bg=BG, fg=FG, insertbackground=FG,
relief="flat", state="disabled",
)
self._log.pack(fill=tk.BOTH, expand=True, padx=4, pady=4)
self._log.tag_config("frame", foreground=self._COL_FRAME)
self._log.tag_config("ctrl", foreground=self._COL_CTRL)
self._log.tag_config("at", foreground=self._COL_AT)
self._log.tag_config("err", foreground=self._COL_ERR)
self._log.tag_config("dim", foreground=FG_DIM)
# Populate ports on first load
self._refresh_ports()
# ── port helpers ──────────────────────────────────────────────────────
def _refresh_ports(self) -> None:
if not _SERIAL_OK:
self._port_cb["values"] = ["(pyserial not installed)"]
return
ports = [p.device for p in _list_ports.comports()]
self._port_cb["values"] = ports
if ports and not self._port_var.get():
self._port_var.set(ports[0])
def _choose_dir(self) -> None:
d = filedialog.askdirectory(initialdir=self._dir_var.get())
if d:
self._dir_var.set(d)
# ── start / stop ──────────────────────────────────────────────────────
def _start(self) -> None:
if not _SERIAL_OK:
messagebox.showerror(
"pyserial missing",
"Install pyserial first:\n pip install pyserial")
return
port = self._port_var.get().strip()
if not port or "not installed" in port:
messagebox.showerror("Error", "Select a valid COM port first.")
return
try:
baud = int(self._baud_var.get().strip())
except ValueError:
messagebox.showerror("Error", "Invalid baud rate.")
return
# Open output files
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
out_dir = Path(self._dir_var.get()) / f"serial_{ts}"
out_dir.mkdir(parents=True, exist_ok=True)
self._raw_path = str(out_dir / f"raw_s3_{ts}.bin")
try:
self._raw_fh = open(self._raw_path, "wb")
except OSError as exc:
messagebox.showerror("Error", f"Cannot open capture file:\n{exc}")
return
# Open serial port
try:
ser = _serial.Serial(
port=port, baudrate=baud,
bytesize=8, parity=_serial.PARITY_NONE,
stopbits=_serial.STOPBITS_ONE,
timeout=0.05, write_timeout=0,
)
ser.setDTR(True)
ser.setRTS(True)
except Exception as exc:
self._raw_fh.close()
self._raw_fh = None
messagebox.showerror("Error", f"Cannot open {port}:\n{exc}")
return
self._serial = ser
self._stop_evt.clear()
self._frame_count = 0
self._analyzer_btn.configure(state="disabled")
self._reader_thread = threading.Thread(
target=self._reader_loop,
args=(ser, baud),
daemon=True,
)
self._reader_thread.start()
self._status_var.set(f"Watching {port} @ {baud}")
self._start_btn.configure(state="disabled")
self._stop_btn.configure(state="normal", bg=RED)
self._append(f"── Serial watch started {port} @ {baud} [{ts}] ──\n", "dim")
self._append(f" Capture: {self._raw_path}\n", "dim")
self._append(" Waiting for data…\n\n", "dim")
def _stop(self) -> None:
self._stop_evt.set()
if self._serial:
try:
self._serial.close()
except Exception:
pass
self._serial = None
if self._raw_fh:
self._raw_fh.close()
self._raw_fh = None
self._status_var.set("Stopped")
self._start_btn.configure(state="normal")
self._stop_btn.configure(state="disabled", bg=BG3)
if self._raw_path and Path(self._raw_path).exists():
self._analyzer_btn.configure(state="normal")
self._append("\n── Watch stopped ──\n", "dim")
# ── reader thread ─────────────────────────────────────────────────────
def _reader_loop(self, ser, baud: int) -> None:
parser = _S3FrameParser()
rx_buf = bytearray()
ack_ok = self._ack_ok_var.get()
# Monitor control lines in a sub-thread
ctrl_stop = threading.Event()
ctrl_thread = threading.Thread(
target=self._ctrl_loop, args=(ser, ctrl_stop), daemon=True)
ctrl_thread.start()
try:
while not self._stop_evt.is_set():
try:
data = ser.read(4096)
except Exception as exc:
self._log_q.put((f"Read error: {exc}\n", "err"))
break
if not data:
continue
# Save raw bytes
if self._raw_fh:
try:
self._raw_fh.write(data)
self._raw_fh.flush()
except Exception:
pass
# Parse S3 frames
for byte in data:
result = parser.feed(bytes([byte]))
if result:
frames = result if isinstance(result, list) else [result]
for f in frames:
self._frame_count += 1
name = _SW_KNOWN_SUBS.get(f.sub, f"UNK_0x{f.sub:02X}")
chk = "" if f.checksum_valid else "✗ BAD_CHK"
peek = f.data[:32].hex() + ("" if len(f.data) > 32 else "")
msg = (
f"[{self._frame_count:04d}] "
f"SUB=0x{f.sub:02X} ({name:<22}) "
f"page=0x{f.page_key:04X} "
f"data={len(f.data):4d}B {chk}\n"
f" {peek}\n"
)
self._log_q.put((msg, "frame"))
# AT command handling for --ack-ok mode
if ack_ok:
rx_buf.extend(data)
while b"\r" in rx_buf or b"\n" in rx_buf:
for sep in (b"\r", b"\n"):
idx = rx_buf.find(sep)
if idx != -1:
line_bytes = bytes(rx_buf[:idx])
del rx_buf[:idx + 1]
break
else:
break
line_str = line_bytes.decode("latin1", errors="ignore").strip()
if line_str.upper().startswith("AT"):
self._log_q.put((f"AT: {line_str!r}\n", "at"))
if not line_str.upper().startswith("ATDT"):
try:
ser.write(b"\r\nOK\r\n")
ser.flush()
self._log_q.put((f" → OK\n", "at"))
except Exception:
pass
finally:
ctrl_stop.set()
ctrl_thread.join(timeout=0.5)
# Signal the main thread that the reader ended naturally
if not self._stop_evt.is_set():
self._log_q.put(("<<done>>", ""))
def _ctrl_loop(self, ser, stop: threading.Event) -> None:
prev = {}
try:
prev = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd)
try:
prev["RI"] = ser.ri
except Exception:
prev["RI"] = None
except Exception:
return
while not stop.is_set():
try:
cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None)
try:
cur["RI"] = ser.ri
except Exception:
pass
for name, val in cur.items():
if val != prev.get(name):
self._log_q.put((f"CTRL {name}{val}\n", "ctrl"))
prev[name] = val
except Exception:
break
stop.wait(0.2)
# ── log view ──────────────────────────────────────────────────────────
def _poll_log_queue(self) -> None:
try:
while True:
text, tag = self._log_q.get_nowait()
if text == "<<done>>":
self._stop()
break
self._append(text, tag)
except queue.Empty:
pass
finally:
self.after(80, self._poll_log_queue)
def _append(self, text: str, tag: str = "") -> None:
self._log.configure(state="normal")
if tag:
self._log.insert(tk.END, text, tag)
else:
self._log.insert(tk.END, text)
self._log.see(tk.END)
self._log.configure(state="disabled")
def _clear_log(self) -> None:
self._log.configure(state="normal")
self._log.delete("1.0", tk.END)
self._log.configure(state="disabled")
# ── send to analyzer ──────────────────────────────────────────────────
def _send_to_analyzer(self) -> None:
if self._raw_path and self._on_capture_ready:
self._on_capture_ready(self._raw_path)
# Console panel (tk.Frame — lives inside a notebook tab) # Console panel (tk.Frame — lives inside a notebook tab)
# ───────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────────────────
@@ -1504,6 +1896,12 @@ class SeismoLab(tk.Tk):
) )
nb.add(self._console_panel, text=" Console ") nb.add(self._console_panel, text=" Console ")
self._serial_watch_panel = SerialWatchPanel(
nb,
on_capture_ready=self._on_serial_capture_ready,
)
nb.add(self._serial_watch_panel, text=" Serial Watch ")
self._nb = nb self._nb = nb
self.protocol("WM_DELETE_WINDOW", self._on_close) self.protocol("WM_DELETE_WINDOW", self._on_close)
@@ -1522,8 +1920,14 @@ class SeismoLab(tk.Tk):
self._analyzer_panel.s3_var.set(raw_s3_path) self._analyzer_panel.s3_var.set(raw_s3_path)
self._nb.select(1) self._nb.select(1)
def _on_serial_capture_ready(self, raw_s3_path: str) -> None:
"""Serial Watch capture finished → inject into Analyzer and switch tab."""
self._analyzer_panel.s3_var.set(raw_s3_path)
self._nb.select(1)
def _on_close(self) -> None: def _on_close(self) -> None:
self._bridge_panel.stop_bridge() self._bridge_panel.stop_bridge()
self._serial_watch_panel._stop()
self.destroy() self.destroy()