""" tcp_serial_bridge.py — Local TCP-to-serial bridge for bench testing TcpTransport. Listens on a TCP port and, when a client connects, opens a serial port and bridges bytes bidirectionally. This lets you test the SFM server's TCP endpoint (?host=127.0.0.1&tcp_port=12345) against a locally-attached MiniMate Plus without needing a field modem. The bridge simulates an RV55 cellular modem in transparent TCP passthrough mode: - No handshake bytes on connect - Raw bytes forwarded in both directions - One connection at a time (new connection closes any existing serial session) Usage: python bridges/tcp_serial_bridge.py --serial COM5 --tcp-port 12345 Then in another window: python -m uvicorn sfm.server:app --port 8200 curl "http://localhost:8200/device/info?host=127.0.0.1&tcp_port=12345" Or just hit http://localhost:8200/device/info?host=127.0.0.1&tcp_port=12345 in a browser. Requirements: pip install pyserial """ from __future__ import annotations import argparse import logging import select import socket import sys import threading import time try: import serial # type: ignore except ImportError: print("pyserial required: pip install pyserial", file=sys.stderr) sys.exit(1) logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-7s %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger("tcp_serial_bridge") # ── Constants ───────────────────────────────────────────────────────────────── DEFAULT_BAUD = 38_400 DEFAULT_TCP_PORT = 12345 CHUNK = 256 # bytes per read call SERIAL_TIMEOUT = 0.02 # serial read timeout (s) — non-blocking in practice TCP_TIMEOUT = 0.02 # socket recv timeout (s) BOOT_DELAY = 2.0 # seconds to wait after opening serial port before # forwarding data — mirrors the unit's startup beep # ── Bridge session ───────────────────────────────────────────────────────────── def _pipe_tcp_to_serial(sock: socket.socket, ser: serial.Serial, stop: threading.Event) -> None: """Forward bytes from TCP socket → serial port.""" sock.settimeout(TCP_TIMEOUT) while not stop.is_set(): try: data = sock.recv(CHUNK) if not data: log.info("TCP peer closed connection") stop.set() break log.debug("TCP→SER %d bytes: %s", len(data), data.hex()) ser.write(data) except socket.timeout: pass except OSError as exc: if not stop.is_set(): log.warning("TCP read error: %s", exc) stop.set() break def _pipe_serial_to_tcp(sock: socket.socket, ser: serial.Serial, stop: threading.Event) -> None: """Forward bytes from serial port → TCP socket.""" while not stop.is_set(): try: data = ser.read(CHUNK) if data: log.debug("SER→TCP %d bytes: %s", len(data), data.hex()) try: sock.sendall(data) except OSError as exc: if not stop.is_set(): log.warning("TCP send error: %s", exc) stop.set() break except serial.SerialException as exc: if not stop.is_set(): log.warning("Serial read error: %s", exc) stop.set() break def _run_session(conn: socket.socket, addr: tuple, serial_port: str, baud: int, boot_delay: float) -> None: """Handle one TCP client connection.""" peer = f"{addr[0]}:{addr[1]}" log.info("Connection from %s", peer) try: ser = serial.Serial( port = serial_port, baudrate = baud, bytesize = 8, parity = "N", stopbits = 1, timeout = SERIAL_TIMEOUT, ) except serial.SerialException as exc: log.error("Cannot open serial port %s: %s", serial_port, exc) conn.close() return log.info("Opened %s at %d baud — waiting %.1fs for unit boot", serial_port, baud, boot_delay) ser.reset_input_buffer() ser.reset_output_buffer() if boot_delay > 0: time.sleep(boot_delay) ser.reset_input_buffer() # discard any boot noise log.info("Bridge active: TCP %s ↔ %s", peer, serial_port) stop = threading.Event() t_tcp_to_ser = threading.Thread( target=_pipe_tcp_to_serial, args=(conn, ser, stop), daemon=True ) t_ser_to_tcp = threading.Thread( target=_pipe_serial_to_tcp, args=(conn, ser, stop), daemon=True ) t_tcp_to_ser.start() t_ser_to_tcp.start() stop.wait() # block until either thread sets the stop flag log.info("Session ended, cleaning up") try: conn.close() except OSError: pass try: ser.close() except OSError: pass t_tcp_to_ser.join(timeout=2.0) t_ser_to_tcp.join(timeout=2.0) log.info("Session with %s closed", peer) # ── Server ──────────────────────────────────────────────────────────────────── def run_bridge(serial_port: str, baud: int, tcp_port: int, boot_delay: float) -> None: """Accept TCP connections forever and bridge each one to the serial port.""" srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srv.bind(("0.0.0.0", tcp_port)) srv.listen(1) log.info( "Listening on TCP :%d — will bridge to %s at %d baud", tcp_port, serial_port, baud, ) log.info("Send test: curl 'http://localhost:8200/device/info?host=127.0.0.1&tcp_port=%d'", tcp_port) try: while True: conn, addr = srv.accept() # Handle one session at a time (synchronous) — matches modem behaviour _run_session(conn, addr, serial_port, baud, boot_delay) except KeyboardInterrupt: log.info("Shutting down") finally: srv.close() # ── Entry point ──────────────────────────────────────────────────────────────── if __name__ == "__main__": ap = argparse.ArgumentParser(description="TCP-to-serial bridge for bench testing TcpTransport") ap.add_argument("--serial", default="COM5", help="Serial port (default: COM5)") ap.add_argument("--baud", type=int, default=DEFAULT_BAUD, help="Baud rate (default: 38400)") ap.add_argument("--tcp-port", type=int, default=DEFAULT_TCP_PORT, help="TCP listen port (default: 12345)") ap.add_argument("--boot-delay", type=float, default=BOOT_DELAY, help="Seconds to wait after opening serial before forwarding (default: 2.0). " "Set to 0 if unit is already powered on.") ap.add_argument("--debug", action="store_true", help="Show individual byte transfers") args = ap.parse_args() if args.debug: logging.getLogger().setLevel(logging.DEBUG) run_bridge(args.serial, args.baud, args.tcp_port, args.boot_delay)