Files
seismo-relay/seismo_lab.py
T
Claude 6861d9ed97 Merge TCP mode into Bridge tab (Serial/TCP radio toggle)
Removes the separate 'TCP Capture' tab and folds TCP MITM capture directly
into the existing Bridge tab.  A Serial/TCP radio selector at the top swaps
the connection fields (COM ports vs. listen port + device host:port) while
keeping the same Start Bridge / Stop Bridge / Add Mark buttons, capture
checkboxes, log dir, and live log — identical UX for both modes.

https://claude.ai/code/session_014NczSHUz9uTzCAf4cVASTJ
2026-04-26 23:01:45 +00:00

2175 lines
100 KiB
Python

#!/usr/bin/env python3
"""
seismo_lab.py — Combined S3 Bridge + Protocol Analyzer + Device Console GUI.
Single window with three top-level tabs:
Bridge — capture live serial or TCP traffic (Serial mode wraps s3_bridge.py as a subprocess; TCP mode runs an in-process MITM proxy)
Analyzer — parse, diff, and query captured frames
Console — direct device connection; runs commands and shows raw bytes +
decoded output; colour-coded TX/RX console with log save and
Send-to-Analyzer support
When the bridge starts:
- raw tap paths are auto-filled in the Analyzer tab
- Live mode is automatically enabled so the Analyzer updates in real time
Run from anywhere:
python seismo_lab.py
"""
from __future__ import annotations
import datetime
import os
import queue
import socket
import subprocess
import sys
import threading
import tkinter as tk
from pathlib import Path
from tkinter import filedialog, messagebox, scrolledtext, simpledialog, ttk
from typing import Optional
# ── path setup ────────────────────────────────────────────────────────────────
# Directory containing this script; every other tool path is resolved from it.
SCRIPT_DIR = Path(__file__).parent
# Serial bridge script that BridgePanel launches as a subprocess in Serial mode.
BRIDGE_PATH = SCRIPT_DIR / "bridges" / "s3-bridge" / "s3_bridge.py"
PARSERS_DIR = SCRIPT_DIR / "parsers"
# Both directories are prepended to sys.path so the project imports that follow
# resolve no matter which working directory the GUI is started from.
# NOTE(review): presumably s3_analyzer and frame_db live in PARSERS_DIR — confirm.
sys.path.insert(0, str(PARSERS_DIR))
sys.path.insert(0, str(SCRIPT_DIR)) # for minimateplus package
from s3_analyzer import ( # noqa: E402
AnnotatedFrame,
FrameDiff,
Session,
annotate_frames,
diff_sessions,
format_hex_dump,
parse_bw,
parse_s3,
parse_structured_bin,
render_session_report,
split_into_sessions,
split_sessions_at_marks,
write_claude_export,
)
from frame_db import FrameDB # noqa: E402
# ── colour palette ────────────────────────────────────────────────────────────
# Dark-theme colours shared by every panel.
BG = "#1e1e1e"  # darkest background: log view and analyzer text panes
BG2 = "#252526"  # panel / toolbar background
BG3 = "#2d2d30"  # entries, neutral buttons, selection background
FG = "#d4d4d4"  # default text colour
FG_DIM = "#6a6a6a"  # de-emphasised text (status labels, hints)
ACCENT = "#569cd6"  # headings and the primary "Analyze" / "Run Query" buttons
RED = "#f44747"  # active Stop button, bad-checksum / malformed rows
YELLOW = "#dcdcaa"  # "warn" text tag and interpreted byte values
GREEN = "#4caf50"  # Start Bridge button
ORANGE = "#ce9178"  # export button and "addr" text tag
# Semantic colours used as text / tree tags in the Analyzer.
COL_BW = "#9cdcfe"  # BW-direction frames
COL_S3 = "#4ec9b0"  # S3-direction frames
COL_DIFF = "#f44747"  # "changed" tag
COL_KNOW = "#4caf50"  # "known" tag
COL_HEAD = "#569cd6"  # "head" tag
# Monospace fonts: regular, small and bold variants.
MONO = ("Consolas", 9)
MONO_SM = ("Consolas", 8)
MONO_B = ("Consolas", 9, "bold")
# ─────────────────────────────────────────────────────────────────────────────
# Shared state
# ─────────────────────────────────────────────────────────────────────────────
class AnalyzerState:
    """Holds the results of the most recent analysis run.

    All fields start empty and are filled in by the Analyzer panel once an
    analysis has been performed.
    """

    def __init__(self) -> None:
        # Parsed sessions, in capture order.
        self.sessions: list[Session] = []
        # diffs[i] compares session i with session i-1 (None where there is
        # no predecessor to compare against).
        self.diffs: list[Optional[list[FrameDiff]]] = []
        # Raw capture files the sessions were parsed from.
        self.s3_path: Optional[Path] = None
        self.bw_path: Optional[Path] = None
        # Id returned by the frame DB for the last ingested capture.
        self.last_capture_id: Optional[int] = None
# ─────────────────────────────────────────────────────────────────────────────
# Bridge panel (tk.Frame — lives inside a notebook tab)
# ─────────────────────────────────────────────────────────────────────────────
class BridgePanel(tk.Frame):
"""
Bridge controls and live log output.
Two modes selectable at the top:
- Serial: wraps s3_bridge.py as a subprocess (two COM ports)
- TCP: MITM proxy — listens for an incoming Blastware connection,
forwards all bytes to the real device over IP, captures both
directions to raw .bin files
Calls on_bridge_started(raw_bw_path, raw_s3_path, struct_bin_path) when
traffic begins so the parent can wire up the Analyzer in live mode.
"""
def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped, **kw):
    """Create the panel, build its widgets and start polling the log queue.

    Args:
        parent: Widget that hosts this frame.
        on_bridge_started: Callback invoked as
            ``on_bridge_started(raw_bw_path, raw_s3_path, struct_bin_path)``.
        on_bridge_stopped: Zero-argument callback invoked when the bridge
            (or a TCP session) ends.
        **kw: Extra options forwarded to ``tk.Frame``.
    """
    super().__init__(parent, bg=BG2, **kw)

    # Parent callbacks.
    self._on_started = on_bridge_started
    self._on_stopped = on_bridge_stopped

    # Serial mode: handle of the s3_bridge.py subprocess (None when idle).
    self.process: Optional[subprocess.Popen] = None

    # TCP mode: listening socket plus the flag that stops the accept loop.
    self._server: Optional[socket.socket] = None
    self._tcp_stop_event = threading.Event()

    # Worker threads of both modes push text here; the Tk thread drains it.
    self._log_q: queue.Queue[str] = queue.Queue()

    # Selected mode ("serial" or "tcp"); must exist before the widgets do.
    self._mode = tk.StringVar(value="serial")

    self._build()
    self._poll_log_q()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
    """Create every widget of the Bridge tab.

    Top to bottom: a config grid (mode selector, serial *or* TCP connection
    fields, log directory, raw-capture checkboxes), a button row with the
    status label, and the read-only scrolling log view.
    """
    pad = {"padx": 6, "pady": 4}

    def caption(parent: tk.Widget, text: str, column: int, row: int = 0) -> None:
        # Right-aligned field caption in the given grid cell.
        tk.Label(parent, text=text, bg=BG2, fg=FG, font=MONO).grid(
            row=row, column=column, sticky="e", **pad)

    def field(parent: tk.Widget, var: tk.StringVar, width: int, column: int) -> None:
        # Flat dark entry bound to *var*, left-aligned on grid row 0.
        tk.Entry(parent, textvariable=var, width=width,
                 bg=BG3, fg=FG, insertbackground=FG, relief="flat",
                 font=MONO).grid(row=0, column=column, sticky="w", **pad)

    cfg = tk.Frame(self, bg=BG2)
    cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4)

    # Row 0: mode selector
    mode_row = tk.Frame(cfg, bg=BG2)
    mode_row.grid(row=0, column=0, columnspan=6, sticky="w", padx=6, pady=(4, 0))
    tk.Label(mode_row, text="Mode:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, padx=(0, 8))
    for text, value in (("Serial", "serial"), ("TCP", "tcp")):
        tk.Radiobutton(mode_row, text=text, variable=self._mode, value=value,
                       bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
                       font=MONO, command=self._on_mode_change).pack(side=tk.LEFT, padx=4)

    # Row 1a: serial connection fields (gridded right away, so shown by default)
    serial = self._serial_frame = tk.Frame(cfg, bg=BG2)
    serial.grid(row=1, column=0, columnspan=6, sticky="w")
    caption(serial, "BW COM:", 0)
    self.bw_var = tk.StringVar(value="COM4")
    field(serial, self.bw_var, 10, 1)
    caption(serial, "S3 COM:", 2)
    self.s3_var = tk.StringVar(value="COM5")
    field(serial, self.s3_var, 10, 3)
    caption(serial, "Baud:", 4)
    self.baud_var = tk.StringVar(value="38400")
    field(serial, self.baud_var, 8, 5)

    # Row 1b: TCP connection fields (left un-gridded until TCP mode is selected)
    tcp = self._tcp_frame = tk.Frame(cfg, bg=BG2)
    caption(tcp, "Listen port:", 0)
    self.listen_port_var = tk.StringVar(value="9034")
    field(tcp, self.listen_port_var, 8, 1)
    caption(tcp, "Device host:", 2)
    self.remote_host_var = tk.StringVar(value="63.43.212.232")
    field(tcp, self.remote_host_var, 18, 3)
    caption(tcp, "Port:", 4)
    self.remote_port_var = tk.StringVar(value="9034")
    field(tcp, self.remote_port_var, 8, 5)

    # Row 2: log dir
    caption(cfg, "Log dir:", 0, row=2)
    self.logdir_var = tk.StringVar(value=str(SCRIPT_DIR / "bridges" / "captures"))
    tk.Entry(cfg, textvariable=self.logdir_var, width=40,
             bg=BG3, fg=FG, insertbackground=FG, relief="flat",
             font=MONO).grid(row=2, column=1, columnspan=4, sticky="we", **pad)
    tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2",
              font=MONO, command=self._choose_dir).grid(row=2, column=5, **pad)

    # Row 3: raw capture checkboxes
    self._raw_bw_on = tk.BooleanVar(value=True)
    self._raw_s3_on = tk.BooleanVar(value=True)
    for text, var, column in (
        ("Capture BW->S3 raw", self._raw_bw_on, 0),
        ("Capture S3->BW raw", self._raw_s3_on, 2),
    ):
        tk.Checkbutton(cfg, text=text, variable=var,
                       bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
                       font=MONO).grid(row=3, column=column, columnspan=2, sticky="w", **pad)

    # Buttons + status (pack order fixes the left-to-right order)
    btn_row = tk.Frame(self, bg=BG2)
    btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2)

    self.start_btn = tk.Button(btn_row, text="Start Bridge", bg=GREEN, fg="#000000",
                               relief="flat", padx=12, cursor="hand2", font=MONO_B,
                               command=self.start_bridge)
    self.start_btn.pack(side=tk.LEFT, padx=6)

    self.stop_btn = tk.Button(btn_row, text="Stop Bridge", bg=BG3, fg=FG,
                              relief="flat", padx=12, cursor="hand2", font=MONO,
                              command=self.stop_bridge, state="disabled")
    self.stop_btn.pack(side=tk.LEFT, padx=4)

    self.mark_btn = tk.Button(btn_row, text="Add Mark", bg=BG3, fg=FG,
                              relief="flat", padx=10, cursor="hand2", font=MONO,
                              command=self.add_mark, state="disabled")
    self.mark_btn.pack(side=tk.LEFT, padx=4)

    self.status_var = tk.StringVar(value="Idle")
    status_label = tk.Label(btn_row, textvariable=self.status_var,
                            bg=BG2, fg=FG_DIM, font=MONO)
    status_label.pack(side=tk.LEFT, padx=10)

    # Log output (kept "disabled" so the user cannot type into it)
    self.log_view = scrolledtext.ScrolledText(
        self, height=18, font=MONO_SM,
        bg=BG, fg=FG, insertbackground=FG,
        relief="flat", state="disabled",
    )
    self.log_view.pack(fill=tk.BOTH, expand=True, padx=4, pady=4)
# ── helpers ───────────────────────────────────────────────────────────
def _on_mode_change(self) -> None:
    """Swap the connection-field row so it matches the selected mode."""
    if self._mode.get() == "serial":
        hidden, shown = self._tcp_frame, self._serial_frame
    else:
        hidden, shown = self._serial_frame, self._tcp_frame
    # grid_remove() remembers nothing we rely on; the grid options are
    # re-applied in full every time a row is shown.
    hidden.grid_remove()
    shown.grid(row=1, column=0, columnspan=6, sticky="w")
def _choose_dir(self) -> None:
    """Let the user pick the log directory; keep the old one on cancel."""
    chosen = filedialog.askdirectory(initialdir=self.logdir_var.get())
    if not chosen:
        # Dialog cancelled: askdirectory returned an empty value.
        return
    self.logdir_var.set(chosen)
def _append_log(self, text: str) -> None:
    """Append *text* to the log view and scroll to the end."""
    view = self.log_view
    # The widget is normally disabled (read-only); enable it just long
    # enough to insert the new text.
    view.configure(state="normal")
    view.insert(tk.END, text)
    view.see(tk.END)
    view.configure(state="disabled")
