1038 lines
48 KiB
Python
1038 lines
48 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
seismo_lab.py — Combined S3 Bridge + Protocol Analyzer GUI.
|
|
|
|
Single window with two top-level tabs:
|
|
Bridge — capture live serial traffic (wraps s3_bridge.py as subprocess)
|
|
Analyzer — parse, diff, and query captured frames
|
|
|
|
When the bridge starts:
|
|
- raw tap paths are auto-filled in the Analyzer tab
|
|
- Live mode is automatically enabled so the Analyzer updates in real time
|
|
|
|
Run from anywhere:
|
|
python seismo_lab.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import os
|
|
import queue
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import tkinter as tk
|
|
from pathlib import Path
|
|
from tkinter import filedialog, messagebox, scrolledtext, simpledialog, ttk
|
|
from typing import Optional
|
|
|
|
# ── path setup ────────────────────────────────────────────────────────────────
|
|
# Every project location is derived from this script's own directory, so the
# tool behaves the same whatever the current working directory is.
SCRIPT_DIR = Path(__file__).parent
BRIDGE_PATH = SCRIPT_DIR.joinpath("bridges", "s3-bridge", "s3_bridge.py")
PARSERS_DIR = SCRIPT_DIR.joinpath("parsers")

# Put the parsers directory first on the module search path before the
# project-local imports that follow (presumably s3_analyzer / frame_db live
# there — confirm against the repository layout).
sys.path.insert(0, str(PARSERS_DIR))
|
|
|
|
from s3_analyzer import ( # noqa: E402
|
|
AnnotatedFrame,
|
|
FrameDiff,
|
|
Session,
|
|
annotate_frames,
|
|
diff_sessions,
|
|
format_hex_dump,
|
|
parse_bw,
|
|
parse_s3,
|
|
render_session_report,
|
|
split_into_sessions,
|
|
write_claude_export,
|
|
)
|
|
from frame_db import FrameDB # noqa: E402
|
|
|
|
# ── colour palette ────────────────────────────────────────────────────────────
|
|
# Window surfaces, darkest to lightest.
BG = "#1e1e1e"
BG2 = "#252526"
BG3 = "#2d2d30"

# Text colours.
FG = "#d4d4d4"
FG_DIM = "#6a6a6a"

# Accent / signal colours.
ACCENT = "#569cd6"
RED = "#f44747"
YELLOW = "#dcdcaa"
GREEN = "#4caf50"
ORANGE = "#ce9178"

# Semantic colours used by the analyzer views. Three of them share a value
# with a signal colour above and are expressed as aliases of it.
COL_BW = "#9cdcfe"
COL_S3 = "#4ec9b0"
COL_DIFF = RED
COL_KNOW = GREEN
COL_HEAD = ACCENT

