Files
seismo-relay/minimateplus/transport.py
T
Claude 625b0a4dfc feat(seismo_lab): add Download tab that captures wire bytes during event download
Adds a new CapturingTransport wrapper in minimateplus.transport that mirrors
every TX/RX byte to two raw .bin files using the same on-wire format as
bridges/ach_mitm.py, so the resulting captures are byte-for-byte compatible
with the existing Blastware MITM captures and load directly in the Analyzer.

A new "Download" tab in seismo_lab.py lets the user connect to a device over
TCP or serial and run connect / list-keys / download-events while the wrapper
saves raw_bw_<ts>.bin (our TX) and raw_s3_<ts>.bin (device TX) into a
seismo_dl_<ts>[_<label>]/ session directory. On completion, the panel hands
both files to the Analyzer and switches tabs, mirroring the UX of the
existing Bridge capture flow.
2026-05-01 00:12:02 +00:00

556 lines
20 KiB
Python

"""
transport.py — Serial and TCP transport layer for the MiniMate Plus protocol.
Provides a thin I/O abstraction so that protocol.py never imports pyserial or
socket directly. Two concrete implementations:
SerialTransport — direct RS-232 cable connection (pyserial)
TcpTransport — TCP socket to a modem or ACH relay (stdlib socket)
The MiniMate Plus protocol bytes are identical over both transports. TCP is used
when field units call home via the ACH (Auto Call Home) server, or when SFM
"calls up" a unit by connecting to the modem's IP address directly.
Field hardware: Sierra Wireless RV55 / RX55 (4G LTE) cellular modem, replacing
the older 3G-only Raven X (now decommissioned). All run ALEOS firmware with an
ACEmanager web UI. Serial port must be configured 38400,8N1, no flow control,
Data Forwarding Timeout = 1 s.
Typical usage:
from minimateplus.transport import SerialTransport, TcpTransport
# Direct serial connection
with SerialTransport("COM5") as t:
t.write(frame_bytes)
# Modem / ACH TCP connection (Blastware port 12345)
with TcpTransport("192.168.1.50", 12345) as t:
t.write(frame_bytes)
"""
from __future__ import annotations
import socket
import time
from abc import ABC, abstractmethod
from typing import Optional
# pyserial is the only non-stdlib dependency in this project.
# Import lazily so unit-tests that mock the transport can run without it.
try:
import serial # type: ignore
except ImportError: # pragma: no cover
serial = None # type: ignore
# ── Abstract base ─────────────────────────────────────────────────────────────
class BaseTransport(ABC):
"""Common interface for all transport implementations."""
@abstractmethod
def connect(self) -> None:
"""Open the underlying connection."""
@abstractmethod
def disconnect(self) -> None:
"""Close the underlying connection."""
@property
@abstractmethod
def is_connected(self) -> bool:
"""True while the connection is open."""
@abstractmethod
def write(self, data: bytes) -> None:
"""Write *data* bytes to the wire."""
@abstractmethod
def read(self, n: int) -> bytes:
"""
Read up to *n* bytes. Returns immediately with whatever is available
(may return fewer than *n* bytes, or b"" if nothing is ready).
"""
# ── Context manager ───────────────────────────────────────────────────────
def __enter__(self) -> "BaseTransport":
self.connect()
return self
def __exit__(self, *_) -> None:
self.disconnect()
# ── Higher-level read helpers ─────────────────────────────────────────────
def read_until_idle(
self,
timeout: float = 2.0,
idle_gap: float = 0.05,
chunk: int = 256,
) -> bytes:
"""
Read bytes until the line goes quiet.
Keeps reading in *chunk*-sized bursts. Returns when either:
- *timeout* seconds have elapsed since the first byte arrived, or
- *idle_gap* seconds pass with no new bytes (line went quiet).
This mirrors how Blastware behaves: it waits for the seismograph to
stop transmitting rather than counting bytes.
Args:
timeout: Hard deadline (seconds) from the moment read starts.
idle_gap: How long to wait after the last byte before declaring done.
chunk: How many bytes to request per low-level read() call.
Returns:
All bytes received as a single bytes object (may be b"" if nothing
arrived within *timeout*).
"""
buf = bytearray()
deadline = time.monotonic() + timeout
last_rx = None
while time.monotonic() < deadline:
got = self.read(chunk)
if got:
buf.extend(got)
last_rx = time.monotonic()
else:
# Nothing ready — check idle gap
if last_rx is not None and (time.monotonic() - last_rx) >= idle_gap:
break
time.sleep(0.005)
return bytes(buf)
def read_exact(self, n: int, timeout: float = 2.0) -> bytes:
"""
Read exactly *n* bytes or raise TimeoutError.
Useful when the caller already knows the expected response length
(e.g. fixed-size ACK packets).
"""
buf = bytearray()
deadline = time.monotonic() + timeout
while len(buf) < n:
if time.monotonic() >= deadline:
raise TimeoutError(
f"read_exact: wanted {n} bytes, got {len(buf)} "
f"after {timeout:.1f}s"
)
got = self.read(n - len(buf))
if got:
buf.extend(got)
else:
time.sleep(0.005)
return bytes(buf)
# ── Serial transport ──────────────────────────────────────────────────────────
# Default baud rate confirmed from Blastware / MiniMate Plus documentation.
DEFAULT_BAUD = 38_400
# pyserial serial port config matching the MiniMate Plus RS-232 spec:
# 8 data bits, no parity, 1 stop bit (8N1).
_SERIAL_BYTESIZE = 8 # serial.EIGHTBITS
_SERIAL_PARITY = "N" # serial.PARITY_NONE
_SERIAL_STOPBITS = 1 # serial.STOPBITS_ONE
class SerialTransport(BaseTransport):
"""
pyserial-backed transport for a direct RS-232 cable connection.
The port is opened with a very short read timeout (10 ms) so that
read() returns quickly and the caller can implement its own framing /
timeout logic without blocking the whole process.
Args:
port: COM port name (e.g. "COM5" on Windows, "/dev/ttyUSB0" on Linux).
baud: Baud rate (default 38400).
rts_cts: Enable RTS/CTS hardware flow control (default False — MiniMate
typically uses no flow control).
"""
# Internal read timeout (seconds). Short so read() is non-blocking in practice.
_READ_TIMEOUT = 0.01
def __init__(
self,
port: str,
baud: int = DEFAULT_BAUD,
rts_cts: bool = False,
) -> None:
if serial is None:
raise ImportError(
"pyserial is required for SerialTransport. "
"Install it with: pip install pyserial"
)
self.port = port
self.baud = baud
self.rts_cts = rts_cts
self._ser: Optional[serial.Serial] = None
# ── BaseTransport interface ───────────────────────────────────────────────
def connect(self) -> None:
"""Open the serial port. Raises serial.SerialException on failure."""
if self._ser and self._ser.is_open:
return # Already open — idempotent
self._ser = serial.Serial(
port = self.port,
baudrate = self.baud,
bytesize = _SERIAL_BYTESIZE,
parity = _SERIAL_PARITY,
stopbits = _SERIAL_STOPBITS,
timeout = self._READ_TIMEOUT,
rtscts = self.rts_cts,
xonxoff = False,
dsrdtr = False,
)
# Flush any stale bytes left in device / OS buffers from a previous session
self._ser.reset_input_buffer()
self._ser.reset_output_buffer()
def disconnect(self) -> None:
"""Close the serial port. Safe to call even if already closed."""
if self._ser:
try:
self._ser.close()
except Exception:
pass
self._ser = None
@property
def is_connected(self) -> bool:
return bool(self._ser and self._ser.is_open)
def write(self, data: bytes) -> None:
"""
Write *data* to the serial port.
Raises:
RuntimeError: if not connected.
serial.SerialException: on I/O error.
"""
if not self.is_connected:
raise RuntimeError("SerialTransport.write: not connected")
self._ser.write(data) # type: ignore[union-attr]
self._ser.flush() # type: ignore[union-attr]
def read(self, n: int) -> bytes:
"""
Read up to *n* bytes from the serial port.
Returns b"" immediately if no data is available (non-blocking in
practice thanks to the 10 ms read timeout).
Raises:
RuntimeError: if not connected.
"""
if not self.is_connected:
raise RuntimeError("SerialTransport.read: not connected")
return self._ser.read(n) # type: ignore[union-attr]
# ── Extras ────────────────────────────────────────────────────────────────
def flush_input(self) -> None:
"""Discard any unread bytes in the OS receive buffer."""
if self.is_connected:
self._ser.reset_input_buffer() # type: ignore[union-attr]
def __repr__(self) -> str:
state = "open" if self.is_connected else "closed"
return f"SerialTransport({self.port!r}, baud={self.baud}, {state})"
# ── TCP transport ─────────────────────────────────────────────────────────────
# Default TCP port for Blastware modem communications / ACH relay.
# Confirmed from field setup: Blastware → Communication Setup → TCP/IP uses 12345.
DEFAULT_TCP_PORT = 12345
class TcpTransport(BaseTransport):
"""
TCP socket transport for MiniMate Plus units in the field.
The protocol bytes over TCP are identical to RS-232 — TCP is simply a
different physical layer. The modem (Sierra Wireless RV55 / RX55, or older
Raven X) bridges the unit's RS-232 serial port to a TCP socket transparently.
No application-layer handshake or framing is added.
Two usage scenarios:
"Call up" (outbound): SFM connects to the unit's modem IP directly.
TcpTransport(host="203.0.113.5", port=12345)
"Call home" / ACH relay: The unit has already dialled in to the office
ACH server, which bridged the modem to a TCP socket. In this case
the host/port identifies the relay's listening socket, not the modem.
(ACH inbound mode is handled by a separate AchServer — not this class.)
IMPORTANT — modem data forwarding delay:
Sierra Wireless (and Raven) modems buffer RS-232 bytes for up to 1 second
before forwarding them as a TCP segment ("Data Forwarding Timeout" in
ACEmanager). read_until_idle() is overridden to use idle_gap=1.5 s rather
than the serial default of 0.05 s — without this, the parser would declare
a frame complete mid-stream during the modem's buffering pause.
Args:
host: IP address or hostname of the modem / ACH relay.
port: TCP port number (default 12345).
connect_timeout: Seconds to wait for the TCP handshake (default 10.0).
"""
# Internal recv timeout — short so read() returns promptly if no data.
_RECV_TIMEOUT = 0.01
def __init__(
self,
host: str,
port: int = DEFAULT_TCP_PORT,
connect_timeout: float = 10.0,
) -> None:
self.host = host
self.port = port
self.connect_timeout = connect_timeout
self._sock: Optional[socket.socket] = None
# ── BaseTransport interface ───────────────────────────────────────────────
def connect(self) -> None:
"""
Open a TCP connection to host:port.
Idempotent — does nothing if already connected.
Raises:
OSError / socket.timeout: if the connection cannot be established.
"""
if self._sock is not None:
return # Already connected — idempotent
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.connect_timeout)
sock.connect((self.host, self.port))
# Switch to short timeout so read() is non-blocking in practice
sock.settimeout(self._RECV_TIMEOUT)
self._sock = sock
def disconnect(self) -> None:
"""Close the TCP socket. Safe to call even if already closed."""
if self._sock:
try:
self._sock.shutdown(socket.SHUT_RDWR)
except OSError:
pass
try:
self._sock.close()
except OSError:
pass
self._sock = None
@property
def is_connected(self) -> bool:
return self._sock is not None
def write(self, data: bytes) -> None:
"""
Send all bytes to the peer.
Raises:
RuntimeError: if not connected.
OSError: on network I/O error.
"""
if not self.is_connected:
raise RuntimeError("TcpTransport.write: not connected")
self._sock.sendall(data) # type: ignore[union-attr]
def read(self, n: int) -> bytes:
"""
Read up to *n* bytes from the socket.
Returns b"" immediately if no data is available (non-blocking in
practice thanks to the short socket timeout).
Raises:
RuntimeError: if not connected.
"""
if not self.is_connected:
raise RuntimeError("TcpTransport.read: not connected")
try:
return self._sock.recv(n) # type: ignore[union-attr]
except socket.timeout:
return b""
def read_until_idle(
self,
timeout: float = 2.0,
idle_gap: float = 1.5,
chunk: int = 256,
) -> bytes:
"""
TCP-aware version of read_until_idle.
Overrides the BaseTransport default to use a much longer idle_gap (1.5 s
vs 0.05 s for serial). This is necessary because the Raven modem (and
similar cellular modems) buffer serial-port bytes for up to 1 second
before forwarding them over TCP ("Data Forwarding Timeout" setting).
If read_until_idle returned after a 50 ms quiet period, it would trigger
mid-frame when the modem is still accumulating bytes — causing frame
parse failures on every call.
Args:
timeout: Hard deadline from first byte (default 2.0 s — callers
typically pass a longer value for S3 frames).
idle_gap: Quiet-line threshold (default 1.5 s to survive modem
buffering). Pass a smaller value only if you are
connecting directly to a unit's Ethernet port with no
modem buffering in the path.
chunk: Bytes per low-level recv() call.
"""
return super().read_until_idle(timeout=timeout, idle_gap=idle_gap, chunk=chunk)
def __repr__(self) -> str:
state = "connected" if self.is_connected else "disconnected"
return f"TcpTransport({self.host!r}, port={self.port}, {state})"
# ── Inbound / accepted-socket transport ───────────────────────────────────────
class SocketTransport(TcpTransport):
"""
Like TcpTransport but wraps an already-accepted inbound socket.
Used by the ACH inbound server (bridges/ach_server.py) — the device dials
IN to us, so by the time we create this transport the socket is already live.
connect() is a no-op; everything else (read, write, read_until_idle, …) is
inherited unchanged from TcpTransport.
Args:
sock: An already-connected socket.socket returned by server_socket.accept().
peer: Human-readable peer label for repr / logging (e.g. "203.0.113.5:54321").
"""
def __init__(self, sock: socket.socket, peer: str = "inbound") -> None:
# Bypass TcpTransport.__init__ — we already have a live socket.
self.host = peer
self.port = 0
self.connect_timeout = 0.0
self._sock = sock
sock.settimeout(self._RECV_TIMEOUT)
def connect(self) -> None:
"""No-op — socket was already accepted inbound."""
pass # Already have a live socket; nothing to open.
@property
def is_connected(self) -> bool:
return self._sock is not None
def __repr__(self) -> str:
return f"SocketTransport(peer={self.host!r})"
# ── Capturing transport (MITM-style raw byte mirror) ──────────────────────────
class CapturingTransport(BaseTransport):
"""
Wraps another BaseTransport and mirrors every byte to two raw capture files:
raw_bw_<...>.bin — bytes WE wrote to the device (BW-side TX)
raw_s3_<...>.bin — bytes the device wrote back (S3-side TX)
The file naming and on-wire byte layout are identical to the captures
produced by `bridges/ach_mitm.py`, so the resulting `.bin` files can be
loaded directly by the Analyzer (File > Open Capture) and parsed by the
same tooling used for genuine Blastware MITM captures.
All BaseTransport methods are forwarded to the inner transport; the only
side-effect is that successful read/write byte streams are appended to the
two open binary files.
Args:
inner: An already-built BaseTransport (SerialTransport / TcpTransport).
bw_path: File path for the "BW TX" stream (bytes we send). Opened "wb".
s3_path: File path for the "S3 TX" stream (bytes the device sends).
Opened "wb".
Example:
with CapturingTransport(TcpTransport("1.2.3.4", 9034),
"raw_bw.bin", "raw_s3.bin") as t:
client = MiniMateClient(transport=t)
client.connect()
client.get_events()
# both .bin files now hold the full bidirectional capture.
"""
def __init__(self, inner: BaseTransport, bw_path: str, s3_path: str) -> None:
self._inner = inner
self._bw_path = bw_path
self._s3_path = s3_path
self._bw_fh = None
self._s3_fh = None
# Forward inner attrs so callers can introspect (e.g. .host, .port).
self.host = getattr(inner, "host", None)
self.port = getattr(inner, "port", None)
# ── BaseTransport interface ───────────────────────────────────────────────
def connect(self) -> None:
if self._bw_fh is None:
self._bw_fh = open(self._bw_path, "wb", buffering=0)
if self._s3_fh is None:
self._s3_fh = open(self._s3_path, "wb", buffering=0)
self._inner.connect()
def disconnect(self) -> None:
try:
self._inner.disconnect()
finally:
for fh_attr in ("_bw_fh", "_s3_fh"):
fh = getattr(self, fh_attr)
if fh is not None:
try:
fh.flush()
fh.close()
except Exception:
pass
setattr(self, fh_attr, None)
@property
def is_connected(self) -> bool:
return self._inner.is_connected
def write(self, data: bytes) -> None:
self._inner.write(data)
if data and self._bw_fh is not None:
try:
self._bw_fh.write(data)
except Exception:
pass
def read(self, n: int) -> bytes:
got = self._inner.read(n)
if got and self._s3_fh is not None:
try:
self._s3_fh.write(got)
except Exception:
pass
return got
@property
def bw_path(self) -> str:
return self._bw_path
@property
def s3_path(self) -> str:
return self._s3_path
def __repr__(self) -> str:
return f"CapturingTransport({self._inner!r}, bw={self._bw_path!r}, s3={self._s3_path!r})"