Files
seismo-relay/seismo_lab.py

1124 lines
52 KiB
Python

#!/usr/bin/env python3
"""
seismo_lab.py — Combined S3 Bridge + Protocol Analyzer GUI.

Single window with two top-level tabs:
  Bridge   — capture live serial traffic (wraps s3_bridge.py as a subprocess)
  Analyzer — parse, diff, and query captured frames

When the bridge starts:
  - raw tap paths are auto-filled in the Analyzer tab
  - Live mode is automatically enabled so the Analyzer updates in real time

Run from anywhere:
  python seismo_lab.py
"""
from __future__ import annotations
import datetime
import os
import queue
import subprocess
import sys
import threading
import tkinter as tk
from pathlib import Path
from tkinter import filedialog, messagebox, scrolledtext, simpledialog, ttk
from typing import Optional
# ── path setup ────────────────────────────────────────────────────────────────
# Directory containing this script; the paths below are resolved relative to it.
SCRIPT_DIR = Path(__file__).parent
# Bridge script that BridgePanel launches as a subprocess.
BRIDGE_PATH = SCRIPT_DIR / "bridges" / "s3-bridge" / "s3_bridge.py"
PARSERS_DIR = SCRIPT_DIR / "parsers"
# Put the parsers directory first on sys.path so the s3_analyzer / frame_db
# imports that follow resolve from there.
sys.path.insert(0, str(PARSERS_DIR))
from s3_analyzer import ( # noqa: E402
AnnotatedFrame,
FrameDiff,
Session,
annotate_frames,
diff_sessions,
format_hex_dump,
parse_bw,
parse_s3,
parse_structured_bin,
render_session_report,
split_into_sessions,
split_sessions_at_marks,
write_claude_export,
)
from frame_db import FrameDB # noqa: E402
# ── colour palette ────────────────────────────────────────────────────────────
# Backgrounds, from darkest to lightest.
BG = "#1e1e1e"
BG2 = "#252526"
BG3 = "#2d2d30"
# Default and dimmed foreground text.
FG = "#d4d4d4"
FG_DIM = "#6a6a6a"
# General-purpose accent / status colours.
ACCENT = "#569cd6"
RED = "#f44747"
YELLOW = "#dcdcaa"
GREEN = "#4caf50"
ORANGE = "#ce9178"
# Semantic colours used by the analyzer's text tags and tree rows
# (BW frames, S3 frames, changed bytes, known fields, headings).
COL_BW = "#9cdcfe"
COL_S3 = "#4ec9b0"
COL_DIFF = "#f44747"
COL_KNOW = "#4caf50"
COL_HEAD = "#569cd6"
# Fonts: regular, small, and bold monospace.
MONO = ("Consolas", 9)
MONO_SM = ("Consolas", 8)
MONO_B = ("Consolas", 9, "bold")
# ─────────────────────────────────────────────────────────────────────────────
# Shared state
# ─────────────────────────────────────────────────────────────────────────────
class AnalyzerState:
    """Mutable container for the analyzer's current inputs and results."""

    def __init__(self) -> None:
        # Raw capture files the current results were produced from.
        self.bw_path: Optional[Path] = None
        self.s3_path: Optional[Path] = None
        # Parsed sessions and, in parallel, one diff entry per session
        # (None where there is nothing to diff against).
        self.diffs: list[Optional[list[FrameDiff]]] = []
        self.sessions: list[Session] = []
        # Capture id recorded after the most recent successful DB ingest.
        self.last_capture_id: Optional[int] = None