# Monospace fonts: regular, small and bold variants of one family.
MONO = ("Consolas", 9)
MONO_SM = (MONO[0], 8)
MONO_B = (*MONO, "bold")
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Shared state
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class AnalyzerState:
    """Mutable container for the Analyzer's current inputs and parse results."""

    def __init__(self) -> None:
        # Capture files the current results were (or will be) parsed from.
        self.bw_path: Optional[Path] = None
        self.s3_path: Optional[Path] = None

        # Parsed sessions plus one diff entry per session; an entry may be
        # None when there is nothing to diff against.
        self.sessions: list[Session] = []
        self.diffs: list[Optional[list[FrameDiff]]] = []

        # Identifier of the most recent capture stored in the frame DB.
        self.last_capture_id: Optional[int] = None
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Bridge panel (tk.Frame — lives inside a notebook tab)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class BridgePanel(tk.Frame):
    """
    Bridge controls plus a live view of the bridge subprocess output.

    The panel launches BRIDGE_PATH as a child process, streams its combined
    stdout/stderr into a read-only log widget and lets the user send labelled
    marks to the child's stdin.

    Callbacks supplied by the parent:
        on_bridge_started(raw_bw_path, raw_s3_path) — called right after the
            child has been spawned; a path is None when that raw tap is
            unchecked.
        on_bridge_stopped() — called exactly once per started bridge, whether
            it was stopped from the UI or exited on its own.
    """

    # Marker the reader thread queues once the child's output is exhausted.
    # Real output lines always end in a newline, so they can never equal it.
    _EXIT_SENTINEL = "<<exit>>"

    def __init__(self, parent: tk.Widget, on_bridge_started, on_bridge_stopped, **kw):
        super().__init__(parent, bg=BG2, **kw)
        self._on_started = on_bridge_started
        self._on_stopped = on_bridge_stopped
        self.process: Optional[subprocess.Popen] = None
        self._stdout_q: queue.Queue[str] = queue.Queue()
        # True between a successful start and the matching stop notification;
        # this is what keeps the stop bookkeeping from running twice.
        self._running = False
        self._build()
        self._poll_stdout()

    # ── build ─────────────────────────────────────────────────────────────

    def _build(self) -> None:
        """Create the configuration grid, the button row and the log view."""
        pad = {"padx": 6, "pady": 4}
        label_kw = {"bg": BG2, "fg": FG, "font": MONO}
        entry_kw = {"bg": BG3, "fg": FG, "insertbackground": FG,
                    "relief": "flat", "font": MONO}
        check_kw = {"bg": BG2, "fg": FG, "selectcolor": BG3,
                    "activebackground": BG2, "font": MONO}

        cfg = tk.Frame(self, bg=BG2)
        cfg.pack(side=tk.TOP, fill=tk.X, padx=4, pady=4)

        # Row 0: ports
        tk.Label(cfg, text="BW COM:", **label_kw).grid(row=0, column=0, sticky="e", **pad)
        self.bw_var = tk.StringVar(value="COM4")
        tk.Entry(cfg, textvariable=self.bw_var, width=10,
                 **entry_kw).grid(row=0, column=1, sticky="w", **pad)

        tk.Label(cfg, text="S3 COM:", **label_kw).grid(row=0, column=2, sticky="e", **pad)
        self.s3_var = tk.StringVar(value="COM5")
        tk.Entry(cfg, textvariable=self.s3_var, width=10,
                 **entry_kw).grid(row=0, column=3, sticky="w", **pad)

        tk.Label(cfg, text="Baud:", **label_kw).grid(row=0, column=4, sticky="e", **pad)
        self.baud_var = tk.StringVar(value="38400")
        tk.Entry(cfg, textvariable=self.baud_var, width=8,
                 **entry_kw).grid(row=0, column=5, sticky="w", **pad)

        # Row 1: log dir
        tk.Label(cfg, text="Log dir:", **label_kw).grid(row=1, column=0, sticky="e", **pad)
        self.logdir_var = tk.StringVar(value=str(SCRIPT_DIR / "bridges" / "captures"))
        tk.Entry(cfg, textvariable=self.logdir_var, width=40,
                 **entry_kw).grid(row=1, column=1, columnspan=4, sticky="we", **pad)
        tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat", cursor="hand2",
                  font=MONO, command=self._choose_dir).grid(row=1, column=5, **pad)

        # Row 2: raw taps (on by default — timestamped names generated at start)
        self._raw_bw_on = tk.BooleanVar(value=True)
        self._raw_s3_on = tk.BooleanVar(value=True)
        tk.Checkbutton(cfg, text="Capture BW->S3 raw", variable=self._raw_bw_on,
                       **check_kw).grid(row=2, column=0, columnspan=2, sticky="w", **pad)
        tk.Checkbutton(cfg, text="Capture S3->BW raw", variable=self._raw_s3_on,
                       **check_kw).grid(row=2, column=2, columnspan=2, sticky="w", **pad)

        # Row 3: buttons + status
        btn_row = tk.Frame(self, bg=BG2)
        btn_row.pack(side=tk.TOP, fill=tk.X, padx=4, pady=2)

        self.start_btn = tk.Button(btn_row, text="Start Bridge", bg=GREEN, fg="#000000",
                                   relief="flat", padx=12, cursor="hand2", font=MONO_B,
                                   command=self.start_bridge)
        self.start_btn.pack(side=tk.LEFT, padx=6)

        self.stop_btn = tk.Button(btn_row, text="Stop Bridge", bg=BG3, fg=FG,
                                  relief="flat", padx=12, cursor="hand2", font=MONO,
                                  command=self.stop_bridge, state="disabled")
        self.stop_btn.pack(side=tk.LEFT, padx=4)

        self.mark_btn = tk.Button(btn_row, text="Add Mark", bg=BG3, fg=FG,
                                  relief="flat", padx=10, cursor="hand2", font=MONO,
                                  command=self.add_mark, state="disabled")
        self.mark_btn.pack(side=tk.LEFT, padx=4)

        self.status_var = tk.StringVar(value="Idle")
        tk.Label(btn_row, textvariable=self.status_var,
                 bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=10)

        # Log output
        self.log_view = scrolledtext.ScrolledText(
            self, height=18, font=MONO_SM,
            bg=BG, fg=FG, insertbackground=FG,
            relief="flat", state="disabled",
        )
        self.log_view.pack(fill=tk.BOTH, expand=True, padx=4, pady=4)

    # ── helpers ───────────────────────────────────────────────────────────

    def _choose_dir(self) -> None:
        """Let the user pick the log directory; keep the old value on cancel."""
        path = filedialog.askdirectory(initialdir=self.logdir_var.get())
        if path:
            self.logdir_var.set(path)

    def _append_log(self, text: str) -> None:
        """Append text to the read-only log widget and scroll to the end."""
        # The widget is kept disabled so the user cannot type into it; it has
        # to be enabled for the duration of the insert.
        self.log_view.configure(state="normal")
        self.log_view.insert(tk.END, text)
        self.log_view.see(tk.END)
        self.log_view.configure(state="disabled")

    # ── bridge control ────────────────────────────────────────────────────

    def start_bridge(self) -> None:
        """Validate the form, spawn the bridge process and notify the parent."""
        if self.process and self.process.poll() is None:
            messagebox.showinfo("Bridge", "Bridge is already running.")
            return

        bw = self.bw_var.get().strip()
        s3 = self.s3_var.get().strip()
        baud = self.baud_var.get().strip()
        logdir = self.logdir_var.get().strip() or "."

        if not bw or not s3:
            messagebox.showerror("Error", "Please enter both BW and S3 COM ports.")
            return

        # Reject a bad baud rate here rather than letting the child die with a
        # usage error that only shows up in the log.
        try:
            baud_value = int(baud)
        except ValueError:
            baud_value = 0
        if baud_value <= 0:
            messagebox.showerror("Error", "Baud rate must be a positive integer.")
            return

        try:
            os.makedirs(logdir, exist_ok=True)
        except OSError as e:
            messagebox.showerror("Error", f"Cannot create log directory:\n{e}")
            return
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        args = [sys.executable, str(BRIDGE_PATH),
                "--bw", bw, "--s3", s3, "--baud", str(baud_value), "--logdir", logdir]

        raw_bw_path = raw_s3_path = None
        if self._raw_bw_on.get():
            raw_bw_path = os.path.join(logdir, f"raw_bw_{ts}.bin")
            args += ["--raw-bw", raw_bw_path]
        if self._raw_s3_on.get():
            raw_s3_path = os.path.join(logdir, f"raw_s3_{ts}.bin")
            args += ["--raw-s3", raw_s3_path]

        try:
            self.process = subprocess.Popen(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.PIPE,
                text=True,
                # An undecodable byte must not kill the reader thread.
                errors="replace",
                bufsize=1,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            messagebox.showerror("Error", f"Failed to start bridge:\n{e}")
            return

        self._running = True
        threading.Thread(target=self._reader_thread, args=(self.process,),
                         daemon=True).start()
        self.status_var.set(f"Running — {bw} <-> {s3}")
        self.start_btn.configure(state="disabled")
        self.stop_btn.configure(state="normal", bg=RED)
        self.mark_btn.configure(state="normal")
        self._append_log(f"== Bridge started [{ts}] ==\n")

        # Notify parent so Analyzer can wire up live mode
        self._on_started(raw_bw_path, raw_s3_path)

    def stop_bridge(self) -> None:
        """Terminate the bridge (kill it after 3 s) and finalise the UI once."""
        proc = self.process
        if proc and proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()  # reap the child so it does not linger as a zombie
        self._finalize_stop()

    def _finalize_stop(self) -> None:
        """Run the end-of-bridge bookkeeping at most once per started bridge."""
        if not self._running:
            return
        self._running = False
        proc = self.process
        if proc is not None and proc.stdin is not None:
            try:
                proc.stdin.close()
            except OSError:
                pass  # the child is gone; a failed flush is irrelevant now
        self._bridge_ended()
        self._on_stopped()

    def _bridge_ended(self) -> None:
        """Put the controls back into their idle state and log the stop."""
        self.status_var.set("Stopped")
        self.start_btn.configure(state="normal")
        self.stop_btn.configure(state="disabled", bg=BG3)
        self.mark_btn.configure(state="disabled")
        self._append_log("== Bridge stopped ==\n")

    def _reader_thread(self, proc: Optional[subprocess.Popen] = None) -> None:
        """Forward the child's output to the UI queue, then queue the sentinel.

        Runs on a worker thread. ``proc`` defaults to ``self.process`` so the
        method can still be called without arguments.
        """
        proc = proc if proc is not None else self.process
        if not proc or not proc.stdout:
            return
        try:
            for line in proc.stdout:
                self._stdout_q.put(line)
        except ValueError:
            pass  # the stream was closed underneath us
        finally:
            # Only announce the exit once the process has really terminated,
            # so the UI thread can tell this sentinel from a stale one.
            proc.wait()
            self._stdout_q.put(self._EXIT_SENTINEL)

    def _poll_stdout(self) -> None:
        """Drain queued output into the log; re-arms itself every 100 ms."""
        try:
            while True:
                line = self._stdout_q.get_nowait()
                if line == self._EXIT_SENTINEL:
                    # A sentinel seen while the current child is alive belongs
                    # to an earlier run that has already been finalised.
                    if self.process is None or self.process.poll() is not None:
                        self._finalize_stop()
                    continue
                self._append_log(line)
        except queue.Empty:
            pass
        finally:
            self.after(100, self._poll_stdout)

    def add_mark(self) -> None:
        """Ask for a label and send it to the bridge as an ``m`` command."""
        if not self.process or not self.process.stdin or self.process.poll() is not None:
            return
        label = simpledialog.askstring("Mark", "Enter label for this mark:", parent=self)
        if not label or not label.strip():
            return
        label = label.strip()
        try:
            # Command line first, then the label on a line of its own.
            self.process.stdin.write("m\n")
            self.process.stdin.write(label + "\n")
            self.process.stdin.flush()
        except (OSError, ValueError) as e:
            # The bridge may have exited while the dialog was open.
            messagebox.showerror("Error", f"Failed to send mark:\n{e}")
            return
        self._append_log(f"[MARK] {label}\n")
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Analyzer panel (tk.Frame — lives inside a notebook tab)
|
|
# Extracted from gui_analyzer.py; accepts external path injection.
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class AnalyzerPanel(tk.Frame):
|
|
def __init__(self, parent: tk.Widget, db: FrameDB, **kw):
    """Build the analyzer UI on top of ``db`` and start the live-queue poller."""
    super().__init__(parent, bg=BG, **kw)
    self._db = db
    self.state = AnalyzerState()

    # Live-mode plumbing: a worker thread, the event that stops it and the
    # queue it uses to ask the UI thread for a refresh.
    self._live_thread: Optional[threading.Thread] = None
    self._live_stop = threading.Event()
    self._live_q: queue.Queue[str] = queue.Queue()

    self._build()
    self._poll_live_queue()
|
|
|
|
# ── external API (called by parent when bridge starts/stops) ──────────
|
|
|
|
def set_live_files(self, raw_bw: Optional[str], raw_s3: Optional[str]) -> None:
    """Fill in whichever tap paths were provided; go live only if both were."""
    for tap_path, field in ((raw_s3, self.s3_var), (raw_bw, self.bw_var)):
        if tap_path:
            field.set(tap_path)
    if not (raw_s3 and raw_bw):
        return
    self._start_live()
|
|
|
|
def stop_live(self) -> None:
    """Called when the bridge stops: end live mode and reset the Live button.

    Does nothing if live mode was never started. Otherwise the stop event is
    set and the button is reset even when the worker thread has already died,
    so the toolbar cannot be left showing "Live: ON" for a dead worker.
    """
    if self._live_thread is None:
        return
    # Setting the event is idempotent, so no is_alive() check is needed.
    self._live_stop.set()
    self._set_live_btn_off()
|
|
|
|
# ── build ─────────────────────────────────────────────────────────────
|
|
|
|
def _build(self) -> None:
|
|
self._build_toolbar()
|
|
self._build_panes()
|
|
self._build_statusbar()
|
|
|
|
def _build_toolbar(self) -> None:
    """Create the top strip: two raw-file pickers, action buttons, status label."""
    strip = tk.Frame(self, bg=BG2, pady=4)
    strip.pack(side=tk.TOP, fill=tk.X)

    gap = {"padx": 5, "pady": 2}
    caption_opts = {"bg": BG2, "fg": FG, "font": MONO}
    path_opts = {"width": 30, "bg": BG3, "fg": FG,
                 "insertbackground": FG, "relief": "flat", "font": MONO}
    browse_opts = {"text": "Browse", "bg": BG3, "fg": FG,
                   "relief": "flat", "cursor": "hand2", "font": MONO}

    def pick_s3() -> None:
        self._browse(self.s3_var, "raw_s3.bin")

    def pick_bw() -> None:
        self._browse(self.bw_var, "raw_bw.bin")

    # S3-side capture file.
    tk.Label(strip, text="S3 raw:", **caption_opts).pack(side=tk.LEFT, **gap)
    self.s3_var = tk.StringVar()
    tk.Entry(strip, textvariable=self.s3_var, **path_opts).pack(side=tk.LEFT, **gap)
    tk.Button(strip, command=pick_s3, **browse_opts).pack(side=tk.LEFT, **gap)

    # BW-side capture file.
    tk.Label(strip, text=" BW raw:", **caption_opts).pack(side=tk.LEFT, **gap)
    self.bw_var = tk.StringVar()
    tk.Entry(strip, textvariable=self.bw_var, **path_opts).pack(side=tk.LEFT, **gap)
    tk.Button(strip, command=pick_bw, **browse_opts).pack(side=tk.LEFT, **gap)

    # Fixed-width spacer between the pickers and the action buttons.
    tk.Frame(strip, bg=BG2, width=10).pack(side=tk.LEFT)

    self.analyze_btn = tk.Button(
        strip, text="Analyze", bg=ACCENT, fg="#ffffff",
        relief="flat", padx=10, cursor="hand2", font=MONO_B,
        command=self._run_analyze,
    )
    self.analyze_btn.pack(side=tk.LEFT, **gap)

    self.live_btn = tk.Button(
        strip, text="Live: OFF", bg=BG3, fg=FG,
        relief="flat", padx=10, cursor="hand2", font=MONO,
        command=self._toggle_live,
    )
    self.live_btn.pack(side=tk.LEFT, **gap)

    # Export starts disabled; it is enabled once an analysis has run.
    self.export_btn = tk.Button(
        strip, text="Export for Claude", bg=ORANGE, fg="#000000",
        relief="flat", padx=10, cursor="hand2", font=MONO_B,
        command=self._run_export, state="disabled",
    )
    self.export_btn.pack(side=tk.LEFT, **gap)

    self.status_var = tk.StringVar(value="Idle")
    status_label = tk.Label(strip, textvariable=self.status_var, bg=BG2,
                            fg=FG_DIM, font=MONO, anchor="w")
    status_label.pack(side=tk.LEFT, padx=10)
|
|
|
|
def _build_panes(self) -> None:
    """Create the split view: session tree on the left, detail tabs on the right."""
    split = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG,
                           sashwidth=4, sashrelief="flat")
    split.pack(fill=tk.BOTH, expand=True)

    # ── left: session tree ──
    sidebar = tk.Frame(split, bg=BG2, width=260)
    split.add(sidebar, minsize=200)

    heading = tk.Label(sidebar, text="Sessions", bg=BG2, fg=ACCENT,
                       font=MONO_B, anchor="w", padx=6)
    heading.pack(fill=tk.X)

    tree_host = tk.Frame(sidebar, bg=BG2)
    tree_host.pack(fill=tk.BOTH, expand=True)

    theme = ttk.Style()
    theme.theme_use("clam")
    theme.configure("Treeview", background=BG2, foreground=FG,
                    fieldbackground=BG2, font=MONO_SM, rowheight=18, borderwidth=0)
    theme.configure("Treeview.Heading", background=BG3, foreground=ACCENT, font=MONO_SM)
    theme.map("Treeview",
              background=[("selected", BG3)],
              foreground=[("selected", "#ffffff")])

    tree = ttk.Treeview(tree_host, columns=("info",), show="tree headings",
                        selectmode="browse")
    self.tree = tree
    tree.heading("#0", text="Frame")
    tree.heading("info", text="Info")
    tree.column("#0", width=160, stretch=True)
    tree.column("info", width=80, stretch=False)

    tree_scroll = ttk.Scrollbar(tree_host, orient="vertical", command=tree.yview)
    tree.configure(yscrollcommand=tree_scroll.set)
    tree_scroll.pack(side=tk.RIGHT, fill=tk.Y)
    tree.pack(fill=tk.BOTH, expand=True)

    tree.tag_configure("session", foreground=ACCENT, font=MONO_B)
    for row_tag, colour in (
        ("bw_frame", COL_BW),
        ("s3_frame", COL_S3),
        ("bad_chk", RED),
        ("malformed", RED),
    ):
        tree.tag_configure(row_tag, foreground=colour)
    tree.bind("<<TreeviewSelect>>", self._on_tree_select)

    # ── right: detail notebook ──
    detail = tk.Frame(split, bg=BG)
    split.add(detail, minsize=600)

    theme.configure("TNotebook", background=BG2, borderwidth=0)
    theme.configure("TNotebook.Tab", background=BG3, foreground=FG,
                    font=MONO, padding=[8, 2])
    theme.map("TNotebook.Tab",
              background=[("selected", BG)],
              foreground=[("selected", ACCENT)])

    self.nb = ttk.Notebook(detail)
    self.nb.pack(fill=tk.BOTH, expand=True)

    # Tab order matters: other methods select tabs by position.
    self.inv_text = self._make_text_tab("Inventory")
    self.hex_text = self._make_text_tab("Hex Dump")
    self.diff_text = self._make_text_tab("Diff")
    self.report_text = self._make_text_tab("Full Report")
    self._build_query_tab()

    # Colour tags shared by all four text views ("head" is also bold).
    colour_tags = (
        ("bw", COL_BW),
        ("s3", COL_S3),
        ("changed", COL_DIFF),
        ("known", COL_KNOW),
        ("dim", FG_DIM),
        ("normal", FG),
        ("warn", YELLOW),
        ("addr", ORANGE),
    )
    for view in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
        view.tag_configure("head", foreground=COL_HEAD, font=MONO_B)
        for text_tag, colour in colour_tags:
            view.tag_configure(text_tag, foreground=colour)
