Files
seismo-relay/bridges/s3-bridge/s3_bridge.py

364 lines
11 KiB
Python

#!/usr/bin/env python3
"""
s3_bridge.py — S3 <-> Blastware serial bridge with frame-aware session logging
Version: v0.4.0
Key features:
- Low CPU: avoids per-byte console printing
- Forwards bytes immediately (true bridge)
- Frame-aware logging: buffers per direction until ETX (0x03), then logs full frame on one line
- Also logs plain ASCII bursts (e.g., "Operating System") cleanly
- Dual log output: hex text log (.log) AND raw binary log (.bin) written simultaneously
- Interactive annotation: type 'm' + Enter to stamp a [MARK] into both logs mid-capture
- Binary sentinel markers: out-of-band FF FF FF FF <len> <label> in .bin for programmatic correlation
- Auto-marks on session start and end
Usage examples:
python s3_bridge.py
python s3_bridge.py --bw COM5 --s3 COM4 --baud 38400
python s3_bridge.py --quiet
Annotation:
While running, type 'm' and press Enter. You will be prompted for a label.
The mark is written to the .log as:
[HH:MM:SS.mmm] >>> MARK: your label here
And to the .bin as an out-of-band sentinel (never valid frame data):
FF FF FF FF <1-byte length> <label bytes>
"""
from __future__ import annotations
import argparse
import datetime as _dt
import os
import signal
import sys
import threading
import time
from typing import Optional
import serial
VERSION = "v0.4.0"
# Sentinel prefix for binary markers. Four 0xFF bytes can never appear in
# valid Instantel DLE-framed data (0xFF is not a legal protocol byte in any
# framing position), so this sequence is unambiguously out-of-band.
BIN_MARK_SENTINEL = b"\xFF\xFF\xFF\xFF"
def now_ts() -> str:
# Local time with milliseconds, like [13:37:06.239]
t = _dt.datetime.now()
return t.strftime("%H:%M:%S.") + f"{int(t.microsecond/1000):03d}"
def bytes_to_hex(b: bytes) -> str:
return " ".join(f"{x:02X}" for x in b)
def looks_like_text(b: bytes) -> bool:
# Heuristic: mostly printable ASCII plus spaces
if not b:
return False
printable = 0
for x in b:
if x in (9, 10, 13): # \t \n \r
printable += 1
elif 32 <= x <= 126:
printable += 1
return (printable / len(b)) >= 0.90
class SessionLogger:
def __init__(self, path: str, bin_path: str):
self.path = path
self.bin_path = bin_path
self._fh = open(path, "a", buffering=1, encoding="utf-8", errors="replace")
self._bin_fh = open(bin_path, "ab", buffering=0)
self._lock = threading.Lock()
def log_line(self, line: str) -> None:
with self._lock:
self._fh.write(line + "\n")
def log_raw(self, data: bytes) -> None:
"""Write raw bytes directly to the binary log."""
with self._lock:
self._bin_fh.write(data)
def log_mark(self, label: str) -> None:
"""
Write an annotation mark to both logs simultaneously.
.log — visually distinct line: [TS] >>> MARK: label
.bin — out-of-band sentinel: FF FF FF FF <len> <label utf-8, max 255 bytes>
"""
ts = now_ts()
label_bytes = label.encode("utf-8", errors="replace")[:255]
sentinel = BIN_MARK_SENTINEL + bytes([len(label_bytes)]) + label_bytes
with self._lock:
self._fh.write(f"[{ts}] >>> MARK: {label}\n")
self._bin_fh.write(sentinel)
def close(self) -> None:
with self._lock:
try:
self._fh.flush()
self._bin_fh.flush()
finally:
self._fh.close()
self._bin_fh.close()
class FrameAssembler:
"""
Maintains a rolling buffer of bytes for one direction and emits complete frames.
We treat ETX=0x03 as an end-of-frame marker.
"""
def __init__(self):
self.buf = bytearray()
def push(self, chunk: bytes) -> list[bytes]:
if chunk:
self.buf.extend(chunk)
frames: list[bytes] = []
while True:
try:
etx_i = self.buf.index(0x03)
except ValueError:
break
# include ETX byte
frame = bytes(self.buf[: etx_i + 1])
del self.buf[: etx_i + 1]
# ignore empty noise
if frame:
frames.append(frame)
return frames
def drain_as_text_if_any(self) -> Optional[bytes]:
"""
If buffer contains non-framed data (no ETX) and looks like text, emit it.
Useful for things like "Operating System" that come as raw ASCII.
"""
if not self.buf:
return None
b = bytes(self.buf)
if looks_like_text(b):
self.buf.clear()
return b
return None
def open_serial(port: str, baud: int) -> serial.Serial:
# timeout keeps read() from blocking forever, enabling clean Ctrl+C shutdown
return serial.Serial(
port=port,
baudrate=baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.05,
write_timeout=0.2,
)
def forward_loop(
name: str,
src: serial.Serial,
dst: serial.Serial,
logger: SessionLogger,
stop: threading.Event,
quiet: bool,
status_every_s: float,
) -> None:
assembler = FrameAssembler()
last_status = time.monotonic()
while not stop.is_set():
try:
n = src.in_waiting
if n:
chunk = src.read(n if n < 4096 else 4096)
else:
chunk = src.read(1) # will return b"" after timeout
except serial.SerialException as e:
logger.log_line(f"[{now_ts()}] [ERROR] {name} serial exception: {e!r}")
break
if chunk:
# forward immediately
try:
dst.write(chunk)
except serial.SerialTimeoutException:
logger.log_line(f"[{now_ts()}] [WARN] {name} dst write timeout (dropped {len(chunk)} bytes)")
except serial.SerialException as e:
logger.log_line(f"[{now_ts()}] [ERROR] {name} dst write exception: {e!r}")
break
# frame-aware logging
frames = assembler.push(chunk)
for frame in frames:
# Some devices send leading STX separately; we still log as-is.
logger.log_line(f"[{now_ts()}] [{name}] {bytes_to_hex(frame)}")
logger.log_raw(frame)
# If we have non-ETX data that looks like text, flush it as TEXT
text = assembler.drain_as_text_if_any()
if text is not None:
try:
s = text.decode("ascii", errors="replace").strip("\r\n")
except Exception:
s = repr(text)
logger.log_line(f"[{now_ts()}] [{name}] [TEXT] {s}")
logger.log_raw(text)
# minimal console heartbeat (cheap)
if not quiet and status_every_s > 0:
now = time.monotonic()
if (now - last_status) >= status_every_s:
print(f"[{now_ts()}] {name} alive")
last_status = now
# tiny sleep only when idle to avoid spin
if not chunk:
time.sleep(0.002)
def annotation_loop(logger: SessionLogger, stop: threading.Event) -> None:
"""
Runs on the main thread (or a dedicated thread) reading stdin.
Type 'm' + Enter to trigger an annotation prompt.
Any other non-empty input is ignored with a hint.
Bare Enter (empty line) is silently ignored to prevent accidental marks.
"""
print("[MARK] Type 'm' + Enter to annotate the capture. Ctrl+C to stop.")
while not stop.is_set():
try:
line = input()
except EOFError:
# stdin closed (e.g. piped input exhausted)
break
except KeyboardInterrupt:
break
line = line.strip()
if not line:
continue # bare Enter — ignore silently
if line.lower() == "m":
try:
sys.stdout.write(" Label: ")
sys.stdout.flush()
label = input().strip()
except (EOFError, KeyboardInterrupt):
break
if label:
logger.log_mark(label)
print(f" [MARK written] {label}")
else:
print(" (empty label — mark cancelled)")
else:
print(" (type 'm' + Enter to annotate)")
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--bw", default="COM5", help="Blastware-side COM port (default: COM5)")
ap.add_argument("--s3", default="COM4", help="S3-side COM port (default: COM4)")
ap.add_argument("--baud", type=int, default=38400, help="Baud rate (default: 38400)")
ap.add_argument("--logdir", default=".", help="Directory to write session logs into (default: .)")
ap.add_argument("--quiet", action="store_true", help="No console heartbeat output")
ap.add_argument("--status-every", type=float, default=0.0, help="Seconds between console heartbeat lines (default: 0 = off)")
args = ap.parse_args()
print("Opening ports...")
try:
bw = open_serial(args.bw, args.baud)
s3 = open_serial(args.s3, args.baud)
except Exception as e:
print(f"Failed to open serial ports: {e!r}")
return 2
print(f"Connected: {args.bw} <-> {args.s3}")
os.makedirs(args.logdir, exist_ok=True)
ts = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
log_path = os.path.join(args.logdir, f"s3_session_{ts}.log")
bin_path = os.path.join(args.logdir, f"s3_session_{ts}.bin")
logger = SessionLogger(log_path, bin_path)
print(f"[LOG] Writing hex log to {log_path}")
print(f"[LOG] Writing binary log to {bin_path}")
logger.log_line(f"[{now_ts()}] [INFO] s3_bridge {VERSION} start")
logger.log_line(f"[{now_ts()}] [INFO] BW={args.bw} S3={args.s3} baud={args.baud}")
logger.log_mark(f"SESSION START — BW={args.bw} S3={args.s3} baud={args.baud}")
stop = threading.Event()
def handle_sigint(sig, frame):
stop.set()
signal.signal(signal.SIGINT, handle_sigint)
t1 = threading.Thread(
target=forward_loop,
name="BW_to_S3",
args=("BW->S3", bw, s3, logger, stop, args.quiet, args.status_every),
daemon=True,
)
t2 = threading.Thread(
target=forward_loop,
name="S3_to_BW",
args=("S3->BW", s3, bw, logger, stop, args.quiet, args.status_every),
daemon=True,
)
# Annotation loop runs in its own daemon thread so it doesn't block shutdown
t_ann = threading.Thread(
target=annotation_loop,
name="Annotator",
args=(logger, stop),
daemon=True,
)
t1.start()
t2.start()
t_ann.start()
try:
while not stop.is_set():
time.sleep(0.05)
finally:
print("\n[INFO] Ctrl+C detected, shutting down...")
logger.log_line(f"[{now_ts()}] [INFO] shutdown requested")
stop.set()
t1.join(timeout=1.0)
t2.join(timeout=1.0)
# t_ann is daemon — don't join, it may be blocked on input()
try:
bw.close()
except Exception:
pass
try:
s3.close()
except Exception:
pass
logger.log_mark("SESSION END")
logger.log_line(f"[{now_ts()}] [INFO] ports closed, session end")
print("[LOG] Closing session log")
logger.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())