# ── unified log-queue polling (serial subprocess + TCP threads both push here)
def _poll_log_q(self) -> None:
    """Drain the log queue on the Tk thread, then re-arm itself in 100 ms.

    Ordinary strings are appended to the log view. Two sentinels are
    handled specially: ``"<<exit>>"`` (posted by the serial reader thread
    when the subprocess output ends) and ``"<<session_ended>>"`` (posted
    by the TCP accept loop after each session).
    """
    pending = self._log_q
    try:
        while True:
            try:
                item = pending.get_nowait()
            except queue.Empty:
                break
            if item == "<<exit>>":
                self._bridge_ended()
                self._on_stopped()
                continue
            if item == "<<session_ended>>":
                # Still listening -> go back to the idle "Listening" status.
                if self._server is not None:
                    self.status_var.set(f"Listening on :{self.listen_port_var.get()}")
                self._on_stopped()
                continue
            self._append_log(item)
    finally:
        # Always reschedule, even if a handler above raised.
        self.after(100, self._poll_log_q)
# ── bridge control (delegates to serial or TCP) ───────────────────────
def start_bridge(self) -> None:
    """Start the bridge in whichever mode is selected (serial unless "tcp")."""
    launch = self._start_tcp if self._mode.get() == "tcp" else self._start_serial
    launch()
def stop_bridge(self) -> None:
    """Stop the bridge in whichever mode is selected (serial unless "tcp")."""
    halt = self._stop_tcp if self._mode.get() == "tcp" else self._stop_serial
    halt()
def _bridge_ended(self) -> None:
    """Put the controls back into the idle state and log the stop."""
    self.status_var.set("Stopped")
    button_states = (
        (self.start_btn, {"state": "normal"}),
        (self.stop_btn, {"state": "disabled", "bg": BG3}),
        (self.mark_btn, {"state": "disabled"}),
    )
    for button, options in button_states:
        button.configure(**options)
    self._append_log("== Bridge stopped ==\n")
# ── serial mode ───────────────────────────────────────────────────────
def _start_serial(self) -> None:
    """Validate the serial settings, launch s3_bridge.py and update the UI.

    Shows a message box and returns without side effects when the bridge
    is already running, a COM port is missing, the baud rate is not a
    positive integer, the log directory cannot be created, or the
    subprocess fails to start. On success a reader thread forwards the
    subprocess output to the log queue and ``on_bridge_started`` is called
    with the raw-capture paths (None for a disabled direction) and the
    expected structured session file path.
    """
    if self.process and self.process.poll() is None:
        messagebox.showinfo("Bridge", "Bridge is already running.")
        return

    bw = self.bw_var.get().strip()
    s3 = self.s3_var.get().strip()
    baud = self.baud_var.get().strip()
    logdir = self.logdir_var.get().strip() or "."

    if not bw or not s3:
        messagebox.showerror("Error", "Please enter both BW and S3 COM ports.")
        return

    # Reject a bad baud rate here instead of letting the subprocess die
    # after the UI has already switched to "Running".
    try:
        baud_ok = int(baud) > 0
    except ValueError:
        baud_ok = False
    if not baud_ok:
        messagebox.showerror("Error", "Invalid baud rate.")
        return

    try:
        os.makedirs(logdir, exist_ok=True)
    except OSError as e:
        messagebox.showerror("Error", f"Cannot create log directory:\n{e}")
        return

    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    args = [sys.executable, str(BRIDGE_PATH),
            "--bw", bw, "--s3", s3, "--baud", baud, "--logdir", logdir]

    raw_bw_path = raw_s3_path = None
    if self._raw_bw_on.get():
        raw_bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin")
        args += ["--raw-bw", raw_bw_path]
    if self._raw_s3_on.get():
        raw_s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin")
        args += ["--raw-s3", raw_s3_path]
    # NOTE(review): assumes the bridge names its structured log
    # s3_session_<ts>.bin with this same timestamp — confirm in s3_bridge.py.
    struct_bin_path = os.path.join(logdir, f"s3_session_{ts}.bin")

    try:
        self.process = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # interleave errors with normal output
            stdin=subprocess.PIPE,     # add_mark() writes commands here
            text=True,
            bufsize=1,                 # line-buffered
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        messagebox.showerror("Error", f"Failed to start bridge:\n{e}")
        return

    threading.Thread(target=self._reader_thread, daemon=True).start()
    self.status_var.set(f"Running — {bw} <-> {s3}")
    self.start_btn.configure(state="disabled")
    self.stop_btn.configure(state="normal", bg=RED)
    self.mark_btn.configure(state="normal")
    self._append_log(f"== Bridge started [{ts}] ==\n")
    self._on_started(raw_bw_path, raw_s3_path, struct_bin_path)
def _stop_serial(self) -> None:
    """Stop the serial bridge subprocess (if running) and reset the UI.

    The process is asked to terminate and given 3 s to exit; after that it
    is killed and waited on once more so the child is reaped. The UI reset
    and the ``on_bridge_stopped`` callback run whether or not a process
    was running.
    """
    proc = self.process
    if proc and proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            proc.kill()
            # kill() alone does not collect the exit status; wait again so
            # the child does not linger as a zombie. Bounded so a stuck
            # process can never freeze the GUI.
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                pass
    self._bridge_ended()
    self._on_stopped()
def _reader_thread(self) -> None:
    """Worker thread: forward subprocess output lines to the log queue.

    Posts the ``"<<exit>>"`` sentinel once the output stream is exhausted.
    Does nothing when there is no process or no stdout pipe.
    """
    proc = self.process
    if not proc or not proc.stdout:
        return
    post = self._log_q.put
    for line in proc.stdout:
        post(line)
    post("<<exit>>")
# ── TCP mode ──────────────────────────────────────────────────────────
def _start_tcp(self) -> None:
    """Validate the TCP settings, open the listening socket and start accepting.

    Shows a message box and returns without side effects when the bridge
    is already listening, a port is not an integer in 1-65535, the device
    host is empty, or the listen port cannot be bound. On success the
    accept loop runs in a daemon thread; the log directory and capture
    flags are read once here and handed to that thread.
    """
    if self._server is not None:
        messagebox.showinfo("Bridge", "TCP bridge is already listening.")
        return

    try:
        listen_port = int(self.listen_port_var.get().strip())
        remote_host = self.remote_host_var.get().strip()
        remote_port = int(self.remote_port_var.get().strip())
    except ValueError:
        messagebox.showerror("Error", "Invalid port number.")
        return
    # An out-of-range port makes bind()/connect() raise OverflowError, which
    # the OSError handlers would not catch — reject it up front.
    if not (0 < listen_port <= 65535 and 0 < remote_port <= 65535):
        messagebox.showerror("Error", "Port numbers must be between 1 and 65535.")
        return

    if not remote_host:
        messagebox.showerror("Error", "Please enter the device host.")
        return

    srv = None
    try:
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("0.0.0.0", listen_port))
        srv.listen(5)
        # Short accept timeout so the accept loop can notice the stop event.
        srv.settimeout(1.0)
    except OSError as e:
        if srv is not None:
            srv.close()  # do not leak the half-configured socket
        messagebox.showerror("Error", f"Cannot bind to port {listen_port}:\n{e}")
        return

    self._server = srv
    self._tcp_stop_event.clear()

    self.start_btn.configure(state="disabled")
    self.stop_btn.configure(state="normal", bg=RED)
    self.mark_btn.configure(state="normal")
    self.status_var.set(f"Listening on :{listen_port}")

    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    self._append_log(
        f"== TCP Bridge started [{ts}]\n"
        f" Listening on 0.0.0.0:{listen_port}\n"
        f" Forwarding to {remote_host}:{remote_port}\n==\n"
    )

    # Tk variables are read here, on the GUI thread, not from the worker.
    logdir = self.logdir_var.get().strip() or "."
    raw_bw_on = self._raw_bw_on.get()
    raw_s3_on = self._raw_s3_on.get()

    threading.Thread(
        target=self._accept_loop,
        args=(srv, remote_host, remote_port, logdir, raw_bw_on, raw_s3_on),
        daemon=True,
    ).start()
def _stop_tcp(self) -> None:
    """Signal the accept loop to stop, close the listener and reset the UI."""
    self._tcp_stop_event.set()
    listener = self._server
    if listener:
        # Closing also unblocks a pending accept() in the worker thread.
        try:
            listener.close()
        except OSError:
            pass
        self._server = None
    self._bridge_ended()
    self._on_stopped()
def _accept_loop(self, srv: socket.socket, remote_host: str, remote_port: int,
                 logdir: str, raw_bw_on: bool, raw_s3_on: bool) -> None:
    """Worker thread: accept Blastware connections and relay each to the device.

    Sessions are handled one at a time. For every accepted client a new
    connection to ``remote_host:remote_port`` is opened, capture paths are
    derived from the current timestamp, the GUI is notified, and
    ``_run_tcp_session`` relays traffic until both directions close. A
    failure while connecting, creating the log directory or running the
    session is logged and the loop keeps accepting. The loop ends when the
    stop event is set or the listening socket is closed.

    Args:
        srv: Bound, listening socket.
        remote_host: Device host to forward to.
        remote_port: Device port to forward to.
        logdir: Directory for the raw capture files.
        raw_bw_on: Capture the Blastware -> device direction.
        raw_s3_on: Capture the device -> Blastware direction.
    """
    while not self._tcp_stop_event.is_set():
        try:
            client_sock, addr = srv.accept()
        except socket.timeout:
            continue  # periodic wake-up to re-check the stop event
        except OSError:
            break     # listener was closed

        peer = f"{addr[0]}:{addr[1]}"
        self._log_q.put(f"[TCP] Blastware connected from {peer}\n")

        dev_sock = None
        try:
            dev_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            dev_sock.connect((remote_host, remote_port))
        except OSError as e:
            self._log_q.put(f"[TCP] Cannot reach device {remote_host}:{remote_port}: {e}\n")
            if dev_sock is not None:
                dev_sock.close()
            client_sock.close()
            continue

        self._log_q.put(f"[TCP] Connected to device at {remote_host}:{remote_port}\n")

        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        try:
            os.makedirs(logdir, exist_ok=True)
        except OSError as e:
            self._log_q.put(f"[TCP] Cannot create log dir {logdir}: {e}\n")
            dev_sock.close()
            client_sock.close()
            continue

        raw_bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin") if raw_bw_on else None
        raw_s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin") if raw_s3_on else None

        # NOTE(review): after() is called from a worker thread here; tkinter
        # is not guaranteed thread-safe — consider routing this through
        # the log queue instead.
        self.after(0, self._notify_tcp_session_start,
                   raw_bw_path, raw_s3_path, peer, remote_host, remote_port)

        try:
            self._run_tcp_session(client_sock, dev_sock, raw_bw_path, raw_s3_path, ts)
        except OSError as e:
            self._log_q.put(f"[TCP] Session {ts} aborted: {e}\n")
        finally:
            # close() is idempotent, so this is safe even when the session
            # already closed the sockets itself.
            client_sock.close()
            dev_sock.close()
            self._log_q.put("<<session_ended>>")
def _notify_tcp_session_start(self, raw_bw_path, raw_s3_path,
                              peer, remote_host, remote_port) -> None:
    """Show the active session in the status label and notify the parent.

    Args:
        raw_bw_path: BW capture path, or None when that capture is off.
        raw_s3_path: S3 capture path, or None when that capture is off.
        peer: ``"host:port"`` of the connected Blastware client.
        remote_host: Device host the session forwards to.
        remote_port: Device port the session forwards to.
    """
    # Separate the two endpoints (same "<->" notation as the serial status);
    # without it the peer port and device host run together.
    self.status_var.set(f"Active: {peer} <-> {remote_host}:{remote_port}")
    # TCP mode produces no structured session file, hence the None.
    self._on_started(raw_bw_path, raw_s3_path, None)
def _run_tcp_session(self, bw_sock: socket.socket, dev_sock: socket.socket,
                     raw_bw_path: Optional[str], raw_s3_path: Optional[str],
                     ts: str) -> None:
    """Relay one Blastware <-> device session, optionally capturing each direction.

    Two threads copy bytes in opposite directions until the source side
    reaches EOF or a socket error occurs; each direction's bytes are also
    appended to its capture file when one is configured. Blocks until both
    directions have finished. Both sockets and any capture files are always
    closed before returning, including when a capture file cannot be
    opened (that case is logged and the session is aborted).

    Args:
        bw_sock: Connected socket to the Blastware client.
        dev_sock: Connected socket to the device.
        raw_bw_path: File for Blastware -> device bytes, or None.
        raw_s3_path: File for device -> Blastware bytes, or None.
        ts: Session timestamp label used in log messages.
    """
    # One-element lists so the nested function can update the totals.
    bw_bytes = [0]
    s3_bytes = [0]

    def _pipe(src, dst, fh, counter):
        # Copy src -> dst until EOF/error, mirroring the bytes into fh.
        try:
            while True:
                data = src.recv(4096)
                if not data:
                    break
                dst.sendall(data)
                if fh:
                    fh.write(data)
                    fh.flush()  # keep the file current for live analysis
                counter[0] += len(data)
        except OSError:
            pass
        finally:
            # Half-close so the peer sees EOF while the other direction
            # can still drain.
            try:
                dst.shutdown(socket.SHUT_WR)
            except OSError:
                pass

    bw_fh = s3_fh = None
    try:
        try:
            if raw_bw_path:
                bw_fh = open(raw_bw_path, "wb")
            if raw_s3_path:
                s3_fh = open(raw_s3_path, "wb")
        except OSError as exc:
            self._log_q.put(f"[TCP] Session {ts} aborted: cannot open capture file: {exc}\n")
            return

        t_bw = threading.Thread(target=_pipe, args=(bw_sock, dev_sock, bw_fh, bw_bytes), daemon=True)
        t_s3 = threading.Thread(target=_pipe, args=(dev_sock, bw_sock, s3_fh, s3_bytes), daemon=True)
        t_bw.start()
        t_s3.start()
        t_bw.join()
        t_s3.join()
    finally:
        for resource in (bw_sock, dev_sock, bw_fh, s3_fh):
            if resource is None:
                continue
            try:
                resource.close()
            except OSError:
                pass

    self._log_q.put(
        f"[TCP] Session {ts} done "
        f"BW→dev: {bw_bytes[0]} bytes dev→BW: {s3_bytes[0]} bytes\n"
    )
    if raw_bw_path:
        self._log_q.put(f"[TCP] BW capture: {raw_bw_path}\n")
    if raw_s3_path:
        self._log_q.put(f"[TCP] S3 capture: {raw_s3_path}\n")
