Files
claude 4331215e23 feat(protocol): enhance raw capture functionality and documentation updates
- Update `s3_bridge.py` to default raw capture file paths to "auto" for timestamped naming.
- Modify `gui_bridge.py` to pre-check raw capture options and streamline path handling.
- Extend `ach_server.py` to save both incoming and outgoing raw bytes for analysis.
- Revise `CHANGELOG.md` and `instantel_protocol_reference.md` to reflect changes in recording mode handling and compliance data encoding.
2026-04-21 16:07:24 -04:00

506 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
s3_bridge.py — S3 <-> Blastware serial bridge with raw binary capture + DLE-aware text framing
Version: v0.5.1
Whats new vs v0.4.0:
- .bin is now a TRUE raw capture stream with direction + timestamps (record container format).
- .log remains human-friendly and frame-oriented, but frame detection is now DLE-aware:
- frame start = 0x10 0x02 (DLE STX)
- frame end = 0x10 0x03 (DLE ETX)
(No longer splits on bare 0x03.)
- Marks/Info are stored as proper record types in .bin (no unsafe sentinel bytes).
- Optional raw taps: use --raw-bw / --raw-s3 to also dump byte-for-byte traffic per direction
with no headers (for tools that just need a flat stream).
BIN record format (little-endian):
[type:1][ts_us:8][len:4][payload:len]
Types:
0x01 BW->S3 bytes
0x02 S3->BW bytes
0x03 MARK (utf-8)
0x04 INFO (utf-8)
"""
from __future__ import annotations
import argparse
import datetime as _dt
import os
import signal
import sys
import threading
import time
from typing import Optional
import serial
VERSION = "v0.5.1"
DLE = 0x10
STX = 0x02
ETX = 0x03
ACK = 0x41
REC_BW = 0x01
REC_S3 = 0x02
REC_MARK = 0x03
REC_INFO = 0x04
def now_ts() -> str:
t = _dt.datetime.now()
return t.strftime("%H:%M:%S.") + f"{int(t.microsecond/1000):03d}"
def now_us() -> int:
# Wall-clock microseconds (fine for correlation). If you want monotonic, we can switch.
return int(time.time() * 1_000_000)
def bytes_to_hex(b: bytes) -> str:
return " ".join(f"{x:02X}" for x in b)
def looks_like_text(b: bytes) -> bool:
if not b:
return False
printable = 0
for x in b:
if x in (9, 10, 13):
printable += 1
elif 32 <= x <= 126:
printable += 1
return (printable / len(b)) >= 0.90
def pack_u32_le(n: int) -> bytes:
return bytes((n & 0xFF, (n >> 8) & 0xFF, (n >> 16) & 0xFF, (n >> 24) & 0xFF))
def pack_u64_le(n: int) -> bytes:
out = []
for i in range(8):
out.append((n >> (8 * i)) & 0xFF)
return bytes(out)
class SessionLogger:
def __init__(self, path: str, bin_path: str, raw_bw_path: Optional[str] = None, raw_s3_path: Optional[str] = None):
self.path = path
self.bin_path = bin_path
self._fh = open(path, "a", buffering=1, encoding="utf-8", errors="replace")
self._bin_fh = open(bin_path, "ab", buffering=0)
self._lock = threading.Lock()
# Optional pure-byte taps (no headers). BW=Blastware tx, S3=device tx.
# These can be opened/closed on demand via start_raw_capture/stop_raw_capture.
self._raw_bw = open(raw_bw_path, "ab", buffering=0) if raw_bw_path else None
self._raw_s3 = open(raw_s3_path, "ab", buffering=0) if raw_s3_path else None
self._cap_bw_path: Optional[str] = raw_bw_path
self._cap_s3_path: Optional[str] = raw_s3_path
def log_line(self, line: str) -> None:
with self._lock:
self._fh.write(line + "\n")
def bin_write_record(self, rec_type: int, payload: bytes, ts_us: Optional[int] = None) -> None:
if ts_us is None:
ts_us = now_us()
header = bytes([rec_type]) + pack_u64_le(ts_us) + pack_u32_le(len(payload))
with self._lock:
self._bin_fh.write(header)
if payload:
self._bin_fh.write(payload)
# Raw taps: write only the payload bytes (no headers)
if rec_type == REC_BW and self._raw_bw:
self._raw_bw.write(payload)
if rec_type == REC_S3 and self._raw_s3:
self._raw_s3.write(payload)
def log_mark(self, label: str) -> None:
ts = now_ts()
self.log_line(f"[{ts}] >>> MARK: {label}")
self.bin_write_record(REC_MARK, label.encode("utf-8", errors="replace"))
def log_info(self, msg: str) -> None:
ts = now_ts()
self.log_line(f"[{ts}] [INFO] {msg}")
self.bin_write_record(REC_INFO, msg.encode("utf-8", errors="replace"))
def start_raw_capture(self, label: str, logdir: str) -> tuple:
"""Open new raw tap files for a named capture. Returns (bw_path, s3_path)."""
ts = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in label)[:40] if label else ""
suffix = f"_{safe}" if safe else ""
bw_path = os.path.join(logdir, f"raw_bw_{ts}{suffix}.bin")
s3_path = os.path.join(logdir, f"raw_s3_{ts}{suffix}.bin")
with self._lock:
# Close any previously open taps first
if self._raw_bw:
self._raw_bw.close()
if self._raw_s3:
self._raw_s3.close()
self._raw_bw = open(bw_path, "ab", buffering=0)
self._raw_s3 = open(s3_path, "ab", buffering=0)
self._cap_bw_path = bw_path
self._cap_s3_path = s3_path
self.log_info(f"raw capture started: label={label!r} bw={bw_path} s3={s3_path}")
return bw_path, s3_path
def stop_raw_capture(self) -> tuple:
"""Close raw tap files. Returns (bw_path, s3_path) for the capture just closed."""
with self._lock:
bw = self._cap_bw_path
s3 = self._cap_s3_path
if self._raw_bw:
self._raw_bw.close()
self._raw_bw = None
if self._raw_s3:
self._raw_s3.close()
self._raw_s3 = None
self._cap_bw_path = None
self._cap_s3_path = None
if bw:
self.log_info(f"raw capture stopped: bw={bw} s3={s3}")
return bw, s3
def close(self) -> None:
with self._lock:
try:
self._fh.flush()
self._bin_fh.flush()
finally:
self._fh.close()
self._bin_fh.close()
if self._raw_bw:
self._raw_bw.close()
if self._raw_s3:
self._raw_s3.close()
class DLEFrameSniffer:
"""
DLE-aware sniffer for logging only.
Extracts:
- ACK bytes (0x41) as single-byte events
- DLE-framed blocks starting at 10 02 and ending at 10 03
- Occasional ASCII bursts (e.g. "Operating System") outside framing
It does NOT modify bytes; it just segments them for the .log.
"""
def __init__(self):
self.buf = bytearray()
def push(self, chunk: bytes) -> list[tuple[str, bytes]]:
if chunk:
self.buf.extend(chunk)
events: list[tuple[str, bytes]] = []
# Opportunistically peel off leading ACK(s) when idle-ish.
# We do this only when an ACK is not inside a frame (frames start with DLE).
while self.buf and self.buf[0] == ACK:
events.append(("ACK", bytes([ACK])))
del self.buf[0]
# Try to parse frames: find DLE STX then scan for DLE ETX
while True:
# Find start of frame
start = self._find_dle_stx(self.buf)
if start is None:
# No frame start. Maybe text?
txt = bytes(self.buf)
if looks_like_text(txt):
events.append(("TEXT", txt))
self.buf.clear()
break
# Emit any leading text before the frame
if start > 0:
leading = bytes(self.buf[:start])
if looks_like_text(leading):
events.append(("TEXT", leading))
else:
# Unknown junk; still preserve in log as RAW so you can see it
events.append(("RAW", leading))
del self.buf[:start]
# Now buf starts with DLE STX
end = self._find_dle_etx(self.buf)
if end is None:
break # need more bytes
frame = bytes(self.buf[:end])
del self.buf[:end]
events.append(("FRAME", frame))
# peel off any ACKs that may immediately follow
while self.buf and self.buf[0] == ACK:
events.append(("ACK", bytes([ACK])))
del self.buf[0]
return events
@staticmethod
def _find_dle_stx(b: bytearray) -> Optional[int]:
for i in range(len(b) - 1):
if b[i] == DLE and b[i + 1] == STX:
return i
return None
@staticmethod
def _find_dle_etx(b: bytearray) -> Optional[int]:
# Find first occurrence of DLE ETX after the initial DLE STX.
# Return index *after* ETX (slice end).
for i in range(2, len(b) - 1):
if b[i] == DLE and b[i + 1] == ETX:
return i + 2
return None
def open_serial(port: str, baud: int) -> serial.Serial:
return serial.Serial(
port=port,
baudrate=baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.05,
write_timeout=0.2,
)
def forward_loop(
name: str,
rec_type: int,
src: serial.Serial,
dst: serial.Serial,
logger: SessionLogger,
stop: threading.Event,
quiet: bool,
status_every_s: float,
) -> None:
sniffer = DLEFrameSniffer()
last_status = time.monotonic()
while not stop.is_set():
try:
n = src.in_waiting
chunk = src.read(n if n and n < 4096 else (4096 if n else 1))
except serial.SerialException as e:
logger.log_line(f"[{now_ts()}] [ERROR] {name} serial exception: {e!r}")
break
if chunk:
ts = now_us()
# 1) RAW BIN CAPTURE (absolute truth)
logger.bin_write_record(rec_type, chunk, ts_us=ts)
# 2) Forward immediately (bridge behavior)
try:
dst.write(chunk)
except serial.SerialTimeoutException:
logger.log_line(f"[{now_ts()}] [WARN] {name} dst write timeout (dropped {len(chunk)} bytes)")
except serial.SerialException as e:
logger.log_line(f"[{now_ts()}] [ERROR] {name} dst write exception: {e!r}")
break
# 3) Human-friendly .log segmentation (DLE-aware)
for kind, payload in sniffer.push(chunk):
if kind == "ACK":
logger.log_line(f"[{now_ts()}] [{name}] [ACK] 41")
elif kind == "FRAME":
logger.log_line(f"[{now_ts()}] [{name}] {bytes_to_hex(payload)}")
elif kind == "TEXT":
try:
s = payload.decode("ascii", errors="replace").strip("\r\n")
except Exception:
s = repr(payload)
logger.log_line(f"[{now_ts()}] [{name}] [TEXT] {s}")
else: # RAW
logger.log_line(f"[{now_ts()}] [{name}] [RAW] {bytes_to_hex(payload)}")
if not quiet and status_every_s > 0:
now = time.monotonic()
if (now - last_status) >= status_every_s:
print(f"[{now_ts()}] {name} alive")
last_status = now
if not chunk:
time.sleep(0.002)
def annotation_loop(logger: SessionLogger, logdir: str, stop: threading.Event) -> None:
"""
Reads stdin commands while the bridge runs.
Commands:
m — prompt for a mark label (interactive)
CAP_START:<label> — begin a raw tap capture with the given label
CAP_STOP — stop the current raw tap capture
Responses (printed to stdout, parsed by the GUI):
[CAP_START] <bw_path>\\t<s3_path>
[CAP_STOP] <bw_path>\\t<s3_path>
"""
while not stop.is_set():
try:
line = input()
except (EOFError, KeyboardInterrupt):
break
line = line.strip()
if not line:
continue
if line.startswith("CAP_START:"):
label = line[10:].strip()
bw_path, s3_path = logger.start_raw_capture(label, logdir)
print(f"[CAP_START] {bw_path}\t{s3_path}")
sys.stdout.flush()
elif line == "CAP_STOP":
bw_path, s3_path = logger.stop_raw_capture()
if bw_path:
print(f"[CAP_STOP] {bw_path}\t{s3_path}")
else:
print("[CAP_STOP] no active capture")
sys.stdout.flush()
elif line.lower() == "m":
try:
sys.stdout.write(" Label: ")
sys.stdout.flush()
label = input().strip()
except (EOFError, KeyboardInterrupt):
break
if label:
logger.log_mark(label)
print(f" [MARK written] {label}")
else:
print(" (empty label — mark cancelled)")
else:
print(f" (unknown command: {line!r})")
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--bw", default="COM4", help="Blastware-side COM port (default: COM4)")
ap.add_argument("--s3", default="COM5", help="S3-side COM port (default: COM5)")
ap.add_argument("--baud", type=int, default=38400, help="Baud rate (default: 38400)")
ap.add_argument("--logdir", default=".", help="Directory to write session logs into (default: .)")
ap.add_argument("--raw-bw", default="auto",
help="File to append raw bytes sent from BW->S3 (no headers). "
"Default 'auto' generates a timestamped name in --logdir. "
"Pass an empty string to disable.")
ap.add_argument("--raw-s3", default="auto",
help="File to append raw bytes sent from S3->BW (no headers). "
"Default 'auto' generates a timestamped name in --logdir. "
"Pass an empty string to disable.")
ap.add_argument("--quiet", action="store_true", help="No console heartbeat output")
ap.add_argument("--status-every", type=float, default=0.0, help="Seconds between console heartbeat lines (default: 0 = off)")
args = ap.parse_args()
print("Opening ports...")
try:
bw = open_serial(args.bw, args.baud)
s3 = open_serial(args.s3, args.baud)
except Exception as e:
print(f"Failed to open serial ports: {e!r}")
return 2
print(f"Connected: {args.bw} <-> {args.s3}")
os.makedirs(args.logdir, exist_ok=True)
ts = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
log_path = os.path.join(args.logdir, f"s3_session_{ts}.log")
bin_path = os.path.join(args.logdir, f"s3_session_{ts}.bin")
# If raw tap flags were passed without a path (bare --raw-bw / --raw-s3),
# or if the sentinel value "auto" is used, generate a timestamped name.
# If a specific path was provided, use it as-is (caller's responsibility).
# Resolve raw tap paths.
# "auto" (default) → timestamped file in logdir (always captured).
# Explicit path → use verbatim.
# None or "" → disabled (pass --raw-bw "" to suppress capture).
raw_bw_path: Optional[str] = args.raw_bw if args.raw_bw else None
raw_s3_path: Optional[str] = args.raw_s3 if args.raw_s3 else None
if raw_bw_path == "auto":
raw_bw_path = os.path.join(args.logdir, f"raw_bw_{ts}.bin")
if raw_s3_path == "auto":
raw_s3_path = os.path.join(args.logdir, f"raw_s3_{ts}.bin")
logger = SessionLogger(log_path, bin_path, raw_bw_path=raw_bw_path, raw_s3_path=raw_s3_path)
print(f"[LOG] Writing hex log to {log_path}")
print(f"[LOG] Writing binary log to {bin_path}")
if raw_bw_path:
print(f"[LOG] Raw tap BW->S3 -> {raw_bw_path}")
if raw_s3_path:
print(f"[LOG] Raw tap S3->BW -> {raw_s3_path}")
logger.log_info(f"s3_bridge {VERSION} start")
logger.log_info(f"BW={args.bw} S3={args.s3} baud={args.baud}")
logger.log_mark(f"SESSION START — BW={args.bw} S3={args.s3} baud={args.baud}")
stop = threading.Event()
def handle_sigint(sig, frame):
stop.set()
signal.signal(signal.SIGINT, handle_sigint)
t1 = threading.Thread(
target=forward_loop,
name="BW_to_S3",
args=("BW->S3", REC_BW, bw, s3, logger, stop, args.quiet, args.status_every),
daemon=True,
)
t2 = threading.Thread(
target=forward_loop,
name="S3_to_BW",
args=("S3->BW", REC_S3, s3, bw, logger, stop, args.quiet, args.status_every),
daemon=True,
)
t_ann = threading.Thread(
target=annotation_loop,
name="Annotator",
args=(logger, args.logdir, stop),
daemon=True,
)
t1.start()
t2.start()
t_ann.start()
try:
while not stop.is_set():
time.sleep(0.05)
finally:
print("\n[INFO] Ctrl+C detected, shutting down...")
logger.log_info("shutdown requested")
stop.set()
t1.join(timeout=1.0)
t2.join(timeout=1.0)
try:
bw.close()
except Exception:
pass
try:
s3.close()
except Exception:
pass
logger.log_mark("SESSION END")
logger.log_info("ports closed, session end")
logger.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())