diff --git a/.gitignore b/.gitignore index 75de90b..580dbc6 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,25 @@ -/bridges/captures/ \ No newline at end of file +/bridges/captures/ + +# Python bytecode +__pycache__/ +*.py[cod] + +# Virtual environments +.venv/ +venv/ +env/ + +# Editor / OS +.vscode/ +*.swp +.DS_Store +Thumbs.db + +# Analyzer outputs +*.report +claude_export_*.md + +# Frame database +*.db +*.db-wal +*.db-shm diff --git a/parsers/frame_db.py b/parsers/frame_db.py new file mode 100644 index 0000000..f9045ad --- /dev/null +++ b/parsers/frame_db.py @@ -0,0 +1,337 @@ +#!/usr/bin/env python3 +""" +frame_db.py — SQLite frame database for Instantel protocol captures. + +Schema: + captures — one row per ingested capture pair (deduped by SHA256) + frames — one row per parsed frame + byte_values — one row per (frame, offset, value) for fast indexed queries + +Usage: + db = FrameDB() # opens default DB at ~/.seismo_lab/frames.db + db = FrameDB(path) # custom path + cap_id = db.ingest(sessions, s3_path, bw_path) + rows = db.query_frames(sub=0xF7, direction="S3") + rows = db.query_by_byte(offset=85, value=0x0A) +""" + +from __future__ import annotations + +import hashlib +import os +import sqlite3 +import struct +from pathlib import Path +from typing import Optional + +# ───────────────────────────────────────────────────────────────────────────── +# DB location +# ───────────────────────────────────────────────────────────────────────────── + +DEFAULT_DB_DIR = Path.home() / ".seismo_lab" +DEFAULT_DB_PATH = DEFAULT_DB_DIR / "frames.db" + + +# ───────────────────────────────────────────────────────────────────────────── +# Schema +# ───────────────────────────────────────────────────────────────────────────── + +_DDL = """ +PRAGMA journal_mode=WAL; +PRAGMA foreign_keys=ON; + +CREATE TABLE IF NOT EXISTS captures ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, -- ISO-8601 ingest time + s3_path TEXT, + bw_path TEXT, + capture_hash TEXT NOT 
NULL UNIQUE, -- SHA256 of s3_blob+bw_blob + notes TEXT DEFAULT '' +); + +CREATE TABLE IF NOT EXISTS frames ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE, + session_idx INTEGER NOT NULL, + direction TEXT NOT NULL, -- 'BW' or 'S3' + sub INTEGER, -- NULL if malformed + page_key INTEGER, + sub_name TEXT, + payload BLOB NOT NULL, + payload_len INTEGER NOT NULL, + checksum_ok INTEGER -- 1/0/NULL +); + +CREATE INDEX IF NOT EXISTS idx_frames_capture ON frames(capture_id); +CREATE INDEX IF NOT EXISTS idx_frames_sub ON frames(sub); +CREATE INDEX IF NOT EXISTS idx_frames_page_key ON frames(page_key); +CREATE INDEX IF NOT EXISTS idx_frames_dir ON frames(direction); + +CREATE TABLE IF NOT EXISTS byte_values ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + frame_id INTEGER NOT NULL REFERENCES frames(id) ON DELETE CASCADE, + offset INTEGER NOT NULL, + value INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_bv_frame ON byte_values(frame_id); +CREATE INDEX IF NOT EXISTS idx_bv_offset ON byte_values(offset); +CREATE INDEX IF NOT EXISTS idx_bv_value ON byte_values(value); +CREATE INDEX IF NOT EXISTS idx_bv_off_val ON byte_values(offset, value); +""" + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _sha256_blobs(s3_blob: bytes, bw_blob: bytes) -> str: + h = hashlib.sha256() + h.update(s3_blob) + h.update(bw_blob) + return h.hexdigest() + + +def _interp_bytes(data: bytes, offset: int) -> dict: + """ + Return multi-interpretation dict for 1–4 bytes starting at offset. + Used in the GUI's byte interpretation panel. 
+ """ + result: dict = {} + remaining = len(data) - offset + if remaining <= 0: + return result + + b1 = data[offset] + result["uint8"] = b1 + result["int8"] = b1 if b1 < 128 else b1 - 256 + + if remaining >= 2: + u16be = struct.unpack_from(">H", data, offset)[0] + u16le = struct.unpack_from("= 4: + f32be = struct.unpack_from(">f", data, offset)[0] + f32le = struct.unpack_from("I", data, offset)[0] + u32le = struct.unpack_from(" None: + if path is None: + path = DEFAULT_DB_PATH + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + self.path = path + self._con = sqlite3.connect(str(path), check_same_thread=False) + self._con.row_factory = sqlite3.Row + self._init_schema() + + def _init_schema(self) -> None: + self._con.executescript(_DDL) + self._con.commit() + + def close(self) -> None: + self._con.close() + + # ── Ingest ──────────────────────────────────────────────────────────── + + def ingest( + self, + sessions: list, # list[Session] from s3_analyzer + s3_path: Optional[Path], + bw_path: Optional[Path], + notes: str = "", + ) -> Optional[int]: + """ + Ingest a list of sessions into the DB. + Returns capture_id, or None if already ingested (duplicate hash). 
+ """ + import datetime + + s3_blob = s3_path.read_bytes() if s3_path and s3_path.exists() else b"" + bw_blob = bw_path.read_bytes() if bw_path and bw_path.exists() else b"" + cap_hash = _sha256_blobs(s3_blob, bw_blob) + + # Dedup check + row = self._con.execute( + "SELECT id FROM captures WHERE capture_hash=?", (cap_hash,) + ).fetchone() + if row: + return None # already in DB + + ts = datetime.datetime.now().isoformat(timespec="seconds") + cur = self._con.execute( + "INSERT INTO captures (timestamp, s3_path, bw_path, capture_hash, notes) " + "VALUES (?, ?, ?, ?, ?)", + (ts, str(s3_path) if s3_path else None, + str(bw_path) if bw_path else None, + cap_hash, notes) + ) + cap_id = cur.lastrowid + + for sess in sessions: + for af in sess.all_frames: + frame_id = self._insert_frame(cap_id, af) + self._insert_byte_values(frame_id, af.frame.payload) + + self._con.commit() + return cap_id + + def _insert_frame(self, cap_id: int, af) -> int: + """Insert one AnnotatedFrame; return its rowid.""" + sub = af.header.sub if af.header else None + page_key = af.header.page_key if af.header else None + chk_ok = None + if af.frame.checksum_valid is True: + chk_ok = 1 + elif af.frame.checksum_valid is False: + chk_ok = 0 + + cur = self._con.execute( + "INSERT INTO frames " + "(capture_id, session_idx, direction, sub, page_key, sub_name, payload, payload_len, checksum_ok) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + (cap_id, af.session_idx, af.source, + sub, page_key, af.sub_name, + af.frame.payload, len(af.frame.payload), chk_ok) + ) + return cur.lastrowid + + def _insert_byte_values(self, frame_id: int, payload: bytes) -> None: + """Insert one row per byte in payload into byte_values.""" + rows = [(frame_id, i, b) for i, b in enumerate(payload)] + self._con.executemany( + "INSERT INTO byte_values (frame_id, offset, value) VALUES (?, ?, ?)", + rows + ) + + # ── Queries ─────────────────────────────────────────────────────────── + + def list_captures(self) -> list[sqlite3.Row]: + 
return self._con.execute( + "SELECT id, timestamp, s3_path, bw_path, notes, " + " (SELECT COUNT(*) FROM frames WHERE capture_id=captures.id) AS frame_count " + "FROM captures ORDER BY id DESC" + ).fetchall() + + def query_frames( + self, + capture_id: Optional[int] = None, + direction: Optional[str] = None, # "BW" or "S3" + sub: Optional[int] = None, + page_key: Optional[int] = None, + limit: int = 500, + ) -> list[sqlite3.Row]: + """ + Query frames table with optional filters. + Returns rows with: id, capture_id, session_idx, direction, sub, page_key, + sub_name, payload, payload_len, checksum_ok + """ + clauses = [] + params = [] + + if capture_id is not None: + clauses.append("capture_id=?"); params.append(capture_id) + if direction is not None: + clauses.append("direction=?"); params.append(direction) + if sub is not None: + clauses.append("sub=?"); params.append(sub) + if page_key is not None: + clauses.append("page_key=?"); params.append(page_key) + + where = ("WHERE " + " AND ".join(clauses)) if clauses else "" + sql = f"SELECT * FROM frames {where} ORDER BY id LIMIT ?" + params.append(limit) + + return self._con.execute(sql, params).fetchall() + + def query_by_byte( + self, + offset: int, + value: Optional[int] = None, + capture_id: Optional[int] = None, + direction: Optional[str] = None, + sub: Optional[int] = None, + limit: int = 500, + ) -> list[sqlite3.Row]: + """ + Return frames that have a specific byte at a specific offset. + Joins byte_values -> frames for indexed lookup. 
+ """ + clauses = ["bv.offset=?"] + params = [offset] + + if value is not None: + clauses.append("bv.value=?"); params.append(value) + if capture_id is not None: + clauses.append("f.capture_id=?"); params.append(capture_id) + if direction is not None: + clauses.append("f.direction=?"); params.append(direction) + if sub is not None: + clauses.append("f.sub=?"); params.append(sub) + + where = "WHERE " + " AND ".join(clauses) + sql = ( + f"SELECT f.*, bv.offset AS q_offset, bv.value AS q_value " + f"FROM byte_values bv " + f"JOIN frames f ON f.id=bv.frame_id " + f"{where} " + f"ORDER BY f.id LIMIT ?" + ) + params.append(limit) + return self._con.execute(sql, params).fetchall() + + def get_frame_payload(self, frame_id: int) -> Optional[bytes]: + row = self._con.execute( + "SELECT payload FROM frames WHERE id=?", (frame_id,) + ).fetchone() + return bytes(row["payload"]) if row else None + + def get_distinct_subs(self, capture_id: Optional[int] = None) -> list[int]: + if capture_id is not None: + rows = self._con.execute( + "SELECT DISTINCT sub FROM frames WHERE capture_id=? AND sub IS NOT NULL ORDER BY sub", + (capture_id,) + ).fetchall() + else: + rows = self._con.execute( + "SELECT DISTINCT sub FROM frames WHERE sub IS NOT NULL ORDER BY sub" + ).fetchall() + return [r[0] for r in rows] + + def get_distinct_offsets(self, capture_id: Optional[int] = None) -> list[int]: + if capture_id is not None: + rows = self._con.execute( + "SELECT DISTINCT bv.offset FROM byte_values bv " + "JOIN frames f ON f.id=bv.frame_id WHERE f.capture_id=? 
ORDER BY bv.offset", + (capture_id,) + ).fetchall() + else: + rows = self._con.execute( + "SELECT DISTINCT offset FROM byte_values ORDER BY offset" + ).fetchall() + return [r[0] for r in rows] + + def interpret_offset(self, payload: bytes, offset: int) -> dict: + """Return multi-format interpretation of bytes starting at offset.""" + return _interp_bytes(payload, offset) + + def get_stats(self) -> dict: + captures = self._con.execute("SELECT COUNT(*) FROM captures").fetchone()[0] + frames = self._con.execute("SELECT COUNT(*) FROM frames").fetchone()[0] + bv_rows = self._con.execute("SELECT COUNT(*) FROM byte_values").fetchone()[0] + return {"captures": captures, "frames": frames, "byte_value_rows": bv_rows} diff --git a/parsers/gui_analyzer.py b/parsers/gui_analyzer.py new file mode 100644 index 0000000..8e6839b --- /dev/null +++ b/parsers/gui_analyzer.py @@ -0,0 +1,940 @@ +#!/usr/bin/env python3 +""" +gui_analyzer.py — Tkinter GUI for s3_analyzer. + +Layout: + ┌─────────────────────────────────────────────────────────┐ + │ [S3 file: ___________ Browse] [BW file: ___ Browse] │ + │ [Analyze] [Live mode toggle] Status: Idle │ + ├──────────────────┬──────────────────────────────────────┤ + │ Session list │ Detail panel (tabs) │ + │ ─ Session 0 │ Inventory | Hex Dump | Diff │ + │ └ POLL (BW) │ │ + │ └ POLL_RESP │ (content of selected tab) │ + │ ─ Session 1 │ │ + │ └ ... 
│ │ + └──────────────────┴──────────────────────────────────────┘ + │ Status bar │ + └─────────────────────────────────────────────────────────┘ +""" + +from __future__ import annotations + +import queue +import sys +import threading +import time +import tkinter as tk +from pathlib import Path +from tkinter import filedialog, font, messagebox, ttk +from typing import Optional + +sys.path.insert(0, str(Path(__file__).parent)) +from s3_analyzer import ( # noqa: E402 + AnnotatedFrame, + FrameDiff, + Session, + annotate_frames, + diff_sessions, + format_hex_dump, + parse_bw, + parse_s3, + render_session_report, + split_into_sessions, + write_claude_export, +) +from frame_db import FrameDB, DEFAULT_DB_PATH # noqa: E402 + + +# ────────────────────────────────────────────────────────────────────────────── +# Colour palette (dark-ish terminal feel) +# ────────────────────────────────────────────────────────────────────────────── +BG = "#1e1e1e" +BG2 = "#252526" +BG3 = "#2d2d30" +FG = "#d4d4d4" +FG_DIM = "#6a6a6a" +ACCENT = "#569cd6" +ACCENT2 = "#4ec9b0" +RED = "#f44747" +YELLOW = "#dcdcaa" +GREEN = "#4caf50" +ORANGE = "#ce9178" + +COL_BW = "#9cdcfe" # BW frames +COL_S3 = "#4ec9b0" # S3 frames +COL_DIFF = "#f44747" # Changed bytes +COL_KNOW = "#4caf50" # Known-field annotations +COL_HEAD = "#569cd6" # Section headers + +MONO = ("Consolas", 9) +MONO_SM = ("Consolas", 8) + + +# ────────────────────────────────────────────────────────────────────────────── +# State container +# ────────────────────────────────────────────────────────────────────────────── + +class AnalyzerState: + def __init__(self) -> None: + self.sessions: list[Session] = [] + self.diffs: list[Optional[list[FrameDiff]]] = [] # diffs[i] = diff of session i vs i-1 + self.s3_path: Optional[Path] = None + self.bw_path: Optional[Path] = None + self.last_capture_id: Optional[int] = None + + +# ────────────────────────────────────────────────────────────────────────────── +# Main GUI +# 
────────────────────────────────────────────────────────────────────────────── + +class AnalyzerGUI(tk.Tk): + def __init__(self) -> None: + super().__init__() + self.title("S3 Protocol Analyzer") + self.configure(bg=BG) + self.minsize(1050, 600) + + self.state = AnalyzerState() + self._live_thread: Optional[threading.Thread] = None + self._live_stop = threading.Event() + self._live_q: queue.Queue[str] = queue.Queue() + self._db = FrameDB() + + self._build_widgets() + self._poll_live_queue() + + # ── widget construction ──────────────────────────────────────────────── + + def _build_widgets(self) -> None: + self._build_toolbar() + self._build_panes() + self._build_statusbar() + + def _build_toolbar(self) -> None: + bar = tk.Frame(self, bg=BG2, pady=4) + bar.pack(side=tk.TOP, fill=tk.X) + + pad = {"padx": 5, "pady": 2} + + # S3 file + tk.Label(bar, text="S3 raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) + self.s3_var = tk.StringVar() + tk.Entry(bar, textvariable=self.s3_var, width=28, bg=BG3, fg=FG, + insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad) + tk.Button(bar, text="Browse", bg=BG3, fg=FG, relief="flat", + activebackground=ACCENT, cursor="hand2", + command=lambda: self._browse_file(self.s3_var, "raw_s3.bin") + ).pack(side=tk.LEFT, **pad) + + tk.Label(bar, text=" BW raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad) + self.bw_var = tk.StringVar() + tk.Entry(bar, textvariable=self.bw_var, width=28, bg=BG3, fg=FG, + insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad) + tk.Button(bar, text="Browse", bg=BG3, fg=FG, relief="flat", + activebackground=ACCENT, cursor="hand2", + command=lambda: self._browse_file(self.bw_var, "raw_bw.bin") + ).pack(side=tk.LEFT, **pad) + + # Buttons + tk.Frame(bar, bg=BG2, width=10).pack(side=tk.LEFT) + self.analyze_btn = tk.Button(bar, text="Analyze", bg=ACCENT, fg="#ffffff", + relief="flat", padx=10, cursor="hand2", + font=("Consolas", 9, "bold"), + 
command=self._run_analyze) + self.analyze_btn.pack(side=tk.LEFT, **pad) + + self.live_btn = tk.Button(bar, text="Live: OFF", bg=BG3, fg=FG, + relief="flat", padx=10, cursor="hand2", + font=MONO, command=self._toggle_live) + self.live_btn.pack(side=tk.LEFT, **pad) + + self.export_btn = tk.Button(bar, text="Export for Claude", bg=ORANGE, fg="#000000", + relief="flat", padx=10, cursor="hand2", + font=("Consolas", 9, "bold"), + command=self._run_export, state="disabled") + self.export_btn.pack(side=tk.LEFT, **pad) + + self.status_var = tk.StringVar(value="Idle") + tk.Label(bar, textvariable=self.status_var, bg=BG2, fg=FG_DIM, + font=MONO, anchor="w").pack(side=tk.LEFT, padx=10) + + def _build_panes(self) -> None: + pane = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG, + sashwidth=4, sashrelief="flat") + pane.pack(fill=tk.BOTH, expand=True, padx=0, pady=0) + + # ── Left: session/frame tree ────────────────────────────────────── + left = tk.Frame(pane, bg=BG2, width=260) + pane.add(left, minsize=200) + + tk.Label(left, text="Sessions", bg=BG2, fg=ACCENT, + font=("Consolas", 9, "bold"), anchor="w", padx=6).pack(fill=tk.X) + + tree_frame = tk.Frame(left, bg=BG2) + tree_frame.pack(fill=tk.BOTH, expand=True) + + style = ttk.Style() + style.theme_use("clam") + style.configure("Treeview", + background=BG2, foreground=FG, fieldbackground=BG2, + font=MONO_SM, rowheight=18, borderwidth=0) + style.configure("Treeview.Heading", + background=BG3, foreground=ACCENT, font=MONO_SM) + style.map("Treeview", background=[("selected", BG3)], + foreground=[("selected", "#ffffff")]) + + self.tree = ttk.Treeview(tree_frame, columns=("info",), show="tree headings", + selectmode="browse") + self.tree.heading("#0", text="Frame") + self.tree.heading("info", text="Info") + self.tree.column("#0", width=160, stretch=True) + self.tree.column("info", width=80, stretch=False) + + vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview) + self.tree.configure(yscrollcommand=vsb.set) 
+        vsb.pack(side=tk.RIGHT, fill=tk.Y)
+        self.tree.pack(fill=tk.BOTH, expand=True)
+
+        self.tree.tag_configure("session", foreground=ACCENT, font=("Consolas", 9, "bold"))
+        self.tree.tag_configure("bw_frame", foreground=COL_BW)
+        self.tree.tag_configure("s3_frame", foreground=COL_S3)
+        self.tree.tag_configure("bad_chk", foreground=RED)
+        self.tree.tag_configure("malformed", foreground=RED)
+
+        self.tree.bind("<<TreeviewSelect>>", self._on_tree_select)
+
+        # ── Right: detail notebook ────────────────────────────────────────
+        right = tk.Frame(pane, bg=BG)
+        pane.add(right, minsize=600)
+
+        style.configure("TNotebook", background=BG2, borderwidth=0)
+        style.configure("TNotebook.Tab", background=BG3, foreground=FG,
+                        font=MONO, padding=[8, 2])
+        style.map("TNotebook.Tab", background=[("selected", BG)],
+                  foreground=[("selected", ACCENT)])
+
+        self.nb = ttk.Notebook(right)
+        self.nb.pack(fill=tk.BOTH, expand=True)
+
+        # Tab: Inventory
+        self.inv_text = self._make_text_tab("Inventory")
+        # Tab: Hex Dump
+        self.hex_text = self._make_text_tab("Hex Dump")
+        # Tab: Diff
+        self.diff_text = self._make_text_tab("Diff")
+        # Tab: Full Report (raw text)
+        self.report_text = self._make_text_tab("Full Report")
+        # Tab: Query (DB)
+        self._build_query_tab()
+
+        # Tag colours for rich text in all tabs
+        for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
+            w.tag_configure("head", foreground=COL_HEAD, font=("Consolas", 9, "bold"))
+            w.tag_configure("bw", foreground=COL_BW)
+            w.tag_configure("s3", foreground=COL_S3)
+            w.tag_configure("changed", foreground=COL_DIFF)
+            w.tag_configure("known", foreground=COL_KNOW)
+            w.tag_configure("dim", foreground=FG_DIM)
+            w.tag_configure("normal", foreground=FG)
+            w.tag_configure("warn", foreground=YELLOW)
+            w.tag_configure("addr", foreground=ORANGE)
+
+    def _make_text_tab(self, title: str) -> tk.Text:
+        frame = tk.Frame(self.nb, bg=BG)
+        self.nb.add(frame, text=title)
+        w = tk.Text(frame, bg=BG, fg=FG, font=MONO, state="disabled",
relief="flat", wrap="none", insertbackground=FG, + selectbackground=BG3, selectforeground="#ffffff") + vsb = ttk.Scrollbar(frame, orient="vertical", command=w.yview) + hsb = ttk.Scrollbar(frame, orient="horizontal", command=w.xview) + w.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set) + vsb.pack(side=tk.RIGHT, fill=tk.Y) + hsb.pack(side=tk.BOTTOM, fill=tk.X) + w.pack(fill=tk.BOTH, expand=True) + return w + + def _build_query_tab(self) -> None: + """Build the Query tab: filter controls + results table + interpretation panel.""" + frame = tk.Frame(self.nb, bg=BG) + self.nb.add(frame, text="Query DB") + + # ── Filter row ──────────────────────────────────────────────────── + filt = tk.Frame(frame, bg=BG2, pady=4) + filt.pack(side=tk.TOP, fill=tk.X) + + pad = {"padx": 4, "pady": 2} + + # Capture filter + tk.Label(filt, text="Capture:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=0, sticky="e", **pad) + self._q_capture_var = tk.StringVar(value="All") + self._q_capture_cb = ttk.Combobox(filt, textvariable=self._q_capture_var, + width=18, font=MONO_SM, state="readonly") + self._q_capture_cb.grid(row=0, column=1, sticky="w", **pad) + + # Direction filter + tk.Label(filt, text="Dir:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=2, sticky="e", **pad) + self._q_dir_var = tk.StringVar(value="All") + self._q_dir_cb = ttk.Combobox(filt, textvariable=self._q_dir_var, + values=["All", "BW", "S3"], + width=6, font=MONO_SM, state="readonly") + self._q_dir_cb.grid(row=0, column=3, sticky="w", **pad) + + # SUB filter + tk.Label(filt, text="SUB:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=4, sticky="e", **pad) + self._q_sub_var = tk.StringVar(value="All") + self._q_sub_cb = ttk.Combobox(filt, textvariable=self._q_sub_var, + width=12, font=MONO_SM, state="readonly") + self._q_sub_cb.grid(row=0, column=5, sticky="w", **pad) + + # Byte offset filter + tk.Label(filt, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=6, sticky="e", **pad) + 
self._q_offset_var = tk.StringVar(value="") + tk.Entry(filt, textvariable=self._q_offset_var, width=8, bg=BG3, fg=FG, + font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=7, sticky="w", **pad) + + # Value filter + tk.Label(filt, text="Value:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=8, sticky="e", **pad) + self._q_value_var = tk.StringVar(value="") + tk.Entry(filt, textvariable=self._q_value_var, width=8, bg=BG3, fg=FG, + font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=9, sticky="w", **pad) + + # Run / Refresh buttons + tk.Button(filt, text="Run Query", bg=ACCENT, fg="#ffffff", relief="flat", + padx=8, cursor="hand2", font=("Consolas", 8, "bold"), + command=self._run_db_query).grid(row=0, column=10, padx=8) + tk.Button(filt, text="Refresh dropdowns", bg=BG3, fg=FG, relief="flat", + padx=6, cursor="hand2", font=MONO_SM, + command=self._refresh_query_dropdowns).grid(row=0, column=11, padx=4) + + # DB stats label + self._q_stats_var = tk.StringVar(value="DB: —") + tk.Label(filt, textvariable=self._q_stats_var, bg=BG2, fg=FG_DIM, + font=MONO_SM).grid(row=0, column=12, padx=12, sticky="w") + + # ── Results table ───────────────────────────────────────────────── + res_frame = tk.Frame(frame, bg=BG) + res_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True) + + # Results treeview + cols = ("cap", "sess", "dir", "sub", "sub_name", "page", "len", "chk") + self._q_tree = ttk.Treeview(res_frame, columns=cols, + show="headings", selectmode="browse") + col_cfg = [ + ("cap", "Cap", 40), + ("sess", "Sess", 40), + ("dir", "Dir", 40), + ("sub", "SUB", 50), + ("sub_name", "Name", 160), + ("page", "Page", 60), + ("len", "Len", 50), + ("chk", "Chk", 50), + ] + for cid, heading, width in col_cfg: + self._q_tree.heading(cid, text=heading, anchor="w") + self._q_tree.column(cid, width=width, stretch=(cid == "sub_name")) + + q_vsb = ttk.Scrollbar(res_frame, orient="vertical", command=self._q_tree.yview) + q_hsb = ttk.Scrollbar(res_frame, 
orient="horizontal", command=self._q_tree.xview) + self._q_tree.configure(yscrollcommand=q_vsb.set, xscrollcommand=q_hsb.set) + q_vsb.pack(side=tk.RIGHT, fill=tk.Y) + q_hsb.pack(side=tk.BOTTOM, fill=tk.X) + self._q_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + + self._q_tree.tag_configure("bw_row", foreground=COL_BW) + self._q_tree.tag_configure("s3_row", foreground=COL_S3) + self._q_tree.tag_configure("bad_row", foreground=RED) + + # ── Interpretation panel (below results) ────────────────────────── + interp_frame = tk.Frame(frame, bg=BG2, height=120) + interp_frame.pack(side=tk.BOTTOM, fill=tk.X) + interp_frame.pack_propagate(False) + + tk.Label(interp_frame, text="Byte interpretation (click a row, enter offset):", + bg=BG2, fg=ACCENT, font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X) + + interp_inner = tk.Frame(interp_frame, bg=BG2) + interp_inner.pack(fill=tk.X, padx=6, pady=2) + + tk.Label(interp_inner, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT) + self._interp_offset_var = tk.StringVar(value="5") + tk.Entry(interp_inner, textvariable=self._interp_offset_var, + width=6, bg=BG3, fg=FG, font=MONO_SM, + insertbackground=FG, relief="flat").pack(side=tk.LEFT, padx=4) + tk.Button(interp_inner, text="Interpret", bg=BG3, fg=FG, relief="flat", + cursor="hand2", font=MONO_SM, + command=self._run_interpret).pack(side=tk.LEFT, padx=4) + + self._interp_text = tk.Text(interp_frame, bg=BG2, fg=FG, font=MONO_SM, + height=4, relief="flat", state="disabled", + insertbackground=FG) + self._interp_text.pack(fill=tk.X, padx=6, pady=2) + self._interp_text.tag_configure("label", foreground=FG_DIM) + self._interp_text.tag_configure("value", foreground=YELLOW) + + # Store frame rows by tree iid -> db row + self._q_rows: dict[str, object] = {} + self._q_capture_rows: list = [None] + self._q_sub_values: list = [None] + self._q_tree.bind("<>", self._on_q_select) + + # Init dropdowns + self._refresh_query_dropdowns() + + def _refresh_query_dropdowns(self) -> 
None: + """Reload capture and SUB dropdowns from the DB.""" + try: + captures = self._db.list_captures() + cap_labels = ["All"] + [ + f"#{r['id']} {r['timestamp'][:16]} ({r['frame_count']} frames)" + for r in captures + ] + self._q_capture_cb["values"] = cap_labels + self._q_capture_rows = [None] + [r["id"] for r in captures] + + subs = self._db.get_distinct_subs() + sub_labels = ["All"] + [f"0x{s:02X}" for s in subs] + self._q_sub_cb["values"] = sub_labels + self._q_sub_values = [None] + subs + + stats = self._db.get_stats() + self._q_stats_var.set( + f"DB: {stats['captures']} captures | {stats['frames']} frames" + ) + except Exception as exc: + self._q_stats_var.set(f"DB error: {exc}") + + def _parse_hex_or_int(self, s: str) -> Optional[int]: + """Parse '0x1F', '31', or '' into int or None.""" + s = s.strip() + if not s: + return None + try: + return int(s, 0) + except ValueError: + return None + + def _run_db_query(self) -> None: + """Execute query with current filter values and populate results tree.""" + # Resolve capture_id + cap_idx = self._q_capture_cb.current() + cap_id = self._q_capture_rows[cap_idx] if cap_idx > 0 else None + + # Direction + dir_val = self._q_dir_var.get() + direction = dir_val if dir_val != "All" else None + + # SUB + sub_idx = self._q_sub_cb.current() + sub = self._q_sub_values[sub_idx] if sub_idx > 0 else None + + # Offset / value + offset = self._parse_hex_or_int(self._q_offset_var.get()) + value = self._parse_hex_or_int(self._q_value_var.get()) + + try: + if offset is not None: + rows = self._db.query_by_byte( + offset=offset, value=value, + capture_id=cap_id, direction=direction, sub=sub + ) + else: + rows = self._db.query_frames( + capture_id=cap_id, direction=direction, sub=sub + ) + except Exception as exc: + messagebox.showerror("Query error", str(exc)) + return + + # Populate tree + self._q_tree.delete(*self._q_tree.get_children()) + self._q_rows.clear() + + for row in rows: + sub_hex = f"0x{row['sub']:02X}" if row["sub"] is 
not None else "—" + page_hex = f"0x{row['page_key']:04X}" if row["page_key"] is not None else "—" + chk_str = {1: "OK", 0: "BAD", None: "—"}.get(row["checksum_ok"], "—") + tag = "bw_row" if row["direction"] == "BW" else "s3_row" + if row["checksum_ok"] == 0: + tag = "bad_row" + + iid = str(row["id"]) + self._q_tree.insert("", tk.END, iid=iid, tags=(tag,), values=( + row["capture_id"], + row["session_idx"], + row["direction"], + sub_hex, + row["sub_name"] or "", + page_hex, + row["payload_len"], + chk_str, + )) + self._q_rows[iid] = row + + self.sb_var.set(f"Query returned {len(rows)} rows") + + def _on_q_select(self, _event: tk.Event) -> None: + """When a DB result row is selected, auto-run interpret at current offset.""" + self._run_interpret() + + def _run_interpret(self) -> None: + """Show multi-format byte interpretation for the selected row + offset.""" + sel = self._q_tree.selection() + if not sel: + return + iid = sel[0] + row = self._q_rows.get(iid) + if row is None: + return + + offset = self._parse_hex_or_int(self._interp_offset_var.get()) + if offset is None: + return + + payload = bytes(row["payload"]) + interp = self._db.interpret_offset(payload, offset) + + w = self._interp_text + w.configure(state="normal") + w.delete("1.0", tk.END) + + sub_hex = f"0x{row['sub']:02X}" if row["sub"] is not None else "??" 
+ w.insert(tk.END, f"Frame #{row['id']} [{row['direction']}] SUB={sub_hex} " + f"offset={offset} (0x{offset:04X})\n", "label") + + label_order = [ + ("uint8", "uint8 "), + ("int8", "int8 "), + ("uint16_be", "uint16 BE "), + ("uint16_le", "uint16 LE "), + ("uint32_be", "uint32 BE "), + ("uint32_le", "uint32 LE "), + ("float32_be", "float32 BE "), + ("float32_le", "float32 LE "), + ] + line = "" + for key, label in label_order: + if key in interp: + val = interp[key] + if isinstance(val, float): + val_str = f"{val:.6g}" + else: + val_str = str(val) + if key.startswith("uint") or key.startswith("int"): + val_str += f" (0x{int(val) & 0xFFFFFFFF:X})" + chunk = f"{label}: {val_str}" + line += f" {chunk:<30}" + if len(line) > 80: + w.insert(tk.END, line + "\n", "value") + line = "" + if line: + w.insert(tk.END, line + "\n", "value") + + w.configure(state="disabled") + + def _build_statusbar(self) -> None: + bar = tk.Frame(self, bg=BG3, height=20) + bar.pack(side=tk.BOTTOM, fill=tk.X) + self.sb_var = tk.StringVar(value="Ready") + tk.Label(bar, textvariable=self.sb_var, bg=BG3, fg=FG_DIM, + font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X) + + # ── file picking ─────────────────────────────────────────────────────── + + def _browse_file(self, var: tk.StringVar, default_name: str) -> None: + path = filedialog.askopenfilename( + title=f"Select {default_name}", + filetypes=[("Binary files", "*.bin"), ("All files", "*.*")], + initialfile=default_name, + ) + if path: + var.set(path) + + # ── analysis ────────────────────────────────────────────────────────── + + def _run_analyze(self) -> None: + s3_path = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None + bw_path = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None + + if not s3_path or not bw_path: + messagebox.showerror("Missing files", "Please select both S3 and BW raw files.") + return + if not s3_path.exists(): + messagebox.showerror("File not found", f"S3 file not 
found:\n{s3_path}") + return + if not bw_path.exists(): + messagebox.showerror("File not found", f"BW file not found:\n{bw_path}") + return + + self.state.s3_path = s3_path + self.state.bw_path = bw_path + self._do_analyze(s3_path, bw_path) + + def _run_export(self) -> None: + if not self.state.sessions: + messagebox.showinfo("Export", "Run Analyze first.") + return + + outdir = self.state.s3_path.parent if self.state.s3_path else Path(".") + out_path = write_claude_export( + self.state.sessions, + self.state.diffs, + outdir, + self.state.s3_path, + self.state.bw_path, + ) + + self.sb_var.set(f"Exported: {out_path.name}") + if messagebox.askyesno( + "Export complete", + f"Saved to:\n{out_path}\n\nOpen the folder?", + ): + import subprocess + subprocess.Popen(["explorer", str(out_path.parent)]) + + def _do_analyze(self, s3_path: Path, bw_path: Path) -> None: + self.status_var.set("Parsing...") + self.update_idletasks() + + s3_blob = s3_path.read_bytes() + bw_blob = bw_path.read_bytes() + + s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3") + bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0, validate_checksum=True), "BW") + + sessions = split_into_sessions(bw_frames, s3_frames) + + diffs: list[Optional[list[FrameDiff]]] = [None] + for i in range(1, len(sessions)): + diffs.append(diff_sessions(sessions[i - 1], sessions[i])) + + self.state.sessions = sessions + self.state.diffs = diffs + + n_s3 = sum(len(s.s3_frames) for s in sessions) + n_bw = sum(len(s.bw_frames) for s in sessions) + self.status_var.set( + f"{len(sessions)} sessions | BW: {n_bw} frames S3: {n_s3} frames" + ) + self.sb_var.set(f"Loaded: {s3_path.name} + {bw_path.name}") + + self.export_btn.configure(state="normal") + self._rebuild_tree() + + # Auto-ingest into DB (deduped by SHA256 — fast no-op on re-analyze) + try: + cap_id = self._db.ingest(sessions, s3_path, bw_path) + if cap_id is not None: + self.state.last_capture_id = cap_id + self._refresh_query_dropdowns() + # 
Pre-select this capture in the Query tab + cap_labels = list(self._q_capture_cb["values"]) + # Find label that starts with # + for i, lbl in enumerate(cap_labels): + if lbl.startswith(f"#{cap_id} "): + self._q_capture_cb.current(i) + break + # else: already ingested — no change to dropdown selection + except Exception as exc: + self.sb_var.set(f"DB ingest error: {exc}") + + # ── tree building ────────────────────────────────────────────────────── + + def _rebuild_tree(self) -> None: + self.tree.delete(*self.tree.get_children()) + + for sess in self.state.sessions: + is_complete = any( + af.header is not None and af.header.sub == 0x74 + for af in sess.bw_frames + ) + label = f"Session {sess.index}" + if not is_complete: + label += " [partial]" + n_diff = len(self.state.diffs[sess.index] or []) + diff_info = f"{n_diff} changes" if n_diff > 0 else "" + sess_id = self.tree.insert("", tk.END, text=label, + values=(diff_info,), tags=("session",)) + + for af in sess.all_frames: + src_tag = "bw_frame" if af.source == "BW" else "s3_frame" + sub_hex = f"{af.header.sub:02X}" if af.header else "??" 
+ label_text = f"[{af.source}] {sub_hex} {af.sub_name}" + extra = "" + tags = (src_tag,) + if af.frame.checksum_valid is False: + extra = "BAD CHK" + tags = ("bad_chk",) + elif af.header is None: + tags = ("malformed",) + label_text = f"[{af.source}] MALFORMED" + self.tree.insert(sess_id, tk.END, text=label_text, + values=(extra,), tags=tags, + iid=f"frame_{sess.index}_{af.frame.index}_{af.source}") + + # Expand all sessions + for item in self.tree.get_children(): + self.tree.item(item, open=True) + + # ── tree selection → detail panel ───────────────────────────────────── + + def _on_tree_select(self, _event: tk.Event) -> None: + sel = self.tree.selection() + if not sel: + return + iid = sel[0] + + # Determine if it's a session node or a frame node + if iid.startswith("frame_"): + # frame___ + parts = iid.split("_") + sess_idx = int(parts[1]) + frame_idx = int(parts[2]) + source = parts[3] + self._show_frame_detail(sess_idx, frame_idx, source) + else: + # Session node — show session summary + # Find session index from text + text = self.tree.item(iid, "text") + try: + idx = int(text.split()[1]) + self._show_session_detail(idx) + except (IndexError, ValueError): + pass + + def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]: + if sess_idx >= len(self.state.sessions): + return None + sess = self.state.sessions[sess_idx] + pool = sess.bw_frames if source == "BW" else sess.s3_frames + for af in pool: + if af.frame.index == frame_idx: + return af + return None + + # ── detail renderers ────────────────────────────────────────────────── + + def _clear_all_tabs(self) -> None: + for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text): + self._text_clear(w) + + def _show_session_detail(self, sess_idx: int) -> None: + if sess_idx >= len(self.state.sessions): + return + sess = self.state.sessions[sess_idx] + diffs = self.state.diffs[sess_idx] + + self._clear_all_tabs() + + # ── Inventory tab 
──────────────────────────────────────────────── + w = self.inv_text + self._text_clear(w) + self._tw(w, f"SESSION {sess.index}", "head"); self._tn(w) + n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames) + self._tw(w, f"Frames: {n_bw + n_s3} (BW: {n_bw}, S3: {n_s3})\n", "normal") + if n_bw != n_s3: + self._tw(w, " WARNING: BW/S3 count mismatch\n", "warn") + self._tn(w) + + for seq_i, af in enumerate(sess.all_frames): + src_tag = "bw" if af.source == "BW" else "s3" + sub_hex = f"{af.header.sub:02X}" if af.header else "??" + page_str = f" (page {af.header.page_key:04X})" if af.header and af.header.page_key != 0 else "" + chk = "" + if af.frame.checksum_valid is False: + chk = " [BAD CHECKSUM]" + elif af.frame.checksum_valid is True: + chk = f" [{af.frame.checksum_type}]" + self._tw(w, f" [{af.source}] #{seq_i:<3} ", src_tag) + self._tw(w, f"SUB={sub_hex} ", "addr") + self._tw(w, f"{af.sub_name:<30}", src_tag) + self._tw(w, f"{page_str} len={len(af.frame.payload)}", "dim") + if chk: + self._tw(w, chk, "warn" if af.frame.checksum_valid is False else "dim") + self._tn(w) + + # ── Diff tab ───────────────────────────────────────────────────── + w = self.diff_text + self._text_clear(w) + if diffs is None: + self._tw(w, "(No previous session to diff against)\n", "dim") + elif not diffs: + self._tw(w, f"DIFF vs SESSION {sess_idx - 1}\n", "head"); self._tn(w) + self._tw(w, " No changes detected.\n", "dim") + else: + self._tw(w, f"DIFF vs SESSION {sess_idx - 1}\n", "head"); self._tn(w) + for fd in diffs: + page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else "" + self._tw(w, f"\n SUB {fd.sub:02X} ({fd.sub_name}){page_str}:\n", "addr") + for bd in fd.diffs: + before_s = f"{bd.before:02x}" if bd.before >= 0 else "--" + after_s = f"{bd.after:02x}" if bd.after >= 0 else "--" + self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim") + self._tw(w, f"{before_s} -> {after_s}", "changed") + if bd.field_name: + self._tw(w, f" [{bd.field_name}]", 
"known") + self._tn(w) + + # ── Full Report tab ─────────────────────────────────────────────── + report_text = render_session_report(sess, diffs, sess_idx - 1 if sess_idx > 0 else None) + w = self.report_text + self._text_clear(w) + self._tw(w, report_text, "normal") + + # Switch to Inventory tab + self.nb.select(0) + + def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None: + af = self._find_frame(sess_idx, frame_idx, source) + if af is None: + return + + self._clear_all_tabs() + src_tag = "bw" if source == "BW" else "s3" + sub_hex = f"{af.header.sub:02X}" if af.header else "??" + + # ── Inventory tab — single frame summary ───────────────────────── + w = self.inv_text + self._tw(w, f"[{af.source}] Frame #{af.frame.index}\n", src_tag) + self._tw(w, f"Session {sess_idx} | ", "dim") + self._tw(w, f"SUB={sub_hex} {af.sub_name}\n", "addr") + if af.header: + self._tw(w, f" OFFSET: {af.header.page_key:04X} ", "dim") + self._tw(w, f"CMD={af.header.cmd:02X} FLAGS={af.header.flags:02X}\n", "dim") + self._tn(w) + self._tw(w, f"Payload bytes: {len(af.frame.payload)}\n", "dim") + if af.frame.checksum_valid is False: + self._tw(w, " BAD CHECKSUM\n", "warn") + elif af.frame.checksum_valid is True: + self._tw(w, f" Checksum: {af.frame.checksum_type} {af.frame.checksum_hex}\n", "dim") + self._tn(w) + + # Protocol header breakdown + p = af.frame.payload + if len(p) >= 5: + self._tw(w, "Header breakdown:\n", "head") + self._tw(w, f" [0] CMD = {p[0]:02x}\n", "dim") + self._tw(w, f" [1] ? = {p[1]:02x}\n", "dim") + self._tw(w, f" [2] SUB = {p[2]:02x} ({af.sub_name})\n", src_tag) + self._tw(w, f" [3] OFFSET_HI = {p[3]:02x}\n", "dim") + self._tw(w, f" [4] OFFSET_LO = {p[4]:02x}\n", "dim") + if len(p) > 5: + self._tw(w, f" [5..] 
data = {len(p) - 5} bytes\n", "dim") + + # ── Hex Dump tab ───────────────────────────────────────────────── + w = self.hex_text + self._tw(w, f"[{af.source}] SUB={sub_hex} {af.sub_name}\n", src_tag) + self._tw(w, f"Payload ({len(af.frame.payload)} bytes):\n", "dim") + self._tn(w) + dump_lines = format_hex_dump(af.frame.payload, indent=" ") + self._tw(w, "\n".join(dump_lines) + "\n", "normal") + + # Annotate known field offsets within this frame + diffs_for_sess = self.state.diffs[sess_idx] if sess_idx < len(self.state.diffs) else None + if diffs_for_sess and af.header: + page_key = af.header.page_key + matching = [fd for fd in diffs_for_sess + if fd.sub == af.header.sub and fd.page_key == page_key] + if matching: + self._tn(w) + self._tw(w, "Changed bytes in this frame (vs prev session):\n", "head") + for bd in matching[0].diffs: + before_s = f"{bd.before:02x}" if bd.before >= 0 else "--" + after_s = f"{bd.after:02x}" if bd.after >= 0 else "--" + self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim") + self._tw(w, f"{before_s} -> {after_s}", "changed") + if bd.field_name: + self._tw(w, f" [{bd.field_name}]", "known") + self._tn(w) + + # Switch to Hex Dump tab for frame selection + self.nb.select(1) + + # ── live mode ───────────────────────────────────────────────────────── + + def _toggle_live(self) -> None: + if self._live_thread and self._live_thread.is_alive(): + self._live_stop.set() + self.live_btn.configure(text="Live: OFF", bg=BG3, fg=FG) + self.status_var.set("Live stopped") + else: + s3_path = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None + bw_path = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None + if not s3_path or not bw_path: + messagebox.showerror("Missing files", "Select both raw files before starting live mode.") + return + self.state.s3_path = s3_path + self.state.bw_path = bw_path + self._live_stop.clear() + self._live_thread = threading.Thread( + target=self._live_worker, 
args=(s3_path, bw_path), daemon=True) + self._live_thread.start() + self.live_btn.configure(text="Live: ON", bg=GREEN, fg="#000000") + self.status_var.set("Live mode running...") + + def _live_worker(self, s3_path: Path, bw_path: Path) -> None: + s3_buf = bytearray() + bw_buf = bytearray() + s3_pos = bw_pos = 0 + + while not self._live_stop.is_set(): + changed = False + if s3_path.exists(): + with s3_path.open("rb") as fh: + fh.seek(s3_pos) + nb = fh.read() + if nb: + s3_buf.extend(nb); s3_pos += len(nb); changed = True + if bw_path.exists(): + with bw_path.open("rb") as fh: + fh.seek(bw_pos) + nb = fh.read() + if nb: + bw_buf.extend(nb); bw_pos += len(nb); changed = True + + if changed: + self._live_q.put("refresh") + + time.sleep(0.1) + + def _poll_live_queue(self) -> None: + try: + while True: + msg = self._live_q.get_nowait() + if msg == "refresh" and self.state.s3_path and self.state.bw_path: + self._do_analyze(self.state.s3_path, self.state.bw_path) + except queue.Empty: + pass + finally: + self.after(150, self._poll_live_queue) + + # ── text helpers ────────────────────────────────────────────────────── + + def _text_clear(self, w: tk.Text) -> None: + w.configure(state="normal") + w.delete("1.0", tk.END) + # leave enabled for further inserts + + def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None: + """Insert text with a colour tag.""" + w.configure(state="normal") + w.insert(tk.END, text, tag) + + def _tn(self, w: tk.Text) -> None: + """Insert newline.""" + w.configure(state="normal") + w.insert(tk.END, "\n") + w.configure(state="disabled") + + +# ────────────────────────────────────────────────────────────────────────────── +# Entry point +# ────────────────────────────────────────────────────────────────────────────── + +def main() -> None: + app = AnalyzerGUI() + app.mainloop() + + +if __name__ == "__main__": + main() diff --git a/parsers/s3_analyzer.py b/parsers/s3_analyzer.py new file mode 100644 index 0000000..fb8b264 --- /dev/null +++ 
b/parsers/s3_analyzer.py @@ -0,0 +1,948 @@ +#!/usr/bin/env python3 +""" +s3_analyzer.py — Live protocol analysis tool for Instantel MiniMate Plus RS-232. + +Reads raw_s3.bin and raw_bw.bin (produced by s3_bridge.py), parses DLE frames, +groups into sessions, auto-diffs consecutive sessions, and annotates known fields. + +Usage: + python s3_analyzer.py --s3 raw_s3.bin --bw raw_bw.bin [--live] [--outdir DIR] +""" + +from __future__ import annotations + +import argparse +import sys +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +# Allow running from any working directory +sys.path.insert(0, str(Path(__file__).parent)) +from s3_parser import Frame, parse_bw, parse_s3 # noqa: E402 + +__version__ = "0.1.0" + +# ────────────────────────────────────────────────────────────────────────────── +# Protocol constants +# ────────────────────────────────────────────────────────────────────────────── + +# SUB_TABLE: sub_byte → (name, direction, notes) +# direction: "BW→S3", "S3→BW", or "both" +SUB_TABLE: dict[int, tuple[str, str, str]] = { + # BW→S3 read requests + 0x5B: ("POLL", "BW→S3", "Keepalive / device discovery"), + 0x01: ("FULL_CONFIG_READ", "BW→S3", "~0x98 bytes; firmware, model, serial, channel config"), + 0x06: ("CHANNEL_CONFIG_READ", "BW→S3", "0x24 bytes; channel configuration block"), + 0x08: ("EVENT_INDEX_READ", "BW→S3", "0x58 bytes; event count and record pointers"), + 0x0A: ("WAVEFORM_HEADER_READ", "BW→S3", "0x30 bytes/page; waveform header keyed by timestamp"), + 0x0C: ("FULL_WAVEFORM_READ", "BW→S3", "0xD2 bytes/page × 2; project strings, PPV floats"), + 0x1C: ("TRIGGER_CONFIG_READ", "BW→S3", "0x2C bytes; trigger settings block"), + 0x09: ("UNKNOWN_READ_A", "BW→S3", "0xCA bytes response (F6); purpose unknown"), + 0x1A: ("COMPLIANCE_CONFIG_READ", "BW→S3", "Large block (E5); trigger/alarm floats, unit strings"), + 0x2E: ("UNKNOWN_READ_B", "BW→S3", "0x1A bytes response (D1); purpose unknown"), + # BW→S3 write 
commands + 0x68: ("EVENT_INDEX_WRITE", "BW→S3", "Mirrors SUB 08 read; event count and timestamps"), + 0x69: ("WAVEFORM_DATA_WRITE", "BW→S3", "0xCA bytes; mirrors SUB 09"), + 0x71: ("COMPLIANCE_STRINGS_WRITE", "BW→S3", "Compliance config + all project string fields"), + 0x72: ("WRITE_CONFIRM_A", "BW→S3", "Short frame; commit step after 0x71"), + 0x73: ("WRITE_CONFIRM_B", "BW→S3", "Short frame"), + 0x74: ("WRITE_CONFIRM_C", "BW→S3", "Short frame; final session-close confirm"), + 0x82: ("TRIGGER_CONFIG_WRITE", "BW→S3", "0x1C bytes; trigger config block; mirrors SUB 1C"), + 0x83: ("TRIGGER_WRITE_CONFIRM", "BW→S3", "Short frame; commit step after 0x82"), + # S3→BW responses + 0xA4: ("POLL_RESPONSE", "S3→BW", "Response to SUB 5B poll"), + 0xFE: ("FULL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 01"), + 0xF9: ("CHANNEL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 06"), + 0xF7: ("EVENT_INDEX_RESPONSE", "S3→BW", "Response to SUB 08; contains backlight/power-save"), + 0xF5: ("WAVEFORM_HEADER_RESPONSE", "S3→BW", "Response to SUB 0A"), + 0xF3: ("FULL_WAVEFORM_RESPONSE", "S3→BW", "Response to SUB 0C; project strings, PPV floats"), + 0xE3: ("TRIGGER_CONFIG_RESPONSE", "S3→BW", "Response to SUB 1C; contains timestamps"), + 0xF6: ("UNKNOWN_RESPONSE_A", "S3→BW", "Response to SUB 09; 0xCA bytes"), + 0xE5: ("COMPLIANCE_CONFIG_RESPONSE","S3→BW", "Response to SUB 1A; record time in page 2"), + 0xD1: ("UNKNOWN_RESPONSE_B", "S3→BW", "Response to SUB 2E; 0x1A bytes"), + 0xEA: ("SERIAL_NUMBER_RESPONSE", "S3→BW", "0x0A bytes; serial number + firmware minor version"), + # Short ack responses to writes (0xFF - write_sub) + 0x8E: ("WRITE_CONFIRM_RESPONSE_71", "S3→BW", "Ack for SUB 71 COMPLIANCE_STRINGS_WRITE"), + 0x8D: ("WRITE_CONFIRM_RESPONSE_72", "S3→BW", "Ack for SUB 72 WRITE_CONFIRM_A"), + 0x8C: ("WRITE_CONFIRM_RESPONSE_73", "S3→BW", "Ack for SUB 73 WRITE_CONFIRM_B"), + 0x8B: ("WRITE_CONFIRM_RESPONSE_74", "S3→BW", "Ack for SUB 74 WRITE_CONFIRM_C"), + 0x97: ("WRITE_CONFIRM_RESPONSE_68", 
"S3→BW", "Ack for SUB 68 EVENT_INDEX_WRITE"), + 0x96: ("WRITE_CONFIRM_RESPONSE_69", "S3→BW", "Ack for SUB 69 WAVEFORM_DATA_WRITE"), + 0x7D: ("WRITE_CONFIRM_RESPONSE_82", "S3→BW", "Ack for SUB 82 TRIGGER_CONFIG_WRITE"), + 0x7C: ("WRITE_CONFIRM_RESPONSE_83", "S3→BW", "Ack for SUB 83 TRIGGER_WRITE_CONFIRM"), +} + +# SUBs whose data-section bytes 0–5 are known timestamps (suppress in diffs) +NOISY_SUBS: set[int] = {0xE3, 0xF7, 0xF5} + +# E5 page 2 key: the OFFSET_HI:OFFSET_LO that identifies the data page +# E5 page 1 (length probe) has offset 0x0000; page 2 has offset 0x082A +E5_PAGE2_KEY = 0x082A + +# FieldEntry: (sub, page_key_or_none, payload_offset, field_name, type_hint, notes) +# payload_offset = offset from start of Frame.payload (not data section, not wire) +# Exception: for SUB 0x82, offset [22] is from full de-stuffed payload[0] per protocol ref. +@dataclass(frozen=True) +class FieldEntry: + sub: int + page_key: Optional[int] # None = any / all pages + payload_offset: int # offset from frame.payload[0] + name: str + type_hint: str + notes: str + + +FIELD_MAP: list[FieldEntry] = [ + # F7 (EVENT_INDEX_RESPONSE) — data section starts at payload[5] + # Protocol ref: backlight at data+0x4B = payload[5+0x4B] = payload[80] + FieldEntry(0xF7, None, 5 + 0x4B, "backlight_on_time", "uint8", "seconds; 0=off"), + FieldEntry(0xF7, None, 5 + 0x53, "power_save_timeout", "uint8", "minutes; 0=disabled"), + FieldEntry(0xF7, None, 5 + 0x54, "monitoring_lcd_cycle", "uint16 BE","65500=disabled"), + # E5 page 2 (COMPLIANCE_CONFIG_RESPONSE) — record time at data+0x28 + FieldEntry(0xE5, E5_PAGE2_KEY, 5 + 0x28, "record_time", "float32 BE", "seconds; 7s=40E00000, 13s=41500000"), + # SUB 0x82 (TRIGGER_CONFIG_WRITE) — BW→S3 write + # Protocol ref offset [22] is from the de-stuffed payload[0], confirmed from raw_bw.bin + FieldEntry(0x82, None, 22, "trigger_sample_width", "uint8", "samples; mode-gated, BW-side write only"), +] + + +# 
────────────────────────────────────────────────────────────────────────────── +# Data structures +# ────────────────────────────────────────────────────────────────────────────── + +@dataclass +class FrameHeader: + cmd: int + sub: int + offset_hi: int + offset_lo: int + flags: int + + @property + def page_key(self) -> int: + return (self.offset_hi << 8) | self.offset_lo + + +@dataclass +class AnnotatedFrame: + frame: Frame + source: str # "BW" or "S3" + header: Optional[FrameHeader] # None if payload < 7 bytes (malformed/short) + sub_name: str + session_idx: int = -1 + + +@dataclass +class Session: + index: int + bw_frames: list[AnnotatedFrame] + s3_frames: list[AnnotatedFrame] + + @property + def all_frames(self) -> list[AnnotatedFrame]: + """Interleave BW/S3 in synchronous protocol order: BW[0], S3[0], BW[1], S3[1]...""" + result: list[AnnotatedFrame] = [] + for i in range(max(len(self.bw_frames), len(self.s3_frames))): + if i < len(self.bw_frames): + result.append(self.bw_frames[i]) + if i < len(self.s3_frames): + result.append(self.s3_frames[i]) + return result + + +@dataclass +class ByteDiff: + payload_offset: int + before: int + after: int + field_name: Optional[str] + + +@dataclass +class FrameDiff: + sub: int + page_key: int + sub_name: str + diffs: list[ByteDiff] + + +# ────────────────────────────────────────────────────────────────────────────── +# Parsing helpers +# ────────────────────────────────────────────────────────────────────────────── + +def extract_header(payload: bytes) -> Optional[FrameHeader]: + """ + Extract protocol header from de-stuffed payload. + + After de-stuffing, the actual observed layout is 5 bytes: + [0] CMD -- 0x10 for BW requests, 0x00 for S3 responses + [1] ? -- 0x00 for BW, 0x10 for S3 (DLE/ADDR byte that survives de-stuffing) + [2] SUB -- the actual command/response identifier + [3] OFFSET_HI + [4] OFFSET_LO + Data section begins at payload[5]. 
+ + Note: The protocol reference describes a 7-byte header with CMD/DLE/ADDR/FLAGS/SUB/..., + but DLE+ADDR (both 0x10 on wire) are de-stuffed into single bytes by parse_bw/parse_s3, + collapsing the observable header to 5 bytes. + """ + if len(payload) < 5: + return None + return FrameHeader( + cmd=payload[0], + sub=payload[2], + offset_hi=payload[3], + offset_lo=payload[4], + flags=payload[1], + ) + + +def annotate_frame(frame: Frame, source: str) -> AnnotatedFrame: + header = extract_header(frame.payload) + if header is not None: + entry = SUB_TABLE.get(header.sub) + sub_name = entry[0] if entry else f"UNKNOWN_{header.sub:02X}" + else: + sub_name = "MALFORMED" + return AnnotatedFrame(frame=frame, source=source, header=header, sub_name=sub_name) + + +def annotate_frames(frames: list[Frame], source: str) -> list[AnnotatedFrame]: + return [annotate_frame(f, source) for f in frames] + + +def load_and_annotate(s3_path: Path, bw_path: Path) -> tuple[list[AnnotatedFrame], list[AnnotatedFrame]]: + """Parse both raw files and return annotated frame lists.""" + s3_blob = s3_path.read_bytes() if s3_path.exists() else b"" + bw_blob = bw_path.read_bytes() if bw_path.exists() else b"" + + s3_frames = parse_s3(s3_blob, trailer_len=0) + bw_frames = parse_bw(bw_blob, trailer_len=0, validate_checksum=True) + + return annotate_frames(s3_frames, "S3"), annotate_frames(bw_frames, "BW") + + +# ────────────────────────────────────────────────────────────────────────────── +# Session detection +# ────────────────────────────────────────────────────────────────────────────── + +# BW SUB that marks the end of a compliance write session +SESSION_CLOSE_SUB = 0x74 + +def split_into_sessions( + bw_annotated: list[AnnotatedFrame], + s3_annotated: list[AnnotatedFrame], +) -> list[Session]: + """ + Split frames into sessions. A session ends on BW SUB 0x74 (WRITE_CONFIRM_C). + New session starts at the stream beginning and after each 0x74. 
+ + The protocol is synchronous: BW[i] request → S3[i] response. S3 frame i + belongs to the same session as BW frame i. + """ + if not bw_annotated and not s3_annotated: + return [] + + sessions: list[Session] = [] + session_idx = 0 + bw_start = 0 + + # Track where we are in S3 frames — they mirror BW frame count per session + s3_cursor = 0 + + i = 0 + while i < len(bw_annotated): + frame = bw_annotated[i] + i += 1 + + is_close = ( + frame.header is not None and frame.header.sub == SESSION_CLOSE_SUB + ) + + if is_close: + bw_slice = bw_annotated[bw_start:i] + # S3 frames in this session match BW frame count (synchronous protocol) + n_s3 = len(bw_slice) + s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3] + s3_cursor += n_s3 + + sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice) + for f in sess.all_frames: + f.session_idx = session_idx + sessions.append(sess) + + session_idx += 1 + bw_start = i + + # Remaining frames (in-progress / no closing 0x74 yet) + if bw_start < len(bw_annotated) or s3_cursor < len(s3_annotated): + bw_slice = bw_annotated[bw_start:] + n_s3 = len(bw_slice) + s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3] + # also grab any extra S3 frames beyond expected pairing + if s3_cursor + n_s3 < len(s3_annotated): + s3_slice = s3_annotated[s3_cursor:] + + if bw_slice or s3_slice: + sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice) + for f in sess.all_frames: + f.session_idx = session_idx + sessions.append(sess) + + return sessions + + +# ────────────────────────────────────────────────────────────────────────────── +# Diff engine +# ────────────────────────────────────────────────────────────────────────────── + +def _mask_noisy(sub: int, data: bytes) -> bytearray: + """ + Zero out known-noisy byte ranges before diffing. + For NOISY_SUBS: mask bytes 0–5 of the data section (timestamps). 
+ """ + buf = bytearray(data) + if sub in NOISY_SUBS and len(buf) >= 6: + for k in range(6): + buf[k] = 0x00 + return buf + + +HEADER_LEN = 5 # Observed de-stuffed header size: CMD + ? + SUB + OFFSET_HI + OFFSET_LO + + +def _get_data_section(af: AnnotatedFrame) -> bytes: + """ + Return the data section of the frame (after the 5-byte protocol header). + For S3 frames, payload still contains a trailing SUM8 byte — exclude it. + For BW frames, parse_bw with validate_checksum=True already stripped it. + """ + payload = af.frame.payload + if len(payload) < HEADER_LEN: + return b"" + data = payload[HEADER_LEN:] + if af.source == "S3" and len(data) >= 1: + # SUM8 is still present at end of S3 frame payload + data = data[:-1] + return data + + +def lookup_field_name(sub: int, page_key: int, payload_offset: int) -> Optional[str]: + """Return field name if the given payload offset matches a known field, else None.""" + for entry in FIELD_MAP: + if entry.sub != sub: + continue + if entry.page_key is not None and entry.page_key != page_key: + continue + if entry.payload_offset == payload_offset: + return entry.name + return None + + +def diff_sessions(sess_a: Session, sess_b: Session) -> list[FrameDiff]: + """ + Compare two sessions frame-by-frame, matched by (sub, page_key). + Returns a list of FrameDiff for SUBs where bytes changed. 
+ """ + # Build lookup: (sub, page_key) → AnnotatedFrame for each session + def index_session(sess: Session) -> dict[tuple[int, int], AnnotatedFrame]: + idx: dict[tuple[int, int], AnnotatedFrame] = {} + for af in sess.all_frames: + if af.header is None: + continue + key = (af.header.sub, af.header.page_key) + # Keep first occurrence per key (or we could keep all — for now, first) + if key not in idx: + idx[key] = af + return idx + + idx_a = index_session(sess_a) + idx_b = index_session(sess_b) + + results: list[FrameDiff] = [] + + # Only compare SUBs present in both sessions + common_keys = set(idx_a.keys()) & set(idx_b.keys()) + for key in sorted(common_keys): + sub, page_key = key + af_a = idx_a[key] + af_b = idx_b[key] + + data_a = _mask_noisy(sub, _get_data_section(af_a)) + data_b = _mask_noisy(sub, _get_data_section(af_b)) + + if data_a == data_b: + continue + + # Compare byte by byte up to the shorter length + diffs: list[ByteDiff] = [] + max_len = max(len(data_a), len(data_b)) + for offset in range(max_len): + byte_a = data_a[offset] if offset < len(data_a) else None + byte_b = data_b[offset] if offset < len(data_b) else None + if byte_a != byte_b: + # payload_offset = data_section_offset + HEADER_LEN + payload_off = offset + HEADER_LEN + field = lookup_field_name(sub, page_key, payload_off) + diffs.append(ByteDiff( + payload_offset=payload_off, + before=byte_a if byte_a is not None else -1, + after=byte_b if byte_b is not None else -1, + field_name=field, + )) + + if diffs: + entry = SUB_TABLE.get(sub) + sub_name = entry[0] if entry else f"UNKNOWN_{sub:02X}" + results.append(FrameDiff(sub=sub, page_key=page_key, sub_name=sub_name, diffs=diffs)) + + return results + + +# ────────────────────────────────────────────────────────────────────────────── +# Report rendering +# ────────────────────────────────────────────────────────────────────────────── + +def format_hex_dump(data: bytes, indent: str = " ") -> list[str]: + """Compact 16-bytes-per-line hex dump. 
Returns list of lines.""" + lines = [] + for row_start in range(0, len(data), 16): + chunk = data[row_start:row_start + 16] + hex_part = " ".join(f"{b:02x}" for b in chunk) + lines.append(f"{indent}{row_start:04x}: {hex_part}") + return lines + + +def render_session_report( + session: Session, + diffs: Optional[list[FrameDiff]], + prev_session_index: Optional[int], +) -> str: + lines: list[str] = [] + + n_bw = len(session.bw_frames) + n_s3 = len(session.s3_frames) + total = n_bw + n_s3 + is_complete = any( + af.header is not None and af.header.sub == SESSION_CLOSE_SUB + for af in session.bw_frames + ) + status = "" if is_complete else " [IN PROGRESS]" + + lines.append(f"{'='*72}") + lines.append(f"SESSION {session.index}{status}") + lines.append(f"{'='*72}") + lines.append(f"Frames: {total} (BW: {n_bw}, S3: {n_s3})") + if n_bw != n_s3: + lines.append(f" WARNING: BW/S3 frame count mismatch — protocol sync issue?") + lines.append("") + + # ── Frame inventory ────────────────────────────────────────────────────── + lines.append("FRAME INVENTORY") + for seq_i, af in enumerate(session.all_frames): + if af.header is not None: + sub_hex = f"{af.header.sub:02X}" + page_str = f" (page {af.header.page_key:04X})" if af.header.page_key != 0 else "" + else: + sub_hex = "??" + page_str = "" + chk = "" + if af.frame.checksum_valid is False: + chk = " [BAD CHECKSUM]" + elif af.frame.checksum_valid is True: + chk = f" [{af.frame.checksum_type}]" + lines.append( + f" [{af.source}] #{seq_i:<3} SUB={sub_hex} {af.sub_name:<30}{page_str}" + f" len={len(af.frame.payload)}{chk}" + ) + lines.append("") + + # ── Hex dumps ──────────────────────────────────────────────────────────── + lines.append("HEX DUMPS") + for seq_i, af in enumerate(session.all_frames): + sub_hex = f"{af.header.sub:02X}" if af.header else "??" 
+ lines.append(f" [{af.source}] #{seq_i} SUB={sub_hex} {af.sub_name}") + dump_lines = format_hex_dump(af.frame.payload, indent=" ") + if dump_lines: + lines.extend(dump_lines) + else: + lines.append(" (empty payload)") + lines.append("") + + # ── Diff section ───────────────────────────────────────────────────────── + if diffs is not None: + if prev_session_index is not None: + lines.append(f"DIFF vs SESSION {prev_session_index}") + else: + lines.append("DIFF") + + if not diffs: + lines.append(" (no changes)") + else: + for fd in diffs: + page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else "" + lines.append(f" SUB {fd.sub:02X} ({fd.sub_name}){page_str}:") + for bd in fd.diffs: + field_str = f" [{bd.field_name}]" if bd.field_name else "" + before_str = f"{bd.before:02x}" if bd.before >= 0 else "--" + after_str = f"{bd.after:02x}" if bd.after >= 0 else "--" + lines.append( + f" offset [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: " + f"{before_str} -> {after_str}{field_str}" + ) + lines.append("") + + return "\n".join(lines) + "\n" + + +def write_report(session: Session, report_text: str, outdir: Path) -> Path: + outdir.mkdir(parents=True, exist_ok=True) + out_path = outdir / f"session_{session.index:03d}.report" + out_path.write_text(report_text, encoding="utf-8") + return out_path + + +# ────────────────────────────────────────────────────────────────────────────── +# Claude export +# ────────────────────────────────────────────────────────────────────────────── + +def _hex_block(data: bytes, bytes_per_row: int = 16) -> list[str]: + """Hex dump with offset + hex + ASCII columns.""" + lines = [] + for row in range(0, len(data), bytes_per_row): + chunk = data[row:row + bytes_per_row] + hex_col = " ".join(f"{b:02x}" for b in chunk) + hex_col = f"{hex_col:<{bytes_per_row * 3 - 1}}" + asc_col = "".join(chr(b) if 32 <= b < 127 else "." 
for b in chunk)
        lines.append(f" {row:04x} {hex_col} |{asc_col}|")
    return lines


def render_claude_export(
    sessions: list[Session],
    diffs: list[Optional[list[FrameDiff]]],
    s3_path: Optional[Path] = None,
    bw_path: Optional[Path] = None,
) -> str:
    """
    Produce a single self-contained Markdown file suitable for pasting into
    a Claude conversation for protocol reverse-engineering assistance.

    Structure:
      1. Context block — what this is, protocol background, field map
      2. Capture summary — session count, frame counts, what changed
      3. Per-diff section — one section per session pair that had changes:
         a. Diff table (before/after bytes, known field labels)
         b. Full hex dumps of ONLY the frames that changed
      4. Full hex dumps of all frames in sessions with no prior comparison
         (session 0 baseline)

    Args:
        sessions: parsed sessions in capture order (session 0 is the baseline).
        diffs: per-session diff lists. This function reads diffs[i] as the
            FrameDiff list for session i vs session i-1 (None meaning no
            comparison available); callers must supply the list aligned that
            way.
        s3_path: raw S3 capture path — used only for the header line.
        bw_path: raw BW capture path — used only for the header line.

    Returns:
        The full Markdown document as one string.
    """
    import datetime
    lines: list[str] = []

    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    # Fall back to the conventional capture file names when no paths given.
    s3_name = s3_path.name if s3_path else "raw_s3.bin"
    bw_name = bw_path.name if bw_path else "raw_bw.bin"

    # ── 1. Context block ────────────────────────────────────────────────── 
    lines += [
        f"# Instantel MiniMate Plus — Protocol Capture Analysis",
        f"Generated: {now} | Source: `{s3_name}` + `{bw_name}`",
        "",
        "## Protocol Background",
        "",
        "This file contains parsed RS-232 captures from an Instantel MiniMate Plus",
        "seismograph communicating with Blastware PC software at 38400 baud 8N1.",
        "",
        "**Frame structure (de-stuffed payload):**",
        "```",
        " [0] CMD 0x10 = BW request, 0x00 = S3 response",
        " [1] ? 0x00 (BW) or 0x10 (S3)",
        " [2] SUB Command/response identifier (key field)",
        " [3] OFFSET_HI Page offset high byte",
        " [4] OFFSET_LO Page offset low byte",
        " [5+] DATA Payload data section",
        "```",
        "",
        "**Response SUB rule:** response_SUB = 0xFF - request_SUB (confirmed, no exceptions observed)",
        "",
        "**Known field map** (offsets from payload[0]):",
        "```",
        " SUB F7 (EVENT_INDEX_RESPONSE):",
        " [80] 0x52 backlight_on_time uint8 seconds",
        " [88] 0x58 power_save_timeout uint8 minutes",
        " [89] 0x59 monitoring_lcd_cycle uint16BE 65500=disabled",
        " SUB E5 page 0x082A (COMPLIANCE_CONFIG_RESPONSE):",
        " [45] 0x2D record_time float32BE seconds (7s=40E00000, 13s=41500000)",
        " SUB 82 (TRIGGER_CONFIG_WRITE, BW-side only):",
        " [22] trigger_sample_width uint8 samples",
        "```",
        "",
        "**Session boundary:** a compliance session ends when BW sends SUB 0x74 (WRITE_CONFIRM_C).",
        "Sessions are numbered from 0. The diff compares consecutive complete sessions.",
        "",
    ]

    # ── 2. Capture summary ──────────────────────────────────────────────── 
    lines += ["## Capture Summary", ""]
    lines.append(f"Sessions found: {len(sessions)}")
    for sess in sessions:
        # A session counts as complete once its BW side contains the
        # closing SUB (SESSION_CLOSE_SUB — 0x74 per the notes above).
        is_complete = any(
            af.header is not None and af.header.sub == SESSION_CLOSE_SUB
            for af in sess.bw_frames
        )
        status = "complete" if is_complete else "partial/in-progress"
        n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames)
        # diffs[i] holds the diff list for session i vs i-1; guard short lists.
        changed = len(diffs[sess.index] or []) if sess.index < len(diffs) else 0
        changed_str = f" ({changed} SUBs changed vs prev)" if sess.index > 0 else " (baseline)"
        lines.append(f" Session {sess.index} [{status}]: BW={n_bw} S3={n_s3} frames{changed_str}")
    lines.append("")

    # ── 3. Per-diff sections ────────────────────────────────────────────── 
    any_diffs = False
    for sess in sessions:
        sess_diffs = diffs[sess.index] if sess.index < len(diffs) else None
        # Session 0 has no predecessor, so it never gets a diff section.
        if sess_diffs is None or sess.index == 0:
            continue

        any_diffs = True
        prev_idx = sess.index - 1
        lines += [
            f"---",
            f"## Diff: Session {prev_idx} -> Session {sess.index}",
            "",
        ]

        if not sess_diffs:
            lines.append("_No byte changes detected between these sessions._")
            lines.append("")
            continue

        # Build index of changed frames for this session (and prev)
        prev_sess = sessions[prev_idx] if prev_idx < len(sessions) else None

        for fd in sess_diffs:
            page_str = f" page 0x{fd.page_key:04X}" if fd.page_key != 0 else ""
            lines += [
                f"### SUB {fd.sub:02X} — {fd.sub_name}{page_str}",
                "",
            ]

            # Diff table
            known_count = sum(1 for bd in fd.diffs if bd.field_name)
            unknown_count = sum(1 for bd in fd.diffs if not bd.field_name)
            lines.append(
                f"Changed bytes: **{len(fd.diffs)}** total "
                f"({known_count} known fields, {unknown_count} unknown)"
            )
            lines.append("")
            lines.append("| Offset | Hex | Dec | Session {0} | Session {1} | Field |".format(prev_idx, sess.index))
            lines.append("|--------|-----|-----|" + "-" * 12 + "|" + "-" * 12 + "|-------|")
            for bd in fd.diffs:
                # Negative before/after values encode "byte absent" and
                # render as "--" in the table.
                before_s = f"`{bd.before:02x}`" if bd.before >= 0 else "`--`"
                after_s = f"`{bd.after:02x}`" if bd.after >= 0 else "`--`"
                before_d = str(bd.before) if bd.before >= 0 else "--"
                after_d = str(bd.after) if bd.after >= 0 else "--"
                field = f"`{bd.field_name}`" if bd.field_name else "**UNKNOWN**"
                lines.append(
                    f"| [{bd.payload_offset}] 0x{bd.payload_offset:04X} "
                    f"| {before_s}->{after_s} | {before_d}->{after_d} "
                    f"| {before_s} | {after_s} | {field} |"
                )
            lines.append("")

            # Hex dumps of the changed frame in both sessions
            # Helper is re-created per diff entry; returns the FIRST frame in
            # the session matching this SUB/page, or None if absent.
            def _find_af(target_sess: Session, sub: int, page_key: int) -> Optional[AnnotatedFrame]:
                for af in target_sess.all_frames:
                    if af.header and af.header.sub == sub and af.header.page_key == page_key:
                        return af
                return None

            af_prev = _find_af(sessions[prev_idx], fd.sub, fd.page_key) if prev_sess else None
            af_curr = _find_af(sess, fd.sub, fd.page_key)

            lines.append("**Hex dumps (full de-stuffed payload):**")
            lines.append("")

            for label, af in [(f"Session {prev_idx} (before)", af_prev),
                              (f"Session {sess.index} (after)", af_curr)]:
                if af is None:
                    lines.append(f"_{label}: frame not found_")
                    lines.append("")
                    continue
                lines.append(f"_{label}_ — {len(af.frame.payload)} bytes:")
                lines.append("```")
                lines += _hex_block(af.frame.payload)
                lines.append("```")
                lines.append("")

    if not any_diffs:
        lines += [
            "---",
            "## Diffs",
            "",
            "_Only one session found — no diff available. "
            "Run a second capture with changed settings to see what moves._",
            "",
        ]

    # ── 4. Baseline hex dumps (session 0, all frames) ───────────────────── 
    if sessions:
        baseline = sessions[0]
        lines += [
            "---",
            f"## Baseline — Session 0 (all frames)",
            "",
            "Full hex dump of every frame in the first session.",
            "Use this to map field positions from known values.",
            "",
        ]
        for seq_i, af in enumerate(baseline.all_frames):
            sub_hex = f"{af.header.sub:02X}" if af.header else "??"
            page_str = f" page 0x{af.header.page_key:04X}" if af.header and af.header.page_key != 0 else ""
            # Checksum tag only shown when the frame's checksum validated.
            chk_str = f" [{af.frame.checksum_type}]" if af.frame.checksum_valid else ""
            lines.append(
                f"### [{af.source}] #{seq_i} SUB {sub_hex} — {af.sub_name}{page_str}{chk_str}"
            )
            lines.append(f"_{len(af.frame.payload)} bytes_")
            lines.append("```")
            lines += _hex_block(af.frame.payload)
            lines.append("```")
            lines.append("")

    lines += [
        "---",
        "_End of analysis. 
To map an unknown field: change exactly one setting in Blastware,_", + "_capture again, run the analyzer, and look for the offset that moved._", + ] + + return "\n".join(lines) + "\n" + + +def write_claude_export( + sessions: list[Session], + diffs: list[Optional[list[FrameDiff]]], + outdir: Path, + s3_path: Optional[Path] = None, + bw_path: Optional[Path] = None, +) -> Path: + import datetime + outdir.mkdir(parents=True, exist_ok=True) + stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + out_path = outdir / f"claude_export_{stamp}.md" + out_path.write_text( + render_claude_export(sessions, diffs, s3_path, bw_path), + encoding="utf-8" + ) + return out_path + + +# ────────────────────────────────────────────────────────────────────────────── +# Post-processing mode +# ────────────────────────────────────────────────────────────────────────────── + +def run_postprocess(s3_path: Path, bw_path: Path, outdir: Path, export: bool = False) -> None: + print(f"s3_analyzer v{__version__}") + print(f" S3 file : {s3_path}") + print(f" BW file : {bw_path}") + print(f" Out dir : {outdir}") + print() + + s3_frames, bw_frames = load_and_annotate(s3_path, bw_path) + print(f"Parsed: {len(s3_frames)} S3 frames, {len(bw_frames)} BW frames") + + sessions = split_into_sessions(bw_frames, s3_frames) + print(f"Sessions: {len(sessions)}") + print() + + all_diffs: list[Optional[list[FrameDiff]]] = [None] + prev_session: Optional[Session] = None + for sess in sessions: + sess_diffs: Optional[list[FrameDiff]] = None + prev_idx: Optional[int] = None + if prev_session is not None: + sess_diffs = diff_sessions(prev_session, sess) + prev_idx = prev_session.index + all_diffs.append(sess_diffs) + + report = render_session_report(sess, sess_diffs, prev_idx) + out_path = write_report(sess, report, outdir) + n_diffs = len(sess_diffs) if sess_diffs else 0 + print(f" Session {sess.index}: {len(sess.all_frames)} frames, {n_diffs} changed SUBs -> {out_path.name}") + + prev_session = sess + + if 
export: + export_path = write_claude_export(sessions, all_diffs, outdir, s3_path, bw_path) + print(f"\n Claude export -> {export_path.name}") + + print() + print(f"Reports written to: {outdir}") + + +# ────────────────────────────────────────────────────────────────────────────── +# Live mode +# ────────────────────────────────────────────────────────────────────────────── + +def live_loop( + s3_path: Path, + bw_path: Path, + outdir: Path, + poll_interval: float = 0.05, +) -> None: + """ + Tail both raw files continuously, re-parsing on new bytes. + Emits a session report as soon as BW SUB 0x74 is detected. + """ + print(f"s3_analyzer v{__version__} — LIVE MODE") + print(f" S3 file : {s3_path}") + print(f" BW file : {bw_path}") + print(f" Out dir : {outdir}") + print(f" Poll : {poll_interval*1000:.0f}ms") + print("Waiting for frames... (Ctrl+C to stop)") + print() + + s3_buf = bytearray() + bw_buf = bytearray() + s3_pos = 0 + bw_pos = 0 + + last_s3_count = 0 + last_bw_count = 0 + sessions: list[Session] = [] + prev_complete_session: Optional[Session] = None + + try: + while True: + # Read new bytes from both files + changed = False + + if s3_path.exists(): + with s3_path.open("rb") as fh: + fh.seek(s3_pos) + new_bytes = fh.read() + if new_bytes: + s3_buf.extend(new_bytes) + s3_pos += len(new_bytes) + changed = True + + if bw_path.exists(): + with bw_path.open("rb") as fh: + fh.seek(bw_pos) + new_bytes = fh.read() + if new_bytes: + bw_buf.extend(new_bytes) + bw_pos += len(new_bytes) + changed = True + + if changed: + s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0) + bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True) + + s3_annotated = annotate_frames(s3_frames_raw, "S3") + bw_annotated = annotate_frames(bw_frames_raw, "BW") + + new_s3 = len(s3_annotated) - last_s3_count + new_bw = len(bw_annotated) - last_bw_count + + if new_s3 > 0 or new_bw > 0: + last_s3_count = len(s3_annotated) + last_bw_count = len(bw_annotated) + print(f"[+] 
S3:{len(s3_annotated)} BW:{len(bw_annotated)} frames", end="") + + # Annotate newest BW frame + if bw_annotated: + latest_bw = bw_annotated[-1] + sub_str = f"SUB={latest_bw.header.sub:02X}" if latest_bw.header else "SUB=??" + print(f" latest BW {sub_str} {latest_bw.sub_name}", end="") + print() + + # Check for session close + all_sessions = split_into_sessions(bw_annotated, s3_annotated) + # A complete session has the closing 0x74 + complete_sessions = [ + s for s in all_sessions + if any( + af.header is not None and af.header.sub == SESSION_CLOSE_SUB + for af in s.bw_frames + ) + ] + + # Emit reports for newly completed sessions + for sess in complete_sessions[len(sessions):]: + diffs: Optional[list[FrameDiff]] = None + prev_idx: Optional[int] = None + if prev_complete_session is not None: + diffs = diff_sessions(prev_complete_session, sess) + prev_idx = prev_complete_session.index + + report = render_session_report(sess, diffs, prev_idx) + out_path = write_report(sess, report, outdir) + n_diffs = len(diffs) if diffs else 0 + print(f"\n [+] Session {sess.index} complete: {len(sess.all_frames)} frames, " + f"{n_diffs} changed SUBs -> {out_path.name}\n") + prev_complete_session = sess + + sessions = complete_sessions + + time.sleep(poll_interval) + + except KeyboardInterrupt: + print("\nStopped.") + + # Emit any in-progress (incomplete) session as a partial report + if s3_buf or bw_buf: + s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0) + bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True) + s3_annotated = annotate_frames(s3_frames_raw, "S3") + bw_annotated = annotate_frames(bw_frames_raw, "BW") + all_sessions = split_into_sessions(bw_annotated, s3_annotated) + incomplete = [ + s for s in all_sessions + if not any( + af.header is not None and af.header.sub == SESSION_CLOSE_SUB + for af in s.bw_frames + ) + ] + for sess in incomplete: + report = render_session_report(sess, diffs=None, prev_session_index=None) + out_path = 
write_report(sess, report, outdir) + print(f" Partial session {sess.index} written -> {out_path.name}") + + +# ────────────────────────────────────────────────────────────────────────────── +# CLI +# ────────────────────────────────────────────────────────────────────────────── + +def main() -> None: + ap = argparse.ArgumentParser( + description="s3_analyzer — Instantel MiniMate Plus live protocol analyzer" + ) + ap.add_argument("--s3", type=Path, required=True, help="Path to raw_s3.bin (S3→BW raw capture)") + ap.add_argument("--bw", type=Path, required=True, help="Path to raw_bw.bin (BW→S3 raw capture)") + ap.add_argument("--live", action="store_true", help="Live mode: tail files as they grow") + ap.add_argument("--export", action="store_true", help="Also write a claude_export_.md file for Claude analysis") + ap.add_argument("--outdir", type=Path, default=None, help="Output directory for .report files (default: same as input)") + ap.add_argument("--poll", type=float, default=0.05, help="Live mode poll interval in seconds (default: 0.05)") + args = ap.parse_args() + + outdir = args.outdir + if outdir is None: + outdir = args.s3.parent + + if args.live: + live_loop(args.s3, args.bw, outdir, poll_interval=args.poll) + else: + if not args.s3.exists(): + print(f"ERROR: S3 file not found: {args.s3}", file=sys.stderr) + sys.exit(1) + if not args.bw.exists(): + print(f"ERROR: BW file not found: {args.bw}", file=sys.stderr) + sys.exit(1) + run_postprocess(args.s3, args.bw, outdir, export=args.export) + + +if __name__ == "__main__": + main()