46a86939b7
- bridges/ach_bridge.py: transparent TCP bridge that MITMs the MiniMate Plus call-home connection — forwards to real ACH server while logging all frames to raw_client/raw_server .bin files compatible with parse_capture.py; standalone capture mode for lab use without a real server - bridges/serial_watch.py: RS-232 serial monitor with live S3 frame parsing; taps the line between MiniMate and modem (RV50/RV55); captures raw bytes, .log and .jsonl; --ack-ok mode auto-replies to AT commands; fixed fatal indentation bug in the original that silently prevented any data capture - seismo_lab.py: new "Serial Watch" fourth tab (SerialWatchPanel) wrapping serial_watch.py functionality; COM port picker with refresh, baud config, ack-ok toggle, colour-coded live frame log (teal frames / yellow ctrl / blue AT), raw .bin capture auto-fed into Analyzer tab on stop Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
436 lines
16 KiB
Python
436 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
serial_watch.py — Instantel Series-3 serial monitor with S3 frame parsing.
|
|
|
|
Taps the RS-232 line between the MiniMate Plus and its modem (RV50/RV55).
|
|
Saves raw binary captures compatible with the rest of the analysis toolchain,
|
|
plus a human-readable frame log.
|
|
|
|
Usage
|
|
-----
|
|
python bridges/serial_watch.py # interactive COM picker
|
|
python bridges/serial_watch.py --port COM3 # specify port
|
|
python bridges/serial_watch.py --port COM3 --ack-ok # reply OK to AT commands
|
|
# (useful if modem is absent
|
|
# and you want the device to
|
|
# proceed past AT negotiation)
|
|
python bridges/serial_watch.py --list # list available ports
|
|
|
|
Output
|
|
------
|
|
bridges/captures/serial_<ISO-timestamp>/
|
|
raw_s3_<ts>.bin — raw bytes from device (feeds directly into S3FrameParser)
|
|
session_<ts>.log — human-readable frame + control-line log
|
|
session_<ts>.jsonl — JSON-lines frame log
|
|
|
|
The raw_s3_*.bin file is byte-for-byte compatible with the existing capture
|
|
format used by bridges/parse_capture.py and all analysis scripts.
|
|
|
|
What to look for in a call-home capture
|
|
----------------------------------------
|
|
1. Does the device talk first after CONNECT, or does it wait?
|
|
- If raw_s3_*.bin has bytes before any AT/POLL exchange → PUSH protocol
|
|
- If it stays silent → PULL protocol (same as Blastware manual download)
|
|
|
|
2. Look for "Operating System" ASCII at the start — the device sends this 16-byte
|
|
boot string on cold start before entering DLE-framed mode.
|
|
|
|
3. RING/CONNECT from the modem appear as ASCII before the DLE frames — the parser
|
|
handles these automatically (scans forward to DLE+STX).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import serial
|
|
from serial.tools import list_ports
|
|
except ModuleNotFoundError:
|
|
print(
|
|
"pyserial not found. Install with:\n python -m pip install pyserial",
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(1)
|
|
|
|
# Add project root so we can import the frame parser
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
from minimateplus.framing import S3FrameParser, S3Frame
|
|
|
|
import json
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _ts() -> str:
|
|
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
|
|
|
|
|
def _hexdump(b: bytes) -> str:
|
|
return " ".join(f"{x:02X}" for x in b)
|
|
|
|
|
|
def _printable(b: bytes) -> str:
|
|
return b.decode("latin1", errors="replace")
|
|
|
|
|
|
_KNOWN_SUBS = {
|
|
0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADVANCE_EVENT_RSP",
|
|
0xE1: "EVENT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP",
|
|
0xF3: "WAVEFORM_RECORD_RSP", 0xF5: "WAVEFORM_HEADER_RSP", 0xF7: "EVENT_INDEX_RSP",
|
|
0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP",
|
|
0x69: "START_MONITOR_ACK", 0x68: "STOP_MONITOR_ACK",
|
|
0x97: "EVT_IDX_WRITE_ACK", 0x8C: "CONFIRM_B_ACK", 0x8E: "COMPLIANCE_WRITE_ACK",
|
|
0x8D: "CONFIRM_A_ACK", 0x7D: "TRIGGER_WRITE_ACK", 0x7C: "TRIGGER_CONFIRM_ACK",
|
|
0x96: "WAVEFORM_WRITE_ACK", 0x8B: "CONFIRM_C_ACK",
|
|
}
|
|
|
|
|
|
def _label_frame(frame: S3Frame) -> str:
|
|
name = _KNOWN_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
|
|
chk = "✓" if frame.checksum_valid else "✗ BAD_CHK"
|
|
peek = frame.data[:24].hex() + ("…" if len(frame.data) > 24 else "")
|
|
return (
|
|
f"S3 SUB=0x{frame.sub:02X} ({name:<22}) "
|
|
f"page=0x{frame.page_key:04X} data={len(frame.data):4d}B {chk} {peek}"
|
|
)
|
|
|
|
|
|
# ── Logger ────────────────────────────────────────────────────────────────────
|
|
|
|
class Logger:
    """Thread-safe writer for the three outputs of one capture session.

    Text lines are echoed to the console and appended to the ``.log`` file,
    parsed frames are additionally appended to the ``.jsonl`` file as one JSON
    object per line, and raw bytes are appended to the ``.bin`` file.  A single
    lock serialises all writes so lines from different threads do not
    interleave.

    The instance can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self, log_path: Path, jsonl_path: Path, raw_path: Path) -> None:
        """Open the three output files in append mode.

        Raises:
            OSError: if any file cannot be opened.  Files that were already
                opened are closed again before the error propagates.
        """
        self._lock = threading.Lock()
        self._frame_count = 0
        opened = []
        try:
            self._log = log_path.open("a", encoding="utf-8", newline="")
            opened.append(self._log)
            self._jl = jsonl_path.open("a", encoding="utf-8", newline="")
            opened.append(self._jl)
            self._raw = raw_path.open("ab")
        except OSError:
            # Do not leak the handles that did open.
            for fh in opened:
                fh.close()
            raise

    def __enter__(self) -> "Logger":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    @staticmethod
    def _echo(line: str) -> None:
        """Print *line* to the console, degrading to ASCII escapes if needed.

        Log lines contain characters such as ``→`` and ``✓``; on a stdout whose
        encoding cannot represent them ``print`` raises ``UnicodeEncodeError``,
        which would otherwise abort the calling thread.
        """
        try:
            print(line)
        except UnicodeEncodeError:
            print(line.encode("ascii", "backslashreplace").decode("ascii"))

    def _emit(self, tag: str, msg: str) -> None:
        """Timestamp *msg*, then write it to the console and the log file."""
        line = f"[{_ts()}] {tag} | {msg}"
        with self._lock:
            self._echo(line)
            print(line, file=self._log, flush=True)

    def info(self, msg: str) -> None:
        """Log a general informational message (``INFO`` tag)."""
        self._emit("INFO", msg)

    def ctrl(self, msg: str) -> None:
        """Log a control-line / port-control message (``CTRL`` tag)."""
        self._emit("CTRL", msg)

    def data_hex(self, msg: str) -> None:
        """Log a hex rendering of received data (``HEX`` tag)."""
        self._emit("HEX", msg)

    def data_ascii(self, msg: str) -> None:
        """Log a text rendering of received data (``DATA`` tag)."""
        self._emit("DATA", msg)

    def frame(self, f: S3Frame) -> None:
        """Log one parsed frame to the console, the log file and the JSONL file.

        Frames are numbered from 1 in the order this method is called.
        """
        with self._lock:
            self._frame_count += 1
            label = f"[{_ts()}] FRAME | #{self._frame_count:04d} {_label_frame(f)}"
            self._echo(label)
            print(label, file=self._log, flush=True)
            record = {
                "frame": self._frame_count,
                "sub": f.sub,
                "page_key": f.page_key,
                "data_len": len(f.data),
                "data_hex": f.data.hex(),
                "checksum_valid": f.checksum_valid,
            }
            print(json.dumps(record), file=self._jl, flush=True)

    def write_raw(self, data: bytes) -> None:
        """Append *data* to the raw capture file and flush it immediately."""
        with self._lock:
            self._raw.write(data)
            self._raw.flush()

    def close(self) -> None:
        """Close all three output files (best effort; safe to call twice)."""
        with self._lock:
            for fh in (self._log, self._jl, self._raw):
                try:
                    # close() flushes by itself; a separate flush() that fails
                    # would only prevent the handle from being closed.
                    fh.close()
                except OSError:
                    pass
|
|
|
|
|
|
# ── Control-line monitor thread ───────────────────────────────────────────────
|
|
|
|
def _monitor_control_lines(
|
|
ser: serial.Serial,
|
|
logger: Logger,
|
|
stop: threading.Event,
|
|
interval: float,
|
|
) -> None:
|
|
prev = dict(CTS=None, DSR=None, DCD=None, RI=None)
|
|
try:
|
|
prev.update(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd)
|
|
try:
|
|
prev["RI"] = ser.ri
|
|
except Exception:
|
|
pass
|
|
except Exception as exc:
|
|
logger.ctrl(f"Init error: {exc}")
|
|
return
|
|
|
|
logger.ctrl(
|
|
f"Initial: CTS={prev['CTS']} DSR={prev['DSR']} DCD={prev['DCD']} RI={prev['RI']}"
|
|
)
|
|
while not stop.is_set():
|
|
try:
|
|
cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None)
|
|
try:
|
|
cur["RI"] = ser.ri
|
|
except Exception:
|
|
pass
|
|
for name, val in cur.items():
|
|
if val != prev[name]:
|
|
logger.ctrl(f"{name} → {val}")
|
|
prev[name] = val
|
|
except serial.SerialException as exc:
|
|
logger.ctrl(f"Poll error: {exc}")
|
|
break
|
|
stop.wait(interval)
|
|
|
|
|
|
# ── Serial open ───────────────────────────────────────────────────────────────
|
|
|
|
# Single-letter parity choice (as accepted by --parity) → pyserial constant.
_PARITY = dict(
    N=serial.PARITY_NONE,
    E=serial.PARITY_EVEN,
    O=serial.PARITY_ODD,
    M=serial.PARITY_MARK,
    S=serial.PARITY_SPACE,
)

# Numeric stop-bit count (as accepted by --stopbits) → pyserial constant.
_STOPBITS = {
    1: serial.STOPBITS_ONE,
    1.5: serial.STOPBITS_ONE_POINT_FIVE,
    2: serial.STOPBITS_TWO,
}
|
|
|
|
|
|
def _open_serial(args: argparse.Namespace, logger: Logger) -> serial.Serial | None:
    """Open the serial port described by *args*, retrying on failure.

    After a successful open the DTR/RTS outputs are driven to the requested
    state and, if ``args.send_break`` is positive, a BREAK condition is held
    for that many milliseconds.  Failures of these post-open steps are logged
    but do not prevent the port from being returned.

    Args:
        args: Parsed command-line options (port, framing, flow control,
            ``dtr``/``rts``, ``send_break``, ``open_retries``,
            ``open_retry_delay``).
        logger: Receives progress and error messages.

    Returns:
        The open port, or ``None`` when every attempt raised
        ``serial.SerialException``.
    """
    total_attempts = args.open_retries + 1

    for attempt in range(1, total_attempts + 1):
        logger.info(
            f"Opening {args.port} @ {args.baud},{args.bytesize}{args.parity}{args.stopbits} "
            f"rtscts={args.rtscts} xonxoff={args.xonxoff} dsrdtr={args.dsrdtr} "
            f"(attempt {attempt})"
        )

        try:
            ser = serial.Serial(
                port=args.port,
                baudrate=args.baud,
                bytesize=args.bytesize,
                parity=_PARITY[args.parity],
                stopbits=_STOPBITS[args.stopbits],
                timeout=args.timeout,
                xonxoff=args.xonxoff,
                rtscts=args.rtscts,
                dsrdtr=args.dsrdtr,
                write_timeout=0,
            )
        except serial.SerialException as exc:
            logger.info(f"Open failed: {exc}")
            if attempt < total_attempts:
                time.sleep(args.open_retry_delay)
            continue

        try:
            # The dtr/rts properties replace the deprecated setDTR()/setRTS().
            ser.dtr = args.dtr == "on"
            ser.rts = args.rts == "on"
            logger.ctrl(f"Set DTR={args.dtr} RTS={args.rts}")
        except Exception as exc:
            logger.ctrl(f"DTR/RTS set failed: {exc}")

        if args.send_break > 0:
            try:
                ser.break_condition = True
                time.sleep(args.send_break / 1000.0)
                ser.break_condition = False
                logger.ctrl(f"BREAK held {args.send_break} ms")
            except Exception as exc:
                logger.ctrl(f"BREAK failed: {exc}")

        return ser

    return None
|
|
|
|
|
|
# ── Port picker ───────────────────────────────────────────────────────────────
|
|
|
|
def _list_ports() -> list:
    """Print a numbered list of the detected serial ports and return them.

    Returns an empty list (after printing a notice) when no port is found.
    """
    found = list(list_ports.comports())
    if found:
        print("Available serial ports:")
        for index, info in enumerate(found, 1):
            print(f" {index:2d}) {info.device:<12} {info.description or ''}")
        return found

    print("No serial ports found.")
    return []
|
|
|
|
|
|
def _pick_port() -> str:
    """Interactively choose a serial port and return its device name.

    The available ports are listed first.  With no ports the process exits
    with status 1; with exactly one port it is selected automatically.
    Otherwise the user is prompted until they enter a valid list number or a
    device name (matched case-insensitively).

    If standard input is closed while prompting, the process exits with
    status 1 instead of failing with an unhandled ``EOFError``.
    """
    ports = _list_ports()
    if not ports:
        sys.exit(1)

    if len(ports) == 1:
        print(f"Auto-selecting: {ports[0].device}")
        return ports[0].device

    while True:
        try:
            sel = input("Select port (number or name, e.g. COM3): ").strip()
        except EOFError:
            print(
                "\nNo port selected (stdin closed). Use --port to specify one.",
                file=sys.stderr,
            )
            sys.exit(1)

        # isdecimal() (unlike isdigit()) only accepts characters int() can
        # parse, so input such as "²" cannot raise ValueError here.
        if sel.isdecimal() and 1 <= int(sel) <= len(ports):
            return ports[int(sel) - 1].device

        wanted = sel.upper()
        for p in ports:
            if p.device.upper() == wanted:
                return p.device

        print("Not recognised. Enter list number or exact port name.")
|
|
|
|
|
|
# ── Main loop ─────────────────────────────────────────────────────────────────
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the serial monitor."""
    ap = argparse.ArgumentParser(
        description="Monitor Instantel Series-3 serial traffic with S3 frame parsing."
    )
    ap.add_argument("--port", "-p",
                    help="COM port (e.g. COM3). Omit to be prompted.")
    ap.add_argument("--baud", "-b", type=int, default=38400)
    ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8)
    ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N")
    ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1)
    ap.add_argument("--rtscts", action="store_true")
    ap.add_argument("--xonxoff", action="store_true")
    ap.add_argument("--dsrdtr", action="store_true")
    ap.add_argument("--dtr", choices=["on", "off"], default="on")
    ap.add_argument("--rts", choices=["on", "off"], default="on")
    ap.add_argument("--send-break", type=int, default=0,
                    help="Hold BREAK for N ms after open.")
    ap.add_argument("--show", choices=["ascii", "hex", "both", "frames"],
                    default="frames",
                    help="'frames' (default) shows only parsed S3 frames. "
                         "'ascii'/'hex'/'both' also show raw bytes.")
    ap.add_argument("--encoding", default="latin1",
                    help="Text encoding used for the --show ascii/both display.")
    ap.add_argument("--read-chunk", type=int, default=4096)
    ap.add_argument("--timeout", type=float, default=0.05)
    ap.add_argument("--poll-lines-interval", type=float, default=0.2)
    ap.add_argument("--open-retries", type=int, default=0)
    ap.add_argument("--open-retry-delay", type=float, default=0.8)
    ap.add_argument("--ack-ok", action="store_true",
                    help="Auto-reply OK to AT* commands (except ATDT). "
                         "Useful for testing without a real modem.")
    ap.add_argument("--list", action="store_true",
                    help="List available serial ports and exit.")
    return ap


