#!/usr/bin/env python3
"""
s3_bridge.py — S3 <-> Blastware serial bridge with raw binary capture + DLE-aware text framing
Version: v0.5.0

What’s new vs v0.4.0:
- .bin is now a TRUE raw capture stream with direction + timestamps (record container format).
- .log remains human-friendly and frame-oriented, but frame detection is now DLE-aware:
    - frame start = 0x10 0x02 (DLE STX)
    - frame end   = 0x10 0x03 (DLE ETX)
  (No longer splits on bare 0x03.)
- Marks/Info are stored as proper record types in .bin (no unsafe sentinel bytes).

BIN record format (little-endian):
  [type:1][ts_us:8][len:4][payload:len]

Types:
  0x01 BW->S3 bytes
  0x02 S3->BW bytes
  0x03 MARK (utf-8)
  0x04 INFO (utf-8)
"""

from __future__ import annotations

import argparse
import datetime as _dt
import os
import signal
import sys
import threading
import time
from typing import Optional

try:
    import serial
except ImportError:  # pyserial missing: main() reports it; pure helpers stay importable
    serial = None  # type: ignore[assignment]

VERSION = "v0.5.0"

# Framing bytes used by the .log segmentation.
DLE = 0x10
STX = 0x02
ETX = 0x03
ACK = 0x41

# Record types of the .bin container (see module docstring).
REC_BW = 0x01
REC_S3 = 0x02
REC_MARK = 0x03
REC_INFO = 0x04

_DLE_STX = bytes((DLE, STX))
_DLE_ETX = bytes((DLE, ETX))
_MAX_READ = 4096  # upper bound on a single serial read


def now_ts() -> str:
    """Return the local wall-clock time as ``HH:MM:SS.mmm``."""
    t = _dt.datetime.now()
    return t.strftime("%H:%M:%S.") + f"{t.microsecond // 1000:03d}"


def now_us() -> int:
    """Return wall-clock microseconds since the epoch.

    Wall-clock (not monotonic) so records can be correlated with other logs.
    """
    return time.time_ns() // 1000


def bytes_to_hex(b: bytes) -> str:
    """Render *b* as space-separated upper-case hex pairs, e.g. ``10 02``."""
    return " ".join(f"{x:02X}" for x in b)


def looks_like_text(b: bytes) -> bool:
    """Return True when at least 90% of *b* is printable ASCII, TAB, LF or CR.

    Empty input is never considered text.
    """
    if not b:
        return False
    printable = sum(1 for x in b if x in (9, 10, 13) or 32 <= x <= 126)
    return (printable / len(b)) >= 0.90


def pack_u32_le(n: int) -> bytes:
    """Pack the low 32 bits of *n* as 4 little-endian bytes."""
    return (n & 0xFFFFFFFF).to_bytes(4, "little")


def pack_u64_le(n: int) -> bytes:
    """Pack the low 64 bits of *n* as 8 little-endian bytes."""
    return (n & 0xFFFFFFFFFFFFFFFF).to_bytes(8, "little")