|
|
|
|
def _make_text_tab(self, title: str) -> tk.Text:
    """Add a notebook tab holding a read-only, two-way scrollable text view.

    Returns the ``tk.Text`` widget so the caller can write into it.
    """
    page = tk.Frame(self.nb, bg=BG)
    self.nb.add(page, text=title)

    text_opts = {
        "bg": BG, "fg": FG, "font": MONO, "state": "disabled",
        "relief": "flat", "wrap": "none", "insertbackground": FG,
        "selectbackground": BG3, "selectforeground": "#ffffff",
    }
    view = tk.Text(page, **text_opts)

    y_bar = ttk.Scrollbar(page, orient="vertical", command=view.yview)
    x_bar = ttk.Scrollbar(page, orient="horizontal", command=view.xview)
    view.configure(yscrollcommand=y_bar.set, xscrollcommand=x_bar.set)

    # Scrollbars are packed first so the text widget takes what is left.
    y_bar.pack(side=tk.RIGHT, fill=tk.Y)
    x_bar.pack(side=tk.BOTTOM, fill=tk.X)
    view.pack(fill=tk.BOTH, expand=True)
    return view
|
|
|
|
def _build_query_tab(self) -> None:
    """Create the "Query DB" tab: filter row, result table, byte interpreter."""
    page = tk.Frame(self.nb, bg=BG)
    self.nb.add(page, text="Query DB")
    cell = {"padx": 4, "pady": 2}

    # ── filter row ──
    filters = tk.Frame(page, bg=BG2, pady=4)
    filters.pack(side=tk.TOP, fill=tk.X)

    def caption(text: str, column: int) -> None:
        """Place a right-aligned caption in the filter grid."""
        tk.Label(filters, text=text, bg=BG2, fg=FG,
                 font=MONO_SM).grid(row=0, column=column, sticky="e", **cell)

    field_opts = {"width": 8, "bg": BG3, "fg": FG, "font": MONO_SM,
                  "insertbackground": FG, "relief": "flat"}

    caption("Capture:", 0)
    self._q_capture_var = tk.StringVar(value="All")
    self._q_capture_cb = ttk.Combobox(filters, textvariable=self._q_capture_var,
                                      width=20, font=MONO_SM, state="readonly")
    self._q_capture_cb.grid(row=0, column=1, sticky="w", **cell)

    caption("Dir:", 2)
    self._q_dir_var = tk.StringVar(value="All")
    direction_cb = ttk.Combobox(filters, textvariable=self._q_dir_var,
                                values=["All", "BW", "S3"],
                                width=6, font=MONO_SM, state="readonly")
    direction_cb.grid(row=0, column=3, sticky="w", **cell)

    caption("SUB:", 4)
    self._q_sub_var = tk.StringVar(value="All")
    self._q_sub_cb = ttk.Combobox(filters, textvariable=self._q_sub_var,
                                  width=12, font=MONO_SM, state="readonly")
    self._q_sub_cb.grid(row=0, column=5, sticky="w", **cell)

    caption("Offset:", 6)
    self._q_offset_var = tk.StringVar()
    tk.Entry(filters, textvariable=self._q_offset_var,
             **field_opts).grid(row=0, column=7, sticky="w", **cell)

    caption("Value:", 8)
    self._q_value_var = tk.StringVar()
    tk.Entry(filters, textvariable=self._q_value_var,
             **field_opts).grid(row=0, column=9, sticky="w", **cell)

    run_btn = tk.Button(filters, text="Run Query", bg=ACCENT, fg="#ffffff",
                        relief="flat", padx=8, cursor="hand2",
                        font=("Consolas", 8, "bold"),
                        command=self._run_db_query)
    run_btn.grid(row=0, column=10, padx=8)

    refresh_btn = tk.Button(filters, text="Refresh", bg=BG3, fg=FG, relief="flat",
                            padx=6, cursor="hand2", font=MONO_SM,
                            command=self._refresh_query_dropdowns)
    refresh_btn.grid(row=0, column=11, padx=4)

    self._q_stats_var = tk.StringVar(value="DB: —")
    stats_label = tk.Label(filters, textvariable=self._q_stats_var, bg=BG2,
                           fg=FG_DIM, font=MONO_SM)
    stats_label.grid(row=0, column=12, padx=12, sticky="w")

    # ── result table ──
    results = tk.Frame(page, bg=BG)
    results.pack(side=tk.TOP, fill=tk.BOTH, expand=True)

    # (column id, heading, width); only the name column absorbs extra width.
    column_specs = (
        ("cap", "Cap", 40), ("sess", "Sess", 40), ("dir", "Dir", 40),
        ("sub", "SUB", 50), ("sub_name", "Name", 160), ("page", "Page", 60),
        ("len", "Len", 50), ("chk", "Chk", 50),
    )
    table = ttk.Treeview(results, columns=tuple(spec[0] for spec in column_specs),
                         show="headings", selectmode="browse")
    self._q_tree = table
    for column_id, heading, width in column_specs:
        table.heading(column_id, text=heading, anchor="w")
        table.column(column_id, width=width, stretch=(column_id == "sub_name"))

    y_bar = ttk.Scrollbar(results, orient="vertical", command=table.yview)
    x_bar = ttk.Scrollbar(results, orient="horizontal", command=table.xview)
    table.configure(yscrollcommand=y_bar.set, xscrollcommand=x_bar.set)
    y_bar.pack(side=tk.RIGHT, fill=tk.Y)
    x_bar.pack(side=tk.BOTTOM, fill=tk.X)
    table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

    for row_tag, colour in (("bw_row", COL_BW), ("s3_row", COL_S3), ("bad_row", RED)):
        table.tag_configure(row_tag, foreground=colour)

    def on_row_selected(_event) -> None:
        # Selecting a row re-runs the byte interpretation.
        self._run_interpret()

    table.bind("<<TreeviewSelect>>", on_row_selected)

    # ── byte interpretation strip ──
    interp = tk.Frame(page, bg=BG2, height=120)
    interp.pack(side=tk.BOTTOM, fill=tk.X)
    # Keep the strip at its fixed height regardless of its children.
    interp.pack_propagate(False)
    tk.Label(interp, text="Byte interpretation:", bg=BG2, fg=ACCENT,
             font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X)

    controls = tk.Frame(interp, bg=BG2)
    controls.pack(fill=tk.X, padx=6, pady=2)
    tk.Label(controls, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT)
    self._interp_offset_var = tk.StringVar(value="5")
    tk.Entry(controls, textvariable=self._interp_offset_var, width=6, bg=BG3, fg=FG,
             font=MONO_SM, insertbackground=FG,
             relief="flat").pack(side=tk.LEFT, padx=4)
    tk.Button(controls, text="Interpret", bg=BG3, fg=FG, relief="flat", cursor="hand2",
              font=MONO_SM, command=self._run_interpret).pack(side=tk.LEFT, padx=4)

    self._interp_text = tk.Text(interp, bg=BG2, fg=FG, font=MONO_SM,
                                height=4, relief="flat", state="disabled",
                                insertbackground=FG)
    self._interp_text.pack(fill=tk.X, padx=6, pady=2)
    self._interp_text.tag_configure("label", foreground=FG_DIM)
    self._interp_text.tag_configure("value", foreground=YELLOW)

    # Query bookkeeping; the leading None entries presumably stand for the
    # "All" choice of each dropdown — confirm in _refresh_query_dropdowns.
    self._q_rows: dict[str, object] = {}
    self._q_capture_rows: list = [None]
    self._q_sub_values: list = [None]
    self._refresh_query_dropdowns()