# ── marks ─────────────────────────────────────────────────────────────
def add_mark(self) -> None:
    """Ask for a label and record it as a mark.

    In TCP mode the mark is only written to the log view, with a
    wall-clock time. In serial mode it is sent to the bridge subprocess
    over stdin (an ``m`` line followed by the label) and echoed to the
    log. A blank or cancelled label does nothing.
    """
    entered = simpledialog.askstring("Mark", "Enter label for this mark:", parent=self)
    if not entered or not entered.strip():
        return
    label = entered.strip()

    if self._mode.get() == "tcp":
        ts = datetime.datetime.now().strftime("%H:%M:%S")
        self._append_log(f"[MARK {ts}] {label}\n")
        return

    proc = self.process
    if not proc or not proc.stdin or proc.poll() is not None:
        # No running bridge to receive the mark.
        return
    try:
        proc.stdin.write("m\n")
        proc.stdin.write(label + "\n")
        proc.stdin.flush()
        self._append_log(f"[MARK] {label}\n")
    except Exception as e:
        messagebox.showerror("Error", f"Failed to send mark:\n{e}")
# ─────────────────────────────────────────────────────────────────────────────
# Analyzer panel (tk.Frame — lives inside a notebook tab)
# Extracted from gui_analyzer.py; accepts external path injection.
# ─────────────────────────────────────────────────────────────────────────────
class AnalyzerPanel(tk.Frame):
def __init__(self, parent: tk.Widget, db: FrameDB, **kw):
    """Create the analyzer panel, build its widgets and start live-queue polling.

    Args:
        parent: Widget that hosts this frame.
        db: Frame database used for ingesting and querying captures.
        **kw: Extra options forwarded to ``tk.Frame``.
    """
    super().__init__(parent, bg=BG, **kw)
    self._db = db
    self.state = AnalyzerState()

    # Live mode: worker thread, its stop flag, and the queue it uses to
    # talk back to the Tk thread.
    self._live_thread: Optional[threading.Thread] = None
    self._live_stop = threading.Event()
    self._live_q: queue.Queue[str] = queue.Queue()

    self._build()
    self._poll_live_queue()
# ── external API (called by parent when bridge starts/stops) ──────────
def set_live_files(self, raw_bw: Optional[str], raw_s3: Optional[str],
                   struct_bin: Optional[str] = None) -> None:
    """Fill in capture paths supplied by the bridge and start live mode.

    Only non-empty paths overwrite the corresponding field. Live mode is
    started only when both raw paths are given.
    """
    for path, field_name in (
        (raw_s3, "s3_var"),
        (raw_bw, "bw_var"),
        (struct_bin, "bin_var"),
    ):
        if path:
            getattr(self, field_name).set(path)
    if raw_s3 and raw_bw:
        self._start_live()
def stop_live(self) -> None:
    """Ask a running live thread to stop and reset the Live button."""
    worker = self._live_thread
    if worker and worker.is_alive():
        self._live_stop.set()
    self._set_live_btn_off()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
    """Build the toolbar, the main panes and the status bar, in that order."""
    for build_step in (self._build_toolbar, self._build_panes, self._build_statusbar):
        build_step()
def _build_toolbar(self) -> None:
    """Build the toolbar: two rows of file pickers and a row of action buttons."""
    toolbar = tk.Frame(self, bg=BG2, pady=4)
    toolbar.pack(side=tk.TOP, fill=tk.X)
    pad = {"padx": 5, "pady": 2}

    def new_row() -> tk.Frame:
        row = tk.Frame(toolbar, bg=BG2)
        row.pack(side=tk.TOP, fill=tk.X)
        return row

    def file_picker(row: tk.Frame, caption: str, var: tk.StringVar,
                    width: int, default: str) -> None:
        # Caption + path entry + Browse button, packed left to right.
        tk.Label(row, text=caption, bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad)
        tk.Entry(row, textvariable=var, width=width, bg=BG3, fg=FG,
                 insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad)
        tk.Button(row, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2",
                  font=MONO, command=lambda: self._browse(var, default)
                  ).pack(side=tk.LEFT, **pad)

    # Row 1: raw files
    row1 = new_row()
    self.s3_var = tk.StringVar()
    file_picker(row1, "S3 raw:", self.s3_var, 30, "raw_s3.bin")
    self.bw_var = tk.StringVar()
    file_picker(row1, " BW raw:", self.bw_var, 30, "raw_bw.bin")

    # Row 2: structured bin (optional — enables mark-based session splitting)
    row2 = new_row()
    self.bin_var = tk.StringVar()
    file_picker(row2, "Session .bin:", self.bin_var, 46, "s3_session.bin")
    tk.Label(row2, text="(optional — splits sessions at marks)", bg=BG2, fg=FG_DIM,
             font=MONO_SM).pack(side=tk.LEFT, padx=6)

    # Row 3: buttons
    actions = new_row()
    tk.Frame(actions, bg=BG2, width=10).pack(side=tk.LEFT)  # left spacer

    self.analyze_btn = tk.Button(actions, text="Analyze", bg=ACCENT, fg="#ffffff",
                                 relief="flat", padx=10, cursor="hand2", font=MONO_B,
                                 command=self._run_analyze)
    self.analyze_btn.pack(side=tk.LEFT, **pad)

    self.live_btn = tk.Button(actions, text="Live: OFF", bg=BG3, fg=FG,
                              relief="flat", padx=10, cursor="hand2",
                              font=MONO, command=self._toggle_live)
    self.live_btn.pack(side=tk.LEFT, **pad)

    # Export stays disabled until an analysis has produced sessions.
    self.export_btn = tk.Button(actions, text="Export for Claude", bg=ORANGE, fg="#000000",
                                relief="flat", padx=10, cursor="hand2", font=MONO_B,
                                command=self._run_export, state="disabled")
    self.export_btn.pack(side=tk.LEFT, **pad)

    self.status_var = tk.StringVar(value="Idle")
    status_label = tk.Label(actions, textvariable=self.status_var, bg=BG2, fg=FG_DIM,
                            font=MONO, anchor="w")
    status_label.pack(side=tk.LEFT, padx=10)
def _build_panes(self) -> None:
    """Build the split view: session tree on the left, detail notebook on the right."""
    split = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG,
                           sashwidth=4, sashrelief="flat")
    split.pack(fill=tk.BOTH, expand=True)

    # Left: session tree
    left = tk.Frame(split, bg=BG2, width=260)
    split.add(left, minsize=200)
    tk.Label(left, text="Sessions", bg=BG2, fg=ACCENT,
             font=MONO_B, anchor="w", padx=6).pack(fill=tk.X)

    tree_frame = tk.Frame(left, bg=BG2)
    tree_frame.pack(fill=tk.BOTH, expand=True)

    style = ttk.Style()
    style.theme_use("clam")
    style.configure("Treeview", background=BG2, foreground=FG,
                    fieldbackground=BG2, font=MONO_SM, rowheight=18, borderwidth=0)
    style.configure("Treeview.Heading", background=BG3, foreground=ACCENT, font=MONO_SM)
    style.map("Treeview", background=[("selected", BG3)],
              foreground=[("selected", "#ffffff")])

    tree = self.tree = ttk.Treeview(tree_frame, columns=("info",), show="tree headings",
                                    selectmode="browse")
    for column, title in (("#0", "Frame"), ("info", "Info")):
        tree.heading(column, text=title)
    for column, width, stretch in (("#0", 160, True), ("info", 80, False)):
        tree.column(column, width=width, stretch=stretch)

    tree_scroll = ttk.Scrollbar(tree_frame, orient="vertical", command=tree.yview)
    tree.configure(yscrollcommand=tree_scroll.set)
    tree_scroll.pack(side=tk.RIGHT, fill=tk.Y)
    tree.pack(fill=tk.BOTH, expand=True)

    # Row tags used when the tree is populated.
    tree.tag_configure("session", foreground=ACCENT, font=MONO_B)
    for tag, colour in (
        ("bw_frame", COL_BW),
        ("s3_frame", COL_S3),
        ("bad_chk", RED),
        ("malformed", RED),
    ):
        tree.tag_configure(tag, foreground=colour)

    tree.bind("<<TreeviewSelect>>", self._on_tree_select)

    # Right: detail notebook
    right = tk.Frame(split, bg=BG)
    split.add(right, minsize=600)

    style.configure("TNotebook", background=BG2, borderwidth=0)
    style.configure("TNotebook.Tab", background=BG3, foreground=FG,
                    font=MONO, padding=[8, 2])
    style.map("TNotebook.Tab", background=[("selected", BG)],
              foreground=[("selected", ACCENT)])

    self.nb = ttk.Notebook(right)
    self.nb.pack(fill=tk.BOTH, expand=True)

    self.inv_text = self._make_text_tab("Inventory")
    self.hex_text = self._make_text_tab("Hex Dump")
    self.diff_text = self._make_text_tab("Diff")
    self.report_text = self._make_text_tab("Full Report")
    self._build_query_tab()

    # Every text tab shares the same set of colour tags.
    text_tags = (
        ("head", {"foreground": COL_HEAD, "font": MONO_B}),
        ("bw", {"foreground": COL_BW}),
        ("s3", {"foreground": COL_S3}),
        ("changed", {"foreground": COL_DIFF}),
        ("known", {"foreground": COL_KNOW}),
        ("dim", {"foreground": FG_DIM}),
        ("normal", {"foreground": FG}),
        ("warn", {"foreground": YELLOW}),
        ("addr", {"foreground": ORANGE}),
    )
    for widget in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
        for tag, options in text_tags:
            widget.tag_configure(tag, **options)
def _make_text_tab(self, title: str) -> tk.Text:
    """Add a notebook tab holding a read-only, two-way scrollable text widget.

    Args:
        title: Label shown on the tab.

    Returns:
        The ``tk.Text`` widget placed inside the new tab.
    """
    page = tk.Frame(self.nb, bg=BG)
    self.nb.add(page, text=title)

    text = tk.Text(page, bg=BG, fg=FG, font=MONO, state="disabled",
                   relief="flat", wrap="none", insertbackground=FG,
                   selectbackground=BG3, selectforeground="#ffffff")
    y_scroll = ttk.Scrollbar(page, orient="vertical", command=text.yview)
    x_scroll = ttk.Scrollbar(page, orient="horizontal", command=text.xview)
    text.configure(yscrollcommand=y_scroll.set, xscrollcommand=x_scroll.set)

    # Scrollbars are packed first so the text widget fills what is left.
    y_scroll.pack(side=tk.RIGHT, fill=tk.Y)
    x_scroll.pack(side=tk.BOTTOM, fill=tk.X)
    text.pack(fill=tk.BOTH, expand=True)
    return text