def _pop_lines(rx_buf: bytearray) -> list[bytes]:
    """Remove and return every complete line at the front of *rx_buf*.

    A line ends at the first CR or LF, whichever comes first; the terminator
    is consumed but not returned.  Bytes after the last terminator stay in
    *rx_buf* so they can be completed by later reads.
    """
    lines = []
    while True:
        hits = [i for i in (rx_buf.find(b"\r"), rx_buf.find(b"\n")) if i != -1]
        if not hits:
            return lines
        cut = min(hits)
        lines.append(bytes(rx_buf[:cut]))
        del rx_buf[:cut + 1]


def _ack_at_commands(ser: serial.Serial, logger: Logger, rx_buf: bytearray) -> None:
    """Reply ``OK`` to each complete AT command buffered in *rx_buf*.

    Lines starting with ``ATDT`` are deliberately not acknowledged.  Write
    failures are logged and do not interrupt the capture.
    """
    for line_bytes in _pop_lines(rx_buf):
        line_str = line_bytes.decode("latin1", errors="ignore").strip().upper()
        if not line_str.startswith("AT") or line_str.startswith("ATDT"):
            continue
        try:
            ser.write(b"\r\nOK\r\n")
            ser.flush()
            logger.info(f"AT ack: {line_str!r} → OK")
        except Exception as exc:
            logger.info(f"AT ack write failed: {exc}")