|
|
|
|
def _build_statusbar(self) -> None:
    """Create the bottom status line, backed by ``self.sb_var``."""
    footer = tk.Frame(self, bg=BG3, height=20)
    footer.pack(side=tk.BOTTOM, fill=tk.X)

    self.sb_var = tk.StringVar(value="Ready")
    message = tk.Label(footer, textvariable=self.sb_var, bg=BG3, fg=FG_DIM,
                       font=MONO_SM, anchor="w", padx=6)
    message.pack(fill=tk.X)
|
|
|
|
# ── file picking ──────────────────────────────────────────────────────
|
|
|
|
def _browse(self, var: tk.StringVar, default: str) -> None:
    """Show an open-file dialog and store the chosen path in ``var``.

    ``default`` is used both in the dialog title and as the suggested file
    name. ``var`` is left untouched when the dialog is cancelled.
    """
    chosen = filedialog.askopenfilename(
        title=f"Select {default}",
        filetypes=[("Binary", "*.bin"), ("All files", "*.*")],
        initialfile=default,
    )
    if not chosen:
        return
    var.set(chosen)
|
|
|
|
# ── analysis ──────────────────────────────────────────────────────────
|
|
|
|
def _run_analyze(self) -> None:
    """Validate both file fields, remember the paths and run the analysis."""
    s3_text = self.s3_var.get().strip()
    bw_text = self.bw_var.get().strip()
    if not (s3_text and bw_text):
        messagebox.showerror("Missing files", "Select both S3 and BW raw files.")
        return

    s3_file, bw_file = Path(s3_text), Path(bw_text)
    # The S3 file is checked first; only the first missing file is reported.
    for side, capture in (("S3", s3_file), ("BW", bw_file)):
        if not capture.exists():
            messagebox.showerror("Not found", f"{side} file not found:\n{capture}")
            return

    self.state.s3_path = s3_file
    self.state.bw_path = bw_file
    self._do_analyze(s3_file, bw_file)
