#!/usr/bin/env python3
"""
s3_bridge.py — S3 <-> Blastware serial bridge with frame-aware session logging
Version: v0.4.0

Key features:
- Low CPU: avoids per-byte console printing
- Forwards bytes immediately (true bridge)
- Frame-aware logging: buffers per direction until ETX (0x03), then logs full frame on one line
- Also logs plain ASCII bursts (e.g., "Operating System") cleanly
- Session log file created on start, closed on Ctrl+C

Usage examples:
  python s3_bridge.py
  python s3_bridge.py --bw COM5 --s3 COM4 --baud 38400
  python s3_bridge.py --quiet
"""

from __future__ import annotations

import argparse
import datetime as _dt
import os
import signal
import sys
import threading
import time
from typing import Optional

import serial

VERSION = "v0.4.0"

# End-of-frame marker used to split the byte stream into loggable frames.
ETX = 0x03

# Upper bound on a single read so one burst cannot monopolise the loop.
MAX_READ_BYTES = 4096


def now_ts() -> str:
    """Return the local wall-clock time as ``HH:MM:SS.mmm``."""
    t = _dt.datetime.now()
    return f"{t.strftime('%H:%M:%S.')}{t.microsecond // 1000:03d}"


def bytes_to_hex(b: bytes) -> str:
    """Render *b* as space-separated, two-digit uppercase hex bytes."""
    return " ".join(f"{x:02X}" for x in b)


def looks_like_text(b: bytes) -> bool:
    """Return True when at least 90% of *b* is printable ASCII, tab, LF or CR.

    Empty input is never considered text.
    """
    if not b:
        return False
    printable = sum(1 for x in b if x in (9, 10, 13) or 32 <= x <= 126)
    return (printable / len(b)) >= 0.90


class SessionLogger:
    """Thread-safe, line-buffered writer for the session log file."""

    def __init__(self, path: str):
        """Open *path* for appending (UTF-8, undecodable characters replaced)."""
        self.path = path
        self._fh = open(path, "a", buffering=1, encoding="utf-8", errors="replace")
        self._lock = threading.Lock()

    def log_line(self, line: str) -> None:
        """Append *line* plus a newline; silently ignored once the log is closed."""
        with self._lock:
            # A forwarding thread may still be winding down after close();
            # dropping the line is better than raising ValueError in that thread.
            if self._fh.closed:
                return
            self._fh.write(line + "\n")

    def close(self) -> None:
        """Flush and close the log file. Safe to call more than once."""
        with self._lock:
            if self._fh.closed:
                return
            try:
                self._fh.flush()
            finally:
                self._fh.close()


class FrameAssembler:
    """
    Maintains a rolling buffer of bytes for one direction and emits complete frames.
    We treat ETX=0x03 as an end-of-frame marker.
    """

    def __init__(self):
        """Start with an empty buffer."""
        self.buf = bytearray()

    def push(self, chunk: bytes) -> list[bytes]:
        """Append *chunk* and return every complete frame now available.

        Each returned frame includes its terminating ETX byte. Bytes after the
        last ETX stay buffered for the next call.
        """
        if chunk:
            self.buf.extend(chunk)

        frames: list[bytes] = []
        start = 0
        while True:
            etx_i = self.buf.find(ETX, start)
            if etx_i < 0:
                break
            frames.append(bytes(self.buf[start:etx_i + 1]))
            start = etx_i + 1

        # Trim the consumed prefix once instead of once per frame.
        if start:
            del self.buf[:start]
        return frames

    def drain_as_text_if_any(self) -> Optional[bytes]:
        """
        If buffer contains non-framed data (no ETX) and looks like text, emit it.
        Useful for things like "Operating System" that come as raw ASCII.

        Returns the drained bytes, or None when the buffer is empty or does not
        look like text (in which case it is left untouched).
        """
        if not self.buf:
            return None
        pending = bytes(self.buf)
        if not looks_like_text(pending):
            return None
        self.buf.clear()
        return pending


def open_serial(port: str, baud: int) -> serial.Serial:
    """Open *port* at *baud* as 8 data bits, no parity, 1 stop bit.

    The short read timeout keeps read() from blocking forever, enabling a
    clean Ctrl+C shutdown.
    """
    return serial.Serial(
        port=port,
        baudrate=baud,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        timeout=0.05,
        write_timeout=0.2,
    )


