Files
seismo-relay/seismo_lab.py
Brian Harrison 1078576023 Add Console tab to seismo_lab + document RV50/RV55 modem config
seismo_lab.py:
- Add ConsolePanel — third tab for direct device connections over serial
  or TCP without the bridge subprocess
- Commands: POLL, Serial #, Full Config, Event Index (open/close per cmd)
- Colour-coded output: TX blue, RX raw teal, parsed green, errors red
- Save Log and Send to Analyzer buttons; auto-saves to bridges/captures/
- Queue/after(100) pattern — no performance impact
- Add SCRIPT_DIR to sys.path so minimateplus imports work from GUI

docs/instantel_protocol_reference.md:
- Confirm calibration year field at SUB FE payload offset 0x56–0x57
  (uint16 BE): 0x07E7=2023 (BE18189), 0x07E9=2025 (BE11529)
- Document full Sierra Wireless RV50/RV55 required ACEmanager settings
  (Quiet Mode, Data Forwarding Timeout, TCP Connect Response Delay, etc.)
- Correct §14.2: RV50/RV55 sends RING/CONNECT over TCP to caller even
  with Quiet Mode on; parser handles by scanning for DLE+STX
- Confirm "Operating System" boot string capture via cold-start Console
- Resolve open question: 0x07E7 field = calibration year

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 17:19:31 -04:00

1539 lines
72 KiB
Python

#!/usr/bin/env python3
"""
seismo_lab.py — Combined S3 Bridge + Protocol Analyzer + Device Console GUI.
Single window with three top-level tabs:
Bridge — capture live serial traffic (wraps s3_bridge.py as subprocess)
Analyzer — parse, diff, and query captured frames
Console — direct device connection; runs commands and shows raw bytes +
decoded output; colour-coded TX/RX console with log save and
Send-to-Analyzer support
When the bridge starts:
- raw tap paths are auto-filled in the Analyzer tab
- Live mode is automatically enabled so the Analyzer updates in real time
Run from anywhere:
python seismo_lab.py
"""
from __future__ import annotations
import datetime
import os
import queue
import subprocess
import sys
import threading
import tkinter as tk
from pathlib import Path
from tkinter import filedialog, messagebox, scrolledtext, simpledialog, ttk
from typing import Optional
# ── path setup ────────────────────────────────────────────────────────────────
# Directory that contains this script; the project paths below are derived
# from it rather than from the current working directory.
SCRIPT_DIR = Path(__file__).parent
# Bridge script that BridgePanel launches as a subprocess.
BRIDGE_PATH = SCRIPT_DIR / "bridges" / "s3-bridge" / "s3_bridge.py"
PARSERS_DIR = SCRIPT_DIR / "parsers"
# Prepend both directories to the import search path before the project
# imports that follow (presumably s3_analyzer and frame_db live in parsers/).
sys.path.insert(0, str(PARSERS_DIR))
sys.path.insert(0, str(SCRIPT_DIR)) # for minimateplus package
from s3_analyzer import ( # noqa: E402
AnnotatedFrame,
FrameDiff,
Session,
annotate_frames,
diff_sessions,
format_hex_dump,
parse_bw,
parse_s3,
parse_structured_bin,
render_session_report,
split_into_sessions,
split_sessions_at_marks,
write_claude_export,
)
from frame_db import FrameDB # noqa: E402
# ── colour palette ────────────────────────────────────────────────────────────
# Surface colours: BG for text/log areas, BG2 for panel and toolbar frames,
# BG3 for entry fields, neutral buttons and the status bar.
BG = "#1e1e1e"
BG2 = "#252526"
BG3 = "#2d2d30"
# Text colours: FG for regular text, FG_DIM for hints and status text.
FG = "#d4d4d4"
FG_DIM = "#6a6a6a"
# Accent colours used for buttons, headings and warning/error highlights.
ACCENT = "#569cd6"
RED = "#f44747"
YELLOW = "#dcdcaa"
GREEN = "#4caf50"
ORANGE = "#ce9178"
# Semantic colours behind the text/tree tags: BW-side frames, S3-side frames,
# changed bytes, known field names and section headings.
COL_BW = "#9cdcfe"
COL_S3 = "#4ec9b0"
COL_DIFF = "#f44747"
COL_KNOW = "#4caf50"
COL_HEAD = "#569cd6"
# Fonts: regular, small and bold monospace variants.
MONO = ("Consolas", 9)
MONO_SM = ("Consolas", 8)
MONO_B = ("Consolas", 9, "bold")
# ─────────────────────────────────────────────────────────────────────────────
# Shared state
# ─────────────────────────────────────────────────────────────────────────────
class AnalyzerState:
    """Mutable result store shared by the Analyzer views.

    A fresh instance is empty: no sessions, no diffs, no file paths and no
    database capture id.
    """

    def __init__(self) -> None:
        # Files behind the current analysis (filled in when Analyze runs).
        self.s3_path: Optional[Path] = None
        self.bw_path: Optional[Path] = None
        # Id returned by the frame database for the most recent ingest.
        self.last_capture_id: Optional[int] = None
        # Parsed sessions, and one diff entry per session.  The analysis code
        # stores None for the first session because it has no predecessor.
        self.sessions: list[Session] = list()
        self.diffs: list[Optional[list[FrameDiff]]] = list()
