#!/usr/bin/env python3 """ seismo_lab.py — Combined S3 Bridge + Protocol Analyzer + Device Console GUI. Single window with three top-level tabs: Bridge — capture live serial traffic (wraps s3_bridge.py as subprocess) Analyzer — parse, diff, and query captured frames Console — direct device connection; runs commands and shows raw bytes + decoded output; colour-coded TX/RX console with log save and Send-to-Analyzer support When the bridge starts: - raw tap paths are auto-filled in the Analyzer tab - Live mode is automatically enabled so the Analyzer updates in real time Run from anywhere: python seismo_lab.py """ from __future__ import annotations import datetime import os import queue import socket import subprocess import sys import threading import tkinter as tk from pathlib import Path from tkinter import filedialog, messagebox, scrolledtext, simpledialog, ttk from typing import Optional # ── path setup ──────────────────────────────────────────────────────────────── SCRIPT_DIR = Path(__file__).parent BRIDGE_PATH = SCRIPT_DIR / "bridges" / "s3-bridge" / "s3_bridge.py" PARSERS_DIR = SCRIPT_DIR / "parsers" sys.path.insert(0, str(PARSERS_DIR)) sys.path.insert(0, str(SCRIPT_DIR)) # for minimateplus package from s3_analyzer import ( # noqa: E402 AnnotatedFrame, FrameDiff, Session, annotate_frames, diff_sessions, format_hex_dump, parse_bw, parse_s3, parse_structured_bin, render_session_report, split_into_sessions, split_sessions_at_marks, write_claude_export, ) from frame_db import FrameDB # noqa: E402 # ── colour palette ──────────────────────────────────────────────────────────── BG = "#1e1e1e" BG2 = "#252526" BG3 = "#2d2d30" FG = "#d4d4d4" FG_DIM = "#6a6a6a" ACCENT = "#569cd6" RED = "#f44747" YELLOW = "#dcdcaa" GREEN = "#4caf50" ORANGE = "#ce9178" COL_BW = "#9cdcfe" COL_S3 = "#4ec9b0" COL_DIFF = "#f44747" COL_KNOW = "#4caf50" COL_HEAD = "#569cd6" MONO = ("Consolas", 9) MONO_SM = ("Consolas", 8) MONO_B = ("Consolas", 9, "bold") # ───────────────────────────────────────────────────────────────────────────── # Shared state # ───────────────────────────────────────────────────────────────────────────── class AnalyzerState: def __init__(self) -> None: self.sessions: list[Session] = [] self.diffs: list[Optional[list[FrameDiff]]] = [] self.s3_path: Optional[Path] = None self.bw_path: Optional[Path] = None self.last_capture_id: Optional[int] = None # ───────────────────────────────────────────────────────────────────────────── # Bridge panel (tk.Frame — lives inside a notebook tab) # ───────────────────────────────────────────────────────────────────────────── class BridgePanel(tk.Frame): """ Bridge controls and live log output. Two modes selectable at the top: - Serial: wraps s3_bridge.py as a subprocess (two COM ports). Single bridge session; use New Capture / Stop Capture to create labelled raw-file segments on demand. - TCP: MITM proxy — listens for Blastware on a local port, forwards to the real device. Each incoming connection is a capture; segments appear in the history list automatically. Callbacks (all optional except on_bridge_started / on_bridge_stopped): on_bridge_started(struct_bin_path) — bridge is up on_bridge_stopped() — bridge stopped on_capture_started(bw_path, s3_path, label) — a capture segment began on_capture_complete(bw_path, s3_path, label)— a capture segment finished """ def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped, on_capture_started=None, on_capture_complete=None, **kw): def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped, on_capture_started=None, on_capture_complete=None, **kw): super().__init__(parent, bg=BG2, **kw) self._on_started = on_bridge_started # signature: (struct_bin) self._on_stopped = on_bridge_stopped self._on_cap_started = on_capture_started # (bw, s3, label) self._on_cap_complete = on_capture_complete # (bw, s3, label) self.process: Optional[subprocess.Popen] = None self._stdout_q: queue.Queue[str] = queue.Queue() # tcp state self._server: Optional[socket.socket] = None self._tcp_stop_event = threading.Event() self._tcp_log_q: queue.Queue[str] = queue.Queue() # tcp capture file handles — written only when capture is active self._tcp_cap_lock = threading.Lock() self._tcp_cap_bw_fh = None self._tcp_cap_s3_fh = None self._tcp_cap_bw_path: Optional[str] = None self._tcp_cap_s3_path: Optional[str] = None # shared capture state self._capturing = False self._cap_label: Optional[str] = None self._cap_history: list[dict] = [] # {label, status, bw, s3} # mode self._mode = tk.StringVar(value="serial") # Capture state self._capturing = False self._cap_label: Optional[str] = None self._cap_history: list[dict] = [] # {label, status, bw, s3} self._build() self._poll_stdout() self._poll_tcp_log() # ── build ───────────────────────────────────────────────────────────── def _build(self) -> None: pad = {"padx": 6, "pady": 4} cfg = tk.Frame(self, bg=BG2) cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4) # Row 0: mode selector mode_row = tk.Frame(cfg, bg=BG2) mode_row.grid(row=0, column=0, columnspan=6, sticky="w", padx=6, pady=(4, 0)) tk.Label(mode_row, text="Mode:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, padx=(0, 8)) tk.Radiobutton(mode_row, text="Serial", variable=self._mode, value="serial", bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO, command=self._on_mode_change).pack(side=tk.LEFT, padx=4) tk.Radiobutton(mode_row, text="TCP", variable=self._mode, value="tcp", bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO, command=self._on_mode_change).pack(side=tk.LEFT, padx=4) # Row 1a: serial connection fields (shown by default) self._serial_frame = tk.Frame(cfg, bg=BG2) self._serial_frame.grid(row=1, column=0, columnspan=6, sticky="w") tk.Label(self._serial_frame, text="BW COM:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=0, sticky="e", **pad) self.bw_var = tk.StringVar(value="COM4") tk.Entry(self._serial_frame, textvariable=self.bw_var, width=10, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).grid(row=0, column=1, sticky="w", **pad) tk.Label(self._serial_frame, text="S3 COM:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=2, sticky="e", **pad) self.s3_var = tk.StringVar(value="COM5") tk.Entry(self._serial_frame, textvariable=self.s3_var, width=10, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).grid(row=0, column=3, sticky="w", **pad) tk.Label(self._serial_frame, text="Baud:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=4, sticky="e", **pad) self.baud_var = tk.StringVar(value="38400") tk.Entry(self._serial_frame, textvariable=self.baud_var, width=8, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).grid(row=0, column=5, sticky="w", **pad) # Row 1b: TCP connection fields (hidden until TCP mode selected) self._tcp_frame = tk.Frame(cfg, bg=BG2) tk.Label(self._tcp_frame, text="Listen port:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=0, sticky="e", **pad) self.listen_port_var = tk.StringVar(value="9034") tk.Entry(self._tcp_frame, textvariable=self.listen_port_var, width=8, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).grid(row=0, column=1, sticky="w", **pad) tk.Label(self._tcp_frame, text="Device host:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=2, sticky="e", **pad) self.remote_host_var = tk.StringVar(value="63.43.212.232") tk.Entry(self._tcp_frame, textvariable=self.remote_host_var, width=18, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).grid(row=0, column=3, sticky="w", **pad) tk.Label(self._tcp_frame, text="Port:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=4, sticky="e", **pad) self.remote_port_var = tk.StringVar(value="9034") tk.Entry(self._tcp_frame, textvariable=self.remote_port_var, width=8, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).grid(row=0, column=5, sticky="w", **pad) # Row 2: log dir tk.Label(cfg, text="Log dir:", bg=BG2, fg=FG, font=MONO).grid(row=2, column=0, sticky="e", **pad) self.logdir_var = tk.StringVar(value=str(SCRIPT_DIR / "bridges" / "captures")) tk.Entry(cfg, textvariable=self.logdir_var, width=40, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).grid(row=2, column=1, columnspan=4, sticky="we", **pad) tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", font=MONO, command=self._choose_dir).grid(row=2, column=5, **pad) # Row 2: buttons + status btn_row = tk.Frame(self, bg=BG2) btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2) self.start_btn = tk.Button(btn_row, text="Start Bridge", bg=GREEN, fg="#000000", relief="flat", padx=12, cursor="hand2", font=MONO_B, command=self.start_bridge) self.start_btn.pack(side=tk.LEFT, padx=6) self.stop_btn = tk.Button(btn_row, text="Stop Bridge", bg=BG3, fg=FG, relief="flat", padx=12, cursor="hand2", font=MONO, command=self.stop_bridge, state="disabled") self.stop_btn.pack(side=tk.LEFT, padx=4) tk.Frame(btn_row, bg=BG2, width=16).pack(side=tk.LEFT) # spacer self.cap_btn = tk.Button(btn_row, text="● New Capture", bg=ORANGE, fg="#000000", relief="flat", padx=10, cursor="hand2", font=MONO_B, command=self._start_capture, state="disabled") self.cap_btn.pack(side=tk.LEFT, padx=4) self.stop_cap_btn = tk.Button(btn_row, text="■ Stop Capture", bg=BG3, fg=RED, relief="flat", padx=10, cursor="hand2", font=MONO_B, command=self._stop_capture, state="disabled") self.stop_cap_btn.pack(side=tk.LEFT, padx=4) tk.Frame(btn_row, bg=BG2, width=16).pack(side=tk.LEFT) # spacer self.cap_btn = tk.Button(btn_row, text="⬤ New Capture", bg=ORANGE, fg="#000000", relief="flat", padx=10, cursor="hand2", font=MONO_B, command=self._start_capture, state="disabled") self.cap_btn.pack(side=tk.LEFT, padx=4) self.stop_cap_btn = tk.Button(btn_row, text="■ Stop Capture", bg=BG3, fg=RED, relief="flat", padx=10, cursor="hand2", font=MONO_B, command=self._stop_capture, state="disabled") self.stop_cap_btn.pack(side=tk.LEFT, padx=4) self.mark_btn = tk.Button(btn_row, text="Add Mark", bg=BG3, fg=FG, relief="flat", padx=10, cursor="hand2", font=MONO, command=self.add_mark, state="disabled") self.mark_btn.pack(side=tk.LEFT, padx=4) self.status_var = tk.StringVar(value="Idle") tk.Label(btn_row, textvariable=self.status_var, bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10) # Capture history panel hist_outer = tk.Frame(self, bg=BG2) hist_outer.pack(side=tk.TOP, fill=tk.X, padx=4, pady=(2, 0)) tk.Label(hist_outer, text="Captures:", bg=BG2, fg=FG_DIM, font=MONO_SM, anchor="w").pack(side=tk.LEFT, padx=(4, 6)) hist_inner = tk.Frame(hist_outer, bg=BG2) hist_inner.pack(side=tk.LEFT, fill=tk.X, expand=True) self._hist_lb = tk.Listbox( hist_inner, bg=BG3, fg=FG, font=MONO_SM, height=3, relief="flat", selectbackground=BG, selectforeground=ACCENT, activestyle="none", highlightthickness=0, ) hist_vsb = ttk.Scrollbar(hist_inner, orient="vertical", command=self._hist_lb.yview) self._hist_lb.configure(yscrollcommand=hist_vsb.set) hist_vsb.pack(side=tk.RIGHT, fill=tk.Y) self._hist_lb.pack(side=tk.LEFT, fill=tk.X, expand=True) self._hist_lb.bind("", self._on_hist_dblclick) tk.Label(hist_outer, text="dbl-click to reload", bg=BG2, fg=FG_DIM, font=MONO_SM, anchor="e").pack(side=tk.RIGHT, padx=6) # Capture history list hist_outer = tk.Frame(self, bg=BG2) hist_outer.pack(side=tk.TOP, fill=tk.X, padx=4, pady=(2, 0)) tk.Label(hist_outer, text="Captures:", bg=BG2, fg=FG_DIM, font=MONO_SM, anchor="w").pack(side=tk.LEFT, padx=(4, 6)) hist_inner = tk.Frame(hist_outer, bg=BG2) hist_inner.pack(side=tk.LEFT, fill=tk.X, expand=True) self._hist_lb = tk.Listbox( hist_inner, bg=BG3, fg=FG, font=MONO_SM, height=3, relief="flat", selectbackground=BG, selectforeground=ACCENT, activestyle="none", highlightthickness=0, ) hist_vsb = ttk.Scrollbar(hist_inner, orient="vertical", command=self._hist_lb.yview) self._hist_lb.configure(yscrollcommand=hist_vsb.set) hist_vsb.pack(side=tk.RIGHT, fill=tk.Y) self._hist_lb.pack(side=tk.LEFT, fill=tk.X, expand=True) self._hist_lb.bind("", self._on_hist_dblclick) tk.Label(hist_outer, text="dbl-click to reload", bg=BG2, fg=FG_DIM, font=MONO_SM, anchor="e").pack(side=tk.RIGHT, padx=6) # Log output self.log_view = scrolledtext.ScrolledText( self, height=14, font=MONO_SM, self, height=14, font=MONO_SM, bg=BG, fg=FG, insertbackground=FG, relief="flat", state="disabled", ) self.log_view.pack(fill=tk.BOTH, expand=True, padx=4, pady=4) # ── helpers ─────────────────────────────────────────────────────────── def _on_mode_change(self) -> None: if self._mode.get() == "serial": self._tcp_frame.grid_remove() self._serial_frame.grid(row=1, column=0, columnspan=6, sticky="w") else: self._serial_frame.grid_remove() self._tcp_frame.grid(row=1, column=0, columnspan=6, sticky="w") def _choose_dir(self) -> None: path = filedialog.askdirectory(initialdir=self.logdir_var.get()) if path: self.logdir_var.set(path) def _append_log(self, text: str) -> None: self.log_view.configure(state="normal") self.log_view.insert(tk.END, text) self.log_view.see(tk.END) self.log_view.configure(state="disabled") def _refresh_hist(self) -> None: self._hist_lb.delete(0, tk.END) for entry in self._cap_history: icon = "\U0001f534" if entry["status"] == "recording" else "✅" self._hist_lb.insert(tk.END, f" {icon} {entry['label'] or '(unlabeled)'}") if self._cap_history: self._hist_lb.see(tk.END) def _on_hist_dblclick(self, _e=None) -> None: sel = self._hist_lb.curselection() if not sel: return entry = self._cap_history[sel[0]] if entry["status"] == "done" and entry["bw"] and entry["s3"] and self._on_cap_complete: self._on_cap_complete(entry["bw"], entry["s3"], entry["label"]) # ── bridge control (delegates to serial or TCP) ─────────────────────── def start_bridge(self) -> None: if self._mode.get() == "tcp": self._start_tcp() else: self._start_serial() def stop_bridge(self) -> None: if self._mode.get() == "tcp": self._stop_tcp() else: self._stop_serial() def _bridge_ended(self) -> None: self.status_var.set("Stopped") self.start_btn.configure(state="normal") self.stop_btn.configure(state="disabled", bg=BG3) self.cap_btn.configure(state="disabled") self.stop_cap_btn.configure(state="disabled", bg=BG3) self.mark_btn.configure(state="disabled") self._capturing = False self._cap_label = None self._append_log("== Bridge stopped ==\n") # ── capture lifecycle (shared by serial and TCP) ────────────────────── def _on_cap_started_msg(self, bw_path: str, s3_path: str) -> None: for entry in reversed(self._cap_history): if entry["status"] == "recording" and entry["bw"] is None: entry["bw"] = bw_path entry["s3"] = s3_path break self._refresh_hist() if self._on_cap_started: self._on_cap_started(bw_path, s3_path, self._cap_label or "") def _on_cap_stopped_msg(self, bw_path: str, s3_path: str) -> None: label = self._cap_label or "capture" for entry in reversed(self._cap_history): if entry["status"] == "recording": entry["status"] = "done" entry["bw"] = bw_path entry["s3"] = s3_path break self._refresh_hist() self._capturing = False self._cap_label = None self.cap_btn.configure(state="normal") self.stop_cap_btn.configure(state="disabled", bg=BG3) self._append_log(f"[CAPTURE] Done: {label!r} — ready in Analyzer\n") if self._on_cap_complete: self._on_cap_complete(bw_path, s3_path, label) # ── serial mode ─────────────────────────────────────────────────────── def _start_serial(self) -> None: if self.process and self.process.poll() is None: messagebox.showinfo("Bridge", "Bridge is already running.") return bw = self.bw_var.get().strip() s3 = self.s3_var.get().strip() baud = self.baud_var.get().strip() logdir = self.logdir_var.get().strip() or "." if not bw or not s3: messagebox.showerror("Error", "Please enter both BW and S3 COM ports.") return os.makedirs(logdir, exist_ok=True) ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") args = [sys.executable, str(BRIDGE_PATH), "--bw", bw, "--s3", s3, "--baud", baud, "--logdir", logdir] # Raw BW/S3 taps are NOT opened at bridge start. # Use "New Capture" to start a labeled tap on demand. # Structured bin path — written by bridge automatically, named by ts struct_bin_path = os.path.join(logdir, f"s3_session_{ts}.bin") try: self.process = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, text=True, bufsize=1, ) except Exception as e: messagebox.showerror("Error", f"Failed to start bridge:\n{e}") return threading.Thread(target=self._reader_thread, daemon=True).start() self.status_var.set(f"Running — {bw} <-> {s3}") self.start_btn.configure(state="disabled") self.stop_btn.configure(state="normal", bg=RED) self.cap_btn.configure(state="normal") self.cap_btn.configure(state="normal") self._append_log(f"== Bridge started [{ts}] ==\n") self._append_log(" Click 'New Capture' when ready to record.\n") self._on_started(struct_bin_path) # Notify parent — no raw files yet, just the structured bin path self._on_started(struct_bin_path) def stop_bridge(self) -> None: if self.process and self.process.poll() is None: self.process.terminate() try: self.process.wait(timeout=3) except subprocess.TimeoutExpired: self.process.kill() self._bridge_ended() self._on_stopped() def _bridge_ended(self) -> None: self.status_var.set("Stopped") self.start_btn.configure(state="normal") self.stop_btn.configure(state="disabled", bg=BG3) self.cap_btn.configure(state="disabled") self.stop_cap_btn.configure(state="disabled", bg=BG3) self.mark_btn.configure(state="disabled") self._capturing = False self._cap_label = None self._append_log("== Bridge stopped ==\n") def _reader_thread(self) -> None: if not self.process or not self.process.stdout: return for line in self.process.stdout: self._stdout_q.put(line) self._stdout_q.put("<>") def _poll_stdout(self) -> None: try: while True: line = self._stdout_q.get_nowait() if line == "<>": self._bridge_ended() self._on_stopped() break stripped = line.strip() # Handle capture lifecycle events from bridge if stripped.startswith("[CAP_START] ") and "\t" in stripped: parts = stripped[12:].split("\t", 1) if len(parts) == 2: bw_path, s3_path = parts[0].strip(), parts[1].strip() self._on_cap_started_msg(bw_path, s3_path) elif stripped.startswith("[CAP_STOP] ") and "\t" in stripped: parts = stripped[11:].split("\t", 1) if len(parts) == 2: bw_path, s3_path = parts[0].strip(), parts[1].strip() self._on_cap_stopped_msg(bw_path, s3_path) self._append_log(line) except queue.Empty: pass finally: self.after(100, self._poll_stdout) # ── capture control ─────────────────────────────────────────────────── def _start_capture(self) -> None: """Ask for a label and tell the bridge to start writing raw tap files.""" if not self.process or self.process.poll() is not None: return label = simpledialog.askstring( "New Capture", "Label for this capture\n(e.g. 'recording_mode_continuous').\nLeave blank for timestamp only:", parent=self, ) if label is None: return # user hit Cancel label = label.strip() try: self.process.stdin.write(f"CAP_START:{label}\n") self.process.stdin.flush() except Exception as e: messagebox.showerror("Error", f"Failed to start capture:\n{e}") return self._capturing = True self._cap_label = label or datetime.datetime.now().strftime("%H%M%S") self.cap_btn.configure(state="disabled") self.stop_cap_btn.configure(state="normal", bg=RED) self.mark_btn.configure(state="normal") self._append_log(f"[CAPTURE] Starting: {self._cap_label!r}...\n") # Add to history as recording (paths filled in when [CAP_START] arrives) self._cap_history.append({"label": self._cap_label, "status": "recording", "bw": None, "s3": None}) self._refresh_hist() def _stop_capture(self) -> None: """Tell the bridge to flush and close the current raw tap files.""" if not self.process or self.process.poll() is not None: return try: self.process.stdin.write("CAP_STOP\n") self.process.stdin.flush() except Exception as e: messagebox.showerror("Error", f"Failed to stop capture:\n{e}") # UI is updated when [CAP_STOP] arrives in stdout def _on_cap_started_msg(self, bw_path: str, s3_path: str) -> None: """Called when bridge confirms capture has started (files are open).""" # Fill in paths for the last 'recording' history entry for entry in reversed(self._cap_history): if entry["status"] == "recording" and entry["bw"] is None: entry["bw"] = bw_path entry["s3"] = s3_path break if self._on_cap_started: self._on_cap_started(bw_path, s3_path, self._cap_label or "") def _on_cap_stopped_msg(self, bw_path: str, s3_path: str) -> None: """Called when bridge confirms capture has stopped (files are closed).""" label = self._cap_label or "capture" # Mark history entry as done for entry in reversed(self._cap_history): if entry["status"] == "recording": entry["status"] = "done" entry["bw"] = bw_path entry["s3"] = s3_path break self._refresh_hist() self._capturing = False self._cap_label = None self.cap_btn.configure(state="normal") self.stop_cap_btn.configure(state="disabled", bg=BG3) self._append_log(f"[CAPTURE] Done: {label!r} — ready in Analyzer\n") if self._on_cap_complete: self._on_cap_complete(bw_path, s3_path, label) def _refresh_hist(self) -> None: self._hist_lb.delete(0, tk.END) for entry in self._cap_history: icon = "🔴" if entry["status"] == "recording" else "✅" label = entry["label"] or "(unlabeled)" self._hist_lb.insert(tk.END, f" {icon} {label}") if self._cap_history: self._hist_lb.see(tk.END) def _on_hist_dblclick(self, _e=None) -> None: sel = self._hist_lb.curselection() if not sel: return entry = self._cap_history[sel[0]] if entry["status"] == "done" and entry["bw"] and entry["s3"]: if self._on_cap_complete: self._on_cap_complete(entry["bw"], entry["s3"], entry["label"]) # ── mark ────────────────────────────────────────────────────────────── def add_mark(self) -> None: if not self.process or not self.process.stdin or self.process.poll() is not None: return label = label.strip() self._capturing = True self._cap_label = label or datetime.datetime.now().strftime("%H%M%S") if self._mode.get() == "tcp": # TCP: open the capture files now; pipe threads write here while active logdir = self.logdir_var.get().strip() or "." os.makedirs(logdir, exist_ok=True) ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") safe_label = self._cap_label.replace(" ", "_") if self._cap_label else "" suffix = f"_{safe_label}" if safe_label else "" bw_path = os.path.join(logdir, f"raw_bw_{ts}{suffix}.bin") s3_path = os.path.join(logdir, f"raw_s3_{ts}{suffix}.bin") with self._tcp_cap_lock: self._tcp_cap_bw_fh = open(bw_path, "wb") self._tcp_cap_s3_fh = open(s3_path, "wb") self._tcp_cap_bw_path = bw_path self._tcp_cap_s3_path = s3_path self._cap_history.append({"label": self._cap_label, "status": "recording", "bw": bw_path, "s3": s3_path}) self._refresh_hist() self._on_cap_started_msg(bw_path, s3_path) else: if not self.process or self.process.poll() is not None: return try: self.process.stdin.write(f"CAP_START:{label}\n") self.process.stdin.flush() except Exception as e: messagebox.showerror("Error", f"Failed to start capture:\n{e}") return self._cap_history.append({"label": self._cap_label, "status": "recording", "bw": None, "s3": None}) self._refresh_hist() self.cap_btn.configure(state="disabled") self.stop_cap_btn.configure(state="normal", bg=RED) self.mark_btn.configure(state="normal") self._append_log(f"[CAPTURE] Starting: {self._cap_label!r}...\n") def _stop_capture(self) -> None: if self._mode.get() == "tcp": with self._tcp_cap_lock: bw_path = self._tcp_cap_bw_path s3_path = self._tcp_cap_s3_path if self._tcp_cap_bw_fh: self._tcp_cap_bw_fh.close() self._tcp_cap_bw_fh = None if self._tcp_cap_s3_fh: self._tcp_cap_s3_fh.close() self._tcp_cap_s3_fh = None self._tcp_cap_bw_path = None self._tcp_cap_s3_path = None if bw_path and s3_path: self._on_cap_stopped_msg(bw_path, s3_path) return if not self.process or self.process.poll() is not None: return try: self.process.stdin.write("CAP_STOP\n") self.process.stdin.flush() except Exception as e: messagebox.showerror("Error", f"Failed to stop capture:\n{e}") # ── TCP mode ────────────────────────────────────────────────────────── def _start_tcp(self) -> None: if self._server is not None: messagebox.showinfo("Bridge", "TCP bridge is already listening.") return try: listen_port = int(self.listen_port_var.get().strip()) remote_host = self.remote_host_var.get().strip() remote_port = int(self.remote_port_var.get().strip()) except ValueError: messagebox.showerror("Error", "Invalid port number.") return if not remote_host: messagebox.showerror("Error", "Please enter the device host.") return try: srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srv.bind(("0.0.0.0", listen_port)) srv.listen(5) srv.settimeout(1.0) except OSError as e: messagebox.showerror("Error", f"Cannot bind to port {listen_port}:\n{e}") return self._server = srv self._tcp_stop_event.clear() self.start_btn.configure(state="disabled") self.stop_btn.configure(state="normal", bg=RED) self.cap_btn.configure(state="normal") self.status_var.set(f"Listening on :{listen_port}") ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") self._append_log( f"== TCP Bridge started [{ts}]\n" f" Listening on 0.0.0.0:{listen_port}\n" f" Forwarding to {remote_host}:{remote_port}\n" f" Click 'New Capture' before the operation you want to record.\n==\n" ) self._on_started(None) threading.Thread( target=self._accept_loop, args=(srv, remote_host, remote_port), daemon=True, ).start() def _stop_tcp(self) -> None: # Close any open capture files first with self._tcp_cap_lock: if self._tcp_cap_bw_fh: self._tcp_cap_bw_fh.close() self._tcp_cap_bw_fh = None if self._tcp_cap_s3_fh: self._tcp_cap_s3_fh.close() self._tcp_cap_s3_fh = None self._tcp_stop_event.set() if self._server: try: self._server.close() except OSError: pass self._server = None self._bridge_ended() self._on_stopped() def _accept_loop(self, srv: socket.socket, remote_host: str, remote_port: int) -> None: while not self._tcp_stop_event.is_set(): try: client_sock, addr = srv.accept() except socket.timeout: continue except OSError: break peer = f"{addr[0]}:{addr[1]}" self._tcp_log_q.put(f"[TCP] Blastware connected from {peer}\n") try: dev_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) dev_sock.connect((remote_host, remote_port)) except OSError as e: self._tcp_log_q.put(f"[TCP] Cannot reach device {remote_host}:{remote_port}: {e}\n") client_sock.close() continue self._tcp_log_q.put(f"[TCP] Connected to device at {remote_host}:{remote_port}\n") self._run_tcp_session(client_sock, dev_sock) self._tcp_log_q.put(f"[TCP] Connection from {peer} closed\n") def _run_tcp_session(self, bw_sock: socket.socket, dev_sock: socket.socket) -> None: """Forward bytes in both directions; write to capture files only when active.""" bw_bytes = [0] s3_bytes = [0] def _pipe(src, dst, get_fh, counter): try: while True: data = src.recv(4096) if not data: break dst.sendall(data) with self._tcp_cap_lock: fh = get_fh() if fh: fh.write(data) fh.flush() counter[0] += len(data) except OSError: pass finally: try: dst.shutdown(socket.SHUT_WR) except OSError: pass t_bw = threading.Thread(target=_pipe, args=(bw_sock, dev_sock, lambda: self._tcp_cap_bw_fh, bw_bytes), daemon=True) t_s3 = threading.Thread(target=_pipe, args=(dev_sock, bw_sock, lambda: self._tcp_cap_s3_fh, s3_bytes), daemon=True) t_bw.start() t_s3.start() t_bw.join() t_s3.join() bw_sock.close() dev_sock.close() def _poll_tcp_log(self) -> None: try: while True: msg = self._tcp_log_q.get_nowait() self._append_log(msg) except queue.Empty: pass finally: self.after(100, self._poll_tcp_log) # ── marks ───────────────────────────────────────────────────────────── def add_mark(self) -> None: label = simpledialog.askstring("Mark", "Enter label for this mark:", parent=self) if not label or not label.strip(): return if self._mode.get() == "tcp": ts = datetime.datetime.now().strftime("%H:%M:%S") self._append_log(f"[MARK {ts}] {label.strip()}\n") else: if not self.process or not self.process.stdin or self.process.poll() is not None: return try: self.process.stdin.write("m\n") self.process.stdin.write(label.strip() + "\n") self.process.stdin.flush() self._append_log(f"[MARK] {label.strip()}\n") except Exception as e: messagebox.showerror("Error", f"Failed to send mark:\n{e}") # ───────────────────────────────────────────────────────────────────────────── # Analyzer panel (tk.Frame — lives inside a notebook tab) # Extracted from gui_analyzer.py; accepts external path injection. # ───────────────────────────────────────────────────────────────────────────── class AnalyzerPanel(tk.Frame): def __init__(self, parent: tk.Widget, db: FrameDB, **kw): super().__init__(parent, bg=BG, **kw) self._db = db self.state = AnalyzerState() self._live_thread: Optional[threading.Thread] = None self._live_stop = threading.Event() self._live_q: queue.Queue[str] = queue.Queue() self._build() self._poll_live_queue() # ── external API (called by parent when bridge starts/stops) ────────── def set_live_files(self, raw_bw: Optional[str], raw_s3: Optional[str], struct_bin: Optional[str] = None) -> None: """Called when the bridge starts — inject file paths and start live mode.""" if raw_s3: self.s3_var.set(raw_s3) if raw_bw: self.bw_var.set(raw_bw) if struct_bin: self.bin_var.set(struct_bin) if raw_s3 and raw_bw: self._start_live() def stop_live(self) -> None: """Called when the bridge stops.""" if self._live_thread and self._live_thread.is_alive(): self._live_stop.set() self._set_live_btn_off() # ── build ───────────────────────────────────────────────────────────── def _build(self) -> None: self._build_toolbar() self._build_panes() self._build_statusbar() def _build_toolbar(self) -> None: bar = tk.Frame(self, bg=BG2, pady=4) bar.pack(side=tk.TOP, fill=tk.X) pad = {"padx": 5, "pady": 2} # Row 1: raw files row1 = tk.Frame(bar, bg=BG2) row1.pack(side=tk.TOP, fill=tk.X) tk.Label(row1, text="S3 raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) self.s3_var = tk.StringVar() tk.Entry(row1, textvariable=self.s3_var, width=30, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad) tk.Button(row1, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", font=MONO, command=lambda: self._browse(self.s3_var, "raw_s3.bin") ).pack(side=tk.LEFT, **pad) tk.Label(row1, text=" BW raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) self.bw_var = tk.StringVar() tk.Entry(row1, textvariable=self.bw_var, width=30, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad) tk.Button(row1, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", font=MONO, command=lambda: self._browse(self.bw_var, "raw_bw.bin") ).pack(side=tk.LEFT, **pad) # Row 2: structured bin (optional — enables mark-based session splitting) row2 = tk.Frame(bar, bg=BG2) row2.pack(side=tk.TOP, fill=tk.X) tk.Label(row2, text="Session .bin:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) self.bin_var = tk.StringVar() tk.Entry(row2, textvariable=self.bin_var, width=46, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad) tk.Button(row2, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", font=MONO, command=lambda: self._browse(self.bin_var, "s3_session.bin") ).pack(side=tk.LEFT, **pad) tk.Label(row2, text="(optional — splits sessions at marks)", bg=BG2, fg=FG_DIM, font=MONO_SM).pack(side=tk.LEFT, padx=6) # Row 3: buttons bar = tk.Frame(bar, bg=BG2) bar.pack(side=tk.TOP, fill=tk.X) tk.Frame(bar, bg=BG2, width=10).pack(side=tk.LEFT) self.analyze_btn = tk.Button(bar, text="Analyze", bg=ACCENT, fg="#ffffff", relief="flat", padx=10, cursor="hand2", font=MONO_B, command=self._run_analyze) self.analyze_btn.pack(side=tk.LEFT, **pad) self.live_btn = tk.Button(bar, text="Live: OFF", bg=BG3, fg=FG, relief="flat", padx=10, cursor="hand2", font=MONO, command=self._toggle_live) self.live_btn.pack(side=tk.LEFT, **pad) self.export_btn = tk.Button(bar, text="Export for Claude", bg=ORANGE, fg="#000000", relief="flat", padx=10, cursor="hand2", font=MONO_B, command=self._run_export, state="disabled") self.export_btn.pack(side=tk.LEFT, **pad) self.status_var = tk.StringVar(value="Idle") tk.Label(bar, textvariable=self.status_var, bg=BG2, fg=FG_DIM, font=MONO, anchor="w").pack(side=tk.LEFT, padx=10) def _build_panes(self) -> None: pane = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG, sashwidth=4, sashrelief="flat") pane.pack(fill=tk.BOTH, expand=True) # Left: session tree left = tk.Frame(pane, bg=BG2, width=260) pane.add(left, minsize=200) tk.Label(left, text="Sessions", bg=BG2, fg=ACCENT, font=MONO_B, anchor="w", padx=6).pack(fill=tk.X) tree_frame = tk.Frame(left, bg=BG2) tree_frame.pack(fill=tk.BOTH, expand=True) style = ttk.Style() style.theme_use("clam") style.configure("Treeview", background=BG2, foreground=FG, fieldbackground=BG2, font=MONO_SM, rowheight=18, borderwidth=0) style.configure("Treeview.Heading", background=BG3, foreground=ACCENT, font=MONO_SM) style.map("Treeview", background=[("selected", BG3)], foreground=[("selected", "#ffffff")]) self.tree = ttk.Treeview(tree_frame, columns=("info",), show="tree headings", selectmode="browse") self.tree.heading("#0", text="Frame") self.tree.heading("info", text="Info") self.tree.column("#0", width=160, stretch=True) self.tree.column("info", width=80, stretch=False) vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview) self.tree.configure(yscrollcommand=vsb.set) vsb.pack(side=tk.RIGHT, fill=tk.Y) self.tree.pack(fill=tk.BOTH, expand=True) self.tree.tag_configure("session", foreground=ACCENT, font=MONO_B) self.tree.tag_configure("bw_frame", foreground=COL_BW) self.tree.tag_configure("s3_frame", foreground=COL_S3) self.tree.tag_configure("bad_chk", foreground=RED) self.tree.tag_configure("malformed", foreground=RED) self.tree.bind("<>", self._on_tree_select) # Right: detail notebook right = tk.Frame(pane, bg=BG) pane.add(right, minsize=600) style.configure("TNotebook", background=BG2, borderwidth=0) style.configure("TNotebook.Tab", background=BG3, foreground=FG, font=MONO, padding=[8, 2]) style.map("TNotebook.Tab", background=[("selected", BG)], foreground=[("selected", ACCENT)]) self.nb = ttk.Notebook(right) self.nb.pack(fill=tk.BOTH, expand=True) self.inv_text = self._make_text_tab("Inventory") self.hex_text = self._make_text_tab("Hex Dump") self.diff_text = self._make_text_tab("Diff") self.report_text = self._make_text_tab("Full Report") self._build_query_tab() for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text): w.tag_configure("head", foreground=COL_HEAD, font=MONO_B) w.tag_configure("bw", foreground=COL_BW) w.tag_configure("s3", foreground=COL_S3) w.tag_configure("changed", foreground=COL_DIFF) w.tag_configure("known", foreground=COL_KNOW) w.tag_configure("dim", foreground=FG_DIM) w.tag_configure("normal", foreground=FG) w.tag_configure("warn", foreground=YELLOW) w.tag_configure("addr", foreground=ORANGE) def _make_text_tab(self, title: str) -> tk.Text: frame = tk.Frame(self.nb, bg=BG) self.nb.add(frame, text=title) w = tk.Text(frame, bg=BG, fg=FG, font=MONO, state="disabled", relief="flat", wrap="none", insertbackground=FG, selectbackground=BG3, selectforeground="#ffffff") vsb = ttk.Scrollbar(frame, orient="vertical", command=w.yview) hsb = ttk.Scrollbar(frame, orient="horizontal", command=w.xview) w.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set) vsb.pack(side=tk.RIGHT, fill=tk.Y) hsb.pack(side=tk.BOTTOM, fill=tk.X) w.pack(fill=tk.BOTH, expand=True) return w def _build_query_tab(self) -> None: frame = tk.Frame(self.nb, bg=BG) self.nb.add(frame, text="Query DB") pad = {"padx": 4, "pady": 2} filt = tk.Frame(frame, bg=BG2, pady=4) filt.pack(side=tk.TOP, fill=tk.X) tk.Label(filt, text="Capture:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=0, sticky="e", **pad) self._q_capture_var = tk.StringVar(value="All") self._q_capture_cb = ttk.Combobox(filt, textvariable=self._q_capture_var, width=20, font=MONO_SM, state="readonly") self._q_capture_cb.grid(row=0, column=1, sticky="w", **pad) tk.Label(filt, text="Dir:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=2, sticky="e", **pad) self._q_dir_var = tk.StringVar(value="All") ttk.Combobox(filt, textvariable=self._q_dir_var, values=["All", "BW", "S3"], width=6, font=MONO_SM, state="readonly").grid(row=0, column=3, sticky="w", **pad) tk.Label(filt, text="SUB:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=4, sticky="e", **pad) self._q_sub_var = tk.StringVar(value="All") self._q_sub_cb = ttk.Combobox(filt, textvariable=self._q_sub_var, width=12, font=MONO_SM, state="readonly") self._q_sub_cb.grid(row=0, column=5, sticky="w", **pad) tk.Label(filt, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=6, sticky="e", **pad) self._q_offset_var = tk.StringVar() tk.Entry(filt, textvariable=self._q_offset_var, width=8, bg=BG3, fg=FG, font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=7, sticky="w", **pad) tk.Label(filt, text="Value:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=8, sticky="e", **pad) self._q_value_var = tk.StringVar() tk.Entry(filt, textvariable=self._q_value_var, width=8, bg=BG3, fg=FG, font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=9, sticky="w", **pad) tk.Button(filt, text="Run Query", bg=ACCENT, fg="#ffffff", relief="flat", padx=8, cursor="hand2", font=("Consolas", 8, "bold"), command=self._run_db_query).grid(row=0, column=10, padx=8) tk.Button(filt, text="Refresh", bg=BG3, fg=FG, relief="flat", padx=6, cursor="hand2", font=MONO_SM, command=self._refresh_query_dropdowns).grid(row=0, column=11, padx=4) self._q_stats_var = tk.StringVar(value="DB: —") tk.Label(filt, textvariable=self._q_stats_var, bg=BG2, fg=FG_DIM, font=MONO_SM).grid(row=0, column=12, padx=12, sticky="w") res_frame = tk.Frame(frame, bg=BG) res_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True) cols = ("cap", "sess", "dir", "sub", "sub_name", "page", "len", "chk") self._q_tree = ttk.Treeview(res_frame, columns=cols, show="headings", selectmode="browse") for cid, head, w in [ ("cap", "Cap", 40), ("sess", "Sess", 40), ("dir", "Dir", 40), ("sub", "SUB", 50), ("sub_name", "Name", 160), ("page", "Page", 60), ("len", "Len", 50), ("chk", "Chk", 50), ]: self._q_tree.heading(cid, text=head, anchor="w") self._q_tree.column(cid, width=w, stretch=(cid == "sub_name")) q_vsb = ttk.Scrollbar(res_frame, orient="vertical", command=self._q_tree.yview) q_hsb = ttk.Scrollbar(res_frame, orient="horizontal", command=self._q_tree.xview) self._q_tree.configure(yscrollcommand=q_vsb.set, xscrollcommand=q_hsb.set) q_vsb.pack(side=tk.RIGHT, fill=tk.Y) q_hsb.pack(side=tk.BOTTOM, fill=tk.X) self._q_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) self._q_tree.tag_configure("bw_row", foreground=COL_BW) self._q_tree.tag_configure("s3_row", foreground=COL_S3) self._q_tree.tag_configure("bad_row", foreground=RED) self._q_tree.bind("<>", lambda _e: self._run_interpret()) interp_frame = tk.Frame(frame, bg=BG2, height=120) interp_frame.pack(side=tk.BOTTOM, fill=tk.X) interp_frame.pack_propagate(False) tk.Label(interp_frame, text="Byte interpretation:", bg=BG2, fg=ACCENT, font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X) ii = tk.Frame(interp_frame, bg=BG2) ii.pack(fill=tk.X, padx=6, pady=2) tk.Label(ii, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT) self._interp_offset_var = tk.StringVar(value="5") tk.Entry(ii, textvariable=self._interp_offset_var, width=6, bg=BG3, fg=FG, font=MONO_SM, insertbackground=FG, relief="flat").pack(side=tk.LEFT, padx=4) tk.Button(ii, text="Interpret", bg=BG3, fg=FG, relief="flat", cursor="hand2", font=MONO_SM, command=self._run_interpret).pack(side=tk.LEFT, padx=4) self._interp_text = tk.Text(interp_frame, bg=BG2, fg=FG, font=MONO_SM, height=4, relief="flat", state="disabled", insertbackground=FG) self._interp_text.pack(fill=tk.X, padx=6, pady=2) self._interp_text.tag_configure("label", foreground=FG_DIM) self._interp_text.tag_configure("value", foreground=YELLOW) self._q_rows: dict[str, object] = {} self._q_capture_rows: list = [None] self._q_sub_values: list = [None] self._refresh_query_dropdowns() def _build_statusbar(self) -> None: bar = tk.Frame(self, bg=BG3, height=20) bar.pack(side=tk.BOTTOM, fill=tk.X) self.sb_var = tk.StringVar(value="Ready") tk.Label(bar, textvariable=self.sb_var, bg=BG3, fg=FG_DIM, font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X) # ── file picking ────────────────────────────────────────────────────── def _browse(self, var: tk.StringVar, default: str) -> None: path = filedialog.askopenfilename( title=f"Select {default}", filetypes=[("Binary", "*.bin"), ("All files", "*.*")], initialfile=default, ) if path: var.set(path) # ── analysis ────────────────────────────────────────────────────────── def _run_analyze(self) -> None: s3p = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None bwp = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None if not s3p or not bwp: messagebox.showerror("Missing files", "Select both S3 and BW raw files.") return if not s3p.exists(): messagebox.showerror("Not found", f"S3 file not found:\n{s3p}") return if not bwp.exists(): messagebox.showerror("Not found", f"BW file not found:\n{bwp}") return self.state.s3_path = s3p self.state.bw_path = bwp self._do_analyze(s3p, bwp) def _browse_bin(self) -> None: path = filedialog.askopenfilename( title="Select session .bin file", filetypes=[("Binary", "*.bin"), ("All files", "*.*")], ) if path: self.bin_var.set(path) def _do_analyze(self, s3_path: Path, bw_path: Path) -> None: self.status_var.set("Parsing...") self.update_idletasks() s3_blob = s3_path.read_bytes() bw_blob = bw_path.read_bytes() # Use mark-based session splitting if a structured .bin is provided bin_str = self.bin_var.get().strip() bin_path = Path(bin_str) if bin_str else None marks = [] if bin_path and bin_path.exists(): marks = parse_structured_bin(bin_path.read_bytes()) if marks: sessions = split_sessions_at_marks(bw_blob, s3_blob, marks) mark_labels = " | ".join(m.label for m in marks) self.sb_var.set(f"{len(marks)} user mark(s): {mark_labels}") self.update_idletasks() else: if bin_path and bin_path.exists(): self.sb_var.set("No user marks found in session .bin — using standard session detection") s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3") bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0, validate_checksum=True), "BW") sessions = split_into_sessions(bw_frames, s3_frames) diffs: list[Optional[list[FrameDiff]]] = [None] for i in range(1, len(sessions)): diffs.append(diff_sessions(sessions[i - 1], sessions[i])) self.state.sessions = sessions self.state.diffs = diffs n_s3 = sum(len(s.s3_frames) for s in sessions) n_bw = sum(len(s.bw_frames) for s in sessions) self.status_var.set(f"{len(sessions)} sessions | BW:{n_bw} S3:{n_s3}") self.sb_var.set(f"Loaded: {s3_path.name} + {bw_path.name}") self.export_btn.configure(state="normal") self._rebuild_tree() try: cap_id = self._db.ingest(sessions, s3_path, bw_path) if cap_id is not None: self.state.last_capture_id = cap_id self._refresh_query_dropdowns() for i, lbl in enumerate(self._q_capture_cb["values"]): if lbl.startswith(f"#{cap_id} "): self._q_capture_cb.current(i) break except Exception as exc: self.sb_var.set(f"DB ingest error: {exc}") def _run_export(self) -> None: if not self.state.sessions: messagebox.showinfo("Export", "Run Analyze first.") return outdir = self.state.s3_path.parent if self.state.s3_path else Path(".") out_path = write_claude_export(self.state.sessions, self.state.diffs, outdir, self.state.s3_path, self.state.bw_path) self.sb_var.set(f"Exported: {out_path.name}") if messagebox.askyesno("Export complete", f"Saved to:\n{out_path}\n\nOpen folder?"): import subprocess as sp sp.Popen(["explorer", str(out_path.parent)]) # ── session tree ────────────────────────────────────────────────────── def _rebuild_tree(self) -> None: self.tree.delete(*self.tree.get_children()) for sess in self.state.sessions: label = f"Session {sess.index}" + ("" if sess.is_complete() else " [partial]") n_diff = len(self.state.diffs[sess.index] or []) diff_str = f"{n_diff} changes" if n_diff else "" sid = self.tree.insert("", tk.END, text=label, values=(diff_str,), tags=("session",)) for af in sess.all_frames: src_tag = "bw_frame" if af.source == "BW" else "s3_frame" sub_hex = f"{af.header.sub:02X}" if af.header else "??" lbl_text = f"[{af.source}] {sub_hex} {af.sub_name}" extra = "" tags = (src_tag,) if af.frame.checksum_valid is False: extra = "BAD CHK"; tags = ("bad_chk",) elif af.header is None: lbl_text = f"[{af.source}] MALFORMED"; tags = ("malformed",) self.tree.insert(sid, tk.END, text=lbl_text, values=(extra,), tags=tags, iid=f"frame_{sess.index}_{af.frame.index}_{af.source}") for item in self.tree.get_children(): self.tree.item(item, open=True) def _on_tree_select(self, _e: tk.Event) -> None: sel = self.tree.selection() if not sel: return iid = sel[0] if iid.startswith("frame_"): parts = iid.split("_") self._show_frame_detail(int(parts[1]), int(parts[2]), parts[3]) else: text = self.tree.item(iid, "text") try: self._show_session_detail(int(text.split()[1])) except (IndexError, ValueError): pass def _find_frame_by_sub(self, sess_idx: int, sub: int, page_key: int) -> Optional[AnnotatedFrame]: """Find a frame in a session by (sub, page_key) — used for diff click-through.""" if sess_idx >= len(self.state.sessions): return None sess = self.state.sessions[sess_idx] for af in sess.bw_frames + sess.s3_frames: if af.header and af.header.sub == sub and af.header.page_key == page_key: return af return None def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]: if sess_idx >= len(self.state.sessions): return None pool = (self.state.sessions[sess_idx].bw_frames if source == "BW" else self.state.sessions[sess_idx].s3_frames) return next((af for af in pool if af.frame.index == frame_idx), None) # ── detail renderers ────────────────────────────────────────────────── def _clear_tabs(self) -> None: for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text): self._tc(w) def _show_session_detail(self, idx: int) -> None: if idx >= len(self.state.sessions): return sess = self.state.sessions[idx] diffs = self.state.diffs[idx] self._clear_tabs() w = self.inv_text self._tw(w, f"SESSION {sess.index}\n", "head") self._tw(w, f"Frames: {len(sess.bw_frames)+len(sess.s3_frames)}" f" (BW:{len(sess.bw_frames)} S3:{len(sess.s3_frames)})\n", "normal") if len(sess.bw_frames) != len(sess.s3_frames): self._tw(w, " WARNING: BW/S3 count mismatch\n", "warn") self._tn(w) for i, af in enumerate(sess.all_frames): src = "bw" if af.source == "BW" else "s3" sh = f"{af.header.sub:02X}" if af.header else "??" pg = f" (page {af.header.page_key:04X})" if af.header and af.header.page_key else "" chk = "" if af.frame.checksum_valid is False: chk = " [BAD CHECKSUM]" elif af.frame.checksum_valid is True: chk = f" [{af.frame.checksum_type}]" self._tw(w, f" [{af.source}] #{i:<3} ", src) self._tw(w, f"SUB={sh} ", "addr") self._tw(w, f"{af.sub_name:<30}{pg} len={len(af.frame.payload)}", "dim") if chk: self._tw(w, chk, "warn" if af.frame.checksum_valid is False else "dim") self._tn(w) w = self.diff_text self._tc(w) if diffs is None: self._tw(w, "(No previous session to diff against)\n", "dim") elif not diffs: self._tw(w, f"DIFF vs SESSION {idx-1}\n", "head") self._tw(w, " No changes detected.\n", "dim") else: self._tw(w, f"DIFF vs SESSION {idx-1}\n", "head") self._tw(w, " (click any SUB header to open its hex dump)\n", "dim") for fd in diffs: pg = f" (page {fd.page_key:04X})" if fd.page_key else "" link_tag = f"difflink_{fd.sub}_{fd.page_key}" w.configure(state="normal") start = w.index(tk.INSERT) w.insert(tk.END, f"\n SUB {fd.sub:02X} ({fd.sub_name}){pg}:\n") end = w.index(tk.INSERT) w.tag_add("addr", start, end) w.tag_add(link_tag, start, end) w.tag_configure(link_tag, underline=True) # capture fd values for the closure def _make_handler(s=idx, sub=fd.sub, pk=fd.page_key): def _handler(event): af = self._find_frame_by_sub(s, sub, pk) if af: self._show_frame_detail(s, af.frame.index, af.source) return _handler w.tag_bind(link_tag, "", _make_handler()) w.tag_bind(link_tag, "", lambda e, t=link_tag: w.configure(cursor="hand2")) w.tag_bind(link_tag, "", lambda e: w.configure(cursor="")) w.configure(state="disabled") for bd in fd.diffs: def _fmt(v: int) -> str: if v == -2: return "ADD" if v == -1: return "--" return f"{v:02x}" b, a = _fmt(bd.before), _fmt(bd.after) # A4 inner-frame add/remove: field_name carries the full description if bd.field_name and (bd.before == -2 or bd.after == -2 or bd.before == -1 or bd.after == -1) and bd.field_name.startswith("[A4"): self._tw(w, f" {b} -> {a} ", "changed") self._tw(w, bd.field_name, "known") self._tn(w) else: self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim") self._tw(w, f"{b} -> {a}", "changed") if bd.field_name: self._tw(w, f" [{bd.field_name}]", "known") self._tn(w) report = render_session_report(sess, diffs, idx - 1 if idx > 0 else None) self._tc(self.report_text) self._tw(self.report_text, report, "normal") self.nb.select(0) def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None: af = self._find_frame(sess_idx, frame_idx, source) if af is None: return self._clear_tabs() src = "bw" if source == "BW" else "s3" sh = f"{af.header.sub:02X}" if af.header else "??" w = self.inv_text self._tw(w, f"[{af.source}] Frame #{af.frame.index}\n", src) self._tw(w, f"Session {sess_idx} | ", "dim") self._tw(w, f"SUB={sh} {af.sub_name}\n", "addr") if af.header: self._tw(w, f" OFFSET: {af.header.page_key:04X} " f"CMD={af.header.cmd:02X} FLAGS={af.header.flags:02X}\n", "dim") self._tn(w) self._tw(w, f"Payload: {len(af.frame.payload)} bytes\n", "dim") if af.frame.checksum_valid is False: self._tw(w, " BAD CHECKSUM\n", "warn") elif af.frame.checksum_valid is True: self._tw(w, f" Checksum: {af.frame.checksum_type} {af.frame.checksum_hex}\n", "dim") self._tn(w) p = af.frame.payload if len(p) >= 5: self._tw(w, "Header breakdown:\n", "head") self._tw(w, f" [0] CMD = {p[0]:02x}\n", "dim") self._tw(w, f" [1] ? = {p[1]:02x}\n", "dim") self._tw(w, f" [2] SUB = {p[2]:02x} ({af.sub_name})\n", src) self._tw(w, f" [3] OFFSET_HI = {p[3]:02x}\n", "dim") self._tw(w, f" [4] OFFSET_LO = {p[4]:02x}\n", "dim") if len(p) > 5: self._tw(w, f" [5..] data = {len(p)-5} bytes\n", "dim") w = self.hex_text self._tw(w, f"[{af.source}] SUB={sh} {af.sub_name}\n", src) self._tw(w, f"Payload ({len(af.frame.payload)} bytes):\n", "dim") self._tn(w) self._tw(w, "\n".join(format_hex_dump(af.frame.payload, indent=" ")) + "\n", "normal") diffs_for_sess = self.state.diffs[sess_idx] if sess_idx < len(self.state.diffs) else None if diffs_for_sess and af.header: matching = [fd for fd in diffs_for_sess if fd.sub == af.header.sub and fd.page_key == af.header.page_key] if matching: self._tn(w) self._tw(w, "Changed bytes (vs prev session):\n", "head") for bd in matching[0].diffs: b = f"{bd.before:02x}" if bd.before >= 0 else "--" a = f"{bd.after:02x}" if bd.after >= 0 else "--" self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim") self._tw(w, f"{b} -> {a}", "changed") if bd.field_name: self._tw(w, f" [{bd.field_name}]", "known") self._tn(w) self.nb.select(1) # ── live mode ───────────────────────────────────────────────────────── def _toggle_live(self) -> None: if self._live_thread and self._live_thread.is_alive(): self._live_stop.set() self._set_live_btn_off() else: s3p = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None bwp = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None if not s3p or not bwp: messagebox.showerror("Missing files", "Select both raw files before starting live mode.") return self.state.s3_path = s3p self.state.bw_path = bwp self._start_live() def _start_live(self) -> None: s3p = Path(self.s3_var.get().strip()) bwp = Path(self.bw_var.get().strip()) self.state.s3_path = s3p self.state.bw_path = bwp self._live_stop.clear() self._live_thread = threading.Thread( target=self._live_worker, args=(s3p, bwp), daemon=True) self._live_thread.start() self.live_btn.configure(text="Live: ON", bg=GREEN, fg="#000000") self.status_var.set("Live mode running...") def _set_live_btn_off(self) -> None: self.live_btn.configure(text="Live: OFF", bg=BG3, fg=FG) self.status_var.set("Live stopped") def _live_worker(self, s3_path: Path, bw_path: Path) -> None: import time s3_pos = bw_pos = 0 while not self._live_stop.is_set(): changed = False for path, pos_attr in ((s3_path, "s3"), (bw_path, "bw")): if path.exists(): with path.open("rb") as fh: fh.seek(s3_pos if pos_attr == "s3" else bw_pos) nb = fh.read() if nb: if pos_attr == "s3": s3_pos += len(nb) else: bw_pos += len(nb) changed = True if changed: self._live_q.put("refresh") time.sleep(0.1) def _poll_live_queue(self) -> None: try: while True: msg = self._live_q.get_nowait() if msg == "refresh" and self.state.s3_path and self.state.bw_path: self._do_analyze(self.state.s3_path, self.state.bw_path) except queue.Empty: pass finally: self.after(150, self._poll_live_queue) # ── DB query ────────────────────────────────────────────────────────── def _refresh_query_dropdowns(self) -> None: try: captures = self._db.list_captures() self._q_capture_cb["values"] = ["All"] + [ f"#{r['id']} {r['timestamp'][:16]} ({r['frame_count']} frames)" for r in captures ] self._q_capture_rows = [None] + [r["id"] for r in captures] subs = self._db.get_distinct_subs() self._q_sub_cb["values"] = ["All"] + [f"0x{s:02X}" for s in subs] self._q_sub_values = [None] + subs stats = self._db.get_stats() self._q_stats_var.set(f"DB: {stats['captures']} captures | {stats['frames']} frames") except Exception as exc: self._q_stats_var.set(f"DB error: {exc}") def _parse_int(self, s: str) -> Optional[int]: s = s.strip() if not s: return None try: return int(s, 0) except ValueError: return None def _run_db_query(self) -> None: cap_idx = self._q_capture_cb.current() cap_id = self._q_capture_rows[cap_idx] if cap_idx > 0 else None dir_val = self._q_dir_var.get() direction = dir_val if dir_val != "All" else None sub_idx = self._q_sub_cb.current() sub = self._q_sub_values[sub_idx] if sub_idx > 0 else None offset = self._parse_int(self._q_offset_var.get()) value = self._parse_int(self._q_value_var.get()) try: if offset is not None: rows = self._db.query_by_byte(offset=offset, value=value, capture_id=cap_id, direction=direction, sub=sub) else: rows = self._db.query_frames(capture_id=cap_id, direction=direction, sub=sub) except Exception as exc: messagebox.showerror("Query error", str(exc)) return self._q_tree.delete(*self._q_tree.get_children()) self._q_rows.clear() for row in rows: sh = f"0x{row['sub']:02X}" if row["sub"] is not None else "—" pg = f"0x{row['page_key']:04X}" if row["page_key"] is not None else "—" chk = {1: "OK", 0: "BAD", None: "—"}.get(row["checksum_ok"], "—") tag = "bw_row" if row["direction"] == "BW" else "s3_row" if row["checksum_ok"] == 0: tag = "bad_row" iid = str(row["id"]) self._q_tree.insert("", tk.END, iid=iid, tags=(tag,), values=( row["capture_id"], row["session_idx"], row["direction"], sh, row["sub_name"] or "", pg, row["payload_len"], chk)) self._q_rows[iid] = row self.sb_var.set(f"Query returned {len(rows)} rows") def _run_interpret(self) -> None: sel = self._q_tree.selection() if not sel: return row = self._q_rows.get(sel[0]) if not row: return offset = self._parse_int(self._interp_offset_var.get()) if offset is None: return payload = bytes(row["payload"]) interp = self._db.interpret_offset(payload, offset) w = self._interp_text w.configure(state="normal") w.delete("1.0", tk.END) sh = f"0x{row['sub']:02X}" if row["sub"] is not None else "??" w.insert(tk.END, f"Frame #{row['id']} [{row['direction']}] SUB={sh} " f"offset={offset} (0x{offset:04X})\n", "label") line = "" for key, lbl in [ ("uint8","uint8 "), ("int8","int8 "), ("uint16_be","uint16 BE "), ("uint16_le","uint16 LE "), ("uint32_be","uint32 BE "), ("uint32_le","uint32 LE "), ("float32_be","float32 BE "), ("float32_le","float32 LE "), ]: if key not in interp: continue val = interp[key] vs = f"{val:.6g}" if isinstance(val, float) else ( f"{val} (0x{int(val)&0xFFFFFFFF:X})") line += f" {lbl}: {vs:<28}" if len(line) > 80: w.insert(tk.END, line + "\n", "value"); line = "" if line: w.insert(tk.END, line + "\n", "value") w.configure(state="disabled") # ── text helpers ────────────────────────────────────────────────────── def _tc(self, w: tk.Text) -> None: w.configure(state="normal"); w.delete("1.0", tk.END) def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None: w.configure(state="normal"); w.insert(tk.END, text, tag) def _tn(self, w: tk.Text) -> None: w.configure(state="normal"); w.insert(tk.END, "\n"); w.configure(state="disabled") # ───────────────────────────────────────────────────────────────────────────── # ───────────────────────────────────────────────────────────────────────────── # Serial Watch panel — tap the RS-232 line between device and modem # ───────────────────────────────────────────────────────────────────────────── try: import serial as _serial from serial.tools import list_ports as _list_ports _SERIAL_OK = True except ImportError: _SERIAL_OK = False from minimateplus.framing import S3FrameParser as _S3FrameParser # noqa: E402 _SW_KNOWN_SUBS = { 0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADV_EVENT_RSP", 0xE1: "EVT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP", 0xF3: "WAVEFORM_REC_RSP", 0xF5: "WAVEFORM_HDR_RSP", 0xF7: "EVENT_INDEX_RSP", 0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP", 0x69: "START_MON_ACK", 0x68: "STOP_MON_ACK", } class SerialWatchPanel(tk.Frame): """ Tap the RS-232 line between the MiniMate Plus and its modem (RV50/RV55). Runs the serial reader in a background thread; surfaces parsed S3 frames live in the log view. Writes raw_s3_.bin compatible with Analyzer. Typical use for call-home capture: 1. Connect a USB-to-serial tap to the RS-232 line. 2. Pick that COM port here, click Start. 3. Wait for the unit to trigger / call home. 4. Click Stop, then 'Open in Analyzer' to inspect the frames. """ _COL_FRAME = "#4ec9b0" # teal — parsed S3 frame _COL_CTRL = "#dcdcaa" # yellow — control-line change _COL_AT = "#9cdcfe" # blue — AT command / ASCII noise _COL_ERR = "#f44747" # red — error def __init__(self, parent: tk.Widget, on_capture_ready=None, **kw): """ on_capture_ready(raw_s3_path: str) — called when capture stops, so the parent can inject the file into the Analyzer. """ super().__init__(parent, bg=BG2, **kw) self._on_capture_ready = on_capture_ready self._serial: Optional[object] = None # serial.Serial instance self._reader_thread: Optional[threading.Thread] = None self._stop_evt = threading.Event() self._log_q: queue.Queue[tuple[str, str]] = queue.Queue() # (text, colour) self._raw_fh = None # open binary file handle self._raw_path: Optional[str] = None self._frame_count = 0 self._build() self._poll_log_queue() # ── build ───────────────────────────────────────────────────────────── def _build(self) -> None: pad = {"padx": 6, "pady": 4} cfg = tk.Frame(self, bg=BG2) cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4) # Row 0 — port picker tk.Label(cfg, text="COM port:", bg=BG2, fg=FG, font=MONO ).grid(row=0, column=0, sticky="e", **pad) self._port_var = tk.StringVar() self._port_cb = ttk.Combobox(cfg, textvariable=self._port_var, width=12, font=MONO, state="normal") self._port_cb.grid(row=0, column=1, sticky="w", **pad) tk.Button(cfg, text="↺", bg=BG3, fg=FG, relief="flat", cursor="hand2", font=MONO, command=self._refresh_ports ).grid(row=0, column=2, **pad) tk.Label(cfg, text=" Baud:", bg=BG2, fg=FG, font=MONO ).grid(row=0, column=3, sticky="e", **pad) self._baud_var = tk.StringVar(value="38400") tk.Entry(cfg, textvariable=self._baud_var, width=8, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO ).grid(row=0, column=4, sticky="w", **pad) self._ack_ok_var = tk.BooleanVar(value=False) tk.Checkbutton(cfg, text="Ack OK to AT commands", variable=self._ack_ok_var, bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO).grid(row=0, column=5, sticky="w", **pad) # Row 1 — capture dir tk.Label(cfg, text="Save to:", bg=BG2, fg=FG, font=MONO ).grid(row=1, column=0, sticky="e", **pad) self._dir_var = tk.StringVar( value=str(SCRIPT_DIR / "bridges" / "captures")) tk.Entry(cfg, textvariable=self._dir_var, width=40, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO ).grid(row=1, column=1, columnspan=4, sticky="we", **pad) tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", font=MONO, command=self._choose_dir ).grid(row=1, column=5, **pad) # Button row btn_row = tk.Frame(self, bg=BG2) btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2) self._start_btn = tk.Button( btn_row, text="Start Watch", bg=GREEN, fg="#000000", relief="flat", padx=12, cursor="hand2", font=MONO_B, command=self._start) self._start_btn.pack(side=tk.LEFT, padx=6) self._stop_btn = tk.Button( btn_row, text="Stop", bg=BG3, fg=FG, relief="flat", padx=12, cursor="hand2", font=MONO, command=self._stop, state="disabled") self._stop_btn.pack(side=tk.LEFT, padx=4) self._analyzer_btn = tk.Button( btn_row, text="Open in Analyzer", bg=BG3, fg=FG, relief="flat", padx=10, cursor="hand2", font=MONO, command=self._send_to_analyzer, state="disabled") self._analyzer_btn.pack(side=tk.LEFT, padx=4) tk.Button(btn_row, text="Clear", bg=BG3, fg=FG, relief="flat", padx=8, cursor="hand2", font=MONO, command=self._clear_log).pack(side=tk.LEFT, padx=4) self._status_var = tk.StringVar(value="Idle") tk.Label(btn_row, textvariable=self._status_var, bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10) # Log view self._log = scrolledtext.ScrolledText( self, height=24, font=MONO_SM, bg=BG, fg=FG, insertbackground=FG, relief="flat", state="disabled", ) self._log.pack(fill=tk.BOTH, expand=True, padx=4, pady=4) self._log.tag_config("frame", foreground=self._COL_FRAME) self._log.tag_config("ctrl", foreground=self._COL_CTRL) self._log.tag_config("at", foreground=self._COL_AT) self._log.tag_config("err", foreground=self._COL_ERR) self._log.tag_config("dim", foreground=FG_DIM) # Populate ports on first load self._refresh_ports() # ── port helpers ────────────────────────────────────────────────────── def _refresh_ports(self) -> None: if not _SERIAL_OK: self._port_cb["values"] = ["(pyserial not installed)"] return ports = [p.device for p in _list_ports.comports()] self._port_cb["values"] = ports if ports and not self._port_var.get(): self._port_var.set(ports[0]) def _choose_dir(self) -> None: d = filedialog.askdirectory(initialdir=self._dir_var.get()) if d: self._dir_var.set(d) # ── start / stop ────────────────────────────────────────────────────── def _start(self) -> None: if not _SERIAL_OK: messagebox.showerror( "pyserial missing", "Install pyserial first:\n pip install pyserial") return port = self._port_var.get().strip() if not port or "not installed" in port: messagebox.showerror("Error", "Select a valid COM port first.") return try: baud = int(self._baud_var.get().strip()) except ValueError: messagebox.showerror("Error", "Invalid baud rate.") return # Open output files ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") out_dir = Path(self._dir_var.get()) / f"serial_{ts}" out_dir.mkdir(parents=True, exist_ok=True) self._raw_path = str(out_dir / f"raw_s3_{ts}.bin") try: self._raw_fh = open(self._raw_path, "wb") except OSError as exc: messagebox.showerror("Error", f"Cannot open capture file:\n{exc}") return # Open serial port try: ser = _serial.Serial( port=port, baudrate=baud, bytesize=8, parity=_serial.PARITY_NONE, stopbits=_serial.STOPBITS_ONE, timeout=0.05, write_timeout=0, ) ser.setDTR(True) ser.setRTS(True) except Exception as exc: self._raw_fh.close() self._raw_fh = None messagebox.showerror("Error", f"Cannot open {port}:\n{exc}") return self._serial = ser self._stop_evt.clear() self._frame_count = 0 self._analyzer_btn.configure(state="disabled") self._reader_thread = threading.Thread( target=self._reader_loop, args=(ser, baud), daemon=True, ) self._reader_thread.start() self._status_var.set(f"Watching {port} @ {baud}") self._start_btn.configure(state="disabled") self._stop_btn.configure(state="normal", bg=RED) self._append(f"── Serial watch started {port} @ {baud} [{ts}] ──\n", "dim") self._append(f" Capture: {self._raw_path}\n", "dim") self._append(" Waiting for data…\n\n", "dim") def _stop(self) -> None: self._stop_evt.set() if self._serial: try: self._serial.close() except Exception: pass self._serial = None if self._raw_fh: self._raw_fh.close() self._raw_fh = None self._status_var.set("Stopped") self._start_btn.configure(state="normal") self._stop_btn.configure(state="disabled", bg=BG3) if self._raw_path and Path(self._raw_path).exists(): self._analyzer_btn.configure(state="normal") self._append("\n── Watch stopped ──\n", "dim") # ── reader thread ───────────────────────────────────────────────────── def _reader_loop(self, ser, baud: int) -> None: parser = _S3FrameParser() rx_buf = bytearray() ack_ok = self._ack_ok_var.get() # Monitor control lines in a sub-thread ctrl_stop = threading.Event() ctrl_thread = threading.Thread( target=self._ctrl_loop, args=(ser, ctrl_stop), daemon=True) ctrl_thread.start() try: while not self._stop_evt.is_set(): try: data = ser.read(4096) except Exception as exc: self._log_q.put((f"Read error: {exc}\n", "err")) break if not data: continue # Save raw bytes if self._raw_fh: try: self._raw_fh.write(data) self._raw_fh.flush() except Exception: pass # Parse S3 frames for byte in data: result = parser.feed(bytes([byte])) if result: frames = result if isinstance(result, list) else [result] for f in frames: self._frame_count += 1 name = _SW_KNOWN_SUBS.get(f.sub, f"UNK_0x{f.sub:02X}") chk = "✓" if f.checksum_valid else "✗ BAD_CHK" peek = f.data[:32].hex() + ("…" if len(f.data) > 32 else "") msg = ( f"[{self._frame_count:04d}] " f"SUB=0x{f.sub:02X} ({name:<22}) " f"page=0x{f.page_key:04X} " f"data={len(f.data):4d}B {chk}\n" f" {peek}\n" ) self._log_q.put((msg, "frame")) # AT command handling for --ack-ok mode if ack_ok: rx_buf.extend(data) while b"\r" in rx_buf or b"\n" in rx_buf: for sep in (b"\r", b"\n"): idx = rx_buf.find(sep) if idx != -1: line_bytes = bytes(rx_buf[:idx]) del rx_buf[:idx + 1] break else: break line_str = line_bytes.decode("latin1", errors="ignore").strip() if line_str.upper().startswith("AT"): self._log_q.put((f"AT: {line_str!r}\n", "at")) if not line_str.upper().startswith("ATDT"): try: ser.write(b"\r\nOK\r\n") ser.flush() self._log_q.put((f" → OK\n", "at")) except Exception: pass finally: ctrl_stop.set() ctrl_thread.join(timeout=0.5) # Signal the main thread that the reader ended naturally if not self._stop_evt.is_set(): self._log_q.put(("<>", "")) def _ctrl_loop(self, ser, stop: threading.Event) -> None: prev = {} try: prev = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd) try: prev["RI"] = ser.ri except Exception: prev["RI"] = None except Exception: return while not stop.is_set(): try: cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None) try: cur["RI"] = ser.ri except Exception: pass for name, val in cur.items(): if val != prev.get(name): self._log_q.put((f"CTRL {name} → {val}\n", "ctrl")) prev[name] = val except Exception: break stop.wait(0.2) # ── log view ────────────────────────────────────────────────────────── def _poll_log_queue(self) -> None: try: while True: text, tag = self._log_q.get_nowait() if text == "<>": self._stop() break self._append(text, tag) except queue.Empty: pass finally: self.after(80, self._poll_log_queue) def _append(self, text: str, tag: str = "") -> None: self._log.configure(state="normal") if tag: self._log.insert(tk.END, text, tag) else: self._log.insert(tk.END, text) self._log.see(tk.END) self._log.configure(state="disabled") def _clear_log(self) -> None: self._log.configure(state="normal") self._log.delete("1.0", tk.END) self._log.configure(state="disabled") # ── send to analyzer ────────────────────────────────────────────────── def _send_to_analyzer(self) -> None: if self._raw_path and self._on_capture_ready: self._on_capture_ready(self._raw_path) # Console panel (tk.Frame — lives inside a notebook tab) # ───────────────────────────────────────────────────────────────────────────── class ConsolePanel(tk.Frame): """ Direct device connection and diagnostic console. Lets you run individual protocol commands against a MiniMate Plus via serial or TCP, showing colour-coded TX/RX bytes and decoded output in a scrolling console. Colour scheme: TX frames — ACCENT blue (#569cd6) RX raw hex — teal (#4ec9b0) Parsed/decoded — green (#4caf50) Errors — red (#f44747) Status/info — dim grey (#6a6a6a) Section heads — yellow (#dcdcaa) Log is auto-saved on "Save Log"; "Send to Analyzer" writes the captured RX bytes as a raw .bin file and injects the path into the Analyzer tab. """ TAG_TX = "tx" TAG_RX_RAW = "rx_raw" TAG_PARSED = "parsed" TAG_ERROR = "error" TAG_STATUS = "status" TAG_HEAD = "head" MAX_LINES = 5000 def __init__(self, parent: tk.Widget, on_send_to_analyzer=None, **kw): super().__init__(parent, bg=BG2, **kw) self._on_send_to_analyzer = on_send_to_analyzer self._q: queue.Queue = queue.Queue() self._running = False self._log_lines: list[str] = [] self._last_raw_rx: Optional[bytes] = None self._cmd_btns: list[tk.Button] = [] self._build() self._poll_q() # ── build ───────────────────────────────────────────────────────────── def _build(self) -> None: pad = {"padx": 5, "pady": 3} # ── top config row ──────────────────────────────────────────────── cfg = tk.Frame(self, bg=BG2) cfg.pack(side=tk.TOP, fill=tk.X, padx=6, pady=4) # Transport radio buttons self._transport_var = tk.StringVar(value="tcp") tk.Radiobutton( cfg, text="TCP", variable=self._transport_var, value="tcp", bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO, command=self._on_transport_change, ).grid(row=0, column=0, padx=(0, 4)) tk.Radiobutton( cfg, text="Serial", variable=self._transport_var, value="serial", bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO, command=self._on_transport_change, ).grid(row=0, column=1, padx=(0, 12)) # TCP fields self._tcp_frame = tk.Frame(cfg, bg=BG2) self._tcp_frame.grid(row=0, column=2, sticky="w") tk.Label(self._tcp_frame, text="Host:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) self._host_var = tk.StringVar(value="127.0.0.1") tk.Entry( self._tcp_frame, textvariable=self._host_var, width=18, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).pack(side=tk.LEFT, padx=2) tk.Label(self._tcp_frame, text="Port:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, padx=(10, 4)) self._tcp_port_var = tk.StringVar(value="9034") tk.Entry( self._tcp_frame, textvariable=self._tcp_port_var, width=6, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).pack(side=tk.LEFT, padx=2) # Serial fields (hidden by default) self._serial_frame = tk.Frame(cfg, bg=BG2) tk.Label(self._serial_frame, text="Port:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) self._port_var = tk.StringVar(value="COM5") tk.Entry( self._serial_frame, textvariable=self._port_var, width=10, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).pack(side=tk.LEFT, padx=2) tk.Label(self._serial_frame, text="Baud:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, padx=(10, 4)) self._baud_var = tk.StringVar(value="38400") tk.Entry( self._serial_frame, textvariable=self._baud_var, width=8, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).pack(side=tk.LEFT, padx=2) # Timeout tk.Label(cfg, text="Timeout:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=3, padx=(18, 4)) self._timeout_var = tk.StringVar(value="30") tk.Entry( cfg, textvariable=self._timeout_var, width=5, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).grid(row=0, column=4, padx=2) tk.Label(cfg, text="s", bg=BG2, fg=FG_DIM, font=MONO).grid(row=0, column=5) # ── command buttons row ─────────────────────────────────────────── cmd_row = tk.Frame(self, bg=BG2) cmd_row.pack(side=tk.TOP, fill=tk.X, padx=6, pady=(0, 4)) tk.Label(cmd_row, text="Commands:", bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=(0, 10)) for label, cmd in [ ("POLL", "poll"), ("Serial #", "serial_number"), ("Full Config", "full_config"), ("Event Index", "event_index"), ]: btn = tk.Button( cmd_row, text=label, bg=ACCENT, fg="#ffffff", relief="flat", padx=10, cursor="hand2", font=MONO, command=lambda c=cmd: self._run_command(c), ) btn.pack(side=tk.LEFT, padx=4) self._cmd_btns.append(btn) self._status_var = tk.StringVar(value="Ready") tk.Label(cmd_row, textvariable=self._status_var, bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=14) # ── console output ──────────────────────────────────────────────── self._console = scrolledtext.ScrolledText( self, height=20, font=MONO_SM, bg=BG, fg=FG, insertbackground=FG, relief="flat", state="disabled", ) self._console.pack(fill=tk.BOTH, expand=True, padx=6, pady=4) self._console.tag_configure(self.TAG_TX, foreground=ACCENT) self._console.tag_configure(self.TAG_RX_RAW, foreground=COL_S3) self._console.tag_configure(self.TAG_PARSED, foreground=GREEN) self._console.tag_configure(self.TAG_ERROR, foreground=RED) self._console.tag_configure(self.TAG_STATUS, foreground=FG_DIM) self._console.tag_configure(self.TAG_HEAD, foreground=YELLOW, font=MONO_B) # ── bottom bar ──────────────────────────────────────────────────── bot = tk.Frame(self, bg=BG2) bot.pack(side=tk.BOTTOM, fill=tk.X, padx=6, pady=4) tk.Button( bot, text="Clear", bg=BG3, fg=FG, relief="flat", padx=10, cursor="hand2", font=MONO, command=self._clear_console, ).pack(side=tk.LEFT, padx=4) tk.Button( bot, text="Save Log", bg=BG3, fg=FG, relief="flat", padx=10, cursor="hand2", font=MONO, command=self._save_log, ).pack(side=tk.LEFT, padx=4) self._send_btn = tk.Button( bot, text="Send to Analyzer", bg=BG3, fg=FG_DIM, relief="flat", padx=10, cursor="hand2", font=MONO, command=self._send_to_analyzer, state="disabled", ) self._send_btn.pack(side=tk.LEFT, padx=4) # ── transport toggle ────────────────────────────────────────────────── def _on_transport_change(self) -> None: if self._transport_var.get() == "tcp": self._serial_frame.grid_remove() self._tcp_frame.grid(row=0, column=2, sticky="w") else: self._tcp_frame.grid_remove() self._serial_frame.grid(row=0, column=2, sticky="w") # ── console helpers ─────────────────────────────────────────────────── def _append(self, text: str, tag: str = "status") -> None: """Append coloured text (main thread only — called via _poll_q).""" self._log_lines.append(text) self._console.configure(state="normal") self._console.insert(tk.END, text, tag) line_count = int(self._console.index("end-1c").split(".")[0]) if line_count > self.MAX_LINES: self._console.delete("1.0", f"{line_count - self.MAX_LINES}.0") self._console.see(tk.END) self._console.configure(state="disabled") def _clear_console(self) -> None: self._console.configure(state="normal") self._console.delete("1.0", tk.END) self._console.configure(state="disabled") self._log_lines.clear() def _save_log(self) -> None: cap_dir = SCRIPT_DIR / "bridges" / "captures" cap_dir.mkdir(parents=True, exist_ok=True) ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") path = cap_dir / f"console_{ts}.log" try: path.write_text("".join(self._log_lines), encoding="utf-8") self._q.put(("status", f"Log saved → {path.name}")) except Exception as exc: messagebox.showerror("Save Error", str(exc)) def _send_to_analyzer(self) -> None: if not self._last_raw_rx or not self._on_send_to_analyzer: return cap_dir = SCRIPT_DIR / "bridges" / "captures" cap_dir.mkdir(parents=True, exist_ok=True) ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") raw_path = cap_dir / f"console_s3_{ts}.bin" try: raw_path.write_bytes(self._last_raw_rx) self._on_send_to_analyzer(str(raw_path)) self._q.put(("status", f"Sent to Analyzer → {raw_path.name}")) except Exception as exc: messagebox.showerror("Error", str(exc)) # ── command dispatch ────────────────────────────────────────────────── def _set_buttons_state(self, state: str) -> None: for btn in self._cmd_btns: btn.configure(state=state) def _run_command(self, cmd: str) -> None: if self._running: return # Snapshot config in main thread before handing off to worker config = { "transport": self._transport_var.get(), "host": self._host_var.get().strip(), "tcp_port": int(self._tcp_port_var.get().strip() or "9034"), "port": self._port_var.get().strip(), "baud": int(self._baud_var.get().strip() or "38400"), "timeout": float(self._timeout_var.get().strip() or "30"), "cmd": cmd, } self._running = True self._set_buttons_state("disabled") self._status_var.set("Running…") threading.Thread(target=self._worker, args=(config,), daemon=True).start() # ── worker thread ───────────────────────────────────────────────────── def _worker(self, cfg: dict) -> None: """Background thread — open transport, run command, post results to queue.""" q = self._q def post(kind: str, text: str) -> None: q.put((kind, text)) try: from minimateplus.transport import SerialTransport, TcpTransport from minimateplus.protocol import ( MiniMateProtocol, SUB_SERIAL_NUMBER, SUB_FULL_CONFIG, SUB_EVENT_INDEX, ) except ImportError as exc: post("error", f"Import error: {exc}\nIs minimateplus installed?\n") q.put(("done", None)) return timeout = cfg["timeout"] cmd = cfg["cmd"] # Build transport if cfg["transport"] == "tcp": host = cfg["host"] tcp_port = cfg["tcp_port"] post("status", f"Connecting {host}:{tcp_port}…") transport = TcpTransport(host, tcp_port, connect_timeout=timeout) else: port = cfg["port"] baud = cfg["baud"] post("status", f"Opening {port} @ {baud} baud…") transport = SerialTransport(port, baud) # Wrap transport to capture every TX/RX byte raw_rx = bytearray() orig_write = transport.write orig_read = transport.read def logged_write(data: bytes) -> None: post("tx", f"TX [{len(data):3d}B]: {data.hex()}\n") orig_write(data) def logged_read(n: int) -> bytes: result = orig_read(n) if result: raw_rx.extend(result) post("rx_raw", f"RX [{len(result):3d}B]: {result.hex()}\n") return result transport.write = logged_write # type: ignore[method-assign] transport.read = logged_read # type: ignore[method-assign] try: with transport: post("status", "Connected.") proto = MiniMateProtocol(transport, recv_timeout=timeout) if cmd == "poll": post("head", "\n── POLL startup ─────────────────────────────\n") frame = proto.startup() post("parsed", f" payload ({len(frame.data)} B): {frame.data.hex()}\n") try: text = frame.data.decode("ascii", errors="replace") post("parsed", f" text: {text!r}\n") except Exception: pass elif cmd == "serial_number": post("head", "\n── POLL startup ─────────────────────────────\n") proto.startup() post("head", "\n── Serial Number (SUB 0x15) ─────────────────\n") data = proto.read(SUB_SERIAL_NUMBER) post("parsed", f" raw ({len(data)} B): {data.hex()}\n") sn = data.rstrip(b"\x00").decode("ascii", errors="replace").strip() post("parsed", f" serial: {sn!r}\n") elif cmd == "full_config": post("head", "\n── POLL startup ─────────────────────────────\n") proto.startup() post("head", "\n── Full Config (SUB 0x01) ───────────────────\n") data = proto.read(SUB_FULL_CONFIG) post("parsed", f" raw ({len(data)} B):\n") for i in range(0, len(data), 16): chunk = data[i:i + 16] hex_part = " ".join(f"{b:02X}" for b in chunk) asc_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) post("parsed", f" {i:04X}: {hex_part:<48} {asc_part}\n") elif cmd == "event_index": post("head", "\n── POLL startup ─────────────────────────────\n") proto.startup() post("head", "\n── Event Index (SUB 0x08) ───────────────────\n") data = proto.read(SUB_EVENT_INDEX) post("parsed", f" raw ({len(data)} B):\n") for i in range(0, len(data), 16): chunk = data[i:i + 16] hex_part = " ".join(f"{b:02X}" for b in chunk) asc_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) post("parsed", f" {i:04X}: {hex_part:<48} {asc_part}\n") post("status", "Done.") q.put(("save_raw", bytes(raw_rx))) except Exception as exc: post("error", f"\nError: {exc}\n") finally: q.put(("done", None)) # ── queue poll ──────────────────────────────────────────────────────── def _poll_q(self) -> None: try: while True: kind, payload = self._q.get_nowait() if kind == "tx": self._append(payload, self.TAG_TX) elif kind == "rx_raw": self._append(payload, self.TAG_RX_RAW) elif kind == "parsed": self._append(payload, self.TAG_PARSED) elif kind == "error": self._append(payload, self.TAG_ERROR) elif kind == "head": self._append(payload, self.TAG_HEAD) elif kind == "status": self._status_var.set(str(payload)) self._append(f" [{payload}]\n", self.TAG_STATUS) elif kind == "save_raw": self._last_raw_rx = payload if payload: self._send_btn.configure(state="normal", fg=FG) elif kind == "done": self._running = False self._set_buttons_state("normal") self._status_var.set("Ready") except queue.Empty: pass finally: self.after(100, self._poll_q) # ───────────────────────────────────────────────────────────────────────────── # Download panel — connect to a device, run get_events(), capture wire bytes # ───────────────────────────────────────────────────────────────────────────── class DownloadPanel(tk.Frame): """ Connect directly to a MiniMate Plus and download events while transparently saving every wire byte in the same format as a Blastware MITM capture. Each download produces a session directory containing: seismo_dl_/raw_bw_.bin — bytes WE sent (BW TX) seismo_dl_/raw_s3_.bin — bytes the unit sent (S3 TX) These files are byte-for-byte compatible with the captures produced by `bridges/ach_mitm.py` and load directly in the Analyzer tab. Use this when you want to reproduce or troubleshoot a flow that Blastware is doing — any session captured here can be diffed against a real BW capture to confirm wire-level parity. """ TAG_TX = "tx" TAG_RX_RAW = "rx_raw" TAG_PARSED = "parsed" TAG_ERROR = "error" TAG_STATUS = "status" TAG_HEAD = "head" MAX_LINES = 5000 def __init__(self, parent: tk.Widget, on_capture_ready=None, **kw): """ on_capture_ready(bw_path, s3_path, label) — invoked when a capture completes successfully so the parent can hand the files to the Analyzer. """ super().__init__(parent, bg=BG2, **kw) self._on_capture_ready = on_capture_ready self._q: queue.Queue = queue.Queue() self._running = False self._cmd_btns: list[tk.Button] = [] self._last_paths: Optional[tuple[str, str, str]] = None # (bw, s3, label) self._build() self._poll_q() # ── build ───────────────────────────────────────────────────────────── def _build(self) -> None: pad = {"padx": 5, "pady": 3} cfg = tk.Frame(self, bg=BG2) cfg.pack(side=tk.TOP, fill=tk.X, padx=6, pady=4) # Transport radio self._transport_var = tk.StringVar(value="tcp") tk.Radiobutton( cfg, text="TCP", variable=self._transport_var, value="tcp", bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO, command=self._on_transport_change, ).grid(row=0, column=0, padx=(0, 4)) tk.Radiobutton( cfg, text="Serial", variable=self._transport_var, value="serial", bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO, command=self._on_transport_change, ).grid(row=0, column=1, padx=(0, 12)) # TCP fields self._tcp_frame = tk.Frame(cfg, bg=BG2) self._tcp_frame.grid(row=0, column=2, sticky="w") tk.Label(self._tcp_frame, text="Host:", bg=BG2, fg=FG, font=MONO ).pack(side=tk.LEFT, **pad) self._host_var = tk.StringVar(value="127.0.0.1") tk.Entry(self._tcp_frame, textvariable=self._host_var, width=18, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).pack(side=tk.LEFT, padx=2) tk.Label(self._tcp_frame, text="Port:", bg=BG2, fg=FG, font=MONO ).pack(side=tk.LEFT, padx=(10, 4)) self._tcp_port_var = tk.StringVar(value="9034") tk.Entry(self._tcp_frame, textvariable=self._tcp_port_var, width=6, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).pack(side=tk.LEFT, padx=2) # Serial fields (hidden by default) self._serial_frame = tk.Frame(cfg, bg=BG2) tk.Label(self._serial_frame, text="Port:", bg=BG2, fg=FG, font=MONO ).pack(side=tk.LEFT, **pad) self._port_var = tk.StringVar(value="COM5") tk.Entry(self._serial_frame, textvariable=self._port_var, width=10, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).pack(side=tk.LEFT, padx=2) tk.Label(self._serial_frame, text="Baud:", bg=BG2, fg=FG, font=MONO ).pack(side=tk.LEFT, padx=(10, 4)) self._baud_var = tk.StringVar(value="38400") tk.Entry(self._serial_frame, textvariable=self._baud_var, width=8, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).pack(side=tk.LEFT, padx=2) # Timeout tk.Label(cfg, text="Timeout:", bg=BG2, fg=FG, font=MONO ).grid(row=0, column=3, padx=(18, 4)) self._timeout_var = tk.StringVar(value="60") tk.Entry(cfg, textvariable=self._timeout_var, width=5, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).grid(row=0, column=4, padx=2) tk.Label(cfg, text="s", bg=BG2, fg=FG_DIM, font=MONO ).grid(row=0, column=5) # Row 1 — output dir + label tk.Label(cfg, text="Save to:", bg=BG2, fg=FG, font=MONO ).grid(row=1, column=0, columnspan=2, sticky="e", padx=4, pady=4) self._dir_var = tk.StringVar( value=str(SCRIPT_DIR / "bridges" / "captures")) tk.Entry(cfg, textvariable=self._dir_var, width=46, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).grid(row=1, column=2, columnspan=3, sticky="we", padx=4, pady=4) tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", font=MONO, command=self._choose_dir ).grid(row=1, column=5, padx=4, pady=4) tk.Label(cfg, text="Label:", bg=BG2, fg=FG, font=MONO ).grid(row=2, column=0, columnspan=2, sticky="e", padx=4, pady=4) self._label_var = tk.StringVar(value="") tk.Entry(cfg, textvariable=self._label_var, width=46, bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO, ).grid(row=2, column=2, columnspan=3, sticky="we", padx=4, pady=4) tk.Label(cfg, text="(optional)", bg=BG2, fg=FG_DIM, font=MONO_SM ).grid(row=2, column=5, sticky="w", padx=4) # Row 2 — full waveform toggle opts = tk.Frame(self, bg=BG2) opts.pack(side=tk.TOP, fill=tk.X, padx=6, pady=(0, 4)) self._full_wf_var = tk.BooleanVar(value=False) tk.Checkbutton( opts, text="Full waveform (download raw ADC samples too)", variable=self._full_wf_var, bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO, ).pack(side=tk.LEFT, padx=4) # Command button row cmd_row = tk.Frame(self, bg=BG2) cmd_row.pack(side=tk.TOP, fill=tk.X, padx=6, pady=(0, 4)) for label, cmd in [ ("Connect Only", "connect"), ("List Event Keys", "list_keys"), ("Download Events", "download_events"), ]: btn = tk.Button( cmd_row, text=label, bg=ACCENT, fg="#ffffff", relief="flat", padx=10, cursor="hand2", font=MONO, command=lambda c=cmd: self._run_command(c), ) btn.pack(side=tk.LEFT, padx=4) self._cmd_btns.append(btn) self._open_btn = tk.Button( cmd_row, text="Open in Analyzer", bg=BG3, fg=FG_DIM, relief="flat", padx=10, cursor="hand2", font=MONO, command=self._open_in_analyzer, state="disabled", ) self._open_btn.pack(side=tk.LEFT, padx=14) self._status_var = tk.StringVar(value="Ready") tk.Label(cmd_row, textvariable=self._status_var, bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=14) # Console output self._console = scrolledtext.ScrolledText( self, height=22, font=MONO_SM, bg=BG, fg=FG, insertbackground=FG, relief="flat", state="disabled", ) self._console.pack(fill=tk.BOTH, expand=True, padx=6, pady=4) self._console.tag_configure(self.TAG_TX, foreground=ACCENT) self._console.tag_configure(self.TAG_RX_RAW, foreground=COL_S3) self._console.tag_configure(self.TAG_PARSED, foreground=GREEN) self._console.tag_configure(self.TAG_ERROR, foreground=RED) self._console.tag_configure(self.TAG_STATUS, foreground=FG_DIM) self._console.tag_configure(self.TAG_HEAD, foreground=YELLOW, font=MONO_B) # Bottom bar bot = tk.Frame(self, bg=BG2) bot.pack(side=tk.BOTTOM, fill=tk.X, padx=6, pady=4) tk.Button(bot, text="Clear", bg=BG3, fg=FG, relief="flat", padx=10, cursor="hand2", font=MONO, command=self._clear_console).pack(side=tk.LEFT, padx=4) # ── transport toggle ────────────────────────────────────────────────── def _on_transport_change(self) -> None: if self._transport_var.get() == "tcp": self._serial_frame.grid_remove() self._tcp_frame.grid(row=0, column=2, sticky="w") else: self._tcp_frame.grid_remove() self._serial_frame.grid(row=0, column=2, sticky="w") def _choose_dir(self) -> None: d = filedialog.askdirectory(initialdir=self._dir_var.get()) if d: self._dir_var.set(d) # ── console helpers ─────────────────────────────────────────────────── def _append(self, text: str, tag: str = "status") -> None: self._console.configure(state="normal") self._console.insert(tk.END, text, tag) line_count = int(self._console.index("end-1c").split(".")[0]) if line_count > self.MAX_LINES: self._console.delete("1.0", f"{line_count - self.MAX_LINES}.0") self._console.see(tk.END) self._console.configure(state="disabled") def _clear_console(self) -> None: self._console.configure(state="normal") self._console.delete("1.0", tk.END) self._console.configure(state="disabled") # ── command dispatch ────────────────────────────────────────────────── def _set_buttons_state(self, state: str) -> None: for btn in self._cmd_btns: btn.configure(state=state) def _run_command(self, cmd: str) -> None: if self._running: return try: tcp_port = int(self._tcp_port_var.get().strip() or "9034") baud = int(self._baud_var.get().strip() or "38400") timeout = float(self._timeout_var.get().strip() or "60") except ValueError: messagebox.showerror("Error", "Invalid numeric field.") return cfg = { "transport": self._transport_var.get(), "host": self._host_var.get().strip(), "tcp_port": tcp_port, "port": self._port_var.get().strip(), "baud": baud, "timeout": timeout, "cmd": cmd, "out_dir": self._dir_var.get().strip() or ".", "label": self._label_var.get().strip(), "full_waveform": bool(self._full_wf_var.get()), } self._running = True self._set_buttons_state("disabled") self._status_var.set("Running…") threading.Thread(target=self._worker, args=(cfg,), daemon=True).start() # ── worker thread ───────────────────────────────────────────────────── def _worker(self, cfg: dict) -> None: q = self._q def post(kind: str, text: str) -> None: q.put((kind, text)) try: from minimateplus.transport import ( # noqa: WPS433 CapturingTransport, SerialTransport, TcpTransport, ) from minimateplus.client import MiniMateClient # noqa: WPS433 except ImportError as exc: post("error", f"Import error: {exc}\n") q.put(("done", None)) return # Build session directory with timestamp + optional label ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") sess_name = f"seismo_dl_{ts}" if cfg["label"]: safe_label = "".join( ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in cfg["label"] ) sess_name = f"{sess_name}_{safe_label}" try: session_dir = Path(cfg["out_dir"]) / sess_name session_dir.mkdir(parents=True, exist_ok=True) except OSError as exc: post("error", f"Cannot create session dir: {exc}\n") q.put(("done", None)) return bw_path = str(session_dir / f"raw_bw_{ts}.bin") s3_path = str(session_dir / f"raw_s3_{ts}.bin") # Build inner transport if cfg["transport"] == "tcp": host = cfg["host"] tcp_port = cfg["tcp_port"] post("status", f"Connecting {host}:{tcp_port} (TCP)…") inner = TcpTransport(host, tcp_port, connect_timeout=cfg["timeout"]) else: port = cfg["port"] baud = cfg["baud"] post("status", f"Opening {port} @ {baud} baud…") inner = SerialTransport(port, baud) transport = CapturingTransport(inner, bw_path, s3_path) post("head", f"\n── Session {sess_name} ─────────────────────────────\n") post("status", f"BW capture: {bw_path}") post("status", f"S3 capture: {s3_path}") client = MiniMateClient(transport=transport, timeout=cfg["timeout"]) success = False try: with client: post("head", "\n── connect() — POLL + serial + config + index ──\n") info = client.connect() post("parsed", f" serial: {info.serial!r}\n") if getattr(info, "firmware", None): post("parsed", f" firmware: {info.firmware!r}\n") if getattr(info, "model", None): post("parsed", f" model: {info.model!r}\n") if cfg["cmd"] == "connect": success = True elif cfg["cmd"] == "list_keys": post("head", "\n── list_event_keys() — browse 1E/0A/1F walk ──\n") keys = client.list_event_keys() if not keys: post("parsed", " (no events stored)\n") else: post("parsed", f" {len(keys)} event(s):\n") for i, k in enumerate(keys): post("parsed", f" [{i}] {k}\n") success = True elif cfg["cmd"] == "download_events": post("head", "\n── get_events() — full download ──────────────\n") if cfg["full_waveform"]: post("status", "Full-waveform mode (raw ADC samples).") events = client.get_events(full_waveform=cfg["full_waveform"]) post("parsed", f" downloaded {len(events)} event(s)\n") for ev in events: ts_str = ( ev.event_time.isoformat(sep=" ", timespec="seconds") if getattr(ev, "event_time", None) else "?" ) ppv = getattr(ev, "peaks", None) ppv_str = "" if ppv is not None: try: ppv_str = f" PPV={ppv.peak_vector_sum:.4f} in/s" except Exception: pass key = ( ev._waveform_key.hex() if getattr(ev, "_waveform_key", None) else "?" ) post("parsed", f" [{ev.index:2d}] key={key} {ts_str}{ppv_str}\n") success = True else: post("error", f"Unknown command: {cfg['cmd']}\n") post("status", "Done.") except Exception as exc: post("error", f"\nError: {exc}\n") finally: # Capture files are flushed on transport.disconnect() (via __exit__). try: bw_size = Path(bw_path).stat().st_size s3_size = Path(s3_path).stat().st_size post("status", f"Capture closed. BW={bw_size}B S3={s3_size}B") post("head", f"\n── Capture saved → {session_dir} ─────────\n") if success: q.put(("ready", (bw_path, s3_path, sess_name))) except OSError: pass q.put(("done", None)) # ── queue poll ──────────────────────────────────────────────────────── def _poll_q(self) -> None: try: while True: kind, payload = self._q.get_nowait() if kind == "tx": self._append(payload, self.TAG_TX) elif kind == "rx_raw": self._append(payload, self.TAG_RX_RAW) elif kind == "parsed": self._append(payload, self.TAG_PARSED) elif kind == "error": self._append(payload, self.TAG_ERROR) elif kind == "head": self._append(payload, self.TAG_HEAD) elif kind == "status": self._status_var.set(str(payload)) self._append(f" [{payload}]\n", self.TAG_STATUS) elif kind == "ready": bw_path, s3_path, label = payload self._last_paths = (bw_path, s3_path, label) self._open_btn.configure(state="normal", fg=FG) elif kind == "done": self._running = False self._set_buttons_state("normal") self._status_var.set("Ready") except queue.Empty: pass finally: self.after(100, self._poll_q) # ── analyzer hand-off ───────────────────────────────────────────────── def _open_in_analyzer(self) -> None: if not self._last_paths or not self._on_capture_ready: return bw_path, s3_path, label = self._last_paths self._on_capture_ready(bw_path, s3_path, label) # ───────────────────────────────────────────────────────────────────────────── # Main application window # ───────────────────────────────────────────────────────────────────────────── class SeismoLab(tk.Tk): def __init__(self) -> None: super().__init__() self.title("Seismo Lab") self.configure(bg=BG) self.minsize(1100, 680) self._db = FrameDB() style = ttk.Style() style.theme_use("clam") style.configure("Top.TNotebook", background=BG3, borderwidth=0, tabposition="nw") style.configure("Top.TNotebook.Tab", background=BG3, foreground=FG, font=("Consolas", 10, "bold"), padding=[16, 6]) style.map("Top.TNotebook.Tab", background=[("selected", ACCENT)], foreground=[("selected", "#ffffff")]) nb = ttk.Notebook(self, style="Top.TNotebook") nb.pack(fill=tk.BOTH, expand=True) self._bridge_panel = BridgePanel( nb, on_bridge_started=self._on_bridge_started, on_bridge_stopped=self._on_bridge_stopped, on_capture_started=self._on_capture_started, on_capture_complete=self._on_capture_complete, ) nb.add(self._bridge_panel, text=" Bridge ") self._analyzer_panel = AnalyzerPanel(nb, db=self._db) nb.add(self._analyzer_panel, text=" Analyzer ") self._console_panel = ConsolePanel( nb, on_send_to_analyzer=self._on_console_send_to_analyzer, ) nb.add(self._console_panel, text=" Console ") self._serial_watch_panel = SerialWatchPanel( nb, on_capture_ready=self._on_serial_capture_ready, ) nb.add(self._serial_watch_panel, text=" Serial Watch ") self._download_panel = DownloadPanel( nb, on_capture_ready=self._on_download_capture_ready, ) nb.add(self._download_panel, text=" Download ") self._nb = nb self.protocol("WM_DELETE_WINDOW", self._on_close) def _on_bridge_started(self, struct_bin: Optional[str] = None) -> None: """Bridge started — stash the structured bin path; stay on Bridge tab.""" if struct_bin: self._analyzer_panel.bin_var.set(struct_bin) def _on_bridge_stopped(self) -> None: self._analyzer_panel.stop_live() def _on_capture_started(self, bw_path: str, s3_path: str, label: str) -> None: """A capture began — wire up live mode in the Analyzer and switch tabs.""" self._analyzer_panel.set_live_files(bw_path, s3_path) self._nb.select(1) def _on_capture_complete(self, bw_path: str, s3_path: str, label: str) -> None: """A capture stopped — stop live mode, run full analysis, switch to Analyzer.""" self._analyzer_panel.stop_live() self._analyzer_panel.s3_var.set(s3_path) self._analyzer_panel.bw_var.set(bw_path) self._analyzer_panel._run_analyze() self._nb.select(1) def _on_console_send_to_analyzer(self, raw_s3_path: str) -> None: """Console captured bytes → inject into Analyzer S3 field and switch tab.""" self._analyzer_panel.s3_var.set(raw_s3_path) self._nb.select(1) def _on_serial_capture_ready(self, raw_s3_path: str) -> None: """Serial Watch capture finished → inject into Analyzer and switch tab.""" self._analyzer_panel.s3_var.set(raw_s3_path) self._nb.select(1) def _on_download_capture_ready(self, bw_path: str, s3_path: str, label: str) -> None: """Download capture done → load both BW + S3 files into Analyzer and run.""" self._analyzer_panel.stop_live() self._analyzer_panel.s3_var.set(s3_path) self._analyzer_panel.bw_var.set(bw_path) self._analyzer_panel._run_analyze() self._nb.select(1) def _on_close(self) -> None: self._bridge_panel.stop_bridge() self._serial_watch_panel._stop() self.destroy() # ───────────────────────────────────────────────────────────────────────────── def main() -> None: app = SeismoLab() app.mainloop() if __name__ == "__main__": main()