def forward_loop(
    name: str,
    src: serial.Serial,
    dst: serial.Serial,
    logger: SessionLogger,
    stop: threading.Event,
    quiet: bool,
    status_every_s: float,
) -> None:
    """Copy bytes from *src* to *dst* until *stop* is set or a port fails.

    Args:
        name: Direction label used in log lines (e.g. "BW->S3").
        src: Port to read from.
        dst: Port to write to.
        logger: Session log receiving frames, text bursts, warnings and errors.
        stop: Event that ends the loop when set.
        quiet: When True, suppress the console heartbeat.
        status_every_s: Seconds between heartbeat lines; 0 or less disables it.

    Bytes are forwarded before they are logged. A write timeout drops the chunk
    with a warning and keeps going; any other serial/OS error ends the loop.
    Whatever is still buffered when the loop ends is logged as ``[PARTIAL]``.
    """
    assembler = FrameAssembler()
    last_status = time.monotonic()

    while not stop.is_set():
        try:
            n = src.in_waiting
            # read(1) returns b"" after the port timeout when nothing arrives.
            chunk = src.read(min(n, MAX_READ_BYTES) if n else 1)
        except (serial.SerialException, OSError) as e:
            # in_waiting may surface a raw OSError when the device disappears.
            logger.log_line(f"[{now_ts()}] [ERROR] {name} serial exception: {e!r}")
            break

        if chunk:
            # forward immediately
            try:
                dst.write(chunk)
            except serial.SerialTimeoutException:
                logger.log_line(
                    f"[{now_ts()}] [WARN] {name} dst write timeout (dropped {len(chunk)} bytes)"
                )
            except (serial.SerialException, OSError) as e:
                logger.log_line(f"[{now_ts()}] [ERROR] {name} dst write exception: {e!r}")
                break

            # frame-aware logging
            # Some devices send leading STX separately; we still log as-is.
            for frame in assembler.push(chunk):
                logger.log_line(f"[{now_ts()}] [{name}] {bytes_to_hex(frame)}")

            # If we have non-ETX data that looks like text, flush it as TEXT
            text = assembler.drain_as_text_if_any()
            if text is not None:
                s = text.decode("ascii", errors="replace").strip("\r\n")
                logger.log_line(f"[{now_ts()}] [{name}] [TEXT] {s}")

        # minimal console heartbeat (cheap)
        if not quiet and status_every_s > 0:
            now = time.monotonic()
            if (now - last_status) >= status_every_s:
                print(f"[{now_ts()}] {name} alive")
                last_status = now

        # tiny sleep only when idle to avoid spin
        if not chunk:
            time.sleep(0.002)

    # Do not lose bytes of an unfinished frame when the session ends.
    if assembler.buf:
        logger.log_line(
            f"[{now_ts()}] [{name}] [PARTIAL] {bytes_to_hex(bytes(assembler.buf))}"
        )


def _close_port_quietly(port: Optional[serial.Serial]) -> None:
    """Best-effort close of *port*; errors during cleanup are ignored."""
    if port is None:
        return
    try:
        port.close()
    except Exception:
        pass


def main() -> int:
    """Parse arguments, run the bridge until Ctrl+C, and return an exit code.

    Returns 0 after a normal shutdown and 2 when the ports or the session log
    cannot be opened.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--bw", default="COM5", help="Blastware-side COM port (default: COM5)")
    ap.add_argument("--s3", default="COM4", help="S3-side COM port (default: COM4)")
    ap.add_argument("--baud", type=int, default=38400, help="Baud rate (default: 38400)")
    ap.add_argument(
        "--logdir",
        default=".",
        help="Directory to write session logs into (default: .)",
    )
    ap.add_argument("--quiet", action="store_true", help="No console heartbeat output")
    ap.add_argument(
        "--status-every",
        type=float,
        default=0.0,
        help="Seconds between console heartbeat lines (default: 0 = off)",
    )
    args = ap.parse_args()

    print("Opening ports...")
    bw: Optional[serial.Serial] = None
    try:
        bw = open_serial(args.bw, args.baud)
        s3 = open_serial(args.s3, args.baud)
    except Exception as e:
        # Release the first port if only the second one failed to open.
        _close_port_quietly(bw)
        print(f"Failed to open serial ports: {e!r}")
        return 2

    print(f"Connected: {args.bw} <-> {args.s3}")

    try:
        os.makedirs(args.logdir, exist_ok=True)
        log_name = _dt.datetime.now().strftime("s3_session_%Y%m%d_%H%M%S.log")
        log_path = os.path.join(args.logdir, log_name)
        logger = SessionLogger(log_path)
    except OSError as e:
        # Without a log there is no session; do not leave the ports held open.
        _close_port_quietly(bw)
        _close_port_quietly(s3)
        print(f"Failed to open session log: {e!r}")
        return 2

    print(f"[LOG] Writing session log to {log_path}")
    logger.log_line(f"[{now_ts()}] [INFO] s3_bridge {VERSION} start")
    logger.log_line(f"[{now_ts()}] [INFO] BW={args.bw} S3={args.s3} baud={args.baud}")

    stop = threading.Event()

    def handle_sigint(sig, frame):
        stop.set()

    signal.signal(signal.SIGINT, handle_sigint)

    t1 = threading.Thread(
        target=forward_loop,
        name="BW_to_S3",
        args=("BW->S3", bw, s3, logger, stop, args.quiet, args.status_every),
        daemon=True,
    )
    t2 = threading.Thread(
        target=forward_loop,
        name="S3_to_BW",
        args=("S3->BW", s3, bw, logger, stop, args.quiet, args.status_every),
        daemon=True,
    )
    t1.start()
    t2.start()

    try:
        # Wait until Ctrl+C
        while not stop.is_set():
            time.sleep(0.05)
    finally:
        print("\n[INFO] Ctrl+C detected, shutting down...")
        logger.log_line(f"[{now_ts()}] [INFO] shutdown requested")
        stop.set()
        t1.join(timeout=1.0)
        t2.join(timeout=1.0)
        _close_port_quietly(bw)
        _close_port_quietly(s3)
        logger.log_line(f"[{now_ts()}] [INFO] ports closed, session end")
        print("[LOG] Closing session log")
        logger.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())