def _build_query_tab(self) -> None:
    """Build the "Query DB" tab: filter bar, result table and byte interpreter."""
    page = tk.Frame(self.nb, bg=BG)
    self.nb.add(page, text="Query DB")
    pad = {"padx": 4, "pady": 2}

    # ── filter bar ──
    filt = tk.Frame(page, bg=BG2, pady=4)
    filt.pack(side=tk.TOP, fill=tk.X)

    def filter_label(text: str, column: int) -> None:
        tk.Label(filt, text=text, bg=BG2, fg=FG, font=MONO_SM).grid(
            row=0, column=column, sticky="e", **pad)

    def filter_combo(var: tk.StringVar, width: int, column: int, **extra) -> ttk.Combobox:
        box = ttk.Combobox(filt, textvariable=var, width=width,
                           font=MONO_SM, state="readonly", **extra)
        box.grid(row=0, column=column, sticky="w", **pad)
        return box

    def filter_entry(var: tk.StringVar, column: int) -> None:
        tk.Entry(filt, textvariable=var, width=8, bg=BG3, fg=FG,
                 font=MONO_SM, insertbackground=FG, relief="flat").grid(
            row=0, column=column, sticky="w", **pad)

    filter_label("Capture:", 0)
    self._q_capture_var = tk.StringVar(value="All")
    self._q_capture_cb = filter_combo(self._q_capture_var, 20, 1)

    filter_label("Dir:", 2)
    self._q_dir_var = tk.StringVar(value="All")
    filter_combo(self._q_dir_var, 6, 3, values=["All", "BW", "S3"])

    filter_label("SUB:", 4)
    self._q_sub_var = tk.StringVar(value="All")
    self._q_sub_cb = filter_combo(self._q_sub_var, 12, 5)

    filter_label("Offset:", 6)
    self._q_offset_var = tk.StringVar()
    filter_entry(self._q_offset_var, 7)

    filter_label("Value:", 8)
    self._q_value_var = tk.StringVar()
    filter_entry(self._q_value_var, 9)

    tk.Button(filt, text="Run Query", bg=ACCENT, fg="#ffffff", relief="flat",
              padx=8, cursor="hand2", font=("Consolas", 8, "bold"),
              command=self._run_db_query).grid(row=0, column=10, padx=8)
    tk.Button(filt, text="Refresh", bg=BG3, fg=FG, relief="flat",
              padx=6, cursor="hand2", font=MONO_SM,
              command=self._refresh_query_dropdowns).grid(row=0, column=11, padx=4)

    self._q_stats_var = tk.StringVar(value="DB: —")
    tk.Label(filt, textvariable=self._q_stats_var, bg=BG2, fg=FG_DIM,
             font=MONO_SM).grid(row=0, column=12, padx=12, sticky="w")

    # ── result table ──
    res_frame = tk.Frame(page, bg=BG)
    res_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)

    column_specs = (
        ("cap", "Cap", 40), ("sess", "Sess", 40), ("dir", "Dir", 40),
        ("sub", "SUB", 50), ("sub_name", "Name", 160), ("page", "Page", 60),
        ("len", "Len", 50), ("chk", "Chk", 50),
    )
    cols = tuple(spec[0] for spec in column_specs)
    results = self._q_tree = ttk.Treeview(res_frame, columns=cols, show="headings",
                                          selectmode="browse")
    for cid, title, width in column_specs:
        results.heading(cid, text=title, anchor="w")
        # Only the name column absorbs extra horizontal space.
        results.column(cid, width=width, stretch=(cid == "sub_name"))

    q_vsb = ttk.Scrollbar(res_frame, orient="vertical", command=results.yview)
    q_hsb = ttk.Scrollbar(res_frame, orient="horizontal", command=results.xview)
    results.configure(yscrollcommand=q_vsb.set, xscrollcommand=q_hsb.set)
    q_vsb.pack(side=tk.RIGHT, fill=tk.Y)
    q_hsb.pack(side=tk.BOTTOM, fill=tk.X)
    results.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

    for tag, colour in (("bw_row", COL_BW), ("s3_row", COL_S3), ("bad_row", RED)):
        results.tag_configure(tag, foreground=colour)
    results.bind("<<TreeviewSelect>>", lambda _e: self._run_interpret())

    # ── byte interpretation strip (fixed height at the bottom) ──
    interp_frame = tk.Frame(page, bg=BG2, height=120)
    interp_frame.pack(side=tk.BOTTOM, fill=tk.X)
    interp_frame.pack_propagate(False)  # keep the 120 px height

    tk.Label(interp_frame, text="Byte interpretation:", bg=BG2, fg=ACCENT,
             font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X)

    controls = tk.Frame(interp_frame, bg=BG2)
    controls.pack(fill=tk.X, padx=6, pady=2)
    tk.Label(controls, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT)
    self._interp_offset_var = tk.StringVar(value="5")
    tk.Entry(controls, textvariable=self._interp_offset_var, width=6, bg=BG3, fg=FG,
             font=MONO_SM, insertbackground=FG, relief="flat").pack(side=tk.LEFT, padx=4)
    tk.Button(controls, text="Interpret", bg=BG3, fg=FG, relief="flat", cursor="hand2",
              font=MONO_SM, command=self._run_interpret).pack(side=tk.LEFT, padx=4)

    self._interp_text = tk.Text(interp_frame, bg=BG2, fg=FG, font=MONO_SM,
                                height=4, relief="flat", state="disabled",
                                insertbackground=FG)
    self._interp_text.pack(fill=tk.X, padx=6, pady=2)
    self._interp_text.tag_configure("label", foreground=FG_DIM)
    self._interp_text.tag_configure("value", foreground=YELLOW)

    # Query bookkeeping, all initialised before the first dropdown refresh.
    self._q_rows: dict[str, object] = {}
    self._q_capture_rows: list = [None]
    self._q_sub_values: list = [None]
    self._refresh_query_dropdowns()
def _build_statusbar(self) -> None:
    """Build the thin status bar docked at the bottom of the panel."""
    strip = tk.Frame(self, bg=BG3, height=20)
    strip.pack(side=tk.BOTTOM, fill=tk.X)

    self.sb_var = tk.StringVar(value="Ready")
    message = tk.Label(strip, textvariable=self.sb_var, bg=BG3, fg=FG_DIM,
                       font=MONO_SM, anchor="w", padx=6)
    message.pack(fill=tk.X)
# ── file picking ──────────────────────────────────────────────────────
def _browse(self, var: tk.StringVar, default: str) -> None:
    """Open a file dialog and store the chosen path in *var*.

    Args:
        var: Variable that receives the selected path.
        default: Suggested file name, also shown in the dialog title.
    """
    chosen = filedialog.askopenfilename(
        title=f"Select {default}",
        filetypes=[("Binary", "*.bin"), ("All files", "*.*")],
        initialfile=default,
    )
    if not chosen:
        return  # dialog cancelled
    var.set(chosen)
# ── analysis ──────────────────────────────────────────────────────────
def _run_analyze(self) -> None:
    """Validate the two raw-file fields and run the analysis on them.

    Shows an error box and returns when either field is empty or either
    file does not exist. Otherwise both paths are stored on the shared
    state and handed to ``_do_analyze``.
    """
    s3_text = self.s3_var.get().strip()
    bw_text = self.bw_var.get().strip()
    if not (s3_text and bw_text):
        messagebox.showerror("Missing files", "Select both S3 and BW raw files.")
        return

    s3p = Path(s3_text)
    bwp = Path(bw_text)
    problem = None
    if not s3p.exists():
        problem = f"S3 file not found:\n{s3p}"
    elif not bwp.exists():
        problem = f"BW file not found:\n{bwp}"
    if problem is not None:
        messagebox.showerror("Not found", problem)
        return

    self.state.s3_path = s3p
    self.state.bw_path = bwp
    self._do_analyze(s3p, bwp)
def _browse_bin(self) -> None:
    """Open a file dialog for the session .bin file and store the choice."""
    chosen = filedialog.askopenfilename(
        title="Select session .bin file",
        filetypes=[("Binary", "*.bin"), ("All files", "*.*")],
    )
    if not chosen:
        return  # dialog cancelled
    self.bin_var.set(chosen)
def _do_analyze(self, s3_path: Path, bw_path: Path) -> None:
    """Parse both raw captures, split them into sessions and refresh the UI.

    When the optional session ``.bin`` named in ``bin_var`` exists and yields
    user marks, sessions are split at those marks; otherwise the frames are
    parsed and split with the standard session detection.  Consecutive
    sessions are diffed, the tree is rebuilt and the result is ingested into
    the capture database (ingest errors are reported in the status bar).

    Any ``OSError`` while reading the input files is reported in the status
    bar and aborts the analysis.  This matters in live mode, where the
    files are still being written and may be missing or locked.
    """
    self.status_var.set("Parsing...")
    self.update_idletasks()

    bin_str = self.bin_var.get().strip()
    bin_path = Path(bin_str) if bin_str else None
    have_bin = bool(bin_path and bin_path.exists())

    try:
        s3_blob = s3_path.read_bytes()
        bw_blob = bw_path.read_bytes()
        bin_blob = bin_path.read_bytes() if have_bin else None
    except OSError as exc:
        self.status_var.set("Read error")
        self.sb_var.set(f"Cannot read capture: {exc}")
        return

    # Use mark-based session splitting if a structured .bin is provided
    marks = parse_structured_bin(bin_blob) if bin_blob is not None else []

    if marks:
        sessions = split_sessions_at_marks(bw_blob, s3_blob, marks)
        mark_labels = " | ".join(m.label for m in marks)
        self.sb_var.set(f"{len(marks)} user mark(s): {mark_labels}")
        self.update_idletasks()
    else:
        if have_bin:
            self.sb_var.set("No user marks found in session .bin — using standard session detection")
        s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3")
        bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0,
                                             validate_checksum=True), "BW")
        sessions = split_into_sessions(bw_frames, s3_frames)

    # diffs[i] compares session i with session i-1; the first has no predecessor.
    diffs: list[Optional[list[FrameDiff]]] = [None]
    for i in range(1, len(sessions)):
        diffs.append(diff_sessions(sessions[i - 1], sessions[i]))

    self.state.sessions = sessions
    self.state.diffs = diffs

    n_s3 = sum(len(s.s3_frames) for s in sessions)
    n_bw = sum(len(s.bw_frames) for s in sessions)
    self.status_var.set(f"{len(sessions)} sessions | BW:{n_bw} S3:{n_s3}")
    self.sb_var.set(f"Loaded: {s3_path.name} + {bw_path.name}")
    self.export_btn.configure(state="normal")
    self._rebuild_tree()

    # Database ingest is best-effort: a failure must not undo the analysis
    # already on screen, so it is only reported in the status bar.
    try:
        cap_id = self._db.ingest(sessions, s3_path, bw_path)
        if cap_id is not None:
            self.state.last_capture_id = cap_id
            self._refresh_query_dropdowns()
            # Pre-select the freshly ingested capture in the query dropdown.
            for i, lbl in enumerate(self._q_capture_cb["values"]):
                if lbl.startswith(f"#{cap_id} "):
                    self._q_capture_cb.current(i)
                    break
    except Exception as exc:
        self.sb_var.set(f"DB ingest error: {exc}")
def _run_export(self) -> None:
    """Write the analysis export next to the S3 capture and offer to open its folder.

    Requires a previous analysis (``self.state.sessions``).  The folder is
    opened with the platform's file manager: ``explorer`` on Windows,
    ``open`` on macOS and ``xdg-open`` elsewhere.  If that launcher cannot
    be started the failure is shown in the status bar instead of raising.
    """
    if not self.state.sessions:
        messagebox.showinfo("Export", "Run Analyze first.")
        return

    outdir = self.state.s3_path.parent if self.state.s3_path else Path(".")
    out_path = write_claude_export(self.state.sessions, self.state.diffs,
                                   outdir, self.state.s3_path, self.state.bw_path)
    self.sb_var.set(f"Exported: {out_path.name}")

    if not messagebox.askyesno("Export complete", f"Saved to:\n{out_path}\n\nOpen folder?"):
        return

    folder = str(out_path.parent)
    if sys.platform.startswith("win"):
        opener = ["explorer", folder]
    elif sys.platform == "darwin":
        opener = ["open", folder]
    else:
        opener = ["xdg-open", folder]
    try:
        subprocess.Popen(opener)
    except OSError as exc:
        # Missing launcher (e.g. headless Linux): the export itself succeeded.
        self.sb_var.set(f"Exported: {out_path.name} (could not open folder: {exc})")
# ── session tree ──────────────────────────────────────────────────────
def _rebuild_tree(self) -> None:
    """Repopulate the session tree from ``self.state.sessions``.

    Each session becomes a top-level node (with its change count, if any)
    and each of its frames a child whose item id encodes
    ``frame_<session>_<frame index>_<source>`` for later lookup.
    """
    tree = self.tree
    tree.delete(*tree.get_children())

    for session in self.state.sessions:
        suffix = "" if session.is_complete() else " [partial]"
        change_count = len(self.state.diffs[session.index] or [])
        summary = f"{change_count} changes" if change_count else ""
        parent = tree.insert("", tk.END, text=f"Session {session.index}" + suffix,
                             values=(summary,), tags=("session",))

        for af in session.all_frames:
            sub_hex = f"{af.header.sub:02X}" if af.header else "??"
            caption = f"[{af.source}] {sub_hex} {af.sub_name}"
            note = ""
            row_tags = ("bw_frame",) if af.source == "BW" else ("s3_frame",)
            # A failed checksum takes precedence over a missing header.
            if af.frame.checksum_valid is False:
                note = "BAD CHK"
                row_tags = ("bad_chk",)
            elif af.header is None:
                caption = f"[{af.source}] MALFORMED"
                row_tags = ("malformed",)
            tree.insert(parent, tk.END, text=caption, values=(note,), tags=row_tags,
                        iid=f"frame_{session.index}_{af.frame.index}_{af.source}")

    # Expand every session node so frames are visible immediately.
    for node in tree.get_children():
        tree.item(node, open=True)
