From 5e1a532544d16556cda9096cdbba1aa48dd8bd8d Mon Sep 17 00:00:00 2001 From: serversdwn Date: Wed, 25 Feb 2026 16:55:34 -0500 Subject: [PATCH] fix: fixed loop causing cpu overload, cleaned up outputs, disabled in console logging because apparently its resource heavy and i want this to run on a toaster. --- bridges/s3-bridge/s3_bridge.py | 305 +++++++++++++++++++++++++++------ 1 file changed, 250 insertions(+), 55 deletions(-) diff --git a/bridges/s3-bridge/s3_bridge.py b/bridges/s3-bridge/s3_bridge.py index ba8605b..76b490a 100644 --- a/bridges/s3-bridge/s3_bridge.py +++ b/bridges/s3-bridge/s3_bridge.py @@ -1,78 +1,273 @@ -import serial -import threading -import datetime -import sys -import os +#!/usr/bin/env python3 +""" +s3_bridge.py — S3 <-> Blastware serial bridge with frame-aware session logging +Version: v0.4.0 + +Key features: +- Low CPU: avoids per-byte console printing +- Forwards bytes immediately (true bridge) +- Frame-aware logging: buffers per direction until ETX (0x03), then logs full frame on one line +- Also logs plain ASCII bursts (e.g., "Operating System") cleanly +- Session log file created on start, closed on Ctrl+C + +Usage examples: + python s3_bridge.py + python s3_bridge.py --bw COM5 --s3 COM4 --baud 38400 + python s3_bridge.py --quiet +""" + +from __future__ import annotations + +import argparse +import datetime as _dt +import os +import signal +import sys +import threading +import time +from typing import Optional + +import serial + + +VERSION = "v0.4.0" + + +def now_ts() -> str: + # Local time with milliseconds, like [13:37:06.239] + t = _dt.datetime.now() + return t.strftime("%H:%M:%S.") + f"{int(t.microsecond/1000):03d}" + + +def bytes_to_hex(b: bytes) -> str: + return " ".join(f"{x:02X}" for x in b) + + +def looks_like_text(b: bytes) -> bool: + # Heuristic: mostly printable ASCII plus spaces + if not b: + return False + printable = 0 + for x in b: + if x in (9, 10, 13): # \t \n \r + printable += 1 + elif 32 <= x <= 126: + printable += 1 + return (printable / len(b)) >= 0.90 -PORT_A = "COM5" # Real device -PORT_B = "COM4" # Virtual port for Blastware -BAUD = 38400 class SessionLogger: + def __init__(self, path: str): + self.path = path + self._fh = open(path, "a", buffering=1, encoding="utf-8", errors="replace") + self._lock = threading.Lock() + + def log_line(self, line: str) -> None: + with self._lock: + self._fh.write(line + "\n") + + def close(self) -> None: + with self._lock: + try: + self._fh.flush() + finally: + self._fh.close() + + +class FrameAssembler: + """ + Maintains a rolling buffer of bytes for one direction and emits complete frames. + We treat ETX=0x03 as an end-of-frame marker. + """ def __init__(self): - ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - self.filename = f"s3_session_{ts}.log" - self.file = open(self.filename, "w", buffering=1) - print(f"[LOG] Writing session log to {self.filename}") + self.buf = bytearray() - def log(self, direction, data): - now = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3] - hex_string = " ".join(f"{b:02X}" for b in data) - line = f"[{now}] [{direction}] {hex_string}" - print(line) - self.file.write(line + "\n") + def push(self, chunk: bytes) -> list[bytes]: + if chunk: + self.buf.extend(chunk) - def close(self): - print("[LOG] Closing session log") - self.file.close() - - -def forward(src, dst, direction, logger): - try: + frames: list[bytes] = [] while True: - data = src.read(src.in_waiting or 1) - if data: - logger.log(direction, data) - dst.write(data) - except Exception as e: - print(f"[ERROR] {direction}: {e}") + try: + etx_i = self.buf.index(0x03) + except ValueError: + break + + # include ETX byte + frame = bytes(self.buf[: etx_i + 1]) + del self.buf[: etx_i + 1] + + # ignore empty noise + if frame: + frames.append(frame) + + return frames + + def drain_as_text_if_any(self) -> Optional[bytes]: + """ + If buffer contains non-framed data (no ETX) and looks like text, emit it. + Useful for things like "Operating System" that come as raw ASCII. + """ + if not self.buf: + return None + b = bytes(self.buf) + if looks_like_text(b): + self.buf.clear() + return b + return None -def main(): - print("Opening ports...") - - ser_a = serial.Serial(PORT_A, BAUD, timeout=0) - ser_b = serial.Serial(PORT_B, BAUD, timeout=0) - - print(f"Connected: {PORT_A} <-> {PORT_B}") - - logger = SessionLogger() - - t1 = threading.Thread( - target=forward, - args=(ser_b, ser_a, "BLASTWARE -> S3", logger), - daemon=True +def open_serial(port: str, baud: int) -> serial.Serial: + # timeout keeps read() from blocking forever, enabling clean Ctrl+C shutdown + return serial.Serial( + port=port, + baudrate=baud, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=0.05, + write_timeout=0.2, ) + +def forward_loop( + name: str, + src: serial.Serial, + dst: serial.Serial, + logger: SessionLogger, + stop: threading.Event, + quiet: bool, + status_every_s: float, +) -> None: + assembler = FrameAssembler() + last_status = time.monotonic() + + while not stop.is_set(): + try: + n = src.in_waiting + if n: + chunk = src.read(n if n < 4096 else 4096) + else: + chunk = src.read(1) # will return b"" after timeout + except serial.SerialException as e: + logger.log_line(f"[{now_ts()}] [ERROR] {name} serial exception: {e!r}") + break + + if chunk: + # forward immediately + try: + dst.write(chunk) + except serial.SerialTimeoutException: + logger.log_line(f"[{now_ts()}] [WARN] {name} dst write timeout (dropped {len(chunk)} bytes)") + except serial.SerialException as e: + logger.log_line(f"[{now_ts()}] [ERROR] {name} dst write exception: {e!r}") + break + + # frame-aware logging + frames = assembler.push(chunk) + for frame in frames: + # Some devices send leading STX separately; we still log as-is. + logger.log_line(f"[{now_ts()}] [{name}] {bytes_to_hex(frame)}") + + # If we have non-ETX data that looks like text, flush it as TEXT + text = assembler.drain_as_text_if_any() + if text is not None: + try: + s = text.decode("ascii", errors="replace").strip("\r\n") + except Exception: + s = repr(text) + logger.log_line(f"[{now_ts()}] [{name}] [TEXT] {s}") + + # minimal console heartbeat (cheap) + if not quiet and status_every_s > 0: + now = time.monotonic() + if (now - last_status) >= status_every_s: + print(f"[{now_ts()}] {name} alive") + last_status = now + + # tiny sleep only when idle to avoid spin + if not chunk: + time.sleep(0.002) + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--bw", default="COM5", help="Blastware-side COM port (default: COM5)") + ap.add_argument("--s3", default="COM4", help="S3-side COM port (default: COM4)") + ap.add_argument("--baud", type=int, default=38400, help="Baud rate (default: 38400)") + ap.add_argument("--logdir", default=".", help="Directory to write session logs into (default: .)") + ap.add_argument("--quiet", action="store_true", help="No console heartbeat output") + ap.add_argument("--status-every", type=float, default=0.0, help="Seconds between console heartbeat lines (default: 0 = off)") + args = ap.parse_args() + + print("Opening ports...") + try: + bw = open_serial(args.bw, args.baud) + s3 = open_serial(args.s3, args.baud) + except Exception as e: + print(f"Failed to open serial ports: {e!r}") + return 2 + + print(f"Connected: {args.bw} <-> {args.s3}") + + os.makedirs(args.logdir, exist_ok=True) + log_name = _dt.datetime.now().strftime("s3_session_%Y%m%d_%H%M%S.log") + log_path = os.path.join(args.logdir, log_name) + logger = SessionLogger(log_path) + print(f"[LOG] Writing session log to {log_path}") + logger.log_line(f"[{now_ts()}] [INFO] s3_bridge {VERSION} start") + logger.log_line(f"[{now_ts()}] [INFO] BW={args.bw} S3={args.s3} baud={args.baud}") + + stop = threading.Event() + + def handle_sigint(sig, frame): + stop.set() + + signal.signal(signal.SIGINT, handle_sigint) + + t1 = threading.Thread( + target=forward_loop, + name="BW_to_S3", + args=("BW->S3", bw, s3, logger, stop, args.quiet, args.status_every), + daemon=True, + ) t2 = threading.Thread( - target=forward, - args=(ser_a, ser_b, "S3 -> BLASTWARE", logger), - daemon=True + target=forward_loop, + name="S3_to_BW", + args=("S3->BW", s3, bw, logger, stop, args.quiet, args.status_every), + daemon=True, ) t1.start() t2.start() try: - while True: - pass - except KeyboardInterrupt: - print("\n[INFO] Ctrl+C detected, shutting down...") + # Wait until Ctrl+C + while not stop.is_set(): + time.sleep(0.05) finally: + print("\n[INFO] Ctrl+C detected, shutting down...") + logger.log_line(f"[{now_ts()}] [INFO] shutdown requested") + + stop.set() + t1.join(timeout=1.0) + t2.join(timeout=1.0) + + try: + bw.close() + except Exception: + pass + try: + s3.close() + except Exception: + pass + + logger.log_line(f"[{now_ts()}] [INFO] ports closed, session end") + print("[LOG] Closing session log") logger.close() - ser_a.close() - ser_b.close() + + return 0 if __name__ == "__main__": - main() \ No newline at end of file + raise SystemExit(main()) \ No newline at end of file