"""
transport.py — Serial and TCP transport layer for the MiniMate Plus protocol.

Provides a thin I/O abstraction so that protocol.py never imports pyserial or
socket directly.  Two concrete implementations:

    SerialTransport — direct RS-232 cable connection (pyserial)
    TcpTransport    — TCP socket to a modem or ACH relay (stdlib socket)

The MiniMate Plus protocol bytes are identical over both transports.  TCP is
used when field units call home via the ACH (Auto Call Home) server, or when
SFM "calls up" a unit by connecting to the modem's IP address directly.

Field hardware: Sierra Wireless RV55 / RX55 (4G LTE) cellular modem, replacing
the older 3G-only Raven X (now decommissioned).  All run ALEOS firmware with an
ACEmanager web UI.  Serial port must be configured 38400,8N1, no flow control,
Data Forwarding Timeout = 1 s.

Typical usage:

    from minimateplus.transport import SerialTransport, TcpTransport

    # Direct serial connection
    with SerialTransport("COM5") as t:
        t.write(frame_bytes)

    # Modem / ACH TCP connection (Blastware port 12345)
    with TcpTransport("192.168.1.50", 12345) as t:
        t.write(frame_bytes)
"""

from __future__ import annotations

import socket
import time
from abc import ABC, abstractmethod
from typing import Optional

# pyserial is the only non-stdlib dependency in this project.
# It is imported optionally (see the guarded import that follows) so that
# unit-tests which mock the transport can run without pyserial installed.
try:
    import serial  # type: ignore
except ImportError:  # pragma: no cover
    serial = None  # type: ignore


# ── Abstract base ─────────────────────────────────────────────────────────────

class BaseTransport(ABC):
    """Common interface for all transport implementations."""

    @abstractmethod
    def connect(self) -> None:
        """Open the underlying connection."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the underlying connection."""

    @property
    @abstractmethod
    def is_connected(self) -> bool:
        """True while the connection is open."""

    @abstractmethod
    def write(self, data: bytes) -> None:
        """Write *data* bytes to the wire."""

    @abstractmethod
    def read(self, n: int) -> bytes:
        """
        Read up to *n* bytes.

        Returns immediately with whatever is available (may return fewer than
        *n* bytes, or b"" if nothing is ready).
        """

    # ── Context manager ───────────────────────────────────────────────────────

    def __enter__(self) -> "BaseTransport":
        """Connect on entry and return the transport itself."""
        self.connect()
        return self

    def __exit__(self, *_) -> None:
        """Disconnect on exit; exceptions from the ``with`` body propagate."""
        self.disconnect()

    # ── Higher-level read helpers ─────────────────────────────────────────────

    def read_until_idle(
        self,
        timeout: float = 2.0,
        idle_gap: float = 0.05,
        chunk: int = 256,
    ) -> bytes:
        """
        Read bytes until the line goes quiet.

        Keeps reading in *chunk*-sized bursts.  Returns when either:
          - *timeout* seconds have elapsed since this call started, or
          - at least one byte has been received and *idle_gap* seconds pass
            with no new bytes (line went quiet).

        This mirrors how Blastware behaves: it waits for the seismograph to
        stop transmitting rather than counting bytes.

        Args:
            timeout:  Hard deadline (seconds) from the moment read starts.
            idle_gap: How long to wait after the last byte before declaring done.
            chunk:    How many bytes to request per low-level read() call.

        Returns:
            All bytes received as a single bytes object (may be b"" if nothing
            arrived within *timeout*).
        """
        buf = bytearray()
        deadline = time.monotonic() + timeout
        last_rx = None  # monotonic timestamp of the most recent non-empty read

        while time.monotonic() < deadline:
            got = self.read(chunk)
            if got:
                buf.extend(got)
                last_rx = time.monotonic()
            else:
                # Nothing ready — the idle gap only counts once data has
                # started flowing; before that only the hard deadline applies.
                if last_rx is not None and (time.monotonic() - last_rx) >= idle_gap:
                    break
                time.sleep(0.005)

        return bytes(buf)

    def read_exact(self, n: int, timeout: float = 2.0) -> bytes:
        """
        Read exactly *n* bytes or raise TimeoutError.

        Useful when the caller already knows the expected response length
        (e.g. fixed-size ACK packets).

        Args:
            n:       Number of bytes required.
            timeout: Hard deadline (seconds) from the moment the call starts.

        Returns:
            Exactly *n* bytes (b"" when *n* is zero or negative).

        Raises:
            TimeoutError: if fewer than *n* bytes arrived before the deadline.
        """
        buf = bytearray()
        deadline = time.monotonic() + timeout

        while len(buf) < n:
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"read_exact: wanted {n} bytes, got {len(buf)} "
                    f"after {timeout:.1f}s"
                )
            # Only ask for what is still missing so we never over-read.
            got = self.read(n - len(buf))
            if got:
                buf.extend(got)
            else:
                time.sleep(0.005)

        return bytes(buf)


# ── Serial transport ──────────────────────────────────────────────────────────

# Default baud rate confirmed from Blastware / MiniMate Plus documentation.
DEFAULT_BAUD = 38_400