class SessionLogger:
    """Writes the human-readable .log and the binary .bin record stream.

    Both files are opened in append mode. A single lock serialises all
    writes so records coming from different threads never interleave.
    """

    def __init__(self, path: str, bin_path: str):
        self.path = path
        self.bin_path = bin_path
        self._fh = open(path, "a", buffering=1, encoding="utf-8", errors="replace")
        try:
            self._bin_fh = open(bin_path, "ab", buffering=0)
        except BaseException:
            # Do not leak the text handle when the binary file cannot be opened.
            self._fh.close()
            raise
        self._lock = threading.Lock()

    def log_line(self, line: str) -> None:
        """Append *line* plus a newline to the .log file."""
        with self._lock:
            if self._fh.closed:
                # A background thread may still log after close(); ignore it.
                return
            self._fh.write(line + "\n")

    def bin_write_record(self, rec_type: int, payload: bytes, ts_us: Optional[int] = None) -> None:
        """Append one ``[type:1][ts_us:8][len:4][payload]`` record to the .bin file.

        *ts_us* defaults to the current wall-clock time in microseconds.
        """
        if ts_us is None:
            ts_us = now_us()
        header = bytes([rec_type]) + pack_u64_le(ts_us) + pack_u32_le(len(payload))
        with self._lock:
            if self._bin_fh.closed:
                return
            # One write per record: the file is unbuffered, so this avoids
            # leaving a header without its payload if the process dies mid-record.
            self._bin_fh.write(header + bytes(payload))

    def log_mark(self, label: str) -> None:
        """Record a user annotation in both the .log and the .bin stream."""
        ts = now_ts()
        self.log_line(f"[{ts}] >>> MARK: {label}")
        self.bin_write_record(REC_MARK, label.encode("utf-8", errors="replace"))

    def log_info(self, msg: str) -> None:
        """Record an informational message in both the .log and the .bin stream."""
        ts = now_ts()
        self.log_line(f"[{ts}] [INFO] {msg}")
        self.bin_write_record(REC_INFO, msg.encode("utf-8", errors="replace"))

    def close(self) -> None:
        """Flush and close both files; later writes are silently ignored."""
        with self._lock:
            try:
                self._fh.flush()
                self._bin_fh.flush()
            finally:
                # Nested so the binary file is closed even if the text close fails.
                try:
                    self._fh.close()
                finally:
                    self._bin_fh.close()


class DLEFrameSniffer:
    """
    DLE-aware sniffer for logging only.

    Extracts:
      - ACK bytes (0x41) as single-byte events
      - DLE-framed blocks starting at 10 02 and ending at 10 03
      - Occasional ASCII bursts (e.g. "Operating System") outside framing
      - Any other unframed bytes as RAW

    It does NOT modify bytes; it just segments them for the .log.

    NOTE(review): DLE stuffing is not interpreted. If the protocol escapes a
    literal 0x10 as 10 10, a payload containing 10 10 03 would end the frame
    early — confirm against the protocol.
    """

    def __init__(self):
        self.buf = bytearray()

    def push(self, chunk: bytes) -> list[tuple[str, bytes]]:
        """Feed *chunk* and return the events that are now complete.

        Each event is ``(kind, payload)`` with kind one of ``"ACK"``,
        ``"FRAME"``, ``"TEXT"`` or ``"RAW"``. An unterminated frame, or a
        trailing lone DLE, stays buffered until more bytes arrive.
        """
        if chunk:
            self.buf.extend(chunk)

        events: list[tuple[str, bytes]] = []

        # Peel leading ACKs only outside a frame: a buffered frame always
        # starts with DLE, so buf[0] == ACK cannot be frame payload.
        self._peel_acks(events)

        while True:
            start = self._find_dle_stx(self.buf)
            if start is None:
                # No frame start in sight. Hold back a trailing DLE: its STX may
                # arrive in the next chunk, and dropping it would lose the frame.
                keep = 1 if self.buf and self.buf[-1] == DLE else 0
                cut = len(self.buf) - keep
                tail = bytes(self.buf[:cut])
                if tail:
                    events.append(self._classify(tail))
                del self.buf[:cut]
                break

            # Emit whatever precedes the frame (text, or junk kept visible as RAW).
            if start > 0:
                events.append(self._classify(bytes(self.buf[:start])))
                del self.buf[:start]

            # Now buf starts with DLE STX
            end = self._find_dle_etx(self.buf)
            if end is None:
                break  # need more bytes

            events.append(("FRAME", bytes(self.buf[:end])))
            del self.buf[:end]

            # ACKs may immediately follow a frame.
            self._peel_acks(events)

        return events

    def _peel_acks(self, events: list[tuple[str, bytes]]) -> None:
        """Move leading ACK bytes from the buffer into *events*."""
        while self.buf and self.buf[0] == ACK:
            events.append(("ACK", bytes([ACK])))
            del self.buf[0]

    @staticmethod
    def _classify(data: bytes) -> tuple[str, bytes]:
        """Tag unframed bytes as TEXT when mostly printable, else RAW."""
        return ("TEXT" if looks_like_text(data) else "RAW", data)

    @staticmethod
    def _find_dle_stx(b: bytearray) -> Optional[int]:
        """Return the index of the first DLE STX pair in *b*, or None."""
        idx = b.find(_DLE_STX)
        return idx if idx >= 0 else None

    @staticmethod
    def _find_dle_etx(b: bytearray) -> Optional[int]:
        """Return the slice end just past the first DLE ETX, or None.

        The search starts at index 2 to skip the initial DLE STX.
        """
        idx = b.find(_DLE_ETX, 2)
        return idx + 2 if idx >= 0 else None


