Files
seismo-relay/bridges/serial_watch.py
T
claude 46a86939b7 feat: add ACH TCP bridge, serial tap tool, and Serial Watch tab
- bridges/ach_bridge.py: transparent TCP bridge that MITMs the MiniMate Plus
  call-home connection — forwards to real ACH server while logging all frames
  to raw_client/raw_server .bin files compatible with parse_capture.py;
  standalone capture mode for lab use without a real server

- bridges/serial_watch.py: RS-232 serial monitor with live S3 frame parsing;
  taps the line between MiniMate and modem (RV50/RV55); captures raw bytes,
  .log and .jsonl; --ack-ok mode auto-replies to AT commands; fixed fatal
  indentation bug in the original that silently prevented any data capture

- seismo_lab.py: new "Serial Watch" fourth tab (SerialWatchPanel) wrapping
  serial_watch.py functionality; COM port picker with refresh, baud config,
  ack-ok toggle, colour-coded live frame log (teal frames / yellow ctrl /
  blue AT), raw .bin capture auto-fed into Analyzer tab on stop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 12:10:52 -04:00

436 lines
16 KiB
Python

#!/usr/bin/env python3
"""
serial_watch.py — Instantel Series-3 serial monitor with S3 frame parsing.
Taps the RS-232 line between the MiniMate Plus and its modem (RV50/RV55).
Saves raw binary captures compatible with the rest of the analysis toolchain,
plus a human-readable frame log.
Usage
-----
python bridges/serial_watch.py # interactive COM picker
python bridges/serial_watch.py --port COM3 # specify port
python bridges/serial_watch.py --port COM3 --ack-ok # reply OK to AT commands
# (useful if modem is absent
# and you want the device to
# proceed past AT negotiation)
python bridges/serial_watch.py --list # list available ports
Output
------
bridges/captures/serial_<ISO-timestamp>/
raw_s3_<ts>.bin — raw bytes from device (feeds directly into S3FrameParser)
session_<ts>.log — human-readable frame + control-line log
session_<ts>.jsonl — JSON-lines frame log
The raw_s3_*.bin file is byte-for-byte compatible with the existing capture
format used by bridges/parse_capture.py and all analysis scripts.
What to look for in a call-home capture
----------------------------------------
1. Does the device talk first after CONNECT, or does it wait?
- If raw_s3_*.bin has bytes before any AT/POLL exchange → PUSH protocol
- If it stays silent → PULL protocol (same as Blastware manual download)
2. Look for "Operating System" ASCII at the start — the device sends this 16-byte
boot string on cold start before entering DLE-framed mode.
3. RING/CONNECT from the modem appear as ASCII before the DLE frames — the parser
handles these automatically (scans forward to DLE+STX).
"""
from __future__ import annotations
import argparse
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
try:
import serial
from serial.tools import list_ports
except ModuleNotFoundError:
print(
"pyserial not found. Install with:\n python -m pip install pyserial",
file=sys.stderr,
)
sys.exit(1)
# Add project root so we can import the frame parser
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.framing import S3FrameParser, S3Frame
import json
# ── Helpers ───────────────────────────────────────────────────────────────────
def _ts() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS.mmm``.

    The fractional part is truncated (not rounded) to milliseconds.
    """
    return datetime.now().isoformat(sep=" ", timespec="milliseconds")
def _hexdump(b: bytes) -> str:
    """Render *b* as space-separated, two-digit uppercase hex octets."""
    as_octet = "{:02X}".format
    return " ".join(map(as_octet, b))
def _printable(b: bytes) -> str:
    """Decode *b* to text using the latin1 codec with the 'replace' handler."""
    text = str(b, "latin1", "replace")
    return text
# Display names for the SUB byte of a parsed frame, used only for log labels.
# Any value not listed here is rendered as "UNK_0x??" by _label_frame().
_KNOWN_SUBS = {
    0xA4: "POLL_RSP",
    0xA5: "BULK_WAVEFORM_RSP",
    0xE0: "ADVANCE_EVENT_RSP",
    0xE1: "EVENT_IDX_FIRST_RSP",
    0xE3: "MONITOR_STATUS_RSP",
    0xEA: "SERIAL_NUM_RSP",
    0xF3: "WAVEFORM_RECORD_RSP",
    0xF5: "WAVEFORM_HEADER_RSP",
    0xF7: "EVENT_INDEX_RSP",
    0xF9: "UNK_06_RSP",
    0xFE: "DEVICE_INFO_RSP",
    0x69: "START_MONITOR_ACK",
    0x68: "STOP_MONITOR_ACK",
    0x97: "EVT_IDX_WRITE_ACK",
    0x8C: "CONFIRM_B_ACK",
    0x8E: "COMPLIANCE_WRITE_ACK",
    0x8D: "CONFIRM_A_ACK",
    0x7D: "TRIGGER_WRITE_ACK",
    0x7C: "TRIGGER_CONFIRM_ACK",
    0x96: "WAVEFORM_WRITE_ACK",
    0x8B: "CONFIRM_C_ACK",
}


def _label_frame(frame: S3Frame) -> str:
    """Build the one-line, human-readable summary of a parsed frame.

    The summary contains the SUB byte and its display name, the page key,
    the payload length, a bad-checksum marker when ``checksum_valid`` is
    false, and a hex preview of at most the first 24 payload bytes.

    Args:
        frame: Object exposing ``sub``, ``page_key``, ``data`` and
            ``checksum_valid``.

    Returns:
        The formatted summary line (no trailing newline).
    """
    peek_len = 24
    name = _KNOWN_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
    chk = "" if frame.checksum_valid else "✗ BAD_CHK"
    peek = frame.data[:peek_len].hex()
    if len(frame.data) > peek_len:
        # Mark a truncated preview so it cannot be mistaken for the whole
        # payload (previously both branches appended an empty string).
        peek += "\u2026"
    return (
        f"S3 SUB=0x{frame.sub:02X} ({name:<22}) "
        f"page=0x{frame.page_key:04X} data={len(frame.data):4d}B {chk} {peek}"
    )
# ── Logger ────────────────────────────────────────────────────────────────────
class Logger:
    """Thread-safe writer for the session's console output and capture files.

    Three files are opened in append mode: a text log, a JSON-lines frame
    log and a raw binary capture. Every text line is echoed to stdout and
    written to the text log under a single lock, so lines emitted from
    different threads are not interleaved.

    The instance can also be used as a context manager; leaving the ``with``
    block calls :meth:`close`.
    """

    def __init__(self, log_path: Path, jsonl_path: Path, raw_path: Path) -> None:
        """Open the three output files.

        Raises:
            OSError: If any file cannot be opened. Files opened before the
                failure are closed again so no handle is leaked.
        """
        self._lock = threading.Lock()
        self._frame_count = 0
        self._closed = False
        opened = []
        try:
            self._log = log_path.open("a", encoding="utf-8", newline="")
            opened.append(self._log)
            self._jl = jsonl_path.open("a", encoding="utf-8", newline="")
            opened.append(self._jl)
            self._raw = raw_path.open("ab")
        except OSError:
            for fh in opened:
                fh.close()
            raise

    def __enter__(self) -> "Logger":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _emit(self, tag: str, msg: str) -> None:
        """Timestamp *msg*, then write it to stdout and the text log."""
        line = f"[{_ts()}] {tag} | {msg}"
        with self._lock:
            print(line)
            print(line, file=self._log, flush=True)

    def info(self, msg: str) -> None:
        """Log a general status message."""
        self._emit("INFO", msg)

    def ctrl(self, msg: str) -> None:
        """Log a control-line / port-control message."""
        self._emit("CTRL", msg)

    def data_hex(self, msg: str) -> None:
        """Log a hex rendering of received bytes."""
        self._emit("HEX", msg)

    def data_ascii(self, msg: str) -> None:
        """Log a text rendering of received bytes."""
        self._emit("DATA", msg)

    def frame(self, f: S3Frame) -> None:
        """Log one parsed frame to stdout, the text log and the JSONL log.

        Frames are numbered in the order this method is called.
        """
        with self._lock:
            self._frame_count += 1
            label = f"[{_ts()}] FRAME | #{self._frame_count:04d} {_label_frame(f)}"
            print(label)
            print(label, file=self._log, flush=True)
            record = {
                "frame": self._frame_count,
                "sub": f.sub,
                "page_key": f.page_key,
                "data_len": len(f.data),
                "data_hex": f.data.hex(),
                "checksum_valid": f.checksum_valid,
            }
            print(json.dumps(record), file=self._jl, flush=True)

    def write_raw(self, data: bytes) -> None:
        """Append *data* to the raw capture file and flush it immediately."""
        with self._lock:
            self._raw.write(data)
            self._raw.flush()

    def close(self) -> None:
        """Flush and close all three files; safe to call more than once."""
        with self._lock:
            if self._closed:
                return
            self._closed = True
            for fh in (self._log, self._jl, self._raw):
                try:
                    fh.flush()
                    fh.close()
                except (OSError, ValueError):
                    # Best effort: one handle failing (I/O error, or already
                    # closed) must not stop the remaining handles closing.
                    pass
# ── Control-line monitor thread ───────────────────────────────────────────────
def _monitor_control_lines(
    ser: serial.Serial,
    logger: Logger,
    stop: threading.Event,
    interval: float,
) -> None:
    """Poll the CTS/DSR/DCD/RI inputs and log every change until *stop* is set.

    Intended to run in its own thread. The initial state is logged once;
    afterwards only lines whose value differs from the previous poll are
    logged, as ``NAME=value``.

    Args:
        ser: Open port object exposing ``cts``, ``dsr``, ``cd`` and ``ri``.
        logger: Sink whose ``ctrl()`` method receives the messages.
        stop: Event that ends the loop when set.
        interval: Seconds to wait between polls.
    """

    def read_lines() -> dict:
        # CTS/DSR/DCD failures propagate to the caller; RI is read best-effort
        # and simply stays None when reading it raises.
        lines = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None)
        try:
            lines["RI"] = ser.ri
        except Exception:
            pass
        return lines

    try:
        prev = read_lines()
    except Exception as exc:
        logger.ctrl(f"Init error: {exc}")
        return
    logger.ctrl(
        f"Initial: CTS={prev['CTS']} DSR={prev['DSR']} DCD={prev['DCD']} RI={prev['RI']}"
    )

    while not stop.is_set():
        try:
            cur = read_lines()
        except (serial.SerialException, OSError) as exc:
            # An error after stop was requested just means the port was closed
            # during shutdown; do not report it as a fault.
            if not stop.is_set():
                logger.ctrl(f"Poll error: {exc}")
            break
        for name, val in cur.items():
            if val != prev[name]:
                # Same NAME=value form as the "Initial:" line above.
                logger.ctrl(f"{name}={val}")
                prev[name] = val
        stop.wait(interval)
# ── Serial open ───────────────────────────────────────────────────────────────
# Lookup tables translating the --parity / --stopbits command-line values
# into the corresponding pyserial constants.
_PARITY = dict(
    N=serial.PARITY_NONE,
    E=serial.PARITY_EVEN,
    O=serial.PARITY_ODD,
    M=serial.PARITY_MARK,
    S=serial.PARITY_SPACE,
)
_STOPBITS = {
    1: serial.STOPBITS_ONE,
    1.5: serial.STOPBITS_ONE_POINT_FIVE,
    2: serial.STOPBITS_TWO,
}
def _open_serial(args: argparse.Namespace, logger: Logger) -> serial.Serial | None:
    """Open and configure the serial port described by *args*.

    Makes ``args.open_retries + 1`` attempts (always at least one), sleeping
    ``args.open_retry_delay`` seconds between failed attempts. After a
    successful open, DTR and RTS are driven according to ``args.dtr`` /
    ``args.rts`` and, if ``args.send_break`` is positive, a BREAK condition
    is held for that many milliseconds. Failures while setting DTR/RTS or
    BREAK are logged but do not abort the open.

    Args:
        args: Parsed command-line namespace.
        logger: Sink for progress and error messages.

    Returns:
        The open port, or ``None`` if every attempt raised
        ``serial.SerialException``.
    """
    # A negative --open-retries used to skip the loop entirely and report
    # "could not open" without trying; always try at least once.
    attempts = max(0, args.open_retries) + 1
    for attempt in range(1, attempts + 1):
        logger.info(
            f"Opening {args.port} @ {args.baud},{args.bytesize}{args.parity}{args.stopbits} "
            f"rtscts={args.rtscts} xonxoff={args.xonxoff} dsrdtr={args.dsrdtr} "
            f"(attempt {attempt})"
        )
        try:
            ser = serial.Serial(
                port=args.port,
                baudrate=args.baud,
                bytesize=args.bytesize,
                parity=_PARITY[args.parity],
                stopbits=_STOPBITS[args.stopbits],
                timeout=args.timeout,
                xonxoff=args.xonxoff,
                rtscts=args.rtscts,
                dsrdtr=args.dsrdtr,
                write_timeout=0,
            )
        except serial.SerialException as exc:
            logger.info(f"Open failed: {exc}")
            if attempt < attempts:
                time.sleep(args.open_retry_delay)
            continue

        try:
            # Property setters replace the deprecated setDTR()/setRTS() calls.
            ser.dtr = args.dtr == "on"
            ser.rts = args.rts == "on"
            logger.ctrl(f"Set DTR={args.dtr} RTS={args.rts}")
        except Exception as exc:
            logger.ctrl(f"DTR/RTS set failed: {exc}")

        if args.send_break > 0:
            try:
                ser.break_condition = True
                time.sleep(args.send_break / 1000.0)
                ser.break_condition = False
                logger.ctrl(f"BREAK held {args.send_break} ms")
            except Exception as exc:
                logger.ctrl(f"BREAK failed: {exc}")
        return ser
    return None
# ── Port picker ───────────────────────────────────────────────────────────────
def _list_ports() -> list:
    """Print a numbered table of the detected serial ports and return them.

    Returns:
        The port-info objects reported by ``list_ports.comports()``, in the
        order printed; an empty list (after printing a notice) if none exist.
    """
    found = [*list_ports.comports()]
    if found:
        print("Available serial ports:")
        for number, info in enumerate(found, start=1):
            description = info.description or ''
            print(f" {number:2d}) {info.device:<12} {description}")
        return found
    print("No serial ports found.")
    return []
def _pick_port() -> str:
    """Interactively choose a serial port and return its device name.

    Lists the available ports first. Exits with status 1 if there are none
    or if stdin is closed before a choice is made. A single available port
    is selected automatically. Otherwise the user is prompted until they
    enter a valid list number or a device name (matched case-insensitively).
    """
    ports = _list_ports()
    if not ports:
        sys.exit(1)
    if len(ports) == 1:
        print(f"Auto-selecting: {ports[0].device}")
        return ports[0].device
    while True:
        try:
            sel = input("Select port (number or name, e.g. COM3): ").strip()
        except EOFError:
            # No interactive stdin (pipe/closed): fail cleanly, not a traceback.
            print("No port selected (stdin closed).", file=sys.stderr)
            sys.exit(1)
        # isdecimal(), unlike isdigit(), only accepts characters that int()
        # can convert, so input such as a superscript digit cannot raise.
        if sel.isdecimal() and 1 <= int(sel) <= len(ports):
            return ports[int(sel) - 1].device
        wanted = sel.upper()
        for p in ports:
            if p.device.upper() == wanted:
                return p.device
        print("Not recognised. Enter list number or exact port name.")
# ── Main loop ─────────────────────────────────────────────────────────────────
# Upper bound on bytes kept while waiting for an AT line terminator, so binary
# traffic that contains no CR/LF cannot grow the buffer without limit.
_AT_BUF_LIMIT = 4096


def _pop_line(buf: bytearray) -> bytes | None:
    """Remove and return the first CR- or LF-terminated line from *buf*.

    The line is cut at whichever terminator occurs first; the terminator is
    consumed but not returned. Returns ``None`` (leaving *buf* untouched)
    when *buf* contains neither terminator.
    """
    hits = [i for i in (buf.find(b"\r"), buf.find(b"\n")) if i != -1]
    if not hits:
        return None
    cut = min(hits)
    line = bytes(buf[:cut])
    del buf[:cut + 1]
    return line


def main() -> None:
    """Command-line entry point: capture serial traffic until interrupted.

    Parses the arguments, creates a timestamped output directory under
    ``captures/`` next to this file, opens the port and then loops: every
    received chunk is appended to the raw capture, optionally displayed as
    text and/or hex, and fed to the S3 frame parser. With ``--ack-ok`` each
    received line starting with "AT" (but not "ATDT") is answered with
    ``OK``. Ctrl+C or a read error ends the loop; the port, the control-line
    thread and the log files are always shut down afterwards.
    """
    import codecs  # only needed here, to validate --encoding

    ap = argparse.ArgumentParser(
        description="Monitor Instantel Series-3 serial traffic with S3 frame parsing."
    )
    ap.add_argument("--port", "-p",
                    help="COM port (e.g. COM3). Omit to be prompted.")
    ap.add_argument("--baud", "-b", type=int, default=38400)
    ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8)
    ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N")
    ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1)
    ap.add_argument("--rtscts", action="store_true")
    ap.add_argument("--xonxoff", action="store_true")
    ap.add_argument("--dsrdtr", action="store_true")
    ap.add_argument("--dtr", choices=["on", "off"], default="on")
    ap.add_argument("--rts", choices=["on", "off"], default="on")
    ap.add_argument("--send-break", type=int, default=0,
                    help="Hold BREAK for N ms after open.")
    ap.add_argument("--show", choices=["ascii", "hex", "both", "frames"],
                    default="frames",
                    help="'frames' (default) shows only parsed S3 frames. "
                         "'ascii'/'hex'/'both' also show raw bytes.")
    ap.add_argument("--encoding", default="latin1")
    ap.add_argument("--read-chunk", type=int, default=4096)
    ap.add_argument("--timeout", type=float, default=0.05)
    ap.add_argument("--poll-lines-interval", type=float, default=0.2)
    ap.add_argument("--open-retries", type=int, default=0)
    ap.add_argument("--open-retry-delay", type=float, default=0.8)
    ap.add_argument("--ack-ok", action="store_true",
                    help="Auto-reply OK to AT* commands (except ATDT). "
                         "Useful for testing without a real modem.")
    ap.add_argument("--list", action="store_true",
                    help="List available serial ports and exit.")
    args = ap.parse_args()

    if args.list:
        _list_ports()
        return

    # Reject an unknown codec up front instead of failing on the first chunk.
    try:
        codecs.lookup(args.encoding)
    except LookupError:
        ap.error(f"unknown --encoding: {args.encoding!r}")

    args.port = args.port or _pick_port()

    # Build output paths
    ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = Path(__file__).parent / "captures" / f"serial_{ts_str}"
    out_dir.mkdir(parents=True, exist_ok=True)
    log_path = out_dir / f"session_{ts_str}.log"
    jsonl_path = out_dir / f"session_{ts_str}.jsonl"
    raw_path = out_dir / f"raw_s3_{ts_str}.bin"

    logger = Logger(log_path, jsonl_path, raw_path)
    logger.info(f"Output directory: {out_dir}")
    logger.info(f"raw_s3 → {raw_path.name} (compatible with parse_capture.py)")

    ser = _open_serial(args, logger)
    if ser is None:
        logger.info("Could not open serial port. Exiting.")
        logger.close()
        sys.exit(1)

    s3_parser = S3FrameParser()
    rx_buf = bytearray()
    stop_evt = threading.Event()
    ctrl_thread = threading.Thread(
        target=_monitor_control_lines,
        args=(ser, logger, stop_evt, args.poll_lines_interval),
        daemon=True,
    )
    ctrl_thread.start()
    logger.info("Monitoring started. Waiting for call-home. Press Ctrl+C to stop.")

    try:
        while True:
            try:
                data = ser.read(args.read_chunk)
            except serial.SerialException as exc:
                logger.info(f"Read error: {exc}")
                break
            if not data:
                continue

            # 1. Save raw bytes
            logger.write_raw(data)

            # 2. Optional raw display (honours --encoding)
            if args.show in ("ascii", "both"):
                txt = data.decode(args.encoding, errors="replace")
                for line in txt.splitlines():
                    logger.data_ascii(line)
            if args.show in ("hex", "both"):
                logger.data_hex(_hexdump(data))

            # 3. Parse S3 frames.
            # NOTE(review): fed one byte at a time and the result accepted as
            # either a single frame or a list, because feed()'s contract is
            # not visible from this file — confirm before batching.
            for byte in data:
                result = s3_parser.feed(bytes([byte]))
                if result:
                    frames = result if isinstance(result, list) else [result]
                    for f in frames:
                        logger.frame(f)

            # 4. AT command handling for --ack-ok
            if args.ack_ok:
                rx_buf.extend(data)
                while True:
                    line_bytes = _pop_line(rx_buf)
                    if line_bytes is None:
                        break
                    line_str = line_bytes.decode("latin1", errors="ignore").strip().upper()
                    if line_str.startswith("AT") and not line_str.startswith("ATDT"):
                        try:
                            ser.write(b"\r\nOK\r\n")
                            ser.flush()
                            logger.info(f"AT ack: {line_str!r} → OK")
                        except Exception as exc:
                            logger.info(f"AT ack write failed: {exc}")
                # Keep only the tail of an unterminated run of bytes.
                if len(rx_buf) > _AT_BUF_LIMIT:
                    del rx_buf[:-_AT_BUF_LIMIT]
    except KeyboardInterrupt:
        logger.info("Ctrl+C — stopping.")
    finally:
        stop_evt.set()
        try:
            ser.close()
        except Exception:
            # Best effort: the port may already be gone (e.g. unplugged).
            pass
        ctrl_thread.join(timeout=1.0)
        logger.info(f"Capture saved to: {out_dir}")
        logger.close()


if __name__ == "__main__":
    main()