#!/usr/bin/env python3 """ raw_capture.py — minimal serial logger for raw byte collection. Opens a single COM port, streams all bytes to a timestamped binary file, and does no parsing or forwarding. Useful when you just need the raw wire data without DLE framing or Blastware bridging. Record format (little-endian): [ts_us:8][len:4][payload:len] Exactly one record type is used, so there is no type byte. """ from __future__ import annotations import argparse import datetime as _dt import os import signal import sys import time from typing import Optional import serial def now_ts() -> str: t = _dt.datetime.now() return t.strftime("%H:%M:%S.") + f"{int(t.microsecond/1000):03d}" def pack_u32_le(n: int) -> bytes: return bytes((n & 0xFF, (n >> 8) & 0xFF, (n >> 16) & 0xFF, (n >> 24) & 0xFF)) def pack_u64_le(n: int) -> bytes: out = [] for i in range(8): out.append((n >> (8 * i)) & 0xFF) return bytes(out) def open_serial(port: str, baud: int, timeout: float) -> serial.Serial: return serial.Serial( port=port, baudrate=baud, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=timeout, write_timeout=timeout, ) class RawWriter: def __init__(self, path: str): self.path = path self._fh = open(path, "ab", buffering=0) def write(self, payload: bytes, ts_us: Optional[int] = None) -> None: if ts_us is None: ts_us = int(time.time() * 1_000_000) header = pack_u64_le(ts_us) + pack_u32_le(len(payload)) self._fh.write(header) if payload: self._fh.write(payload) def close(self) -> None: try: self._fh.flush() finally: self._fh.close() def capture_loop(port: serial.Serial, writer: RawWriter, stop_flag: "StopFlag", status_every_s: float) -> None: last_status = time.monotonic() bytes_written = 0 while not stop_flag.is_set(): try: n = port.in_waiting chunk = port.read(n if n and n < 4096 else (4096 if n else 1)) except serial.SerialException as e: print(f"[{now_ts()}] [ERROR] serial exception: {e!r}", file=sys.stderr) break if chunk: writer.write(chunk) bytes_written += len(chunk) if status_every_s > 0: now = time.monotonic() if now - last_status >= status_every_s: print(f"[{now_ts()}] captured {bytes_written} bytes", flush=True) last_status = now if not chunk: time.sleep(0.002) class StopFlag: def __init__(self): self._set = False def set(self): self._set = True def is_set(self) -> bool: return self._set def main() -> int: ap = argparse.ArgumentParser(description="Raw serial capture to timestamped binary file (no forwarding).") ap.add_argument("--port", default="COM5", help="Serial port to capture (default: COM5)") ap.add_argument("--baud", type=int, default=38400, help="Baud rate (default: 38400)") ap.add_argument("--timeout", type=float, default=0.05, help="Serial read timeout in seconds (default: 0.05)") ap.add_argument("--logdir", default=".", help="Directory to write captures (default: .)") ap.add_argument("--status-every", type=float, default=5.0, help="Seconds between progress lines (0 disables)") args = ap.parse_args() os.makedirs(args.logdir, exist_ok=True) ts = _dt.datetime.now().strftime("%Y%m%d_%H%M%S") bin_path = os.path.join(args.logdir, f"raw_capture_{ts}.bin") print(f"[INFO] Opening {args.port} @ {args.baud}...") try: ser = open_serial(args.port, args.baud, args.timeout) except Exception as e: print(f"[ERROR] failed to open port: {e!r}", file=sys.stderr) return 2 writer = RawWriter(bin_path) print(f"[INFO] Writing raw bytes to {bin_path}") print("[INFO] Press Ctrl+C to stop.") stop = StopFlag() def handle_sigint(sig, frame): stop.set() signal.signal(signal.SIGINT, handle_sigint) try: capture_loop(ser, writer, stop, args.status_every) finally: writer.close() try: ser.close() except Exception: pass print(f"[INFO] Capture stopped. Total bytes written: {os.path.getsize(bin_path)}") return 0 if __name__ == "__main__": raise SystemExit(main())