def open_serial(port: str, baud: int) -> serial.Serial:
    """Open *port* at *baud*, 8N1, with short read and write timeouts."""
    return serial.Serial(
        port=port,
        baudrate=baud,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        timeout=0.05,
        write_timeout=0.2,
    )


def _log_events(name: str, events: list[tuple[str, bytes]], logger: SessionLogger) -> None:
    """Write one .log line per sniffer event for direction *name*."""
    for kind, payload in events:
        if kind == "ACK":
            logger.log_line(f"[{now_ts()}] [{name}] [ACK] 41")
        elif kind == "FRAME":
            logger.log_line(f"[{now_ts()}] [{name}] {bytes_to_hex(payload)}")
        elif kind == "TEXT":
            # errors="replace" means this decode cannot raise.
            s = payload.decode("ascii", errors="replace").strip("\r\n")
            logger.log_line(f"[{now_ts()}] [{name}] [TEXT] {s}")
        else:  # RAW
            logger.log_line(f"[{now_ts()}] [{name}] [RAW] {bytes_to_hex(payload)}")


def forward_loop(
    name: str,
    rec_type: int,
    src: serial.Serial,
    dst: serial.Serial,
    logger: SessionLogger,
    stop: threading.Event,
    quiet: bool,
    status_every_s: float,
) -> None:
    """Copy bytes from *src* to *dst* until *stop* is set or a port fails.

    Every chunk read is (1) appended to the .bin capture as a *rec_type*
    record, (2) written to *dst*, and (3) segmented into .log lines.
    A write timeout drops the chunk with a warning; any other serial
    error ends the loop. When *quiet* is false and *status_every_s* is
    positive, a heartbeat line is printed at that interval.
    """
    sniffer = DLEFrameSniffer()
    last_status = time.monotonic()

    while not stop.is_set():
        try:
            n = src.in_waiting
            # Read what is pending (capped); otherwise wait for one byte up to the port timeout.
            chunk = src.read(min(n, _MAX_READ) if n else 1)
        except serial.SerialException as e:
            logger.log_line(f"[{now_ts()}] [ERROR] {name} serial exception: {e!r}")
            break

        if chunk:
            ts = now_us()

            # 1) RAW BIN CAPTURE (absolute truth)
            logger.bin_write_record(rec_type, chunk, ts_us=ts)

            # 2) Forward immediately (bridge behavior).
            # The timeout handler must come first: it subclasses SerialException.
            try:
                dst.write(chunk)
            except serial.SerialTimeoutException:
                logger.log_line(f"[{now_ts()}] [WARN] {name} dst write timeout (dropped {len(chunk)} bytes)")
            except serial.SerialException as e:
                logger.log_line(f"[{now_ts()}] [ERROR] {name} dst write exception: {e!r}")
                break

            # 3) Human-friendly .log segmentation (DLE-aware)
            _log_events(name, sniffer.push(chunk), logger)

        if not quiet and status_every_s > 0:
            now = time.monotonic()
            if (now - last_status) >= status_every_s:
                print(f"[{now_ts()}] {name} alive")
                last_status = now

        if not chunk:
            time.sleep(0.002)


