From 5eb5499034aed3e5adc51f533abca73913ad7638 Mon Sep 17 00:00:00 2001 From: serversdwn Date: Wed, 11 Mar 2026 15:36:59 -0400 Subject: [PATCH] feat: add unified gui for bridge, parser, and analyzer. All in one. --- seismo_lab.py | 1037 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1037 insertions(+) create mode 100644 seismo_lab.py diff --git a/seismo_lab.py b/seismo_lab.py new file mode 100644 index 0000000..e73167f --- /dev/null +++ b/seismo_lab.py @@ -0,0 +1,1037 @@ +#!/usr/bin/env python3 +""" +seismo_lab.py — Combined S3 Bridge + Protocol Analyzer GUI. + +Single window with two top-level tabs: + Bridge — capture live serial traffic (wraps s3_bridge.py as subprocess) + Analyzer — parse, diff, and query captured frames + +When the bridge starts: + - raw tap paths are auto-filled in the Analyzer tab + - Live mode is automatically enabled so the Analyzer updates in real time + +Run from anywhere: + python seismo_lab.py +""" + +from __future__ import annotations + +import datetime +import os +import queue +import subprocess +import sys +import threading +import tkinter as tk +from pathlib import Path +from tkinter import filedialog, messagebox, scrolledtext, simpledialog, ttk +from typing import Optional + +# ── path setup ──────────────────────────────────────────────────────────────── +SCRIPT_DIR = Path(__file__).parent +BRIDGE_PATH = SCRIPT_DIR / "bridges" / "s3-bridge" / "s3_bridge.py" +PARSERS_DIR = SCRIPT_DIR / "parsers" +sys.path.insert(0, str(PARSERS_DIR)) + +from s3_analyzer import ( # noqa: E402 + AnnotatedFrame, + FrameDiff, + Session, + annotate_frames, + diff_sessions, + format_hex_dump, + parse_bw, + parse_s3, + render_session_report, + split_into_sessions, + write_claude_export, +) +from frame_db import FrameDB # noqa: E402 + +# ── colour palette ──────────────────────────────────────────────────────────── +BG = "#1e1e1e" +BG2 = "#252526" +BG3 = "#2d2d30" +FG = "#d4d4d4" +FG_DIM = "#6a6a6a" +ACCENT = "#569cd6" +RED = "#f44747" +YELLOW = "#dcdcaa" +GREEN = "#4caf50" +ORANGE = "#ce9178" + +COL_BW = "#9cdcfe" +COL_S3 = "#4ec9b0" +COL_DIFF = "#f44747" +COL_KNOW = "#4caf50" +COL_HEAD = "#569cd6" + +MONO = ("Consolas", 9) +MONO_SM = ("Consolas", 8) +MONO_B = ("Consolas", 9, "bold") + + +# ───────────────────────────────────────────────────────────────────────────── +# Shared state +# ───────────────────────────────────────────────────────────────────────────── + +class AnalyzerState: + def __init__(self) -> None: + self.sessions: list[Session] = [] + self.diffs: list[Optional[list[FrameDiff]]] = [] + self.s3_path: Optional[Path] = None + self.bw_path: Optional[Path] = None + self.last_capture_id: Optional[int] = None + + +# ───────────────────────────────────────────────────────────────────────────── +# Bridge panel (tk.Frame — lives inside a notebook tab) +# ───────────────────────────────────────────────────────────────────────────── + +class BridgePanel(tk.Frame): + """ + All bridge controls and live log output. + Calls on_bridge_started(raw_bw_path, raw_s3_path) when the bridge starts + so the parent can wire up the Analyzer. + """ + + def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped, **kw): + super().__init__(parent, bg=BG2, **kw) + self._on_started = on_bridge_started + self._on_stopped = on_bridge_stopped + self.process: Optional[subprocess.Popen] = None + self._stdout_q: queue.Queue[str] = queue.Queue() + self._build() + self._poll_stdout() + + # ── build ───────────────────────────────────────────────────────────── + + def _build(self) -> None: + pad = {"padx": 6, "pady": 4} + + cfg = tk.Frame(self, bg=BG2) + cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4) + + # Row 0: ports + tk.Label(cfg, text="BW COM:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=0, sticky="e", **pad) + self.bw_var = tk.StringVar(value="COM4") + tk.Entry(cfg, textvariable=self.bw_var, width=10, + bg=BG3, fg=FG, insertbackground=FG, relief="flat", + font=MONO).grid(row=0, column=1, sticky="w", **pad) + + tk.Label(cfg, text="S3 COM:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=2, sticky="e", **pad) + self.s3_var = tk.StringVar(value="COM5") + tk.Entry(cfg, textvariable=self.s3_var, width=10, + bg=BG3, fg=FG, insertbackground=FG, relief="flat", + font=MONO).grid(row=0, column=3, sticky="w", **pad) + + tk.Label(cfg, text="Baud:", bg=BG2, fg=FG, font=MONO).grid(row=0, column=4, sticky="e", **pad) + self.baud_var = tk.StringVar(value="38400") + tk.Entry(cfg, textvariable=self.baud_var, width=8, + bg=BG3, fg=FG, insertbackground=FG, relief="flat", + font=MONO).grid(row=0, column=5, sticky="w", **pad) + + # Row 1: log dir + tk.Label(cfg, text="Log dir:", bg=BG2, fg=FG, font=MONO).grid(row=1, column=0, sticky="e", **pad) + self.logdir_var = tk.StringVar(value=str(SCRIPT_DIR / "bridges" / "captures")) + tk.Entry(cfg, textvariable=self.logdir_var, width=40, + bg=BG3, fg=FG, insertbackground=FG, relief="flat", + font=MONO).grid(row=1, column=1, columnspan=4, sticky="we", **pad) + tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", + font=MONO, command=self._choose_dir).grid(row=1, column=5, **pad) + + # Row 2: raw taps (always enabled — timestamped names generated at start) + self._raw_bw_on = tk.BooleanVar(value=True) + self._raw_s3_on = tk.BooleanVar(value=True) + tk.Checkbutton(cfg, text="Capture BW->S3 raw", variable=self._raw_bw_on, + bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, + font=MONO).grid(row=2, column=0, columnspan=2, sticky="w", **pad) + tk.Checkbutton(cfg, text="Capture S3->BW raw", variable=self._raw_s3_on, + bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, + font=MONO).grid(row=2, column=2, columnspan=2, sticky="w", **pad) + + # Row 3: buttons + status + btn_row = tk.Frame(self, bg=BG2) + btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2) + + self.start_btn = tk.Button(btn_row, text="Start Bridge", bg=GREEN, fg="#000000", + relief="flat", padx=12, cursor="hand2", font=MONO_B, + command=self.start_bridge) + self.start_btn.pack(side=tk.LEFT, padx=6) + + self.stop_btn = tk.Button(btn_row, text="Stop Bridge", bg=BG3, fg=FG, + relief="flat", padx=12, cursor="hand2", font=MONO, + command=self.stop_bridge, state="disabled") + self.stop_btn.pack(side=tk.LEFT, padx=4) + + self.mark_btn = tk.Button(btn_row, text="Add Mark", bg=BG3, fg=FG, + relief="flat", padx=10, cursor="hand2", font=MONO, + command=self.add_mark, state="disabled") + self.mark_btn.pack(side=tk.LEFT, padx=4) + + self.status_var = tk.StringVar(value="Idle") + tk.Label(btn_row, textvariable=self.status_var, + bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10) + + # Log output + self.log_view = scrolledtext.ScrolledText( + self, height=18, font=MONO_SM, + bg=BG, fg=FG, insertbackground=FG, + relief="flat", state="disabled", + ) + self.log_view.pack(fill=tk.BOTH, expand=True, padx=4, pady=4) + + # ── helpers ─────────────────────────────────────────────────────────── + + def _choose_dir(self) -> None: + path = filedialog.askdirectory(initialdir=self.logdir_var.get()) + if path: + self.logdir_var.set(path) + + def _append_log(self, text: str) -> None: + self.log_view.configure(state="normal") + self.log_view.insert(tk.END, text) + self.log_view.see(tk.END) + self.log_view.configure(state="disabled") + + # ── bridge control ──────────────────────────────────────────────────── + + def start_bridge(self) -> None: + if self.process and self.process.poll() is None: + messagebox.showinfo("Bridge", "Bridge is already running.") + return + + bw = self.bw_var.get().strip() + s3 = self.s3_var.get().strip() + baud = self.baud_var.get().strip() + logdir = self.logdir_var.get().strip() or "." + + if not bw or not s3: + messagebox.showerror("Error", "Please enter both BW and S3 COM ports.") + return + + os.makedirs(logdir, exist_ok=True) + ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + + args = [sys.executable, str(BRIDGE_PATH), + "--bw", bw, "--s3", s3, "--baud", baud, "--logdir", logdir] + + raw_bw_path = raw_s3_path = None + if self._raw_bw_on.get(): + raw_bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin") + args += ["--raw-bw", raw_bw_path] + if self._raw_s3_on.get(): + raw_s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin") + args += ["--raw-s3", raw_s3_path] + + try: + self.process = subprocess.Popen( + args, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=subprocess.PIPE, + text=True, + bufsize=1, + ) + except Exception as e: + messagebox.showerror("Error", f"Failed to start bridge:\n{e}") + return + + threading.Thread(target=self._reader_thread, daemon=True).start() + self.status_var.set(f"Running — {bw} <-> {s3}") + self.start_btn.configure(state="disabled") + self.stop_btn.configure(state="normal", bg=RED) + self.mark_btn.configure(state="normal") + self._append_log(f"== Bridge started [{ts}] ==\n") + + # Notify parent so Analyzer can wire up live mode + self._on_started(raw_bw_path, raw_s3_path) + + def stop_bridge(self) -> None: + if self.process and self.process.poll() is None: + self.process.terminate() + try: + self.process.wait(timeout=3) + except subprocess.TimeoutExpired: + self.process.kill() + self._bridge_ended() + self._on_stopped() + + def _bridge_ended(self) -> None: + self.status_var.set("Stopped") + self.start_btn.configure(state="normal") + self.stop_btn.configure(state="disabled", bg=BG3) + self.mark_btn.configure(state="disabled") + self._append_log("== Bridge stopped ==\n") + + def _reader_thread(self) -> None: + if not self.process or not self.process.stdout: + return + for line in self.process.stdout: + self._stdout_q.put(line) + self._stdout_q.put("<>") + + def _poll_stdout(self) -> None: + try: + while True: + line = self._stdout_q.get_nowait() + if line == "<>": + self._bridge_ended() + self._on_stopped() + break + self._append_log(line) + except queue.Empty: + pass + finally: + self.after(100, self._poll_stdout) + + def add_mark(self) -> None: + if not self.process or not self.process.stdin or self.process.poll() is not None: + return + label = simpledialog.askstring("Mark", "Enter label for this mark:", parent=self) + if not label or not label.strip(): + return + try: + self.process.stdin.write("m\n") + self.process.stdin.write(label.strip() + "\n") + self.process.stdin.flush() + self._append_log(f"[MARK] {label.strip()}\n") + except Exception as e: + messagebox.showerror("Error", f"Failed to send mark:\n{e}") + + +# ───────────────────────────────────────────────────────────────────────────── +# Analyzer panel (tk.Frame — lives inside a notebook tab) +# Extracted from gui_analyzer.py; accepts external path injection. +# ───────────────────────────────────────────────────────────────────────────── + +class AnalyzerPanel(tk.Frame): + def __init__(self, parent: tk.Widget, db: FrameDB, **kw): + super().__init__(parent, bg=BG, **kw) + self._db = db + self.state = AnalyzerState() + self._live_thread: Optional[threading.Thread] = None + self._live_stop = threading.Event() + self._live_q: queue.Queue[str] = queue.Queue() + self._build() + self._poll_live_queue() + + # ── external API (called by parent when bridge starts/stops) ────────── + + def set_live_files(self, raw_bw: Optional[str], raw_s3: Optional[str]) -> None: + """Called when the bridge starts — inject file paths and start live mode.""" + if raw_s3: + self.s3_var.set(raw_s3) + if raw_bw: + self.bw_var.set(raw_bw) + if raw_s3 and raw_bw: + self._start_live() + + def stop_live(self) -> None: + """Called when the bridge stops.""" + if self._live_thread and self._live_thread.is_alive(): + self._live_stop.set() + self._set_live_btn_off() + + # ── build ───────────────────────────────────────────────────────────── + + def _build(self) -> None: + self._build_toolbar() + self._build_panes() + self._build_statusbar() + + def _build_toolbar(self) -> None: + bar = tk.Frame(self, bg=BG2, pady=4) + bar.pack(side=tk.TOP, fill=tk.X) + pad = {"padx": 5, "pady": 2} + + tk.Label(bar, text="S3 raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) + self.s3_var = tk.StringVar() + tk.Entry(bar, textvariable=self.s3_var, width=30, bg=BG3, fg=FG, + insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad) + tk.Button(bar, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", + font=MONO, command=lambda: self._browse(self.s3_var, "raw_s3.bin") + ).pack(side=tk.LEFT, **pad) + + tk.Label(bar, text=" BW raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) + self.bw_var = tk.StringVar() + tk.Entry(bar, textvariable=self.bw_var, width=30, bg=BG3, fg=FG, + insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad) + tk.Button(bar, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2", + font=MONO, command=lambda: self._browse(self.bw_var, "raw_bw.bin") + ).pack(side=tk.LEFT, **pad) + + tk.Frame(bar, bg=BG2, width=10).pack(side=tk.LEFT) + self.analyze_btn = tk.Button(bar, text="Analyze", bg=ACCENT, fg="#ffffff", + relief="flat", padx=10, cursor="hand2", font=MONO_B, + command=self._run_analyze) + self.analyze_btn.pack(side=tk.LEFT, **pad) + + self.live_btn = tk.Button(bar, text="Live: OFF", bg=BG3, fg=FG, + relief="flat", padx=10, cursor="hand2", + font=MONO, command=self._toggle_live) + self.live_btn.pack(side=tk.LEFT, **pad) + + self.export_btn = tk.Button(bar, text="Export for Claude", bg=ORANGE, fg="#000000", + relief="flat", padx=10, cursor="hand2", font=MONO_B, + command=self._run_export, state="disabled") + self.export_btn.pack(side=tk.LEFT, **pad) + + self.status_var = tk.StringVar(value="Idle") + tk.Label(bar, textvariable=self.status_var, bg=BG2, fg=FG_DIM, + font=MONO, anchor="w").pack(side=tk.LEFT, padx=10) + + def _build_panes(self) -> None: + pane = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG, + sashwidth=4, sashrelief="flat") + pane.pack(fill=tk.BOTH, expand=True) + + # Left: session tree + left = tk.Frame(pane, bg=BG2, width=260) + pane.add(left, minsize=200) + + tk.Label(left, text="Sessions", bg=BG2, fg=ACCENT, + font=MONO_B, anchor="w", padx=6).pack(fill=tk.X) + + tree_frame = tk.Frame(left, bg=BG2) + tree_frame.pack(fill=tk.BOTH, expand=True) + + style = ttk.Style() + style.theme_use("clam") + style.configure("Treeview", background=BG2, foreground=FG, + fieldbackground=BG2, font=MONO_SM, rowheight=18, borderwidth=0) + style.configure("Treeview.Heading", background=BG3, foreground=ACCENT, font=MONO_SM) + style.map("Treeview", background=[("selected", BG3)], + foreground=[("selected", "#ffffff")]) + + self.tree = ttk.Treeview(tree_frame, columns=("info",), show="tree headings", + selectmode="browse") + self.tree.heading("#0", text="Frame") + self.tree.heading("info", text="Info") + self.tree.column("#0", width=160, stretch=True) + self.tree.column("info", width=80, stretch=False) + + vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview) + self.tree.configure(yscrollcommand=vsb.set) + vsb.pack(side=tk.RIGHT, fill=tk.Y) + self.tree.pack(fill=tk.BOTH, expand=True) + + self.tree.tag_configure("session", foreground=ACCENT, font=MONO_B) + self.tree.tag_configure("bw_frame", foreground=COL_BW) + self.tree.tag_configure("s3_frame", foreground=COL_S3) + self.tree.tag_configure("bad_chk", foreground=RED) + self.tree.tag_configure("malformed", foreground=RED) + self.tree.bind("<>", self._on_tree_select) + + # Right: detail notebook + right = tk.Frame(pane, bg=BG) + pane.add(right, minsize=600) + + style.configure("TNotebook", background=BG2, borderwidth=0) + style.configure("TNotebook.Tab", background=BG3, foreground=FG, + font=MONO, padding=[8, 2]) + style.map("TNotebook.Tab", background=[("selected", BG)], + foreground=[("selected", ACCENT)]) + + self.nb = ttk.Notebook(right) + self.nb.pack(fill=tk.BOTH, expand=True) + + self.inv_text = self._make_text_tab("Inventory") + self.hex_text = self._make_text_tab("Hex Dump") + self.diff_text = self._make_text_tab("Diff") + self.report_text = self._make_text_tab("Full Report") + self._build_query_tab() + + for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text): + w.tag_configure("head", foreground=COL_HEAD, font=MONO_B) + w.tag_configure("bw", foreground=COL_BW) + w.tag_configure("s3", foreground=COL_S3) + w.tag_configure("changed", foreground=COL_DIFF) + w.tag_configure("known", foreground=COL_KNOW) + w.tag_configure("dim", foreground=FG_DIM) + w.tag_configure("normal", foreground=FG) + w.tag_configure("warn", foreground=YELLOW) + w.tag_configure("addr", foreground=ORANGE) + + def _make_text_tab(self, title: str) -> tk.Text: + frame = tk.Frame(self.nb, bg=BG) + self.nb.add(frame, text=title) + w = tk.Text(frame, bg=BG, fg=FG, font=MONO, state="disabled", + relief="flat", wrap="none", insertbackground=FG, + selectbackground=BG3, selectforeground="#ffffff") + vsb = ttk.Scrollbar(frame, orient="vertical", command=w.yview) + hsb = ttk.Scrollbar(frame, orient="horizontal", command=w.xview) + w.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set) + vsb.pack(side=tk.RIGHT, fill=tk.Y) + hsb.pack(side=tk.BOTTOM, fill=tk.X) + w.pack(fill=tk.BOTH, expand=True) + return w + + def _build_query_tab(self) -> None: + frame = tk.Frame(self.nb, bg=BG) + self.nb.add(frame, text="Query DB") + pad = {"padx": 4, "pady": 2} + + filt = tk.Frame(frame, bg=BG2, pady=4) + filt.pack(side=tk.TOP, fill=tk.X) + + tk.Label(filt, text="Capture:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=0, sticky="e", **pad) + self._q_capture_var = tk.StringVar(value="All") + self._q_capture_cb = ttk.Combobox(filt, textvariable=self._q_capture_var, + width=20, font=MONO_SM, state="readonly") + self._q_capture_cb.grid(row=0, column=1, sticky="w", **pad) + + tk.Label(filt, text="Dir:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=2, sticky="e", **pad) + self._q_dir_var = tk.StringVar(value="All") + ttk.Combobox(filt, textvariable=self._q_dir_var, values=["All", "BW", "S3"], + width=6, font=MONO_SM, state="readonly").grid(row=0, column=3, sticky="w", **pad) + + tk.Label(filt, text="SUB:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=4, sticky="e", **pad) + self._q_sub_var = tk.StringVar(value="All") + self._q_sub_cb = ttk.Combobox(filt, textvariable=self._q_sub_var, + width=12, font=MONO_SM, state="readonly") + self._q_sub_cb.grid(row=0, column=5, sticky="w", **pad) + + tk.Label(filt, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=6, sticky="e", **pad) + self._q_offset_var = tk.StringVar() + tk.Entry(filt, textvariable=self._q_offset_var, width=8, bg=BG3, fg=FG, + font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=7, sticky="w", **pad) + + tk.Label(filt, text="Value:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=8, sticky="e", **pad) + self._q_value_var = tk.StringVar() + tk.Entry(filt, textvariable=self._q_value_var, width=8, bg=BG3, fg=FG, + font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=9, sticky="w", **pad) + + tk.Button(filt, text="Run Query", bg=ACCENT, fg="#ffffff", relief="flat", + padx=8, cursor="hand2", font=("Consolas", 8, "bold"), + command=self._run_db_query).grid(row=0, column=10, padx=8) + tk.Button(filt, text="Refresh", bg=BG3, fg=FG, relief="flat", + padx=6, cursor="hand2", font=MONO_SM, + command=self._refresh_query_dropdowns).grid(row=0, column=11, padx=4) + + self._q_stats_var = tk.StringVar(value="DB: —") + tk.Label(filt, textvariable=self._q_stats_var, bg=BG2, fg=FG_DIM, + font=MONO_SM).grid(row=0, column=12, padx=12, sticky="w") + + res_frame = tk.Frame(frame, bg=BG) + res_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True) + + cols = ("cap", "sess", "dir", "sub", "sub_name", "page", "len", "chk") + self._q_tree = ttk.Treeview(res_frame, columns=cols, show="headings", selectmode="browse") + for cid, head, w in [ + ("cap", "Cap", 40), ("sess", "Sess", 40), ("dir", "Dir", 40), + ("sub", "SUB", 50), ("sub_name", "Name", 160), ("page", "Page", 60), + ("len", "Len", 50), ("chk", "Chk", 50), + ]: + self._q_tree.heading(cid, text=head, anchor="w") + self._q_tree.column(cid, width=w, stretch=(cid == "sub_name")) + + q_vsb = ttk.Scrollbar(res_frame, orient="vertical", command=self._q_tree.yview) + q_hsb = ttk.Scrollbar(res_frame, orient="horizontal", command=self._q_tree.xview) + self._q_tree.configure(yscrollcommand=q_vsb.set, xscrollcommand=q_hsb.set) + q_vsb.pack(side=tk.RIGHT, fill=tk.Y) + q_hsb.pack(side=tk.BOTTOM, fill=tk.X) + self._q_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + self._q_tree.tag_configure("bw_row", foreground=COL_BW) + self._q_tree.tag_configure("s3_row", foreground=COL_S3) + self._q_tree.tag_configure("bad_row", foreground=RED) + self._q_tree.bind("<>", lambda _e: self._run_interpret()) + + interp_frame = tk.Frame(frame, bg=BG2, height=120) + interp_frame.pack(side=tk.BOTTOM, fill=tk.X) + interp_frame.pack_propagate(False) + tk.Label(interp_frame, text="Byte interpretation:", bg=BG2, fg=ACCENT, + font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X) + ii = tk.Frame(interp_frame, bg=BG2) + ii.pack(fill=tk.X, padx=6, pady=2) + tk.Label(ii, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT) + self._interp_offset_var = tk.StringVar(value="5") + tk.Entry(ii, textvariable=self._interp_offset_var, width=6, bg=BG3, fg=FG, + font=MONO_SM, insertbackground=FG, relief="flat").pack(side=tk.LEFT, padx=4) + tk.Button(ii, text="Interpret", bg=BG3, fg=FG, relief="flat", cursor="hand2", + font=MONO_SM, command=self._run_interpret).pack(side=tk.LEFT, padx=4) + self._interp_text = tk.Text(interp_frame, bg=BG2, fg=FG, font=MONO_SM, + height=4, relief="flat", state="disabled", + insertbackground=FG) + self._interp_text.pack(fill=tk.X, padx=6, pady=2) + self._interp_text.tag_configure("label", foreground=FG_DIM) + self._interp_text.tag_configure("value", foreground=YELLOW) + + self._q_rows: dict[str, object] = {} + self._q_capture_rows: list = [None] + self._q_sub_values: list = [None] + self._refresh_query_dropdowns() + + def _build_statusbar(self) -> None: + bar = tk.Frame(self, bg=BG3, height=20) + bar.pack(side=tk.BOTTOM, fill=tk.X) + self.sb_var = tk.StringVar(value="Ready") + tk.Label(bar, textvariable=self.sb_var, bg=BG3, fg=FG_DIM, + font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X) + + # ── file picking ────────────────────────────────────────────────────── + + def _browse(self, var: tk.StringVar, default: str) -> None: + path = filedialog.askopenfilename( + title=f"Select {default}", + filetypes=[("Binary", "*.bin"), ("All files", "*.*")], + initialfile=default, + ) + if path: + var.set(path) + + # ── analysis ────────────────────────────────────────────────────────── + + def _run_analyze(self) -> None: + s3p = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None + bwp = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None + if not s3p or not bwp: + messagebox.showerror("Missing files", "Select both S3 and BW raw files.") + return + if not s3p.exists(): + messagebox.showerror("Not found", f"S3 file not found:\n{s3p}") + return + if not bwp.exists(): + messagebox.showerror("Not found", f"BW file not found:\n{bwp}") + return + self.state.s3_path = s3p + self.state.bw_path = bwp + self._do_analyze(s3p, bwp) + + def _do_analyze(self, s3_path: Path, bw_path: Path) -> None: + self.status_var.set("Parsing...") + self.update_idletasks() + + s3_frames = annotate_frames(parse_s3(s3_path.read_bytes(), trailer_len=0), "S3") + bw_frames = annotate_frames(parse_bw(bw_path.read_bytes(), trailer_len=0, + validate_checksum=True), "BW") + sessions = split_into_sessions(bw_frames, s3_frames) + + diffs: list[Optional[list[FrameDiff]]] = [None] + for i in range(1, len(sessions)): + diffs.append(diff_sessions(sessions[i - 1], sessions[i])) + + self.state.sessions = sessions + self.state.diffs = diffs + + n_s3 = sum(len(s.s3_frames) for s in sessions) + n_bw = sum(len(s.bw_frames) for s in sessions) + self.status_var.set(f"{len(sessions)} sessions | BW:{n_bw} S3:{n_s3}") + self.sb_var.set(f"Loaded: {s3_path.name} + {bw_path.name}") + self.export_btn.configure(state="normal") + self._rebuild_tree() + + try: + cap_id = self._db.ingest(sessions, s3_path, bw_path) + if cap_id is not None: + self.state.last_capture_id = cap_id + self._refresh_query_dropdowns() + for i, lbl in enumerate(self._q_capture_cb["values"]): + if lbl.startswith(f"#{cap_id} "): + self._q_capture_cb.current(i) + break + except Exception as exc: + self.sb_var.set(f"DB ingest error: {exc}") + + def _run_export(self) -> None: + if not self.state.sessions: + messagebox.showinfo("Export", "Run Analyze first.") + return + outdir = self.state.s3_path.parent if self.state.s3_path else Path(".") + out_path = write_claude_export(self.state.sessions, self.state.diffs, + outdir, self.state.s3_path, self.state.bw_path) + self.sb_var.set(f"Exported: {out_path.name}") + if messagebox.askyesno("Export complete", f"Saved to:\n{out_path}\n\nOpen folder?"): + import subprocess as sp + sp.Popen(["explorer", str(out_path.parent)]) + + # ── session tree ────────────────────────────────────────────────────── + + def _rebuild_tree(self) -> None: + self.tree.delete(*self.tree.get_children()) + for sess in self.state.sessions: + is_complete = any(af.header and af.header.sub == 0x74 + for af in sess.bw_frames) + label = f"Session {sess.index}" + ("" if is_complete else " [partial]") + n_diff = len(self.state.diffs[sess.index] or []) + diff_str = f"{n_diff} changes" if n_diff else "" + sid = self.tree.insert("", tk.END, text=label, + values=(diff_str,), tags=("session",)) + for af in sess.all_frames: + src_tag = "bw_frame" if af.source == "BW" else "s3_frame" + sub_hex = f"{af.header.sub:02X}" if af.header else "??" + lbl_text = f"[{af.source}] {sub_hex} {af.sub_name}" + extra = "" + tags = (src_tag,) + if af.frame.checksum_valid is False: + extra = "BAD CHK"; tags = ("bad_chk",) + elif af.header is None: + lbl_text = f"[{af.source}] MALFORMED"; tags = ("malformed",) + self.tree.insert(sid, tk.END, text=lbl_text, values=(extra,), tags=tags, + iid=f"frame_{sess.index}_{af.frame.index}_{af.source}") + for item in self.tree.get_children(): + self.tree.item(item, open=True) + + def _on_tree_select(self, _e: tk.Event) -> None: + sel = self.tree.selection() + if not sel: + return + iid = sel[0] + if iid.startswith("frame_"): + parts = iid.split("_") + self._show_frame_detail(int(parts[1]), int(parts[2]), parts[3]) + else: + text = self.tree.item(iid, "text") + try: + self._show_session_detail(int(text.split()[1])) + except (IndexError, ValueError): + pass + + def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]: + if sess_idx >= len(self.state.sessions): + return None + pool = (self.state.sessions[sess_idx].bw_frames if source == "BW" + else self.state.sessions[sess_idx].s3_frames) + return next((af for af in pool if af.frame.index == frame_idx), None) + + # ── detail renderers ────────────────────────────────────────────────── + + def _clear_tabs(self) -> None: + for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text): + self._tc(w) + + def _show_session_detail(self, idx: int) -> None: + if idx >= len(self.state.sessions): + return + sess = self.state.sessions[idx] + diffs = self.state.diffs[idx] + self._clear_tabs() + + w = self.inv_text + self._tw(w, f"SESSION {sess.index}\n", "head") + self._tw(w, f"Frames: {len(sess.bw_frames)+len(sess.s3_frames)}" + f" (BW:{len(sess.bw_frames)} S3:{len(sess.s3_frames)})\n", "normal") + if len(sess.bw_frames) != len(sess.s3_frames): + self._tw(w, " WARNING: BW/S3 count mismatch\n", "warn") + self._tn(w) + for i, af in enumerate(sess.all_frames): + src = "bw" if af.source == "BW" else "s3" + sh = f"{af.header.sub:02X}" if af.header else "??" + pg = f" (page {af.header.page_key:04X})" if af.header and af.header.page_key else "" + chk = "" + if af.frame.checksum_valid is False: chk = " [BAD CHECKSUM]" + elif af.frame.checksum_valid is True: chk = f" [{af.frame.checksum_type}]" + self._tw(w, f" [{af.source}] #{i:<3} ", src) + self._tw(w, f"SUB={sh} ", "addr") + self._tw(w, f"{af.sub_name:<30}{pg} len={len(af.frame.payload)}", "dim") + if chk: self._tw(w, chk, "warn" if af.frame.checksum_valid is False else "dim") + self._tn(w) + + w = self.diff_text + self._tc(w) + if diffs is None: + self._tw(w, "(No previous session to diff against)\n", "dim") + elif not diffs: + self._tw(w, f"DIFF vs SESSION {idx-1}\n", "head") + self._tw(w, " No changes detected.\n", "dim") + else: + self._tw(w, f"DIFF vs SESSION {idx-1}\n", "head") + for fd in diffs: + pg = f" (page {fd.page_key:04X})" if fd.page_key else "" + self._tw(w, f"\n SUB {fd.sub:02X} ({fd.sub_name}){pg}:\n", "addr") + for bd in fd.diffs: + b = f"{bd.before:02x}" if bd.before >= 0 else "--" + a = f"{bd.after:02x}" if bd.after >= 0 else "--" + self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim") + self._tw(w, f"{b} -> {a}", "changed") + if bd.field_name: self._tw(w, f" [{bd.field_name}]", "known") + self._tn(w) + + report = render_session_report(sess, diffs, idx - 1 if idx > 0 else None) + self._tc(self.report_text) + self._tw(self.report_text, report, "normal") + self.nb.select(0) + + def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None: + af = self._find_frame(sess_idx, frame_idx, source) + if af is None: + return + self._clear_tabs() + src = "bw" if source == "BW" else "s3" + sh = f"{af.header.sub:02X}" if af.header else "??" + + w = self.inv_text + self._tw(w, f"[{af.source}] Frame #{af.frame.index}\n", src) + self._tw(w, f"Session {sess_idx} | ", "dim") + self._tw(w, f"SUB={sh} {af.sub_name}\n", "addr") + if af.header: + self._tw(w, f" OFFSET: {af.header.page_key:04X} " + f"CMD={af.header.cmd:02X} FLAGS={af.header.flags:02X}\n", "dim") + self._tn(w) + self._tw(w, f"Payload: {len(af.frame.payload)} bytes\n", "dim") + if af.frame.checksum_valid is False: self._tw(w, " BAD CHECKSUM\n", "warn") + elif af.frame.checksum_valid is True: + self._tw(w, f" Checksum: {af.frame.checksum_type} {af.frame.checksum_hex}\n", "dim") + self._tn(w) + p = af.frame.payload + if len(p) >= 5: + self._tw(w, "Header breakdown:\n", "head") + self._tw(w, f" [0] CMD = {p[0]:02x}\n", "dim") + self._tw(w, f" [1] ? = {p[1]:02x}\n", "dim") + self._tw(w, f" [2] SUB = {p[2]:02x} ({af.sub_name})\n", src) + self._tw(w, f" [3] OFFSET_HI = {p[3]:02x}\n", "dim") + self._tw(w, f" [4] OFFSET_LO = {p[4]:02x}\n", "dim") + if len(p) > 5: self._tw(w, f" [5..] data = {len(p)-5} bytes\n", "dim") + + w = self.hex_text + self._tw(w, f"[{af.source}] SUB={sh} {af.sub_name}\n", src) + self._tw(w, f"Payload ({len(af.frame.payload)} bytes):\n", "dim") + self._tn(w) + self._tw(w, "\n".join(format_hex_dump(af.frame.payload, indent=" ")) + "\n", "normal") + + diffs_for_sess = self.state.diffs[sess_idx] if sess_idx < len(self.state.diffs) else None + if diffs_for_sess and af.header: + matching = [fd for fd in diffs_for_sess + if fd.sub == af.header.sub and fd.page_key == af.header.page_key] + if matching: + self._tn(w) + self._tw(w, "Changed bytes (vs prev session):\n", "head") + for bd in matching[0].diffs: + b = f"{bd.before:02x}" if bd.before >= 0 else "--" + a = f"{bd.after:02x}" if bd.after >= 0 else "--" + self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim") + self._tw(w, f"{b} -> {a}", "changed") + if bd.field_name: self._tw(w, f" [{bd.field_name}]", "known") + self._tn(w) + self.nb.select(1) + + # ── live mode ───────────────────────────────────────────────────────── + + def _toggle_live(self) -> None: + if self._live_thread and self._live_thread.is_alive(): + self._live_stop.set() + self._set_live_btn_off() + else: + s3p = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None + bwp = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None + if not s3p or not bwp: + messagebox.showerror("Missing files", "Select both raw files before starting live mode.") + return + self.state.s3_path = s3p + self.state.bw_path = bwp + self._start_live() + + def _start_live(self) -> None: + s3p = Path(self.s3_var.get().strip()) + bwp = Path(self.bw_var.get().strip()) + self.state.s3_path = s3p + self.state.bw_path = bwp + self._live_stop.clear() + self._live_thread = threading.Thread( + target=self._live_worker, args=(s3p, bwp), daemon=True) + self._live_thread.start() + self.live_btn.configure(text="Live: ON", bg=GREEN, fg="#000000") + self.status_var.set("Live mode running...") + + def _set_live_btn_off(self) -> None: + self.live_btn.configure(text="Live: OFF", bg=BG3, fg=FG) + self.status_var.set("Live stopped") + + def _live_worker(self, s3_path: Path, bw_path: Path) -> None: + import time + s3_pos = bw_pos = 0 + while not self._live_stop.is_set(): + changed = False + for path, pos_attr in ((s3_path, "s3"), (bw_path, "bw")): + if path.exists(): + with path.open("rb") as fh: + fh.seek(s3_pos if pos_attr == "s3" else bw_pos) + nb = fh.read() + if nb: + if pos_attr == "s3": s3_pos += len(nb) + else: bw_pos += len(nb) + changed = True + if changed: + self._live_q.put("refresh") + time.sleep(0.1) + + def _poll_live_queue(self) -> None: + try: + while True: + msg = self._live_q.get_nowait() + if msg == "refresh" and self.state.s3_path and self.state.bw_path: + self._do_analyze(self.state.s3_path, self.state.bw_path) + except queue.Empty: + pass + finally: + self.after(150, self._poll_live_queue) + + # ── DB query ────────────────────────────────────────────────────────── + + def _refresh_query_dropdowns(self) -> None: + try: + captures = self._db.list_captures() + self._q_capture_cb["values"] = ["All"] + [ + f"#{r['id']} {r['timestamp'][:16]} ({r['frame_count']} frames)" + for r in captures + ] + self._q_capture_rows = [None] + [r["id"] for r in captures] + + subs = self._db.get_distinct_subs() + self._q_sub_cb["values"] = ["All"] + [f"0x{s:02X}" for s in subs] + self._q_sub_values = [None] + subs + + stats = self._db.get_stats() + self._q_stats_var.set(f"DB: {stats['captures']} captures | {stats['frames']} frames") + except Exception as exc: + self._q_stats_var.set(f"DB error: {exc}") + + def _parse_int(self, s: str) -> Optional[int]: + s = s.strip() + if not s: return None + try: return int(s, 0) + except ValueError: return None + + def _run_db_query(self) -> None: + cap_idx = self._q_capture_cb.current() + cap_id = self._q_capture_rows[cap_idx] if cap_idx > 0 else None + dir_val = self._q_dir_var.get() + direction = dir_val if dir_val != "All" else None + sub_idx = self._q_sub_cb.current() + sub = self._q_sub_values[sub_idx] if sub_idx > 0 else None + offset = self._parse_int(self._q_offset_var.get()) + value = self._parse_int(self._q_value_var.get()) + + try: + if offset is not None: + rows = self._db.query_by_byte(offset=offset, value=value, + capture_id=cap_id, direction=direction, sub=sub) + else: + rows = self._db.query_frames(capture_id=cap_id, direction=direction, sub=sub) + except Exception as exc: + messagebox.showerror("Query error", str(exc)) + return + + self._q_tree.delete(*self._q_tree.get_children()) + self._q_rows.clear() + for row in rows: + sh = f"0x{row['sub']:02X}" if row["sub"] is not None else "—" + pg = f"0x{row['page_key']:04X}" if row["page_key"] is not None else "—" + chk = {1: "OK", 0: "BAD", None: "—"}.get(row["checksum_ok"], "—") + tag = "bw_row" if row["direction"] == "BW" else "s3_row" + if row["checksum_ok"] == 0: tag = "bad_row" + iid = str(row["id"]) + self._q_tree.insert("", tk.END, iid=iid, tags=(tag,), values=( + row["capture_id"], row["session_idx"], row["direction"], + sh, row["sub_name"] or "", pg, row["payload_len"], chk)) + self._q_rows[iid] = row + self.sb_var.set(f"Query returned {len(rows)} rows") + + def _run_interpret(self) -> None: + sel = self._q_tree.selection() + if not sel: return + row = self._q_rows.get(sel[0]) + if not row: return + offset = self._parse_int(self._interp_offset_var.get()) + if offset is None: return + + payload = bytes(row["payload"]) + interp = self._db.interpret_offset(payload, offset) + + w = self._interp_text + w.configure(state="normal") + w.delete("1.0", tk.END) + sh = f"0x{row['sub']:02X}" if row["sub"] is not None else "??" + w.insert(tk.END, + f"Frame #{row['id']} [{row['direction']}] SUB={sh} " + f"offset={offset} (0x{offset:04X})\n", "label") + line = "" + for key, lbl in [ + ("uint8","uint8 "), ("int8","int8 "), + ("uint16_be","uint16 BE "), ("uint16_le","uint16 LE "), + ("uint32_be","uint32 BE "), ("uint32_le","uint32 LE "), + ("float32_be","float32 BE "), ("float32_le","float32 LE "), + ]: + if key not in interp: continue + val = interp[key] + vs = f"{val:.6g}" if isinstance(val, float) else ( + f"{val} (0x{int(val)&0xFFFFFFFF:X})") + line += f" {lbl}: {vs:<28}" + if len(line) > 80: + w.insert(tk.END, line + "\n", "value"); line = "" + if line: w.insert(tk.END, line + "\n", "value") + w.configure(state="disabled") + + # ── text helpers ────────────────────────────────────────────────────── + + def _tc(self, w: tk.Text) -> None: + w.configure(state="normal"); w.delete("1.0", tk.END) + + def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None: + w.configure(state="normal"); w.insert(tk.END, text, tag) + + def _tn(self, w: tk.Text) -> None: + w.configure(state="normal"); w.insert(tk.END, "\n"); w.configure(state="disabled") + + +# ───────────────────────────────────────────────────────────────────────────── +# Main application window +# ───────────────────────────────────────────────────────────────────────────── + +class SeismoLab(tk.Tk): + def __init__(self) -> None: + super().__init__() + self.title("Seismo Lab") + self.configure(bg=BG) + self.minsize(1100, 680) + + self._db = FrameDB() + + style = ttk.Style() + style.theme_use("clam") + style.configure("Top.TNotebook", background=BG3, borderwidth=0, tabposition="nw") + style.configure("Top.TNotebook.Tab", background=BG3, foreground=FG, + font=("Consolas", 10, "bold"), padding=[16, 6]) + style.map("Top.TNotebook.Tab", + background=[("selected", ACCENT)], + foreground=[("selected", "#ffffff")]) + + nb = ttk.Notebook(self, style="Top.TNotebook") + nb.pack(fill=tk.BOTH, expand=True) + + self._bridge_panel = BridgePanel( + nb, + on_bridge_started=self._on_bridge_started, + on_bridge_stopped=self._on_bridge_stopped, + ) + nb.add(self._bridge_panel, text=" Bridge ") + + self._analyzer_panel = AnalyzerPanel(nb, db=self._db) + nb.add(self._analyzer_panel, text=" Analyzer ") + + self._nb = nb + self.protocol("WM_DELETE_WINDOW", self._on_close) + + def _on_bridge_started(self, raw_bw: Optional[str], raw_s3: Optional[str]) -> None: + """Bridge started — inject paths into analyzer and start live mode.""" + self._analyzer_panel.set_live_files(raw_bw, raw_s3) + # Switch to Analyzer tab so the user can watch it update + self._nb.select(1) + + def _on_bridge_stopped(self) -> None: + self._analyzer_panel.stop_live() + + def _on_close(self) -> None: + self._bridge_panel.stop_bridge() + self.destroy() + + +# ───────────────────────────────────────────────────────────────────────────── + +def main() -> None: + app = SeismoLab() + app.mainloop() + + +if __name__ == "__main__": + main()