|
|
|
|
def _do_analyze(self, s3_path: Path, bw_path: Path) -> None:
|
|
self.status_var.set("Parsing...")
|
|
self.update_idletasks()
|
|
|
|
s3_frames = annotate_frames(parse_s3(s3_path.read_bytes(), trailer_len=0), "S3")
|
|
bw_frames = annotate_frames(parse_bw(bw_path.read_bytes(), trailer_len=0,
|
|
validate_checksum=True), "BW")
|
|
sessions = split_into_sessions(bw_frames, s3_frames)
|
|
|
|
diffs: list[Optional[list[FrameDiff]]] = [None]
|
|
for i in range(1, len(sessions)):
|
|
diffs.append(diff_sessions(sessions[i - 1], sessions[i]))
|
|
|
|
self.state.sessions = sessions
|
|
self.state.diffs = diffs
|
|
|
|
n_s3 = sum(len(s.s3_frames) for s in sessions)
|
|
n_bw = sum(len(s.bw_frames) for s in sessions)
|
|
self.status_var.set(f"{len(sessions)} sessions | BW:{n_bw} S3:{n_s3}")
|
|
self.sb_var.set(f"Loaded: {s3_path.name} + {bw_path.name}")
|
|
self.export_btn.configure(state="normal")
|
|
self._rebuild_tree()
|
|
|
|
try:
|
|
cap_id = self._db.ingest(sessions, s3_path, bw_path)
|
|
if cap_id is not None:
|
|
self.state.last_capture_id = cap_id
|
|
self._refresh_query_dropdowns()
|
|
for i, lbl in enumerate(self._q_capture_cb["values"]):
|
|
if lbl.startswith(f"#{cap_id} "):
|
|
self._q_capture_cb.current(i)
|
|
break
|
|
except Exception as exc:
|
|
self.sb_var.set(f"DB ingest error: {exc}")
|
|
|
|
def _run_export(self) -> None:
    """Write the export file next to the S3 capture and offer to open its folder.

    The folder is opened with the platform's file manager (``explorer`` on
    Windows, ``open`` on macOS, ``xdg-open`` elsewhere). If that launcher
    cannot be started, the failure is shown in the status bar; the export
    itself has already succeeded at that point.
    """
    if not self.state.sessions:
        messagebox.showinfo("Export", "Run Analyze first.")
        return

    outdir = self.state.s3_path.parent if self.state.s3_path else Path(".")
    out_path = write_claude_export(self.state.sessions, self.state.diffs,
                                   outdir, self.state.s3_path, self.state.bw_path)
    self.sb_var.set(f"Exported: {out_path.name}")

    if not messagebox.askyesno("Export complete", f"Saved to:\n{out_path}\n\nOpen folder?"):
        return

    folder = str(out_path.parent)
    if sys.platform.startswith("win"):
        opener = ["explorer", folder]
    elif sys.platform == "darwin":
        opener = ["open", folder]
    else:
        opener = ["xdg-open", folder]
    try:
        subprocess.Popen(opener)
    except OSError as exc:
        self.sb_var.set(f"Exported: {out_path.name} (could not open folder: {exc})")
|
|
|
|
# ── session tree ──────────────────────────────────────────────────────
|
|
|
|
def _rebuild_tree(self) -> None:
    """Repopulate the session tree from ``self.state`` and expand every session."""
    tree = self.tree
    tree.delete(*tree.get_children())

    for session in self.state.sessions:
        # A session without a BW frame of SUB 0x74 is labelled as partial.
        has_closing_frame = any(
            entry.header and entry.header.sub == 0x74 for entry in session.bw_frames
        )
        title = f"Session {session.index}"
        if not has_closing_frame:
            title += " [partial]"

        change_count = len(self.state.diffs[session.index] or [])
        summary = f"{change_count} changes" if change_count else ""
        parent_id = tree.insert("", tk.END, text=title,
                                values=(summary,), tags=("session",))

        for entry in session.all_frames:
            sub_hex = f"{entry.header.sub:02X}" if entry.header else "??"
            caption = f"[{entry.source}] {sub_hex} {entry.sub_name}"
            note = ""
            row_tags = ("bw_frame",) if entry.source == "BW" else ("s3_frame",)

            # A failed checksum takes precedence over a missing header.
            if entry.frame.checksum_valid is False:
                note = "BAD CHK"
                row_tags = ("bad_chk",)
            elif entry.header is None:
                caption = f"[{entry.source}] MALFORMED"
                row_tags = ("malformed",)

            # The item id is parsed again when a row is selected.
            tree.insert(parent_id, tk.END, text=caption, values=(note,), tags=row_tags,
                        iid=f"frame_{session.index}_{entry.frame.index}_{entry.source}")

    for top_item in tree.get_children():
        tree.item(top_item, open=True)