# ─────────────────────────────────────────────────────────────────────────────
# Bridge panel (tk.Frame — lives inside a notebook tab)
# ─────────────────────────────────────────────────────────────────────────────
class BridgePanel(tk.Frame):
    """
    Bridge controls plus a live view of the bridge subprocess output.

    The bridge (BRIDGE_PATH) runs as a child process. Its combined
    stdout/stderr is read on a daemon thread and handed to the Tk thread
    through a queue that is polled every 100 ms.

    Callbacks supplied by the parent:
        on_bridge_started(raw_bw_path, raw_s3_path, struct_bin_path)
            called once the subprocess has been launched; a raw path is None
            when the matching capture checkbox is off.
        on_bridge_stopped()
            called exactly once per run, whether the run ended through
            stop_bridge() or because the bridge output stream closed.
    """

    # Queued by the reader thread after the bridge process has exited.
    _EXIT_SENTINEL = "<<exit>>"

    def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped, **kw):
        super().__init__(parent, bg=BG2, **kw)
        self._on_started = on_bridge_started  # signature: (raw_bw, raw_s3, struct_bin)
        self._on_stopped = on_bridge_stopped
        self.process: Optional[subprocess.Popen] = None
        self._stdout_q: queue.Queue[str] = queue.Queue()
        # True when the end of the most recent run has already been reported
        # (or no run has started yet); keeps the stop handling idempotent.
        self._stop_handled = True
        self._build()
        self._poll_stdout()

    # ── build ─────────────────────────────────────────────────────────────
    def _build(self) -> None:
        """Create the settings grid, the button row and the log view."""
        pad = {"padx": 6, "pady": 4}
        cfg = tk.Frame(self, bg=BG2)
        cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4)

        def label(text: str, row: int, col: int) -> None:
            tk.Label(cfg, text=text, bg=BG2, fg=FG, font=MONO).grid(
                row=row, column=col, sticky="e", **pad)

        def entry(var: tk.StringVar, width: int, row: int, col: int, **grid_kw) -> None:
            grid_kw.setdefault("sticky", "w")
            tk.Entry(cfg, textvariable=var, width=width,
                     bg=BG3, fg=FG, insertbackground=FG, relief="flat",
                     font=MONO).grid(row=row, column=col, **grid_kw, **pad)

        # Row 0: ports
        label("BW COM:", 0, 0)
        self.bw_var = tk.StringVar(value="COM4")
        entry(self.bw_var, 10, 0, 1)
        label("S3 COM:", 0, 2)
        self.s3_var = tk.StringVar(value="COM5")
        entry(self.s3_var, 10, 0, 3)
        label("Baud:", 0, 4)
        self.baud_var = tk.StringVar(value="38400")
        entry(self.baud_var, 8, 0, 5)

        # Row 1: log dir
        label("Log dir:", 1, 0)
        self.logdir_var = tk.StringVar(value=str(SCRIPT_DIR / "bridges" / "captures"))
        entry(self.logdir_var, 40, 1, 1, columnspan=4, sticky="we")
        tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2",
                  font=MONO, command=self._choose_dir).grid(row=1, column=5, **pad)

        # Row 2: raw taps (on by default — timestamped names generated at start)
        self._raw_bw_on = tk.BooleanVar(value=True)
        self._raw_s3_on = tk.BooleanVar(value=True)
        tk.Checkbutton(cfg, text="Capture BW->S3 raw", variable=self._raw_bw_on,
                       bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
                       font=MONO).grid(row=2, column=0, columnspan=2, sticky="w", **pad)
        tk.Checkbutton(cfg, text="Capture S3->BW raw", variable=self._raw_s3_on,
                       bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
                       font=MONO).grid(row=2, column=2, columnspan=2, sticky="w", **pad)

        # Row 3: buttons + status
        btn_row = tk.Frame(self, bg=BG2)
        btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2)
        self.start_btn = tk.Button(btn_row, text="Start Bridge", bg=GREEN, fg="#000000",
                                   relief="flat", padx=12, cursor="hand2", font=MONO_B,
                                   command=self.start_bridge)
        self.start_btn.pack(side=tk.LEFT, padx=6)
        self.stop_btn = tk.Button(btn_row, text="Stop Bridge", bg=BG3, fg=FG,
                                  relief="flat", padx=12, cursor="hand2", font=MONO,
                                  command=self.stop_bridge, state="disabled")
        self.stop_btn.pack(side=tk.LEFT, padx=4)
        self.mark_btn = tk.Button(btn_row, text="Add Mark", bg=BG3, fg=FG,
                                  relief="flat", padx=10, cursor="hand2", font=MONO,
                                  command=self.add_mark, state="disabled")
        self.mark_btn.pack(side=tk.LEFT, padx=4)
        self.status_var = tk.StringVar(value="Idle")
        tk.Label(btn_row, textvariable=self.status_var,
                 bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10)

        # Log output
        self.log_view = scrolledtext.ScrolledText(
            self, height=18, font=MONO_SM,
            bg=BG, fg=FG, insertbackground=FG,
            relief="flat", state="disabled",
        )
        self.log_view.pack(fill=tk.BOTH, expand=True, padx=4, pady=4)

    # ── helpers ───────────────────────────────────────────────────────────
    def _choose_dir(self) -> None:
        """Let the user pick the log directory; keep the old one on cancel."""
        path = filedialog.askdirectory(initialdir=self.logdir_var.get())
        if path:
            self.logdir_var.set(path)

    def _append_log(self, text: str) -> None:
        """Append *text* to the read-only log view and scroll to the end."""
        # The widget is kept disabled so the user cannot edit it; it has to be
        # enabled briefly for the programmatic insert.
        self.log_view.configure(state="normal")
        self.log_view.insert(tk.END, text)
        self.log_view.see(tk.END)
        self.log_view.configure(state="disabled")

    # ── bridge control ────────────────────────────────────────────────────
    def start_bridge(self) -> None:
        """Launch the bridge subprocess with the settings from the form."""
        if self.process and self.process.poll() is None:
            messagebox.showinfo("Bridge", "Bridge is already running.")
            return
        bw = self.bw_var.get().strip()
        s3 = self.s3_var.get().strip()
        baud = self.baud_var.get().strip()
        logdir = self.logdir_var.get().strip() or "."
        if not bw or not s3:
            messagebox.showerror("Error", "Please enter both BW and S3 COM ports.")
            return
        try:
            os.makedirs(logdir, exist_ok=True)
        except OSError as e:
            messagebox.showerror("Error", f"Cannot create log directory:\n{e}")
            return

        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        args = [sys.executable, str(BRIDGE_PATH),
                "--bw", bw, "--s3", s3, "--baud", baud, "--logdir", logdir]
        raw_bw_path = raw_s3_path = None
        if self._raw_bw_on.get():
            raw_bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin")
            args += ["--raw-bw", raw_bw_path]
        if self._raw_s3_on.get():
            raw_s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin")
            args += ["--raw-s3", raw_s3_path]
        # Structured bin path — written by bridge automatically, named by ts
        struct_bin_path = os.path.join(logdir, f"s3_session_{ts}.bin")

        try:
            proc = subprocess.Popen(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.PIPE,
                text=True,
                bufsize=1,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            messagebox.showerror("Error", f"Failed to start bridge:\n{e}")
            return

        self.process = proc
        self._stop_handled = False
        # The reader gets its own reference so it keeps following this
        # process even if self.process is replaced by a later run.
        threading.Thread(target=self._reader_thread, args=(proc,), daemon=True).start()
        self.status_var.set(f"Running — {bw} <-> {s3}")
        self.start_btn.configure(state="disabled")
        self.stop_btn.configure(state="normal", bg=RED)
        self.mark_btn.configure(state="normal")
        self._append_log(f"== Bridge started [{ts}] ==\n")
        # Notify parent so Analyzer can wire up live mode
        self._on_started(raw_bw_path, raw_s3_path, struct_bin_path)

    def stop_bridge(self) -> None:
        """Terminate the bridge (killing it after 3 s) and report the stop once."""
        proc = self.process
        if proc and proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                proc.kill()
        self._finish_run()

    def _finish_run(self) -> None:
        """Reset the UI and notify the parent, at most once per run."""
        if self._stop_handled:
            return
        self._stop_handled = True
        self._bridge_ended()
        self._on_stopped()

    def _bridge_ended(self) -> None:
        """Put the controls back into the idle state and log the stop."""
        self.status_var.set("Stopped")
        self.start_btn.configure(state="normal")
        self.stop_btn.configure(state="disabled", bg=BG3)
        self.mark_btn.configure(state="disabled")
        self._append_log("== Bridge stopped ==\n")

    def _reader_thread(self, proc: Optional[subprocess.Popen] = None) -> None:
        """Forward bridge output lines to the queue, then queue the exit sentinel.

        Runs on a daemon thread. *proc* defaults to the current process.
        """
        proc = proc or self.process
        if not proc or not proc.stdout:
            return
        for line in proc.stdout:
            self._stdout_q.put(line)
        # Wait until the process has really exited so the Tk thread can tell a
        # genuine exit from a stale sentinel by looking at poll().
        proc.wait()
        self._stdout_q.put(self._EXIT_SENTINEL)

    def _poll_stdout(self) -> None:
        """Drain queued bridge output into the log; reschedules itself every 100 ms."""
        try:
            while True:
                line = self._stdout_q.get_nowait()
                if line != self._EXIT_SENTINEL:
                    self._append_log(line)
                    continue
                if self.process is not None and self.process.poll() is None:
                    # Sentinel left over from a previous run: the current
                    # process is still alive, so there is nothing to report.
                    continue
                self._finish_run()
                break
        except queue.Empty:
            pass
        finally:
            self.after(100, self._poll_stdout)

    def add_mark(self) -> None:
        """Ask for a label and send it to the running bridge as a mark ("m" + label)."""
        if not self.process or not self.process.stdin or self.process.poll() is not None:
            return
        label = simpledialog.askstring("Mark", "Enter label for this mark:", parent=self)
        if not label or not label.strip():
            return
        try:
            self.process.stdin.write("m\n")
            self.process.stdin.write(label.strip() + "\n")
            self.process.stdin.flush()
            self._append_log(f"[MARK] {label.strip()}\n")
        except (OSError, ValueError) as e:
            # The bridge may have exited while the dialog was open.
            messagebox.showerror("Error", f"Failed to send mark:\n{e}")
# ─────────────────────────────────────────────────────────────────────────────
# Analyzer panel (tk.Frame — lives inside a notebook tab)
# Extracted from gui_analyzer.py; accepts external path injection.
# ─────────────────────────────────────────────────────────────────────────────
class AnalyzerPanel(tk.Frame):
def __init__(self, parent: tk.Widget, db: FrameDB, **kw):
    """Build the analyzer tab and start polling for live-refresh requests.

    parent -- container widget.
    db     -- FrameDB the panel ingests analyses into and queries.
    kw     -- extra options forwarded to tk.Frame.
    """
    super().__init__(parent, bg=BG, **kw)
    # Live-mode plumbing: worker thread, its stop flag, and the queue it
    # uses to request a refresh from the Tk thread.
    self._live_q: queue.Queue[str] = queue.Queue()
    self._live_stop = threading.Event()
    self._live_thread: Optional[threading.Thread] = None
    self.state = AnalyzerState()
    self._db = db
    self._build()
    self._poll_live_queue()
# ── external API (called by parent when bridge starts/stops) ──────────
def set_live_files(self, raw_bw: Optional[str], raw_s3: Optional[str],
struct_bin: Optional[str] = None) -> None:
"""Called when the bridge starts — inject file paths and start live mode."""
if raw_s3:
self.s3_var.set(raw_s3)
if raw_bw:
self.bw_var.set(raw_bw)
if struct_bin:
self.bin_var.set(struct_bin)
if raw_s3 and raw_bw:
self._start_live()
def stop_live(self) -> None:
"""Called when the bridge stops."""
# A running worker is asked to exit by setting the shared stop event; the
# worker is not joined here.
if self._live_thread and self._live_thread.is_alive():
self._live_stop.set()
# Resets the Live button and status text.
# NOTE(review): indentation was lost in this copy, so it is unclear whether
# this call sits inside the `if` above — confirm against the original file.
self._set_live_btn_off()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
self._build_toolbar()
self._build_panes()
self._build_statusbar()
def _build_toolbar(self) -> None:
    """Create the toolbar: raw-file pickers, session .bin picker, action buttons."""
    pad = {"padx": 5, "pady": 2}
    toolbar = tk.Frame(self, bg=BG2, pady=4)
    toolbar.pack(side=tk.TOP, fill=tk.X)

    def new_row() -> tk.Frame:
        """Add one full-width row to the toolbar."""
        row = tk.Frame(toolbar, bg=BG2)
        row.pack(side=tk.TOP, fill=tk.X)
        return row

    def file_picker(row: tk.Frame, caption: str, width: int,
                    default_name: str) -> tk.StringVar:
        """Pack caption + path entry + Browse button into *row*; return the path var."""
        tk.Label(row, text=caption, bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad)
        var = tk.StringVar()
        tk.Entry(row, textvariable=var, width=width, bg=BG3, fg=FG,
                 insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad)
        tk.Button(row, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2",
                  font=MONO, command=lambda: self._browse(var, default_name)
                  ).pack(side=tk.LEFT, **pad)
        return var

    # Row 1: raw files
    raw_row = new_row()
    self.s3_var = file_picker(raw_row, "S3 raw:", 30, "raw_s3.bin")
    self.bw_var = file_picker(raw_row, " BW raw:", 30, "raw_bw.bin")

    # Row 2: structured bin (optional — enables mark-based session splitting)
    bin_row = new_row()
    self.bin_var = file_picker(bin_row, "Session .bin:", 46, "s3_session.bin")
    tk.Label(bin_row, text="(optional — splits sessions at marks)", bg=BG2, fg=FG_DIM,
             font=MONO_SM).pack(side=tk.LEFT, padx=6)

    # Row 3: buttons
    actions = new_row()
    tk.Frame(actions, bg=BG2, width=10).pack(side=tk.LEFT)  # left spacer
    self.analyze_btn = tk.Button(actions, text="Analyze", bg=ACCENT, fg="#ffffff",
                                 relief="flat", padx=10, cursor="hand2", font=MONO_B,
                                 command=self._run_analyze)
    self.analyze_btn.pack(side=tk.LEFT, **pad)
    self.live_btn = tk.Button(actions, text="Live: OFF", bg=BG3, fg=FG,
                              relief="flat", padx=10, cursor="hand2",
                              font=MONO, command=self._toggle_live)
    self.live_btn.pack(side=tk.LEFT, **pad)
    # Export stays disabled until an analysis has produced sessions.
    self.export_btn = tk.Button(actions, text="Export for Claude", bg=ORANGE, fg="#000000",
                                relief="flat", padx=10, cursor="hand2", font=MONO_B,
                                command=self._run_export, state="disabled")
    self.export_btn.pack(side=tk.LEFT, **pad)
    self.status_var = tk.StringVar(value="Idle")
    tk.Label(actions, textvariable=self.status_var, bg=BG2, fg=FG_DIM,
             font=MONO, anchor="w").pack(side=tk.LEFT, padx=10)
def _build_panes(self) -> None:
    """Create the split view: session tree on the left, detail notebook on the right."""
    splitter = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG,
                              sashwidth=4, sashrelief="flat")
    splitter.pack(fill=tk.BOTH, expand=True)

    # Left: session tree
    left = tk.Frame(splitter, bg=BG2, width=260)
    splitter.add(left, minsize=200)
    tk.Label(left, text="Sessions", bg=BG2, fg=ACCENT,
             font=MONO_B, anchor="w", padx=6).pack(fill=tk.X)
    tree_host = tk.Frame(left, bg=BG2)
    tree_host.pack(fill=tk.BOTH, expand=True)

    style = ttk.Style()
    style.theme_use("clam")
    style.configure("Treeview", background=BG2, foreground=FG,
                    fieldbackground=BG2, font=MONO_SM, rowheight=18, borderwidth=0)
    style.configure("Treeview.Heading", background=BG3, foreground=ACCENT, font=MONO_SM)
    style.map("Treeview", background=[("selected", BG3)],
              foreground=[("selected", "#ffffff")])

    tree = ttk.Treeview(tree_host, columns=("info",), show="tree headings",
                        selectmode="browse")
    self.tree = tree
    tree.heading("#0", text="Frame")
    tree.heading("info", text="Info")
    tree.column("#0", width=160, stretch=True)
    tree.column("info", width=80, stretch=False)
    tree_scroll = ttk.Scrollbar(tree_host, orient="vertical", command=tree.yview)
    tree.configure(yscrollcommand=tree_scroll.set)
    tree_scroll.pack(side=tk.RIGHT, fill=tk.Y)
    tree.pack(fill=tk.BOTH, expand=True)

    row_tags = (
        ("session", {"foreground": ACCENT, "font": MONO_B}),
        ("bw_frame", {"foreground": COL_BW}),
        ("s3_frame", {"foreground": COL_S3}),
        ("bad_chk", {"foreground": RED}),
        ("malformed", {"foreground": RED}),
    )
    for tag, options in row_tags:
        tree.tag_configure(tag, **options)
    tree.bind("<<TreeviewSelect>>", self._on_tree_select)

    # Right: detail notebook
    right = tk.Frame(splitter, bg=BG)
    splitter.add(right, minsize=600)
    style.configure("TNotebook", background=BG2, borderwidth=0)
    style.configure("TNotebook.Tab", background=BG3, foreground=FG,
                    font=MONO, padding=[8, 2])
    style.map("TNotebook.Tab", background=[("selected", BG)],
              foreground=[("selected", ACCENT)])
    self.nb = ttk.Notebook(right)
    self.nb.pack(fill=tk.BOTH, expand=True)

    self.inv_text = self._make_text_tab("Inventory")
    self.hex_text = self._make_text_tab("Hex Dump")
    self.diff_text = self._make_text_tab("Diff")
    self.report_text = self._make_text_tab("Full Report")
    self._build_query_tab()

    # Every text tab gets the same set of colour tags.
    text_tags = (
        ("head", {"foreground": COL_HEAD, "font": MONO_B}),
        ("bw", {"foreground": COL_BW}),
        ("s3", {"foreground": COL_S3}),
        ("changed", {"foreground": COL_DIFF}),
        ("known", {"foreground": COL_KNOW}),
        ("dim", {"foreground": FG_DIM}),
        ("normal", {"foreground": FG}),
        ("warn", {"foreground": YELLOW}),
        ("addr", {"foreground": ORANGE}),
    )
    for view in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
        for tag, options in text_tags:
            view.tag_configure(tag, **options)
def _make_text_tab(self, title: str) -> tk.Text:
    """Add a notebook tab named *title* holding a read-only, scrollable Text.

    Returns the Text widget so the caller can write into it later.
    """
    tab = tk.Frame(self.nb, bg=BG)
    self.nb.add(tab, text=title)

    text_options = {
        "bg": BG, "fg": FG, "font": MONO, "state": "disabled",
        "relief": "flat", "wrap": "none", "insertbackground": FG,
        "selectbackground": BG3, "selectforeground": "#ffffff",
    }
    view = tk.Text(tab, **text_options)
    y_scroll = ttk.Scrollbar(tab, orient="vertical", command=view.yview)
    x_scroll = ttk.Scrollbar(tab, orient="horizontal", command=view.xview)
    view.configure(yscrollcommand=y_scroll.set, xscrollcommand=x_scroll.set)

    # Scrollbars are packed before the text so they keep their edge.
    y_scroll.pack(side=tk.RIGHT, fill=tk.Y)
    x_scroll.pack(side=tk.BOTTOM, fill=tk.X)
    view.pack(fill=tk.BOTH, expand=True)
    return view
def _build_query_tab(self) -> None:
    """Create the "Query DB" tab: filter row, result table, byte interpreter."""
    tab = tk.Frame(self.nb, bg=BG)
    self.nb.add(tab, text="Query DB")
    pad = {"padx": 4, "pady": 2}

    # ── filter row ──
    filt = tk.Frame(tab, bg=BG2, pady=4)
    filt.pack(side=tk.TOP, fill=tk.X)

    def caption(text: str, column: int) -> None:
        tk.Label(filt, text=text, bg=BG2, fg=FG, font=MONO_SM).grid(
            row=0, column=column, sticky="e", **pad)

    def text_filter(column: int) -> tk.StringVar:
        var = tk.StringVar()
        tk.Entry(filt, textvariable=var, width=8, bg=BG3, fg=FG,
                 font=MONO_SM, insertbackground=FG, relief="flat").grid(
            row=0, column=column, sticky="w", **pad)
        return var

    caption("Capture:", 0)
    self._q_capture_var = tk.StringVar(value="All")
    self._q_capture_cb = ttk.Combobox(filt, textvariable=self._q_capture_var,
                                      width=20, font=MONO_SM, state="readonly")
    self._q_capture_cb.grid(row=0, column=1, sticky="w", **pad)

    caption("Dir:", 2)
    self._q_dir_var = tk.StringVar(value="All")
    ttk.Combobox(filt, textvariable=self._q_dir_var, values=["All", "BW", "S3"],
                 width=6, font=MONO_SM, state="readonly").grid(
        row=0, column=3, sticky="w", **pad)

    caption("SUB:", 4)
    self._q_sub_var = tk.StringVar(value="All")
    self._q_sub_cb = ttk.Combobox(filt, textvariable=self._q_sub_var,
                                  width=12, font=MONO_SM, state="readonly")
    self._q_sub_cb.grid(row=0, column=5, sticky="w", **pad)

    caption("Offset:", 6)
    self._q_offset_var = text_filter(7)
    caption("Value:", 8)
    self._q_value_var = text_filter(9)

    tk.Button(filt, text="Run Query", bg=ACCENT, fg="#ffffff", relief="flat",
              padx=8, cursor="hand2", font=("Consolas", 8, "bold"),
              command=self._run_db_query).grid(row=0, column=10, padx=8)
    tk.Button(filt, text="Refresh", bg=BG3, fg=FG, relief="flat",
              padx=6, cursor="hand2", font=MONO_SM,
              command=self._refresh_query_dropdowns).grid(row=0, column=11, padx=4)
    self._q_stats_var = tk.StringVar(value="DB: —")
    tk.Label(filt, textvariable=self._q_stats_var, bg=BG2, fg=FG_DIM,
             font=MONO_SM).grid(row=0, column=12, padx=12, sticky="w")

    # ── result table ──
    results = tk.Frame(tab, bg=BG)
    results.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
    column_specs = (
        ("cap", "Cap", 40), ("sess", "Sess", 40), ("dir", "Dir", 40),
        ("sub", "SUB", 50), ("sub_name", "Name", 160), ("page", "Page", 60),
        ("len", "Len", 50), ("chk", "Chk", 50),
    )
    table = ttk.Treeview(results, columns=tuple(spec[0] for spec in column_specs),
                         show="headings", selectmode="browse")
    self._q_tree = table
    for col_id, title, width in column_specs:
        table.heading(col_id, text=title, anchor="w")
        # Only the name column absorbs extra width.
        table.column(col_id, width=width, stretch=(col_id == "sub_name"))
    y_scroll = ttk.Scrollbar(results, orient="vertical", command=table.yview)
    x_scroll = ttk.Scrollbar(results, orient="horizontal", command=table.xview)
    table.configure(yscrollcommand=y_scroll.set, xscrollcommand=x_scroll.set)
    y_scroll.pack(side=tk.RIGHT, fill=tk.Y)
    x_scroll.pack(side=tk.BOTTOM, fill=tk.X)
    table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    for tag, colour in (("bw_row", COL_BW), ("s3_row", COL_S3), ("bad_row", RED)):
        table.tag_configure(tag, foreground=colour)
    table.bind("<<TreeviewSelect>>", lambda _e: self._run_interpret())

    # ── byte interpretation strip ──
    interp = tk.Frame(tab, bg=BG2, height=120)
    interp.pack(side=tk.BOTTOM, fill=tk.X)
    interp.pack_propagate(False)  # keep the fixed 120 px height
    tk.Label(interp, text="Byte interpretation:", bg=BG2, fg=ACCENT,
             font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X)
    controls = tk.Frame(interp, bg=BG2)
    controls.pack(fill=tk.X, padx=6, pady=2)
    tk.Label(controls, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT)
    self._interp_offset_var = tk.StringVar(value="5")
    tk.Entry(controls, textvariable=self._interp_offset_var, width=6, bg=BG3, fg=FG,
             font=MONO_SM, insertbackground=FG, relief="flat").pack(side=tk.LEFT, padx=4)
    tk.Button(controls, text="Interpret", bg=BG3, fg=FG, relief="flat", cursor="hand2",
              font=MONO_SM, command=self._run_interpret).pack(side=tk.LEFT, padx=4)
    self._interp_text = tk.Text(interp, bg=BG2, fg=FG, font=MONO_SM,
                                height=4, relief="flat", state="disabled",
                                insertbackground=FG)
    self._interp_text.pack(fill=tk.X, padx=6, pady=2)
    self._interp_text.tag_configure("label", foreground=FG_DIM)
    self._interp_text.tag_configure("value", foreground=YELLOW)

    # Lookup tables used by the query code; the leading None entries line
    # up with the "All" choice at the top of each dropdown.
    self._q_rows: dict[str, object] = {}
    self._q_capture_rows: list = [None]
    self._q_sub_values: list = [None]
    self._refresh_query_dropdowns()
def _build_statusbar(self) -> None:
    """Create the one-line status bar at the bottom, backed by self.sb_var."""
    strip = tk.Frame(self, bg=BG3, height=20)
    strip.pack(side=tk.BOTTOM, fill=tk.X)
    self.sb_var = tk.StringVar(value="Ready")
    status_label = tk.Label(strip, textvariable=self.sb_var, bg=BG3, fg=FG_DIM,
                            font=MONO_SM, anchor="w", padx=6)
    status_label.pack(fill=tk.X)
# ── file picking ──────────────────────────────────────────────────────
def _browse(self, var: tk.StringVar, default: str) -> None:
    """Open a file dialog and store the chosen path in *var*.

    *default* is used in the dialog title and as the pre-filled file name.
    *var* is left unchanged when the dialog is cancelled.
    """
    chosen = filedialog.askopenfilename(
        title=f"Select {default}",
        filetypes=[("Binary", "*.bin"), ("All files", "*.*")],
        initialfile=default,
    )
    if not chosen:
        return
    var.set(chosen)
# ── analysis ──────────────────────────────────────────────────────────
def _run_analyze(self) -> None:
s3p = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None
bwp = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None
if not s3p or not bwp:
messagebox.showerror("Missing files", "Select both S3 and BW raw files.")
return
if not s3p.exists():
messagebox.showerror("Not found", f"S3 file not found:\n{s3p}")
return
if not bwp.exists():
messagebox.showerror("Not found", f"BW file not found:\n{bwp}")
return
self.state.s3_path = s3p
self.state.bw_path = bwp
self._do_analyze(s3p, bwp)
def _browse_bin(self) -> None:
    """Pick a session .bin file and store its path in the session field."""
    chosen = filedialog.askopenfilename(
        title="Select session .bin file",
        filetypes=[("Binary", "*.bin"), ("All files", "*.*")],
    )
    if not chosen:
        return  # dialog cancelled — keep the current value
    self.bin_var.set(chosen)
def _do_analyze(self, s3_path: Path, bw_path: Path) -> None:
self.status_var.set("Parsing...")
self.update_idletasks()
s3_blob = s3_path.read_bytes()
bw_blob = bw_path.read_bytes()
# Use mark-based session splitting if a structured .bin is provided
bin_str = self.bin_var.get().strip()
bin_path = Path(bin_str) if bin_str else None
marks = []
if bin_path and bin_path.exists():
marks = parse_structured_bin(bin_path.read_bytes())
if marks:
sessions = split_sessions_at_marks(bw_blob, s3_blob, marks)
mark_labels = " | ".join(m.label for m in marks)
self.sb_var.set(f"{len(marks)} user mark(s): {mark_labels}")
self.update_idletasks()
else:
if bin_path and bin_path.exists():
self.sb_var.set("No user marks found in session .bin — using standard session detection")
s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3")
bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0,
validate_checksum=True), "BW")
sessions = split_into_sessions(bw_frames, s3_frames)
diffs: list[Optional[list[FrameDiff]]] = [None]
for i in range(1, len(sessions)):
diffs.append(diff_sessions(sessions[i - 1], sessions[i]))
self.state.sessions = sessions
self.state.diffs = diffs
n_s3 = sum(len(s.s3_frames) for s in sessions)
n_bw = sum(len(s.bw_frames) for s in sessions)
self.status_var.set(f"{len(sessions)} sessions | BW:{n_bw} S3:{n_s3}")
self.sb_var.set(f"Loaded: {s3_path.name} + {bw_path.name}")
self.export_btn.configure(state="normal")
self._rebuild_tree()
try:
cap_id = self._db.ingest(sessions, s3_path, bw_path)
if cap_id is not None:
self.state.last_capture_id = cap_id
self._refresh_query_dropdowns()
for i, lbl in enumerate(self._q_capture_cb["values"]):
if lbl.startswith(f"#{cap_id} "):
self._q_capture_cb.current(i)
break
except Exception as exc:
self.sb_var.set(f"DB ingest error: {exc}")
def _run_export(self) -> None:
    """Write the export file next to the S3 capture and offer to open its folder.

    Requires a previous analysis. Write failures are shown in a dialog.
    The folder is opened with the platform's file manager ("explorer" on
    Windows, "open" on macOS, "xdg-open" elsewhere); if that launcher is
    unavailable the status bar says so instead of raising.
    """
    if not self.state.sessions:
        messagebox.showinfo("Export", "Run Analyze first.")
        return
    outdir = self.state.s3_path.parent if self.state.s3_path else Path(".")
    try:
        out_path = write_claude_export(self.state.sessions, self.state.diffs,
                                       outdir, self.state.s3_path, self.state.bw_path)
    except OSError as exc:
        messagebox.showerror("Export failed", f"Could not write export:\n{exc}")
        return
    self.sb_var.set(f"Exported: {out_path.name}")
    if not messagebox.askyesno("Export complete", f"Saved to:\n{out_path}\n\nOpen folder?"):
        return

    folder = str(out_path.parent)
    if sys.platform.startswith("win"):
        opener = ["explorer", folder]
    elif sys.platform == "darwin":
        opener = ["open", folder]
    else:
        opener = ["xdg-open", folder]
    try:
        subprocess.Popen(opener)
    except OSError as exc:
        self.sb_var.set(f"Exported: {out_path.name} (could not open folder: {exc})")
# ── session tree ──────────────────────────────────────────────────────
def _rebuild_tree(self) -> None:
self.tree.delete(*self.tree.get_children())
for sess in self.state.sessions:
is_complete = any(af.header and af.header.sub == 0x74
for af in sess.bw_frames)
label = f"Session {sess.index}" + ("" if is_complete else " [partial]")
n_diff = len(self.state.diffs[sess.index] or [])
diff_str = f"{n_diff} changes" if n_diff else ""
sid = self.tree.insert("", tk.END, text=label,
values=(diff_str,), tags=("session",))
for af in sess.all_frames:
src_tag = "bw_frame" if af.source == "BW" else "s3_frame"
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
lbl_text = f"[{af.source}] {sub_hex} {af.sub_name}"
extra = ""
tags = (src_tag,)
if af.frame.checksum_valid is False:
extra = "BAD CHK"; tags = ("bad_chk",)
elif af.header is None:
lbl_text = f"[{af.source}] MALFORMED"; tags = ("malformed",)
self.tree.insert(sid, tk.END, text=lbl_text, values=(extra,), tags=tags,
iid=f"frame_{sess.index}_{af.frame.index}_{af.source}")
for item in self.tree.get_children():
self.tree.item(item, open=True)
def _on_tree_select(self, _e: tk.Event) -> None:
sel = self.tree.selection()
if not sel:
return
iid = sel[0]
if iid.startswith("frame_"):
parts = iid.split("_")
self._show_frame_detail(int(parts[1]), int(parts[2]), parts[3])
else:
text = self.tree.item(iid, "text")
try:
self._show_session_detail(int(text.split()[1]))
except (IndexError, ValueError):
pass
def _find_frame_by_sub(self, sess_idx: int, sub: int, page_key: int) -> Optional[AnnotatedFrame]:
"""Find a frame in a session by (sub, page_key) — used for diff click-through."""
if sess_idx >= len(self.state.sessions):
return None
sess = self.state.sessions[sess_idx]
for af in sess.bw_frames + sess.s3_frames:
if af.header and af.header.sub == sub and af.header.page_key == page_key:
return af
return None
def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]:
if sess_idx >= len(self.state.sessions):
return None
pool = (self.state.sessions[sess_idx].bw_frames if source == "BW"
else self.state.sessions[sess_idx].s3_frames)
return next((af for af in pool if af.frame.index == frame_idx), None)
# ── detail renderers ──────────────────────────────────────────────────
def _clear_tabs(self) -> None:
for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
self._tc(w)
# Render one session into the Inventory, Diff and Full Report tabs and
# switch to the first tab. Does nothing when idx is past the last session.
# NOTE(review): indentation was lost in this copy; the comments below do not
# try to say which statements are nested under which `if`/`for`.
def _show_session_detail(self, idx: int) -> None:
if idx >= len(self.state.sessions):
return
sess = self.state.sessions[idx]
# Diff entry for this session (None when there is nothing to diff against).
diffs = self.state.diffs[idx]
self._clear_tabs()
# ── Inventory tab: header, frame counts, then one line per frame ──
w = self.inv_text
self._tw(w, f"SESSION {sess.index}\n", "head")
self._tw(w, f"Frames: {len(sess.bw_frames)+len(sess.s3_frames)}"
f" (BW:{len(sess.bw_frames)} S3:{len(sess.s3_frames)})\n", "normal")
if len(sess.bw_frames) != len(sess.s3_frames):
self._tw(w, " WARNING: BW/S3 count mismatch\n", "warn")
self._tn(w)
for i, af in enumerate(sess.all_frames):
# Text tag that colours the line by frame source.
src = "bw" if af.source == "BW" else "s3"
sh = f"{af.header.sub:02X}" if af.header else "??"
# The page suffix is only shown for a non-zero page key.
pg = f" (page {af.header.page_key:04X})" if af.header and af.header.page_key else ""
chk = ""
# checksum_valid is compared with `is`, so a value that is neither True
# nor False produces no checksum note.
if af.frame.checksum_valid is False: chk = " [BAD CHECKSUM]"
elif af.frame.checksum_valid is True: chk = f" [{af.frame.checksum_type}]"
self._tw(w, f" [{af.source}] #{i:<3} ", src)
self._tw(w, f"SUB={sh} ", "addr")
self._tw(w, f"{af.sub_name:<30}{pg} len={len(af.frame.payload)}", "dim")
if chk: self._tw(w, chk, "warn" if af.frame.checksum_valid is False else "dim")
self._tn(w)
# ── Diff tab ──
w = self.diff_text
self._tc(w)
if diffs is None:
self._tw(w, "(No previous session to diff against)\n", "dim")
elif not diffs:
self._tw(w, f"DIFF vs SESSION {idx-1}\n", "head")
self._tw(w, " No changes detected.\n", "dim")
else:
self._tw(w, f"DIFF vs SESSION {idx-1}\n", "head")
self._tw(w, " (click any SUB header to open its hex dump)\n", "dim")
for fd in diffs:
pg = f" (page {fd.page_key:04X})" if fd.page_key else ""
# One tag per (sub, page_key) makes the header line clickable.
link_tag = f"difflink_{fd.sub}_{fd.page_key}"
w.configure(state="normal")
# The header's extent is measured with the INSERT mark before and after
# the insert at END.
# NOTE(review): this relies on the INSERT mark sitting at the end of the
# text when the header is written — confirm.
start = w.index(tk.INSERT)
w.insert(tk.END, f"\n SUB {fd.sub:02X} ({fd.sub_name}){pg}:\n")
end = w.index(tk.INSERT)
w.tag_add("addr", start, end)
w.tag_add(link_tag, start, end)
w.tag_configure(link_tag, underline=True)
# capture fd values for the closure
def _make_handler(s=idx, sub=fd.sub, pk=fd.page_key):
def _handler(event):
af = self._find_frame_by_sub(s, sub, pk)
if af:
self._show_frame_detail(s, af.frame.index, af.source)
return _handler
w.tag_bind(link_tag, "<Button-1>", _make_handler())
# Hand cursor while hovering over the link, default cursor otherwise.
w.tag_bind(link_tag, "<Enter>", lambda e, t=link_tag: w.configure(cursor="hand2"))
w.tag_bind(link_tag, "<Leave>", lambda e: w.configure(cursor=""))
w.configure(state="disabled")
for bd in fd.diffs:
# A negative before/after value is rendered as "--".
b = f"{bd.before:02x}" if bd.before >= 0 else "--"
a = f"{bd.after:02x}" if bd.after >= 0 else "--"
self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
self._tw(w, f"{b} -> {a}", "changed")
if bd.field_name: self._tw(w, f" [{bd.field_name}]", "known")
self._tn(w)
# ── Full Report tab ──
# The previous-session index passed along is None for session 0.
report = render_session_report(sess, diffs, idx - 1 if idx > 0 else None)
self._tc(self.report_text)
self._tw(self.report_text, report, "normal")
# Bring the first notebook tab to the front.
self.nb.select(0)
# Render one frame into the Inventory and Hex Dump tabs and switch to the
# second tab. Does nothing when the frame cannot be found.
# NOTE(review): indentation was lost in this copy; the comments below do not
# try to say which statements are nested under which `if`/`for`.
def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None:
af = self._find_frame(sess_idx, frame_idx, source)
if af is None:
return
self._clear_tabs()
# Text tag that colours lines by frame source.
src = "bw" if source == "BW" else "s3"
sh = f"{af.header.sub:02X}" if af.header else "??"
# ── Inventory tab: frame summary ──
w = self.inv_text
self._tw(w, f"[{af.source}] Frame #{af.frame.index}\n", src)
self._tw(w, f"Session {sess_idx} | ", "dim")
self._tw(w, f"SUB={sh} {af.sub_name}\n", "addr")
if af.header:
self._tw(w, f" OFFSET: {af.header.page_key:04X} "
f"CMD={af.header.cmd:02X} FLAGS={af.header.flags:02X}\n", "dim")
self._tn(w)
self._tw(w, f"Payload: {len(af.frame.payload)} bytes\n", "dim")
# checksum_valid is compared with `is`, so a value that is neither True nor
# False prints no checksum line.
if af.frame.checksum_valid is False: self._tw(w, " BAD CHECKSUM\n", "warn")
elif af.frame.checksum_valid is True:
self._tw(w, f" Checksum: {af.frame.checksum_type} {af.frame.checksum_hex}\n", "dim")
self._tn(w)
p = af.frame.payload
# The first five payload bytes are listed individually; the rest is
# summarised as a byte count.
if len(p) >= 5:
self._tw(w, "Header breakdown:\n", "head")
self._tw(w, f" [0] CMD = {p[0]:02x}\n", "dim")
self._tw(w, f" [1] ? = {p[1]:02x}\n", "dim")
self._tw(w, f" [2] SUB = {p[2]:02x} ({af.sub_name})\n", src)
self._tw(w, f" [3] OFFSET_HI = {p[3]:02x}\n", "dim")
self._tw(w, f" [4] OFFSET_LO = {p[4]:02x}\n", "dim")
if len(p) > 5: self._tw(w, f" [5..] data = {len(p)-5} bytes\n", "dim")
# ── Hex Dump tab ──
w = self.hex_text
self._tw(w, f"[{af.source}] SUB={sh} {af.sub_name}\n", src)
self._tw(w, f"Payload ({len(af.frame.payload)} bytes):\n", "dim")
self._tn(w)
self._tw(w, "\n".join(format_hex_dump(af.frame.payload, indent=" ")) + "\n", "normal")
# Append the byte changes recorded for this frame's (sub, page_key) in the
# session's diff list, if any; only the first matching entry is shown.
diffs_for_sess = self.state.diffs[sess_idx] if sess_idx < len(self.state.diffs) else None
if diffs_for_sess and af.header:
matching = [fd for fd in diffs_for_sess
if fd.sub == af.header.sub and fd.page_key == af.header.page_key]
if matching:
self._tn(w)
self._tw(w, "Changed bytes (vs prev session):\n", "head")
for bd in matching[0].diffs:
# A negative before/after value is rendered as "--".
b = f"{bd.before:02x}" if bd.before >= 0 else "--"
a = f"{bd.after:02x}" if bd.after >= 0 else "--"
self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
self._tw(w, f"{b} -> {a}", "changed")
if bd.field_name: self._tw(w, f" [{bd.field_name}]", "known")
self._tn(w)
# Bring the second notebook tab to the front.
self.nb.select(1)
# ── live mode ─────────────────────────────────────────────────────────
def _toggle_live(self) -> None:
    """Toggle live mode.

    If a live worker thread is alive, signal it to stop and reset the
    button.  Otherwise read the S3 and BW raw-file entries; when either is
    blank an error dialog is shown, else both paths are stored on
    ``self.state`` and live mode is started.
    """
    worker = self._live_thread
    if worker and worker.is_alive():
        # Live mode is on: ask the worker to exit and flip the UI back.
        self._live_stop.set()
        self._set_live_btn_off()
        return

    s3_text = self.s3_var.get().strip()
    bw_text = self.bw_var.get().strip()
    s3_file = Path(s3_text) if s3_text else None
    bw_file = Path(bw_text) if bw_text else None
    if not (s3_file and bw_file):
        messagebox.showerror("Missing files", "Select both raw files before starting live mode.")
        return

    self.state.s3_path = s3_file
    self.state.bw_path = bw_file
    self._start_live()
def _start_live(self) -> None:
    """Start the daemon thread that watches both raw files and show live mode as ON.

    The paths are taken from the S3 and BW entry fields and also recorded
    on ``self.state``.
    """
    s3_file = Path(self.s3_var.get().strip())
    bw_file = Path(self.bw_var.get().strip())
    self.state.s3_path, self.state.bw_path = s3_file, bw_file

    # Clear the stop flag before the new worker begins checking it.
    self._live_stop.clear()
    worker = threading.Thread(
        target=self._live_worker,
        args=(s3_file, bw_file),
        daemon=True,
    )
    self._live_thread = worker
    worker.start()

    self.live_btn.configure(text="Live: ON", bg=GREEN, fg="#000000")
    self.status_var.set("Live mode running...")
def _set_live_btn_off(self) -> None:
    """Give the live button its OFF appearance and set the status text to stopped."""
    off_look = {"text": "Live: OFF", "bg": BG3, "fg": FG}
    self.live_btn.configure(**off_look)
    self.status_var.set("Live stopped")
def _live_worker(self, s3_path: Path, bw_path: Path) -> None:
    """Watch both raw files and queue a refresh whenever either one changes size.

    Loops until ``self._live_stop`` is set.  On each pass (roughly every
    100 ms) the current size of each file is compared with the size seen on
    the previous pass; if at least one differs, a single ``"refresh"``
    message is put on ``self._live_q``.

    Args:
        s3_path: Raw S3 file to watch.
        bw_path: Raw BW file to watch.
    """
    watched = (s3_path, bw_path)
    seen_sizes = [0] * len(watched)
    while not self._live_stop.is_set():
        changed = False
        for idx, path in enumerate(watched):
            try:
                # Seeking to the end yields the size through an open handle
                # without reading the (possibly large) file contents.
                with path.open("rb") as fh:
                    size = fh.seek(0, os.SEEK_END)
            except OSError:
                # Not created yet, locked, or otherwise unreadable right now:
                # skip it this pass instead of letting the thread die.
                continue
            # "!=" rather than ">" so a truncated or replaced file is noticed too.
            if size != seen_sizes[idx]:
                seen_sizes[idx] = size
                changed = True
        if changed:
            self._live_q.put("refresh")
        # Waiting on the stop event (instead of sleeping) ends the loop as
        # soon as a stop is requested.
        self._live_stop.wait(0.1)
def _poll_live_queue(self) -> None:
    """Drain the live queue, re-analyze at most once, and re-arm the 150 ms poll.

    Every pending message is removed from ``self._live_q``.  If any of them
    was ``"refresh"`` and both raw-file paths are set on ``self.state``,
    ``_do_analyze`` is called a single time with those paths.  Queued
    refreshes are coalesced because each call would be made with the same
    two paths, so repeating it per message only multiplies the work when a
    backlog has built up.

    The next poll is scheduled with ``self.after`` even if the analysis
    raises.
    """
    refresh_requested = False
    try:
        try:
            while True:
                if self._live_q.get_nowait() == "refresh":
                    refresh_requested = True
        except queue.Empty:
            # Queue fully drained.
            pass
        if refresh_requested and self.state.s3_path and self.state.bw_path:
            self._do_analyze(self.state.s3_path, self.state.bw_path)
    finally:
        self.after(150, self._poll_live_queue)
# ── DB query ──────────────────────────────────────────────────────────
def _refresh_query_dropdowns(self) -> None:
    """Reload the capture and SUB filter choices and the stats line from the frame DB.

    Any exception raised while doing so is shown in the stats label rather
    than propagated.
    """
    try:
        capture_rows = self._db.list_captures()
        capture_labels = ["All"]
        for rec in capture_rows:
            capture_labels.append(
                f"#{rec['id']} {rec['timestamp'][:16]} ({rec['frame_count']} frames)"
            )
        self._q_capture_cb["values"] = capture_labels
        # Kept parallel to the combobox entries; slot 0 ("All") carries no id.
        self._q_capture_rows = [None] + [rec["id"] for rec in capture_rows]

        sub_codes = self._db.get_distinct_subs()
        sub_labels = ["All"]
        for code in sub_codes:
            sub_labels.append(f"0x{code:02X}")
        self._q_sub_cb["values"] = sub_labels
        self._q_sub_values = [None] + sub_codes

        totals = self._db.get_stats()
        self._q_stats_var.set(
            f"DB: {totals['captures']} captures | {totals['frames']} frames"
        )
    except Exception as exc:
        self._q_stats_var.set(f"DB error: {exc}")
def _parse_int(self, s: str) -> Optional[int]:
    """Parse *s* as an integer literal, returning ``None`` when it is blank or invalid.

    Surrounding whitespace is ignored.  Base 0 is used, so the prefix
    selects the radix (``0x`` hex, ``0o`` octal, ``0b`` binary, otherwise
    decimal).
    """
    text = s.strip()
    if text == "":
        return None
    try:
        parsed = int(text, 0)
    except ValueError:
        return None
    return parsed
def _run_db_query(self) -> None:
    """Query the frame DB using the filter widgets and list the hits in the results tree.

    When the offset entry parses as an integer the byte-level query is used
    (with the optional value); otherwise a plain frame query is run.  A
    failing query is reported in an error dialog and leaves the tree as it
    was.
    """
    # Index 0 is the "All" entry; it (or no selection) means "no filter".
    capture_pick = self._q_capture_cb.current()
    cap_id = None if capture_pick <= 0 else self._q_capture_rows[capture_pick]
    chosen_dir = self._q_dir_var.get()
    direction = None if chosen_dir == "All" else chosen_dir
    sub_pick = self._q_sub_cb.current()
    sub = None if sub_pick <= 0 else self._q_sub_values[sub_pick]
    offset = self._parse_int(self._q_offset_var.get())
    value = self._parse_int(self._q_value_var.get())

    filters = {"capture_id": cap_id, "direction": direction, "sub": sub}
    try:
        if offset is None:
            rows = self._db.query_frames(**filters)
        else:
            rows = self._db.query_by_byte(offset=offset, value=value, **filters)
    except Exception as exc:
        messagebox.showerror("Query error", str(exc))
        return

    tree = self._q_tree
    tree.delete(*tree.get_children())
    self._q_rows.clear()
    checksum_labels = {1: "OK", 0: "BAD", None: ""}
    for rec in rows:
        sub_txt = "" if rec["sub"] is None else f"0x{rec['sub']:02X}"
        page_txt = "" if rec["page_key"] is None else f"0x{rec['page_key']:04X}"
        chk_txt = checksum_labels.get(rec["checksum_ok"], "")
        # A failed checksum overrides the per-direction row tag.
        if rec["checksum_ok"] == 0:
            row_tag = "bad_row"
        elif rec["direction"] == "BW":
            row_tag = "bw_row"
        else:
            row_tag = "s3_row"
        item_id = str(rec["id"])
        columns = (
            rec["capture_id"],
            rec["session_idx"],
            rec["direction"],
            sub_txt,
            rec["sub_name"] or "",
            page_txt,
            rec["payload_len"],
            chk_txt,
        )
        tree.insert("", tk.END, iid=item_id, tags=(row_tag,), values=columns)
        self._q_rows[item_id] = rec
    self.sb_var.set(f"Query returned {len(rows)} rows")
def _run_interpret(self) -> None:
    """Show the numeric decodings of the selected query row's payload at an offset.

    Does nothing unless a row is selected in the results tree, that row is
    known in ``self._q_rows``, and the offset entry parses as an integer.
    The interpretation text box is then rewritten with a heading line
    followed by one "label: value" cell for each decoding returned by
    ``self._db.interpret_offset``, and left read-only.
    """
    picked = self._q_tree.selection()
    if not picked:
        return
    row = self._q_rows.get(picked[0])
    if not row:
        return
    offset = self._parse_int(self._interp_offset_var.get())
    if offset is None:
        return

    decoded = self._db.interpret_offset(bytes(row["payload"]), offset)

    box = self._interp_text
    box.configure(state="normal")
    box.delete("1.0", tk.END)
    sub_txt = "??" if row["sub"] is None else f"0x{row['sub']:02X}"
    heading = (
        f"Frame #{row['id']} [{row['direction']}] SUB={sub_txt} "
        f"offset={offset} (0x{offset:04X})\n"
    )
    box.insert(tk.END, heading, "label")

    # (result key, caption) pairs in display order.
    layout = (
        ("uint8", "uint8 "),
        ("int8", "int8 "),
        ("uint16_be", "uint16 BE "),
        ("uint16_le", "uint16 LE "),
        ("uint32_be", "uint32 BE "),
        ("uint32_le", "uint32 LE "),
        ("float32_be", "float32 BE "),
        ("float32_le", "float32 LE "),
    )
    pending = ""
    for key, caption in layout:
        if key in decoded:
            raw = decoded[key]
            if isinstance(raw, float):
                shown = f"{raw:.6g}"
            else:
                shown = f"{raw} (0x{int(raw)&0xFFFFFFFF:X})"
            pending = f"{pending} {caption}: {shown:<28}"
            # Emit the accumulated cells once the line has grown past 80 chars.
            if len(pending) > 80:
                box.insert(tk.END, pending + "\n", "value")
                pending = ""
    if pending:
        box.insert(tk.END, pending + "\n", "value")
    box.configure(state="disabled")
# ── text helpers ──────────────────────────────────────────────────────
def _tc(self, w: tk.Text) -> None:
    """Make *w* editable and delete all of its text (the widget is left editable)."""
    w["state"] = "normal"
    w.delete("1.0", tk.END)
def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None:
    """Make *w* editable and append *text* with *tag* (the widget is left editable)."""
    w["state"] = "normal"
    w.insert(tk.END, text, tag)
def _tn(self, w: tk.Text) -> None:
    """Append a newline to *w*, then switch the widget to the disabled state."""
    w["state"] = "normal"
    w.insert(tk.END, "\n")
    w["state"] = "disabled"
# ─────────────────────────────────────────────────────────────────────────────
# Main application window
# ─────────────────────────────────────────────────────────────────────────────
class SeismoLab(tk.Tk):
    """Main window: a notebook with a Bridge tab and an Analyzer tab sharing one FrameDB."""

    def __init__(self) -> None:
        """Create the window, the frame DB, the tab styling and both panels."""
        super().__init__()
        self.title("Seismo Lab")
        self.configure(bg=BG)
        self.minsize(1100, 680)
        self._db = FrameDB()

        # Styling for the top-level tab strip.
        tab_style = ttk.Style()
        tab_style.theme_use("clam")
        tab_style.configure(
            "Top.TNotebook", background=BG3, borderwidth=0, tabposition="nw"
        )
        tab_style.configure(
            "Top.TNotebook.Tab",
            background=BG3,
            foreground=FG,
            font=("Consolas", 10, "bold"),
            padding=[16, 6],
        )
        tab_style.map(
            "Top.TNotebook.Tab",
            background=[("selected", ACCENT)],
            foreground=[("selected", "#ffffff")],
        )

        tabs = ttk.Notebook(self, style="Top.TNotebook")
        tabs.pack(fill=tk.BOTH, expand=True)

        # Tab 0: bridge panel, which reports start/stop back to this window.
        self._bridge_panel = BridgePanel(
            tabs,
            on_bridge_started=self._on_bridge_started,
            on_bridge_stopped=self._on_bridge_stopped,
        )
        tabs.add(self._bridge_panel, text=" Bridge ")

        # Tab 1: analyzer panel, given the shared frame DB.
        self._analyzer_panel = AnalyzerPanel(tabs, db=self._db)
        tabs.add(self._analyzer_panel, text=" Analyzer ")

        self._nb = tabs
        self.protocol("WM_DELETE_WINDOW", self._on_close)

    def _on_bridge_started(
        self,
        raw_bw: Optional[str],
        raw_s3: Optional[str],
        struct_bin: Optional[str] = None,
    ) -> None:
        """Hand the bridge's output paths to the analyzer panel and bring its tab forward."""
        self._analyzer_panel.set_live_files(raw_bw, raw_s3, struct_bin)
        # Index 1 is the Analyzer tab, so the user can watch it update.
        self._nb.select(1)

    def _on_bridge_stopped(self) -> None:
        """Tell the analyzer panel to stop live mode once the bridge has stopped."""
        self._analyzer_panel.stop_live()

    def _on_close(self) -> None:
        """Stop the bridge, then destroy the window."""
        self._bridge_panel.stop_bridge()
        self.destroy()
# ─────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """Create the Seismo Lab window and run its Tk main loop."""
    SeismoLab().mainloop()


# Launch the GUI only when this file is run as a script.
if __name__ == "__main__":
    main()