import serial import threading import datetime import sys import os PORT_A = "COM5" # Real device PORT_B = "COM4" # Virtual port for Blastware BAUD = 38400 class SessionLogger: def __init__(self): ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") self.filename = f"s3_session_{ts}.log" self.file = open(self.filename, "w", buffering=1) print(f"[LOG] Writing session log to {self.filename}") def log(self, direction, data): now = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3] hex_string = " ".join(f"{b:02X}" for b in data) line = f"[{now}] [{direction}] {hex_string}" print(line) self.file.write(line + "\n") def close(self): print("[LOG] Closing session log") self.file.close() def forward(src, dst, direction, logger): try: while True: data = src.read(src.in_waiting or 1) if data: logger.log(direction, data) dst.write(data) except Exception as e: print(f"[ERROR] {direction}: {e}") def main(): print("Opening ports...") ser_a = serial.Serial(PORT_A, BAUD, timeout=0) ser_b = serial.Serial(PORT_B, BAUD, timeout=0) print(f"Connected: {PORT_A} <-> {PORT_B}") logger = SessionLogger() t1 = threading.Thread( target=forward, args=(ser_b, ser_a, "BLASTWARE -> S3", logger), daemon=True ) t2 = threading.Thread( target=forward, args=(ser_a, ser_b, "S3 -> BLASTWARE", logger), daemon=True ) t1.start() t2.start() try: while True: pass except KeyboardInterrupt: print("\n[INFO] Ctrl+C detected, shutting down...") finally: logger.close() ser_a.close() ser_b.close() if __name__ == "__main__": main()