# pyserial serial port config matching the MiniMate Plus RS-232 spec:
# 8 data bits, no parity, 1 stop bit (8N1).
_SERIAL_BYTESIZE = 8    # serial.EIGHTBITS
_SERIAL_PARITY = "N"    # serial.PARITY_NONE
_SERIAL_STOPBITS = 1    # serial.STOPBITS_ONE


class SerialTransport(BaseTransport):
    """
    pyserial-backed transport for a direct RS-232 cable connection.

    The port is opened with a very short read timeout (10 ms) so that
    read() returns quickly and the caller can implement its own framing /
    timeout logic without blocking the whole process.

    Args:
        port:    COM port name (e.g. "COM5" on Windows, "/dev/ttyUSB0" on Linux).
        baud:    Baud rate (default 38400).
        rts_cts: Enable RTS/CTS hardware flow control (default False — MiniMate
                 typically uses no flow control).

    Raises:
        ImportError: if pyserial is not installed.
    """

    # Internal read timeout (seconds).  Short so read() is non-blocking in practice.
    _READ_TIMEOUT = 0.01

    def __init__(
        self,
        port: str,
        baud: int = DEFAULT_BAUD,
        rts_cts: bool = False,
    ) -> None:
        if serial is None:
            raise ImportError(
                "pyserial is required for SerialTransport.  "
                "Install it with:  pip install pyserial"
            )
        self.port = port
        self.baud = baud
        self.rts_cts = rts_cts
        self._ser: Optional[serial.Serial] = None

    # ── BaseTransport interface ───────────────────────────────────────────────

    def connect(self) -> None:
        """Open the serial port.  Raises serial.SerialException on failure."""
        if self._ser and self._ser.is_open:
            return  # Already open — idempotent
        self._ser = serial.Serial(
            port=self.port,
            baudrate=self.baud,
            bytesize=_SERIAL_BYTESIZE,
            parity=_SERIAL_PARITY,
            stopbits=_SERIAL_STOPBITS,
            timeout=self._READ_TIMEOUT,
            rtscts=self.rts_cts,
            xonxoff=False,
            dsrdtr=False,
        )
        # Flush any stale bytes left in device / OS buffers from a previous session
        self._ser.reset_input_buffer()
        self._ser.reset_output_buffer()

    def disconnect(self) -> None:
        """Close the serial port.  Safe to call even if already closed."""
        if self._ser:
            try:
                self._ser.close()
            except Exception:
                # Deliberate best-effort close: the handle is dropped below
                # regardless, so a failing close must not propagate.
                pass
            self._ser = None

    @property
    def is_connected(self) -> bool:
        """True while a serial handle exists and reports itself open."""
        return bool(self._ser and self._ser.is_open)

    def write(self, data: bytes) -> None:
        """
        Write *data* to the serial port.

        Raises:
            RuntimeError: if not connected.
            serial.SerialException: on I/O error.
        """
        if not self.is_connected:
            raise RuntimeError("SerialTransport.write: not connected")
        self._ser.write(data)  # type: ignore[union-attr]
        self._ser.flush()  # type: ignore[union-attr]

    def read(self, n: int) -> bytes:
        """
        Read up to *n* bytes from the serial port.

        Returns b"" immediately if no data is available (non-blocking in
        practice thanks to the 10 ms read timeout).

        Raises:
            RuntimeError: if not connected.
        """
        if not self.is_connected:
            raise RuntimeError("SerialTransport.read: not connected")
        return self._ser.read(n)  # type: ignore[union-attr]

    # ── Extras ────────────────────────────────────────────────────────────────

    def flush_input(self) -> None:
        """Discard any unread bytes in the OS receive buffer."""
        if self.is_connected:
            self._ser.reset_input_buffer()  # type: ignore[union-attr]

    def __repr__(self) -> str:
        state = "open" if self.is_connected else "closed"
        return f"SerialTransport({self.port!r}, baud={self.baud}, {state})"


# ── TCP transport ─────────────────────────────────────────────────────────────

# Default TCP port for Blastware modem communications / ACH relay.
# Confirmed from field setup: Blastware → Communication Setup → TCP/IP uses 12345.
DEFAULT_TCP_PORT = 12345