def _on_tree_select(self, _e: tk.Event) -> None:
sel = self.tree.selection()
if not sel:
return
iid = sel[0]
if iid.startswith("frame_"):
parts = iid.split("_")
self._show_frame_detail(int(parts[1]), int(parts[2]), parts[3])
else:
text = self.tree.item(iid, "text")
try:
self._show_session_detail(int(text.split()[1]))
except (IndexError, ValueError):
pass
def _find_frame_by_sub(self, sess_idx: int, sub: int, page_key: int) -> Optional[AnnotatedFrame]:
"""Find a frame in a session by (sub, page_key) — used for diff click-through."""
if sess_idx >= len(self.state.sessions):
return None
sess = self.state.sessions[sess_idx]
for af in sess.bw_frames + sess.s3_frames:
if af.header and af.header.sub == sub and af.header.page_key == page_key:
return af
return None
def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]:
if sess_idx >= len(self.state.sessions):
return None
pool = (self.state.sessions[sess_idx].bw_frames if source == "BW"
else self.state.sessions[sess_idx].s3_frames)
return next((af for af in pool if af.frame.index == frame_idx), None)
# ── detail renderers ──────────────────────────────────────────────────
def _clear_tabs(self) -> None:
for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
self._tc(w)
def _show_session_detail(self, idx: int) -> None:
    """Render session *idx* into the inventory, diff and report tabs.

    Does nothing when *idx* is past the end of ``self.state.sessions``.
    The diff tab lists byte changes against the previous session; each
    ``SUB`` heading there is a clickable link that opens the matching
    frame's detail view.  The diff and report panes are left read-only.
    """
    if idx >= len(self.state.sessions):
        return
    sess = self.state.sessions[idx]
    diffs = self.state.diffs[idx]
    self._clear_tabs()

    # ── inventory tab ──
    inv = self.inv_text
    n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames)
    self._tw(inv, f"SESSION {sess.index}\n", "head")
    self._tw(inv, f"Frames: {n_bw + n_s3} (BW:{n_bw} S3:{n_s3})\n", "normal")
    if n_bw != n_s3:
        self._tw(inv, " WARNING: BW/S3 count mismatch\n", "warn")
    self._tn(inv)

    for i, af in enumerate(sess.all_frames):
        src = "bw" if af.source == "BW" else "s3"
        sh = f"{af.header.sub:02X}" if af.header else "??"
        pg = f" (page {af.header.page_key:04X})" if af.header and af.header.page_key else ""
        bad_checksum = af.frame.checksum_valid is False
        if bad_checksum:
            chk = " [BAD CHECKSUM]"
        elif af.frame.checksum_valid is True:
            chk = f" [{af.frame.checksum_type}]"
        else:
            chk = ""
        self._tw(inv, f" [{af.source}] #{i:<3} ", src)
        self._tw(inv, f"SUB={sh} ", "addr")
        self._tw(inv, f"{af.sub_name:<30}{pg} len={len(af.frame.payload)}", "dim")
        if chk:
            self._tw(inv, chk, "warn" if bad_checksum else "dim")
        self._tn(inv)

    # ── diff tab ──
    def _fmt(v: int) -> str:
        # -2 and -1 are sentinel byte values shown as "ADD" and "--".
        if v == -2:
            return "ADD"
        if v == -1:
            return "--"
        return f"{v:02x}"

    def _link_handler(sub: int, page_key: int):
        # Bind the current diff's key now; the loop variable changes later.
        def _open(_event):
            af = self._find_frame_by_sub(idx, sub, page_key)
            if af:
                self._show_frame_detail(idx, af.frame.index, af.source)
        return _open

    diff_w = self.diff_text
    self._tc(diff_w)
    if diffs is None:
        self._tw(diff_w, "(No previous session to diff against)\n", "dim")
    elif not diffs:
        self._tw(diff_w, f"DIFF vs SESSION {idx-1}\n", "head")
        self._tw(diff_w, " No changes detected.\n", "dim")
    else:
        self._tw(diff_w, f"DIFF vs SESSION {idx-1}\n", "head")
        self._tw(diff_w, " (click any SUB header to open its hex dump)\n", "dim")
        for fd in diffs:
            pg = f" (page {fd.page_key:04X})" if fd.page_key else ""
            link_tag = f"difflink_{fd.sub}_{fd.page_key}"

            diff_w.configure(state="normal")
            # Anchor the link span on the real end of the text rather than
            # the INSERT mark, which is not guaranteed to sit at the end.
            start = diff_w.index("end-1c")
            diff_w.insert(tk.END, f"\n SUB {fd.sub:02X} ({fd.sub_name}){pg}:\n")
            end = diff_w.index("end-1c")
            diff_w.tag_add("addr", start, end)
            diff_w.tag_add(link_tag, start, end)
            diff_w.tag_configure(link_tag, underline=True)
            diff_w.tag_bind(link_tag, "<Button-1>", _link_handler(fd.sub, fd.page_key))
            diff_w.tag_bind(link_tag, "<Enter>", lambda e: diff_w.configure(cursor="hand2"))
            diff_w.tag_bind(link_tag, "<Leave>", lambda e: diff_w.configure(cursor=""))
            diff_w.configure(state="disabled")

            for bd in fd.diffs:
                b, a = _fmt(bd.before), _fmt(bd.after)
                is_add_remove = bd.before in (-1, -2) or bd.after in (-1, -2)
                # A4 inner-frame add/remove: field_name carries the full description
                if bd.field_name and is_add_remove and bd.field_name.startswith("[A4"):
                    self._tw(diff_w, f" {b} -> {a} ", "changed")
                    self._tw(diff_w, bd.field_name, "known")
                    self._tn(diff_w)
                else:
                    self._tw(diff_w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
                    self._tw(diff_w, f"{b} -> {a}", "changed")
                    if bd.field_name:
                        self._tw(diff_w, f" [{bd.field_name}]", "known")
                    self._tn(diff_w)
    diff_w.configure(state="disabled")

    # ── report tab ──
    report = render_session_report(sess, diffs, idx - 1 if idx > 0 else None)
    self._tc(self.report_text)
    self._tw(self.report_text, report, "normal")
    self.report_text.configure(state="disabled")

    self.nb.select(0)
def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None:
    """Render one frame into the inventory and hex tabs and select the hex tab.

    The frame is looked up with ``_find_frame``; nothing happens when it is
    not found.  If the session has a diff entry for the frame's
    ``(sub, page_key)``, the changed bytes are appended below the hex dump.
    """
    af = self._find_frame(sess_idx, frame_idx, source)
    if af is None:
        return
    self._clear_tabs()

    colour = "bw" if source == "BW" else "s3"
    sub_hex = f"{af.header.sub:02X}" if af.header else "??"

    # ── inventory tab: summary ──
    inv = self.inv_text
    self._tw(inv, f"[{af.source}] Frame #{af.frame.index}\n", colour)
    self._tw(inv, f"Session {sess_idx} | ", "dim")
    self._tw(inv, f"SUB={sub_hex} {af.sub_name}\n", "addr")
    if af.header:
        header_line = (f" OFFSET: {af.header.page_key:04X} "
                       f"CMD={af.header.cmd:02X} FLAGS={af.header.flags:02X}\n")
        self._tw(inv, header_line, "dim")
    self._tn(inv)
    self._tw(inv, f"Payload: {len(af.frame.payload)} bytes\n", "dim")
    if af.frame.checksum_valid is False:
        self._tw(inv, " BAD CHECKSUM\n", "warn")
    elif af.frame.checksum_valid is True:
        self._tw(inv, f" Checksum: {af.frame.checksum_type} {af.frame.checksum_hex}\n", "dim")
    self._tn(inv)

    # ── inventory tab: first five payload bytes, one per line ──
    payload = af.frame.payload
    if len(payload) >= 5:
        breakdown = [
            ("Header breakdown:\n", "head"),
            (f" [0] CMD = {payload[0]:02x}\n", "dim"),
            (f" [1] ? = {payload[1]:02x}\n", "dim"),
            (f" [2] SUB = {payload[2]:02x} ({af.sub_name})\n", colour),
            (f" [3] OFFSET_HI = {payload[3]:02x}\n", "dim"),
            (f" [4] OFFSET_LO = {payload[4]:02x}\n", "dim"),
        ]
        if len(payload) > 5:
            breakdown.append((f" [5..] data = {len(payload)-5} bytes\n", "dim"))
        for text, tag in breakdown:
            self._tw(inv, text, tag)

    # ── hex tab ──
    hexw = self.hex_text
    self._tw(hexw, f"[{af.source}] SUB={sub_hex} {af.sub_name}\n", colour)
    self._tw(hexw, f"Payload ({len(af.frame.payload)} bytes):\n", "dim")
    self._tn(hexw)
    dump = "\n".join(format_hex_dump(af.frame.payload, indent=" "))
    self._tw(hexw, dump + "\n", "normal")

    # ── hex tab: changes against the previous session, if any ──
    session_diffs = None
    if sess_idx < len(self.state.diffs):
        session_diffs = self.state.diffs[sess_idx]
    if session_diffs and af.header:
        hit = next(
            (fd for fd in session_diffs
             if fd.sub == af.header.sub and fd.page_key == af.header.page_key),
            None,
        )
        if hit is not None:
            self._tn(hexw)
            self._tw(hexw, "Changed bytes (vs prev session):\n", "head")
            for bd in hit.diffs:
                before = f"{bd.before:02x}" if bd.before >= 0 else "--"
                after = f"{bd.after:02x}" if bd.after >= 0 else "--"
                self._tw(hexw, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
                self._tw(hexw, f"{before} -> {after}", "changed")
                if bd.field_name:
                    self._tw(hexw, f" [{bd.field_name}]", "known")
                self._tn(hexw)

    self.nb.select(1)
# ── live mode ─────────────────────────────────────────────────────────
def _toggle_live(self) -> None:
if self._live_thread and self._live_thread.is_alive():
self._live_stop.set()
self._set_live_btn_off()
else:
s3p = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None
bwp = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None
if not s3p or not bwp:
messagebox.showerror("Missing files", "Select both raw files before starting live mode.")
return
self.state.s3_path = s3p
self.state.bw_path = bwp
self._start_live()
def _start_live(self) -> None:
    """Start the background worker that watches the two raw files."""
    s3_file = Path(self.s3_var.get().strip())
    bw_file = Path(self.bw_var.get().strip())
    self.state.s3_path, self.state.bw_path = s3_file, bw_file

    self._live_stop.clear()
    worker = threading.Thread(
        target=self._live_worker, args=(s3_file, bw_file), daemon=True)
    self._live_thread = worker
    worker.start()

    self.live_btn.configure(text="Live: ON", bg=GREEN, fg="#000000")
    self.status_var.set("Live mode running...")
def _set_live_btn_off(self) -> None:
    """Put the live button and status line back into their "off" look."""
    self.status_var.set("Live stopped")
    self.live_btn.configure(text="Live: OFF", bg=BG3, fg=FG)
def _live_worker(self, s3_path: Path, bw_path: Path) -> None:
import time
s3_pos = bw_pos = 0
while not self._live_stop.is_set():
changed = False
for path, pos_attr in ((s3_path, "s3"), (bw_path, "bw")):
if path.exists():
with path.open("rb") as fh:
fh.seek(s3_pos if pos_attr == "s3" else bw_pos)
nb = fh.read()
if nb:
if pos_attr == "s3": s3_pos += len(nb)
else: bw_pos += len(nb)
changed = True
if changed:
self._live_q.put("refresh")
time.sleep(0.1)
def _poll_live_queue(self) -> None:
try:
while True:
msg = self._live_q.get_nowait()
if msg == "refresh" and self.state.s3_path and self.state.bw_path:
self._do_analyze(self.state.s3_path, self.state.bw_path)
except queue.Empty:
pass
finally:
self.after(150, self._poll_live_queue)
# ── DB query ──────────────────────────────────────────────────────────
def _refresh_query_dropdowns(self) -> None:
try:
captures = self._db.list_captures()
self._q_capture_cb["values"] = ["All"] + [
f"#{r['id']} {r['timestamp'][:16]} ({r['frame_count']} frames)"
for r in captures
]
self._q_capture_rows = [None] + [r["id"] for r in captures]
subs = self._db.get_distinct_subs()
self._q_sub_cb["values"] = ["All"] + [f"0x{s:02X}" for s in subs]
self._q_sub_values = [None] + subs
stats = self._db.get_stats()
self._q_stats_var.set(f"DB: {stats['captures']} captures | {stats['frames']} frames")
except Exception as exc:
self._q_stats_var.set(f"DB error: {exc}")
def _parse_int(self, s: str) -> Optional[int]:
s = s.strip()
if not s: return None
try: return int(s, 0)
except ValueError: return None
def _run_db_query(self) -> None:
    """Run the query described by the filter widgets and list the rows.

    With a byte offset the database is searched by byte value; without one
    all frames matching the capture/direction/SUB filters are returned.
    Query errors are shown in a dialog and leave the result list untouched.
    """
    cap_pos = self._q_capture_cb.current()
    capture_id = self._q_capture_rows[cap_pos] if cap_pos > 0 else None

    chosen_dir = self._q_dir_var.get()
    direction = None if chosen_dir == "All" else chosen_dir

    sub_pos = self._q_sub_cb.current()
    sub = self._q_sub_values[sub_pos] if sub_pos > 0 else None

    offset = self._parse_int(self._q_offset_var.get())
    value = self._parse_int(self._q_value_var.get())

    filters = dict(capture_id=capture_id, direction=direction, sub=sub)
    try:
        if offset is None:
            rows = self._db.query_frames(**filters)
        else:
            rows = self._db.query_by_byte(offset=offset, value=value, **filters)
    except Exception as exc:
        messagebox.showerror("Query error", str(exc))
        return

    tree = self._q_tree
    tree.delete(*tree.get_children())
    self._q_rows.clear()

    checksum_text = {1: "OK", 0: "BAD", None: ""}
    for row in rows:
        sub_cell = f"0x{row['sub']:02X}" if row["sub"] is not None else ""
        page_cell = f"0x{row['page_key']:04X}" if row["page_key"] is not None else ""
        chk_cell = checksum_text.get(row["checksum_ok"], "")
        # A failed checksum overrides the per-direction row colour.
        if row["checksum_ok"] == 0:
            row_tag = "bad_row"
        elif row["direction"] == "BW":
            row_tag = "bw_row"
        else:
            row_tag = "s3_row"
        key = str(row["id"])
        cells = (
            row["capture_id"], row["session_idx"], row["direction"],
            sub_cell, row["sub_name"] or "", page_cell, row["payload_len"], chk_cell,
        )
        tree.insert("", tk.END, iid=key, tags=(row_tag,), values=cells)
        self._q_rows[key] = row

    self.sb_var.set(f"Query returned {len(rows)} rows")
def _run_interpret(self) -> None:
sel = self._q_tree.selection()
if not sel: return
row = self._q_rows.get(sel[0])
if not row: return
offset = self._parse_int(self._interp_offset_var.get())
if offset is None: return
payload = bytes(row["payload"])
interp = self._db.interpret_offset(payload, offset)
w = self._interp_text
w.configure(state="normal")
w.delete("1.0", tk.END)
sh = f"0x{row['sub']:02X}" if row["sub"] is not None else "??"
w.insert(tk.END,
f"Frame #{row['id']} [{row['direction']}] SUB={sh} "
f"offset={offset} (0x{offset:04X})\n", "label")
line = ""
for key, lbl in [
("uint8","uint8 "), ("int8","int8 "),
("uint16_be","uint16 BE "), ("uint16_le","uint16 LE "),
("uint32_be","uint32 BE "), ("uint32_le","uint32 LE "),
("float32_be","float32 BE "), ("float32_le","float32 LE "),
]:
if key not in interp: continue
val = interp[key]
vs = f"{val:.6g}" if isinstance(val, float) else (
f"{val} (0x{int(val)&0xFFFFFFFF:X})")
line += f" {lbl}: {vs:<28}"
if len(line) > 80:
w.insert(tk.END, line + "\n", "value"); line = ""
if line: w.insert(tk.END, line + "\n", "value")
w.configure(state="disabled")
# ── text helpers ──────────────────────────────────────────────────────
def _tc(self, w: tk.Text) -> None:
w.configure(state="normal"); w.delete("1.0", tk.END)
def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None:
w.configure(state="normal"); w.insert(tk.END, text, tag)
def _tn(self, w: tk.Text) -> None:
w.configure(state="normal"); w.insert(tk.END, "\n"); w.configure(state="disabled")
# ─────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────────────────────────────────────
# Serial Watch panel — tap the RS-232 line between device and modem
# ─────────────────────────────────────────────────────────────────────────────
try:
import serial as _serial
from serial.tools import list_ports as _list_ports
_SERIAL_OK = True
except ImportError:
_SERIAL_OK = False
from minimateplus.framing import S3FrameParser as _S3FrameParser # noqa: E402
# Display names for the frame SUB byte values the Serial Watch log recognises.
# The reader loop looks frames up here and falls back to "UNK_0x??" for any
# value that is not listed.
_SW_KNOWN_SUBS = {
0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADV_EVENT_RSP",
0xE1: "EVT_IDX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP",
0xF3: "WAVEFORM_REC_RSP", 0xF5: "WAVEFORM_HDR_RSP", 0xF7: "EVENT_INDEX_RSP",
0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP",
0x69: "START_MON_ACK", 0x68: "STOP_MON_ACK",
}
class SerialWatchPanel(tk.Frame):
    """
    Tap the RS-232 line between the MiniMate Plus and its modem (RV50/RV55).

    Runs the serial reader in a background thread; surfaces parsed S3 frames
    live in the log view. Writes raw_s3_<ts>.bin compatible with Analyzer.

    Typical use for call-home capture:
    1. Connect a USB-to-serial tap to the RS-232 line.
    2. Pick that COM port here, click Start.
    3. Wait for the unit to trigger / call home.
    4. Click Stop, then 'Open in Analyzer' to inspect the frames.

    Threading: Tk widgets and variables are only touched on the Tk thread.
    The reader and control-line threads communicate with the UI solely
    through ``_log_q``, which ``_poll_log_queue`` drains on the Tk thread.
    """

    _COL_FRAME = "#4ec9b0"  # teal — parsed S3 frame
    _COL_CTRL = "#dcdcaa"   # yellow — control-line change
    _COL_AT = "#9cdcfe"     # blue — AT command / ASCII noise
    _COL_ERR = "#f44747"    # red — error

    # Upper bound for buffered bytes that have not formed a complete AT line.
    _AT_BUF_LIMIT = 4096

    def __init__(self, parent: tk.Widget, on_capture_ready=None, **kw):
        """
        on_capture_ready(raw_s3_path: str) — called from 'Open in Analyzer'
        so the parent can inject the captured file into the Analyzer.
        """
        super().__init__(parent, bg=BG2, **kw)
        self._on_capture_ready = on_capture_ready
        self._serial: Optional[object] = None  # serial.Serial instance
        self._reader_thread: Optional[threading.Thread] = None
        self._stop_evt = threading.Event()
        self._log_q: queue.Queue[tuple[str, str]] = queue.Queue()  # (text, colour)
        self._raw_fh = None  # open binary file handle
        self._raw_path: Optional[str] = None
        self._frame_count = 0
        # Snapshot of the "Ack OK" checkbox, taken on the Tk thread in _start()
        # so the reader thread never has to touch a Tk variable.
        self._ack_ok = False
        self._build()
        self._poll_log_queue()

    # ── build ─────────────────────────────────────────────────────────────
    def _build(self) -> None:
        """Create the configuration rows, the button row and the log view."""
        pad = {"padx": 6, "pady": 4}
        cfg = tk.Frame(self, bg=BG2)
        cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4)

        # Row 0 — port picker
        tk.Label(cfg, text="COM port:", bg=BG2, fg=FG, font=MONO
                 ).grid(row=0, column=0, sticky="e", **pad)
        self._port_var = tk.StringVar()
        self._port_cb = ttk.Combobox(cfg, textvariable=self._port_var,
                                     width=12, font=MONO, state="normal")
        self._port_cb.grid(row=0, column=1, sticky="w", **pad)
        # The button re-scans the available ports; it needs a visible label.
        tk.Button(cfg, text="Refresh", bg=BG3, fg=FG, relief="flat", cursor="hand2",
                  font=MONO, command=self._refresh_ports
                  ).grid(row=0, column=2, **pad)
        tk.Label(cfg, text=" Baud:", bg=BG2, fg=FG, font=MONO
                 ).grid(row=0, column=3, sticky="e", **pad)
        self._baud_var = tk.StringVar(value="38400")
        tk.Entry(cfg, textvariable=self._baud_var, width=8,
                 bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO
                 ).grid(row=0, column=4, sticky="w", **pad)
        self._ack_ok_var = tk.BooleanVar(value=False)
        tk.Checkbutton(cfg, text="Ack OK to AT commands",
                       variable=self._ack_ok_var,
                       bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
                       font=MONO).grid(row=0, column=5, sticky="w", **pad)

        # Row 1 — capture dir
        tk.Label(cfg, text="Save to:", bg=BG2, fg=FG, font=MONO
                 ).grid(row=1, column=0, sticky="e", **pad)
        self._dir_var = tk.StringVar(
            value=str(SCRIPT_DIR / "bridges" / "captures"))
        tk.Entry(cfg, textvariable=self._dir_var, width=40,
                 bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO
                 ).grid(row=1, column=1, columnspan=4, sticky="we", **pad)
        tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat",
                  cursor="hand2", font=MONO, command=self._choose_dir
                  ).grid(row=1, column=5, **pad)

        # Button row
        btn_row = tk.Frame(self, bg=BG2)
        btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2)
        self._start_btn = tk.Button(
            btn_row, text="Start Watch", bg=GREEN, fg="#000000",
            relief="flat", padx=12, cursor="hand2", font=MONO_B,
            command=self._start)
        self._start_btn.pack(side=tk.LEFT, padx=6)
        self._stop_btn = tk.Button(
            btn_row, text="Stop", bg=BG3, fg=FG,
            relief="flat", padx=12, cursor="hand2", font=MONO,
            command=self._stop, state="disabled")
        self._stop_btn.pack(side=tk.LEFT, padx=4)
        self._analyzer_btn = tk.Button(
            btn_row, text="Open in Analyzer", bg=BG3, fg=FG,
            relief="flat", padx=10, cursor="hand2", font=MONO,
            command=self._send_to_analyzer, state="disabled")
        self._analyzer_btn.pack(side=tk.LEFT, padx=4)
        tk.Button(btn_row, text="Clear", bg=BG3, fg=FG,
                  relief="flat", padx=8, cursor="hand2", font=MONO,
                  command=self._clear_log).pack(side=tk.LEFT, padx=4)
        self._status_var = tk.StringVar(value="Idle")
        tk.Label(btn_row, textvariable=self._status_var,
                 bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10)

        # Log view
        self._log = scrolledtext.ScrolledText(
            self, height=24, font=MONO_SM,
            bg=BG, fg=FG, insertbackground=FG,
            relief="flat", state="disabled",
        )
        self._log.pack(fill=tk.BOTH, expand=True, padx=4, pady=4)
        self._log.tag_config("frame", foreground=self._COL_FRAME)
        self._log.tag_config("ctrl", foreground=self._COL_CTRL)
        self._log.tag_config("at", foreground=self._COL_AT)
        self._log.tag_config("err", foreground=self._COL_ERR)
        self._log.tag_config("dim", foreground=FG_DIM)

        # Populate ports on first load
        self._refresh_ports()

    # ── port helpers ──────────────────────────────────────────────────────
    def _refresh_ports(self) -> None:
        """Fill the port dropdown; pre-select the first port if none is chosen."""
        if not _SERIAL_OK:
            self._port_cb["values"] = ["(pyserial not installed)"]
            return
        ports = [p.device for p in _list_ports.comports()]
        self._port_cb["values"] = ports
        if ports and not self._port_var.get():
            self._port_var.set(ports[0])

    def _choose_dir(self) -> None:
        """Let the user pick the directory captures are saved under."""
        d = filedialog.askdirectory(initialdir=self._dir_var.get())
        if d:
            self._dir_var.set(d)

    # ── start / stop ──────────────────────────────────────────────────────
    def _start(self) -> None:
        """Open the capture file and the serial port, then start the reader thread.

        Every failure (pyserial missing, bad port/baud, capture directory or
        file not creatable, port not openable) is reported in a dialog and
        leaves the panel idle with no file or port left open.
        """
        if not _SERIAL_OK:
            messagebox.showerror(
                "pyserial missing",
                "Install pyserial first:\n pip install pyserial")
            return
        port = self._port_var.get().strip()
        if not port or "not installed" in port:
            messagebox.showerror("Error", "Select a valid COM port first.")
            return
        try:
            baud = int(self._baud_var.get().strip())
        except ValueError:
            messagebox.showerror("Error", "Invalid baud rate.")
            return

        # Open output files
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        out_dir = Path(self._dir_var.get()) / f"serial_{ts}"
        raw_path = out_dir / f"raw_s3_{ts}.bin"
        try:
            out_dir.mkdir(parents=True, exist_ok=True)
            raw_fh = open(raw_path, "wb")
        except OSError as exc:
            messagebox.showerror("Error", f"Cannot open capture file:\n{exc}")
            return

        # Open serial port
        ser = None
        try:
            ser = _serial.Serial(
                port=port, baudrate=baud,
                bytesize=8, parity=_serial.PARITY_NONE,
                stopbits=_serial.STOPBITS_ONE,
                timeout=0.05, write_timeout=0,
            )
            # Property form of the deprecated setDTR()/setRTS() calls.
            ser.dtr = True
            ser.rts = True
        except Exception as exc:
            if ser is not None:
                # The port opened but line setup failed: do not leak it.
                try:
                    ser.close()
                except Exception:
                    pass
            raw_fh.close()
            messagebox.showerror("Error", f"Cannot open {port}:\n{exc}")
            return

        self._raw_path = str(raw_path)
        self._raw_fh = raw_fh
        self._serial = ser
        self._stop_evt.clear()
        self._frame_count = 0
        self._ack_ok = bool(self._ack_ok_var.get())
        self._analyzer_btn.configure(state="disabled")
        self._reader_thread = threading.Thread(
            target=self._reader_loop,
            args=(ser, baud),
            daemon=True,
        )
        self._reader_thread.start()

        self._status_var.set(f"Watching {port} @ {baud}")
        self._start_btn.configure(state="disabled")
        self._stop_btn.configure(state="normal", bg=RED)
        self._append(f"── Serial watch started {port} @ {baud} [{ts}] ──\n", "dim")
        self._append(f" Capture: {self._raw_path}\n", "dim")
        self._append(" Waiting for data…\n\n", "dim")

    def _stop(self) -> None:
        """Stop the reader, close the port and capture file, reset the buttons."""
        self._stop_evt.set()

        # Let the reader finish its current read/write before the port and
        # the capture file are closed underneath it.  The read timeout is
        # 50 ms, so this normally returns almost immediately.
        reader = self._reader_thread
        if reader is not None and reader is not threading.current_thread():
            reader.join(timeout=1.0)
        self._reader_thread = None

        if self._serial:
            try:
                self._serial.close()
            except Exception:
                pass
            self._serial = None
        if self._raw_fh:
            self._raw_fh.close()
            self._raw_fh = None

        self._status_var.set("Stopped")
        self._start_btn.configure(state="normal")
        self._stop_btn.configure(state="disabled", bg=BG3)
        if self._raw_path and Path(self._raw_path).exists():
            self._analyzer_btn.configure(state="normal")
        self._append("\n── Watch stopped ──\n", "dim")

    # ── reader thread ─────────────────────────────────────────────────────
    @staticmethod
    def _pop_lines(buf: bytearray) -> list[bytes]:
        """Remove and return every complete line from the front of *buf*.

        A line ends at the first ``\\r`` or ``\\n``, whichever comes first;
        the terminator is dropped.  Bytes after the last terminator stay in
        *buf* for the next call.
        """
        lines: list[bytes] = []
        while True:
            cuts = [i for i in (buf.find(b"\r"), buf.find(b"\n")) if i != -1]
            if not cuts:
                return lines
            cut = min(cuts)
            lines.append(bytes(buf[:cut]))
            del buf[:cut + 1]

    def _log_frame(self, f) -> None:
        """Queue one log entry describing parsed frame *f*."""
        self._frame_count += 1
        name = _SW_KNOWN_SUBS.get(f.sub, f"UNK_0x{f.sub:02X}")
        chk = "" if f.checksum_valid else "✗ BAD_CHK"
        # Only the first 32 data bytes are shown; mark the cut when longer.
        peek = f.data[:32].hex() + ("…" if len(f.data) > 32 else "")
        msg = (
            f"[{self._frame_count:04d}] "
            f"SUB=0x{f.sub:02X} ({name:<22}) "
            f"page=0x{f.page_key:04X} "
            f"data={len(f.data):4d}B {chk}\n"
            f" {peek}\n"
        )
        self._log_q.put((msg, "frame"))

    def _handle_at_line(self, ser, line_bytes: bytes) -> None:
        """Log an AT command line and answer ``OK`` to anything but ``ATDT``."""
        line_str = line_bytes.decode("latin1", errors="ignore").strip()
        if not line_str.upper().startswith("AT"):
            return
        self._log_q.put((f"AT: {line_str!r}\n", "at"))
        if line_str.upper().startswith("ATDT"):
            return
        try:
            ser.write(b"\r\nOK\r\n")
            ser.flush()
        except Exception as exc:
            # Best effort: the capture goes on, but the failure is visible.
            self._log_q.put((f" OK reply failed: {exc}\n", "err"))
        else:
            self._log_q.put((" → OK\n", "at"))

    def _reader_loop(self, ser, baud: int) -> None:
        """Reader thread body: capture raw bytes, parse S3 frames, answer AT.

        Runs until ``_stop_evt`` is set or a read fails.  When the loop ends
        on its own, a ``<<done>>`` marker is queued so the Tk thread can
        finish the shutdown.  *baud* is accepted for the thread's argument
        list but not used here.
        """
        parser = _S3FrameParser()
        rx_buf = bytearray()
        ack_ok = self._ack_ok
        write_failed = False

        # Monitor control lines in a sub-thread
        ctrl_stop = threading.Event()
        ctrl_thread = threading.Thread(
            target=self._ctrl_loop, args=(ser, ctrl_stop), daemon=True)
        ctrl_thread.start()

        try:
            while not self._stop_evt.is_set():
                try:
                    data = ser.read(4096)
                except Exception as exc:
                    self._log_q.put((f"Read error: {exc}\n", "err"))
                    break
                if not data:
                    continue

                # Save raw bytes
                raw_fh = self._raw_fh
                if raw_fh is not None and not write_failed:
                    try:
                        raw_fh.write(data)
                        raw_fh.flush()
                    except (OSError, ValueError) as exc:
                        # Report once and stop writing; frames still show live.
                        write_failed = True
                        if not self._stop_evt.is_set():
                            self._log_q.put((f"Capture write error: {exc}\n", "err"))

                # Parse S3 frames
                for byte in data:
                    result = parser.feed(bytes([byte]))
                    if result:
                        frames = result if isinstance(result, list) else [result]
                        for f in frames:
                            self._log_frame(f)

                # AT command handling for --ack-ok mode
                if ack_ok:
                    rx_buf.extend(data)
                    for line_bytes in self._pop_lines(rx_buf):
                        self._handle_at_line(ser, line_bytes)
                    # Binary traffic may never contain a line ending; keep
                    # only the newest bytes so the buffer cannot grow forever.
                    if len(rx_buf) > self._AT_BUF_LIMIT:
                        del rx_buf[:-self._AT_BUF_LIMIT]
        finally:
            ctrl_stop.set()
            ctrl_thread.join(timeout=0.5)
            # Signal the main thread that the reader ended naturally
            if not self._stop_evt.is_set():
                self._log_q.put(("<<done>>", ""))

    def _ctrl_loop(self, ser, stop: threading.Event) -> None:
        """Poll CTS/DSR/DCD/RI every 0.2 s and log each change until *stop* is set.

        Returns silently when the lines cannot be read at all; RI is
        optional and recorded as ``None`` where unsupported.
        """
        try:
            prev = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd)
            try:
                prev["RI"] = ser.ri
            except Exception:
                prev["RI"] = None
        except Exception:
            return

        while not stop.is_set():
            try:
                cur = dict(CTS=ser.cts, DSR=ser.dsr, DCD=ser.cd, RI=None)
                try:
                    cur["RI"] = ser.ri
                except Exception:
                    pass
                for name, val in cur.items():
                    if val != prev.get(name):
                        self._log_q.put((f"CTRL {name}={val}\n", "ctrl"))
                        prev[name] = val
            except Exception:
                break
            stop.wait(0.2)

    # ── log view ──────────────────────────────────────────────────────────
    def _poll_log_queue(self) -> None:
        """Move queued log entries into the log view; reschedules itself every 80 ms."""
        try:
            while True:
                text, tag = self._log_q.get_nowait()
                if text == "<<done>>":
                    # The reader ended on its own: finish the shutdown here,
                    # on the Tk thread.
                    self._stop()
                    break
                self._append(text, tag)
        except queue.Empty:
            pass
        finally:
            self.after(80, self._poll_log_queue)

    def _append(self, text: str, tag: str = "") -> None:
        """Append *text* to the log view (optionally tagged) and scroll to the end."""
        self._log.configure(state="normal")
        if tag:
            self._log.insert(tk.END, text, tag)
        else:
            self._log.insert(tk.END, text)
        self._log.see(tk.END)
        self._log.configure(state="disabled")

    def _clear_log(self) -> None:
        """Empty the log view."""
        self._log.configure(state="normal")
        self._log.delete("1.0", tk.END)
        self._log.configure(state="disabled")

    # ── send to analyzer ──────────────────────────────────────────────────
    def _send_to_analyzer(self) -> None:
        """Hand the last capture path to the ``on_capture_ready`` callback."""
        if self._raw_path and self._on_capture_ready:
            self._on_capture_ready(self._raw_path)
