v0.12.6 #10

Merged
serversdown merged 43 commits from seismo-lab-new into main 2026-05-04 13:22:56 -04:00
Showing only changes of commit b9ab368934 - Show all commits
+72 -65
View File
@@ -128,6 +128,12 @@ class BridgePanel(tk.Frame):
self._server: Optional[socket.socket] = None self._server: Optional[socket.socket] = None
self._tcp_stop_event = threading.Event() self._tcp_stop_event = threading.Event()
self._tcp_log_q: queue.Queue[str] = queue.Queue() self._tcp_log_q: queue.Queue[str] = queue.Queue()
# tcp capture file handles — written only when capture is active
self._tcp_cap_lock = threading.Lock()
self._tcp_cap_bw_fh = None
self._tcp_cap_s3_fh = None
self._tcp_cap_bw_path: Optional[str] = None
self._tcp_cap_s3_path: Optional[str] = None
# shared capture state # shared capture state
self._capturing = False self._capturing = False
self._cap_label: Optional[str] = None self._cap_label: Optional[str] = None
@@ -457,8 +463,6 @@ class BridgePanel(tk.Frame):
self.after(100, self._poll_stdout) self.after(100, self._poll_stdout)
def _start_capture(self) -> None: def _start_capture(self) -> None:
if not self.process or self.process.poll() is not None:
return
label = simpledialog.askstring( label = simpledialog.askstring(
"New Capture", "New Capture",
"Label for this capture\n(e.g. 'copy_event_download').\nLeave blank for timestamp only:", "Label for this capture\n(e.g. 'copy_event_download').\nLeave blank for timestamp only:",
@@ -467,32 +471,58 @@ class BridgePanel(tk.Frame):
if label is None: if label is None:
return return
label = label.strip() label = label.strip()
self._capturing = True
self._cap_label = label or datetime.datetime.now().strftime("%H%M%S")
if self._mode.get() == "tcp":
# TCP: open the capture files now; pipe threads write here while active
logdir = self.logdir_var.get().strip() or "."
os.makedirs(logdir, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin")
s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin")
with self._tcp_cap_lock:
self._tcp_cap_bw_fh = open(bw_path, "wb")
self._tcp_cap_s3_fh = open(s3_path, "wb")
self._tcp_cap_bw_path = bw_path
self._tcp_cap_s3_path = s3_path
self._cap_history.append({"label": self._cap_label, "status": "recording",
"bw": bw_path, "s3": s3_path})
self._refresh_hist()
self._on_cap_started_msg(bw_path, s3_path)
else:
if not self.process or self.process.poll() is not None:
return
try: try:
self.process.stdin.write(f"CAP_START:{label}\n") self.process.stdin.write(f"CAP_START:{label}\n")
self.process.stdin.flush() self.process.stdin.flush()
except Exception as e: except Exception as e:
messagebox.showerror("Error", f"Failed to start capture:\n{e}") messagebox.showerror("Error", f"Failed to start capture:\n{e}")
return return
self._capturing = True
self._cap_label = label or datetime.datetime.now().strftime("%H%M%S")
self.cap_btn.configure(state="disabled")
self.stop_cap_btn.configure(state="normal", bg=RED)
self.mark_btn.configure(state="normal")
self._append_log(f"[CAPTURE] Starting: {self._cap_label!r}...\n")
self._cap_history.append({"label": self._cap_label, "status": "recording", self._cap_history.append({"label": self._cap_label, "status": "recording",
"bw": None, "s3": None}) "bw": None, "s3": None})
self._refresh_hist() self._refresh_hist()
self.cap_btn.configure(state="disabled")
self.stop_cap_btn.configure(state="normal", bg=RED)
self.mark_btn.configure(state="normal")
self._append_log(f"[CAPTURE] Starting: {self._cap_label!r}...\n")
def _stop_capture(self) -> None: def _stop_capture(self) -> None:
if self._mode.get() == "tcp": if self._mode.get() == "tcp":
# TCP: close the server so the current session ends naturally with self._tcp_cap_lock:
self._tcp_stop_event.set() bw_path = self._tcp_cap_bw_path
if self._server: s3_path = self._tcp_cap_s3_path
try: if self._tcp_cap_bw_fh:
self._server.close() self._tcp_cap_bw_fh.close()
except OSError: self._tcp_cap_bw_fh = None
pass if self._tcp_cap_s3_fh:
self._server = None self._tcp_cap_s3_fh.close()
self._tcp_cap_s3_fh = None
self._tcp_cap_bw_path = None
self._tcp_cap_s3_path = None
if bw_path and s3_path:
self._on_cap_stopped_msg(bw_path, s3_path)
return return
if not self.process or self.process.poll() is not None: if not self.process or self.process.poll() is not None:
return return
@@ -534,6 +564,7 @@ class BridgePanel(tk.Frame):
self._tcp_stop_event.clear() self._tcp_stop_event.clear()
self.start_btn.configure(state="disabled") self.start_btn.configure(state="disabled")
self.stop_btn.configure(state="normal", bg=RED) self.stop_btn.configure(state="normal", bg=RED)
self.cap_btn.configure(state="normal")
self.status_var.set(f"Listening on :{listen_port}") self.status_var.set(f"Listening on :{listen_port}")
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
@@ -541,18 +572,25 @@ class BridgePanel(tk.Frame):
f"== TCP Bridge started [{ts}]\n" f"== TCP Bridge started [{ts}]\n"
f" Listening on 0.0.0.0:{listen_port}\n" f" Listening on 0.0.0.0:{listen_port}\n"
f" Forwarding to {remote_host}:{remote_port}\n" f" Forwarding to {remote_host}:{remote_port}\n"
f" Each Blastware connection is automatically captured.\n==\n" f" Click 'New Capture' before the operation you want to record.\n==\n"
) )
self._on_started(None) self._on_started(None)
logdir = self.logdir_var.get().strip() or "."
threading.Thread( threading.Thread(
target=self._accept_loop, target=self._accept_loop,
args=(srv, remote_host, remote_port, logdir), args=(srv, remote_host, remote_port),
daemon=True, daemon=True,
).start() ).start()
def _stop_tcp(self) -> None: def _stop_tcp(self) -> None:
# Close any open capture files first
with self._tcp_cap_lock:
if self._tcp_cap_bw_fh:
self._tcp_cap_bw_fh.close()
self._tcp_cap_bw_fh = None
if self._tcp_cap_s3_fh:
self._tcp_cap_s3_fh.close()
self._tcp_cap_s3_fh = None
self._tcp_stop_event.set() self._tcp_stop_event.set()
if self._server: if self._server:
try: try:
@@ -563,8 +601,7 @@ class BridgePanel(tk.Frame):
self._bridge_ended() self._bridge_ended()
self._on_stopped() self._on_stopped()
def _accept_loop(self, srv: socket.socket, remote_host: str, def _accept_loop(self, srv: socket.socket, remote_host: str, remote_port: int) -> None:
remote_port: int, logdir: str) -> None:
while not self._tcp_stop_event.is_set(): while not self._tcp_stop_event.is_set():
try: try:
client_sock, addr = srv.accept() client_sock, addr = srv.accept()
@@ -585,44 +622,24 @@ class BridgePanel(tk.Frame):
continue continue
self._tcp_log_q.put(f"[TCP] Connected to device at {remote_host}:{remote_port}\n") self._tcp_log_q.put(f"[TCP] Connected to device at {remote_host}:{remote_port}\n")
self._run_tcp_session(client_sock, dev_sock)
self._tcp_log_q.put(f"[TCP] Connection from {peer} closed\n")
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") def _run_tcp_session(self, bw_sock: socket.socket, dev_sock: socket.socket) -> None:
os.makedirs(logdir, exist_ok=True) """Forward bytes in both directions; write to capture files only when active."""
bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin")
s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin")
# Auto-register in history as recording
label = f"tcp_{ts}"
self.after(0, self._tcp_session_started, bw_path, s3_path, label, peer, remote_host, remote_port)
self._run_tcp_session(client_sock, dev_sock, bw_path, s3_path, ts)
self._tcp_log_q.put(f"<<tcp_session_ended>>\t{bw_path}\t{s3_path}\t{label}")
def _tcp_session_started(self, bw_path, s3_path, label, peer, remote_host, remote_port) -> None:
self.status_var.set(f"Active: {peer}{remote_host}:{remote_port}")
self._cap_label = label
self._cap_history.append({"label": label, "status": "recording",
"bw": bw_path, "s3": s3_path})
self._refresh_hist()
self.mark_btn.configure(state="normal")
self.stop_cap_btn.configure(state="normal", bg=RED)
if self._on_cap_started:
self._on_cap_started(bw_path, s3_path, label)
def _run_tcp_session(self, bw_sock: socket.socket, dev_sock: socket.socket,
bw_path: str, s3_path: str, ts: str) -> None:
bw_bytes = [0] bw_bytes = [0]
s3_bytes = [0] s3_bytes = [0]
with open(bw_path, "wb") as bw_fh, open(s3_path, "wb") as s3_fh: def _pipe(src, dst, get_fh, counter):
def _pipe(src, dst, fh, counter):
try: try:
while True: while True:
data = src.recv(4096) data = src.recv(4096)
if not data: if not data:
break break
dst.sendall(data) dst.sendall(data)
with self._tcp_cap_lock:
fh = get_fh()
if fh:
fh.write(data) fh.write(data)
fh.flush() fh.flush()
counter[0] += len(data) counter[0] += len(data)
@@ -634,33 +651,23 @@ class BridgePanel(tk.Frame):
except OSError: except OSError:
pass pass
t_bw = threading.Thread(target=_pipe, args=(bw_sock, dev_sock, bw_fh, bw_bytes), daemon=True) t_bw = threading.Thread(target=_pipe,
t_s3 = threading.Thread(target=_pipe, args=(dev_sock, bw_sock, s3_fh, s3_bytes), daemon=True) args=(bw_sock, dev_sock,
lambda: self._tcp_cap_bw_fh, bw_bytes), daemon=True)
t_s3 = threading.Thread(target=_pipe,
args=(dev_sock, bw_sock,
lambda: self._tcp_cap_s3_fh, s3_bytes), daemon=True)
t_bw.start() t_bw.start()
t_s3.start() t_s3.start()
t_bw.join() t_bw.join()
t_s3.join() t_s3.join()
bw_sock.close() bw_sock.close()
dev_sock.close() dev_sock.close()
self._tcp_log_q.put(
f"[TCP] Session {ts} done "
f"BW→dev: {bw_bytes[0]} bytes dev→BW: {s3_bytes[0]} bytes\n"
)
def _poll_tcp_log(self) -> None: def _poll_tcp_log(self) -> None:
try: try:
while True: while True:
msg = self._tcp_log_q.get_nowait() msg = self._tcp_log_q.get_nowait()
if msg.startswith("<<tcp_session_ended>>"):
parts = msg.split("\t")
if len(parts) == 4:
_, bw_path, s3_path, label = parts
self._on_cap_stopped_msg(bw_path, s3_path)
if self._server is not None:
self.status_var.set(f"Listening on :{self.listen_port_var.get()}")
self.stop_cap_btn.configure(state="disabled", bg=BG3)
else:
self._append_log(msg) self._append_log(msg)
except queue.Empty: except queue.Empty:
pass pass