# ─────────────────────────────────────────────────────────────────────────────
# Bridge panel (tk.Frame — lives inside a notebook tab)
# ─────────────────────────────────────────────────────────────────────────────
class BridgePanel(tk.Frame):
"""
All bridge controls and live log output.
Calls on_bridge_started(raw_bw_path, raw_s3_path) when the bridge starts
so the parent can wire up the Analyzer.
"""
def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped, **kw):
super().__init__(parent, bg=BG2, **kw)
self._on_started = on_bridge_started # signature: (raw_bw, raw_s3, struct_bin)
self._on_stopped = on_bridge_stopped
self.process: Optional[subprocess.Popen] = None
self._stdout_q: queue.Queue[str] = queue.Queue()
self._build()
self._poll_stdout()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
pad = {"padx": 6, "pady": 4}
cfg = tk.Frame(self, bg=BG2)
cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4)
# Row 0: ports
tk.Label(cfg, text="BW COM:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=0, sticky="e", **pad)
self.bw_var = tk.StringVar(value="COM4")
tk.Entry(cfg, textvariable=self.bw_var, width=10,
bg=BG3, fg=FG, insertbackground=FG, relief="flat",
font=MONO).grid(row=0, column=1, sticky="w", **pad)
tk.Label(cfg, text="S3 COM:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=2, sticky="e", **pad)
self.s3_var = tk.StringVar(value="COM5")
tk.Entry(cfg, textvariable=self.s3_var, width=10,
bg=BG3, fg=FG, insertbackground=FG, relief="flat",
font=MONO).grid(row=0, column=3, sticky="w", **pad)
tk.Label(cfg, text="Baud:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=4, sticky="e", **pad)
self.baud_var = tk.StringVar(value="38400")
tk.Entry(cfg, textvariable=self.baud_var, width=8,
bg=BG3, fg=FG, insertbackground=FG, relief="flat",
font=MONO).grid(row=0, column=5, sticky="w", **pad)
# Row 1: log dir
tk.Label(cfg, text="Log dir:", bg=BG2, fg=FG, font=MONO).grid(row=1, column=0, sticky="e", **pad)
self.logdir_var = tk.StringVar(value=str(SCRIPT_DIR / "bridges" / "captures"))
tk.Entry(cfg, textvariable=self.logdir_var, width=40,
bg=BG3, fg=FG, insertbackground=FG, relief="flat",
font=MONO).grid(row=1, column=1, columnspan=4, sticky="we", **pad)
tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2",
font=MONO, command=self._choose_dir).grid(row=1, column=5, **pad)
# Row 2: raw taps (always enabled — timestamped names generated at start)
self._raw_bw_on = tk.BooleanVar(value=True)
self._raw_s3_on = tk.BooleanVar(value=True)
tk.Checkbutton(cfg, text="Capture BW->S3 raw", variable=self._raw_bw_on,
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
font=MONO).grid(row=2, column=0, columnspan=2, sticky="w", **pad)
tk.Checkbutton(cfg, text="Capture S3->BW raw", variable=self._raw_s3_on,
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
font=MONO).grid(row=2, column=2, columnspan=2, sticky="w", **pad)
# Row 3: buttons + status
btn_row = tk.Frame(self, bg=BG2)
btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2)
self.start_btn = tk.Button(btn_row, text="Start Bridge", bg=GREEN, fg="#000000",
relief="flat", padx=12, cursor="hand2", font=MONO_B,
command=self.start_bridge)
self.start_btn.pack(side=tk.LEFT, padx=6)
self.stop_btn = tk.Button(btn_row, text="Stop Bridge", bg=BG3, fg=FG,
relief="flat", padx=12, cursor="hand2", font=MONO,
command=self.stop_bridge, state="disabled")
self.stop_btn.pack(side=tk.LEFT, padx=4)
self.mark_btn = tk.Button(btn_row, text="Add Mark", bg=BG3, fg=FG,
relief="flat", padx=10, cursor="hand2", font=MONO,
command=self.add_mark, state="disabled")
self.mark_btn.pack(side=tk.LEFT, padx=4)
self.status_var = tk.StringVar(value="Idle")
tk.Label(btn_row, textvariable=self.status_var,
bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10)
# Log output
self.log_view = scrolledtext.ScrolledText(
self, height=18, font=MONO_SM,
bg=BG, fg=FG, insertbackground=FG,
relief="flat", state="disabled",
)
self.log_view.pack(fill=tk.BOTH, expand=True, padx=4, pady=4)
# ── helpers ───────────────────────────────────────────────────────────
def _choose_dir(self) -> None:
path = filedialog.askdirectory(initialdir=self.logdir_var.get())
if path:
self.logdir_var.set(path)
def _append_log(self, text: str) -> None:
self.log_view.configure(state="normal")
self.log_view.insert(tk.END, text)
self.log_view.see(tk.END)
self.log_view.configure(state="disabled")
# ── bridge control ────────────────────────────────────────────────────
def start_bridge(self) -> None:
if self.process and self.process.poll() is None:
messagebox.showinfo("Bridge", "Bridge is already running.")
return
bw = self.bw_var.get().strip()
s3 = self.s3_var.get().strip()
baud = self.baud_var.get().strip()
logdir = self.logdir_var.get().strip() or "."
if not bw or not s3:
messagebox.showerror("Error", "Please enter both BW and S3 COM ports.")
return
os.makedirs(logdir, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
args = [sys.executable, str(BRIDGE_PATH),
"--bw", bw, "--s3", s3, "--baud", baud, "--logdir", logdir]
raw_bw_path = raw_s3_path = None
if self._raw_bw_on.get():
raw_bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin")
args += ["--raw-bw", raw_bw_path]
if self._raw_s3_on.get():
raw_s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin")
args += ["--raw-s3", raw_s3_path]
# Structured bin path — written by bridge automatically, named by ts
struct_bin_path = os.path.join(logdir, f"s3_session_{ts}.bin")
try:
self.process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
text=True,
bufsize=1,
)
except Exception as e:
messagebox.showerror("Error", f"Failed to start bridge:\n{e}")
return
threading.Thread(target=self._reader_thread, daemon=True).start()
self.status_var.set(f"Running — {bw} <-> {s3}")
self.start_btn.configure(state="disabled")
self.stop_btn.configure(state="normal", bg=RED)
self.mark_btn.configure(state="normal")
self._append_log(f"== Bridge started [{ts}] ==\n")
# Notify parent so Analyzer can wire up live mode
self._on_started(raw_bw_path, raw_s3_path, struct_bin_path)
def stop_bridge(self) -> None:
if self.process and self.process.poll() is None:
self.process.terminate()
try:
self.process.wait(timeout=3)
except subprocess.TimeoutExpired:
self.process.kill()
self._bridge_ended()
self._on_stopped()
def _bridge_ended(self) -> None:
self.status_var.set("Stopped")
self.start_btn.configure(state="normal")
self.stop_btn.configure(state="disabled", bg=BG3)
self.mark_btn.configure(state="disabled")
self._append_log("== Bridge stopped ==\n")
def _reader_thread(self) -> None:
if not self.process or not self.process.stdout:
return
for line in self.process.stdout:
self._stdout_q.put(line)
self._stdout_q.put("<<exit>>")
def _poll_stdout(self) -> None:
try:
while True:
line = self._stdout_q.get_nowait()
if line == "<<exit>>":
self._bridge_ended()
self._on_stopped()
break
self._append_log(line)
except queue.Empty:
pass
finally:
self.after(100, self._poll_stdout)
def add_mark(self) -> None:
if not self.process or not self.process.stdin or self.process.poll() is not None:
return
label = simpledialog.askstring("Mark", "Enter label for this mark:", parent=self)
if not label or not label.strip():
return
try:
self.process.stdin.write("m\n")
self.process.stdin.write(label.strip() + "\n")
self.process.stdin.flush()
self._append_log(f"[MARK] {label.strip()}\n")
except Exception as e:
messagebox.showerror("Error", f"Failed to send mark:\n{e}")
# ─────────────────────────────────────────────────────────────────────────────
# Analyzer panel (tk.Frame — lives inside a notebook tab)
# Extracted from gui_analyzer.py; accepts external path injection.
# ─────────────────────────────────────────────────────────────────────────────
class AnalyzerPanel(tk.Frame):
def __init__(self, parent: tk.Widget, db: FrameDB, **kw):
    """Create the Analyzer panel inside *parent*, backed by frame database *db*.

    Extra keyword arguments are passed on to tk.Frame.
    """
    super().__init__(parent, bg=BG, **kw)
    self.state = AnalyzerState()
    self._db = db
    # Live-mode plumbing: message queue, stop flag and worker-thread handle.
    self._live_q: queue.Queue[str] = queue.Queue()
    self._live_stop = threading.Event()
    self._live_thread: Optional[threading.Thread] = None
    # Widgets first, then start the periodic live-queue poll.
    self._build()
    self._poll_live_queue()
# ── external API (called by parent when bridge starts/stops) ──────────
def set_live_files(self, raw_bw: Optional[str], raw_s3: Optional[str],
                   struct_bin: Optional[str] = None) -> None:
    """Fill in the capture paths after a bridge start and switch live mode on.

    Empty or None paths leave the matching entry untouched.  Live mode is
    started only when both raw tap paths were supplied.
    """
    targets = (
        (self.s3_var, raw_s3),
        (self.bw_var, raw_bw),
        (self.bin_var, struct_bin),
    )
    for var, path in targets:
        if path:
            var.set(path)
    if raw_s3 and raw_bw:
        self._start_live()
def stop_live(self) -> None:
    """Bridge-stopped hook: ask a running live worker to stop, reset the button."""
    worker = self._live_thread
    if worker and worker.is_alive():
        # The worker is only signalled here, not joined.
        self._live_stop.set()
    self._set_live_btn_off()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
self._build_toolbar()
self._build_panes()
self._build_statusbar()
def _build_toolbar(self) -> None:
    """Build the three toolbar rows: raw files, session .bin, action buttons."""
    pad = {"padx": 5, "pady": 2}
    toolbar = tk.Frame(self, bg=BG2, pady=4)
    toolbar.pack(side=tk.TOP, fill=tk.X)

    def new_row() -> tk.Frame:
        row = tk.Frame(toolbar, bg=BG2)
        row.pack(side=tk.TOP, fill=tk.X)
        return row

    def file_picker(row: tk.Frame, caption: str, width: int,
                    default_name: str) -> tk.StringVar:
        # caption + path entry + Browse button, all packed left-to-right
        tk.Label(row, text=caption, bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad)
        var = tk.StringVar()
        tk.Entry(row, textvariable=var, width=width, bg=BG3, fg=FG,
                 insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad)
        tk.Button(row, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2",
                  font=MONO, command=lambda: self._browse(var, default_name)
                  ).pack(side=tk.LEFT, **pad)
        return var

    # Row 1: raw files
    raw_row = new_row()
    self.s3_var = file_picker(raw_row, "S3 raw:", 30, "raw_s3.bin")
    self.bw_var = file_picker(raw_row, " BW raw:", 30, "raw_bw.bin")

    # Row 2: structured bin (optional — enables mark-based session splitting)
    bin_row = new_row()
    self.bin_var = file_picker(bin_row, "Session .bin:", 46, "s3_session.bin")
    tk.Label(bin_row, text="(optional — splits sessions at marks)", bg=BG2, fg=FG_DIM,
             font=MONO_SM).pack(side=tk.LEFT, padx=6)

    # Row 3: buttons, preceded by a fixed-width spacer
    actions = new_row()
    tk.Frame(actions, bg=BG2, width=10).pack(side=tk.LEFT)
    self.analyze_btn = tk.Button(actions, text="Analyze", bg=ACCENT, fg="#ffffff",
                                 relief="flat", padx=10, cursor="hand2", font=MONO_B,
                                 command=self._run_analyze)
    self.analyze_btn.pack(side=tk.LEFT, **pad)
    self.live_btn = tk.Button(actions, text="Live: OFF", bg=BG3, fg=FG,
                              relief="flat", padx=10, cursor="hand2",
                              font=MONO, command=self._toggle_live)
    self.live_btn.pack(side=tk.LEFT, **pad)
    # Export stays disabled until an analysis has produced sessions.
    self.export_btn = tk.Button(actions, text="Export for Claude", bg=ORANGE, fg="#000000",
                                relief="flat", padx=10, cursor="hand2", font=MONO_B,
                                command=self._run_export, state="disabled")
    self.export_btn.pack(side=tk.LEFT, **pad)
    self.status_var = tk.StringVar(value="Idle")
    tk.Label(actions, textvariable=self.status_var, bg=BG2, fg=FG_DIM,
             font=MONO, anchor="w").pack(side=tk.LEFT, padx=10)
def _build_panes(self) -> None:
    """Build the main split: session tree on the left, detail tabs on the right."""
    split = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG,
                           sashwidth=4, sashrelief="flat")
    split.pack(fill=tk.BOTH, expand=True)

    # Left: session tree
    sidebar = tk.Frame(split, bg=BG2, width=260)
    split.add(sidebar, minsize=200)
    tk.Label(sidebar, text="Sessions", bg=BG2, fg=ACCENT,
             font=MONO_B, anchor="w", padx=6).pack(fill=tk.X)
    tree_box = tk.Frame(sidebar, bg=BG2)
    tree_box.pack(fill=tk.BOTH, expand=True)

    style = ttk.Style()
    style.theme_use("clam")
    style.configure("Treeview", background=BG2, foreground=FG,
                    fieldbackground=BG2, font=MONO_SM, rowheight=18, borderwidth=0)
    style.configure("Treeview.Heading", background=BG3, foreground=ACCENT, font=MONO_SM)
    style.map("Treeview", background=[("selected", BG3)],
              foreground=[("selected", "#ffffff")])

    tree = ttk.Treeview(tree_box, columns=("info",), show="tree headings",
                        selectmode="browse")
    self.tree = tree
    for column, title, width, stretch in (("#0", "Frame", 160, True),
                                          ("info", "Info", 80, False)):
        tree.heading(column, text=title)
        tree.column(column, width=width, stretch=stretch)
    tree_scroll = ttk.Scrollbar(tree_box, orient="vertical", command=tree.yview)
    tree.configure(yscrollcommand=tree_scroll.set)
    tree_scroll.pack(side=tk.RIGHT, fill=tk.Y)
    tree.pack(fill=tk.BOTH, expand=True)
    row_styles = (
        ("session", {"foreground": ACCENT, "font": MONO_B}),
        ("bw_frame", {"foreground": COL_BW}),
        ("s3_frame", {"foreground": COL_S3}),
        ("bad_chk", {"foreground": RED}),
        ("malformed", {"foreground": RED}),
    )
    for tag, options in row_styles:
        tree.tag_configure(tag, **options)
    tree.bind("<<TreeviewSelect>>", self._on_tree_select)

    # Right: detail notebook
    detail = tk.Frame(split, bg=BG)
    split.add(detail, minsize=600)
    style.configure("TNotebook", background=BG2, borderwidth=0)
    style.configure("TNotebook.Tab", background=BG3, foreground=FG,
                    font=MONO, padding=[8, 2])
    style.map("TNotebook.Tab", background=[("selected", BG)],
              foreground=[("selected", ACCENT)])
    self.nb = ttk.Notebook(detail)
    self.nb.pack(fill=tk.BOTH, expand=True)
    self.inv_text = self._make_text_tab("Inventory")
    self.hex_text = self._make_text_tab("Hex Dump")
    self.diff_text = self._make_text_tab("Diff")
    self.report_text = self._make_text_tab("Full Report")
    self._build_query_tab()

    # Every text tab gets the same colour tags, created in the same order.
    text_styles = (
        ("head", {"foreground": COL_HEAD, "font": MONO_B}),
        ("bw", {"foreground": COL_BW}),
        ("s3", {"foreground": COL_S3}),
        ("changed", {"foreground": COL_DIFF}),
        ("known", {"foreground": COL_KNOW}),
        ("dim", {"foreground": FG_DIM}),
        ("normal", {"foreground": FG}),
        ("warn", {"foreground": YELLOW}),
        ("addr", {"foreground": ORANGE}),
    )
    for view in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
        for tag, options in text_styles:
            view.tag_configure(tag, **options)
def _make_text_tab(self, title: str) -> tk.Text:
    """Add a notebook tab named *title* holding a read-only text view.

    The text view has both scrollbars attached and is returned to the caller.
    """
    page = tk.Frame(self.nb, bg=BG)
    self.nb.add(page, text=title)
    view = tk.Text(page, bg=BG, fg=FG, font=MONO, state="disabled",
                   relief="flat", wrap="none", insertbackground=FG,
                   selectbackground=BG3, selectforeground="#ffffff")
    y_bar = ttk.Scrollbar(page, orient="vertical", command=view.yview)
    x_bar = ttk.Scrollbar(page, orient="horizontal", command=view.xview)
    view.configure(yscrollcommand=y_bar.set, xscrollcommand=x_bar.set)
    # Pack the scrollbars before the text so they claim the right/bottom edges.
    layout = (
        (y_bar, {"side": tk.RIGHT, "fill": tk.Y}),
        (x_bar, {"side": tk.BOTTOM, "fill": tk.X}),
        (view, {"fill": tk.BOTH, "expand": True}),
    )
    for widget, options in layout:
        widget.pack(**options)
    return view
def _build_query_tab(self) -> None:
    """Build the "Query DB" tab: filter row, result table, byte interpreter."""
    page = tk.Frame(self.nb, bg=BG)
    self.nb.add(page, text="Query DB")
    pad = {"padx": 4, "pady": 2}

    # ── filter row ────────────────────────────────────────────────────
    filt = tk.Frame(page, bg=BG2, pady=4)
    filt.pack(side=tk.TOP, fill=tk.X)

    def caption(text: str, column: int) -> None:
        tk.Label(filt, text=text, bg=BG2, fg=FG, font=MONO_SM).grid(
            row=0, column=column, sticky="e", **pad)

    def dropdown(var: tk.StringVar, column: int, width: int, **extra) -> ttk.Combobox:
        box = ttk.Combobox(filt, textvariable=var, width=width,
                           font=MONO_SM, state="readonly", **extra)
        box.grid(row=0, column=column, sticky="w", **pad)
        return box

    def text_field(var: tk.StringVar, column: int) -> None:
        tk.Entry(filt, textvariable=var, width=8, bg=BG3, fg=FG,
                 font=MONO_SM, insertbackground=FG, relief="flat").grid(
            row=0, column=column, sticky="w", **pad)

    caption("Capture:", 0)
    self._q_capture_var = tk.StringVar(value="All")
    self._q_capture_cb = dropdown(self._q_capture_var, 1, 20)
    caption("Dir:", 2)
    self._q_dir_var = tk.StringVar(value="All")
    dropdown(self._q_dir_var, 3, 6, values=["All", "BW", "S3"])
    caption("SUB:", 4)
    self._q_sub_var = tk.StringVar(value="All")
    self._q_sub_cb = dropdown(self._q_sub_var, 5, 12)
    caption("Offset:", 6)
    self._q_offset_var = tk.StringVar()
    text_field(self._q_offset_var, 7)
    caption("Value:", 8)
    self._q_value_var = tk.StringVar()
    text_field(self._q_value_var, 9)
    tk.Button(filt, text="Run Query", bg=ACCENT, fg="#ffffff", relief="flat",
              padx=8, cursor="hand2", font=("Consolas", 8, "bold"),
              command=self._run_db_query).grid(row=0, column=10, padx=8)
    tk.Button(filt, text="Refresh", bg=BG3, fg=FG, relief="flat",
              padx=6, cursor="hand2", font=MONO_SM,
              command=self._refresh_query_dropdowns).grid(row=0, column=11, padx=4)
    self._q_stats_var = tk.StringVar(value="DB: —")
    tk.Label(filt, textvariable=self._q_stats_var, bg=BG2, fg=FG_DIM,
             font=MONO_SM).grid(row=0, column=12, padx=12, sticky="w")

    # ── result table ──────────────────────────────────────────────────
    results = tk.Frame(page, bg=BG)
    results.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
    column_specs = (
        ("cap", "Cap", 40), ("sess", "Sess", 40), ("dir", "Dir", 40),
        ("sub", "SUB", 50), ("sub_name", "Name", 160), ("page", "Page", 60),
        ("len", "Len", 50), ("chk", "Chk", 50),
    )
    table = ttk.Treeview(results, columns=tuple(spec[0] for spec in column_specs),
                         show="headings", selectmode="browse")
    self._q_tree = table
    for cid, title, width in column_specs:
        table.heading(cid, text=title, anchor="w")
        # only the name column absorbs extra width
        table.column(cid, width=width, stretch=(cid == "sub_name"))
    y_bar = ttk.Scrollbar(results, orient="vertical", command=table.yview)
    x_bar = ttk.Scrollbar(results, orient="horizontal", command=table.xview)
    table.configure(yscrollcommand=y_bar.set, xscrollcommand=x_bar.set)
    y_bar.pack(side=tk.RIGHT, fill=tk.Y)
    x_bar.pack(side=tk.BOTTOM, fill=tk.X)
    table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    for tag, colour in (("bw_row", COL_BW), ("s3_row", COL_S3), ("bad_row", RED)):
        table.tag_configure(tag, foreground=colour)
    # Selecting a result row re-runs the byte interpretation.
    table.bind("<<TreeviewSelect>>", lambda _e: self._run_interpret())

    # ── byte interpreter (fixed-height strip at the bottom) ───────────
    interp = tk.Frame(page, bg=BG2, height=120)
    interp.pack(side=tk.BOTTOM, fill=tk.X)
    interp.pack_propagate(False)
    tk.Label(interp, text="Byte interpretation:", bg=BG2, fg=ACCENT,
             font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X)
    controls = tk.Frame(interp, bg=BG2)
    controls.pack(fill=tk.X, padx=6, pady=2)
    tk.Label(controls, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT)
    self._interp_offset_var = tk.StringVar(value="5")
    tk.Entry(controls, textvariable=self._interp_offset_var, width=6, bg=BG3, fg=FG,
             font=MONO_SM, insertbackground=FG, relief="flat").pack(side=tk.LEFT, padx=4)
    tk.Button(controls, text="Interpret", bg=BG3, fg=FG, relief="flat", cursor="hand2",
              font=MONO_SM, command=self._run_interpret).pack(side=tk.LEFT, padx=4)
    self._interp_text = tk.Text(interp, bg=BG2, fg=FG, font=MONO_SM,
                                height=4, relief="flat", state="disabled",
                                insertbackground=FG)
    self._interp_text.pack(fill=tk.X, padx=6, pady=2)
    self._interp_text.tag_configure("label", foreground=FG_DIM)
    self._interp_text.tag_configure("value", foreground=YELLOW)

    # Lookup state behind the table and dropdowns; each dropdown list starts
    # with a None placeholder (presumably the "All" choice — TODO confirm).
    self._q_rows: dict[str, object] = {}
    self._q_capture_rows: list = [None]
    self._q_sub_values: list = [None]
    self._refresh_query_dropdowns()
def _build_statusbar(self) -> None:
    """Build the bottom status strip; its text is driven by self.sb_var."""
    strip = tk.Frame(self, bg=BG3, height=20)
    strip.pack(side=tk.BOTTOM, fill=tk.X)
    self.sb_var = tk.StringVar(value="Ready")
    readout = tk.Label(strip, textvariable=self.sb_var, bg=BG3, fg=FG_DIM,
                       font=MONO_SM, anchor="w", padx=6)
    readout.pack(fill=tk.X)
# ── file picking ──────────────────────────────────────────────────────
def _browse(self, var: tk.StringVar, default: str) -> None:
    """Show an open-file dialog and store the chosen path in *var*.

    *default* is used both in the dialog title and as the suggested file
    name.  *var* is left unchanged when the dialog is cancelled.
    """
    chosen = filedialog.askopenfilename(
        title=f"Select {default}",
        filetypes=[("Binary", "*.bin"), ("All files", "*.*")],
        initialfile=default,
    )
    if not chosen:
        return
    var.set(chosen)
# ── analysis ──────────────────────────────────────────────────────────
def _run_analyze(self) -> None:
s3p = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None
bwp = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None
if not s3p or not bwp:
messagebox.showerror("Missing files", "Select both S3 and BW raw files.")
return
if not s3p.exists():
messagebox.showerror("Not found", f"S3 file not found:\n{s3p}")
return
if not bwp.exists():
messagebox.showerror("Not found", f"BW file not found:\n{bwp}")
return
self.state.s3_path = s3p
self.state.bw_path = bwp
self._do_analyze(s3p, bwp)
def _browse_bin(self) -> None:
    """Show an open-file dialog for the session .bin and store the choice.

    The entry is left unchanged when the dialog is cancelled.
    """
    chosen = filedialog.askopenfilename(
        title="Select session .bin file",
        filetypes=[("Binary", "*.bin"), ("All files", "*.*")],
    )
    if not chosen:
        return
    self.bin_var.set(chosen)
def _do_analyze(self, s3_path: Path, bw_path: Path) -> None:
    """Parse both raw captures, split them into sessions and refresh the views.

    When the "Session .bin" entry names an existing file that contains user
    marks, sessions are split at those marks; otherwise the standard
    frame-based session detection is used.  Consecutive sessions are diffed,
    the session tree is rebuilt and the result is ingested into the frame
    database.  A database failure is reported in the status bar only.

    How the sessions were split is kept in the final status-bar message;
    previously that note was overwritten by the "Loaded: ..." text.
    """
    self.status_var.set("Parsing...")
    self.update_idletasks()
    s3_blob = s3_path.read_bytes()
    bw_blob = bw_path.read_bytes()

    # Use mark-based session splitting if a structured .bin is provided
    bin_str = self.bin_var.get().strip()
    bin_path = Path(bin_str) if bin_str else None
    have_bin = bin_path is not None and bin_path.exists()
    marks = parse_structured_bin(bin_path.read_bytes()) if have_bin else []

    if marks:
        sessions = split_sessions_at_marks(bw_blob, s3_blob, marks)
        mark_labels = " | ".join(m.label for m in marks)
        split_note = f"{len(marks)} user mark(s): {mark_labels}"
    else:
        split_note = (
            "No user marks found in session .bin — using standard session detection"
            if have_bin else ""
        )
        s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3")
        bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0,
                                             validate_checksum=True), "BW")
        sessions = split_into_sessions(bw_frames, s3_frames)
    if split_note:
        self.sb_var.set(split_note)
        self.update_idletasks()

    # The first session has nothing to diff against; the others are compared
    # with their predecessor.
    diffs: list[Optional[list[FrameDiff]]] = [None]
    diffs.extend(diff_sessions(prev, cur) for prev, cur in zip(sessions, sessions[1:]))
    self.state.sessions = sessions
    self.state.diffs = diffs

    n_s3 = sum(len(s.s3_frames) for s in sessions)
    n_bw = sum(len(s.bw_frames) for s in sessions)
    self.status_var.set(f"{len(sessions)} sessions | BW:{n_bw} S3:{n_s3}")
    loaded = f"Loaded: {s3_path.name} + {bw_path.name}"
    self.sb_var.set(f"{loaded} | {split_note}" if split_note else loaded)
    self.export_btn.configure(state="normal")
    self._rebuild_tree()

    try:
        cap_id = self._db.ingest(sessions, s3_path, bw_path)
        if cap_id is not None:
            self.state.last_capture_id = cap_id
            self._refresh_query_dropdowns()
            # Pre-select the capture that was just ingested.
            for i, lbl in enumerate(self._q_capture_cb["values"]):
                if lbl.startswith(f"#{cap_id} "):
                    self._q_capture_cb.current(i)
                    break
    except Exception as exc:
        self.sb_var.set(f"DB ingest error: {exc}")
def _run_export(self) -> None:
    """Write the export file for the current analysis and offer to open its folder.

    The export is written next to the S3 capture, or into the current
    directory when no S3 path is recorded.  Write failures and failures to
    launch the platform's file manager are shown in an error dialog.
    """
    if not self.state.sessions:
        messagebox.showinfo("Export", "Run Analyze first.")
        return
    outdir = self.state.s3_path.parent if self.state.s3_path else Path(".")
    try:
        out_path = write_claude_export(self.state.sessions, self.state.diffs,
                                       outdir, self.state.s3_path, self.state.bw_path)
    except OSError as exc:
        messagebox.showerror("Export failed", f"Could not write export:\n{exc}")
        return
    self.sb_var.set(f"Exported: {out_path.name}")
    if not messagebox.askyesno("Export complete", f"Saved to:\n{out_path}\n\nOpen folder?"):
        return

    # Pick the folder opener for the host platform; "explorer" only exists
    # on Windows.
    folder = str(out_path.parent)
    if sys.platform.startswith("win"):
        opener = ["explorer", folder]
    elif sys.platform == "darwin":
        opener = ["open", folder]
    else:
        opener = ["xdg-open", folder]
    try:
        subprocess.Popen(opener)
    except OSError as exc:
        messagebox.showerror("Open folder", f"Could not open folder:\n{exc}")
