#!/usr/bin/env python3 """ s3_bridge.py — S3 <-> Blastware serial bridge with raw binary capture + DLE-aware text framing Version: v0.5.1 What’s new vs v0.4.0: - .bin is now a TRUE raw capture stream with direction + timestamps (record container format). - .log remains human-friendly and frame-oriented, but frame detection is now DLE-aware: - frame start = 0x10 0x02 (DLE STX) - frame end = 0x10 0x03 (DLE ETX) (No longer splits on bare 0x03.) - Marks/Info are stored as proper record types in .bin (no unsafe sentinel bytes). - Optional raw taps: use --raw-bw / --raw-s3 to also dump byte-for-byte traffic per direction with no headers (for tools that just need a flat stream). BIN record format (little-endian): [type:1][ts_us:8][len:4][payload:len] Types: 0x01 BW->S3 bytes 0x02 S3->BW bytes 0x03 MARK (utf-8) 0x04 INFO (utf-8) """ from __future__ import annotations import argparse import datetime as _dt import os import signal import sys import threading import time from typing import Optional import serial VERSION = "v0.5.1" DLE = 0x10 STX = 0x02 ETX = 0x03 ACK = 0x41 REC_BW = 0x01 REC_S3 = 0x02 REC_MARK = 0x03 REC_INFO = 0x04 def now_ts() -> str: t = _dt.datetime.now() return t.strftime("%H:%M:%S.") + f"{int(t.microsecond/1000):03d}" def now_us() -> int: # Wall-clock microseconds (fine for correlation). If you want monotonic, we can switch. return int(time.time() * 1_000_000) def bytes_to_hex(b: bytes) -> str: return " ".join(f"{x:02X}" for x in b) def looks_like_text(b: bytes) -> bool: if not b: return False printable = 0 for x in b: if x in (9, 10, 13): printable += 1 elif 32 <= x <= 126: printable += 1 return (printable / len(b)) >= 0.90 def pack_u32_le(n: int) -> bytes: return bytes((n & 0xFF, (n >> 8) & 0xFF, (n >> 16) & 0xFF, (n >> 24) & 0xFF)) def pack_u64_le(n: int) -> bytes: out = [] for i in range(8): out.append((n >> (8 * i)) & 0xFF) return bytes(out) class SessionLogger: def __init__(self, path: str, bin_path: str, raw_bw_path: Optional[str] = None, raw_s3_path: Optional[str] = None): self.path = path self.bin_path = bin_path self._fh = open(path, "a", buffering=1, encoding="utf-8", errors="replace") self._bin_fh = open(bin_path, "ab", buffering=0) self._lock = threading.Lock() # Optional pure-byte taps (no headers). BW=Blastware tx, S3=device tx. # These can be opened/closed on demand via start_raw_capture/stop_raw_capture. self._raw_bw = open(raw_bw_path, "ab", buffering=0) if raw_bw_path else None self._raw_s3 = open(raw_s3_path, "ab", buffering=0) if raw_s3_path else None self._cap_bw_path: Optional[str] = raw_bw_path self._cap_s3_path: Optional[str] = raw_s3_path def log_line(self, line: str) -> None: with self._lock: self._fh.write(line + "\n") def bin_write_record(self, rec_type: int, payload: bytes, ts_us: Optional[int] = None) -> None: if ts_us is None: ts_us = now_us() header = bytes([rec_type]) + pack_u64_le(ts_us) + pack_u32_le(len(payload)) with self._lock: self._bin_fh.write(header) if payload: self._bin_fh.write(payload) # Raw taps: write only the payload bytes (no headers) if rec_type == REC_BW and self._raw_bw: self._raw_bw.write(payload) if rec_type == REC_S3 and self._raw_s3: self._raw_s3.write(payload) def log_mark(self, label: str) -> None: ts = now_ts() self.log_line(f"[{ts}] >>> MARK: {label}") self.bin_write_record(REC_MARK, label.encode("utf-8", errors="replace")) def log_info(self, msg: str) -> None: ts = now_ts() self.log_line(f"[{ts}] [INFO] {msg}") self.bin_write_record(REC_INFO, msg.encode("utf-8", errors="replace")) def start_raw_capture(self, label: str, logdir: str) -> tuple: """Open new raw tap files for a named capture. Returns (bw_path, s3_path).""" ts = _dt.datetime.now().strftime("%Y%m%d_%H%M%S") safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in label)[:40] if label else "" suffix = f"_{safe}" if safe else "" bw_path = os.path.join(logdir, f"raw_bw_{ts}{suffix}.bin") s3_path = os.path.join(logdir, f"raw_s3_{ts}{suffix}.bin") with self._lock: # Close any previously open taps first if self._raw_bw: self._raw_bw.close() if self._raw_s3: self._raw_s3.close() self._raw_bw = open(bw_path, "ab", buffering=0) self._raw_s3 = open(s3_path, "ab", buffering=0) self._cap_bw_path = bw_path self._cap_s3_path = s3_path self.log_info(f"raw capture started: label={label!r} bw={bw_path} s3={s3_path}") return bw_path, s3_path def stop_raw_capture(self) -> tuple: """Close raw tap files. Returns (bw_path, s3_path) for the capture just closed.""" with self._lock: bw = self._cap_bw_path s3 = self._cap_s3_path if self._raw_bw: self._raw_bw.close() self._raw_bw = None if self._raw_s3: self._raw_s3.close() self._raw_s3 = None self._cap_bw_path = None self._cap_s3_path = None if bw: self.log_info(f"raw capture stopped: bw={bw} s3={s3}") return bw, s3 def close(self) -> None: with self._lock: try: self._fh.flush() self._bin_fh.flush() finally: self._fh.close() self._bin_fh.close() if self._raw_bw: self._raw_bw.close() if self._raw_s3: self._raw_s3.close() class DLEFrameSniffer: """ DLE-aware sniffer for logging only. Extracts: - ACK bytes (0x41) as single-byte events - DLE-framed blocks starting at 10 02 and ending at 10 03 - Occasional ASCII bursts (e.g. "Operating System") outside framing It does NOT modify bytes; it just segments them for the .log. """ def __init__(self): self.buf = bytearray() def push(self, chunk: bytes) -> list[tuple[str, bytes]]: if chunk: self.buf.extend(chunk) events: list[tuple[str, bytes]] = [] # Opportunistically peel off leading ACK(s) when idle-ish. # We do this only when an ACK is not inside a frame (frames start with DLE). while self.buf and self.buf[0] == ACK: events.append(("ACK", bytes([ACK]))) del self.buf[0] # Try to parse frames: find DLE STX then scan for DLE ETX while True: # Find start of frame start = self._find_dle_stx(self.buf) if start is None: # No frame start. Maybe text? txt = bytes(self.buf) if looks_like_text(txt): events.append(("TEXT", txt)) self.buf.clear() break # Emit any leading text before the frame if start > 0: leading = bytes(self.buf[:start]) if looks_like_text(leading): events.append(("TEXT", leading)) else: # Unknown junk; still preserve in log as RAW so you can see it events.append(("RAW", leading)) del self.buf[:start] # Now buf starts with DLE STX end = self._find_dle_etx(self.buf) if end is None: break # need more bytes frame = bytes(self.buf[:end]) del self.buf[:end] events.append(("FRAME", frame)) # peel off any ACKs that may immediately follow while self.buf and self.buf[0] == ACK: events.append(("ACK", bytes([ACK]))) del self.buf[0] return events @staticmethod def _find_dle_stx(b: bytearray) -> Optional[int]: for i in range(len(b) - 1): if b[i] == DLE and b[i + 1] == STX: return i return None @staticmethod def _find_dle_etx(b: bytearray) -> Optional[int]: # Find first occurrence of DLE ETX after the initial DLE STX. # Return index *after* ETX (slice end). for i in range(2, len(b) - 1): if b[i] == DLE and b[i + 1] == ETX: return i + 2 return None def open_serial(port: str, baud: int) -> serial.Serial: return serial.Serial( port=port, baudrate=baud, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=0.05, write_timeout=0.2, ) def forward_loop( name: str, rec_type: int, src: serial.Serial, dst: serial.Serial, logger: SessionLogger, stop: threading.Event, quiet: bool, status_every_s: float, ) -> None: sniffer = DLEFrameSniffer() last_status = time.monotonic() while not stop.is_set(): try: n = src.in_waiting chunk = src.read(n if n and n < 4096 else (4096 if n else 1)) except serial.SerialException as e: logger.log_line(f"[{now_ts()}] [ERROR] {name} serial exception: {e!r}") break if chunk: ts = now_us() # 1) RAW BIN CAPTURE (absolute truth) logger.bin_write_record(rec_type, chunk, ts_us=ts) # 2) Forward immediately (bridge behavior) try: dst.write(chunk) except serial.SerialTimeoutException: logger.log_line(f"[{now_ts()}] [WARN] {name} dst write timeout (dropped {len(chunk)} bytes)") except serial.SerialException as e: logger.log_line(f"[{now_ts()}] [ERROR] {name} dst write exception: {e!r}") break # 3) Human-friendly .log segmentation (DLE-aware) for kind, payload in sniffer.push(chunk): if kind == "ACK": logger.log_line(f"[{now_ts()}] [{name}] [ACK] 41") elif kind == "FRAME": logger.log_line(f"[{now_ts()}] [{name}] {bytes_to_hex(payload)}") elif kind == "TEXT": try: s = payload.decode("ascii", errors="replace").strip("\r\n") except Exception: s = repr(payload) logger.log_line(f"[{now_ts()}] [{name}] [TEXT] {s}") else: # RAW logger.log_line(f"[{now_ts()}] [{name}] [RAW] {bytes_to_hex(payload)}") if not quiet and status_every_s > 0: now = time.monotonic() if (now - last_status) >= status_every_s: print(f"[{now_ts()}] {name} alive") last_status = now if not chunk: time.sleep(0.002) def annotation_loop(logger: SessionLogger, logdir: str, stop: threading.Event) -> None: """ Reads stdin commands while the bridge runs. Commands: m — prompt for a mark label (interactive) CAP_START: