#!/usr/bin/env python3 """ serial_watch.py — Instantel Series-3 serial monitor with S3 frame parsing. Taps the RS-232 line between the MiniMate Plus and its modem (RV50/RV55). Saves raw binary captures compatible with the rest of the analysis toolchain, plus a human-readable frame log. Usage ----- python bridges/serial_watch.py # interactive COM picker python bridges/serial_watch.py --port COM3 # specify port python bridges/serial_watch.py --port COM3 --ack-ok # reply OK to AT commands # (useful if modem is absent # and you want the device to # proceed past AT negotiation) python bridges/serial_watch.py --list # list available ports Output ------ bridges/captures/serial_/ raw_s3_.bin — raw bytes from device (feeds directly into S3FrameParser) session_.log — human-readable frame + control-line log session_.jsonl — JSON-lines frame log The raw_s3_*.bin file is byte-for-byte compatible with the existing capture format used by bridges/parse_capture.py and all analysis scripts. What to look for in a call-home capture ---------------------------------------- 1. Does the device talk first after CONNECT, or does it wait? - If raw_s3_*.bin has bytes before any AT/POLL exchange → PUSH protocol - If it stays silent → PULL protocol (same as Blastware manual download) 2. Look for "Operating System" ASCII at the start — the device sends this 16-byte boot string on cold start before entering DLE-framed mode. 3. RING/CONNECT from the modem appear as ASCII before the DLE frames — the parser handles these automatically (scans forward to DLE+STX). """ from __future__ import annotations import argparse import sys import threading import time from datetime import datetime from pathlib import Path try: import serial from serial.tools import list_ports except ModuleNotFoundError: print( "pyserial not found. Install with:\n python -m pip install pyserial", file=sys.stderr, ) sys.exit(1) # Add project root so we can import the frame parser sys.path.insert(0, str(Path(__file__).parent.parent)) from minimateplus.framing import S3FrameParser, S3Frame import json # ── Helpers ─────────────────────────────────────────────────────────────────── def _ts() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] def _hexdump(b: bytes) -> str: return " ".join(f"{x:02X}" for x in b) def _printable(b: bytes) -> str: return b.decode("latin1", errors="replace") _KNOWN_SUBS = { 0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADVANCE_EVENT_RSP", 0xE1: "EVENT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP", 0xF3: "WAVEFORM_RECORD_RSP", 0xF5: "WAVEFORM_HEADER_RSP", 0xF7: "EVENT_INDEX_RSP", 0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP", 0x69: "START_MONITOR_ACK", 0x68: "STOP_MONITOR_ACK", 0x97: "EVT_IDX_WRITE_ACK", 0x8C: "CONFIRM_B_ACK", 0x8E: "COMPLIANCE_WRITE_ACK", 0x8D: "CONFIRM_A_ACK", 0x7D: "TRIGGER_WRITE_ACK", 0x7C: "TRIGGER_CONFIRM_ACK", 0x96: "WAVEFORM_WRITE_ACK", 0x8B: "CONFIRM_C_ACK", } def _label_frame(frame: S3Frame) -> str: name = _KNOWN_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}") chk = "✓" if frame.checksum_valid else "✗ BAD_CHK" peek = frame.data[:24].hex() + ("…" if len(frame.data) > 24 else "") return ( f"S3 SUB=0x{frame.sub:02X} ({name:<22}) " f"page=0x{frame.page_key:04X} data={len(frame.data):4d}B {chk} {peek}" ) # ── Logger ──────────────────────────────────────────────────────────────────── class Logger: def __init__(self, log_path: Path, jsonl_path: Path, raw_path: Path) -> None: self._log = log_path.open("a", encoding="utf-8", newline="") self._jl = jsonl_path.open("a", encoding="utf-8", newline="") self._raw = raw_path.open("ab") self._lock = threading.Lock() self._frame_count = 0 def info(self, msg: str) -> None: line = f"[{_ts()}] INFO | {msg}" with self._lock: print(line) print(line, file=self._log, flush=True) def ctrl(self, msg: str) -> None: line = f"[{_ts()}] CTRL | {msg}" with self._lock: print(line) print(line, file=self._log, flush=True) def data_hex(self, msg: str) -> None: line = f"[{_ts()}] HEX | {msg}" with self._lock: print(line) print(line, file=self._log, flush=True) def data_ascii(self, msg: str) -> None: line = f"[{_ts()}] DATA | {msg}" with self._lock: print(line) print(line, file=self._log, flush=True) def frame(self, f: S3Frame) -> None: with self._lock: self._frame_count += 1 label = f"[{_ts()}] FRAME | #{self._frame_count:04d} {_label_frame(f)}" print(label) print(label, file=self._log, flush=True) record = { "frame": self._frame_count, "sub": f.sub, "page_key": f.page_key, "data_len": len(f.data), "data_hex": f.data.hex(), "checksum_valid": f.checksum_valid, } print(json.dumps(record), file=self._jl, flush=True) def write_raw(self, data: bytes) -> None: with self._lock: self._raw.write(data) self._raw.flush() def close(self) -> None: with self._lock: for fh in (self._log, self._jl, self._raw): try: fh.flush() fh.close() except Exception: pass # ── Control-line monitor thread ─────────────────────────────────────────────── def _monitor_control_lines( ser: serial.Serial, logger: Logger, stop: threading.Event, interval: float, ) -> None: prev = dict(CTS=None, DSR=None, DCD=None, RI=None) try: prev.update(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd) try: prev["RI"] = ser.ri except Exception: pass except Exception as exc: logger.ctrl(f"Init error: {exc}") return logger.ctrl( f"Initial: CTS={prev['CTS']} DSR={prev['DSR']} DCD={prev['DCD']} RI={prev['RI']}" ) while not stop.is_set(): try: cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None) try: cur["RI"] = ser.ri except Exception: pass for name, val in cur.items(): if val != prev[name]: logger.ctrl(f"{name} → {val}") prev[name] = val except serial.SerialException as exc: logger.ctrl(f"Poll error: {exc}") break stop.wait(interval) # ── Serial open ─────────────────────────────────────────────────────────────── _PARITY = { "N": serial.PARITY_NONE, "E": serial.PARITY_EVEN, "O": serial.PARITY_ODD, "M": serial.PARITY_MARK, "S": serial.PARITY_SPACE, } _STOPBITS = { 1: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, 2: serial.STOPBITS_TWO, } def _open_serial(args: argparse.Namespace, logger: Logger) -> serial.Serial | None: for attempt in range(1, args.open_retries + 2): logger.info( f"Opening {args.port} @ {args.baud},{args.bytesize}{args.parity}{args.stopbits} " f"rtscts={args.rtscts} xonxoff={args.xonxoff} dsrdtr={args.dsrdtr} " f"(attempt {attempt})" ) try: ser = serial.Serial( port=args.port, baudrate=args.baud, bytesize=args.bytesize, parity=_PARITY[args.parity], stopbits=_STOPBITS[args.stopbits], timeout=args.timeout, xonxoff=args.xonxoff, rtscts=args.rtscts, dsrdtr=args.dsrdtr, write_timeout=0, ) try: ser.setDTR(args.dtr == "on") ser.setRTS(args.rts == "on") logger.ctrl(f"Set DTR={args.dtr} RTS={args.rts}") except Exception as exc: logger.ctrl(f"DTR/RTS set failed: {exc}") if args.send_break > 0: try: ser.break_condition = True time.sleep(args.send_break / 1000.0) ser.break_condition = False logger.ctrl(f"BREAK held {args.send_break} ms") except Exception as exc: logger.ctrl(f"BREAK failed: {exc}") return ser except serial.SerialException as exc: logger.info(f"Open failed: {exc}") if attempt <= args.open_retries: time.sleep(args.open_retry_delay) return None # ── Port picker ─────────────────────────────────────────────────────────────── def _list_ports() -> list: ports = list(list_ports.comports()) if not ports: print("No serial ports found.") return [] print("Available serial ports:") for i, p in enumerate(ports, 1): print(f" {i:2d}) {p.device:<12} {p.description or ''}") return ports def _pick_port() -> str: ports = _list_ports() if not ports: sys.exit(1) if len(ports) == 1: print(f"Auto-selecting: {ports[0].device}") return ports[0].device while True: sel = input("Select port (number or name, e.g. COM3): ").strip() if sel.isdigit() and 1 <= int(sel) <= len(ports): return ports[int(sel) - 1].device for p in ports: if p.device.upper() == sel.upper(): return p.device print("Not recognised. Enter list number or exact port name.") # ── Main loop ───────────────────────────────────────────────────────────────── def main() -> None: ap = argparse.ArgumentParser( description="Monitor Instantel Series-3 serial traffic with S3 frame parsing." ) ap.add_argument("--port", "-p", help="COM port (e.g. COM3). Omit to be prompted.") ap.add_argument("--baud", "-b", type=int, default=38400) ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8) ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N") ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1) ap.add_argument("--rtscts", action="store_true") ap.add_argument("--xonxoff", action="store_true") ap.add_argument("--dsrdtr", action="store_true") ap.add_argument("--dtr", choices=["on", "off"], default="on") ap.add_argument("--rts", choices=["on", "off"], default="on") ap.add_argument("--send-break", type=int, default=0, help="Hold BREAK for N ms after open.") ap.add_argument("--show", choices=["ascii", "hex", "both", "frames"], default="frames", help="'frames' (default) shows only parsed S3 frames. " "'ascii'/'hex'/'both' also show raw bytes.") ap.add_argument("--encoding", default="latin1") ap.add_argument("--read-chunk", type=int, default=4096) ap.add_argument("--timeout", type=float, default=0.05) ap.add_argument("--poll-lines-interval", type=float, default=0.2) ap.add_argument("--open-retries", type=int, default=0) ap.add_argument("--open-retry-delay", type=float, default=0.8) ap.add_argument("--ack-ok", action="store_true", help="Auto-reply OK to AT* commands (except ATDT). " "Useful for testing without a real modem.") ap.add_argument("--list", action="store_true", help="List available serial ports and exit.") args = ap.parse_args() if args.list: _list_ports() return args.port = args.port or _pick_port() # Build output paths ts_str = datetime.now().strftime("%Y%m%d_%H%M%S") out_dir = Path(__file__).parent / "captures" / f"serial_{ts_str}" out_dir.mkdir(parents=True, exist_ok=True) log_path = out_dir / f"session_{ts_str}.log" jsonl_path = out_dir / f"session_{ts_str}.jsonl" raw_path = out_dir / f"raw_s3_{ts_str}.bin" logger = Logger(log_path, jsonl_path, raw_path) logger.info(f"Output directory: {out_dir}") logger.info(f"raw_s3 → {raw_path.name} (compatible with parse_capture.py)") ser = _open_serial(args, logger) if ser is None: logger.info("Could not open serial port. Exiting.") logger.close() sys.exit(1) s3_parser = S3FrameParser() rx_buf = bytearray() stop_evt = threading.Event() ctrl_thread = threading.Thread( target=_monitor_control_lines, args=(ser, logger, stop_evt, args.poll_lines_interval), daemon=True, ) ctrl_thread.start() logger.info("Monitoring started. Waiting for call-home. Press Ctrl+C to stop.") try: while True: try: data = ser.read(args.read_chunk) except serial.SerialException as exc: logger.info(f"Read error: {exc}") break if not data: continue # 1. Save raw bytes logger.write_raw(data) # 2. Optional raw display if args.show in ("ascii", "both"): txt = _printable(data) for line in txt.splitlines(): logger.data_ascii(line) if args.show in ("hex", "both"): logger.data_hex(_hexdump(data)) # 3. Parse S3 frames for byte in data: result = s3_parser.feed(bytes([byte])) if result: frames = result if isinstance(result, list) else [result] for f in frames: logger.frame(f) # 4. AT command handling for --ack-ok if args.ack_ok: rx_buf.extend(data) while b"\r" in rx_buf or b"\n" in rx_buf: for sep in (b"\r", b"\n"): idx = rx_buf.find(sep) if idx != -1: line_bytes = bytes(rx_buf[:idx]) del rx_buf[:idx + 1] break else: break line_str = line_bytes.decode("latin1", errors="ignore").strip().upper() if line_str.startswith("AT") and not line_str.startswith("ATDT"): try: ser.write(b"\r\nOK\r\n") ser.flush() logger.info(f"AT ack: {line_str!r} → OK") except Exception as exc: logger.info(f"AT ack write failed: {exc}") except KeyboardInterrupt: logger.info("Ctrl+C — stopping.") finally: stop_evt.set() try: ser.close() except Exception: pass ctrl_thread.join(timeout=1.0) logger.info(f"Capture saved to: {out_dir}") logger.close() if __name__ == "__main__": main()