# Console panel (tk.Frame — lives inside a notebook tab)
# ─────────────────────────────────────────────────────────────────────────────
class ConsolePanel(tk.Frame):
"""
Direct device connection and diagnostic console.
Lets you run individual protocol commands against a MiniMate Plus via
serial or TCP, showing colour-coded TX/RX bytes and decoded output in a
scrolling console.
Colour scheme:
TX frames — ACCENT blue (#569cd6)
RX raw hex — teal (#4ec9b0)
Parsed/decoded — green (#4caf50)
Errors — red (#f44747)
Status/info — dim grey (#6a6a6a)
Section heads — yellow (#dcdcaa)
"Save Log" writes the console text to a timestamped .log file; "Send to
Analyzer" writes the captured RX bytes as a raw .bin file and injects the
path into the Analyzer tab.
"""
TAG_TX = "tx"
TAG_RX_RAW = "rx_raw"
TAG_PARSED = "parsed"
TAG_ERROR = "error"
TAG_STATUS = "status"
TAG_HEAD = "head"
MAX_LINES = 5000
def __init__(self, parent: tk.Widget, on_send_to_analyzer=None, **kw):
    """Set up the panel state, build the widgets and start the queue poller.

    *on_send_to_analyzer* is stored and later called with the path of the
    file written by "Send to Analyzer".
    """
    super().__init__(parent, bg=BG2, **kw)
    self._on_send_to_analyzer = on_send_to_analyzer

    # State, all initialised before any widget exists.
    self._running = False
    self._last_raw_rx: Optional[bytes] = None
    self._log_lines: list[str] = []
    self._cmd_btns: list[tk.Button] = []
    self._q: queue.Queue = queue.Queue()

    self._build()
    self._poll_q()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
    """Create the connection row, command buttons, console and bottom bar.

    The bottom bar is packed before the console.  The packer hands out
    space in packing order, so packing the expanding console last means it
    is the widget that shrinks when the window is short, rather than the
    Clear / Save Log / Send to Analyzer buttons being clipped.
    """
    pad = {"padx": 5, "pady": 3}

    def make_label(parent, text, fg=FG):
        # Plain caption in the panel's standard colours.
        return tk.Label(parent, text=text, bg=BG2, fg=fg, font=MONO)

    def make_entry(parent, var, width):
        # Flat text entry bound to a Tk variable.
        return tk.Entry(parent, textvariable=var, width=width,
                        bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO)

    def make_radio(text, value):
        # One of the transport selector buttons.
        return tk.Radiobutton(
            cfg, text=text, variable=self._transport_var, value=value,
            bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
            font=MONO, command=self._on_transport_change,
        )

    def make_bar_button(text, command, fg=FG, **extra):
        # Flat button used in the bottom bar.
        return tk.Button(bot, text=text, bg=BG3, fg=fg, relief="flat",
                         padx=10, cursor="hand2", font=MONO,
                         command=command, **extra)

    # ── top config row ────────────────────────────────────────────────
    cfg = tk.Frame(self, bg=BG2)
    cfg.pack(side=tk.TOP, fill=tk.X, padx=6, pady=4)

    # Transport radio buttons
    self._transport_var = tk.StringVar(value="tcp")
    make_radio("TCP", "tcp").grid(row=0, column=0, padx=(0, 4))
    make_radio("Serial", "serial").grid(row=0, column=1, padx=(0, 12))

    # TCP fields
    self._tcp_frame = tk.Frame(cfg, bg=BG2)
    self._tcp_frame.grid(row=0, column=2, sticky="w")
    make_label(self._tcp_frame, "Host:").pack(side=tk.LEFT, **pad)
    self._host_var = tk.StringVar(value="127.0.0.1")
    make_entry(self._tcp_frame, self._host_var, 18).pack(side=tk.LEFT, padx=2)
    make_label(self._tcp_frame, "Port:").pack(side=tk.LEFT, padx=(10, 4))
    self._tcp_port_var = tk.StringVar(value="9034")
    make_entry(self._tcp_frame, self._tcp_port_var, 6).pack(side=tk.LEFT, padx=2)

    # Serial fields (hidden by default; gridded by _on_transport_change)
    self._serial_frame = tk.Frame(cfg, bg=BG2)
    make_label(self._serial_frame, "Port:").pack(side=tk.LEFT, **pad)
    self._port_var = tk.StringVar(value="COM5")
    make_entry(self._serial_frame, self._port_var, 10).pack(side=tk.LEFT, padx=2)
    make_label(self._serial_frame, "Baud:").pack(side=tk.LEFT, padx=(10, 4))
    self._baud_var = tk.StringVar(value="38400")
    make_entry(self._serial_frame, self._baud_var, 8).pack(side=tk.LEFT, padx=2)

    # Timeout
    make_label(cfg, "Timeout:").grid(row=0, column=3, padx=(18, 4))
    self._timeout_var = tk.StringVar(value="30")
    make_entry(cfg, self._timeout_var, 5).grid(row=0, column=4, padx=2)
    make_label(cfg, "s", fg=FG_DIM).grid(row=0, column=5)

    # ── command buttons row ───────────────────────────────────────────
    cmd_row = tk.Frame(self, bg=BG2)
    cmd_row.pack(side=tk.TOP, fill=tk.X, padx=6, pady=(0, 4))
    make_label(cmd_row, "Commands:", fg=FG_DIM).pack(side=tk.LEFT, padx=(0, 10))
    for label, cmd in [
        ("POLL", "poll"),
        ("Serial #", "serial_number"),
        ("Full Config", "full_config"),
        ("Event Index", "event_index"),
    ]:
        btn = tk.Button(
            cmd_row, text=label, bg=ACCENT, fg="#ffffff",
            relief="flat", padx=10, cursor="hand2", font=MONO,
            # Bind cmd now; a plain closure would see only the last value.
            command=lambda c=cmd: self._run_command(c),
        )
        btn.pack(side=tk.LEFT, padx=4)
        self._cmd_btns.append(btn)
    self._status_var = tk.StringVar(value="Ready")
    tk.Label(cmd_row, textvariable=self._status_var,
             bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=14)

    # ── bottom bar (packed before the expanding console, see docstring) ─
    bot = tk.Frame(self, bg=BG2)
    bot.pack(side=tk.BOTTOM, fill=tk.X, padx=6, pady=4)
    make_bar_button("Clear", self._clear_console).pack(side=tk.LEFT, padx=4)
    make_bar_button("Save Log", self._save_log).pack(side=tk.LEFT, padx=4)
    self._send_btn = make_bar_button(
        "Send to Analyzer", self._send_to_analyzer, fg=FG_DIM, state="disabled")
    self._send_btn.pack(side=tk.LEFT, padx=4)

    # ── console output ────────────────────────────────────────────────
    self._console = scrolledtext.ScrolledText(
        self, height=20, font=MONO_SM,
        bg=BG, fg=FG, insertbackground=FG,
        relief="flat", state="disabled",
    )
    self._console.pack(fill=tk.BOTH, expand=True, padx=6, pady=4)
    self._console.tag_configure(self.TAG_TX, foreground=ACCENT)
    self._console.tag_configure(self.TAG_RX_RAW, foreground=COL_S3)
    self._console.tag_configure(self.TAG_PARSED, foreground=GREEN)
    self._console.tag_configure(self.TAG_ERROR, foreground=RED)
    self._console.tag_configure(self.TAG_STATUS, foreground=FG_DIM)
    self._console.tag_configure(self.TAG_HEAD, foreground=YELLOW, font=MONO_B)
