diff --git a/seismo_lab.py b/seismo_lab.py index a8b6c35..c1fbd3d 100644 --- a/seismo_lab.py +++ b/seismo_lab.py @@ -128,6 +128,12 @@ class BridgePanel(tk.Frame): self._server: Optional[socket.socket] = None self._tcp_stop_event = threading.Event() self._tcp_log_q: queue.Queue[str] = queue.Queue() + # tcp capture file handles — written only when capture is active + self._tcp_cap_lock = threading.Lock() + self._tcp_cap_bw_fh = None + self._tcp_cap_s3_fh = None + self._tcp_cap_bw_path: Optional[str] = None + self._tcp_cap_s3_path: Optional[str] = None # shared capture state self._capturing = False self._cap_label: Optional[str] = None @@ -457,8 +463,6 @@ class BridgePanel(tk.Frame): self.after(100, self._poll_stdout) def _start_capture(self) -> None: - if not self.process or self.process.poll() is not None: - return label = simpledialog.askstring( "New Capture", "Label for this capture\n(e.g. 'copy_event_download').\nLeave blank for timestamp only:", @@ -467,32 +471,58 @@ class BridgePanel(tk.Frame): if label is None: return label = label.strip() - try: - self.process.stdin.write(f"CAP_START:{label}\n") - self.process.stdin.flush() - except Exception as e: - messagebox.showerror("Error", f"Failed to start capture:\n{e}") - return self._capturing = True self._cap_label = label or datetime.datetime.now().strftime("%H%M%S") + + if self._mode.get() == "tcp": + # TCP: open the capture files now; pipe threads write here while active + logdir = self.logdir_var.get().strip() or "." + os.makedirs(logdir, exist_ok=True) + ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin") + s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin") + with self._tcp_cap_lock: + self._tcp_cap_bw_fh = open(bw_path, "wb") + self._tcp_cap_s3_fh = open(s3_path, "wb") + self._tcp_cap_bw_path = bw_path + self._tcp_cap_s3_path = s3_path + self._cap_history.append({"label": self._cap_label, "status": "recording", + "bw": bw_path, "s3": s3_path}) + self._refresh_hist() + self._on_cap_started_msg(bw_path, s3_path) + else: + if not self.process or self.process.poll() is not None: + return + try: + self.process.stdin.write(f"CAP_START:{label}\n") + self.process.stdin.flush() + except Exception as e: + messagebox.showerror("Error", f"Failed to start capture:\n{e}") + return + self._cap_history.append({"label": self._cap_label, "status": "recording", + "bw": None, "s3": None}) + self._refresh_hist() + self.cap_btn.configure(state="disabled") self.stop_cap_btn.configure(state="normal", bg=RED) self.mark_btn.configure(state="normal") self._append_log(f"[CAPTURE] Starting: {self._cap_label!r}...\n") - self._cap_history.append({"label": self._cap_label, "status": "recording", - "bw": None, "s3": None}) - self._refresh_hist() def _stop_capture(self) -> None: if self._mode.get() == "tcp": - # TCP: close the server so the current session ends naturally - self._tcp_stop_event.set() - if self._server: - try: - self._server.close() - except OSError: - pass - self._server = None + with self._tcp_cap_lock: + bw_path = self._tcp_cap_bw_path + s3_path = self._tcp_cap_s3_path + if self._tcp_cap_bw_fh: + self._tcp_cap_bw_fh.close() + self._tcp_cap_bw_fh = None + if self._tcp_cap_s3_fh: + self._tcp_cap_s3_fh.close() + self._tcp_cap_s3_fh = None + self._tcp_cap_bw_path = None + self._tcp_cap_s3_path = None + if bw_path and s3_path: + self._on_cap_stopped_msg(bw_path, s3_path) return if not self.process or self.process.poll() is not None: return @@ -534,6 +564,7 @@ class BridgePanel(tk.Frame): self._tcp_stop_event.clear() self.start_btn.configure(state="disabled") self.stop_btn.configure(state="normal", bg=RED) + self.cap_btn.configure(state="normal") self.status_var.set(f"Listening on :{listen_port}") ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") @@ -541,18 +572,25 @@ class BridgePanel(tk.Frame): f"== TCP Bridge started [{ts}]\n" f" Listening on 0.0.0.0:{listen_port}\n" f" Forwarding to {remote_host}:{remote_port}\n" - f" Each Blastware connection is automatically captured.\n==\n" + f" Click 'New Capture' before the operation you want to record.\n==\n" ) self._on_started(None) - logdir = self.logdir_var.get().strip() or "." threading.Thread( target=self._accept_loop, - args=(srv, remote_host, remote_port, logdir), + args=(srv, remote_host, remote_port), daemon=True, ).start() def _stop_tcp(self) -> None: + # Close any open capture files first + with self._tcp_cap_lock: + if self._tcp_cap_bw_fh: + self._tcp_cap_bw_fh.close() + self._tcp_cap_bw_fh = None + if self._tcp_cap_s3_fh: + self._tcp_cap_s3_fh.close() + self._tcp_cap_s3_fh = None self._tcp_stop_event.set() if self._server: try: @@ -563,8 +601,7 @@ class BridgePanel(tk.Frame): self._bridge_ended() self._on_stopped() - def _accept_loop(self, srv: socket.socket, remote_host: str, - remote_port: int, logdir: str) -> None: + def _accept_loop(self, srv: socket.socket, remote_host: str, remote_port: int) -> None: while not self._tcp_stop_event.is_set(): try: client_sock, addr = srv.accept() @@ -585,83 +622,53 @@ class BridgePanel(tk.Frame): continue self._tcp_log_q.put(f"[TCP] Connected to device at {remote_host}:{remote_port}\n") + self._run_tcp_session(client_sock, dev_sock) + self._tcp_log_q.put(f"[TCP] Connection from {peer} closed\n") - ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - os.makedirs(logdir, exist_ok=True) - bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin") - s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin") - - # Auto-register in history as recording - label = f"tcp_{ts}" - self.after(0, self._tcp_session_started, bw_path, s3_path, label, peer, remote_host, remote_port) - - self._run_tcp_session(client_sock, dev_sock, bw_path, s3_path, ts) - - self._tcp_log_q.put(f"<>\t{bw_path}\t{s3_path}\t{label}") - - def _tcp_session_started(self, bw_path, s3_path, label, peer, remote_host, remote_port) -> None: - self.status_var.set(f"Active: {peer} → {remote_host}:{remote_port}") - self._cap_label = label - self._cap_history.append({"label": label, "status": "recording", - "bw": bw_path, "s3": s3_path}) - self._refresh_hist() - self.mark_btn.configure(state="normal") - self.stop_cap_btn.configure(state="normal", bg=RED) - if self._on_cap_started: - self._on_cap_started(bw_path, s3_path, label) - - def _run_tcp_session(self, bw_sock: socket.socket, dev_sock: socket.socket, - bw_path: str, s3_path: str, ts: str) -> None: + def _run_tcp_session(self, bw_sock: socket.socket, dev_sock: socket.socket) -> None: + """Forward bytes in both directions; write to capture files only when active.""" bw_bytes = [0] s3_bytes = [0] - with open(bw_path, "wb") as bw_fh, open(s3_path, "wb") as s3_fh: - def _pipe(src, dst, fh, counter): + def _pipe(src, dst, get_fh, counter): + try: + while True: + data = src.recv(4096) + if not data: + break + dst.sendall(data) + with self._tcp_cap_lock: + fh = get_fh() + if fh: + fh.write(data) + fh.flush() + counter[0] += len(data) + except OSError: + pass + finally: try: - while True: - data = src.recv(4096) - if not data: - break - dst.sendall(data) - fh.write(data) - fh.flush() - counter[0] += len(data) + dst.shutdown(socket.SHUT_WR) except OSError: pass - finally: - try: - dst.shutdown(socket.SHUT_WR) - except OSError: - pass - - t_bw = threading.Thread(target=_pipe, args=(bw_sock, dev_sock, bw_fh, bw_bytes), daemon=True) - t_s3 = threading.Thread(target=_pipe, args=(dev_sock, bw_sock, s3_fh, s3_bytes), daemon=True) - t_bw.start() - t_s3.start() - t_bw.join() - t_s3.join() + t_bw = threading.Thread(target=_pipe, + args=(bw_sock, dev_sock, + lambda: self._tcp_cap_bw_fh, bw_bytes), daemon=True) + t_s3 = threading.Thread(target=_pipe, + args=(dev_sock, bw_sock, + lambda: self._tcp_cap_s3_fh, s3_bytes), daemon=True) + t_bw.start() + t_s3.start() + t_bw.join() + t_s3.join() bw_sock.close() dev_sock.close() - self._tcp_log_q.put( - f"[TCP] Session {ts} done " - f"BW→dev: {bw_bytes[0]} bytes dev→BW: {s3_bytes[0]} bytes\n" - ) def _poll_tcp_log(self) -> None: try: while True: msg = self._tcp_log_q.get_nowait() - if msg.startswith("<>"): - parts = msg.split("\t") - if len(parts) == 4: - _, bw_path, s3_path, label = parts - self._on_cap_stopped_msg(bw_path, s3_path) - if self._server is not None: - self.status_var.set(f"Listening on :{self.listen_port_var.get()}") - self.stop_cap_btn.configure(state="disabled", bg=BG3) - else: - self._append_log(msg) + self._append_log(msg) except queue.Empty: pass finally: