feat: update s3_bridge to v0.4.0 with annotation markers and dual log output

This commit is contained in:
serversdwn
2026-02-27 02:24:47 -05:00
parent 75de3fb2fc
commit 0ad1505cc5
2 changed files with 114 additions and 7 deletions

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
s3_bridge.py — S3 <-> Blastware serial bridge with frame-aware session logging
Version: v0.3.0
Version: v0.4.0
Key features:
- Low CPU: avoids per-byte console printing
@@ -9,12 +9,21 @@ Key features:
- Frame-aware logging: buffers per direction until ETX (0x03), then logs full frame on one line
- Also logs plain ASCII bursts (e.g., "Operating System") cleanly
- Dual log output: hex text log (.log) AND raw binary log (.bin) written simultaneously
- Session log files created on start, closed on Ctrl+C
- Interactive annotation: type 'm' + Enter to stamp a [MARK] into both logs mid-capture
- Binary sentinel markers: out-of-band FF FF FF FF <len> <label> in .bin for programmatic correlation
- Auto-marks on session start and end
Usage examples:
python s3_bridge.py
python s3_bridge.py --bw COM5 --s3 COM4 --baud 38400
python s3_bridge.py --quiet
Annotation:
While running, type 'm' and press Enter. You will be prompted for a label.
The mark is written to the .log as:
[HH:MM:SS.mmm] >>> MARK: your label here
And to the .bin as an out-of-band sentinel (never valid frame data):
FF FF FF FF <1-byte length> <label bytes>
"""
from __future__ import annotations
@@ -31,7 +40,12 @@ from typing import Optional
import serial
VERSION = "v0.3.0"
VERSION = "v0.4.0"
# Sentinel prefix for binary markers. Four 0xFF bytes can never appear in
# valid Instantel DLE-framed data (0xFF is not a legal protocol byte in any
# framing position), so this sequence is unambiguously out-of-band.
BIN_MARK_SENTINEL = b"\xFF\xFF\xFF\xFF"
def now_ts() -> str:
@@ -74,6 +88,20 @@ class SessionLogger:
with self._lock:
self._bin_fh.write(data)
def log_mark(self, label: str) -> None:
"""
Write an annotation mark to both logs simultaneously.
.log — visually distinct line: [TS] >>> MARK: label
.bin — out-of-band sentinel: FF FF FF FF <len> <label utf-8, max 255 bytes>
"""
ts = now_ts()
label_bytes = label.encode("utf-8", errors="replace")[:255]
sentinel = BIN_MARK_SENTINEL + bytes([len(label_bytes)]) + label_bytes
with self._lock:
self._fh.write(f"[{ts}] >>> MARK: {label}\n")
self._bin_fh.write(sentinel)
def close(self) -> None:
with self._lock:
try:
@@ -202,6 +230,43 @@ def forward_loop(
time.sleep(0.002)
def annotation_loop(logger: SessionLogger, stop: threading.Event) -> None:
"""
Runs on the main thread (or a dedicated thread) reading stdin.
Type 'm' + Enter to trigger an annotation prompt.
Any other non-empty input is ignored with a hint.
Bare Enter (empty line) is silently ignored to prevent accidental marks.
"""
print("[MARK] Type 'm' + Enter to annotate the capture. Ctrl+C to stop.")
while not stop.is_set():
try:
line = input()
except EOFError:
# stdin closed (e.g. piped input exhausted)
break
except KeyboardInterrupt:
break
line = line.strip()
if not line:
continue # bare Enter — ignore silently
if line.lower() == "m":
try:
sys.stdout.write(" Label: ")
sys.stdout.flush()
label = input().strip()
except (EOFError, KeyboardInterrupt):
break
if label:
logger.log_mark(label)
print(f" [MARK written] {label}")
else:
print(" (empty label — mark cancelled)")
else:
print(" (type 'm' + Enter to annotate)")
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--bw", default="COM5", help="Blastware-side COM port (default: COM5)")
@@ -229,8 +294,10 @@ def main() -> int:
logger = SessionLogger(log_path, bin_path)
print(f"[LOG] Writing hex log to {log_path}")
print(f"[LOG] Writing binary log to {bin_path}")
logger.log_line(f"[{now_ts()}] [INFO] s3_bridge {VERSION} start")
logger.log_line(f"[{now_ts()}] [INFO] BW={args.bw} S3={args.s3} baud={args.baud}")
logger.log_mark(f"SESSION START — BW={args.bw} S3={args.s3} baud={args.baud}")
stop = threading.Event()
@@ -251,12 +318,19 @@ def main() -> int:
args=("S3->BW", s3, bw, logger, stop, args.quiet, args.status_every),
daemon=True,
)
# Annotation loop runs in its own daemon thread so it doesn't block shutdown
t_ann = threading.Thread(
target=annotation_loop,
name="Annotator",
args=(logger, stop),
daemon=True,
)
t1.start()
t2.start()
t_ann.start()
try:
# Wait until Ctrl+C
while not stop.is_set():
time.sleep(0.05)
finally:
@@ -266,6 +340,7 @@ def main() -> int:
stop.set()
t1.join(timeout=1.0)
t2.join(timeout=1.0)
# t_ann is daemon — don't join, it may be blocked on input()
try:
bw.close()
@@ -276,6 +351,7 @@ def main() -> int:
except Exception:
pass
logger.log_mark("SESSION END")
logger.log_line(f"[{now_ts()}] [INFO] ports closed, session end")
print("[LOG] Closing session log")
logger.close()
@@ -284,4 +360,4 @@ def main() -> int:
if __name__ == "__main__":
raise SystemExit(main())
raise SystemExit(main())