# ── transport toggle ──────────────────────────────────────────────────
def _on_transport_change(self) -> None:
    """Show the connection fields for the selected transport; hide the others.

    When the transport variable reads ``"tcp"`` the TCP frame is gridded and
    the serial frame is removed from the grid; any other value does the
    reverse.  Both frames share the same grid cell (row 0, column 2).
    """
    use_tcp = self._transport_var.get() == "tcp"
    if use_tcp:
        shown, hidden = self._tcp_frame, self._serial_frame
    else:
        shown, hidden = self._serial_frame, self._tcp_frame
    # Remove first, then grid, so only one frame occupies the cell.
    hidden.grid_remove()
    shown.grid(row=0, column=2, sticky="w")
# ── console helpers ───────────────────────────────────────────────────
def _append(self, text: str, tag: str = "status") -> None:
    """Append coloured text (main thread only — called via _poll_q).

    The text is also recorded in ``self._log_lines``.  The widget is made
    editable only for the duration of the update, and the oldest lines are
    dropped once the line count exceeds ``self.MAX_LINES``.
    """
    self._log_lines.append(text)

    console = self._console
    console.configure(state="normal")
    console.insert(tk.END, text, tag)

    # "end-1c" is the position of the last character; its line part is the
    # current number of lines in the widget.
    last_line, _, _ = console.index("end-1c").partition(".")
    overflow = int(last_line) - self.MAX_LINES
    if overflow > 0:
        console.delete("1.0", f"{overflow}.0")

    console.see(tk.END)
    console.configure(state="disabled")
def _clear_console(self) -> None:
    """Erase all text from the console widget and empty the log buffer."""
    console = self._console
    # The widget is kept read-only; unlock it just long enough to wipe it.
    console.configure(state="normal")
    console.delete("1.0", tk.END)
    console.configure(state="disabled")
    self._log_lines.clear()