|
|
|
|
def _on_tree_select(self, _e: tk.Event) -> None:
|
|
sel = self.tree.selection()
|
|
if not sel:
|
|
return
|
|
iid = sel[0]
|
|
if iid.startswith("frame_"):
|
|
parts = iid.split("_")
|
|
self._show_frame_detail(int(parts[1]), int(parts[2]), parts[3])
|
|
else:
|
|
text = self.tree.item(iid, "text")
|
|
try:
|
|
self._show_session_detail(int(text.split()[1]))
|
|
except (IndexError, ValueError):
|
|
pass
|
|
|
|
def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]:
|
|
if sess_idx >= len(self.state.sessions):
|
|
return None
|
|
pool = (self.state.sessions[sess_idx].bw_frames if source == "BW"
|
|
else self.state.sessions[sess_idx].s3_frames)
|
|
return next((af for af in pool if af.frame.index == frame_idx), None)
|
|
|
|
# ── detail renderers ──────────────────────────────────────────────────
|
|
|
|
def _clear_tabs(self) -> None:
|
|
for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
|
|
self._tc(w)
|
|
|
|
def _show_session_detail(self, idx: int) -> None:
    """Fill the Inventory, Diff and Full Report tabs for session ``idx``.

    Does nothing when ``idx`` is past the end of the session list. The
    Inventory tab is brought to the front when done.
    """
    if idx >= len(self.state.sessions):
        return
    session = self.state.sessions[idx]
    session_diffs = self.state.diffs[idx]
    self._clear_tabs()

    put = self._tw
    newline = self._tn

    # ── Inventory tab ──
    out = self.inv_text
    bw_count = len(session.bw_frames)
    s3_count = len(session.s3_frames)
    put(out, f"SESSION {session.index}\n", "head")
    put(out, f"Frames: {bw_count + s3_count}"
             f" (BW:{bw_count} S3:{s3_count})\n", "normal")
    if bw_count != s3_count:
        put(out, " WARNING: BW/S3 count mismatch\n", "warn")
    newline(out)

    for position, entry in enumerate(session.all_frames):
        side_tag = "bw" if entry.source == "BW" else "s3"
        sub_hex = f"{entry.header.sub:02X}" if entry.header else "??"
        page = ""
        if entry.header and entry.header.page_key:
            page = f" (page {entry.header.page_key:04X})"

        # Checksum note and the tag it is drawn with (empty = nothing shown).
        verdict = entry.frame.checksum_valid
        if verdict is False:
            chk_note, chk_tag = " [BAD CHECKSUM]", "warn"
        elif verdict is True:
            chk_note, chk_tag = f" [{entry.frame.checksum_type}]", "dim"
        else:
            chk_note, chk_tag = "", "dim"

        put(out, f" [{entry.source}] #{position:<3} ", side_tag)
        put(out, f"SUB={sub_hex} ", "addr")
        put(out, f"{entry.sub_name:<30}{page} len={len(entry.frame.payload)}", "dim")
        if chk_note:
            put(out, chk_note, chk_tag)
        newline(out)

    # ── Diff tab ──
    out = self.diff_text
    self._tc(out)
    if session_diffs is None:
        put(out, "(No previous session to diff against)\n", "dim")
    else:
        put(out, f"DIFF vs SESSION {idx-1}\n", "head")
        if not session_diffs:
            put(out, " No changes detected.\n", "dim")
        for frame_diff in session_diffs:
            page = f" (page {frame_diff.page_key:04X})" if frame_diff.page_key else ""
            put(out, f"\n SUB {frame_diff.sub:02X} ({frame_diff.sub_name}){page}:\n", "addr")
            for change in frame_diff.diffs:
                # A negative before/after value is rendered as "--".
                old = f"{change.before:02x}" if change.before >= 0 else "--"
                new = f"{change.after:02x}" if change.after >= 0 else "--"
                put(out, f" [{change.payload_offset:3d}] 0x{change.payload_offset:04X}: ", "dim")
                put(out, f"{old} -> {new}", "changed")
                if change.field_name:
                    put(out, f" [{change.field_name}]", "known")
                newline(out)

    # ── Full Report tab ──
    previous_idx = idx - 1 if idx > 0 else None
    report = render_session_report(session, session_diffs, previous_idx)
    self._tc(self.report_text)
    put(self.report_text, report, "normal")

    self.nb.select(0)
|
|
|
|
def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None:
    """Fill the Inventory and Hex Dump tabs for a single frame.

    Does nothing when the frame cannot be found. The Hex Dump tab is brought
    to the front when done.
    """
    entry = self._find_frame(sess_idx, frame_idx, source)
    if entry is None:
        return
    self._clear_tabs()

    put = self._tw
    newline = self._tn
    side_tag = "bw" if source == "BW" else "s3"
    sub_hex = f"{entry.header.sub:02X}" if entry.header else "??"
    payload = entry.frame.payload
    verdict = entry.frame.checksum_valid

    # ── Inventory tab: summary + header breakdown ──
    out = self.inv_text
    put(out, f"[{entry.source}] Frame #{entry.frame.index}\n", side_tag)
    put(out, f"Session {sess_idx} | ", "dim")
    put(out, f"SUB={sub_hex} {entry.sub_name}\n", "addr")
    if entry.header:
        put(out, f" OFFSET: {entry.header.page_key:04X} "
                 f"CMD={entry.header.cmd:02X} FLAGS={entry.header.flags:02X}\n", "dim")
    newline(out)
    put(out, f"Payload: {len(payload)} bytes\n", "dim")
    if verdict is False:
        put(out, " BAD CHECKSUM\n", "warn")
    elif verdict is True:
        put(out, f" Checksum: {entry.frame.checksum_type} {entry.frame.checksum_hex}\n", "dim")
    newline(out)

    if len(payload) >= 5:
        put(out, "Header breakdown:\n", "head")
        put(out, f" [0] CMD = {payload[0]:02x}\n", "dim")
        put(out, f" [1] ? = {payload[1]:02x}\n", "dim")
        put(out, f" [2] SUB = {payload[2]:02x} ({entry.sub_name})\n", side_tag)
        put(out, f" [3] OFFSET_HI = {payload[3]:02x}\n", "dim")
        put(out, f" [4] OFFSET_LO = {payload[4]:02x}\n", "dim")
        if len(payload) > 5:
            put(out, f" [5..] data = {len(payload)-5} bytes\n", "dim")

    # ── Hex Dump tab: payload dump + bytes changed since previous session ──
    out = self.hex_text
    put(out, f"[{entry.source}] SUB={sub_hex} {entry.sub_name}\n", side_tag)
    put(out, f"Payload ({len(payload)} bytes):\n", "dim")
    newline(out)
    put(out, "\n".join(format_hex_dump(payload, indent=" ")) + "\n", "normal")

    session_diffs = None
    if sess_idx < len(self.state.diffs):
        session_diffs = self.state.diffs[sess_idx]

    if session_diffs and entry.header:
        same_frame = [
            frame_diff for frame_diff in session_diffs
            if frame_diff.sub == entry.header.sub
            and frame_diff.page_key == entry.header.page_key
        ]
        if same_frame:
            newline(out)
            put(out, "Changed bytes (vs prev session):\n", "head")
            # Only the first matching diff entry is shown.
            for change in same_frame[0].diffs:
                old = f"{change.before:02x}" if change.before >= 0 else "--"
                new = f"{change.after:02x}" if change.after >= 0 else "--"
                put(out, f" [{change.payload_offset:3d}] 0x{change.payload_offset:04X}: ", "dim")
                put(out, f"{old} -> {new}", "changed")
                if change.field_name:
                    put(out, f" [{change.field_name}]", "known")
                newline(out)

    self.nb.select(1)