class TcpTransport(BaseTransport):
    """
    TCP socket transport for MiniMate Plus units in the field.

    The protocol bytes over TCP are identical to RS-232 — TCP is simply a
    different physical layer.  The modem (Sierra Wireless RV55 / RX55, or older
    Raven X) bridges the unit's RS-232 serial port to a TCP socket transparently.
    No application-layer handshake or framing is added.

    Two usage scenarios:

      "Call up" (outbound): SFM connects to the unit's modem IP directly.
          TcpTransport(host="203.0.113.5", port=12345)

      "Call home" / ACH relay: The unit has already dialled in to the office
          ACH server, which bridged the modem to a TCP socket.  In this case
          the host/port identifies the relay's listening socket, not the modem.
          (ACH inbound mode is handled by a separate AchServer — not this class.)

    IMPORTANT — modem data forwarding delay:
        Sierra Wireless (and Raven) modems buffer RS-232 bytes for up to 1 second
        before forwarding them as a TCP segment ("Data Forwarding Timeout" in
        ACEmanager).  read_until_idle() is overridden to use idle_gap=1.5 s rather
        than the serial default of 0.05 s — without this, the parser would declare
        a frame complete mid-stream during the modem's buffering pause.

    Args:
        host:            IP address or hostname of the modem / ACH relay.
        port:            TCP port number (default 12345).
        connect_timeout: Seconds to wait for the TCP handshake (default 10.0).
    """

    # Internal recv timeout — short so read() returns promptly if no data.
    _RECV_TIMEOUT = 0.01

    # Timeout applied while sending.  The socket timeout also governs
    # sendall(), so the 10 ms recv timeout must not be in force during writes.
    _SEND_TIMEOUT = 10.0

    def __init__(
        self,
        host: str,
        port: int = DEFAULT_TCP_PORT,
        connect_timeout: float = 10.0,
    ) -> None:
        self.host = host
        self.port = port
        self.connect_timeout = connect_timeout
        self._sock: Optional[socket.socket] = None

    # ── BaseTransport interface ───────────────────────────────────────────────

    def connect(self) -> None:
        """
        Open a TCP connection to host:port.

        Idempotent — does nothing if already connected.

        Raises:
            OSError / socket.timeout: if the connection cannot be established.
        """
        if self._sock is not None:
            return  # Already connected — idempotent

        # create_connection() resolves the host (IPv4 or IPv6), applies the
        # connect timeout, and closes the socket itself if the attempt fails,
        # so a refused / timed-out connect never leaks a file descriptor.
        sock = socket.create_connection(
            (self.host, self.port), timeout=self.connect_timeout
        )
        # Switch to short timeout so read() is non-blocking in practice
        sock.settimeout(self._RECV_TIMEOUT)
        self._sock = sock

    def disconnect(self) -> None:
        """Close the TCP socket.  Safe to call even if already closed."""
        if self._sock:
            try:
                self._sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                # Peer may already have gone away — nothing left to shut down.
                pass
            try:
                self._sock.close()
            except OSError:
                pass
            self._sock = None

    @property
    def is_connected(self) -> bool:
        """True while a socket object is held (does not probe the peer)."""
        return self._sock is not None

    def write(self, data: bytes) -> None:
        """
        Send all bytes to the peer.

        The short receive timeout is temporarily replaced by a longer send
        timeout, because the socket timeout bounds the total duration of
        sendall(); the receive timeout is restored afterwards.

        Raises:
            RuntimeError: if not connected.
            OSError: on network I/O error.
        """
        if not self.is_connected:
            raise RuntimeError("TcpTransport.write: not connected")
        sock = self._sock
        sock.settimeout(self._SEND_TIMEOUT)  # type: ignore[union-attr]
        try:
            sock.sendall(data)  # type: ignore[union-attr]
        finally:
            # Always return to the short timeout so read() stays responsive.
            sock.settimeout(self._RECV_TIMEOUT)  # type: ignore[union-attr]

    def read(self, n: int) -> bytes:
        """
        Read up to *n* bytes from the socket.

        Returns b"" immediately if no data is available (non-blocking in
        practice thanks to the short socket timeout).

        NOTE: recv() also returns b"" once the peer has closed the connection,
        so an empty result does not distinguish "nothing yet" from "peer gone";
        callers see the latter as a timeout in the higher-level read helpers.

        Raises:
            RuntimeError: if not connected.
        """
        if not self.is_connected:
            raise RuntimeError("TcpTransport.read: not connected")
        try:
            return self._sock.recv(n)  # type: ignore[union-attr]
        except socket.timeout:
            return b""

    def read_until_idle(
        self,
        timeout: float = 2.0,
        idle_gap: float = 1.5,
        chunk: int = 256,
    ) -> bytes:
        """
        TCP-aware version of read_until_idle.

        Overrides the BaseTransport default to use a much longer idle_gap (1.5 s
        vs 0.05 s for serial).  This is necessary because the Raven modem (and
        similar cellular modems) buffer serial-port bytes for up to 1 second
        before forwarding them over TCP ("Data Forwarding Timeout" setting).

        If read_until_idle returned after a 50 ms quiet period, it would trigger
        mid-frame when the modem is still accumulating bytes — causing frame
        parse failures on every call.

        Args:
            timeout:  Hard deadline from the moment the read starts (default
                      2.0 s — callers typically pass a longer value for S3
                      frames).
            idle_gap: Quiet-line threshold (default 1.5 s to survive modem
                      buffering).  Pass a smaller value only if you are
                      connecting directly to a unit's Ethernet port with no
                      modem buffering in the path.
            chunk:    Bytes per low-level recv() call.

        Returns:
            All bytes received (may be b"" if nothing arrived within *timeout*).
        """
        return super().read_until_idle(timeout=timeout, idle_gap=idle_gap, chunk=chunk)

    def __repr__(self) -> str:
        state = "connected" if self.is_connected else "disconnected"
        return f"TcpTransport({self.host!r}, port={self.port}, {state})"