feat: add ACH TCP bridge, serial tap tool, and Serial Watch tab

- bridges/ach_bridge.py: transparent TCP bridge that MITMs the MiniMate Plus
  call-home connection — forwards to real ACH server while logging all frames
  to raw_client/raw_server .bin files compatible with parse_capture.py;
  standalone capture mode for lab use without a real server

- bridges/serial_watch.py: RS-232 serial monitor with live S3 frame parsing;
  taps the line between MiniMate and modem (RV50/RV55); captures raw bytes,
  .log and .jsonl; --ack-ok mode auto-replies to AT commands; fixed fatal
  indentation bug in the original that silently prevented any data capture

- seismo_lab.py: new "Serial Watch" fourth tab (SerialWatchPanel) wrapping
  serial_watch.py functionality; COM port picker with refresh, baud config,
  ack-ok toggle, colour-coded live frame log (teal frames / yellow ctrl /
  blue AT), raw .bin capture auto-fed into Analyzer tab on stop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-09 12:10:52 -04:00
committed by serversdown
parent 2db565ff9c
commit 37d32077a4
3 changed files with 1240 additions and 0 deletions
+401
View File
@@ -0,0 +1,401 @@
#!/usr/bin/env python3
"""
ach_bridge.py — Transparent TCP bridge for capturing Instantel MiniMate Plus
call-home (ACH) traffic.
Usage
-----
# Bridge mode: forward to real ACH server while logging
python bridges/ach_bridge.py --upstream HOST:PORT [--port 9034]
# Standalone capture mode: accept connection, don't forward (use when you
# want to see what the device sends/expects without a real server)
python bridges/ach_bridge.py --standalone [--port 9034]
Setup
-----
1. Find the "Remote Hostname/IP" and port in ACEmanager → Dual SIM / WAN →
Call Home (or equivalent menu on your RV50/RV55 firmware).
2. Temporarily change that setting on ONE unit to point at:
your-machine-local-ip : <--port>
3. Run this script.
4. Wait for the unit to trigger / call home. A capture file is written to
bridges/captures/ach_<timestamp>/ alongside an auto-parsed frame log.
5. Revert the unit's ACEmanager setting.
Output
------
bridges/captures/ach_<ISO-timestamp>/
raw_client_<ts>.bin — raw bytes from the device (S3 side)
raw_server_<ts>.bin — raw bytes from the upstream server (BW side)
(empty in standalone mode)
session_<ts>.log — human-readable frame parse log
session_<ts>.jsonl — JSON-lines frame log (for downstream tooling)
The raw_client / raw_server files are byte-for-byte compatible with the
existing capture format used by bridges/parse_capture.py and the rest of
the analysis tooling.
"""
from __future__ import annotations
import argparse
import asyncio
import datetime
import json
import logging
import os
import sys
from pathlib import Path
from typing import Optional
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.framing import S3FrameParser, S3Frame
log = logging.getLogger("ach_bridge")
# ── Frame label helpers ──────────────────────────────────────────────────────
_KNOWN_RSP_SUBS = {
0xA4: "POLL_RSP",
0xA5: "BULK_WAVEFORM_RSP",
0xE0: "ADVANCE_EVENT_RSP",
0xE1: "EVENT_INDEX_FIRST_RSP",
0xE3: "MONITOR_STATUS_RSP",
0xEA: "SERIAL_NUM_RSP",
0xF3: "WAVEFORM_RECORD_RSP",
0xF5: "WAVEFORM_HEADER_RSP",
0xF7: "EVENT_INDEX_RSP",
0xF9: "UNK_06_RSP",
0xFE: "DEVICE_INFO_RSP",
# Write acks
0x97: "EVT_IDX_WRITE_ACK",
0x8C: "CONFIRM_B_ACK",
0x8E: "COMPLIANCE_WRITE_ACK",
0x8D: "CONFIRM_A_ACK",
0x7D: "TRIGGER_WRITE_ACK",
0x7C: "TRIGGER_CONFIRM_ACK",
0x96: "WAVEFORM_WRITE_ACK",
0x8B: "CONFIRM_C_ACK",
0x69: "START_MONITOR_ACK",
0x68: "STOP_MONITOR_ACK",
}
_KNOWN_REQ_SUBS = {
0x5B: "POLL",
0x5A: "BULK_WAVEFORM",
0x1F: "ADVANCE_EVENT",
0x1E: "EVENT_INDEX_FIRST",
0x1C: "MONITOR_STATUS",
0x15: "SERIAL_NUM",
0x0C: "WAVEFORM_RECORD",
0x0A: "WAVEFORM_HEADER",
0x08: "EVENT_INDEX",
0x06: "UNK_06",
0x01: "DEVICE_INFO",
# Write commands
0x68: "EVT_IDX_WRITE",
0x73: "CONFIRM_B",
0x71: "COMPLIANCE_WRITE",
0x72: "CONFIRM_A",
0x82: "TRIGGER_WRITE",
0x83: "TRIGGER_CONFIRM",
0x69: "WAVEFORM_WRITE",
0x74: "CONFIRM_C",
0x96: "START_MONITOR",
0x97: "STOP_MONITOR",
}
def _label_s3_frame(frame: S3Frame) -> str:
name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
chk = "" if frame.checksum_valid else "✗CHK"
return f"S3→ SUB=0x{frame.sub:02X} ({name}) page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}"
def _label_bw_frame(data: bytes) -> str:
"""Best-effort label for a raw BW request frame."""
# BW frame (destuffed): ACK STX [10 10] flags sub ...
# Wire: 41 02 10 10 00 sub ...
if len(data) < 6:
return f"BW→ (short {len(data)}B)"
sub = data[5]
name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}")
return f" →BW SUB=0x{sub:02X} ({name}) {len(data)}B"
# ── Per-session capture writer ────────────────────────────────────────────────
class CaptureSession:
"""Writes raw bytes + parsed log for one TCP connection."""
def __init__(self, capture_dir: Path, peer: str):
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
self.dir = capture_dir / f"ach_{ts}"
self.dir.mkdir(parents=True, exist_ok=True)
self.peer = peer
self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb")
self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb")
self._log_fh = open(self.dir / f"session_{ts}.log", "w")
self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w")
self._s3_parser = S3FrameParser()
self._frame_count = 0
self._byte_count_client = 0
self._byte_count_server = 0
self._log(f"# ACH capture — peer={peer} started={datetime.datetime.now().isoformat()}")
self._log(f"# Output dir: {self.dir}")
log.info("Capture session opened: %s (peer=%s)", self.dir, peer)
# ── public API ───────────────────────────────────────────────────────────
def feed_client(self, data: bytes) -> None:
"""Bytes arriving FROM the device (S3 side)."""
self._raw_client.write(data)
self._raw_client.flush()
self._byte_count_client += len(data)
# Parse S3 frames
for byte in data:
frame = self._s3_parser.feed(bytes([byte]))
if frame:
frames = frame if isinstance(frame, list) else [frame]
for f in frames:
self._frame_count += 1
label = _label_s3_frame(f)
self._log(f"[{self._frame_count:04d}] {label}")
self._log(f" hex: {f.data[:64].hex()}"
+ (" ..." if len(f.data) > 64 else ""))
self._emit_json("s3", f)
def feed_server(self, data: bytes) -> None:
"""Bytes arriving FROM the upstream server (BW side)."""
self._raw_server.write(data)
self._raw_server.flush()
self._byte_count_server += len(data)
label = _label_bw_frame(data)
self._log(f" {label}")
def close(self, reason: str = "connection closed") -> None:
self._log(f"# Session ended: {reason}")
self._log(f"# Totals — client_bytes={self._byte_count_client} "
f"server_bytes={self._byte_count_server} "
f"s3_frames={self._frame_count}")
for fh in (self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh):
try:
fh.close()
except Exception:
pass
log.info(
"Capture session closed (%s): %dB client, %dB server, %d S3 frames → %s",
reason, self._byte_count_client, self._byte_count_server,
self._frame_count, self.dir,
)
# ── internals ────────────────────────────────────────────────────────────
def _log(self, msg: str) -> None:
print(msg, file=self._log_fh, flush=True)
# Also echo to console for live monitoring
print(msg)
def _emit_json(self, direction: str, frame: S3Frame) -> None:
record = {
"dir": direction,
"sub": frame.sub,
"page_key": frame.page_key,
"data_len": len(frame.data),
"data_hex": frame.data.hex(),
"checksum_valid": frame.checksum_valid,
}
print(json.dumps(record), file=self._jsonl_fh, flush=True)
# ── Bridge connection handler ─────────────────────────────────────────────────
class BridgeHandler:
def __init__(
self,
capture_dir: Path,
upstream_host: Optional[str],
upstream_port: Optional[int],
):
self.capture_dir = capture_dir
self.upstream_host = upstream_host
self.upstream_port = upstream_port
async def handle(
self,
client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter,
) -> None:
peer = client_writer.get_extra_info("peername", ("?", 0))
peer_str = f"{peer[0]}:{peer[1]}"
log.info("Inbound connection from %s", peer_str)
session = CaptureSession(self.capture_dir, peer_str)
if self.upstream_host:
# Bridge mode: connect to upstream and relay
try:
up_reader, up_writer = await asyncio.open_connection(
self.upstream_host, self.upstream_port
)
log.info("Connected to upstream %s:%s", self.upstream_host, self.upstream_port)
except Exception as exc:
log.error("Failed to connect to upstream: %s", exc)
session.close(f"upstream connect failed: {exc}")
client_writer.close()
return
try:
await asyncio.gather(
self._relay(client_reader, up_writer, session, "client"),
self._relay(up_reader, client_writer, session, "server"),
)
except asyncio.CancelledError:
pass
except Exception as exc:
log.warning("Bridge relay error: %s", exc)
finally:
session.close("bridge relay ended")
for writer in (client_writer, up_writer):
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
else:
# Standalone mode: just capture, don't forward
log.info("Standalone mode — recording inbound traffic only")
try:
while True:
data = await client_reader.read(4096)
if not data:
break
session.feed_client(data)
except asyncio.CancelledError:
pass
except Exception as exc:
log.warning("Standalone read error: %s", exc)
finally:
session.close("standalone capture ended")
try:
client_writer.close()
await client_writer.wait_closed()
except Exception:
pass
async def _relay(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
session: CaptureSession,
direction: str,
) -> None:
try:
while True:
data = await reader.read(4096)
if not data:
break
if direction == "client":
session.feed_client(data)
else:
session.feed_server(data)
writer.write(data)
await writer.drain()
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
pass
# ── Main ──────────────────────────────────────────────────────────────────────
async def main(args: argparse.Namespace) -> None:
capture_dir = Path(__file__).parent / "captures"
capture_dir.mkdir(parents=True, exist_ok=True)
upstream_host: Optional[str] = None
upstream_port: Optional[int] = None
if not args.standalone:
if not args.upstream:
print("ERROR: --upstream HOST:PORT is required unless --standalone is set.")
sys.exit(1)
parts = args.upstream.rsplit(":", 1)
if len(parts) != 2:
print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:9034)")
sys.exit(1)
upstream_host = parts[0]
upstream_port = int(parts[1])
handler = BridgeHandler(capture_dir, upstream_host, upstream_port)
server = await asyncio.start_server(
handler.handle,
host="0.0.0.0",
port=args.port,
)
mode = f"bridge → {upstream_host}:{upstream_port}" if upstream_host else "standalone capture"
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
print(f"\n{'='*60}")
print(f" ACH bridge listening on {addrs}")
print(f" Mode: {mode}")
print(f" Captures: {capture_dir}/ach_<timestamp>/")
print(f"{'='*60}")
print(f"\n Point your unit's ACEmanager call-home destination to:")
print(f" <this machine's LAN IP>:{args.port}")
if upstream_host:
print(f"\n All traffic will be forwarded to {upstream_host}:{upstream_port}")
print(f" Your live data feed is uninterrupted.")
print(f"\n Waiting for inbound connection... (Ctrl-C to stop)\n")
async with server:
await server.serve_forever()
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Transparent TCP bridge for capturing MiniMate Plus call-home traffic."
)
p.add_argument(
"--upstream", "-u",
metavar="HOST:PORT",
help="Upstream ACH server to forward to (e.g. 203.0.113.5:9034). "
"Omit with --standalone for capture-only mode.",
)
p.add_argument(
"--port", "-p",
type=int,
default=9034,
help="Local port to listen on (default: 9034).",
)
p.add_argument(
"--standalone", "-s",
action="store_true",
help="Capture-only mode: accept connection but do not forward to upstream.",
)
p.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable debug logging.",
)
return p.parse_args()
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
)
try:
asyncio.run(main(args))
except KeyboardInterrupt:
print("\nStopped.")
+435
View File
@@ -0,0 +1,435 @@
#!/usr/bin/env python3
"""
serial_watch.py — Instantel Series-3 serial monitor with S3 frame parsing.
Taps the RS-232 line between the MiniMate Plus and its modem (RV50/RV55).
Saves raw binary captures compatible with the rest of the analysis toolchain,
plus a human-readable frame log.
Usage
-----
python bridges/serial_watch.py # interactive COM picker
python bridges/serial_watch.py --port COM3 # specify port
python bridges/serial_watch.py --port COM3 --ack-ok # reply OK to AT commands
# (useful if modem is absent
# and you want the device to
# proceed past AT negotiation)
python bridges/serial_watch.py --list # list available ports
Output
------
bridges/captures/serial_<ISO-timestamp>/
raw_s3_<ts>.bin — raw bytes from device (feeds directly into S3FrameParser)
session_<ts>.log — human-readable frame + control-line log
session_<ts>.jsonl — JSON-lines frame log
The raw_s3_*.bin file is byte-for-byte compatible with the existing capture
format used by bridges/parse_capture.py and all analysis scripts.
What to look for in a call-home capture
----------------------------------------
1. Does the device talk first after CONNECT, or does it wait?
- If raw_s3_*.bin has bytes before any AT/POLL exchange → PUSH protocol
- If it stays silent → PULL protocol (same as Blastware manual download)
2. Look for "Operating System" ASCII at the start — the device sends this 16-byte
boot string on cold start before entering DLE-framed mode.
3. RING/CONNECT from the modem appear as ASCII before the DLE frames — the parser
handles these automatically (scans forward to DLE+STX).
"""
from __future__ import annotations
import argparse
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
try:
import serial
from serial.tools import list_ports
except ModuleNotFoundError:
print(
"pyserial not found. Install with:\n python -m pip install pyserial",
file=sys.stderr,
)
sys.exit(1)
# Add project root so we can import the frame parser
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.framing import S3FrameParser, S3Frame
import json
# ── Helpers ───────────────────────────────────────────────────────────────────
def _ts() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
def _hexdump(b: bytes) -> str:
return " ".join(f"{x:02X}" for x in b)
def _printable(b: bytes) -> str:
return b.decode("latin1", errors="replace")
_KNOWN_SUBS = {
0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADVANCE_EVENT_RSP",
0xE1: "EVENT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP",
0xF3: "WAVEFORM_RECORD_RSP", 0xF5: "WAVEFORM_HEADER_RSP", 0xF7: "EVENT_INDEX_RSP",
0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP",
0x69: "START_MONITOR_ACK", 0x68: "STOP_MONITOR_ACK",
0x97: "EVT_IDX_WRITE_ACK", 0x8C: "CONFIRM_B_ACK", 0x8E: "COMPLIANCE_WRITE_ACK",
0x8D: "CONFIRM_A_ACK", 0x7D: "TRIGGER_WRITE_ACK", 0x7C: "TRIGGER_CONFIRM_ACK",
0x96: "WAVEFORM_WRITE_ACK", 0x8B: "CONFIRM_C_ACK",
}
def _label_frame(frame: S3Frame) -> str:
name = _KNOWN_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
chk = "" if frame.checksum_valid else "✗ BAD_CHK"
peek = frame.data[:24].hex() + ("" if len(frame.data) > 24 else "")
return (
f"S3 SUB=0x{frame.sub:02X} ({name:<22}) "
f"page=0x{frame.page_key:04X} data={len(frame.data):4d}B {chk} {peek}"
)
# ── Logger ────────────────────────────────────────────────────────────────────
class Logger:
def __init__(self, log_path: Path, jsonl_path: Path, raw_path: Path) -> None:
self._log = log_path.open("a", encoding="utf-8", newline="")
self._jl = jsonl_path.open("a", encoding="utf-8", newline="")
self._raw = raw_path.open("ab")
self._lock = threading.Lock()
self._frame_count = 0
def info(self, msg: str) -> None:
line = f"[{_ts()}] INFO | {msg}"
with self._lock:
print(line)
print(line, file=self._log, flush=True)
def ctrl(self, msg: str) -> None:
line = f"[{_ts()}] CTRL | {msg}"
with self._lock:
print(line)
print(line, file=self._log, flush=True)
def data_hex(self, msg: str) -> None:
line = f"[{_ts()}] HEX | {msg}"
with self._lock:
print(line)
print(line, file=self._log, flush=True)
def data_ascii(self, msg: str) -> None:
line = f"[{_ts()}] DATA | {msg}"
with self._lock:
print(line)
print(line, file=self._log, flush=True)
def frame(self, f: S3Frame) -> None:
with self._lock:
self._frame_count += 1
label = f"[{_ts()}] FRAME | #{self._frame_count:04d} {_label_frame(f)}"
print(label)
print(label, file=self._log, flush=True)
record = {
"frame": self._frame_count,
"sub": f.sub,
"page_key": f.page_key,
"data_len": len(f.data),
"data_hex": f.data.hex(),
"checksum_valid": f.checksum_valid,
}
print(json.dumps(record), file=self._jl, flush=True)
def write_raw(self, data: bytes) -> None:
with self._lock:
self._raw.write(data)
self._raw.flush()
def close(self) -> None:
with self._lock:
for fh in (self._log, self._jl, self._raw):
try:
fh.flush()
fh.close()
except Exception:
pass
# ── Control-line monitor thread ───────────────────────────────────────────────
def _monitor_control_lines(
ser: serial.Serial,
logger: Logger,
stop: threading.Event,
interval: float,
) -> None:
prev = dict(CTS=None, DSR=None, DCD=None, RI=None)
try:
prev.update(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd)
try:
prev["RI"] = ser.ri
except Exception:
pass
except Exception as exc:
logger.ctrl(f"Init error: {exc}")
return
logger.ctrl(
f"Initial: CTS={prev['CTS']} DSR={prev['DSR']} DCD={prev['DCD']} RI={prev['RI']}"
)
while not stop.is_set():
try:
cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None)
try:
cur["RI"] = ser.ri
except Exception:
pass
for name, val in cur.items():
if val != prev[name]:
logger.ctrl(f"{name}{val}")
prev[name] = val
except serial.SerialException as exc:
logger.ctrl(f"Poll error: {exc}")
break
stop.wait(interval)
# ── Serial open ───────────────────────────────────────────────────────────────
_PARITY = {
"N": serial.PARITY_NONE, "E": serial.PARITY_EVEN, "O": serial.PARITY_ODD,
"M": serial.PARITY_MARK, "S": serial.PARITY_SPACE,
}
_STOPBITS = {
1: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, 2: serial.STOPBITS_TWO,
}
def _open_serial(args: argparse.Namespace, logger: Logger) -> serial.Serial | None:
for attempt in range(1, args.open_retries + 2):
logger.info(
f"Opening {args.port} @ {args.baud},{args.bytesize}{args.parity}{args.stopbits} "
f"rtscts={args.rtscts} xonxoff={args.xonxoff} dsrdtr={args.dsrdtr} "
f"(attempt {attempt})"
)
try:
ser = serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=args.bytesize,
parity=_PARITY[args.parity],
stopbits=_STOPBITS[args.stopbits],
timeout=args.timeout,
xonxoff=args.xonxoff,
rtscts=args.rtscts,
dsrdtr=args.dsrdtr,
write_timeout=0,
)
try:
ser.setDTR(args.dtr == "on")
ser.setRTS(args.rts == "on")
logger.ctrl(f"Set DTR={args.dtr} RTS={args.rts}")
except Exception as exc:
logger.ctrl(f"DTR/RTS set failed: {exc}")
if args.send_break > 0:
try:
ser.break_condition = True
time.sleep(args.send_break / 1000.0)
ser.break_condition = False
logger.ctrl(f"BREAK held {args.send_break} ms")
except Exception as exc:
logger.ctrl(f"BREAK failed: {exc}")
return ser
except serial.SerialException as exc:
logger.info(f"Open failed: {exc}")
if attempt <= args.open_retries:
time.sleep(args.open_retry_delay)
return None
# ── Port picker ───────────────────────────────────────────────────────────────
def _list_ports() -> list:
ports = list(list_ports.comports())
if not ports:
print("No serial ports found.")
return []
print("Available serial ports:")
for i, p in enumerate(ports, 1):
print(f" {i:2d}) {p.device:<12} {p.description or ''}")
return ports
def _pick_port() -> str:
ports = _list_ports()
if not ports:
sys.exit(1)
if len(ports) == 1:
print(f"Auto-selecting: {ports[0].device}")
return ports[0].device
while True:
sel = input("Select port (number or name, e.g. COM3): ").strip()
if sel.isdigit() and 1 <= int(sel) <= len(ports):
return ports[int(sel) - 1].device
for p in ports:
if p.device.upper() == sel.upper():
return p.device
print("Not recognised. Enter list number or exact port name.")
# ── Main loop ─────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(
description="Monitor Instantel Series-3 serial traffic with S3 frame parsing."
)
ap.add_argument("--port", "-p",
help="COM port (e.g. COM3). Omit to be prompted.")
ap.add_argument("--baud", "-b", type=int, default=38400)
ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8)
ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N")
ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1)
ap.add_argument("--rtscts", action="store_true")
ap.add_argument("--xonxoff", action="store_true")
ap.add_argument("--dsrdtr", action="store_true")
ap.add_argument("--dtr", choices=["on", "off"], default="on")
ap.add_argument("--rts", choices=["on", "off"], default="on")
ap.add_argument("--send-break", type=int, default=0,
help="Hold BREAK for N ms after open.")
ap.add_argument("--show", choices=["ascii", "hex", "both", "frames"],
default="frames",
help="'frames' (default) shows only parsed S3 frames. "
"'ascii'/'hex'/'both' also show raw bytes.")
ap.add_argument("--encoding", default="latin1")
ap.add_argument("--read-chunk", type=int, default=4096)
ap.add_argument("--timeout", type=float, default=0.05)
ap.add_argument("--poll-lines-interval", type=float, default=0.2)
ap.add_argument("--open-retries", type=int, default=0)
ap.add_argument("--open-retry-delay", type=float, default=0.8)
ap.add_argument("--ack-ok", action="store_true",
help="Auto-reply OK to AT* commands (except ATDT). "
"Useful for testing without a real modem.")
ap.add_argument("--list", action="store_true",
help="List available serial ports and exit.")
args = ap.parse_args()
if args.list:
_list_ports()
return
args.port = args.port or _pick_port()
# Build output paths
ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
out_dir = Path(__file__).parent / "captures" / f"serial_{ts_str}"
out_dir.mkdir(parents=True, exist_ok=True)
log_path = out_dir / f"session_{ts_str}.log"
jsonl_path = out_dir / f"session_{ts_str}.jsonl"
raw_path = out_dir / f"raw_s3_{ts_str}.bin"
logger = Logger(log_path, jsonl_path, raw_path)
logger.info(f"Output directory: {out_dir}")
logger.info(f"raw_s3 → {raw_path.name} (compatible with parse_capture.py)")
ser = _open_serial(args, logger)
if ser is None:
logger.info("Could not open serial port. Exiting.")
logger.close()
sys.exit(1)
s3_parser = S3FrameParser()
rx_buf = bytearray()
stop_evt = threading.Event()
ctrl_thread = threading.Thread(
target=_monitor_control_lines,
args=(ser, logger, stop_evt, args.poll_lines_interval),
daemon=True,
)
ctrl_thread.start()
logger.info("Monitoring started. Waiting for call-home. Press Ctrl+C to stop.")
try:
while True:
try:
data = ser.read(args.read_chunk)
except serial.SerialException as exc:
logger.info(f"Read error: {exc}")
break
if not data:
continue
# 1. Save raw bytes
logger.write_raw(data)
# 2. Optional raw display
if args.show in ("ascii", "both"):
txt = _printable(data)
for line in txt.splitlines():
logger.data_ascii(line)
if args.show in ("hex", "both"):
logger.data_hex(_hexdump(data))
# 3. Parse S3 frames
for byte in data:
result = s3_parser.feed(bytes([byte]))
if result:
frames = result if isinstance(result, list) else [result]
for f in frames:
logger.frame(f)
# 4. AT command handling for --ack-ok
if args.ack_ok:
rx_buf.extend(data)
while b"\r" in rx_buf or b"\n" in rx_buf:
for sep in (b"\r", b"\n"):
idx = rx_buf.find(sep)
if idx != -1:
line_bytes = bytes(rx_buf[:idx])
del rx_buf[:idx + 1]
break
else:
break
line_str = line_bytes.decode("latin1", errors="ignore").strip().upper()
if line_str.startswith("AT") and not line_str.startswith("ATDT"):
try:
ser.write(b"\r\nOK\r\n")
ser.flush()
logger.info(f"AT ack: {line_str!r} → OK")
except Exception as exc:
logger.info(f"AT ack write failed: {exc}")
except KeyboardInterrupt:
logger.info("Ctrl+C — stopping.")
finally:
stop_evt.set()
try:
ser.close()
except Exception:
pass
ctrl_thread.join(timeout=1.0)
logger.info(f"Capture saved to: {out_dir}")
logger.close()
if __name__ == "__main__":
main()
+404
View File
@@ -1071,6 +1071,398 @@ class AnalyzerPanel(tk.Frame):
# ───────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────────────────────────────────────
# Serial Watch panel — tap the RS-232 line between device and modem
# ─────────────────────────────────────────────────────────────────────────────
try:
import serial as _serial
from serial.tools import list_ports as _list_ports
_SERIAL_OK = True
except ImportError:
_SERIAL_OK = False
from minimateplus.framing import S3FrameParser as _S3FrameParser # noqa: E402
_SW_KNOWN_SUBS = {
0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADV_EVENT_RSP",
0xE1: "EVT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP",
0xF3: "WAVEFORM_REC_RSP", 0xF5: "WAVEFORM_HDR_RSP", 0xF7: "EVENT_INDEX_RSP",
0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP",
0x69: "START_MON_ACK", 0x68: "STOP_MON_ACK",
}
class SerialWatchPanel(tk.Frame):
"""
Tap the RS-232 line between the MiniMate Plus and its modem (RV50/RV55).
Runs the serial reader in a background thread; surfaces parsed S3 frames
live in the log view. Writes raw_s3_<ts>.bin compatible with Analyzer.
Typical use for call-home capture:
1. Connect a USB-to-serial tap to the RS-232 line.
2. Pick that COM port here, click Start.
3. Wait for the unit to trigger / call home.
4. Click Stop, then 'Open in Analyzer' to inspect the frames.
"""
_COL_FRAME = "#4ec9b0" # teal — parsed S3 frame
_COL_CTRL = "#dcdcaa" # yellow — control-line change
_COL_AT = "#9cdcfe" # blue — AT command / ASCII noise
_COL_ERR = "#f44747" # red — error
def __init__(self, parent: tk.Widget, on_capture_ready=None, **kw):
"""
on_capture_ready(raw_s3_path: str) — called when capture stops,
so the parent can inject the file into the Analyzer.
"""
super().__init__(parent, bg=BG2, **kw)
self._on_capture_ready = on_capture_ready
self._serial: Optional[object] = None # serial.Serial instance
self._reader_thread: Optional[threading.Thread] = None
self._stop_evt = threading.Event()
self._log_q: queue.Queue[tuple[str, str]] = queue.Queue() # (text, colour)
self._raw_fh = None # open binary file handle
self._raw_path: Optional[str] = None
self._frame_count = 0
self._build()
self._poll_log_queue()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
pad = {"padx": 6, "pady": 4}
cfg = tk.Frame(self, bg=BG2)
cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4)
# Row 0 — port picker
tk.Label(cfg, text="COM port:", bg=BG2, fg=FG, font=MONO
).grid(row=0, column=0, sticky="e", **pad)
self._port_var = tk.StringVar()
self._port_cb = ttk.Combobox(cfg, textvariable=self._port_var,
width=12, font=MONO, state="normal")
self._port_cb.grid(row=0, column=1, sticky="w", **pad)
tk.Button(cfg, text="", bg=BG3, fg=FG, relief="flat", cursor="hand2",
font=MONO, command=self._refresh_ports
).grid(row=0, column=2, **pad)
tk.Label(cfg, text=" Baud:", bg=BG2, fg=FG, font=MONO
).grid(row=0, column=3, sticky="e", **pad)
self._baud_var = tk.StringVar(value="38400")
tk.Entry(cfg, textvariable=self._baud_var, width=8,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO
).grid(row=0, column=4, sticky="w", **pad)
self._ack_ok_var = tk.BooleanVar(value=False)
tk.Checkbutton(cfg, text="Ack OK to AT commands",
variable=self._ack_ok_var,
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
font=MONO).grid(row=0, column=5, sticky="w", **pad)
# Row 1 — capture dir
tk.Label(cfg, text="Save to:", bg=BG2, fg=FG, font=MONO
).grid(row=1, column=0, sticky="e", **pad)
self._dir_var = tk.StringVar(
value=str(SCRIPT_DIR / "bridges" / "captures"))
tk.Entry(cfg, textvariable=self._dir_var, width=40,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO
).grid(row=1, column=1, columnspan=4, sticky="we", **pad)
tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat",
cursor="hand2", font=MONO, command=self._choose_dir
).grid(row=1, column=5, **pad)
# Button row
btn_row = tk.Frame(self, bg=BG2)
btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2)
self._start_btn = tk.Button(
btn_row, text="Start Watch", bg=GREEN, fg="#000000",
relief="flat", padx=12, cursor="hand2", font=MONO_B,
command=self._start)
self._start_btn.pack(side=tk.LEFT, padx=6)
self._stop_btn = tk.Button(
btn_row, text="Stop", bg=BG3, fg=FG,
relief="flat", padx=12, cursor="hand2", font=MONO,
command=self._stop, state="disabled")
self._stop_btn.pack(side=tk.LEFT, padx=4)
self._analyzer_btn = tk.Button(
btn_row, text="Open in Analyzer", bg=BG3, fg=FG,
relief="flat", padx=10, cursor="hand2", font=MONO,
command=self._send_to_analyzer, state="disabled")
self._analyzer_btn.pack(side=tk.LEFT, padx=4)
tk.Button(btn_row, text="Clear", bg=BG3, fg=FG,
relief="flat", padx=8, cursor="hand2", font=MONO,
command=self._clear_log).pack(side=tk.LEFT, padx=4)
self._status_var = tk.StringVar(value="Idle")
tk.Label(btn_row, textvariable=self._status_var,
bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10)
# Log view
self._log = scrolledtext.ScrolledText(
self, height=24, font=MONO_SM,
bg=BG, fg=FG, insertbackground=FG,
relief="flat", state="disabled",
)
self._log.pack(fill=tk.BOTH, expand=True, padx=4, pady=4)
self._log.tag_config("frame", foreground=self._COL_FRAME)
self._log.tag_config("ctrl", foreground=self._COL_CTRL)
self._log.tag_config("at", foreground=self._COL_AT)
self._log.tag_config("err", foreground=self._COL_ERR)
self._log.tag_config("dim", foreground=FG_DIM)
# Populate ports on first load
self._refresh_ports()
# ── port helpers ──────────────────────────────────────────────────────
def _refresh_ports(self) -> None:
if not _SERIAL_OK:
self._port_cb["values"] = ["(pyserial not installed)"]
return
ports = [p.device for p in _list_ports.comports()]
self._port_cb["values"] = ports
if ports and not self._port_var.get():
self._port_var.set(ports[0])
def _choose_dir(self) -> None:
d = filedialog.askdirectory(initialdir=self._dir_var.get())
if d:
self._dir_var.set(d)
# ── start / stop ──────────────────────────────────────────────────────
def _start(self) -> None:
if not _SERIAL_OK:
messagebox.showerror(
"pyserial missing",
"Install pyserial first:\n pip install pyserial")
return
port = self._port_var.get().strip()
if not port or "not installed" in port:
messagebox.showerror("Error", "Select a valid COM port first.")
return
try:
baud = int(self._baud_var.get().strip())
except ValueError:
messagebox.showerror("Error", "Invalid baud rate.")
return
# Open output files
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
out_dir = Path(self._dir_var.get()) / f"serial_{ts}"
out_dir.mkdir(parents=True, exist_ok=True)
self._raw_path = str(out_dir / f"raw_s3_{ts}.bin")
try:
self._raw_fh = open(self._raw_path, "wb")
except OSError as exc:
messagebox.showerror("Error", f"Cannot open capture file:\n{exc}")
return
# Open serial port
try:
ser = _serial.Serial(
port=port, baudrate=baud,
bytesize=8, parity=_serial.PARITY_NONE,
stopbits=_serial.STOPBITS_ONE,
timeout=0.05, write_timeout=0,
)
ser.setDTR(True)
ser.setRTS(True)
except Exception as exc:
self._raw_fh.close()
self._raw_fh = None
messagebox.showerror("Error", f"Cannot open {port}:\n{exc}")
return
self._serial = ser
self._stop_evt.clear()
self._frame_count = 0
self._analyzer_btn.configure(state="disabled")
self._reader_thread = threading.Thread(
target=self._reader_loop,
args=(ser, baud),
daemon=True,
)
self._reader_thread.start()
self._status_var.set(f"Watching {port} @ {baud}")
self._start_btn.configure(state="disabled")
self._stop_btn.configure(state="normal", bg=RED)
self._append(f"── Serial watch started {port} @ {baud} [{ts}] ──\n", "dim")
self._append(f" Capture: {self._raw_path}\n", "dim")
self._append(" Waiting for data…\n\n", "dim")
def _stop(self) -> None:
self._stop_evt.set()
if self._serial:
try:
self._serial.close()
except Exception:
pass
self._serial = None
if self._raw_fh:
self._raw_fh.close()
self._raw_fh = None
self._status_var.set("Stopped")
self._start_btn.configure(state="normal")
self._stop_btn.configure(state="disabled", bg=BG3)
if self._raw_path and Path(self._raw_path).exists():
self._analyzer_btn.configure(state="normal")
self._append("\n── Watch stopped ──\n", "dim")
# ── reader thread ─────────────────────────────────────────────────────
def _reader_loop(self, ser, baud: int) -> None:
parser = _S3FrameParser()
rx_buf = bytearray()
ack_ok = self._ack_ok_var.get()
# Monitor control lines in a sub-thread
ctrl_stop = threading.Event()
ctrl_thread = threading.Thread(
target=self._ctrl_loop, args=(ser, ctrl_stop), daemon=True)
ctrl_thread.start()
try:
while not self._stop_evt.is_set():
try:
data = ser.read(4096)
except Exception as exc:
self._log_q.put((f"Read error: {exc}\n", "err"))
break
if not data:
continue
# Save raw bytes
if self._raw_fh:
try:
self._raw_fh.write(data)
self._raw_fh.flush()
except Exception:
pass
# Parse S3 frames
for byte in data:
result = parser.feed(bytes([byte]))
if result:
frames = result if isinstance(result, list) else [result]
for f in frames:
self._frame_count += 1
name = _SW_KNOWN_SUBS.get(f.sub, f"UNK_0x{f.sub:02X}")
chk = "" if f.checksum_valid else "✗ BAD_CHK"
peek = f.data[:32].hex() + ("" if len(f.data) > 32 else "")
msg = (
f"[{self._frame_count:04d}] "
f"SUB=0x{f.sub:02X} ({name:<22}) "
f"page=0x{f.page_key:04X} "
f"data={len(f.data):4d}B {chk}\n"
f" {peek}\n"
)
self._log_q.put((msg, "frame"))
# AT command handling for --ack-ok mode
if ack_ok:
rx_buf.extend(data)
while b"\r" in rx_buf or b"\n" in rx_buf:
for sep in (b"\r", b"\n"):
idx = rx_buf.find(sep)
if idx != -1:
line_bytes = bytes(rx_buf[:idx])
del rx_buf[:idx + 1]
break
else:
break
line_str = line_bytes.decode("latin1", errors="ignore").strip()
if line_str.upper().startswith("AT"):
self._log_q.put((f"AT: {line_str!r}\n", "at"))
if not line_str.upper().startswith("ATDT"):
try:
ser.write(b"\r\nOK\r\n")
ser.flush()
self._log_q.put((f" → OK\n", "at"))
except Exception:
pass
finally:
ctrl_stop.set()
ctrl_thread.join(timeout=0.5)
# Signal the main thread that the reader ended naturally
if not self._stop_evt.is_set():
self._log_q.put(("<<done>>", ""))
def _ctrl_loop(self, ser, stop: threading.Event) -> None:
prev = {}
try:
prev = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd)
try:
prev["RI"] = ser.ri
except Exception:
prev["RI"] = None
except Exception:
return
while not stop.is_set():
try:
cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None)
try:
cur["RI"] = ser.ri
except Exception:
pass
for name, val in cur.items():
if val != prev.get(name):
self._log_q.put((f"CTRL {name}{val}\n", "ctrl"))
prev[name] = val
except Exception:
break
stop.wait(0.2)
# ── log view ──────────────────────────────────────────────────────────
def _poll_log_queue(self) -> None:
try:
while True:
text, tag = self._log_q.get_nowait()
if text == "<<done>>":
self._stop()
break
self._append(text, tag)
except queue.Empty:
pass
finally:
self.after(80, self._poll_log_queue)
def _append(self, text: str, tag: str = "") -> None:
self._log.configure(state="normal")
if tag:
self._log.insert(tk.END, text, tag)
else:
self._log.insert(tk.END, text)
self._log.see(tk.END)
self._log.configure(state="disabled")
def _clear_log(self) -> None:
self._log.configure(state="normal")
self._log.delete("1.0", tk.END)
self._log.configure(state="disabled")
# ── send to analyzer ──────────────────────────────────────────────────
def _send_to_analyzer(self) -> None:
if self._raw_path and self._on_capture_ready:
self._on_capture_ready(self._raw_path)
# Console panel (tk.Frame — lives inside a notebook tab) # Console panel (tk.Frame — lives inside a notebook tab)
# ───────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────────────────
@@ -1504,6 +1896,12 @@ class SeismoLab(tk.Tk):
) )
nb.add(self._console_panel, text=" Console ") nb.add(self._console_panel, text=" Console ")
self._serial_watch_panel = SerialWatchPanel(
nb,
on_capture_ready=self._on_serial_capture_ready,
)
nb.add(self._serial_watch_panel, text=" Serial Watch ")
self._nb = nb self._nb = nb
self.protocol("WM_DELETE_WINDOW", self._on_close) self.protocol("WM_DELETE_WINDOW", self._on_close)
@@ -1522,8 +1920,14 @@ class SeismoLab(tk.Tk):
self._analyzer_panel.s3_var.set(raw_s3_path) self._analyzer_panel.s3_var.set(raw_s3_path)
self._nb.select(1) self._nb.select(1)
def _on_serial_capture_ready(self, raw_s3_path: str) -> None:
"""Serial Watch capture finished → inject into Analyzer and switch tab."""
self._analyzer_panel.s3_var.set(raw_s3_path)
self._nb.select(1)
def _on_close(self) -> None: def _on_close(self) -> None:
self._bridge_panel.stop_bridge() self._bridge_panel.stop_bridge()
self._serial_watch_panel._stop()
self.destroy() self.destroy()