# ── session tree ──────────────────────────────────────────────────────
def _rebuild_tree(self) -> None:
self.tree.delete(*self.tree.get_children())
for sess in self.state.sessions:
label = f"Session {sess.index}" + ("" if sess.is_complete() else " [partial]")
n_diff = len(self.state.diffs[sess.index] or [])
diff_str = f"{n_diff} changes" if n_diff else ""
sid = self.tree.insert("", tk.END, text=label,
values=(diff_str,), tags=("session",))
for af in sess.all_frames:
src_tag = "bw_frame" if af.source == "BW" else "s3_frame"
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
lbl_text = f"[{af.source}] {sub_hex} {af.sub_name}"
extra = ""
tags = (src_tag,)
if af.frame.checksum_valid is False:
extra = "BAD CHK"; tags = ("bad_chk",)
elif af.header is None:
lbl_text = f"[{af.source}] MALFORMED"; tags = ("malformed",)
self.tree.insert(sid, tk.END, text=lbl_text, values=(extra,), tags=tags,
iid=f"frame_{sess.index}_{af.frame.index}_{af.source}")
for item in self.tree.get_children():
self.tree.item(item, open=True)
def _on_tree_select(self, _e: tk.Event) -> None:
sel = self.tree.selection()
if not sel:
return
iid = sel[0]
if iid.startswith("frame_"):
parts = iid.split("_")
self._show_frame_detail(int(parts[1]), int(parts[2]), parts[3])
else:
text = self.tree.item(iid, "text")
try:
self._show_session_detail(int(text.split()[1]))
except (IndexError, ValueError):
pass
def _find_frame_by_sub(self, sess_idx: int, sub: int, page_key: int) -> Optional[AnnotatedFrame]:
"""Find a frame in a session by (sub, page_key) — used for diff click-through."""
if sess_idx >= len(self.state.sessions):
return None
sess = self.state.sessions[sess_idx]
for af in sess.bw_frames + sess.s3_frames:
if af.header and af.header.sub == sub and af.header.page_key == page_key:
return af
return None
def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]:
if sess_idx >= len(self.state.sessions):
return None
pool = (self.state.sessions[sess_idx].bw_frames if source == "BW"
else self.state.sessions[sess_idx].s3_frames)
return next((af for af in pool if af.frame.index == frame_idx), None)
# ── detail renderers ──────────────────────────────────────────────────
def _clear_tabs(self) -> None:
for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
self._tc(w)
def _show_session_detail(self, idx: int) -> None:
    """Render session *idx* into the Inventory, Diff and Full Report tabs.

    Out-of-range indices (including negative ones) are ignored.  In the Diff
    tab every SUB header is a clickable link that opens the matching frame
    of this session.  The Inventory tab is selected at the end.
    """
    if not 0 <= idx < len(self.state.sessions):
        return
    sess = self.state.sessions[idx]
    diffs = self.state.diffs[idx]
    self._clear_tabs()

    # ── Inventory tab ─────────────────────────────────────────────────
    w = self.inv_text
    self._tw(w, f"SESSION {sess.index}\n", "head")
    self._tw(w, f"Frames: {len(sess.bw_frames)+len(sess.s3_frames)}"
                f" (BW:{len(sess.bw_frames)} S3:{len(sess.s3_frames)})\n", "normal")
    if len(sess.bw_frames) != len(sess.s3_frames):
        self._tw(w, " WARNING: BW/S3 count mismatch\n", "warn")
    self._tn(w)
    for i, af in enumerate(sess.all_frames):
        src = "bw" if af.source == "BW" else "s3"
        sh = f"{af.header.sub:02X}" if af.header else "??"
        pg = f" (page {af.header.page_key:04X})" if af.header and af.header.page_key else ""
        self._tw(w, f" [{af.source}] #{i:<3} ", src)
        self._tw(w, f"SUB={sh} ", "addr")
        self._tw(w, f"{af.sub_name:<30}{pg} len={len(af.frame.payload)}", "dim")
        if af.frame.checksum_valid is False:
            self._tw(w, " [BAD CHECKSUM]", "warn")
        elif af.frame.checksum_valid is True:
            self._tw(w, f" [{af.frame.checksum_type}]", "dim")
        self._tn(w)

    # ── Diff tab ──────────────────────────────────────────────────────
    def fmt_byte(v: int) -> str:
        # -2 and -1 are sentinels shown as "ADD" and "--"; anything else is
        # rendered as a two-digit hex byte.
        if v == -2:
            return "ADD"
        if v == -1:
            return "--"
        return f"{v:02x}"

    sentinels = (-2, -1)
    diff_w = self.diff_text
    self._tc(diff_w)
    if diffs is None:
        self._tw(diff_w, "(No previous session to diff against)\n", "dim")
    elif not diffs:
        self._tw(diff_w, f"DIFF vs SESSION {idx-1}\n", "head")
        self._tw(diff_w, " No changes detected.\n", "dim")
    else:
        self._tw(diff_w, f"DIFF vs SESSION {idx-1}\n", "head")
        self._tw(diff_w, " (click any SUB header to open its hex dump)\n", "dim")
        for fd in diffs:
            pg = f" (page {fd.page_key:04X})" if fd.page_key else ""
            link_tag = f"difflink_{fd.sub}_{fd.page_key}"
            diff_w.configure(state="normal")
            # Measure the span at the true end of the text ("end-1c" sits
            # just before the widget's final newline, where END inserts
            # land) instead of relying on the INSERT mark being there.
            start = diff_w.index("end-1c")
            diff_w.insert(tk.END, f"\n SUB {fd.sub:02X} ({fd.sub_name}){pg}:\n")
            end = diff_w.index("end-1c")
            diff_w.tag_add("addr", start, end)
            diff_w.tag_add(link_tag, start, end)
            diff_w.tag_configure(link_tag, underline=True)

            # Bind the current loop values as defaults so each link keeps
            # its own target.
            def _open_target(_event, s=idx, sub=fd.sub, pk=fd.page_key):
                af = self._find_frame_by_sub(s, sub, pk)
                if af:
                    self._show_frame_detail(s, af.frame.index, af.source)

            diff_w.tag_bind(link_tag, "<Button-1>", _open_target)
            diff_w.tag_bind(link_tag, "<Enter>", lambda e: diff_w.configure(cursor="hand2"))
            diff_w.tag_bind(link_tag, "<Leave>", lambda e: diff_w.configure(cursor=""))
            diff_w.configure(state="disabled")

            for bd in fd.diffs:
                b, a = fmt_byte(bd.before), fmt_byte(bd.after)
                # A4 inner-frame add/remove: field_name carries the full description
                is_a4_note = bool(
                    bd.field_name
                    and bd.field_name.startswith("[A4")
                    and (bd.before in sentinels or bd.after in sentinels)
                )
                if is_a4_note:
                    self._tw(diff_w, f" {b} -> {a} ", "changed")
                    self._tw(diff_w, bd.field_name, "known")
                else:
                    self._tw(diff_w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
                    self._tw(diff_w, f"{b} -> {a}", "changed")
                    if bd.field_name:
                        self._tw(diff_w, f" [{bd.field_name}]", "known")
                self._tn(diff_w)

    # ── Full Report tab ───────────────────────────────────────────────
    report = render_session_report(sess, diffs, idx - 1 if idx > 0 else None)
    self._tc(self.report_text)
    self._tw(self.report_text, report, "normal")
    self.nb.select(0)
def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None:
af = self._find_frame(sess_idx, frame_idx, source)
if af is None:
return
self._clear_tabs()
src = "bw" if source == "BW" else "s3"
sh = f"{af.header.sub:02X}" if af.header else "??"
w = self.inv_text
self._tw(w, f"[{af.source}] Frame #{af.frame.index}\n", src)
self._tw(w, f"Session {sess_idx} | ", "dim")
self._tw(w, f"SUB={sh} {af.sub_name}\n", "addr")
if af.header:
self._tw(w, f" OFFSET: {af.header.page_key:04X} "
f"CMD={af.header.cmd:02X} FLAGS={af.header.flags:02X}\n", "dim")
self._tn(w)
self._tw(w, f"Payload: {len(af.frame.payload)} bytes\n", "dim")
if af.frame.checksum_valid is False: self._tw(w, " BAD CHECKSUM\n", "warn")
elif af.frame.checksum_valid is True:
self._tw(w, f" Checksum: {af.frame.checksum_type} {af.frame.checksum_hex}\n", "dim")
self._tn(w)
p = af.frame.payload
if len(p) >= 5:
self._tw(w, "Header breakdown:\n", "head")
self._tw(w, f" [0] CMD = {p[0]:02x}\n", "dim")
self._tw(w, f" [1] ? = {p[1]:02x}\n", "dim")
self._tw(w, f" [2] SUB = {p[2]:02x} ({af.sub_name})\n", src)
self._tw(w, f" [3] OFFSET_HI = {p[3]:02x}\n", "dim")
self._tw(w, f" [4] OFFSET_LO = {p[4]:02x}\n", "dim")
if len(p) > 5: self._tw(w, f" [5..] data = {len(p)-5} bytes\n", "dim")
w = self.hex_text
self._tw(w, f"[{af.source}] SUB={sh} {af.sub_name}\n", src)
self._tw(w, f"Payload ({len(af.frame.payload)} bytes):\n", "dim")
self._tn(w)
self._tw(w, "\n".join(format_hex_dump(af.frame.payload, indent=" ")) + "\n", "normal")
diffs_for_sess = self.state.diffs[sess_idx] if sess_idx < len(self.state.diffs) else None
if diffs_for_sess and af.header:
matching = [fd for fd in diffs_for_sess
if fd.sub == af.header.sub and fd.page_key == af.header.page_key]
if matching:
self._tn(w)
self._tw(w, "Changed bytes (vs prev session):\n", "head")
for bd in matching[0].diffs:
b = f"{bd.before:02x}" if bd.before >= 0 else "--"
a = f"{bd.after:02x}" if bd.after >= 0 else "--"
self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
self._tw(w, f"{b} -> {a}", "changed")
if bd.field_name: self._tw(w, f" [{bd.field_name}]", "known")
self._tn(w)
self.nb.select(1)
# ── live mode ─────────────────────────────────────────────────────────
def _toggle_live(self) -> None:
if self._live_thread and self._live_thread.is_alive():
self._live_stop.set()
self._set_live_btn_off()
else:
s3p = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None
bwp = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None
if not s3p or not bwp:
messagebox.showerror("Missing files", "Select both raw files before starting live mode.")
return
self.state.s3_path = s3p
self.state.bw_path = bwp
self._start_live()
def _start_live(self) -> None:
    """Spawn the live-mode watcher thread for the two raw files in the entry fields.

    Records both paths on ``self.state``, clears the stop flag, starts
    ``_live_worker`` as a daemon thread and switches the button/status text
    to the "running" look.
    """
    s3_file = Path(self.s3_var.get().strip())
    bw_file = Path(self.bw_var.get().strip())
    self.state.s3_path, self.state.bw_path = s3_file, bw_file

    self._live_stop.clear()
    watcher = threading.Thread(
        target=self._live_worker,
        args=(s3_file, bw_file),
        daemon=True,
    )
    self._live_thread = watcher
    watcher.start()

    running_look = {"text": "Live: ON", "bg": GREEN, "fg": "#000000"}
    self.live_btn.configure(**running_look)
    self.status_var.set("Live mode running...")
def _set_live_btn_off(self) -> None:
    """Restore the live button to its idle appearance and report the stop."""
    idle_look = {"text": "Live: OFF", "bg": BG3, "fg": FG}
    self.live_btn.configure(**idle_look)
    self.status_var.set("Live stopped")
def _live_worker(self, s3_path: Path, bw_path: Path) -> None:
import time
s3_pos = bw_pos = 0
while not self._live_stop.is_set():
changed = False
for path, pos_attr in ((s3_path, "s3"), (bw_path, "bw")):
if path.exists():
with path.open("rb") as fh:
fh.seek(s3_pos if pos_attr == "s3" else bw_pos)
nb = fh.read()
if nb:
if pos_attr == "s3": s3_pos += len(nb)
else: bw_pos += len(nb)
changed = True
if changed:
self._live_q.put("refresh")
time.sleep(0.1)
def _poll_live_queue(self) -> None:
try:
while True:
msg = self._live_q.get_nowait()
if msg == "refresh" and self.state.s3_path and self.state.bw_path:
self._do_analyze(self.state.s3_path, self.state.bw_path)
except queue.Empty:
pass
finally:
self.after(150, self._poll_live_queue)
# ── DB query ──────────────────────────────────────────────────────────
def _refresh_query_dropdowns(self) -> None:
try:
captures = self._db.list_captures()
self._q_capture_cb["values"] = ["All"] + [
f"#{r['id']} {r['timestamp'][:16]} ({r['frame_count']} frames)"
for r in captures
]
self._q_capture_rows = [None] + [r["id"] for r in captures]
subs = self._db.get_distinct_subs()
self._q_sub_cb["values"] = ["All"] + [f"0x{s:02X}" for s in subs]
self._q_sub_values = [None] + subs
stats = self._db.get_stats()
self._q_stats_var.set(f"DB: {stats['captures']} captures | {stats['frames']} frames")
except Exception as exc:
self._q_stats_var.set(f"DB error: {exc}")
def _parse_int(self, s: str) -> Optional[int]:
s = s.strip()
if not s: return None
try: return int(s, 0)
except ValueError: return None
def _run_db_query(self) -> None:
cap_idx = self._q_capture_cb.current()
cap_id = self._q_capture_rows[cap_idx] if cap_idx > 0 else None
dir_val = self._q_dir_var.get()
direction = dir_val if dir_val != "All" else None
sub_idx = self._q_sub_cb.current()
sub = self._q_sub_values[sub_idx] if sub_idx > 0 else None
offset = self._parse_int(self._q_offset_var.get())
value = self._parse_int(self._q_value_var.get())
try:
if offset is not None:
rows = self._db.query_by_byte(offset=offset, value=value,
capture_id=cap_id, direction=direction, sub=sub)
else:
rows = self._db.query_frames(capture_id=cap_id, direction=direction, sub=sub)
except Exception as exc:
messagebox.showerror("Query error", str(exc))
return
self._q_tree.delete(*self._q_tree.get_children())
self._q_rows.clear()
for row in rows:
sh = f"0x{row['sub']:02X}" if row["sub"] is not None else ""
pg = f"0x{row['page_key']:04X}" if row["page_key"] is not None else ""
chk = {1: "OK", 0: "BAD", None: ""}.get(row["checksum_ok"], "")
tag = "bw_row" if row["direction"] == "BW" else "s3_row"
if row["checksum_ok"] == 0: tag = "bad_row"
iid = str(row["id"])
self._q_tree.insert("", tk.END, iid=iid, tags=(tag,), values=(
row["capture_id"], row["session_idx"], row["direction"],
sh, row["sub_name"] or "", pg, row["payload_len"], chk))
self._q_rows[iid] = row
self.sb_var.set(f"Query returned {len(rows)} rows")
def _run_interpret(self) -> None:
sel = self._q_tree.selection()
if not sel: return
row = self._q_rows.get(sel[0])
if not row: return
offset = self._parse_int(self._interp_offset_var.get())
if offset is None: return
payload = bytes(row["payload"])
interp = self._db.interpret_offset(payload, offset)
w = self._interp_text
w.configure(state="normal")
w.delete("1.0", tk.END)
sh = f"0x{row['sub']:02X}" if row["sub"] is not None else "??"
w.insert(tk.END,
f"Frame #{row['id']} [{row['direction']}] SUB={sh} "
f"offset={offset} (0x{offset:04X})\n", "label")
line = ""
for key, lbl in [
("uint8","uint8 "), ("int8","int8 "),
("uint16_be","uint16 BE "), ("uint16_le","uint16 LE "),
("uint32_be","uint32 BE "), ("uint32_le","uint32 LE "),
("float32_be","float32 BE "), ("float32_le","float32 LE "),
]:
if key not in interp: continue
val = interp[key]
vs = f"{val:.6g}" if isinstance(val, float) else (
f"{val} (0x{int(val)&0xFFFFFFFF:X})")
line += f" {lbl}: {vs:<28}"
if len(line) > 80:
w.insert(tk.END, line + "\n", "value"); line = ""
if line: w.insert(tk.END, line + "\n", "value")
w.configure(state="disabled")
# ── text helpers ──────────────────────────────────────────────────────
def _tc(self, w: tk.Text) -> None:
w.configure(state="normal"); w.delete("1.0", tk.END)
def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None:
w.configure(state="normal"); w.insert(tk.END, text, tag)
def _tn(self, w: tk.Text) -> None:
w.configure(state="normal"); w.insert(tk.END, "\n"); w.configure(state="disabled")
# ─────────────────────────────────────────────────────────────────────────────
# Console panel (tk.Frame — lives inside a notebook tab)
# ─────────────────────────────────────────────────────────────────────────────
class ConsolePanel(tk.Frame):
    """
    Direct device connection and diagnostic console.

    Lets you run individual protocol commands against a MiniMate Plus via
    serial or TCP, showing colour-coded TX/RX bytes and decoded output in a
    scrolling console.

    Colour scheme:
        TX frames      — ACCENT blue (#569cd6)
        RX raw hex     — teal (#4ec9b0)
        Parsed/decoded — green (#4caf50)
        Errors         — red (#f44747)
        Status/info    — dim grey (#6a6a6a)
        Section heads  — yellow (#dcdcaa)

    "Save Log" writes the console text to bridges/captures/; "Send to
    Analyzer" writes the captured RX bytes as a raw .bin file and injects the
    path into the Analyzer tab.

    Threading: every command runs in a daemon worker thread that never
    touches Tk.  The worker posts ``(kind, payload)`` tuples on ``self._q``
    and the main thread drains that queue every 100 ms in ``_poll_q``.
    """

    TAG_TX = "tx"
    TAG_RX_RAW = "rx_raw"
    TAG_PARSED = "parsed"
    TAG_ERROR = "error"
    TAG_STATUS = "status"
    TAG_HEAD = "head"
    MAX_LINES = 5000

    # Queue message kinds whose payload is printed verbatim with this tag.
    _KIND_TAGS = {
        "tx": TAG_TX,
        "rx_raw": TAG_RX_RAW,
        "parsed": TAG_PARSED,
        "error": TAG_ERROR,
        "head": TAG_HEAD,
    }

    _POLL_HEAD = "\n── POLL startup ─────────────────────────────\n"

    def __init__(self, parent: tk.Widget, on_send_to_analyzer=None, **kw):
        """Build the panel.

        Args:
            parent: containing widget (the notebook).
            on_send_to_analyzer: optional callback taking the path (str) of
                the raw RX capture written by "Send to Analyzer".
            **kw: forwarded to ``tk.Frame``.
        """
        super().__init__(parent, bg=BG2, **kw)
        self._on_send_to_analyzer = on_send_to_analyzer
        self._q: queue.Queue = queue.Queue()
        self._running = False
        self._log_lines: list[str] = []
        self._last_raw_rx: Optional[bytes] = None
        self._cmd_btns: list[tk.Button] = []
        self._build()
        self._poll_q()

    # ── build ─────────────────────────────────────────────────────────────
    def _build(self) -> None:
        """Create the config row, command buttons, console and bottom bar."""
        pad = {"padx": 5, "pady": 3}
        entry_look = {"bg": BG3, "fg": FG, "insertbackground": FG,
                      "relief": "flat", "font": MONO}
        label_look = {"bg": BG2, "fg": FG, "font": MONO}
        radio_look = {"bg": BG2, "fg": FG, "selectcolor": BG3,
                      "activebackground": BG2, "font": MONO,
                      "command": self._on_transport_change}

        # ── top config row ────────────────────────────────────────────────
        cfg = tk.Frame(self, bg=BG2)
        cfg.pack(side=tk.TOP, fill=tk.X, padx=6, pady=4)

        # Transport radio buttons
        self._transport_var = tk.StringVar(value="tcp")
        tk.Radiobutton(
            cfg, text="TCP", variable=self._transport_var, value="tcp", **radio_look,
        ).grid(row=0, column=0, padx=(0, 4))
        tk.Radiobutton(
            cfg, text="Serial", variable=self._transport_var, value="serial", **radio_look,
        ).grid(row=0, column=1, padx=(0, 12))

        # TCP fields
        self._tcp_frame = tk.Frame(cfg, bg=BG2)
        self._tcp_frame.grid(row=0, column=2, sticky="w")
        tk.Label(self._tcp_frame, text="Host:", **label_look).pack(side=tk.LEFT, **pad)
        self._host_var = tk.StringVar(value="127.0.0.1")
        tk.Entry(
            self._tcp_frame, textvariable=self._host_var, width=18, **entry_look,
        ).pack(side=tk.LEFT, padx=2)
        tk.Label(self._tcp_frame, text="Port:", **label_look).pack(side=tk.LEFT, padx=(10, 4))
        self._tcp_port_var = tk.StringVar(value="9034")
        tk.Entry(
            self._tcp_frame, textvariable=self._tcp_port_var, width=6, **entry_look,
        ).pack(side=tk.LEFT, padx=2)

        # Serial fields (hidden by default: the frame is only gridded when
        # the Serial radio button is chosen)
        self._serial_frame = tk.Frame(cfg, bg=BG2)
        tk.Label(self._serial_frame, text="Port:", **label_look).pack(side=tk.LEFT, **pad)
        self._port_var = tk.StringVar(value="COM5")
        tk.Entry(
            self._serial_frame, textvariable=self._port_var, width=10, **entry_look,
        ).pack(side=tk.LEFT, padx=2)
        tk.Label(self._serial_frame, text="Baud:", **label_look).pack(side=tk.LEFT, padx=(10, 4))
        self._baud_var = tk.StringVar(value="38400")
        tk.Entry(
            self._serial_frame, textvariable=self._baud_var, width=8, **entry_look,
        ).pack(side=tk.LEFT, padx=2)

        # Timeout
        tk.Label(cfg, text="Timeout:", **label_look).grid(row=0, column=3, padx=(18, 4))
        self._timeout_var = tk.StringVar(value="30")
        tk.Entry(
            cfg, textvariable=self._timeout_var, width=5, **entry_look,
        ).grid(row=0, column=4, padx=2)
        tk.Label(cfg, text="s", bg=BG2, fg=FG_DIM, font=MONO).grid(row=0, column=5)

        # ── command buttons row ───────────────────────────────────────────
        cmd_row = tk.Frame(self, bg=BG2)
        cmd_row.pack(side=tk.TOP, fill=tk.X, padx=6, pady=(0, 4))
        tk.Label(cmd_row, text="Commands:", bg=BG2, fg=FG_DIM, font=MONO).pack(
            side=tk.LEFT, padx=(0, 10))
        for label, cmd in [
            ("POLL", "poll"),
            ("Serial #", "serial_number"),
            ("Full Config", "full_config"),
            ("Event Index", "event_index"),
        ]:
            btn = tk.Button(
                cmd_row, text=label, bg=ACCENT, fg="#ffffff",
                relief="flat", padx=10, cursor="hand2", font=MONO,
                # c=cmd binds the current value; a plain closure would see
                # only the last loop value.
                command=lambda c=cmd: self._run_command(c),
            )
            btn.pack(side=tk.LEFT, padx=4)
            self._cmd_btns.append(btn)
        self._status_var = tk.StringVar(value="Ready")
        tk.Label(cmd_row, textvariable=self._status_var,
                 bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=14)

        # ── console output ────────────────────────────────────────────────
        self._console = scrolledtext.ScrolledText(
            self, height=20, font=MONO_SM,
            bg=BG, fg=FG, insertbackground=FG,
            relief="flat", state="disabled",
        )
        self._console.pack(fill=tk.BOTH, expand=True, padx=6, pady=4)
        self._console.tag_configure(self.TAG_TX, foreground=ACCENT)
        self._console.tag_configure(self.TAG_RX_RAW, foreground=COL_S3)
        self._console.tag_configure(self.TAG_PARSED, foreground=GREEN)
        self._console.tag_configure(self.TAG_ERROR, foreground=RED)
        self._console.tag_configure(self.TAG_STATUS, foreground=FG_DIM)
        self._console.tag_configure(self.TAG_HEAD, foreground=YELLOW, font=MONO_B)

        # ── bottom bar ────────────────────────────────────────────────────
        bot = tk.Frame(self, bg=BG2)
        bot.pack(side=tk.BOTTOM, fill=tk.X, padx=6, pady=4)
        tk.Button(
            bot, text="Clear", bg=BG3, fg=FG, relief="flat",
            padx=10, cursor="hand2", font=MONO,
            command=self._clear_console,
        ).pack(side=tk.LEFT, padx=4)
        tk.Button(
            bot, text="Save Log", bg=BG3, fg=FG, relief="flat",
            padx=10, cursor="hand2", font=MONO,
            command=self._save_log,
        ).pack(side=tk.LEFT, padx=4)
        self._send_btn = tk.Button(
            bot, text="Send to Analyzer", bg=BG3, fg=FG_DIM, relief="flat",
            padx=10, cursor="hand2", font=MONO,
            command=self._send_to_analyzer, state="disabled",
        )
        self._send_btn.pack(side=tk.LEFT, padx=4)

    # ── transport toggle ──────────────────────────────────────────────────
    def _on_transport_change(self) -> None:
        """Show the field group matching the selected transport, hide the other."""
        if self._transport_var.get() == "tcp":
            self._serial_frame.grid_remove()
            self._tcp_frame.grid(row=0, column=2, sticky="w")
        else:
            self._tcp_frame.grid_remove()
            self._serial_frame.grid(row=0, column=2, sticky="w")

    # ── console helpers ───────────────────────────────────────────────────
    def _append(self, text: str, tag: str = "status") -> None:
        """Append coloured text (main thread only — called via _poll_q)."""
        self._log_lines.append(text)
        console = self._console
        console.configure(state="normal")
        console.insert(tk.END, text, tag)
        # Drop the oldest lines once the widget grows past MAX_LINES so a
        # long session does not slow the Text widget down.
        line_count = int(console.index("end-1c").split(".")[0])
        if line_count > self.MAX_LINES:
            console.delete("1.0", f"{line_count - self.MAX_LINES}.0")
        console.see(tk.END)
        console.configure(state="disabled")

    def _clear_console(self) -> None:
        """Empty both the console widget and the in-memory log."""
        self._console.configure(state="normal")
        self._console.delete("1.0", tk.END)
        self._console.configure(state="disabled")
        self._log_lines.clear()

    def _save_log(self) -> None:
        """Write the accumulated console text to bridges/captures/console_<ts>.log."""
        cap_dir = SCRIPT_DIR / "bridges" / "captures"
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        path = cap_dir / f"console_{ts}.log"
        try:
            # mkdir is inside the try so an unwritable location produces the
            # error dialog instead of an unhandled callback exception.
            cap_dir.mkdir(parents=True, exist_ok=True)
            path.write_text("".join(self._log_lines), encoding="utf-8")
        except OSError as exc:
            messagebox.showerror("Save Error", str(exc))
            return
        self._q.put(("status", f"Log saved → {path.name}"))

    def _send_to_analyzer(self) -> None:
        """Dump the last captured RX bytes to a .bin file and hand its path to the callback."""
        if not self._last_raw_rx or not self._on_send_to_analyzer:
            return
        cap_dir = SCRIPT_DIR / "bridges" / "captures"
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        raw_path = cap_dir / f"console_s3_{ts}.bin"
        try:
            cap_dir.mkdir(parents=True, exist_ok=True)
            raw_path.write_bytes(self._last_raw_rx)
            self._on_send_to_analyzer(str(raw_path))
            self._q.put(("status", f"Sent to Analyzer → {raw_path.name}"))
        except Exception as exc:
            # Boundary handler: the callback is supplied by the owner of this
            # panel, so anything it raises is reported rather than lost.
            messagebox.showerror("Error", str(exc))

    @staticmethod
    def _hex_dump_lines(data: bytes) -> list[str]:
        """Format *data* as console lines of 16 bytes: offset, hex column, ASCII column."""
        lines = []
        for i in range(0, len(data), 16):
            chunk = data[i:i + 16]
            hex_part = " ".join(f"{b:02X}" for b in chunk)
            asc_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
            lines.append(f" {i:04X}: {hex_part:<48} {asc_part}\n")
        return lines

    # ── command dispatch ──────────────────────────────────────────────────
    def _set_buttons_state(self, state: str) -> None:
        """Apply *state* ("normal"/"disabled") to every command button."""
        for btn in self._cmd_btns:
            btn.configure(state=state)

    def _snapshot_config(self, cmd: str) -> dict:
        """Read the connection fields into a plain dict for the worker thread.

        Only the numeric field of the *selected* transport is parsed, so a
        stray value in the hidden field cannot block a command.

        Raises:
            ValueError: a required numeric field does not parse; the message
                names the field.
        """
        def number(var, default: str, cast, label: str):
            raw = var.get().strip() or default
            try:
                return cast(raw)
            except ValueError:
                raise ValueError(f"{label} must be a number (got {raw!r})") from None

        transport = self._transport_var.get()
        config = {
            "transport": transport,
            "host": self._host_var.get().strip(),
            "port": self._port_var.get().strip(),
            "timeout": number(self._timeout_var, "30", float, "Timeout"),
            "cmd": cmd,
        }
        if transport == "tcp":
            config["tcp_port"] = number(self._tcp_port_var, "9034", int, "TCP port")
        else:
            config["baud"] = number(self._baud_var, "38400", int, "Baud rate")
        return config

    def _run_command(self, cmd: str) -> None:
        """Start *cmd* in a worker thread unless one is already running.

        Invalid numeric settings are reported in the console (red) and no
        worker is started.
        """
        if self._running:
            return
        # Snapshot config in main thread before handing off to worker
        try:
            config = self._snapshot_config(cmd)
        except ValueError as exc:
            self._q.put(("error", f"\nError: {exc}\n"))
            return
        self._running = True
        self._set_buttons_state("disabled")
        self._status_var.set("Running…")
        threading.Thread(target=self._worker, args=(config,), daemon=True).start()

    # ── worker thread ─────────────────────────────────────────────────────
    def _worker(self, cfg: dict) -> None:
        """Background thread — open transport, run command, post results to queue.

        Whatever happens, a final ``("done", None)`` message is posted so the
        main thread always re-enables the command buttons.
        """
        q = self._q

        def post(kind: str, text: str) -> None:
            q.put((kind, text))

        try:
            try:
                from minimateplus.transport import SerialTransport, TcpTransport
                from minimateplus.protocol import (
                    MiniMateProtocol,
                    SUB_SERIAL_NUMBER,
                    SUB_FULL_CONFIG,
                    SUB_EVENT_INDEX,
                )
            except ImportError as exc:
                post("error", f"Import error: {exc}\nIs minimateplus installed?\n")
                return

            timeout = cfg["timeout"]
            cmd = cfg["cmd"]

            # Build transport.  This sits inside the outer try so a failing
            # constructor is reported and still followed by "done".
            if cfg["transport"] == "tcp":
                host = cfg["host"]
                tcp_port = cfg["tcp_port"]
                post("status", f"Connecting {host}:{tcp_port}")
                transport = TcpTransport(host, tcp_port, connect_timeout=timeout)
            else:
                port = cfg["port"]
                baud = cfg["baud"]
                post("status", f"Opening {port} @ {baud} baud…")
                transport = SerialTransport(port, baud)

            # Wrap transport to capture every TX/RX byte
            raw_rx = bytearray()
            orig_write = transport.write
            orig_read = transport.read

            def logged_write(data: bytes) -> None:
                post("tx", f"TX [{len(data):3d}B]: {data.hex()}\n")
                orig_write(data)

            def logged_read(n: int) -> bytes:
                result = orig_read(n)
                if result:
                    raw_rx.extend(result)
                    post("rx_raw", f"RX [{len(result):3d}B]: {result.hex()}\n")
                return result

            transport.write = logged_write  # type: ignore[method-assign]
            transport.read = logged_read  # type: ignore[method-assign]

            # Commands whose reply is shown as a plain hex dump.
            dump_commands = {
                "full_config": (
                    "\n── Full Config (SUB 0x01) ───────────────────\n",
                    SUB_FULL_CONFIG,
                ),
                "event_index": (
                    "\n── Event Index (SUB 0x08) ───────────────────\n",
                    SUB_EVENT_INDEX,
                ),
            }

            with transport:
                post("status", "Connected.")
                proto = MiniMateProtocol(transport, recv_timeout=timeout)
                if cmd == "poll":
                    post("head", self._POLL_HEAD)
                    frame = proto.startup()
                    post("parsed", f" payload ({len(frame.data)} B): {frame.data.hex()}\n")
                    try:
                        text = frame.data.decode("ascii", errors="replace")
                        post("parsed", f" text: {text!r}\n")
                    except (AttributeError, UnicodeError):
                        # Best effort: the text rendering is a convenience on
                        # top of the hex line already printed above.
                        pass
                elif cmd == "serial_number":
                    post("head", self._POLL_HEAD)
                    proto.startup()
                    post("head", "\n── Serial Number (SUB 0x15) ─────────────────\n")
                    data = proto.read(SUB_SERIAL_NUMBER)
                    post("parsed", f" raw ({len(data)} B): {data.hex()}\n")
                    sn = data.rstrip(b"\x00").decode("ascii", errors="replace").strip()
                    post("parsed", f" serial: {sn!r}\n")
                elif cmd in dump_commands:
                    heading, sub = dump_commands[cmd]
                    post("head", self._POLL_HEAD)
                    proto.startup()
                    post("head", heading)
                    data = proto.read(sub)
                    post("parsed", f" raw ({len(data)} B):\n")
                    for line in self._hex_dump_lines(data):
                        post("parsed", line)
                post("status", "Done.")
                q.put(("save_raw", bytes(raw_rx)))
        except Exception as exc:
            # Thread boundary: report anything that went wrong in the console.
            post("error", f"\nError: {exc}\n")
        finally:
            q.put(("done", None))

    # ── queue poll ────────────────────────────────────────────────────────
    def _poll_q(self) -> None:
        """Apply every queued worker message to the UI, then re-arm the 100 ms timer."""
        try:
            while True:
                kind, payload = self._q.get_nowait()
                tag = self._KIND_TAGS.get(kind)
                if tag is not None:
                    self._append(payload, tag)
                elif kind == "status":
                    self._status_var.set(str(payload))
                    self._append(f" [{payload}]\n", self.TAG_STATUS)
                elif kind == "save_raw":
                    self._last_raw_rx = payload
                    if payload:
                        self._send_btn.configure(state="normal", fg=FG)
                elif kind == "done":
                    self._running = False
                    self._set_buttons_state("normal")
                    self._status_var.set("Ready")
        except queue.Empty:
            pass
        finally:
            self.after(100, self._poll_q)
# ─────────────────────────────────────────────────────────────────────────────
# Main application window
# ─────────────────────────────────────────────────────────────────────────────
class SeismoLab(tk.Tk):
    """Top-level window: a notebook with the Bridge, Analyzer and Console tabs.

    Besides hosting the three panels it wires them together: a started bridge
    feeds its raw-tap paths to the Analyzer, a stopped bridge ends the
    Analyzer's live mode, and the Console can push a capture file into the
    Analyzer's S3 field.
    """

    # Notebook position of the Analyzer tab (tabs are added Bridge, Analyzer,
    # Console in __init__).
    _ANALYZER_TAB = 1

    def __init__(self) -> None:
        super().__init__()
        self.title("Seismo Lab")
        self.configure(bg=BG)
        self.minsize(1100, 680)
        self._db = FrameDB()

        style = ttk.Style()
        style.theme_use("clam")
        style.configure("Top.TNotebook", background=BG3, borderwidth=0, tabposition="nw")
        style.configure("Top.TNotebook.Tab", background=BG3, foreground=FG,
                        font=("Consolas", 10, "bold"), padding=[16, 6])
        style.map("Top.TNotebook.Tab",
                  background=[("selected", ACCENT)],
                  foreground=[("selected", "#ffffff")])

        nb = ttk.Notebook(self, style="Top.TNotebook")
        nb.pack(fill=tk.BOTH, expand=True)

        self._bridge_panel = BridgePanel(
            nb,
            on_bridge_started=self._on_bridge_started,
            on_bridge_stopped=self._on_bridge_stopped,
        )
        nb.add(self._bridge_panel, text=" Bridge ")

        self._analyzer_panel = AnalyzerPanel(nb, db=self._db)
        nb.add(self._analyzer_panel, text=" Analyzer ")

        self._console_panel = ConsolePanel(
            nb,
            on_send_to_analyzer=self._on_console_send_to_analyzer,
        )
        nb.add(self._console_panel, text=" Console ")

        self._nb = nb
        self.protocol("WM_DELETE_WINDOW", self._on_close)

    def _on_bridge_started(self, raw_bw: Optional[str], raw_s3: Optional[str],
                           struct_bin: Optional[str] = None) -> None:
        """Bridge started — inject paths into analyzer and start live mode."""
        self._analyzer_panel.set_live_files(raw_bw, raw_s3, struct_bin)
        # Switch to Analyzer tab so the user can watch it update
        self._nb.select(self._ANALYZER_TAB)

    def _on_bridge_stopped(self) -> None:
        """Bridge stopped — end the Analyzer's live mode."""
        self._analyzer_panel.stop_live()

    def _on_console_send_to_analyzer(self, raw_s3_path: str) -> None:
        """Console captured bytes → inject into Analyzer S3 field and switch tab."""
        self._analyzer_panel.s3_var.set(raw_s3_path)
        self._nb.select(self._ANALYZER_TAB)

    def _on_close(self) -> None:
        """Stop the bridge and close the window.

        ``destroy()`` runs in a ``finally`` so a failure while stopping the
        bridge cannot leave the user with a window that refuses to close; the
        exception itself still propagates to Tk's callback error reporting.
        """
        try:
            self._bridge_panel.stop_bridge()
        finally:
            self.destroy()
# ─────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """Create the Seismo Lab window and enter the Tk main loop."""
    SeismoLab().mainloop()


# Launch the GUI only when run as a script, not when imported.
if __name__ == "__main__":
    main()