def annotation_loop(logger: SessionLogger, stop: threading.Event) -> None:
    """Read console input and write user marks until *stop* is set or stdin ends.

    Typing ``m`` + Enter prompts for a label, which is stored through
    ``logger.log_mark``. An empty label cancels the mark.
    """
    print("[MARK] Type 'm' + Enter to annotate the capture. Ctrl+C to stop.")
    while not stop.is_set():
        try:
            line = input()
        except (EOFError, KeyboardInterrupt):
            break

        line = line.strip()
        if not line:
            continue

        if line.lower() != "m":
            print("  (type 'm' + Enter to annotate)")
            continue

        try:
            sys.stdout.write("  Label: ")
            sys.stdout.flush()
            label = input().strip()
        except (EOFError, KeyboardInterrupt):
            break

        if label:
            logger.log_mark(label)
            print(f"  [MARK written] {label}")
        else:
            print("  (empty label — mark cancelled)")


def main() -> int:
    """Parse arguments, run the bridge until Ctrl+C, and return an exit code.

    Returns 0 on a normal shutdown and 2 when pyserial is missing or the
    ports cannot be opened.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--bw", default="COM5", help="Blastware-side COM port (default: COM5)")
    ap.add_argument("--s3", default="COM4", help="S3-side COM port (default: COM4)")
    ap.add_argument("--baud", type=int, default=38400, help="Baud rate (default: 38400)")
    ap.add_argument("--logdir", default=".", help="Directory to write session logs into (default: .)")
    ap.add_argument("--quiet", action="store_true", help="No console heartbeat output")
    ap.add_argument("--status-every", type=float, default=0.0, help="Seconds between console heartbeat lines (default: 0 = off)")
    args = ap.parse_args()

    if serial is None:
        print("pyserial is required but not installed (pip install pyserial)")
        return 2

    print("Opening ports...")
    bw = None
    try:
        bw = open_serial(args.bw, args.baud)
        s3 = open_serial(args.s3, args.baud)
    except Exception as e:
        # Release the first port if only the second one failed to open.
        if bw is not None:
            try:
                bw.close()
            except Exception:
                pass
        print(f"Failed to open serial ports: {e!r}")
        return 2

    print(f"Connected: {args.bw} <-> {args.s3}")

    os.makedirs(args.logdir, exist_ok=True)
    ts = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
    log_path = os.path.join(args.logdir, f"s3_session_{ts}.log")
    bin_path = os.path.join(args.logdir, f"s3_session_{ts}.bin")
    logger = SessionLogger(log_path, bin_path)

    print(f"[LOG] Writing hex log to {log_path}")
    print(f"[LOG] Writing binary log to {bin_path}")

    logger.log_info(f"s3_bridge {VERSION} start")
    logger.log_info(f"BW={args.bw} S3={args.s3} baud={args.baud}")
    logger.log_mark(f"SESSION START — BW={args.bw} S3={args.s3} baud={args.baud}")

    stop = threading.Event()

    def handle_sigint(sig, frame):
        stop.set()

    signal.signal(signal.SIGINT, handle_sigint)

    t1 = threading.Thread(
        target=forward_loop,
        name="BW_to_S3",
        args=("BW->S3", REC_BW, bw, s3, logger, stop, args.quiet, args.status_every),
        daemon=True,
    )
    t2 = threading.Thread(
        target=forward_loop,
        name="S3_to_BW",
        args=("S3->BW", REC_S3, s3, bw, logger, stop, args.quiet, args.status_every),
        daemon=True,
    )
    t_ann = threading.Thread(
        target=annotation_loop,
        name="Annotator",
        args=(logger, stop),
        daemon=True,
    )

    t1.start()
    t2.start()
    t_ann.start()

    try:
        while not stop.is_set():
            time.sleep(0.05)
    finally:
        print("\n[INFO] Ctrl+C detected, shutting down...")
        logger.log_info("shutdown requested")

        stop.set()
        t1.join(timeout=1.0)
        t2.join(timeout=1.0)
        # The annotator is not joined: it may be blocked in input(). It is a
        # daemon thread and the logger ignores writes after close().

        try:
            bw.close()
        except Exception:
            pass
        try:
            s3.close()
        except Exception:
            pass

        logger.log_mark("SESSION END")
        logger.log_info("ports closed, session end")
        logger.close()

    return 0


if __name__ == "__main__":
    raise SystemExit(main())