feat: add persistent bridge and streamlined capture pipeline to seismo_lab.py

This commit is contained in:
2026-04-20 15:09:55 -04:00
parent aa28495a43
commit f10c5c1b86
2 changed files with 259 additions and 39 deletions
+70 -5
View File
@@ -93,8 +93,11 @@ class SessionLogger:
self._bin_fh = open(bin_path, "ab", buffering=0)
self._lock = threading.Lock()
# Optional pure-byte taps (no headers). BW=Blastware tx, S3=device tx.
# These can be opened/closed on demand via start_raw_capture/stop_raw_capture.
self._raw_bw = open(raw_bw_path, "ab", buffering=0) if raw_bw_path else None
self._raw_s3 = open(raw_s3_path, "ab", buffering=0) if raw_s3_path else None
self._cap_bw_path: Optional[str] = raw_bw_path
self._cap_s3_path: Optional[str] = raw_s3_path
def log_line(self, line: str) -> None:
with self._lock:
@@ -124,6 +127,43 @@ class SessionLogger:
self.log_line(f"[{ts}] [INFO] {msg}")
self.bin_write_record(REC_INFO, msg.encode("utf-8", errors="replace"))
def start_raw_capture(self, label: str, logdir: str) -> tuple:
"""Open new raw tap files for a named capture. Returns (bw_path, s3_path)."""
ts = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in label)[:40] if label else ""
suffix = f"_{safe}" if safe else ""
bw_path = os.path.join(logdir, f"raw_bw_{ts}{suffix}.bin")
s3_path = os.path.join(logdir, f"raw_s3_{ts}{suffix}.bin")
with self._lock:
# Close any previously open taps first
if self._raw_bw:
self._raw_bw.close()
if self._raw_s3:
self._raw_s3.close()
self._raw_bw = open(bw_path, "ab", buffering=0)
self._raw_s3 = open(s3_path, "ab", buffering=0)
self._cap_bw_path = bw_path
self._cap_s3_path = s3_path
self.log_info(f"raw capture started: label={label!r} bw={bw_path} s3={s3_path}")
return bw_path, s3_path
def stop_raw_capture(self) -> tuple:
"""Close raw tap files. Returns (bw_path, s3_path) for the capture just closed."""
with self._lock:
bw = self._cap_bw_path
s3 = self._cap_s3_path
if self._raw_bw:
self._raw_bw.close()
self._raw_bw = None
if self._raw_s3:
self._raw_s3.close()
self._raw_s3 = None
self._cap_bw_path = None
self._cap_s3_path = None
if bw:
self.log_info(f"raw capture stopped: bw={bw} s3={s3}")
return bw, s3
def close(self) -> None:
with self._lock:
try:
@@ -291,8 +331,18 @@ def forward_loop(
time.sleep(0.002)
def annotation_loop(logger: SessionLogger, stop: threading.Event) -> None:
print("[MARK] Type 'm' + Enter to annotate the capture. Ctrl+C to stop.")
def annotation_loop(logger: SessionLogger, logdir: str, stop: threading.Event) -> None:
"""
Reads stdin commands while the bridge runs.
Commands:
m — prompt for a mark label (interactive)
CAP_START:<label> — begin a raw tap capture with the given label
CAP_STOP — stop the current raw tap capture
Responses (printed to stdout, parsed by the GUI):
[CAP_START] <bw_path>\\t<s3_path>
[CAP_STOP] <bw_path>\\t<s3_path>
"""
while not stop.is_set():
try:
line = input()
@@ -303,7 +353,21 @@ def annotation_loop(logger: SessionLogger, stop: threading.Event) -> None:
if not line:
continue
if line.lower() == "m":
if line.startswith("CAP_START:"):
label = line[10:].strip()
bw_path, s3_path = logger.start_raw_capture(label, logdir)
print(f"[CAP_START] {bw_path}\t{s3_path}")
sys.stdout.flush()
elif line == "CAP_STOP":
bw_path, s3_path = logger.stop_raw_capture()
if bw_path:
print(f"[CAP_STOP] {bw_path}\t{s3_path}")
else:
print("[CAP_STOP] no active capture")
sys.stdout.flush()
elif line.lower() == "m":
try:
sys.stdout.write(" Label: ")
sys.stdout.flush()
@@ -315,8 +379,9 @@ def annotation_loop(logger: SessionLogger, stop: threading.Event) -> None:
print(f" [MARK written] {label}")
else:
print(" (empty label — mark cancelled)")
else:
print(" (type 'm' + Enter to annotate)")
print(f" (unknown command: {line!r})")
def main() -> int:
@@ -391,7 +456,7 @@ def main() -> int:
t_ann = threading.Thread(
target=annotation_loop,
name="Annotator",
args=(logger, stop),
args=(logger, args.logdir, stop),
daemon=True,
)
+189 -34
View File
@@ -97,16 +97,24 @@ class AnalyzerState:
class BridgePanel(tk.Frame):
"""
All bridge controls and live log output.
Calls on_bridge_started(raw_bw_path, raw_s3_path) when the bridge starts
so the parent can wire up the Analyzer.
Calls on_bridge_started(struct_bin_path) when the bridge starts.
Calls on_capture_started(bw_path, s3_path, label) when a capture begins.
Calls on_capture_complete(bw_path, s3_path, label) when a capture ends.
"""
def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped, **kw):
def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped,
on_capture_started=None, on_capture_complete=None, **kw):
super().__init__(parent, bg=BG2, **kw)
self._on_started = on_bridge_started # signature: (raw_bw, raw_s3, struct_bin)
self._on_stopped = on_bridge_stopped
self._on_started = on_bridge_started # signature: (struct_bin)
self._on_stopped = on_bridge_stopped
self._on_cap_started = on_capture_started # (bw, s3, label)
self._on_cap_complete = on_capture_complete # (bw, s3, label)
self.process: Optional[subprocess.Popen] = None
self._stdout_q: queue.Queue[str] = queue.Queue()
# Capture state
self._capturing = False
self._cap_label: Optional[str] = None
self._cap_history: list[dict] = [] # {label, status, bw, s3}
self._build()
self._poll_stdout()
@@ -146,17 +154,7 @@ class BridgePanel(tk.Frame):
tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2",
font=MONO, command=self._choose_dir).grid(row=1, column=5, **pad)
# Row 2: raw taps (always enabled — timestamped names generated at start)
self._raw_bw_on = tk.BooleanVar(value=True)
self._raw_s3_on = tk.BooleanVar(value=True)
tk.Checkbutton(cfg, text="Capture BW->S3 raw", variable=self._raw_bw_on,
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
font=MONO).grid(row=2, column=0, columnspan=2, sticky="w", **pad)
tk.Checkbutton(cfg, text="Capture S3->BW raw", variable=self._raw_s3_on,
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
font=MONO).grid(row=2, column=2, columnspan=2, sticky="w", **pad)
# Row 3: buttons + status
# Row 2: buttons + status
btn_row = tk.Frame(self, bg=BG2)
btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2)
@@ -170,6 +168,18 @@ class BridgePanel(tk.Frame):
command=self.stop_bridge, state="disabled")
self.stop_btn.pack(side=tk.LEFT, padx=4)
tk.Frame(btn_row, bg=BG2, width=16).pack(side=tk.LEFT) # spacer
self.cap_btn = tk.Button(btn_row, text="⬤ New Capture", bg=ORANGE, fg="#000000",
relief="flat", padx=10, cursor="hand2", font=MONO_B,
command=self._start_capture, state="disabled")
self.cap_btn.pack(side=tk.LEFT, padx=4)
self.stop_cap_btn = tk.Button(btn_row, text="■ Stop Capture", bg=BG3, fg=RED,
relief="flat", padx=10, cursor="hand2", font=MONO_B,
command=self._stop_capture, state="disabled")
self.stop_cap_btn.pack(side=tk.LEFT, padx=4)
self.mark_btn = tk.Button(btn_row, text="Add Mark", bg=BG3, fg=FG,
relief="flat", padx=10, cursor="hand2", font=MONO,
command=self.add_mark, state="disabled")
@@ -179,9 +189,34 @@ class BridgePanel(tk.Frame):
tk.Label(btn_row, textvariable=self.status_var,
bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10)
# Capture history panel
hist_outer = tk.Frame(self, bg=BG2)
hist_outer.pack(side=tk.TOP, fill=tk.X, padx=4, pady=(2, 0))
tk.Label(hist_outer, text="Captures:", bg=BG2, fg=FG_DIM,
font=MONO_SM, anchor="w").pack(side=tk.LEFT, padx=(4, 6))
hist_inner = tk.Frame(hist_outer, bg=BG2)
hist_inner.pack(side=tk.LEFT, fill=tk.X, expand=True)
self._hist_lb = tk.Listbox(
hist_inner, bg=BG3, fg=FG, font=MONO_SM,
height=3, relief="flat", selectbackground=BG,
selectforeground=ACCENT, activestyle="none",
highlightthickness=0,
)
hist_vsb = ttk.Scrollbar(hist_inner, orient="vertical", command=self._hist_lb.yview)
self._hist_lb.configure(yscrollcommand=hist_vsb.set)
hist_vsb.pack(side=tk.RIGHT, fill=tk.Y)
self._hist_lb.pack(side=tk.LEFT, fill=tk.X, expand=True)
self._hist_lb.bind("<Double-Button-1>", self._on_hist_dblclick)
tk.Label(hist_outer, text="dbl-click to reload", bg=BG2, fg=FG_DIM,
font=MONO_SM, anchor="e").pack(side=tk.RIGHT, padx=6)
# Log output
self.log_view = scrolledtext.ScrolledText(
self, height=18, font=MONO_SM,
self, height=14, font=MONO_SM,
bg=BG, fg=FG, insertbackground=FG,
relief="flat", state="disabled",
)
@@ -221,14 +256,8 @@ class BridgePanel(tk.Frame):
args = [sys.executable, str(BRIDGE_PATH),
"--bw", bw, "--s3", s3, "--baud", baud, "--logdir", logdir]
raw_bw_path = raw_s3_path = None
if self._raw_bw_on.get():
raw_bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin")
args += ["--raw-bw", raw_bw_path]
if self._raw_s3_on.get():
raw_s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin")
args += ["--raw-s3", raw_s3_path]
# Raw BW/S3 taps are NOT opened at bridge start.
# Use "New Capture" to start a labeled tap on demand.
# Structured bin path — written by bridge automatically, named by ts
struct_bin_path = os.path.join(logdir, f"s3_session_{ts}.bin")
@@ -250,11 +279,12 @@ class BridgePanel(tk.Frame):
self.status_var.set(f"Running — {bw} <-> {s3}")
self.start_btn.configure(state="disabled")
self.stop_btn.configure(state="normal", bg=RED)
self.mark_btn.configure(state="normal")
self.cap_btn.configure(state="normal")
self._append_log(f"== Bridge started [{ts}] ==\n")
self._append_log(" Click 'New Capture' when ready to record a setting change.\n")
# Notify parent so Analyzer can wire up live mode
self._on_started(raw_bw_path, raw_s3_path, struct_bin_path)
# Notify parent — no raw files yet, just the structured bin path
self._on_started(struct_bin_path)
def stop_bridge(self) -> None:
if self.process and self.process.poll() is None:
@@ -270,7 +300,11 @@ class BridgePanel(tk.Frame):
self.status_var.set("Stopped")
self.start_btn.configure(state="normal")
self.stop_btn.configure(state="disabled", bg=BG3)
self.cap_btn.configure(state="disabled")
self.stop_cap_btn.configure(state="disabled", bg=BG3)
self.mark_btn.configure(state="disabled")
self._capturing = False
self._cap_label = None
self._append_log("== Bridge stopped ==\n")
def _reader_thread(self) -> None:
@@ -288,12 +322,120 @@ class BridgePanel(tk.Frame):
self._bridge_ended()
self._on_stopped()
break
stripped = line.strip()
# Handle capture lifecycle events from bridge
if stripped.startswith("[CAP_START] ") and "\t" in stripped:
parts = stripped[12:].split("\t", 1)
if len(parts) == 2:
bw_path, s3_path = parts[0].strip(), parts[1].strip()
self._on_cap_started_msg(bw_path, s3_path)
elif stripped.startswith("[CAP_STOP] ") and "\t" in stripped:
parts = stripped[11:].split("\t", 1)
if len(parts) == 2:
bw_path, s3_path = parts[0].strip(), parts[1].strip()
self._on_cap_stopped_msg(bw_path, s3_path)
self._append_log(line)
except queue.Empty:
pass
finally:
self.after(100, self._poll_stdout)
# ── capture control ───────────────────────────────────────────────────
def _start_capture(self) -> None:
"""Ask for a label and tell the bridge to start writing raw tap files."""
if not self.process or self.process.poll() is not None:
return
label = simpledialog.askstring(
"New Capture",
"Label for this capture\n(e.g. 'recording_mode_continuous').\nLeave blank for timestamp only:",
parent=self,
)
if label is None:
return # user hit Cancel
label = label.strip()
try:
self.process.stdin.write(f"CAP_START:{label}\n")
self.process.stdin.flush()
except Exception as e:
messagebox.showerror("Error", f"Failed to start capture:\n{e}")
return
self._capturing = True
self._cap_label = label or datetime.datetime.now().strftime("%H%M%S")
self.cap_btn.configure(state="disabled")
self.stop_cap_btn.configure(state="normal", bg=RED)
self.mark_btn.configure(state="normal")
self._append_log(f"[CAPTURE] Starting: {self._cap_label!r}...\n")
# Add to history as recording (paths filled in when [CAP_START] arrives)
self._cap_history.append({"label": self._cap_label, "status": "recording",
"bw": None, "s3": None})
self._refresh_hist()
def _stop_capture(self) -> None:
"""Tell the bridge to flush and close the current raw tap files."""
if not self.process or self.process.poll() is not None:
return
try:
self.process.stdin.write("CAP_STOP\n")
self.process.stdin.flush()
except Exception as e:
messagebox.showerror("Error", f"Failed to stop capture:\n{e}")
# UI is updated when [CAP_STOP] arrives in stdout
def _on_cap_started_msg(self, bw_path: str, s3_path: str) -> None:
"""Called when bridge confirms capture has started (files are open)."""
# Fill in paths for the last 'recording' history entry
for entry in reversed(self._cap_history):
if entry["status"] == "recording" and entry["bw"] is None:
entry["bw"] = bw_path
entry["s3"] = s3_path
break
if self._on_cap_started:
self._on_cap_started(bw_path, s3_path, self._cap_label or "")
def _on_cap_stopped_msg(self, bw_path: str, s3_path: str) -> None:
"""Called when bridge confirms capture has stopped (files are closed)."""
label = self._cap_label or "capture"
# Mark history entry as done
for entry in reversed(self._cap_history):
if entry["status"] == "recording":
entry["status"] = "done"
entry["bw"] = bw_path
entry["s3"] = s3_path
break
self._refresh_hist()
self._capturing = False
self._cap_label = None
self.cap_btn.configure(state="normal")
self.stop_cap_btn.configure(state="disabled", bg=BG3)
self._append_log(f"[CAPTURE] Done: {label!r} — ready in Analyzer\n")
if self._on_cap_complete:
self._on_cap_complete(bw_path, s3_path, label)
def _refresh_hist(self) -> None:
self._hist_lb.delete(0, tk.END)
for entry in self._cap_history:
icon = "🔴" if entry["status"] == "recording" else ""
label = entry["label"] or "(unlabeled)"
self._hist_lb.insert(tk.END, f" {icon} {label}")
if self._cap_history:
self._hist_lb.see(tk.END)
def _on_hist_dblclick(self, _e=None) -> None:
sel = self._hist_lb.curselection()
if not sel:
return
entry = self._cap_history[sel[0]]
if entry["status"] == "done" and entry["bw"] and entry["s3"]:
if self._on_cap_complete:
self._on_cap_complete(entry["bw"], entry["s3"], entry["label"])
# ── mark ──────────────────────────────────────────────────────────────
def add_mark(self) -> None:
if not self.process or not self.process.stdin or self.process.poll() is not None:
return
@@ -1884,6 +2026,8 @@ class SeismoLab(tk.Tk):
nb,
on_bridge_started=self._on_bridge_started,
on_bridge_stopped=self._on_bridge_stopped,
on_capture_started=self._on_capture_started,
on_capture_complete=self._on_capture_complete,
)
nb.add(self._bridge_panel, text=" Bridge ")
@@ -1905,16 +2049,27 @@ class SeismoLab(tk.Tk):
self._nb = nb
self.protocol("WM_DELETE_WINDOW", self._on_close)
def _on_bridge_started(self, raw_bw: Optional[str], raw_s3: Optional[str],
struct_bin: Optional[str] = None) -> None:
"""Bridge started — inject paths into analyzer and start live mode."""
self._analyzer_panel.set_live_files(raw_bw, raw_s3, struct_bin)
# Switch to Analyzer tab so the user can watch it update
self._nb.select(1)
def _on_bridge_started(self, struct_bin: Optional[str] = None) -> None:
"""Bridge started — stash the structured bin path; stay on Bridge tab."""
if struct_bin:
self._analyzer_panel.bin_var.set(struct_bin)
def _on_bridge_stopped(self) -> None:
self._analyzer_panel.stop_live()
def _on_capture_started(self, bw_path: str, s3_path: str, label: str) -> None:
"""A capture began — wire up live mode in the Analyzer and switch tabs."""
self._analyzer_panel.set_live_files(bw_path, s3_path)
self._nb.select(1)
def _on_capture_complete(self, bw_path: str, s3_path: str, label: str) -> None:
"""A capture stopped — stop live mode, run full analysis, switch to Analyzer."""
self._analyzer_panel.stop_live()
self._analyzer_panel.s3_var.set(s3_path)
self._analyzer_panel.bw_var.set(bw_path)
self._analyzer_panel._run_analyze()
self._nb.select(1)
def _on_console_send_to_analyzer(self, raw_s3_path: str) -> None:
"""Console captured bytes → inject into Analyzer S3 field and switch tab."""
self._analyzer_panel.s3_var.set(raw_s3_path)