From 46a86939b7d3c8b0be961b516ff45e687a4a0e44 Mon Sep 17 00:00:00 2001 From: Brian Harrison Date: Thu, 9 Apr 2026 12:10:52 -0400 Subject: [PATCH] feat: add ACH TCP bridge, serial tap tool, and Serial Watch tab MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - bridges/ach_bridge.py: transparent TCP bridge that MITMs the MiniMate Plus call-home connection — forwards to real ACH server while logging all frames to raw_client/raw_server .bin files compatible with parse_capture.py; standalone capture mode for lab use without a real server - bridges/serial_watch.py: RS-232 serial monitor with live S3 frame parsing; taps the line between MiniMate and modem (RV50/RV55); captures raw bytes, .log and .jsonl; --ack-ok mode auto-replies to AT commands; fixed fatal indentation bug in the original that silently prevented any data capture - seismo_lab.py: new "Serial Watch" fourth tab (SerialWatchPanel) wrapping serial_watch.py functionality; COM port picker with refresh, baud config, ack-ok toggle, colour-coded live frame log (teal frames / yellow ctrl / blue AT), raw .bin capture auto-fed into Analyzer tab on stop Co-Authored-By: Claude Sonnet 4.6 --- bridges/ach_bridge.py | 401 ++++++++++++++++++++++++++++++++++++ bridges/serial_watch.py | 435 ++++++++++++++++++++++++++++++++++++++++ seismo_lab.py | 404 +++++++++++++++++++++++++++++++++++++ 3 files changed, 1240 insertions(+) create mode 100644 bridges/ach_bridge.py create mode 100644 bridges/serial_watch.py diff --git a/bridges/ach_bridge.py b/bridges/ach_bridge.py new file mode 100644 index 0000000..1028f99 --- /dev/null +++ b/bridges/ach_bridge.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python3 +""" +ach_bridge.py — Transparent TCP bridge for capturing Instantel MiniMate Plus + call-home (ACH) traffic. + +Usage +----- + # Bridge mode: forward to real ACH server while logging + python bridges/ach_bridge.py --upstream HOST:PORT [--port 9034] + + # Standalone capture mode: accept connection, don't forward (use when you + # want to see what the device sends/expects without a real server) + python bridges/ach_bridge.py --standalone [--port 9034] + +Setup +----- +1. Find the "Remote Hostname/IP" and port in ACEmanager → Dual SIM / WAN → + Call Home (or equivalent menu on your RV50/RV55 firmware). +2. Temporarily change that setting on ONE unit to point at: + your-machine-local-ip : <--port> +3. Run this script. +4. Wait for the unit to trigger / call home. A capture file is written to + bridges/captures/ach_/ alongside an auto-parsed frame log. +5. Revert the unit's ACEmanager setting. + +Output +------ + bridges/captures/ach_/ + raw_client_.bin — raw bytes from the device (S3 side) + raw_server_.bin — raw bytes from the upstream server (BW side) + (empty in standalone mode) + session_.log — human-readable frame parse log + session_.jsonl — JSON-lines frame log (for downstream tooling) + +The raw_client / raw_server files are byte-for-byte compatible with the +existing capture format used by bridges/parse_capture.py and the rest of +the analysis tooling. +""" + +from __future__ import annotations + +import argparse +import asyncio +import datetime +import json +import logging +import os +import sys +from pathlib import Path +from typing import Optional + +# Add project root to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from minimateplus.framing import S3FrameParser, S3Frame + +log = logging.getLogger("ach_bridge") + + +# ── Frame label helpers ────────────────────────────────────────────────────── + +_KNOWN_RSP_SUBS = { + 0xA4: "POLL_RSP", + 0xA5: "BULK_WAVEFORM_RSP", + 0xE0: "ADVANCE_EVENT_RSP", + 0xE1: "EVENT_INDEX_FIRST_RSP", + 0xE3: "MONITOR_STATUS_RSP", + 0xEA: "SERIAL_NUM_RSP", + 0xF3: "WAVEFORM_RECORD_RSP", + 0xF5: "WAVEFORM_HEADER_RSP", + 0xF7: "EVENT_INDEX_RSP", + 0xF9: "UNK_06_RSP", + 0xFE: "DEVICE_INFO_RSP", + # Write acks + 0x97: "EVT_IDX_WRITE_ACK", + 0x8C: "CONFIRM_B_ACK", + 0x8E: "COMPLIANCE_WRITE_ACK", + 0x8D: "CONFIRM_A_ACK", + 0x7D: "TRIGGER_WRITE_ACK", + 0x7C: "TRIGGER_CONFIRM_ACK", + 0x96: "WAVEFORM_WRITE_ACK", + 0x8B: "CONFIRM_C_ACK", + 0x69: "START_MONITOR_ACK", + 0x68: "STOP_MONITOR_ACK", +} + +_KNOWN_REQ_SUBS = { + 0x5B: "POLL", + 0x5A: "BULK_WAVEFORM", + 0x1F: "ADVANCE_EVENT", + 0x1E: "EVENT_INDEX_FIRST", + 0x1C: "MONITOR_STATUS", + 0x15: "SERIAL_NUM", + 0x0C: "WAVEFORM_RECORD", + 0x0A: "WAVEFORM_HEADER", + 0x08: "EVENT_INDEX", + 0x06: "UNK_06", + 0x01: "DEVICE_INFO", + # Write commands + 0x68: "EVT_IDX_WRITE", + 0x73: "CONFIRM_B", + 0x71: "COMPLIANCE_WRITE", + 0x72: "CONFIRM_A", + 0x82: "TRIGGER_WRITE", + 0x83: "TRIGGER_CONFIRM", + 0x69: "WAVEFORM_WRITE", + 0x74: "CONFIRM_C", + 0x96: "START_MONITOR", + 0x97: "STOP_MONITOR", +} + + +def _label_s3_frame(frame: S3Frame) -> str: + name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}") + chk = "✓" if frame.checksum_valid else "✗CHK" + return f"S3→ SUB=0x{frame.sub:02X} ({name}) page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}" + + +def _label_bw_frame(data: bytes) -> str: + """Best-effort label for a raw BW request frame.""" + # BW frame (destuffed): ACK STX [10 10] flags sub ... + # Wire: 41 02 10 10 00 sub ... + if len(data) < 6: + return f"BW→ (short {len(data)}B)" + sub = data[5] + name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}") + return f" →BW SUB=0x{sub:02X} ({name}) {len(data)}B" + + +# ── Per-session capture writer ──────────────────────────────────────────────── + +class CaptureSession: + """Writes raw bytes + parsed log for one TCP connection.""" + + def __init__(self, capture_dir: Path, peer: str): + ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + self.dir = capture_dir / f"ach_{ts}" + self.dir.mkdir(parents=True, exist_ok=True) + self.peer = peer + + self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb") + self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb") + self._log_fh = open(self.dir / f"session_{ts}.log", "w") + self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w") + + self._s3_parser = S3FrameParser() + self._frame_count = 0 + self._byte_count_client = 0 + self._byte_count_server = 0 + + self._log(f"# ACH capture — peer={peer} started={datetime.datetime.now().isoformat()}") + self._log(f"# Output dir: {self.dir}") + log.info("Capture session opened: %s (peer=%s)", self.dir, peer) + + # ── public API ─────────────────────────────────────────────────────────── + + def feed_client(self, data: bytes) -> None: + """Bytes arriving FROM the device (S3 side).""" + self._raw_client.write(data) + self._raw_client.flush() + self._byte_count_client += len(data) + + # Parse S3 frames + for byte in data: + frame = self._s3_parser.feed(bytes([byte])) + if frame: + frames = frame if isinstance(frame, list) else [frame] + for f in frames: + self._frame_count += 1 + label = _label_s3_frame(f) + self._log(f"[{self._frame_count:04d}] {label}") + self._log(f" hex: {f.data[:64].hex()}" + + (" ..." if len(f.data) > 64 else "")) + self._emit_json("s3", f) + + def feed_server(self, data: bytes) -> None: + """Bytes arriving FROM the upstream server (BW side).""" + self._raw_server.write(data) + self._raw_server.flush() + self._byte_count_server += len(data) + label = _label_bw_frame(data) + self._log(f" {label}") + + def close(self, reason: str = "connection closed") -> None: + self._log(f"# Session ended: {reason}") + self._log(f"# Totals — client_bytes={self._byte_count_client} " + f"server_bytes={self._byte_count_server} " + f"s3_frames={self._frame_count}") + for fh in (self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh): + try: + fh.close() + except Exception: + pass + log.info( + "Capture session closed (%s): %dB client, %dB server, %d S3 frames → %s", + reason, self._byte_count_client, self._byte_count_server, + self._frame_count, self.dir, + ) + + # ── internals ──────────────────────────────────────────────────────────── + + def _log(self, msg: str) -> None: + print(msg, file=self._log_fh, flush=True) + # Also echo to console for live monitoring + print(msg) + + def _emit_json(self, direction: str, frame: S3Frame) -> None: + record = { + "dir": direction, + "sub": frame.sub, + "page_key": frame.page_key, + "data_len": len(frame.data), + "data_hex": frame.data.hex(), + "checksum_valid": frame.checksum_valid, + } + print(json.dumps(record), file=self._jsonl_fh, flush=True) + + +# ── Bridge connection handler ───────────────────────────────────────────────── + +class BridgeHandler: + def __init__( + self, + capture_dir: Path, + upstream_host: Optional[str], + upstream_port: Optional[int], + ): + self.capture_dir = capture_dir + self.upstream_host = upstream_host + self.upstream_port = upstream_port + + async def handle( + self, + client_reader: asyncio.StreamReader, + client_writer: asyncio.StreamWriter, + ) -> None: + peer = client_writer.get_extra_info("peername", ("?", 0)) + peer_str = f"{peer[0]}:{peer[1]}" + log.info("Inbound connection from %s", peer_str) + + session = CaptureSession(self.capture_dir, peer_str) + + if self.upstream_host: + # Bridge mode: connect to upstream and relay + try: + up_reader, up_writer = await asyncio.open_connection( + self.upstream_host, self.upstream_port + ) + log.info("Connected to upstream %s:%s", self.upstream_host, self.upstream_port) + except Exception as exc: + log.error("Failed to connect to upstream: %s", exc) + session.close(f"upstream connect failed: {exc}") + client_writer.close() + return + + try: + await asyncio.gather( + self._relay(client_reader, up_writer, session, "client"), + self._relay(up_reader, client_writer, session, "server"), + ) + except asyncio.CancelledError: + pass + except Exception as exc: + log.warning("Bridge relay error: %s", exc) + finally: + session.close("bridge relay ended") + for writer in (client_writer, up_writer): + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + else: + # Standalone mode: just capture, don't forward + log.info("Standalone mode — recording inbound traffic only") + try: + while True: + data = await client_reader.read(4096) + if not data: + break + session.feed_client(data) + except asyncio.CancelledError: + pass + except Exception as exc: + log.warning("Standalone read error: %s", exc) + finally: + session.close("standalone capture ended") + try: + client_writer.close() + await client_writer.wait_closed() + except Exception: + pass + + async def _relay( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + session: CaptureSession, + direction: str, + ) -> None: + try: + while True: + data = await reader.read(4096) + if not data: + break + if direction == "client": + session.feed_client(data) + else: + session.feed_server(data) + writer.write(data) + await writer.drain() + except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): + pass + + +# ── Main ────────────────────────────────────────────────────────────────────── + +async def main(args: argparse.Namespace) -> None: + capture_dir = Path(__file__).parent / "captures" + capture_dir.mkdir(parents=True, exist_ok=True) + + upstream_host: Optional[str] = None + upstream_port: Optional[int] = None + + if not args.standalone: + if not args.upstream: + print("ERROR: --upstream HOST:PORT is required unless --standalone is set.") + sys.exit(1) + parts = args.upstream.rsplit(":", 1) + if len(parts) != 2: + print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:9034)") + sys.exit(1) + upstream_host = parts[0] + upstream_port = int(parts[1]) + + handler = BridgeHandler(capture_dir, upstream_host, upstream_port) + + server = await asyncio.start_server( + handler.handle, + host="0.0.0.0", + port=args.port, + ) + + mode = f"bridge → {upstream_host}:{upstream_port}" if upstream_host else "standalone capture" + addrs = ", ".join(str(s.getsockname()) for s in server.sockets) + print(f"\n{'='*60}") + print(f" ACH bridge listening on {addrs}") + print(f" Mode: {mode}") + print(f" Captures: {capture_dir}/ach_/") + print(f"{'='*60}") + print(f"\n Point your unit's ACEmanager call-home destination to:") + print(f" :{args.port}") + if upstream_host: + print(f"\n All traffic will be forwarded to {upstream_host}:{upstream_port}") + print(f" Your live data feed is uninterrupted.") + print(f"\n Waiting for inbound connection... (Ctrl-C to stop)\n") + + async with server: + await server.serve_forever() + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Transparent TCP bridge for capturing MiniMate Plus call-home traffic." + ) + p.add_argument( + "--upstream", "-u", + metavar="HOST:PORT", + help="Upstream ACH server to forward to (e.g. 203.0.113.5:9034). " + "Omit with --standalone for capture-only mode.", + ) + p.add_argument( + "--port", "-p", + type=int, + default=9034, + help="Local port to listen on (default: 9034).", + ) + p.add_argument( + "--standalone", "-s", + action="store_true", + help="Capture-only mode: accept connection but do not forward to upstream.", + ) + p.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable debug logging.", + ) + return p.parse_args() + + +if __name__ == "__main__": + args = parse_args() + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)-7s %(name)s %(message)s", + ) + try: + asyncio.run(main(args)) + except KeyboardInterrupt: + print("\nStopped.") diff --git a/bridges/serial_watch.py b/bridges/serial_watch.py new file mode 100644 index 0000000..cd24f94 --- /dev/null +++ b/bridges/serial_watch.py @@ -0,0 +1,435 @@ +#!/usr/bin/env python3 +""" +serial_watch.py — Instantel Series-3 serial monitor with S3 frame parsing. + +Taps the RS-232 line between the MiniMate Plus and its modem (RV50/RV55). +Saves raw binary captures compatible with the rest of the analysis toolchain, +plus a human-readable frame log. + +Usage +----- + python bridges/serial_watch.py # interactive COM picker + python bridges/serial_watch.py --port COM3 # specify port + python bridges/serial_watch.py --port COM3 --ack-ok # reply OK to AT commands + # (useful if modem is absent + # and you want the device to + # proceed past AT negotiation) + python bridges/serial_watch.py --list # list available ports + +Output +------ + bridges/captures/serial_/ + raw_s3_.bin — raw bytes from device (feeds directly into S3FrameParser) + session_.log — human-readable frame + control-line log + session_.jsonl — JSON-lines frame log + +The raw_s3_*.bin file is byte-for-byte compatible with the existing capture +format used by bridges/parse_capture.py and all analysis scripts. + +What to look for in a call-home capture +---------------------------------------- +1. Does the device talk first after CONNECT, or does it wait? + - If raw_s3_*.bin has bytes before any AT/POLL exchange → PUSH protocol + - If it stays silent → PULL protocol (same as Blastware manual download) + +2. Look for "Operating System" ASCII at the start — the device sends this 16-byte + boot string on cold start before entering DLE-framed mode. + +3. RING/CONNECT from the modem appear as ASCII before the DLE frames — the parser + handles these automatically (scans forward to DLE+STX). +""" + +from __future__ import annotations + +import argparse +import sys +import threading +import time +from datetime import datetime +from pathlib import Path + +try: + import serial + from serial.tools import list_ports +except ModuleNotFoundError: + print( + "pyserial not found. Install with:\n python -m pip install pyserial", + file=sys.stderr, + ) + sys.exit(1) + +# Add project root so we can import the frame parser +sys.path.insert(0, str(Path(__file__).parent.parent)) +from minimateplus.framing import S3FrameParser, S3Frame + +import json + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _ts() -> str: + return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + +def _hexdump(b: bytes) -> str: + return " ".join(f"{x:02X}" for x in b) + + +def _printable(b: bytes) -> str: + return b.decode("latin1", errors="replace") + + +_KNOWN_SUBS = { + 0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADVANCE_EVENT_RSP", + 0xE1: "EVENT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP", + 0xF3: "WAVEFORM_RECORD_RSP", 0xF5: "WAVEFORM_HEADER_RSP", 0xF7: "EVENT_INDEX_RSP", + 0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP", + 0x69: "START_MONITOR_ACK", 0x68: "STOP_MONITOR_ACK", + 0x97: "EVT_IDX_WRITE_ACK", 0x8C: "CONFIRM_B_ACK", 0x8E: "COMPLIANCE_WRITE_ACK", + 0x8D: "CONFIRM_A_ACK", 0x7D: "TRIGGER_WRITE_ACK", 0x7C: "TRIGGER_CONFIRM_ACK", + 0x96: "WAVEFORM_WRITE_ACK", 0x8B: "CONFIRM_C_ACK", +} + + +def _label_frame(frame: S3Frame) -> str: + name = _KNOWN_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}") + chk = "✓" if frame.checksum_valid else "✗ BAD_CHK" + peek = frame.data[:24].hex() + ("…" if len(frame.data) > 24 else "") + return ( + f"S3 SUB=0x{frame.sub:02X} ({name:<22}) " + f"page=0x{frame.page_key:04X} data={len(frame.data):4d}B {chk} {peek}" + ) + + +# ── Logger ──────────────────────────────────────────────────────────────────── + +class Logger: + def __init__(self, log_path: Path, jsonl_path: Path, raw_path: Path) -> None: + self._log = log_path.open("a", encoding="utf-8", newline="") + self._jl = jsonl_path.open("a", encoding="utf-8", newline="") + self._raw = raw_path.open("ab") + self._lock = threading.Lock() + self._frame_count = 0 + + def info(self, msg: str) -> None: + line = f"[{_ts()}] INFO | {msg}" + with self._lock: + print(line) + print(line, file=self._log, flush=True) + + def ctrl(self, msg: str) -> None: + line = f"[{_ts()}] CTRL | {msg}" + with self._lock: + print(line) + print(line, file=self._log, flush=True) + + def data_hex(self, msg: str) -> None: + line = f"[{_ts()}] HEX | {msg}" + with self._lock: + print(line) + print(line, file=self._log, flush=True) + + def data_ascii(self, msg: str) -> None: + line = f"[{_ts()}] DATA | {msg}" + with self._lock: + print(line) + print(line, file=self._log, flush=True) + + def frame(self, f: S3Frame) -> None: + with self._lock: + self._frame_count += 1 + label = f"[{_ts()}] FRAME | #{self._frame_count:04d} {_label_frame(f)}" + print(label) + print(label, file=self._log, flush=True) + record = { + "frame": self._frame_count, + "sub": f.sub, + "page_key": f.page_key, + "data_len": len(f.data), + "data_hex": f.data.hex(), + "checksum_valid": f.checksum_valid, + } + print(json.dumps(record), file=self._jl, flush=True) + + def write_raw(self, data: bytes) -> None: + with self._lock: + self._raw.write(data) + self._raw.flush() + + def close(self) -> None: + with self._lock: + for fh in (self._log, self._jl, self._raw): + try: + fh.flush() + fh.close() + except Exception: + pass + + +# ── Control-line monitor thread ─────────────────────────────────────────────── + +def _monitor_control_lines( + ser: serial.Serial, + logger: Logger, + stop: threading.Event, + interval: float, +) -> None: + prev = dict(CTS=None, DSR=None, DCD=None, RI=None) + try: + prev.update(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd) + try: + prev["RI"] = ser.ri + except Exception: + pass + except Exception as exc: + logger.ctrl(f"Init error: {exc}") + return + + logger.ctrl( + f"Initial: CTS={prev['CTS']} DSR={prev['DSR']} DCD={prev['DCD']} RI={prev['RI']}" + ) + while not stop.is_set(): + try: + cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None) + try: + cur["RI"] = ser.ri + except Exception: + pass + for name, val in cur.items(): + if val != prev[name]: + logger.ctrl(f"{name} → {val}") + prev[name] = val + except serial.SerialException as exc: + logger.ctrl(f"Poll error: {exc}") + break + stop.wait(interval) + + +# ── Serial open ─────────────────────────────────────────────────────────────── + +_PARITY = { + "N": serial.PARITY_NONE, "E": serial.PARITY_EVEN, "O": serial.PARITY_ODD, + "M": serial.PARITY_MARK, "S": serial.PARITY_SPACE, +} +_STOPBITS = { + 1: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, 2: serial.STOPBITS_TWO, +} + + +def _open_serial(args: argparse.Namespace, logger: Logger) -> serial.Serial | None: + for attempt in range(1, args.open_retries + 2): + logger.info( + f"Opening {args.port} @ {args.baud},{args.bytesize}{args.parity}{args.stopbits} " + f"rtscts={args.rtscts} xonxoff={args.xonxoff} dsrdtr={args.dsrdtr} " + f"(attempt {attempt})" + ) + try: + ser = serial.Serial( + port=args.port, + baudrate=args.baud, + bytesize=args.bytesize, + parity=_PARITY[args.parity], + stopbits=_STOPBITS[args.stopbits], + timeout=args.timeout, + xonxoff=args.xonxoff, + rtscts=args.rtscts, + dsrdtr=args.dsrdtr, + write_timeout=0, + ) + try: + ser.setDTR(args.dtr == "on") + ser.setRTS(args.rts == "on") + logger.ctrl(f"Set DTR={args.dtr} RTS={args.rts}") + except Exception as exc: + logger.ctrl(f"DTR/RTS set failed: {exc}") + + if args.send_break > 0: + try: + ser.break_condition = True + time.sleep(args.send_break / 1000.0) + ser.break_condition = False + logger.ctrl(f"BREAK held {args.send_break} ms") + except Exception as exc: + logger.ctrl(f"BREAK failed: {exc}") + + return ser + + except serial.SerialException as exc: + logger.info(f"Open failed: {exc}") + if attempt <= args.open_retries: + time.sleep(args.open_retry_delay) + + return None + + +# ── Port picker ─────────────────────────────────────────────────────────────── + +def _list_ports() -> list: + ports = list(list_ports.comports()) + if not ports: + print("No serial ports found.") + return [] + print("Available serial ports:") + for i, p in enumerate(ports, 1): + print(f" {i:2d}) {p.device:<12} {p.description or ''}") + return ports + + +def _pick_port() -> str: + ports = _list_ports() + if not ports: + sys.exit(1) + if len(ports) == 1: + print(f"Auto-selecting: {ports[0].device}") + return ports[0].device + while True: + sel = input("Select port (number or name, e.g. COM3): ").strip() + if sel.isdigit() and 1 <= int(sel) <= len(ports): + return ports[int(sel) - 1].device + for p in ports: + if p.device.upper() == sel.upper(): + return p.device + print("Not recognised. Enter list number or exact port name.") + + +# ── Main loop ───────────────────────────────────────────────────────────────── + +def main() -> None: + ap = argparse.ArgumentParser( + description="Monitor Instantel Series-3 serial traffic with S3 frame parsing." + ) + ap.add_argument("--port", "-p", + help="COM port (e.g. COM3). Omit to be prompted.") + ap.add_argument("--baud", "-b", type=int, default=38400) + ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8) + ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N") + ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1) + ap.add_argument("--rtscts", action="store_true") + ap.add_argument("--xonxoff", action="store_true") + ap.add_argument("--dsrdtr", action="store_true") + ap.add_argument("--dtr", choices=["on", "off"], default="on") + ap.add_argument("--rts", choices=["on", "off"], default="on") + ap.add_argument("--send-break", type=int, default=0, + help="Hold BREAK for N ms after open.") + ap.add_argument("--show", choices=["ascii", "hex", "both", "frames"], + default="frames", + help="'frames' (default) shows only parsed S3 frames. " + "'ascii'/'hex'/'both' also show raw bytes.") + ap.add_argument("--encoding", default="latin1") + ap.add_argument("--read-chunk", type=int, default=4096) + ap.add_argument("--timeout", type=float, default=0.05) + ap.add_argument("--poll-lines-interval", type=float, default=0.2) + ap.add_argument("--open-retries", type=int, default=0) + ap.add_argument("--open-retry-delay", type=float, default=0.8) + ap.add_argument("--ack-ok", action="store_true", + help="Auto-reply OK to AT* commands (except ATDT). " + "Useful for testing without a real modem.") + ap.add_argument("--list", action="store_true", + help="List available serial ports and exit.") + args = ap.parse_args() + + if args.list: + _list_ports() + return + + args.port = args.port or _pick_port() + + # Build output paths + ts_str = datetime.now().strftime("%Y%m%d_%H%M%S") + out_dir = Path(__file__).parent / "captures" / f"serial_{ts_str}" + out_dir.mkdir(parents=True, exist_ok=True) + + log_path = out_dir / f"session_{ts_str}.log" + jsonl_path = out_dir / f"session_{ts_str}.jsonl" + raw_path = out_dir / f"raw_s3_{ts_str}.bin" + + logger = Logger(log_path, jsonl_path, raw_path) + logger.info(f"Output directory: {out_dir}") + logger.info(f"raw_s3 → {raw_path.name} (compatible with parse_capture.py)") + + ser = _open_serial(args, logger) + if ser is None: + logger.info("Could not open serial port. Exiting.") + logger.close() + sys.exit(1) + + s3_parser = S3FrameParser() + rx_buf = bytearray() + stop_evt = threading.Event() + + ctrl_thread = threading.Thread( + target=_monitor_control_lines, + args=(ser, logger, stop_evt, args.poll_lines_interval), + daemon=True, + ) + ctrl_thread.start() + logger.info("Monitoring started. Waiting for call-home. Press Ctrl+C to stop.") + + try: + while True: + try: + data = ser.read(args.read_chunk) + except serial.SerialException as exc: + logger.info(f"Read error: {exc}") + break + + if not data: + continue + + # 1. Save raw bytes + logger.write_raw(data) + + # 2. Optional raw display + if args.show in ("ascii", "both"): + txt = _printable(data) + for line in txt.splitlines(): + logger.data_ascii(line) + if args.show in ("hex", "both"): + logger.data_hex(_hexdump(data)) + + # 3. Parse S3 frames + for byte in data: + result = s3_parser.feed(bytes([byte])) + if result: + frames = result if isinstance(result, list) else [result] + for f in frames: + logger.frame(f) + + # 4. AT command handling for --ack-ok + if args.ack_ok: + rx_buf.extend(data) + while b"\r" in rx_buf or b"\n" in rx_buf: + for sep in (b"\r", b"\n"): + idx = rx_buf.find(sep) + if idx != -1: + line_bytes = bytes(rx_buf[:idx]) + del rx_buf[:idx + 1] + break + else: + break + + line_str = line_bytes.decode("latin1", errors="ignore").strip().upper() + if line_str.startswith("AT") and not line_str.startswith("ATDT"): + try: + ser.write(b"\r\nOK\r\n") + ser.flush() + logger.info(f"AT ack: {line_str!r} → OK") + except Exception as exc: + logger.info(f"AT ack write failed: {exc}") + + except KeyboardInterrupt: + logger.info("Ctrl+C — stopping.") + + finally: + stop_evt.set() + try: + ser.close() + except Exception: + pass + ctrl_thread.join(timeout=1.0) + logger.info(f"Capture saved to: {out_dir}") + logger.close() + + +if __name__ == "__main__": + main() diff --git a/seismo_lab.py b/seismo_lab.py index 687e6a3..2c85222 100644 --- a/seismo_lab.py +++ b/seismo_lab.py @@ -1071,6 +1071,398 @@ class AnalyzerPanel(tk.Frame): # ───────────────────────────────────────────────────────────────────────────── +# ───────────────────────────────────────────────────────────────────────────── +# Serial Watch panel — tap the RS-232 line between device and modem +# ───────────────────────────────────────────────────────────────────────────── + +try: + import serial as _serial + from serial.tools import list_ports as _list_ports + _SERIAL_OK = True +except ImportError: + _SERIAL_OK = False + +from minimateplus.framing import S3FrameParser as _S3FrameParser # noqa: E402 + +_SW_KNOWN_SUBS = { + 0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADV_EVENT_RSP", + 0xE1: "EVT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP", + 0xF3: "WAVEFORM_REC_RSP", 0xF5: "WAVEFORM_HDR_RSP", 0xF7: "EVENT_INDEX_RSP", + 0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP", + 0x69: "START_MON_ACK", 0x68: "STOP_MON_ACK", +} + + +class SerialWatchPanel(tk.Frame): + """ + Tap the RS-232 line between the MiniMate Plus and its modem (RV50/RV55). + Runs the serial reader in a background thread; surfaces parsed S3 frames + live in the log view. Writes raw_s3_.bin compatible with Analyzer. + + Typical use for call-home capture: + 1. Connect a USB-to-serial tap to the RS-232 line. + 2. Pick that COM port here, click Start. + 3. Wait for the unit to trigger / call home. + 4. Click Stop, then 'Open in Analyzer' to inspect the frames. + """ + + _COL_FRAME = "#4ec9b0" # teal — parsed S3 frame + _COL_CTRL = "#dcdcaa" # yellow — control-line change + _COL_AT = "#9cdcfe" # blue — AT command / ASCII noise + _COL_ERR = "#f44747" # red — error + + def __init__(self, parent: tk.Widget, on_capture_ready=None, **kw): + """ + on_capture_ready(raw_s3_path: str) — called when capture stops, + so the parent can inject the file into the Analyzer. + """ + super().__init__(parent, bg=BG2, **kw) + self._on_capture_ready = on_capture_ready + self._serial: Optional[object] = None # serial.Serial instance + self._reader_thread: Optional[threading.Thread] = None + self._stop_evt = threading.Event() + self._log_q: queue.Queue[tuple[str, str]] = queue.Queue() # (text, colour) + self._raw_fh = None # open binary file handle + self._raw_path: Optional[str] = None + self._frame_count = 0 + self._build() + self._poll_log_queue() + + # ── build ───────────────────────────────────────────────────────────── + + def _build(self) -> None: + pad = {"padx": 6, "pady": 4} + + cfg = tk.Frame(self, bg=BG2) + cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4) + + # Row 0 — port picker + tk.Label(cfg, text="COM port:", bg=BG2, fg=FG, font=MONO + ).grid(row=0, column=0, sticky="e", **pad) + + self._port_var = tk.StringVar() + self._port_cb = ttk.Combobox(cfg, textvariable=self._port_var, + width=12, font=MONO, state="normal") + self._port_cb.grid(row=0, column=1, sticky="w", **pad) + + tk.Button(cfg, text="↺", bg=BG3, fg=FG, relief="flat", cursor="hand2", + font=MONO, command=self._refresh_ports + ).grid(row=0, column=2, **pad) + + tk.Label(cfg, text=" Baud:", bg=BG2, fg=FG, font=MONO + ).grid(row=0, column=3, sticky="e", **pad) + self._baud_var = tk.StringVar(value="38400") + tk.Entry(cfg, textvariable=self._baud_var, width=8, + bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO + ).grid(row=0, column=4, sticky="w", **pad) + + self._ack_ok_var = tk.BooleanVar(value=False) + tk.Checkbutton(cfg, text="Ack OK to AT commands", + variable=self._ack_ok_var, + bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, + font=MONO).grid(row=0, column=5, sticky="w", **pad) + + # Row 1 — capture dir + tk.Label(cfg, text="Save to:", bg=BG2, fg=FG, font=MONO + ).grid(row=1, column=0, sticky="e", **pad) + self._dir_var = tk.StringVar( + value=str(SCRIPT_DIR / "bridges" / "captures")) + tk.Entry(cfg, textvariable=self._dir_var, width=40, + bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO + ).grid(row=1, column=1, columnspan=4, sticky="we", **pad) + tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", + cursor="hand2", font=MONO, command=self._choose_dir + ).grid(row=1, column=5, **pad) + + # Button row + btn_row = tk.Frame(self, bg=BG2) + btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2) + + self._start_btn = tk.Button( + btn_row, text="Start Watch", bg=GREEN, fg="#000000", + relief="flat", padx=12, cursor="hand2", font=MONO_B, + command=self._start) + self._start_btn.pack(side=tk.LEFT, padx=6) + + self._stop_btn = tk.Button( + btn_row, text="Stop", bg=BG3, fg=FG, + relief="flat", padx=12, cursor="hand2", font=MONO, + command=self._stop, state="disabled") + self._stop_btn.pack(side=tk.LEFT, padx=4) + + self._analyzer_btn = tk.Button( + btn_row, text="Open in Analyzer", bg=BG3, fg=FG, + relief="flat", padx=10, cursor="hand2", font=MONO, + command=self._send_to_analyzer, state="disabled") + self._analyzer_btn.pack(side=tk.LEFT, padx=4) + + tk.Button(btn_row, text="Clear", bg=BG3, fg=FG, + relief="flat", padx=8, cursor="hand2", font=MONO, + command=self._clear_log).pack(side=tk.LEFT, padx=4) + + self._status_var = tk.StringVar(value="Idle") + tk.Label(btn_row, textvariable=self._status_var, + bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10) + + # Log view + self._log = scrolledtext.ScrolledText( + self, height=24, font=MONO_SM, + bg=BG, fg=FG, insertbackground=FG, + relief="flat", state="disabled", + ) + self._log.pack(fill=tk.BOTH, expand=True, padx=4, pady=4) + self._log.tag_config("frame", foreground=self._COL_FRAME) + self._log.tag_config("ctrl", foreground=self._COL_CTRL) + self._log.tag_config("at", foreground=self._COL_AT) + self._log.tag_config("err", foreground=self._COL_ERR) + self._log.tag_config("dim", foreground=FG_DIM) + + # Populate ports on first load + self._refresh_ports() + + # ── port helpers ────────────────────────────────────────────────────── + + def _refresh_ports(self) -> None: + if not _SERIAL_OK: + self._port_cb["values"] = ["(pyserial not installed)"] + return + ports = [p.device for p in _list_ports.comports()] + self._port_cb["values"] = ports + if ports and not self._port_var.get(): + self._port_var.set(ports[0]) + + def _choose_dir(self) -> None: + d = filedialog.askdirectory(initialdir=self._dir_var.get()) + if d: + self._dir_var.set(d) + + # ── start / stop ────────────────────────────────────────────────────── + + def _start(self) -> None: + if not _SERIAL_OK: + messagebox.showerror( + "pyserial missing", + "Install pyserial first:\n pip install pyserial") + return + + port = self._port_var.get().strip() + if not port or "not installed" in port: + messagebox.showerror("Error", "Select a valid COM port first.") + return + + try: + baud = int(self._baud_var.get().strip()) + except ValueError: + messagebox.showerror("Error", "Invalid baud rate.") + return + + # Open output files + ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + out_dir = Path(self._dir_var.get()) / f"serial_{ts}" + out_dir.mkdir(parents=True, exist_ok=True) + self._raw_path = str(out_dir / f"raw_s3_{ts}.bin") + try: + self._raw_fh = open(self._raw_path, "wb") + except OSError as exc: + messagebox.showerror("Error", f"Cannot open capture file:\n{exc}") + return + + # Open serial port + try: + ser = _serial.Serial( + port=port, baudrate=baud, + bytesize=8, parity=_serial.PARITY_NONE, + stopbits=_serial.STOPBITS_ONE, + timeout=0.05, write_timeout=0, + ) + ser.setDTR(True) + ser.setRTS(True) + except Exception as exc: + self._raw_fh.close() + self._raw_fh = None + messagebox.showerror("Error", f"Cannot open {port}:\n{exc}") + return + + self._serial = ser + self._stop_evt.clear() + self._frame_count = 0 + self._analyzer_btn.configure(state="disabled") + + self._reader_thread = threading.Thread( + target=self._reader_loop, + args=(ser, baud), + daemon=True, + ) + self._reader_thread.start() + + self._status_var.set(f"Watching {port} @ {baud}") + self._start_btn.configure(state="disabled") + self._stop_btn.configure(state="normal", bg=RED) + self._append(f"── Serial watch started {port} @ {baud} [{ts}] ──\n", "dim") + self._append(f" Capture: {self._raw_path}\n", "dim") + self._append(" Waiting for data…\n\n", "dim") + + def _stop(self) -> None: + self._stop_evt.set() + if self._serial: + try: + self._serial.close() + except Exception: + pass + self._serial = None + if self._raw_fh: + self._raw_fh.close() + self._raw_fh = None + self._status_var.set("Stopped") + self._start_btn.configure(state="normal") + self._stop_btn.configure(state="disabled", bg=BG3) + if self._raw_path and Path(self._raw_path).exists(): + self._analyzer_btn.configure(state="normal") + self._append("\n── Watch stopped ──\n", "dim") + + # ── reader thread ───────────────────────────────────────────────────── + + def _reader_loop(self, ser, baud: int) -> None: + parser = _S3FrameParser() + rx_buf = bytearray() + ack_ok = self._ack_ok_var.get() + + # Monitor control lines in a sub-thread + ctrl_stop = threading.Event() + ctrl_thread = threading.Thread( + target=self._ctrl_loop, args=(ser, ctrl_stop), daemon=True) + ctrl_thread.start() + + try: + while not self._stop_evt.is_set(): + try: + data = ser.read(4096) + except Exception as exc: + self._log_q.put((f"Read error: {exc}\n", "err")) + break + + if not data: + continue + + # Save raw bytes + if self._raw_fh: + try: + self._raw_fh.write(data) + self._raw_fh.flush() + except Exception: + pass + + # Parse S3 frames + for byte in data: + result = parser.feed(bytes([byte])) + if result: + frames = result if isinstance(result, list) else [result] + for f in frames: + self._frame_count += 1 + name = _SW_KNOWN_SUBS.get(f.sub, f"UNK_0x{f.sub:02X}") + chk = "✓" if f.checksum_valid else "✗ BAD_CHK" + peek = f.data[:32].hex() + ("…" if len(f.data) > 32 else "") + msg = ( + f"[{self._frame_count:04d}] " + f"SUB=0x{f.sub:02X} ({name:<22}) " + f"page=0x{f.page_key:04X} " + f"data={len(f.data):4d}B {chk}\n" + f" {peek}\n" + ) + self._log_q.put((msg, "frame")) + + # AT command handling for --ack-ok mode + if ack_ok: + rx_buf.extend(data) + while b"\r" in rx_buf or b"\n" in rx_buf: + for sep in (b"\r", b"\n"): + idx = rx_buf.find(sep) + if idx != -1: + line_bytes = bytes(rx_buf[:idx]) + del rx_buf[:idx + 1] + break + else: + break + line_str = line_bytes.decode("latin1", errors="ignore").strip() + if line_str.upper().startswith("AT"): + self._log_q.put((f"AT: {line_str!r}\n", "at")) + if not line_str.upper().startswith("ATDT"): + try: + ser.write(b"\r\nOK\r\n") + ser.flush() + self._log_q.put((f" → OK\n", "at")) + except Exception: + pass + + finally: + ctrl_stop.set() + ctrl_thread.join(timeout=0.5) + # Signal the main thread that the reader ended naturally + if not self._stop_evt.is_set(): + self._log_q.put(("<>", "")) + + def _ctrl_loop(self, ser, stop: threading.Event) -> None: + prev = {} + try: + prev = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd) + try: + prev["RI"] = ser.ri + except Exception: + prev["RI"] = None + except Exception: + return + + while not stop.is_set(): + try: + cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None) + try: + cur["RI"] = ser.ri + except Exception: + pass + for name, val in cur.items(): + if val != prev.get(name): + self._log_q.put((f"CTRL {name} → {val}\n", "ctrl")) + prev[name] = val + except Exception: + break + stop.wait(0.2) + + # ── log view ────────────────────────────────────────────────────────── + + def _poll_log_queue(self) -> None: + try: + while True: + text, tag = self._log_q.get_nowait() + if text == "<>": + self._stop() + break + self._append(text, tag) + except queue.Empty: + pass + finally: + self.after(80, self._poll_log_queue) + + def _append(self, text: str, tag: str = "") -> None: + self._log.configure(state="normal") + if tag: + self._log.insert(tk.END, text, tag) + else: + self._log.insert(tk.END, text) + self._log.see(tk.END) + self._log.configure(state="disabled") + + def _clear_log(self) -> None: + self._log.configure(state="normal") + self._log.delete("1.0", tk.END) + self._log.configure(state="disabled") + + # ── send to analyzer ────────────────────────────────────────────────── + + def _send_to_analyzer(self) -> None: + if self._raw_path and self._on_capture_ready: + self._on_capture_ready(self._raw_path) + + # Console panel (tk.Frame — lives inside a notebook tab) # ───────────────────────────────────────────────────────────────────────────── @@ -1504,6 +1896,12 @@ class SeismoLab(tk.Tk): ) nb.add(self._console_panel, text=" Console ") + self._serial_watch_panel = SerialWatchPanel( + nb, + on_capture_ready=self._on_serial_capture_ready, + ) + nb.add(self._serial_watch_panel, text=" Serial Watch ") + self._nb = nb self.protocol("WM_DELETE_WINDOW", self._on_close) @@ -1522,8 +1920,14 @@ class SeismoLab(tk.Tk): self._analyzer_panel.s3_var.set(raw_s3_path) self._nb.select(1) + def _on_serial_capture_ready(self, raw_s3_path: str) -> None: + """Serial Watch capture finished → inject into Analyzer and switch tab.""" + self._analyzer_panel.s3_var.set(raw_s3_path) + self._nb.select(1) + def _on_close(self) -> None: self._bridge_panel.stop_bridge() + self._serial_watch_panel._stop() self.destroy()