def _save_log(self) -> None:
    """Write everything appended to the console to a timestamped log file.

    The file is ``console_<YYYYmmdd_HHMMSS>.log`` (UTF-8) under
    ``SCRIPT_DIR/bridges/captures``.  On success a status message naming the
    file is queued; any failure — including failure to create the captures
    directory — is shown in an error dialog instead of escaping the Tk
    callback.
    """
    cap_dir = SCRIPT_DIR / "bridges" / "captures"
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    path = cap_dir / f"console_{ts}.log"
    try:
        # Directory creation can fail too (permissions, read-only media),
        # so it belongs inside the guarded region.
        cap_dir.mkdir(parents=True, exist_ok=True)
        path.write_text("".join(self._log_lines), encoding="utf-8")
    except Exception as exc:
        messagebox.showerror("Save Error", str(exc))
    else:
        self._q.put(("status", f"Log saved → {path.name}"))
def _send_to_analyzer(self) -> None:
    """Dump the last captured RX bytes to a file and hand it to the Analyzer.

    Does nothing when there are no captured bytes or no callback was
    supplied.  Otherwise the bytes are written to
    ``SCRIPT_DIR/bridges/captures/console_s3_<YYYYmmdd_HHMMSS>.bin`` and the
    path (as a string) is passed to ``self._on_send_to_analyzer``.  Any
    failure — including failure to create the captures directory — is shown
    in an error dialog instead of escaping the Tk callback.
    """
    if not self._last_raw_rx or not self._on_send_to_analyzer:
        return

    cap_dir = SCRIPT_DIR / "bridges" / "captures"
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    raw_path = cap_dir / f"console_s3_{ts}.bin"
    try:
        # Directory creation can fail too (permissions, read-only media),
        # so it belongs inside the guarded region.
        cap_dir.mkdir(parents=True, exist_ok=True)
        raw_path.write_bytes(self._last_raw_rx)
        self._on_send_to_analyzer(str(raw_path))
    except Exception as exc:
        messagebox.showerror("Error", str(exc))
    else:
        self._q.put(("status", f"Sent to Analyzer → {raw_path.name}"))
# ── command dispatch ──────────────────────────────────────────────────
def _set_buttons_state(self, state: str) -> None:
    """Apply *state* to every command button registered in ``_cmd_btns``."""
    for button in self._cmd_btns:
        button.configure(state=state)
def _run_command(self, cmd: str) -> None:
    """Start *cmd* on a background worker thread.

    Ignored while another command is still running.  The connection settings
    are read from the Tk variables here, on the main thread, and handed to
    the worker as a plain dict.  Blank TCP port, baud and timeout fields fall
    back to 9034, 38400 and 30 respectively.

    If a numeric field does not parse, an error line is queued for the
    console and nothing is started — the buttons stay enabled so the user
    can correct the field and retry.

    Args:
        cmd: Command identifier passed through to the worker as ``cfg["cmd"]``.
    """
    if self._running:
        return

    # Snapshot config in main thread before handing off to worker
    try:
        config = {
            "transport": self._transport_var.get(),
            "host": self._host_var.get().strip(),
            "tcp_port": int(self._tcp_port_var.get().strip() or "9034"),
            "port": self._port_var.get().strip(),
            "baud": int(self._baud_var.get().strip() or "38400"),
            "timeout": float(self._timeout_var.get().strip() or "30"),
            "cmd": cmd,
        }
    except ValueError as exc:
        # int()/float() rejected user-typed text; report it in the console
        # rather than letting the exception escape the button callback.
        self._q.put(("error", f"\nInvalid connection setting: {exc}\n"))
        return

    self._running = True
    self._set_buttons_state("disabled")
    self._status_var.set("Running…")
    threading.Thread(target=self._worker, args=(config,), daemon=True).start()
# ── worker thread ─────────────────────────────────────────────────────
def _worker(self, cfg: dict) -> None:
    """Background thread — open transport, run command, post results to queue.

    Nothing here touches widgets; every result is posted to ``self._q`` as a
    ``(kind, payload)`` tuple.  Exactly one ``("done", None)`` message is
    posted, always last — also when the transport cannot be constructed or
    opened — because the queue consumer uses it to re-enable the command
    buttons.

    Args:
        cfg: Settings snapshot with the keys ``transport``, ``host``,
            ``tcp_port``, ``port``, ``baud``, ``timeout`` and ``cmd``.
    """
    q = self._q

    def post(kind: str, text: str) -> None:
        q.put((kind, text))

    def post_hex_dump(data: bytes) -> None:
        """Post *data* as rows of 16 bytes: offset, hex column, ASCII column."""
        post("parsed", f" raw ({len(data)} B):\n")
        for offset in range(0, len(data), 16):
            chunk = data[offset:offset + 16]
            hex_part = " ".join(f"{b:02X}" for b in chunk)
            asc_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
            post("parsed", f" {offset:04X}: {hex_part:<48} {asc_part}\n")

    try:
        from minimateplus.transport import SerialTransport, TcpTransport
        from minimateplus.protocol import (
            MiniMateProtocol,
            SUB_SERIAL_NUMBER,
            SUB_FULL_CONFIG,
            SUB_EVENT_INDEX,
        )
    except ImportError as exc:
        post("error", f"Import error: {exc}\nIs minimateplus installed?\n")
        q.put(("done", None))
        return

    # Commands that are a startup followed by one read: cmd -> (heading, SUB).
    sub_reads = {
        "serial_number": (
            "\n── Serial Number (SUB 0x15) ─────────────────\n",
            SUB_SERIAL_NUMBER,
        ),
        "full_config": (
            "\n── Full Config (SUB 0x01) ───────────────────\n",
            SUB_FULL_CONFIG,
        ),
        "event_index": (
            "\n── Event Index (SUB 0x08) ───────────────────\n",
            SUB_EVENT_INDEX,
        ),
    }

    raw_rx = bytearray()
    try:
        timeout = cfg["timeout"]
        cmd = cfg["cmd"]

        # Build transport.  This sits inside the try so that a constructor
        # that raises still reaches the error report and the final "done".
        if cfg["transport"] == "tcp":
            host = cfg["host"]
            tcp_port = cfg["tcp_port"]
            post("status", f"Connecting {host}:{tcp_port}")
            transport = TcpTransport(host, tcp_port, connect_timeout=timeout)
        else:
            port = cfg["port"]
            baud = cfg["baud"]
            post("status", f"Opening {port} @ {baud} baud…")
            transport = SerialTransport(port, baud)

        # Wrap transport to capture every TX/RX byte
        orig_write = transport.write
        orig_read = transport.read

        def logged_write(data: bytes) -> None:
            post("tx", f"TX [{len(data):3d}B]: {data.hex()}\n")
            orig_write(data)

        def logged_read(n: int) -> bytes:
            result = orig_read(n)
            if result:
                raw_rx.extend(result)
                post("rx_raw", f"RX [{len(result):3d}B]: {result.hex()}\n")
            return result

        transport.write = logged_write  # type: ignore[method-assign]
        transport.read = logged_read  # type: ignore[method-assign]

        with transport:
            post("status", "Connected.")
            proto = MiniMateProtocol(transport, recv_timeout=timeout)

            # Every known command begins with the same startup exchange;
            # an unrecognised command only opens and closes the transport.
            if cmd == "poll" or cmd in sub_reads:
                post("head", "\n── POLL startup ─────────────────────────────\n")
                frame = proto.startup()

                if cmd == "poll":
                    post("parsed", f" payload ({len(frame.data)} B): {frame.data.hex()}\n")
                    try:
                        text = frame.data.decode("ascii", errors="replace")
                        post("parsed", f" text: {text!r}\n")
                    except Exception:
                        # Deliberately best-effort: the text rendering is a
                        # convenience and must not fail the command.
                        pass
                else:
                    heading, sub = sub_reads[cmd]
                    post("head", heading)
                    data = proto.read(sub)
                    if cmd == "serial_number":
                        post("parsed", f" raw ({len(data)} B): {data.hex()}\n")
                        sn = data.rstrip(b"\x00").decode("ascii", errors="replace").strip()
                        post("parsed", f" serial: {sn!r}\n")
                    else:
                        post_hex_dump(data)

        post("status", "Done.")
        q.put(("save_raw", bytes(raw_rx)))
    except Exception as exc:
        post("error", f"\nError: {exc}\n")
    finally:
        q.put(("done", None))
# ── queue poll ────────────────────────────────────────────────────────
def _poll_q(self) -> None:
    """Drain the worker queue, apply each message to the UI, and reschedule.

    Handles every message currently queued without blocking, then arranges
    to run again in 100 ms regardless of how the drain ended.
    """
    try:
        # Message kinds whose payload is simply appended with a colour tag.
        tag_for = {
            "tx": self.TAG_TX,
            "rx_raw": self.TAG_RX_RAW,
            "parsed": self.TAG_PARSED,
            "error": self.TAG_ERROR,
            "head": self.TAG_HEAD,
        }
        while True:
            kind, payload = self._q.get_nowait()
            if kind in tag_for:
                self._append(payload, tag_for[kind])
            elif kind == "status":
                # Status goes to both the status label and the console.
                self._status_var.set(str(payload))
                self._append(f" [{payload}]\n", self.TAG_STATUS)
            elif kind == "save_raw":
                self._last_raw_rx = payload
                if payload:
                    self._send_btn.configure(state="normal", fg=FG)
            elif kind == "done":
                self._running = False
                self._set_buttons_state("normal")
                self._status_var.set("Ready")
    except queue.Empty:
        # Queue drained — nothing more to do this tick.
        pass
    finally:
        self.after(100, self._poll_q)
# ─────────────────────────────────────────────────────────────────────────────
# Main application window
# ─────────────────────────────────────────────────────────────────────────────
class SeismoLab(tk.Tk):
    """Main application window.

    Hosts the Bridge, Analyzer, Console and Serial Watch panels as tabs of a
    single notebook and wires them together through callbacks: bridge start /
    stop drives the Analyzer's live mode, and captures produced by the
    Console or Serial Watch panels are loaded into the Analyzer's S3 field.
    """

    def __init__(self) -> None:
        super().__init__()
        self.title("Seismo Lab")
        self.configure(bg=BG)
        self.minsize(1100, 680)

        self._db = FrameDB()

        style = ttk.Style()
        style.theme_use("clam")
        style.configure("Top.TNotebook", background=BG3, borderwidth=0, tabposition="nw")
        style.configure("Top.TNotebook.Tab", background=BG3, foreground=FG,
                        font=("Consolas", 10, "bold"), padding=[16, 6])
        style.map("Top.TNotebook.Tab",
                  background=[("selected", ACCENT)],
                  foreground=[("selected", "#ffffff")])

        nb = ttk.Notebook(self, style="Top.TNotebook")
        nb.pack(fill=tk.BOTH, expand=True)

        self._bridge_panel = BridgePanel(
            nb,
            on_bridge_started=self._on_bridge_started,
            on_bridge_stopped=self._on_bridge_stopped,
        )
        nb.add(self._bridge_panel, text=" Bridge ")

        self._analyzer_panel = AnalyzerPanel(nb, db=self._db)
        nb.add(self._analyzer_panel, text=" Analyzer ")

        self._console_panel = ConsolePanel(
            nb,
            on_send_to_analyzer=self._on_console_send_to_analyzer,
        )
        nb.add(self._console_panel, text=" Console ")

        self._serial_watch_panel = SerialWatchPanel(
            nb,
            on_capture_ready=self._on_serial_capture_ready,
        )
        nb.add(self._serial_watch_panel, text=" Serial Watch ")

        self._nb = nb
        self.protocol("WM_DELETE_WINDOW", self._on_close)

    def _show_analyzer(self) -> None:
        """Bring the Analyzer tab to the front.

        The tab is selected by its widget rather than by position so that
        adding or reordering tabs cannot silently select the wrong one.
        """
        self._nb.select(self._analyzer_panel)

    def _load_s3_into_analyzer(self, raw_s3_path: str) -> None:
        """Put *raw_s3_path* into the Analyzer's S3 field and show the tab."""
        self._analyzer_panel.s3_var.set(raw_s3_path)
        self._show_analyzer()

    def _on_bridge_started(self, raw_bw: Optional[str], raw_s3: Optional[str],
                           struct_bin: Optional[str] = None) -> None:
        """Bridge started — inject paths into analyzer and start live mode."""
        self._analyzer_panel.set_live_files(raw_bw, raw_s3, struct_bin)
        # Switch to Analyzer tab so the user can watch it update
        self._show_analyzer()

    def _on_bridge_stopped(self) -> None:
        """Bridge stopped — end the Analyzer's live mode."""
        self._analyzer_panel.stop_live()

    def _on_console_send_to_analyzer(self, raw_s3_path: str) -> None:
        """Console captured bytes → inject into Analyzer S3 field and switch tab."""
        self._load_s3_into_analyzer(raw_s3_path)

    def _on_serial_capture_ready(self, raw_s3_path: str) -> None:
        """Serial Watch capture finished → inject into Analyzer and switch tab."""
        self._load_s3_into_analyzer(raw_s3_path)

    def _on_close(self) -> None:
        """Stop the bridge and serial watch, then destroy the window.

        The window is destroyed even if stopping either panel raises, so a
        failed shutdown step cannot leave the application impossible to
        close; the exception still propagates afterwards.
        """
        try:
            self._bridge_panel.stop_bridge()
        finally:
            try:
                self._serial_watch_panel._stop()
            finally:
                self.destroy()
# ─────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """Create the Seismo Lab window and run its event loop until it closes."""
    SeismoLab().mainloop()


if __name__ == "__main__":
    main()