|
|
|
|
# ── live mode ─────────────────────────────────────────────────────────
|
|
|
|
def _toggle_live(self) -> None:
    """Toggle live mode.

    If the live worker thread is running, signal it to stop and show the
    button in its OFF state.  Otherwise require that both raw-file entry
    fields are non-empty, record the paths on ``self.state`` and start
    live mode; an error dialog is shown when either field is empty.
    """
    worker = self._live_thread
    if worker and worker.is_alive():
        self._live_stop.set()
        self._set_live_btn_off()
        return

    s3_text = self.s3_var.get().strip()
    bw_text = self.bw_var.get().strip()
    # Test the raw text, not Path(text): Path("") is "." and always truthy.
    if not s3_text or not bw_text:
        messagebox.showerror("Missing files", "Select both raw files before starting live mode.")
        return

    self.state.s3_path = Path(s3_text)
    self.state.bw_path = Path(bw_text)
    self._start_live()
|
|
|
|
def _start_live(self) -> None:
    """Start the background live worker for the files named in the entry fields.

    Reads both path entry fields, records them on ``self.state``, launches
    ``_live_worker`` on a daemon thread and switches the button and status
    line to the "running" presentation.
    """
    # The stop Event is shared: clearing it while an earlier worker is still
    # alive would revive that worker next to the new one.  Ask it to stop and
    # give it a moment to exit before starting over.
    previous = self._live_thread
    if previous is not None and previous.is_alive():
        self._live_stop.set()
        previous.join(timeout=1.0)

    s3p = Path(self.s3_var.get().strip())
    bwp = Path(self.bw_var.get().strip())
    self.state.s3_path = s3p
    self.state.bw_path = bwp

    self._live_stop.clear()
    self._live_thread = threading.Thread(
        target=self._live_worker,
        args=(s3p, bwp),
        daemon=True,
    )
    self._live_thread.start()

    self.live_btn.configure(text="Live: ON", bg=GREEN, fg="#000000")
    self.status_var.set("Live mode running...")
|
|
|
|
def _set_live_btn_off(self) -> None:
    """Show the live button in its OFF state and set the status line to "Live stopped"."""
    self.live_btn.configure(text="Live: OFF", bg=BG3, fg=FG)
    self.status_var.set("Live stopped")
|
|
|
|
def _live_worker(self, s3_path: Path, bw_path: Path) -> None:
    """Watch both capture files and request a refresh whenever one changes.

    Runs until ``self._live_stop`` is set.  About every 0.1 s the size of
    each file is compared with the size seen on the previous pass; if either
    differs, a single ``"refresh"`` message is put on ``self._live_q``.

    Args:
        s3_path: Path of the S3-side raw capture file.
        bw_path: Path of the BW-side raw capture file.
    """
    watched = (s3_path, bw_path)
    last_sizes = [0, 0]

    while not self._live_stop.is_set():
        changed = False
        for idx, path in enumerate(watched):
            try:
                size = path.stat().st_size
            except OSError:
                # File not created yet, or briefly unavailable: try again
                # on the next pass instead of killing the worker.
                continue
            # "!=" rather than ">" so a truncated or recreated file also
            # triggers a refresh instead of being ignored until it regrows.
            if size != last_sizes[idx]:
                last_sizes[idx] = size
                changed = True

        if changed:
            self._live_q.put("refresh")

        # Waiting on the Event returns immediately once a stop is requested.
        self._live_stop.wait(0.1)
|
|
|
|
def _poll_live_queue(self) -> None:
    """Drain the live queue, re-analyze once if a refresh is pending, and reschedule.

    All messages currently queued are consumed.  If at least one of them is
    ``"refresh"`` and both paths are set on ``self.state``, ``_do_analyze``
    is called a single time with those paths.  The method always schedules
    itself again via ``self.after(150, ...)``, even if the analysis raises.
    """
    refresh_pending = False
    try:
        while True:
            if self._live_q.get_nowait() == "refresh":
                refresh_pending = True
    except queue.Empty:
        pass

    try:
        # _do_analyze is handed the file paths, so several queued refreshes
        # are served by one call instead of one call per message.
        if refresh_pending and self.state.s3_path and self.state.bw_path:
            self._do_analyze(self.state.s3_path, self.state.bw_path)
    finally:
        self.after(150, self._poll_live_queue)
|
|
|
|
# ── DB query ──────────────────────────────────────────────────────────
|
|
|
|
def _refresh_query_dropdowns(self) -> None:
    """Reload the capture and SUB dropdowns and the DB statistics line.

    Each dropdown gets an ``"All"`` entry first; ``self._q_capture_rows`` and
    ``self._q_sub_values`` hold the matching ids/values at the same index
    (``None`` for ``"All"``).  Any exception is reported in the statistics
    line as ``"DB error: ..."`` instead of being raised.
    """
    try:
        # Fetch and format everything before touching any widget, so a
        # failure part-way through cannot leave the dropdowns half updated.
        captures = self._db.list_captures()
        subs = self._db.get_distinct_subs()
        stats = self._db.get_stats()

        capture_labels = ["All"] + [
            f"#{r['id']} {r['timestamp'][:16]} ({r['frame_count']} frames)"
            for r in captures
        ]
        capture_ids = [None] + [r["id"] for r in captures]
        sub_labels = ["All"] + [f"0x{s:02X}" for s in subs]
        sub_values = [None, *subs]
        stats_text = f"DB: {stats['captures']} captures | {stats['frames']} frames"

        self._q_capture_cb["values"] = capture_labels
        self._q_capture_rows = capture_ids
        self._q_sub_cb["values"] = sub_labels
        self._q_sub_values = sub_values
        self._q_stats_var.set(stats_text)
    except Exception as exc:  # UI boundary: show the problem rather than crash
        self._q_stats_var.set(f"DB error: {exc}")
|
|
|
|
def _parse_int(self, s: str) -> Optional[int]:
    """Parse *s* as an integer literal, returning ``None`` if empty or invalid.

    Surrounding whitespace is ignored.  Parsing uses ``int(s, 0)``, so the
    base comes from the prefix (``0x`` hex, ``0o`` octal, ``0b`` binary,
    otherwise decimal); a decimal with leading zeros such as ``"010"`` is
    rejected and yields ``None``.
    """
    text = s.strip()
    if not text:
        return None
    try:
        return int(text, 0)
    except ValueError:
        return None