def main() -> None:
    """Command-line entry point: capture and decode serial traffic until Ctrl+C.

    Creates a timestamped directory under ``captures/`` next to this file,
    opens the port, starts the control-line monitor thread and then loops:
    every chunk read is appended to the raw ``.bin`` file, optionally echoed
    as text/hex, fed to the S3 frame parser, and (with ``--ack-ok``) scanned
    for AT commands to acknowledge.  Exits with status 1 if the port cannot
    be opened.
    """
    import codecs

    ap = _build_arg_parser()
    args = ap.parse_args()

    # Reject a bad --encoding now rather than when the first bytes arrive.
    try:
        codecs.lookup(args.encoding)
    except LookupError:
        ap.error(f"unknown --encoding: {args.encoding!r}")

    if args.list:
        _list_ports()
        return

    args.port = args.port or _pick_port()

    # Build output paths
    ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = Path(__file__).parent / "captures" / f"serial_{ts_str}"
    out_dir.mkdir(parents=True, exist_ok=True)

    log_path = out_dir / f"session_{ts_str}.log"
    jsonl_path = out_dir / f"session_{ts_str}.jsonl"
    raw_path = out_dir / f"raw_s3_{ts_str}.bin"

    logger = Logger(log_path, jsonl_path, raw_path)
    logger.info(f"Output directory: {out_dir}")
    logger.info(f"raw_s3 → {raw_path.name} (compatible with parse_capture.py)")

    ser = _open_serial(args, logger)
    if ser is None:
        logger.info("Could not open serial port. Exiting.")
        logger.close()
        sys.exit(1)

    s3_parser = S3FrameParser()
    rx_buf = bytearray()  # carries partial AT lines between reads (--ack-ok)
    stop_evt = threading.Event()

    ctrl_thread = threading.Thread(
        target=_monitor_control_lines,
        args=(ser, logger, stop_evt, args.poll_lines_interval),
        daemon=True,
    )
    ctrl_thread.start()
    logger.info("Monitoring started. Waiting for call-home. Press Ctrl+C to stop.")

    show_ascii = args.show in ("ascii", "both")
    show_hex = args.show in ("hex", "both")

    try:
        while True:
            try:
                data = ser.read(args.read_chunk)
            except serial.SerialException as exc:
                logger.info(f"Read error: {exc}")
                break

            if not data:
                # Read timed out with nothing received; poll again.
                continue

            # 1. Save raw bytes
            logger.write_raw(data)

            # 2. Optional raw display
            if show_ascii:
                for line in data.decode(args.encoding, errors="replace").splitlines():
                    logger.data_ascii(line)
            if show_hex:
                logger.data_hex(_hexdump(data))

            # 3. Parse S3 frames.  The parser is fed one byte at a time and
            #    its result is accepted either as a single frame or as a list.
            for byte in data:
                result = s3_parser.feed(bytes([byte]))
                if result:
                    frames = result if isinstance(result, list) else [result]
                    for f in frames:
                        logger.frame(f)

            # 4. AT command handling for --ack-ok
            if args.ack_ok:
                rx_buf.extend(data)
                _ack_at_commands(ser, logger, rx_buf)

    except KeyboardInterrupt:
        logger.info("Ctrl+C — stopping.")

    finally:
        # Signal the monitor thread first, then close the port it polls.
        stop_evt.set()
        try:
            ser.close()
        except Exception:
            pass
        ctrl_thread.join(timeout=1.0)
        logger.info(f"Capture saved to: {out_dir}")
        logger.close()
|
|
|
|
|
|
# Run the capture CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()
|