|
|
|
|
def _run_db_query(self) -> None:
    """Run the query described by the filter widgets and list the rows in the tree.

    Capture, direction and SUB filters are ``None`` when "All" (or nothing)
    is selected.  If an offset is given the byte-level query
    ``query_by_byte`` is used (with the optional value); otherwise
    ``query_frames`` is used and the value field is not consulted.  A
    failing query is reported in an error dialog and leaves the current
    results untouched.
    """
    cap_idx = self._q_capture_cb.current()
    sub_idx = self._q_sub_cb.current()
    dir_val = self._q_dir_var.get()
    # Index 0 is the "All" entry; current() is negative when nothing is selected.
    filters = {
        "capture_id": self._q_capture_rows[cap_idx] if cap_idx > 0 else None,
        "direction": None if dir_val == "All" else dir_val,
        "sub": self._q_sub_values[sub_idx] if sub_idx > 0 else None,
    }
    offset = self._parse_int(self._q_offset_var.get())
    value = self._parse_int(self._q_value_var.get())

    try:
        if offset is not None:
            rows = self._db.query_by_byte(offset=offset, value=value, **filters)
        else:
            rows = self._db.query_frames(**filters)
    except Exception as exc:  # UI boundary: surface any DB error to the user
        messagebox.showerror("Query error", str(exc))
        return

    self._q_tree.delete(*self._q_tree.get_children())
    self._q_rows.clear()

    checksum_labels = {1: "OK", 0: "BAD", None: "—"}
    for row in rows:
        sub_text = f"0x{row['sub']:02X}" if row["sub"] is not None else "—"
        page_text = f"0x{row['page_key']:04X}" if row["page_key"] is not None else "—"
        checksum_text = checksum_labels.get(row["checksum_ok"], "—")

        # A failed checksum overrides the per-direction row tag.
        if row["checksum_ok"] == 0:
            tag = "bad_row"
        elif row["direction"] == "BW":
            tag = "bw_row"
        else:
            tag = "s3_row"

        iid = str(row["id"])
        self._q_tree.insert("", tk.END, iid=iid, tags=(tag,), values=(
            row["capture_id"], row["session_idx"], row["direction"],
            sub_text, row["sub_name"] or "", page_text, row["payload_len"],
            checksum_text))
        # Keep the full row so it can be looked up again by tree item id.
        self._q_rows[iid] = row

    self.sb_var.set(f"Query returned {len(rows)} rows")
|
|
|
|
def _run_interpret(self) -> None:
    """Show numeric interpretations of the selected frame's payload at an offset.

    Does nothing unless a query row is selected and the offset field parses
    to an integer.  The payload and offset are passed to
    ``self._db.interpret_offset``; the interpretations it returns are
    written to the interpretation text widget, several per line.
    """
    selection = self._q_tree.selection()
    if not selection:
        return
    row = self._q_rows.get(selection[0])
    if not row:
        return
    offset = self._parse_int(self._interp_offset_var.get())
    if offset is None:
        return

    interp = self._db.interpret_offset(bytes(row["payload"]), offset)
    sub_text = f"0x{row['sub']:02X}" if row["sub"] is not None else "??"

    # (key returned by interpret_offset, label shown to the user), in display order
    fields = (
        ("uint8", "uint8 "), ("int8", "int8 "),
        ("uint16_be", "uint16 BE "), ("uint16_le", "uint16 LE "),
        ("uint32_be", "uint32 BE "), ("uint32_le", "uint32 LE "),
        ("float32_be", "float32 BE "), ("float32_le", "float32 LE "),
    )

    w = self._interp_text
    w.configure(state="normal")
    try:
        w.delete("1.0", tk.END)
        w.insert(tk.END,
                 f"Frame #{row['id']} [{row['direction']}] SUB={sub_text} "
                 f"offset={offset} (0x{offset:04X})\n", "label")

        line = ""
        for key, label in fields:
            if key not in interp:
                continue
            val = interp[key]
            if isinstance(val, float):
                shown = f"{val:.6g}"
            else:
                # Integers are also shown in hex, masked to 32 bits.
                shown = f"{val} (0x{int(val) & 0xFFFFFFFF:X})"
            line += f" {label}: {shown:<28}"
            # Emit the line once it has grown past 80 characters.
            if len(line) > 80:
                w.insert(tk.END, line + "\n", "value")
                line = ""
        if line:
            w.insert(tk.END, line + "\n", "value")
    finally:
        # Always hand the widget back read-only, even if an insert failed.
        w.configure(state="disabled")
|
|
|
|
# ── text helpers ──────────────────────────────────────────────────────
|
|
|
|
def _tc(self, w: tk.Text) -> None:
    """Clear text widget *w*; the widget is left in the ``normal`` (writable) state."""
    w.configure(state="normal")
    w.delete("1.0", tk.END)
|
|
|
|
def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None:
    """Append *text* to text widget *w* using *tag*, then make it read-only again.

    The widget is switched to ``normal`` for the insert and back to
    ``disabled`` afterwards, so a sequence of writes that ends with this
    helper does not leave the widget editable.
    """
    w.configure(state="normal")
    w.insert(tk.END, text, tag)
    w.configure(state="disabled")
|
|
|
|
def _tn(self, w: tk.Text) -> None:
    """Append a newline to text widget *w* and leave the widget ``disabled``."""
    w.configure(state="normal")
    w.insert(tk.END, "\n")
    w.configure(state="disabled")
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Main application window
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
class SeismoLab(tk.Tk):
    """Main window: a two-tab notebook holding the Bridge and Analyzer panels."""

    def __init__(self) -> None:
        """Build the window, the shared FrameDB, the notebook style and both panels."""
        super().__init__()
        self.title("Seismo Lab")
        self.configure(bg=BG)
        self.minsize(1100, 680)

        # One FrameDB instance, handed to the analyzer panel below.
        self._db = FrameDB()

        style = ttk.Style()
        style.theme_use("clam")
        style.configure("Top.TNotebook", background=BG3, borderwidth=0, tabposition="nw")
        style.configure("Top.TNotebook.Tab", background=BG3, foreground=FG,
                        font=("Consolas", 10, "bold"), padding=[16, 6])
        style.map("Top.TNotebook.Tab",
                  background=[("selected", ACCENT)],
                  foreground=[("selected", "#ffffff")])

        nb = ttk.Notebook(self, style="Top.TNotebook")
        nb.pack(fill=tk.BOTH, expand=True)

        self._bridge_panel = BridgePanel(
            nb,
            on_bridge_started=self._on_bridge_started,
            on_bridge_stopped=self._on_bridge_stopped,
        )
        nb.add(self._bridge_panel, text=" Bridge ")

        self._analyzer_panel = AnalyzerPanel(nb, db=self._db)
        nb.add(self._analyzer_panel, text=" Analyzer ")

        self._nb = nb
        self.protocol("WM_DELETE_WINDOW", self._on_close)

    def _on_bridge_started(self, raw_bw: Optional[str], raw_s3: Optional[str]) -> None:
        """Bridge started — inject paths into analyzer and start live mode."""
        self._analyzer_panel.set_live_files(raw_bw, raw_s3)
        # Switch to the Analyzer tab so the user can watch it update.  The
        # tab is addressed by its widget, not a numeric index, so this keeps
        # working if tabs are added or reordered.
        self._nb.select(self._analyzer_panel)

    def _on_bridge_stopped(self) -> None:
        """Bridge stopped — stop the analyzer's live mode."""
        self._analyzer_panel.stop_live()

    def _on_close(self) -> None:
        """Window close handler: stop the bridge, then destroy the window."""
        try:
            self._bridge_panel.stop_bridge()
        finally:
            # Destroy the window even if stopping the bridge fails, so the
            # application can always be closed.
            self.destroy()
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
def main() -> None:
    """Create the SeismoLab window and run the Tk main loop."""
    app = SeismoLab()
    app.mainloop()
|
|
|
|
|
|
# Launch the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()
|