Add s3_analyzer.py for live protocol analysis of Instantel MiniMate Plus RS-232

- Implement functionality to read and parse raw_s3.bin and raw_bw.bin files.
- Define protocol constants and mappings for various command and response identifiers.
- Create data structures for frames, sessions, and diffs to facilitate analysis.
- Develop functions for annotating frames, splitting sessions, and generating reports.
- Include live mode for continuous monitoring and reporting of protocol frames.
- Add command-line interface for user interaction and configuration.
This commit is contained in:
serversdwn
2026-03-10 05:00:55 -04:00
parent faa869d03b
commit 154a11d057
4 changed files with 2250 additions and 1 deletions

24
.gitignore vendored
View File

@@ -1 +1,25 @@
/bridges/captures/
# Python bytecode
__pycache__/
*.py[cod]
# Virtual environments
.venv/
venv/
env/
# Editor / OS
.vscode/
*.swp
.DS_Store
Thumbs.db
# Analyzer outputs
*.report
claude_export_*.md
# Frame database
*.db
*.db-wal
*.db-shm

337
parsers/frame_db.py Normal file
View File

@@ -0,0 +1,337 @@
#!/usr/bin/env python3
"""
frame_db.py — SQLite frame database for Instantel protocol captures.
Schema:
captures — one row per ingested capture pair (deduped by SHA256)
frames — one row per parsed frame
byte_values — one row per (frame, offset, value) for fast indexed queries
Usage:
db = FrameDB() # opens default DB at ~/.seismo_lab/frames.db
db = FrameDB(path) # custom path
cap_id = db.ingest(sessions, s3_path, bw_path)
rows = db.query_frames(sub=0xF7, direction="S3")
rows = db.query_by_byte(offset=85, value=0x0A)
"""
from __future__ import annotations
import hashlib
import os
import sqlite3
import struct
from pathlib import Path
from typing import Optional
# ─────────────────────────────────────────────────────────────────────────────
# DB location
# ─────────────────────────────────────────────────────────────────────────────
DEFAULT_DB_DIR = Path.home() / ".seismo_lab"
DEFAULT_DB_PATH = DEFAULT_DB_DIR / "frames.db"
# ─────────────────────────────────────────────────────────────────────────────
# Schema
# ─────────────────────────────────────────────────────────────────────────────
_DDL = """
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
CREATE TABLE IF NOT EXISTS captures (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL, -- ISO-8601 ingest time
s3_path TEXT,
bw_path TEXT,
capture_hash TEXT NOT NULL UNIQUE, -- SHA256 of s3_blob+bw_blob
notes TEXT DEFAULT ''
);
CREATE TABLE IF NOT EXISTS frames (
id INTEGER PRIMARY KEY AUTOINCREMENT,
capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE,
session_idx INTEGER NOT NULL,
direction TEXT NOT NULL, -- 'BW' or 'S3'
sub INTEGER, -- NULL if malformed
page_key INTEGER,
sub_name TEXT,
payload BLOB NOT NULL,
payload_len INTEGER NOT NULL,
checksum_ok INTEGER -- 1/0/NULL
);
CREATE INDEX IF NOT EXISTS idx_frames_capture ON frames(capture_id);
CREATE INDEX IF NOT EXISTS idx_frames_sub ON frames(sub);
CREATE INDEX IF NOT EXISTS idx_frames_page_key ON frames(page_key);
CREATE INDEX IF NOT EXISTS idx_frames_dir ON frames(direction);
CREATE TABLE IF NOT EXISTS byte_values (
id INTEGER PRIMARY KEY AUTOINCREMENT,
frame_id INTEGER NOT NULL REFERENCES frames(id) ON DELETE CASCADE,
offset INTEGER NOT NULL,
value INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bv_frame ON byte_values(frame_id);
CREATE INDEX IF NOT EXISTS idx_bv_offset ON byte_values(offset);
CREATE INDEX IF NOT EXISTS idx_bv_value ON byte_values(value);
CREATE INDEX IF NOT EXISTS idx_bv_off_val ON byte_values(offset, value);
"""
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _sha256_blobs(s3_blob: bytes, bw_blob: bytes) -> str:
h = hashlib.sha256()
h.update(s3_blob)
h.update(bw_blob)
return h.hexdigest()
def _interp_bytes(data: bytes, offset: int) -> dict:
"""
Return multi-interpretation dict for 14 bytes starting at offset.
Used in the GUI's byte interpretation panel.
"""
result: dict = {}
remaining = len(data) - offset
if remaining <= 0:
return result
b1 = data[offset]
result["uint8"] = b1
result["int8"] = b1 if b1 < 128 else b1 - 256
if remaining >= 2:
u16be = struct.unpack_from(">H", data, offset)[0]
u16le = struct.unpack_from("<H", data, offset)[0]
result["uint16_be"] = u16be
result["uint16_le"] = u16le
if remaining >= 4:
f32be = struct.unpack_from(">f", data, offset)[0]
f32le = struct.unpack_from("<f", data, offset)[0]
u32be = struct.unpack_from(">I", data, offset)[0]
u32le = struct.unpack_from("<I", data, offset)[0]
result["float32_be"] = round(f32be, 6)
result["float32_le"] = round(f32le, 6)
result["uint32_be"] = u32be
result["uint32_le"] = u32le
return result
# ─────────────────────────────────────────────────────────────────────────────
# FrameDB class
# ─────────────────────────────────────────────────────────────────────────────
class FrameDB:
def __init__(self, path: Optional[Path] = None) -> None:
if path is None:
path = DEFAULT_DB_PATH
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
self.path = path
self._con = sqlite3.connect(str(path), check_same_thread=False)
self._con.row_factory = sqlite3.Row
self._init_schema()
def _init_schema(self) -> None:
self._con.executescript(_DDL)
self._con.commit()
def close(self) -> None:
self._con.close()
# ── Ingest ────────────────────────────────────────────────────────────
def ingest(
self,
sessions: list, # list[Session] from s3_analyzer
s3_path: Optional[Path],
bw_path: Optional[Path],
notes: str = "",
) -> Optional[int]:
"""
Ingest a list of sessions into the DB.
Returns capture_id, or None if already ingested (duplicate hash).
"""
import datetime
s3_blob = s3_path.read_bytes() if s3_path and s3_path.exists() else b""
bw_blob = bw_path.read_bytes() if bw_path and bw_path.exists() else b""
cap_hash = _sha256_blobs(s3_blob, bw_blob)
# Dedup check
row = self._con.execute(
"SELECT id FROM captures WHERE capture_hash=?", (cap_hash,)
).fetchone()
if row:
return None # already in DB
ts = datetime.datetime.now().isoformat(timespec="seconds")
cur = self._con.execute(
"INSERT INTO captures (timestamp, s3_path, bw_path, capture_hash, notes) "
"VALUES (?, ?, ?, ?, ?)",
(ts, str(s3_path) if s3_path else None,
str(bw_path) if bw_path else None,
cap_hash, notes)
)
cap_id = cur.lastrowid
for sess in sessions:
for af in sess.all_frames:
frame_id = self._insert_frame(cap_id, af)
self._insert_byte_values(frame_id, af.frame.payload)
self._con.commit()
return cap_id
def _insert_frame(self, cap_id: int, af) -> int:
"""Insert one AnnotatedFrame; return its rowid."""
sub = af.header.sub if af.header else None
page_key = af.header.page_key if af.header else None
chk_ok = None
if af.frame.checksum_valid is True:
chk_ok = 1
elif af.frame.checksum_valid is False:
chk_ok = 0
cur = self._con.execute(
"INSERT INTO frames "
"(capture_id, session_idx, direction, sub, page_key, sub_name, payload, payload_len, checksum_ok) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(cap_id, af.session_idx, af.source,
sub, page_key, af.sub_name,
af.frame.payload, len(af.frame.payload), chk_ok)
)
return cur.lastrowid
def _insert_byte_values(self, frame_id: int, payload: bytes) -> None:
"""Insert one row per byte in payload into byte_values."""
rows = [(frame_id, i, b) for i, b in enumerate(payload)]
self._con.executemany(
"INSERT INTO byte_values (frame_id, offset, value) VALUES (?, ?, ?)",
rows
)
# ── Queries ───────────────────────────────────────────────────────────
def list_captures(self) -> list[sqlite3.Row]:
return self._con.execute(
"SELECT id, timestamp, s3_path, bw_path, notes, "
" (SELECT COUNT(*) FROM frames WHERE capture_id=captures.id) AS frame_count "
"FROM captures ORDER BY id DESC"
).fetchall()
def query_frames(
self,
capture_id: Optional[int] = None,
direction: Optional[str] = None, # "BW" or "S3"
sub: Optional[int] = None,
page_key: Optional[int] = None,
limit: int = 500,
) -> list[sqlite3.Row]:
"""
Query frames table with optional filters.
Returns rows with: id, capture_id, session_idx, direction, sub, page_key,
sub_name, payload, payload_len, checksum_ok
"""
clauses = []
params = []
if capture_id is not None:
clauses.append("capture_id=?"); params.append(capture_id)
if direction is not None:
clauses.append("direction=?"); params.append(direction)
if sub is not None:
clauses.append("sub=?"); params.append(sub)
if page_key is not None:
clauses.append("page_key=?"); params.append(page_key)
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
sql = f"SELECT * FROM frames {where} ORDER BY id LIMIT ?"
params.append(limit)
return self._con.execute(sql, params).fetchall()
def query_by_byte(
self,
offset: int,
value: Optional[int] = None,
capture_id: Optional[int] = None,
direction: Optional[str] = None,
sub: Optional[int] = None,
limit: int = 500,
) -> list[sqlite3.Row]:
"""
Return frames that have a specific byte at a specific offset.
Joins byte_values -> frames for indexed lookup.
"""
clauses = ["bv.offset=?"]
params = [offset]
if value is not None:
clauses.append("bv.value=?"); params.append(value)
if capture_id is not None:
clauses.append("f.capture_id=?"); params.append(capture_id)
if direction is not None:
clauses.append("f.direction=?"); params.append(direction)
if sub is not None:
clauses.append("f.sub=?"); params.append(sub)
where = "WHERE " + " AND ".join(clauses)
sql = (
f"SELECT f.*, bv.offset AS q_offset, bv.value AS q_value "
f"FROM byte_values bv "
f"JOIN frames f ON f.id=bv.frame_id "
f"{where} "
f"ORDER BY f.id LIMIT ?"
)
params.append(limit)
return self._con.execute(sql, params).fetchall()
def get_frame_payload(self, frame_id: int) -> Optional[bytes]:
row = self._con.execute(
"SELECT payload FROM frames WHERE id=?", (frame_id,)
).fetchone()
return bytes(row["payload"]) if row else None
def get_distinct_subs(self, capture_id: Optional[int] = None) -> list[int]:
if capture_id is not None:
rows = self._con.execute(
"SELECT DISTINCT sub FROM frames WHERE capture_id=? AND sub IS NOT NULL ORDER BY sub",
(capture_id,)
).fetchall()
else:
rows = self._con.execute(
"SELECT DISTINCT sub FROM frames WHERE sub IS NOT NULL ORDER BY sub"
).fetchall()
return [r[0] for r in rows]
def get_distinct_offsets(self, capture_id: Optional[int] = None) -> list[int]:
if capture_id is not None:
rows = self._con.execute(
"SELECT DISTINCT bv.offset FROM byte_values bv "
"JOIN frames f ON f.id=bv.frame_id WHERE f.capture_id=? ORDER BY bv.offset",
(capture_id,)
).fetchall()
else:
rows = self._con.execute(
"SELECT DISTINCT offset FROM byte_values ORDER BY offset"
).fetchall()
return [r[0] for r in rows]
def interpret_offset(self, payload: bytes, offset: int) -> dict:
"""Return multi-format interpretation of bytes starting at offset."""
return _interp_bytes(payload, offset)
def get_stats(self) -> dict:
captures = self._con.execute("SELECT COUNT(*) FROM captures").fetchone()[0]
frames = self._con.execute("SELECT COUNT(*) FROM frames").fetchone()[0]
bv_rows = self._con.execute("SELECT COUNT(*) FROM byte_values").fetchone()[0]
return {"captures": captures, "frames": frames, "byte_value_rows": bv_rows}

940
parsers/gui_analyzer.py Normal file
View File

@@ -0,0 +1,940 @@
#!/usr/bin/env python3
"""
gui_analyzer.py — Tkinter GUI for s3_analyzer.
Layout:
┌─────────────────────────────────────────────────────────┐
│ [S3 file: ___________ Browse] [BW file: ___ Browse] │
│ [Analyze] [Live mode toggle] Status: Idle │
├──────────────────┬──────────────────────────────────────┤
│ Session list │ Detail panel (tabs) │
│ ─ Session 0 │ Inventory | Hex Dump | Diff │
│ └ POLL (BW) │ │
│ └ POLL_RESP │ (content of selected tab) │
│ ─ Session 1 │ │
│ └ ... │ │
└──────────────────┴──────────────────────────────────────┘
│ Status bar │
└─────────────────────────────────────────────────────────┘
"""
from __future__ import annotations
import queue
import sys
import threading
import time
import tkinter as tk
from pathlib import Path
from tkinter import filedialog, font, messagebox, ttk
from typing import Optional
sys.path.insert(0, str(Path(__file__).parent))
from s3_analyzer import ( # noqa: E402
AnnotatedFrame,
FrameDiff,
Session,
annotate_frames,
diff_sessions,
format_hex_dump,
parse_bw,
parse_s3,
render_session_report,
split_into_sessions,
write_claude_export,
)
from frame_db import FrameDB, DEFAULT_DB_PATH # noqa: E402
# ──────────────────────────────────────────────────────────────────────────────
# Colour palette (dark-ish terminal feel)
# ──────────────────────────────────────────────────────────────────────────────
BG = "#1e1e1e"
BG2 = "#252526"
BG3 = "#2d2d30"
FG = "#d4d4d4"
FG_DIM = "#6a6a6a"
ACCENT = "#569cd6"
ACCENT2 = "#4ec9b0"
RED = "#f44747"
YELLOW = "#dcdcaa"
GREEN = "#4caf50"
ORANGE = "#ce9178"
COL_BW = "#9cdcfe" # BW frames
COL_S3 = "#4ec9b0" # S3 frames
COL_DIFF = "#f44747" # Changed bytes
COL_KNOW = "#4caf50" # Known-field annotations
COL_HEAD = "#569cd6" # Section headers
MONO = ("Consolas", 9)
MONO_SM = ("Consolas", 8)
# ──────────────────────────────────────────────────────────────────────────────
# State container
# ──────────────────────────────────────────────────────────────────────────────
class AnalyzerState:
def __init__(self) -> None:
self.sessions: list[Session] = []
self.diffs: list[Optional[list[FrameDiff]]] = [] # diffs[i] = diff of session i vs i-1
self.s3_path: Optional[Path] = None
self.bw_path: Optional[Path] = None
self.last_capture_id: Optional[int] = None
# ──────────────────────────────────────────────────────────────────────────────
# Main GUI
# ──────────────────────────────────────────────────────────────────────────────
class AnalyzerGUI(tk.Tk):
def __init__(self) -> None:
super().__init__()
self.title("S3 Protocol Analyzer")
self.configure(bg=BG)
self.minsize(1050, 600)
self.state = AnalyzerState()
self._live_thread: Optional[threading.Thread] = None
self._live_stop = threading.Event()
self._live_q: queue.Queue[str] = queue.Queue()
self._db = FrameDB()
self._build_widgets()
self._poll_live_queue()
# ── widget construction ────────────────────────────────────────────────
def _build_widgets(self) -> None:
self._build_toolbar()
self._build_panes()
self._build_statusbar()
def _build_toolbar(self) -> None:
bar = tk.Frame(self, bg=BG2, pady=4)
bar.pack(side=tk.TOP, fill=tk.X)
pad = {"padx": 5, "pady": 2}
# S3 file
tk.Label(bar, text="S3 raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad)
self.s3_var = tk.StringVar()
tk.Entry(bar, textvariable=self.s3_var, width=28, bg=BG3, fg=FG,
insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad)
tk.Button(bar, text="Browse", bg=BG3, fg=FG, relief="flat",
activebackground=ACCENT, cursor="hand2",
command=lambda: self._browse_file(self.s3_var, "raw_s3.bin")
).pack(side=tk.LEFT, **pad)
tk.Label(bar, text=" BW raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad)
self.bw_var = tk.StringVar()
tk.Entry(bar, textvariable=self.bw_var, width=28, bg=BG3, fg=FG,
insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad)
tk.Button(bar, text="Browse", bg=BG3, fg=FG, relief="flat",
activebackground=ACCENT, cursor="hand2",
command=lambda: self._browse_file(self.bw_var, "raw_bw.bin")
).pack(side=tk.LEFT, **pad)
# Buttons
tk.Frame(bar, bg=BG2, width=10).pack(side=tk.LEFT)
self.analyze_btn = tk.Button(bar, text="Analyze", bg=ACCENT, fg="#ffffff",
relief="flat", padx=10, cursor="hand2",
font=("Consolas", 9, "bold"),
command=self._run_analyze)
self.analyze_btn.pack(side=tk.LEFT, **pad)
self.live_btn = tk.Button(bar, text="Live: OFF", bg=BG3, fg=FG,
relief="flat", padx=10, cursor="hand2",
font=MONO, command=self._toggle_live)
self.live_btn.pack(side=tk.LEFT, **pad)
self.export_btn = tk.Button(bar, text="Export for Claude", bg=ORANGE, fg="#000000",
relief="flat", padx=10, cursor="hand2",
font=("Consolas", 9, "bold"),
command=self._run_export, state="disabled")
self.export_btn.pack(side=tk.LEFT, **pad)
self.status_var = tk.StringVar(value="Idle")
tk.Label(bar, textvariable=self.status_var, bg=BG2, fg=FG_DIM,
font=MONO, anchor="w").pack(side=tk.LEFT, padx=10)
def _build_panes(self) -> None:
pane = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG,
sashwidth=4, sashrelief="flat")
pane.pack(fill=tk.BOTH, expand=True, padx=0, pady=0)
# ── Left: session/frame tree ──────────────────────────────────────
left = tk.Frame(pane, bg=BG2, width=260)
pane.add(left, minsize=200)
tk.Label(left, text="Sessions", bg=BG2, fg=ACCENT,
font=("Consolas", 9, "bold"), anchor="w", padx=6).pack(fill=tk.X)
tree_frame = tk.Frame(left, bg=BG2)
tree_frame.pack(fill=tk.BOTH, expand=True)
style = ttk.Style()
style.theme_use("clam")
style.configure("Treeview",
background=BG2, foreground=FG, fieldbackground=BG2,
font=MONO_SM, rowheight=18, borderwidth=0)
style.configure("Treeview.Heading",
background=BG3, foreground=ACCENT, font=MONO_SM)
style.map("Treeview", background=[("selected", BG3)],
foreground=[("selected", "#ffffff")])
self.tree = ttk.Treeview(tree_frame, columns=("info",), show="tree headings",
selectmode="browse")
self.tree.heading("#0", text="Frame")
self.tree.heading("info", text="Info")
self.tree.column("#0", width=160, stretch=True)
self.tree.column("info", width=80, stretch=False)
vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview)
self.tree.configure(yscrollcommand=vsb.set)
vsb.pack(side=tk.RIGHT, fill=tk.Y)
self.tree.pack(fill=tk.BOTH, expand=True)
self.tree.tag_configure("session", foreground=ACCENT, font=("Consolas", 9, "bold"))
self.tree.tag_configure("bw_frame", foreground=COL_BW)
self.tree.tag_configure("s3_frame", foreground=COL_S3)
self.tree.tag_configure("bad_chk", foreground=RED)
self.tree.tag_configure("malformed", foreground=RED)
self.tree.bind("<<TreeviewSelect>>", self._on_tree_select)
# ── Right: detail notebook ────────────────────────────────────────
right = tk.Frame(pane, bg=BG)
pane.add(right, minsize=600)
style.configure("TNotebook", background=BG2, borderwidth=0)
style.configure("TNotebook.Tab", background=BG3, foreground=FG,
font=MONO, padding=[8, 2])
style.map("TNotebook.Tab", background=[("selected", BG)],
foreground=[("selected", ACCENT)])
self.nb = ttk.Notebook(right)
self.nb.pack(fill=tk.BOTH, expand=True)
# Tab: Inventory
self.inv_text = self._make_text_tab("Inventory")
# Tab: Hex Dump
self.hex_text = self._make_text_tab("Hex Dump")
# Tab: Diff
self.diff_text = self._make_text_tab("Diff")
# Tab: Full Report (raw text)
self.report_text = self._make_text_tab("Full Report")
# Tab: Query (DB)
self._build_query_tab()
# Tag colours for rich text in all tabs
for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
w.tag_configure("head", foreground=COL_HEAD, font=("Consolas", 9, "bold"))
w.tag_configure("bw", foreground=COL_BW)
w.tag_configure("s3", foreground=COL_S3)
w.tag_configure("changed", foreground=COL_DIFF)
w.tag_configure("known", foreground=COL_KNOW)
w.tag_configure("dim", foreground=FG_DIM)
w.tag_configure("normal", foreground=FG)
w.tag_configure("warn", foreground=YELLOW)
w.tag_configure("addr", foreground=ORANGE)
def _make_text_tab(self, title: str) -> tk.Text:
frame = tk.Frame(self.nb, bg=BG)
self.nb.add(frame, text=title)
w = tk.Text(frame, bg=BG, fg=FG, font=MONO, state="disabled",
relief="flat", wrap="none", insertbackground=FG,
selectbackground=BG3, selectforeground="#ffffff")
vsb = ttk.Scrollbar(frame, orient="vertical", command=w.yview)
hsb = ttk.Scrollbar(frame, orient="horizontal", command=w.xview)
w.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
vsb.pack(side=tk.RIGHT, fill=tk.Y)
hsb.pack(side=tk.BOTTOM, fill=tk.X)
w.pack(fill=tk.BOTH, expand=True)
return w
def _build_query_tab(self) -> None:
"""Build the Query tab: filter controls + results table + interpretation panel."""
frame = tk.Frame(self.nb, bg=BG)
self.nb.add(frame, text="Query DB")
# ── Filter row ────────────────────────────────────────────────────
filt = tk.Frame(frame, bg=BG2, pady=4)
filt.pack(side=tk.TOP, fill=tk.X)
pad = {"padx": 4, "pady": 2}
# Capture filter
tk.Label(filt, text="Capture:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=0, sticky="e", **pad)
self._q_capture_var = tk.StringVar(value="All")
self._q_capture_cb = ttk.Combobox(filt, textvariable=self._q_capture_var,
width=18, font=MONO_SM, state="readonly")
self._q_capture_cb.grid(row=0, column=1, sticky="w", **pad)
# Direction filter
tk.Label(filt, text="Dir:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=2, sticky="e", **pad)
self._q_dir_var = tk.StringVar(value="All")
self._q_dir_cb = ttk.Combobox(filt, textvariable=self._q_dir_var,
values=["All", "BW", "S3"],
width=6, font=MONO_SM, state="readonly")
self._q_dir_cb.grid(row=0, column=3, sticky="w", **pad)
# SUB filter
tk.Label(filt, text="SUB:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=4, sticky="e", **pad)
self._q_sub_var = tk.StringVar(value="All")
self._q_sub_cb = ttk.Combobox(filt, textvariable=self._q_sub_var,
width=12, font=MONO_SM, state="readonly")
self._q_sub_cb.grid(row=0, column=5, sticky="w", **pad)
# Byte offset filter
tk.Label(filt, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=6, sticky="e", **pad)
self._q_offset_var = tk.StringVar(value="")
tk.Entry(filt, textvariable=self._q_offset_var, width=8, bg=BG3, fg=FG,
font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=7, sticky="w", **pad)
# Value filter
tk.Label(filt, text="Value:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=8, sticky="e", **pad)
self._q_value_var = tk.StringVar(value="")
tk.Entry(filt, textvariable=self._q_value_var, width=8, bg=BG3, fg=FG,
font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=9, sticky="w", **pad)
# Run / Refresh buttons
tk.Button(filt, text="Run Query", bg=ACCENT, fg="#ffffff", relief="flat",
padx=8, cursor="hand2", font=("Consolas", 8, "bold"),
command=self._run_db_query).grid(row=0, column=10, padx=8)
tk.Button(filt, text="Refresh dropdowns", bg=BG3, fg=FG, relief="flat",
padx=6, cursor="hand2", font=MONO_SM,
command=self._refresh_query_dropdowns).grid(row=0, column=11, padx=4)
# DB stats label
self._q_stats_var = tk.StringVar(value="DB: —")
tk.Label(filt, textvariable=self._q_stats_var, bg=BG2, fg=FG_DIM,
font=MONO_SM).grid(row=0, column=12, padx=12, sticky="w")
# ── Results table ─────────────────────────────────────────────────
res_frame = tk.Frame(frame, bg=BG)
res_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
# Results treeview
cols = ("cap", "sess", "dir", "sub", "sub_name", "page", "len", "chk")
self._q_tree = ttk.Treeview(res_frame, columns=cols,
show="headings", selectmode="browse")
col_cfg = [
("cap", "Cap", 40),
("sess", "Sess", 40),
("dir", "Dir", 40),
("sub", "SUB", 50),
("sub_name", "Name", 160),
("page", "Page", 60),
("len", "Len", 50),
("chk", "Chk", 50),
]
for cid, heading, width in col_cfg:
self._q_tree.heading(cid, text=heading, anchor="w")
self._q_tree.column(cid, width=width, stretch=(cid == "sub_name"))
q_vsb = ttk.Scrollbar(res_frame, orient="vertical", command=self._q_tree.yview)
q_hsb = ttk.Scrollbar(res_frame, orient="horizontal", command=self._q_tree.xview)
self._q_tree.configure(yscrollcommand=q_vsb.set, xscrollcommand=q_hsb.set)
q_vsb.pack(side=tk.RIGHT, fill=tk.Y)
q_hsb.pack(side=tk.BOTTOM, fill=tk.X)
self._q_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self._q_tree.tag_configure("bw_row", foreground=COL_BW)
self._q_tree.tag_configure("s3_row", foreground=COL_S3)
self._q_tree.tag_configure("bad_row", foreground=RED)
# ── Interpretation panel (below results) ──────────────────────────
interp_frame = tk.Frame(frame, bg=BG2, height=120)
interp_frame.pack(side=tk.BOTTOM, fill=tk.X)
interp_frame.pack_propagate(False)
tk.Label(interp_frame, text="Byte interpretation (click a row, enter offset):",
bg=BG2, fg=ACCENT, font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X)
interp_inner = tk.Frame(interp_frame, bg=BG2)
interp_inner.pack(fill=tk.X, padx=6, pady=2)
tk.Label(interp_inner, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT)
self._interp_offset_var = tk.StringVar(value="5")
tk.Entry(interp_inner, textvariable=self._interp_offset_var,
width=6, bg=BG3, fg=FG, font=MONO_SM,
insertbackground=FG, relief="flat").pack(side=tk.LEFT, padx=4)
tk.Button(interp_inner, text="Interpret", bg=BG3, fg=FG, relief="flat",
cursor="hand2", font=MONO_SM,
command=self._run_interpret).pack(side=tk.LEFT, padx=4)
self._interp_text = tk.Text(interp_frame, bg=BG2, fg=FG, font=MONO_SM,
height=4, relief="flat", state="disabled",
insertbackground=FG)
self._interp_text.pack(fill=tk.X, padx=6, pady=2)
self._interp_text.tag_configure("label", foreground=FG_DIM)
self._interp_text.tag_configure("value", foreground=YELLOW)
# Store frame rows by tree iid -> db row
self._q_rows: dict[str, object] = {}
self._q_capture_rows: list = [None]
self._q_sub_values: list = [None]
self._q_tree.bind("<<TreeviewSelect>>", self._on_q_select)
# Init dropdowns
self._refresh_query_dropdowns()
def _refresh_query_dropdowns(self) -> None:
"""Reload capture and SUB dropdowns from the DB."""
try:
captures = self._db.list_captures()
cap_labels = ["All"] + [
f"#{r['id']} {r['timestamp'][:16]} ({r['frame_count']} frames)"
for r in captures
]
self._q_capture_cb["values"] = cap_labels
self._q_capture_rows = [None] + [r["id"] for r in captures]
subs = self._db.get_distinct_subs()
sub_labels = ["All"] + [f"0x{s:02X}" for s in subs]
self._q_sub_cb["values"] = sub_labels
self._q_sub_values = [None] + subs
stats = self._db.get_stats()
self._q_stats_var.set(
f"DB: {stats['captures']} captures | {stats['frames']} frames"
)
except Exception as exc:
self._q_stats_var.set(f"DB error: {exc}")
def _parse_hex_or_int(self, s: str) -> Optional[int]:
"""Parse '0x1F', '31', or '' into int or None."""
s = s.strip()
if not s:
return None
try:
return int(s, 0)
except ValueError:
return None
def _run_db_query(self) -> None:
"""Execute query with current filter values and populate results tree."""
# Resolve capture_id
cap_idx = self._q_capture_cb.current()
cap_id = self._q_capture_rows[cap_idx] if cap_idx > 0 else None
# Direction
dir_val = self._q_dir_var.get()
direction = dir_val if dir_val != "All" else None
# SUB
sub_idx = self._q_sub_cb.current()
sub = self._q_sub_values[sub_idx] if sub_idx > 0 else None
# Offset / value
offset = self._parse_hex_or_int(self._q_offset_var.get())
value = self._parse_hex_or_int(self._q_value_var.get())
try:
if offset is not None:
rows = self._db.query_by_byte(
offset=offset, value=value,
capture_id=cap_id, direction=direction, sub=sub
)
else:
rows = self._db.query_frames(
capture_id=cap_id, direction=direction, sub=sub
)
except Exception as exc:
messagebox.showerror("Query error", str(exc))
return
# Populate tree
self._q_tree.delete(*self._q_tree.get_children())
self._q_rows.clear()
for row in rows:
sub_hex = f"0x{row['sub']:02X}" if row["sub"] is not None else ""
page_hex = f"0x{row['page_key']:04X}" if row["page_key"] is not None else ""
chk_str = {1: "OK", 0: "BAD", None: ""}.get(row["checksum_ok"], "")
tag = "bw_row" if row["direction"] == "BW" else "s3_row"
if row["checksum_ok"] == 0:
tag = "bad_row"
iid = str(row["id"])
self._q_tree.insert("", tk.END, iid=iid, tags=(tag,), values=(
row["capture_id"],
row["session_idx"],
row["direction"],
sub_hex,
row["sub_name"] or "",
page_hex,
row["payload_len"],
chk_str,
))
self._q_rows[iid] = row
self.sb_var.set(f"Query returned {len(rows)} rows")
def _on_q_select(self, _event: tk.Event) -> None:
"""When a DB result row is selected, auto-run interpret at current offset."""
self._run_interpret()
def _run_interpret(self) -> None:
"""Show multi-format byte interpretation for the selected row + offset."""
sel = self._q_tree.selection()
if not sel:
return
iid = sel[0]
row = self._q_rows.get(iid)
if row is None:
return
offset = self._parse_hex_or_int(self._interp_offset_var.get())
if offset is None:
return
payload = bytes(row["payload"])
interp = self._db.interpret_offset(payload, offset)
w = self._interp_text
w.configure(state="normal")
w.delete("1.0", tk.END)
sub_hex = f"0x{row['sub']:02X}" if row["sub"] is not None else "??"
w.insert(tk.END, f"Frame #{row['id']} [{row['direction']}] SUB={sub_hex} "
f"offset={offset} (0x{offset:04X})\n", "label")
label_order = [
("uint8", "uint8 "),
("int8", "int8 "),
("uint16_be", "uint16 BE "),
("uint16_le", "uint16 LE "),
("uint32_be", "uint32 BE "),
("uint32_le", "uint32 LE "),
("float32_be", "float32 BE "),
("float32_le", "float32 LE "),
]
line = ""
for key, label in label_order:
if key in interp:
val = interp[key]
if isinstance(val, float):
val_str = f"{val:.6g}"
else:
val_str = str(val)
if key.startswith("uint") or key.startswith("int"):
val_str += f" (0x{int(val) & 0xFFFFFFFF:X})"
chunk = f"{label}: {val_str}"
line += f" {chunk:<30}"
if len(line) > 80:
w.insert(tk.END, line + "\n", "value")
line = ""
if line:
w.insert(tk.END, line + "\n", "value")
w.configure(state="disabled")
def _build_statusbar(self) -> None:
bar = tk.Frame(self, bg=BG3, height=20)
bar.pack(side=tk.BOTTOM, fill=tk.X)
self.sb_var = tk.StringVar(value="Ready")
tk.Label(bar, textvariable=self.sb_var, bg=BG3, fg=FG_DIM,
font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X)
# ── file picking ───────────────────────────────────────────────────────
def _browse_file(self, var: tk.StringVar, default_name: str) -> None:
path = filedialog.askopenfilename(
title=f"Select {default_name}",
filetypes=[("Binary files", "*.bin"), ("All files", "*.*")],
initialfile=default_name,
)
if path:
var.set(path)
# ── analysis ──────────────────────────────────────────────────────────
def _run_analyze(self) -> None:
s3_path = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None
bw_path = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None
if not s3_path or not bw_path:
messagebox.showerror("Missing files", "Please select both S3 and BW raw files.")
return
if not s3_path.exists():
messagebox.showerror("File not found", f"S3 file not found:\n{s3_path}")
return
if not bw_path.exists():
messagebox.showerror("File not found", f"BW file not found:\n{bw_path}")
return
self.state.s3_path = s3_path
self.state.bw_path = bw_path
self._do_analyze(s3_path, bw_path)
def _run_export(self) -> None:
if not self.state.sessions:
messagebox.showinfo("Export", "Run Analyze first.")
return
outdir = self.state.s3_path.parent if self.state.s3_path else Path(".")
out_path = write_claude_export(
self.state.sessions,
self.state.diffs,
outdir,
self.state.s3_path,
self.state.bw_path,
)
self.sb_var.set(f"Exported: {out_path.name}")
if messagebox.askyesno(
"Export complete",
f"Saved to:\n{out_path}\n\nOpen the folder?",
):
import subprocess
subprocess.Popen(["explorer", str(out_path.parent)])
def _do_analyze(self, s3_path: Path, bw_path: Path) -> None:
self.status_var.set("Parsing...")
self.update_idletasks()
s3_blob = s3_path.read_bytes()
bw_blob = bw_path.read_bytes()
s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3")
bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0, validate_checksum=True), "BW")
sessions = split_into_sessions(bw_frames, s3_frames)
diffs: list[Optional[list[FrameDiff]]] = [None]
for i in range(1, len(sessions)):
diffs.append(diff_sessions(sessions[i - 1], sessions[i]))
self.state.sessions = sessions
self.state.diffs = diffs
n_s3 = sum(len(s.s3_frames) for s in sessions)
n_bw = sum(len(s.bw_frames) for s in sessions)
self.status_var.set(
f"{len(sessions)} sessions | BW: {n_bw} frames S3: {n_s3} frames"
)
self.sb_var.set(f"Loaded: {s3_path.name} + {bw_path.name}")
self.export_btn.configure(state="normal")
self._rebuild_tree()
# Auto-ingest into DB (deduped by SHA256 — fast no-op on re-analyze)
try:
cap_id = self._db.ingest(sessions, s3_path, bw_path)
if cap_id is not None:
self.state.last_capture_id = cap_id
self._refresh_query_dropdowns()
# Pre-select this capture in the Query tab
cap_labels = list(self._q_capture_cb["values"])
# Find label that starts with #<cap_id>
for i, lbl in enumerate(cap_labels):
if lbl.startswith(f"#{cap_id} "):
self._q_capture_cb.current(i)
break
# else: already ingested — no change to dropdown selection
except Exception as exc:
self.sb_var.set(f"DB ingest error: {exc}")
# ── tree building ──────────────────────────────────────────────────────
def _rebuild_tree(self) -> None:
self.tree.delete(*self.tree.get_children())
for sess in self.state.sessions:
is_complete = any(
af.header is not None and af.header.sub == 0x74
for af in sess.bw_frames
)
label = f"Session {sess.index}"
if not is_complete:
label += " [partial]"
n_diff = len(self.state.diffs[sess.index] or [])
diff_info = f"{n_diff} changes" if n_diff > 0 else ""
sess_id = self.tree.insert("", tk.END, text=label,
values=(diff_info,), tags=("session",))
for af in sess.all_frames:
src_tag = "bw_frame" if af.source == "BW" else "s3_frame"
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
label_text = f"[{af.source}] {sub_hex} {af.sub_name}"
extra = ""
tags = (src_tag,)
if af.frame.checksum_valid is False:
extra = "BAD CHK"
tags = ("bad_chk",)
elif af.header is None:
tags = ("malformed",)
label_text = f"[{af.source}] MALFORMED"
self.tree.insert(sess_id, tk.END, text=label_text,
values=(extra,), tags=tags,
iid=f"frame_{sess.index}_{af.frame.index}_{af.source}")
# Expand all sessions
for item in self.tree.get_children():
self.tree.item(item, open=True)
# ── tree selection → detail panel ─────────────────────────────────────
def _on_tree_select(self, _event: tk.Event) -> None:
sel = self.tree.selection()
if not sel:
return
iid = sel[0]
# Determine if it's a session node or a frame node
if iid.startswith("frame_"):
# frame_<sessidx>_<frameidx>_<source>
parts = iid.split("_")
sess_idx = int(parts[1])
frame_idx = int(parts[2])
source = parts[3]
self._show_frame_detail(sess_idx, frame_idx, source)
else:
# Session node — show session summary
# Find session index from text
text = self.tree.item(iid, "text")
try:
idx = int(text.split()[1])
self._show_session_detail(idx)
except (IndexError, ValueError):
pass
def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]:
if sess_idx >= len(self.state.sessions):
return None
sess = self.state.sessions[sess_idx]
pool = sess.bw_frames if source == "BW" else sess.s3_frames
for af in pool:
if af.frame.index == frame_idx:
return af
return None
# ── detail renderers ──────────────────────────────────────────────────
def _clear_all_tabs(self) -> None:
for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
self._text_clear(w)
def _show_session_detail(self, sess_idx: int) -> None:
if sess_idx >= len(self.state.sessions):
return
sess = self.state.sessions[sess_idx]
diffs = self.state.diffs[sess_idx]
self._clear_all_tabs()
# ── Inventory tab ────────────────────────────────────────────────
w = self.inv_text
self._text_clear(w)
self._tw(w, f"SESSION {sess.index}", "head"); self._tn(w)
n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames)
self._tw(w, f"Frames: {n_bw + n_s3} (BW: {n_bw}, S3: {n_s3})\n", "normal")
if n_bw != n_s3:
self._tw(w, " WARNING: BW/S3 count mismatch\n", "warn")
self._tn(w)
for seq_i, af in enumerate(sess.all_frames):
src_tag = "bw" if af.source == "BW" else "s3"
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
page_str = f" (page {af.header.page_key:04X})" if af.header and af.header.page_key != 0 else ""
chk = ""
if af.frame.checksum_valid is False:
chk = " [BAD CHECKSUM]"
elif af.frame.checksum_valid is True:
chk = f" [{af.frame.checksum_type}]"
self._tw(w, f" [{af.source}] #{seq_i:<3} ", src_tag)
self._tw(w, f"SUB={sub_hex} ", "addr")
self._tw(w, f"{af.sub_name:<30}", src_tag)
self._tw(w, f"{page_str} len={len(af.frame.payload)}", "dim")
if chk:
self._tw(w, chk, "warn" if af.frame.checksum_valid is False else "dim")
self._tn(w)
# ── Diff tab ─────────────────────────────────────────────────────
w = self.diff_text
self._text_clear(w)
if diffs is None:
self._tw(w, "(No previous session to diff against)\n", "dim")
elif not diffs:
self._tw(w, f"DIFF vs SESSION {sess_idx - 1}\n", "head"); self._tn(w)
self._tw(w, " No changes detected.\n", "dim")
else:
self._tw(w, f"DIFF vs SESSION {sess_idx - 1}\n", "head"); self._tn(w)
for fd in diffs:
page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else ""
self._tw(w, f"\n SUB {fd.sub:02X} ({fd.sub_name}){page_str}:\n", "addr")
for bd in fd.diffs:
before_s = f"{bd.before:02x}" if bd.before >= 0 else "--"
after_s = f"{bd.after:02x}" if bd.after >= 0 else "--"
self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
self._tw(w, f"{before_s} -> {after_s}", "changed")
if bd.field_name:
self._tw(w, f" [{bd.field_name}]", "known")
self._tn(w)
# ── Full Report tab ───────────────────────────────────────────────
report_text = render_session_report(sess, diffs, sess_idx - 1 if sess_idx > 0 else None)
w = self.report_text
self._text_clear(w)
self._tw(w, report_text, "normal")
# Switch to Inventory tab
self.nb.select(0)
def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None:
af = self._find_frame(sess_idx, frame_idx, source)
if af is None:
return
self._clear_all_tabs()
src_tag = "bw" if source == "BW" else "s3"
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
# ── Inventory tab — single frame summary ─────────────────────────
w = self.inv_text
self._tw(w, f"[{af.source}] Frame #{af.frame.index}\n", src_tag)
self._tw(w, f"Session {sess_idx} | ", "dim")
self._tw(w, f"SUB={sub_hex} {af.sub_name}\n", "addr")
if af.header:
self._tw(w, f" OFFSET: {af.header.page_key:04X} ", "dim")
self._tw(w, f"CMD={af.header.cmd:02X} FLAGS={af.header.flags:02X}\n", "dim")
self._tn(w)
self._tw(w, f"Payload bytes: {len(af.frame.payload)}\n", "dim")
if af.frame.checksum_valid is False:
self._tw(w, " BAD CHECKSUM\n", "warn")
elif af.frame.checksum_valid is True:
self._tw(w, f" Checksum: {af.frame.checksum_type} {af.frame.checksum_hex}\n", "dim")
self._tn(w)
# Protocol header breakdown
p = af.frame.payload
if len(p) >= 5:
self._tw(w, "Header breakdown:\n", "head")
self._tw(w, f" [0] CMD = {p[0]:02x}\n", "dim")
self._tw(w, f" [1] ? = {p[1]:02x}\n", "dim")
self._tw(w, f" [2] SUB = {p[2]:02x} ({af.sub_name})\n", src_tag)
self._tw(w, f" [3] OFFSET_HI = {p[3]:02x}\n", "dim")
self._tw(w, f" [4] OFFSET_LO = {p[4]:02x}\n", "dim")
if len(p) > 5:
self._tw(w, f" [5..] data = {len(p) - 5} bytes\n", "dim")
# ── Hex Dump tab ─────────────────────────────────────────────────
w = self.hex_text
self._tw(w, f"[{af.source}] SUB={sub_hex} {af.sub_name}\n", src_tag)
self._tw(w, f"Payload ({len(af.frame.payload)} bytes):\n", "dim")
self._tn(w)
dump_lines = format_hex_dump(af.frame.payload, indent=" ")
self._tw(w, "\n".join(dump_lines) + "\n", "normal")
# Annotate known field offsets within this frame
diffs_for_sess = self.state.diffs[sess_idx] if sess_idx < len(self.state.diffs) else None
if diffs_for_sess and af.header:
page_key = af.header.page_key
matching = [fd for fd in diffs_for_sess
if fd.sub == af.header.sub and fd.page_key == page_key]
if matching:
self._tn(w)
self._tw(w, "Changed bytes in this frame (vs prev session):\n", "head")
for bd in matching[0].diffs:
before_s = f"{bd.before:02x}" if bd.before >= 0 else "--"
after_s = f"{bd.after:02x}" if bd.after >= 0 else "--"
self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
self._tw(w, f"{before_s} -> {after_s}", "changed")
if bd.field_name:
self._tw(w, f" [{bd.field_name}]", "known")
self._tn(w)
# Switch to Hex Dump tab for frame selection
self.nb.select(1)
# ── live mode ─────────────────────────────────────────────────────────
def _toggle_live(self) -> None:
if self._live_thread and self._live_thread.is_alive():
self._live_stop.set()
self.live_btn.configure(text="Live: OFF", bg=BG3, fg=FG)
self.status_var.set("Live stopped")
else:
s3_path = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None
bw_path = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None
if not s3_path or not bw_path:
messagebox.showerror("Missing files", "Select both raw files before starting live mode.")
return
self.state.s3_path = s3_path
self.state.bw_path = bw_path
self._live_stop.clear()
self._live_thread = threading.Thread(
target=self._live_worker, args=(s3_path, bw_path), daemon=True)
self._live_thread.start()
self.live_btn.configure(text="Live: ON", bg=GREEN, fg="#000000")
self.status_var.set("Live mode running...")
def _live_worker(self, s3_path: Path, bw_path: Path) -> None:
s3_buf = bytearray()
bw_buf = bytearray()
s3_pos = bw_pos = 0
while not self._live_stop.is_set():
changed = False
if s3_path.exists():
with s3_path.open("rb") as fh:
fh.seek(s3_pos)
nb = fh.read()
if nb:
s3_buf.extend(nb); s3_pos += len(nb); changed = True
if bw_path.exists():
with bw_path.open("rb") as fh:
fh.seek(bw_pos)
nb = fh.read()
if nb:
bw_buf.extend(nb); bw_pos += len(nb); changed = True
if changed:
self._live_q.put("refresh")
time.sleep(0.1)
def _poll_live_queue(self) -> None:
try:
while True:
msg = self._live_q.get_nowait()
if msg == "refresh" and self.state.s3_path and self.state.bw_path:
self._do_analyze(self.state.s3_path, self.state.bw_path)
except queue.Empty:
pass
finally:
self.after(150, self._poll_live_queue)
# ── text helpers ──────────────────────────────────────────────────────
def _text_clear(self, w: tk.Text) -> None:
w.configure(state="normal")
w.delete("1.0", tk.END)
# leave enabled for further inserts
def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None:
"""Insert text with a colour tag."""
w.configure(state="normal")
w.insert(tk.END, text, tag)
def _tn(self, w: tk.Text) -> None:
"""Insert newline."""
w.configure(state="normal")
w.insert(tk.END, "\n")
w.configure(state="disabled")
# ──────────────────────────────────────────────────────────────────────────────
# Entry point
# ──────────────────────────────────────────────────────────────────────────────
def main() -> None:
app = AnalyzerGUI()
app.mainloop()
if __name__ == "__main__":
main()

948
parsers/s3_analyzer.py Normal file
View File

@@ -0,0 +1,948 @@
#!/usr/bin/env python3
"""
s3_analyzer.py — Live protocol analysis tool for Instantel MiniMate Plus RS-232.
Reads raw_s3.bin and raw_bw.bin (produced by s3_bridge.py), parses DLE frames,
groups into sessions, auto-diffs consecutive sessions, and annotates known fields.
Usage:
python s3_analyzer.py --s3 raw_s3.bin --bw raw_bw.bin [--live] [--outdir DIR]
"""
from __future__ import annotations
import argparse
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
# Allow running from any working directory
sys.path.insert(0, str(Path(__file__).parent))
from s3_parser import Frame, parse_bw, parse_s3 # noqa: E402
__version__ = "0.1.0"
# ──────────────────────────────────────────────────────────────────────────────
# Protocol constants
# ──────────────────────────────────────────────────────────────────────────────
# SUB_TABLE: sub_byte → (name, direction, notes)
# direction: "BW→S3", "S3→BW", or "both"
SUB_TABLE: dict[int, tuple[str, str, str]] = {
# BW→S3 read requests
0x5B: ("POLL", "BW→S3", "Keepalive / device discovery"),
0x01: ("FULL_CONFIG_READ", "BW→S3", "~0x98 bytes; firmware, model, serial, channel config"),
0x06: ("CHANNEL_CONFIG_READ", "BW→S3", "0x24 bytes; channel configuration block"),
0x08: ("EVENT_INDEX_READ", "BW→S3", "0x58 bytes; event count and record pointers"),
0x0A: ("WAVEFORM_HEADER_READ", "BW→S3", "0x30 bytes/page; waveform header keyed by timestamp"),
0x0C: ("FULL_WAVEFORM_READ", "BW→S3", "0xD2 bytes/page × 2; project strings, PPV floats"),
0x1C: ("TRIGGER_CONFIG_READ", "BW→S3", "0x2C bytes; trigger settings block"),
0x09: ("UNKNOWN_READ_A", "BW→S3", "0xCA bytes response (F6); purpose unknown"),
0x1A: ("COMPLIANCE_CONFIG_READ", "BW→S3", "Large block (E5); trigger/alarm floats, unit strings"),
0x2E: ("UNKNOWN_READ_B", "BW→S3", "0x1A bytes response (D1); purpose unknown"),
# BW→S3 write commands
0x68: ("EVENT_INDEX_WRITE", "BW→S3", "Mirrors SUB 08 read; event count and timestamps"),
0x69: ("WAVEFORM_DATA_WRITE", "BW→S3", "0xCA bytes; mirrors SUB 09"),
0x71: ("COMPLIANCE_STRINGS_WRITE", "BW→S3", "Compliance config + all project string fields"),
0x72: ("WRITE_CONFIRM_A", "BW→S3", "Short frame; commit step after 0x71"),
0x73: ("WRITE_CONFIRM_B", "BW→S3", "Short frame"),
0x74: ("WRITE_CONFIRM_C", "BW→S3", "Short frame; final session-close confirm"),
0x82: ("TRIGGER_CONFIG_WRITE", "BW→S3", "0x1C bytes; trigger config block; mirrors SUB 1C"),
0x83: ("TRIGGER_WRITE_CONFIRM", "BW→S3", "Short frame; commit step after 0x82"),
# S3→BW responses
0xA4: ("POLL_RESPONSE", "S3→BW", "Response to SUB 5B poll"),
0xFE: ("FULL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 01"),
0xF9: ("CHANNEL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 06"),
0xF7: ("EVENT_INDEX_RESPONSE", "S3→BW", "Response to SUB 08; contains backlight/power-save"),
0xF5: ("WAVEFORM_HEADER_RESPONSE", "S3→BW", "Response to SUB 0A"),
0xF3: ("FULL_WAVEFORM_RESPONSE", "S3→BW", "Response to SUB 0C; project strings, PPV floats"),
0xE3: ("TRIGGER_CONFIG_RESPONSE", "S3→BW", "Response to SUB 1C; contains timestamps"),
0xF6: ("UNKNOWN_RESPONSE_A", "S3→BW", "Response to SUB 09; 0xCA bytes"),
0xE5: ("COMPLIANCE_CONFIG_RESPONSE","S3→BW", "Response to SUB 1A; record time in page 2"),
0xD1: ("UNKNOWN_RESPONSE_B", "S3→BW", "Response to SUB 2E; 0x1A bytes"),
0xEA: ("SERIAL_NUMBER_RESPONSE", "S3→BW", "0x0A bytes; serial number + firmware minor version"),
# Short ack responses to writes (0xFF - write_sub)
0x8E: ("WRITE_CONFIRM_RESPONSE_71", "S3→BW", "Ack for SUB 71 COMPLIANCE_STRINGS_WRITE"),
0x8D: ("WRITE_CONFIRM_RESPONSE_72", "S3→BW", "Ack for SUB 72 WRITE_CONFIRM_A"),
0x8C: ("WRITE_CONFIRM_RESPONSE_73", "S3→BW", "Ack for SUB 73 WRITE_CONFIRM_B"),
0x8B: ("WRITE_CONFIRM_RESPONSE_74", "S3→BW", "Ack for SUB 74 WRITE_CONFIRM_C"),
0x97: ("WRITE_CONFIRM_RESPONSE_68", "S3→BW", "Ack for SUB 68 EVENT_INDEX_WRITE"),
0x96: ("WRITE_CONFIRM_RESPONSE_69", "S3→BW", "Ack for SUB 69 WAVEFORM_DATA_WRITE"),
0x7D: ("WRITE_CONFIRM_RESPONSE_82", "S3→BW", "Ack for SUB 82 TRIGGER_CONFIG_WRITE"),
0x7C: ("WRITE_CONFIRM_RESPONSE_83", "S3→BW", "Ack for SUB 83 TRIGGER_WRITE_CONFIRM"),
}
# SUBs whose data-section bytes 05 are known timestamps (suppress in diffs)
NOISY_SUBS: set[int] = {0xE3, 0xF7, 0xF5}
# E5 page 2 key: the OFFSET_HI:OFFSET_LO that identifies the data page
# E5 page 1 (length probe) has offset 0x0000; page 2 has offset 0x082A
E5_PAGE2_KEY = 0x082A
# FieldEntry: (sub, page_key_or_none, payload_offset, field_name, type_hint, notes)
# payload_offset = offset from start of Frame.payload (not data section, not wire)
# Exception: for SUB 0x82, offset [22] is from full de-stuffed payload[0] per protocol ref.
@dataclass(frozen=True)
class FieldEntry:
sub: int
page_key: Optional[int] # None = any / all pages
payload_offset: int # offset from frame.payload[0]
name: str
type_hint: str
notes: str
FIELD_MAP: list[FieldEntry] = [
# F7 (EVENT_INDEX_RESPONSE) — data section starts at payload[5]
# Protocol ref: backlight at data+0x4B = payload[5+0x4B] = payload[80]
FieldEntry(0xF7, None, 5 + 0x4B, "backlight_on_time", "uint8", "seconds; 0=off"),
FieldEntry(0xF7, None, 5 + 0x53, "power_save_timeout", "uint8", "minutes; 0=disabled"),
FieldEntry(0xF7, None, 5 + 0x54, "monitoring_lcd_cycle", "uint16 BE","65500=disabled"),
# E5 page 2 (COMPLIANCE_CONFIG_RESPONSE) — record time at data+0x28
FieldEntry(0xE5, E5_PAGE2_KEY, 5 + 0x28, "record_time", "float32 BE", "seconds; 7s=40E00000, 13s=41500000"),
# SUB 0x82 (TRIGGER_CONFIG_WRITE) — BW→S3 write
# Protocol ref offset [22] is from the de-stuffed payload[0], confirmed from raw_bw.bin
FieldEntry(0x82, None, 22, "trigger_sample_width", "uint8", "samples; mode-gated, BW-side write only"),
]
# ──────────────────────────────────────────────────────────────────────────────
# Data structures
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class FrameHeader:
cmd: int
sub: int
offset_hi: int
offset_lo: int
flags: int
@property
def page_key(self) -> int:
return (self.offset_hi << 8) | self.offset_lo
@dataclass
class AnnotatedFrame:
frame: Frame
source: str # "BW" or "S3"
header: Optional[FrameHeader] # None if payload < 7 bytes (malformed/short)
sub_name: str
session_idx: int = -1
@dataclass
class Session:
index: int
bw_frames: list[AnnotatedFrame]
s3_frames: list[AnnotatedFrame]
@property
def all_frames(self) -> list[AnnotatedFrame]:
"""Interleave BW/S3 in synchronous protocol order: BW[0], S3[0], BW[1], S3[1]..."""
result: list[AnnotatedFrame] = []
for i in range(max(len(self.bw_frames), len(self.s3_frames))):
if i < len(self.bw_frames):
result.append(self.bw_frames[i])
if i < len(self.s3_frames):
result.append(self.s3_frames[i])
return result
@dataclass
class ByteDiff:
payload_offset: int
before: int
after: int
field_name: Optional[str]
@dataclass
class FrameDiff:
sub: int
page_key: int
sub_name: str
diffs: list[ByteDiff]
# ──────────────────────────────────────────────────────────────────────────────
# Parsing helpers
# ──────────────────────────────────────────────────────────────────────────────
def extract_header(payload: bytes) -> Optional[FrameHeader]:
"""
Extract protocol header from de-stuffed payload.
After de-stuffing, the actual observed layout is 5 bytes:
[0] CMD -- 0x10 for BW requests, 0x00 for S3 responses
[1] ? -- 0x00 for BW, 0x10 for S3 (DLE/ADDR byte that survives de-stuffing)
[2] SUB -- the actual command/response identifier
[3] OFFSET_HI
[4] OFFSET_LO
Data section begins at payload[5].
Note: The protocol reference describes a 7-byte header with CMD/DLE/ADDR/FLAGS/SUB/...,
but DLE+ADDR (both 0x10 on wire) are de-stuffed into single bytes by parse_bw/parse_s3,
collapsing the observable header to 5 bytes.
"""
if len(payload) < 5:
return None
return FrameHeader(
cmd=payload[0],
sub=payload[2],
offset_hi=payload[3],
offset_lo=payload[4],
flags=payload[1],
)
def annotate_frame(frame: Frame, source: str) -> AnnotatedFrame:
header = extract_header(frame.payload)
if header is not None:
entry = SUB_TABLE.get(header.sub)
sub_name = entry[0] if entry else f"UNKNOWN_{header.sub:02X}"
else:
sub_name = "MALFORMED"
return AnnotatedFrame(frame=frame, source=source, header=header, sub_name=sub_name)
def annotate_frames(frames: list[Frame], source: str) -> list[AnnotatedFrame]:
return [annotate_frame(f, source) for f in frames]
def load_and_annotate(s3_path: Path, bw_path: Path) -> tuple[list[AnnotatedFrame], list[AnnotatedFrame]]:
"""Parse both raw files and return annotated frame lists."""
s3_blob = s3_path.read_bytes() if s3_path.exists() else b""
bw_blob = bw_path.read_bytes() if bw_path.exists() else b""
s3_frames = parse_s3(s3_blob, trailer_len=0)
bw_frames = parse_bw(bw_blob, trailer_len=0, validate_checksum=True)
return annotate_frames(s3_frames, "S3"), annotate_frames(bw_frames, "BW")
# ──────────────────────────────────────────────────────────────────────────────
# Session detection
# ──────────────────────────────────────────────────────────────────────────────
# BW SUB that marks the end of a compliance write session
SESSION_CLOSE_SUB = 0x74
def split_into_sessions(
bw_annotated: list[AnnotatedFrame],
s3_annotated: list[AnnotatedFrame],
) -> list[Session]:
"""
Split frames into sessions. A session ends on BW SUB 0x74 (WRITE_CONFIRM_C).
New session starts at the stream beginning and after each 0x74.
The protocol is synchronous: BW[i] request → S3[i] response. S3 frame i
belongs to the same session as BW frame i.
"""
if not bw_annotated and not s3_annotated:
return []
sessions: list[Session] = []
session_idx = 0
bw_start = 0
# Track where we are in S3 frames — they mirror BW frame count per session
s3_cursor = 0
i = 0
while i < len(bw_annotated):
frame = bw_annotated[i]
i += 1
is_close = (
frame.header is not None and frame.header.sub == SESSION_CLOSE_SUB
)
if is_close:
bw_slice = bw_annotated[bw_start:i]
# S3 frames in this session match BW frame count (synchronous protocol)
n_s3 = len(bw_slice)
s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3]
s3_cursor += n_s3
sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice)
for f in sess.all_frames:
f.session_idx = session_idx
sessions.append(sess)
session_idx += 1
bw_start = i
# Remaining frames (in-progress / no closing 0x74 yet)
if bw_start < len(bw_annotated) or s3_cursor < len(s3_annotated):
bw_slice = bw_annotated[bw_start:]
n_s3 = len(bw_slice)
s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3]
# also grab any extra S3 frames beyond expected pairing
if s3_cursor + n_s3 < len(s3_annotated):
s3_slice = s3_annotated[s3_cursor:]
if bw_slice or s3_slice:
sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice)
for f in sess.all_frames:
f.session_idx = session_idx
sessions.append(sess)
return sessions
# ──────────────────────────────────────────────────────────────────────────────
# Diff engine
# ──────────────────────────────────────────────────────────────────────────────
def _mask_noisy(sub: int, data: bytes) -> bytearray:
"""
Zero out known-noisy byte ranges before diffing.
For NOISY_SUBS: mask bytes 05 of the data section (timestamps).
"""
buf = bytearray(data)
if sub in NOISY_SUBS and len(buf) >= 6:
for k in range(6):
buf[k] = 0x00
return buf
HEADER_LEN = 5 # Observed de-stuffed header size: CMD + ? + SUB + OFFSET_HI + OFFSET_LO
def _get_data_section(af: AnnotatedFrame) -> bytes:
"""
Return the data section of the frame (after the 5-byte protocol header).
For S3 frames, payload still contains a trailing SUM8 byte — exclude it.
For BW frames, parse_bw with validate_checksum=True already stripped it.
"""
payload = af.frame.payload
if len(payload) < HEADER_LEN:
return b""
data = payload[HEADER_LEN:]
if af.source == "S3" and len(data) >= 1:
# SUM8 is still present at end of S3 frame payload
data = data[:-1]
return data
def lookup_field_name(sub: int, page_key: int, payload_offset: int) -> Optional[str]:
"""Return field name if the given payload offset matches a known field, else None."""
for entry in FIELD_MAP:
if entry.sub != sub:
continue
if entry.page_key is not None and entry.page_key != page_key:
continue
if entry.payload_offset == payload_offset:
return entry.name
return None
def diff_sessions(sess_a: Session, sess_b: Session) -> list[FrameDiff]:
"""
Compare two sessions frame-by-frame, matched by (sub, page_key).
Returns a list of FrameDiff for SUBs where bytes changed.
"""
# Build lookup: (sub, page_key) → AnnotatedFrame for each session
def index_session(sess: Session) -> dict[tuple[int, int], AnnotatedFrame]:
idx: dict[tuple[int, int], AnnotatedFrame] = {}
for af in sess.all_frames:
if af.header is None:
continue
key = (af.header.sub, af.header.page_key)
# Keep first occurrence per key (or we could keep all — for now, first)
if key not in idx:
idx[key] = af
return idx
idx_a = index_session(sess_a)
idx_b = index_session(sess_b)
results: list[FrameDiff] = []
# Only compare SUBs present in both sessions
common_keys = set(idx_a.keys()) & set(idx_b.keys())
for key in sorted(common_keys):
sub, page_key = key
af_a = idx_a[key]
af_b = idx_b[key]
data_a = _mask_noisy(sub, _get_data_section(af_a))
data_b = _mask_noisy(sub, _get_data_section(af_b))
if data_a == data_b:
continue
# Compare byte by byte up to the shorter length
diffs: list[ByteDiff] = []
max_len = max(len(data_a), len(data_b))
for offset in range(max_len):
byte_a = data_a[offset] if offset < len(data_a) else None
byte_b = data_b[offset] if offset < len(data_b) else None
if byte_a != byte_b:
# payload_offset = data_section_offset + HEADER_LEN
payload_off = offset + HEADER_LEN
field = lookup_field_name(sub, page_key, payload_off)
diffs.append(ByteDiff(
payload_offset=payload_off,
before=byte_a if byte_a is not None else -1,
after=byte_b if byte_b is not None else -1,
field_name=field,
))
if diffs:
entry = SUB_TABLE.get(sub)
sub_name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
results.append(FrameDiff(sub=sub, page_key=page_key, sub_name=sub_name, diffs=diffs))
return results
# ──────────────────────────────────────────────────────────────────────────────
# Report rendering
# ──────────────────────────────────────────────────────────────────────────────
def format_hex_dump(data: bytes, indent: str = " ") -> list[str]:
"""Compact 16-bytes-per-line hex dump. Returns list of lines."""
lines = []
for row_start in range(0, len(data), 16):
chunk = data[row_start:row_start + 16]
hex_part = " ".join(f"{b:02x}" for b in chunk)
lines.append(f"{indent}{row_start:04x}: {hex_part}")
return lines
def render_session_report(
session: Session,
diffs: Optional[list[FrameDiff]],
prev_session_index: Optional[int],
) -> str:
lines: list[str] = []
n_bw = len(session.bw_frames)
n_s3 = len(session.s3_frames)
total = n_bw + n_s3
is_complete = any(
af.header is not None and af.header.sub == SESSION_CLOSE_SUB
for af in session.bw_frames
)
status = "" if is_complete else " [IN PROGRESS]"
lines.append(f"{'='*72}")
lines.append(f"SESSION {session.index}{status}")
lines.append(f"{'='*72}")
lines.append(f"Frames: {total} (BW: {n_bw}, S3: {n_s3})")
if n_bw != n_s3:
lines.append(f" WARNING: BW/S3 frame count mismatch — protocol sync issue?")
lines.append("")
# ── Frame inventory ──────────────────────────────────────────────────────
lines.append("FRAME INVENTORY")
for seq_i, af in enumerate(session.all_frames):
if af.header is not None:
sub_hex = f"{af.header.sub:02X}"
page_str = f" (page {af.header.page_key:04X})" if af.header.page_key != 0 else ""
else:
sub_hex = "??"
page_str = ""
chk = ""
if af.frame.checksum_valid is False:
chk = " [BAD CHECKSUM]"
elif af.frame.checksum_valid is True:
chk = f" [{af.frame.checksum_type}]"
lines.append(
f" [{af.source}] #{seq_i:<3} SUB={sub_hex} {af.sub_name:<30}{page_str}"
f" len={len(af.frame.payload)}{chk}"
)
lines.append("")
# ── Hex dumps ────────────────────────────────────────────────────────────
lines.append("HEX DUMPS")
for seq_i, af in enumerate(session.all_frames):
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
lines.append(f" [{af.source}] #{seq_i} SUB={sub_hex} {af.sub_name}")
dump_lines = format_hex_dump(af.frame.payload, indent=" ")
if dump_lines:
lines.extend(dump_lines)
else:
lines.append(" (empty payload)")
lines.append("")
# ── Diff section ─────────────────────────────────────────────────────────
if diffs is not None:
if prev_session_index is not None:
lines.append(f"DIFF vs SESSION {prev_session_index}")
else:
lines.append("DIFF")
if not diffs:
lines.append(" (no changes)")
else:
for fd in diffs:
page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else ""
lines.append(f" SUB {fd.sub:02X} ({fd.sub_name}){page_str}:")
for bd in fd.diffs:
field_str = f" [{bd.field_name}]" if bd.field_name else ""
before_str = f"{bd.before:02x}" if bd.before >= 0 else "--"
after_str = f"{bd.after:02x}" if bd.after >= 0 else "--"
lines.append(
f" offset [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: "
f"{before_str} -> {after_str}{field_str}"
)
lines.append("")
return "\n".join(lines) + "\n"
def write_report(session: Session, report_text: str, outdir: Path) -> Path:
outdir.mkdir(parents=True, exist_ok=True)
out_path = outdir / f"session_{session.index:03d}.report"
out_path.write_text(report_text, encoding="utf-8")
return out_path
# ──────────────────────────────────────────────────────────────────────────────
# Claude export
# ──────────────────────────────────────────────────────────────────────────────
def _hex_block(data: bytes, bytes_per_row: int = 16) -> list[str]:
"""Hex dump with offset + hex + ASCII columns."""
lines = []
for row in range(0, len(data), bytes_per_row):
chunk = data[row:row + bytes_per_row]
hex_col = " ".join(f"{b:02x}" for b in chunk)
hex_col = f"{hex_col:<{bytes_per_row * 3 - 1}}"
asc_col = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
lines.append(f" {row:04x} {hex_col} |{asc_col}|")
return lines
def render_claude_export(
sessions: list[Session],
diffs: list[Optional[list[FrameDiff]]],
s3_path: Optional[Path] = None,
bw_path: Optional[Path] = None,
) -> str:
"""
Produce a single self-contained Markdown file suitable for pasting into
a Claude conversation for protocol reverse-engineering assistance.
Structure:
1. Context block — what this is, protocol background, field map
2. Capture summary — session count, frame counts, what changed
3. Per-diff section — one section per session pair that had changes:
a. Diff table (before/after bytes, known field labels)
b. Full hex dumps of ONLY the frames that changed
4. Full hex dumps of all frames in sessions with no prior comparison
(session 0 baseline)
"""
import datetime
lines: list[str] = []
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
s3_name = s3_path.name if s3_path else "raw_s3.bin"
bw_name = bw_path.name if bw_path else "raw_bw.bin"
# ── 1. Context block ──────────────────────────────────────────────────
lines += [
f"# Instantel MiniMate Plus — Protocol Capture Analysis",
f"Generated: {now} | Source: `{s3_name}` + `{bw_name}`",
"",
"## Protocol Background",
"",
"This file contains parsed RS-232 captures from an Instantel MiniMate Plus",
"seismograph communicating with Blastware PC software at 38400 baud 8N1.",
"",
"**Frame structure (de-stuffed payload):**",
"```",
" [0] CMD 0x10 = BW request, 0x00 = S3 response",
" [1] ? 0x00 (BW) or 0x10 (S3)",
" [2] SUB Command/response identifier (key field)",
" [3] OFFSET_HI Page offset high byte",
" [4] OFFSET_LO Page offset low byte",
" [5+] DATA Payload data section",
"```",
"",
"**Response SUB rule:** response_SUB = 0xFF - request_SUB (confirmed, no exceptions observed)",
"",
"**Known field map** (offsets from payload[0]):",
"```",
" SUB F7 (EVENT_INDEX_RESPONSE):",
" [80] 0x52 backlight_on_time uint8 seconds",
" [88] 0x58 power_save_timeout uint8 minutes",
" [89] 0x59 monitoring_lcd_cycle uint16BE 65500=disabled",
" SUB E5 page 0x082A (COMPLIANCE_CONFIG_RESPONSE):",
" [45] 0x2D record_time float32BE seconds (7s=40E00000, 13s=41500000)",
" SUB 82 (TRIGGER_CONFIG_WRITE, BW-side only):",
" [22] trigger_sample_width uint8 samples",
"```",
"",
"**Session boundary:** a compliance session ends when BW sends SUB 0x74 (WRITE_CONFIRM_C).",
"Sessions are numbered from 0. The diff compares consecutive complete sessions.",
"",
]
# ── 2. Capture summary ────────────────────────────────────────────────
lines += ["## Capture Summary", ""]
lines.append(f"Sessions found: {len(sessions)}")
for sess in sessions:
is_complete = any(
af.header is not None and af.header.sub == SESSION_CLOSE_SUB
for af in sess.bw_frames
)
status = "complete" if is_complete else "partial/in-progress"
n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames)
changed = len(diffs[sess.index] or []) if sess.index < len(diffs) else 0
changed_str = f" ({changed} SUBs changed vs prev)" if sess.index > 0 else " (baseline)"
lines.append(f" Session {sess.index} [{status}]: BW={n_bw} S3={n_s3} frames{changed_str}")
lines.append("")
# ── 3. Per-diff sections ──────────────────────────────────────────────
any_diffs = False
for sess in sessions:
sess_diffs = diffs[sess.index] if sess.index < len(diffs) else None
if sess_diffs is None or sess.index == 0:
continue
any_diffs = True
prev_idx = sess.index - 1
lines += [
f"---",
f"## Diff: Session {prev_idx} -> Session {sess.index}",
"",
]
if not sess_diffs:
lines.append("_No byte changes detected between these sessions._")
lines.append("")
continue
# Build index of changed frames for this session (and prev)
prev_sess = sessions[prev_idx] if prev_idx < len(sessions) else None
for fd in sess_diffs:
page_str = f" page 0x{fd.page_key:04X}" if fd.page_key != 0 else ""
lines += [
f"### SUB {fd.sub:02X}{fd.sub_name}{page_str}",
"",
]
# Diff table
known_count = sum(1 for bd in fd.diffs if bd.field_name)
unknown_count = sum(1 for bd in fd.diffs if not bd.field_name)
lines.append(
f"Changed bytes: **{len(fd.diffs)}** total "
f"({known_count} known fields, {unknown_count} unknown)"
)
lines.append("")
lines.append("| Offset | Hex | Dec | Session {0} | Session {1} | Field |".format(prev_idx, sess.index))
lines.append("|--------|-----|-----|" + "-" * 12 + "|" + "-" * 12 + "|-------|")
for bd in fd.diffs:
before_s = f"`{bd.before:02x}`" if bd.before >= 0 else "`--`"
after_s = f"`{bd.after:02x}`" if bd.after >= 0 else "`--`"
before_d = str(bd.before) if bd.before >= 0 else "--"
after_d = str(bd.after) if bd.after >= 0 else "--"
field = f"`{bd.field_name}`" if bd.field_name else "**UNKNOWN**"
lines.append(
f"| [{bd.payload_offset}] 0x{bd.payload_offset:04X} "
f"| {before_s}->{after_s} | {before_d}->{after_d} "
f"| {before_s} | {after_s} | {field} |"
)
lines.append("")
# Hex dumps of the changed frame in both sessions
def _find_af(target_sess: Session, sub: int, page_key: int) -> Optional[AnnotatedFrame]:
for af in target_sess.all_frames:
if af.header and af.header.sub == sub and af.header.page_key == page_key:
return af
return None
af_prev = _find_af(sessions[prev_idx], fd.sub, fd.page_key) if prev_sess else None
af_curr = _find_af(sess, fd.sub, fd.page_key)
lines.append("**Hex dumps (full de-stuffed payload):**")
lines.append("")
for label, af in [(f"Session {prev_idx} (before)", af_prev),
(f"Session {sess.index} (after)", af_curr)]:
if af is None:
lines.append(f"_{label}: frame not found_")
lines.append("")
continue
lines.append(f"_{label}_ — {len(af.frame.payload)} bytes:")
lines.append("```")
lines += _hex_block(af.frame.payload)
lines.append("```")
lines.append("")
if not any_diffs:
lines += [
"---",
"## Diffs",
"",
"_Only one session found — no diff available. "
"Run a second capture with changed settings to see what moves._",
"",
]
# ── 4. Baseline hex dumps (session 0, all frames) ─────────────────────
if sessions:
baseline = sessions[0]
lines += [
"---",
f"## Baseline — Session 0 (all frames)",
"",
"Full hex dump of every frame in the first session.",
"Use this to map field positions from known values.",
"",
]
for seq_i, af in enumerate(baseline.all_frames):
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
page_str = f" page 0x{af.header.page_key:04X}" if af.header and af.header.page_key != 0 else ""
chk_str = f" [{af.frame.checksum_type}]" if af.frame.checksum_valid else ""
lines.append(
f"### [{af.source}] #{seq_i} SUB {sub_hex}{af.sub_name}{page_str}{chk_str}"
)
lines.append(f"_{len(af.frame.payload)} bytes_")
lines.append("```")
lines += _hex_block(af.frame.payload)
lines.append("```")
lines.append("")
lines += [
"---",
"_End of analysis. To map an unknown field: change exactly one setting in Blastware,_",
"_capture again, run the analyzer, and look for the offset that moved._",
]
return "\n".join(lines) + "\n"
def write_claude_export(
sessions: list[Session],
diffs: list[Optional[list[FrameDiff]]],
outdir: Path,
s3_path: Optional[Path] = None,
bw_path: Optional[Path] = None,
) -> Path:
import datetime
outdir.mkdir(parents=True, exist_ok=True)
stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
out_path = outdir / f"claude_export_{stamp}.md"
out_path.write_text(
render_claude_export(sessions, diffs, s3_path, bw_path),
encoding="utf-8"
)
return out_path
# ──────────────────────────────────────────────────────────────────────────────
# Post-processing mode
# ──────────────────────────────────────────────────────────────────────────────
def run_postprocess(s3_path: Path, bw_path: Path, outdir: Path, export: bool = False) -> None:
print(f"s3_analyzer v{__version__}")
print(f" S3 file : {s3_path}")
print(f" BW file : {bw_path}")
print(f" Out dir : {outdir}")
print()
s3_frames, bw_frames = load_and_annotate(s3_path, bw_path)
print(f"Parsed: {len(s3_frames)} S3 frames, {len(bw_frames)} BW frames")
sessions = split_into_sessions(bw_frames, s3_frames)
print(f"Sessions: {len(sessions)}")
print()
all_diffs: list[Optional[list[FrameDiff]]] = [None]
prev_session: Optional[Session] = None
for sess in sessions:
sess_diffs: Optional[list[FrameDiff]] = None
prev_idx: Optional[int] = None
if prev_session is not None:
sess_diffs = diff_sessions(prev_session, sess)
prev_idx = prev_session.index
all_diffs.append(sess_diffs)
report = render_session_report(sess, sess_diffs, prev_idx)
out_path = write_report(sess, report, outdir)
n_diffs = len(sess_diffs) if sess_diffs else 0
print(f" Session {sess.index}: {len(sess.all_frames)} frames, {n_diffs} changed SUBs -> {out_path.name}")
prev_session = sess
if export:
export_path = write_claude_export(sessions, all_diffs, outdir, s3_path, bw_path)
print(f"\n Claude export -> {export_path.name}")
print()
print(f"Reports written to: {outdir}")
# ──────────────────────────────────────────────────────────────────────────────
# Live mode
# ──────────────────────────────────────────────────────────────────────────────
def live_loop(
s3_path: Path,
bw_path: Path,
outdir: Path,
poll_interval: float = 0.05,
) -> None:
"""
Tail both raw files continuously, re-parsing on new bytes.
Emits a session report as soon as BW SUB 0x74 is detected.
"""
print(f"s3_analyzer v{__version__} — LIVE MODE")
print(f" S3 file : {s3_path}")
print(f" BW file : {bw_path}")
print(f" Out dir : {outdir}")
print(f" Poll : {poll_interval*1000:.0f}ms")
print("Waiting for frames... (Ctrl+C to stop)")
print()
s3_buf = bytearray()
bw_buf = bytearray()
s3_pos = 0
bw_pos = 0
last_s3_count = 0
last_bw_count = 0
sessions: list[Session] = []
prev_complete_session: Optional[Session] = None
try:
while True:
# Read new bytes from both files
changed = False
if s3_path.exists():
with s3_path.open("rb") as fh:
fh.seek(s3_pos)
new_bytes = fh.read()
if new_bytes:
s3_buf.extend(new_bytes)
s3_pos += len(new_bytes)
changed = True
if bw_path.exists():
with bw_path.open("rb") as fh:
fh.seek(bw_pos)
new_bytes = fh.read()
if new_bytes:
bw_buf.extend(new_bytes)
bw_pos += len(new_bytes)
changed = True
if changed:
s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0)
bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True)
s3_annotated = annotate_frames(s3_frames_raw, "S3")
bw_annotated = annotate_frames(bw_frames_raw, "BW")
new_s3 = len(s3_annotated) - last_s3_count
new_bw = len(bw_annotated) - last_bw_count
if new_s3 > 0 or new_bw > 0:
last_s3_count = len(s3_annotated)
last_bw_count = len(bw_annotated)
print(f"[+] S3:{len(s3_annotated)} BW:{len(bw_annotated)} frames", end="")
# Annotate newest BW frame
if bw_annotated:
latest_bw = bw_annotated[-1]
sub_str = f"SUB={latest_bw.header.sub:02X}" if latest_bw.header else "SUB=??"
print(f" latest BW {sub_str} {latest_bw.sub_name}", end="")
print()
# Check for session close
all_sessions = split_into_sessions(bw_annotated, s3_annotated)
# A complete session has the closing 0x74
complete_sessions = [
s for s in all_sessions
if any(
af.header is not None and af.header.sub == SESSION_CLOSE_SUB
for af in s.bw_frames
)
]
# Emit reports for newly completed sessions
for sess in complete_sessions[len(sessions):]:
diffs: Optional[list[FrameDiff]] = None
prev_idx: Optional[int] = None
if prev_complete_session is not None:
diffs = diff_sessions(prev_complete_session, sess)
prev_idx = prev_complete_session.index
report = render_session_report(sess, diffs, prev_idx)
out_path = write_report(sess, report, outdir)
n_diffs = len(diffs) if diffs else 0
print(f"\n [+] Session {sess.index} complete: {len(sess.all_frames)} frames, "
f"{n_diffs} changed SUBs -> {out_path.name}\n")
prev_complete_session = sess
sessions = complete_sessions
time.sleep(poll_interval)
except KeyboardInterrupt:
print("\nStopped.")
# Emit any in-progress (incomplete) session as a partial report
if s3_buf or bw_buf:
s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0)
bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True)
s3_annotated = annotate_frames(s3_frames_raw, "S3")
bw_annotated = annotate_frames(bw_frames_raw, "BW")
all_sessions = split_into_sessions(bw_annotated, s3_annotated)
incomplete = [
s for s in all_sessions
if not any(
af.header is not None and af.header.sub == SESSION_CLOSE_SUB
for af in s.bw_frames
)
]
for sess in incomplete:
report = render_session_report(sess, diffs=None, prev_session_index=None)
out_path = write_report(sess, report, outdir)
print(f" Partial session {sess.index} written -> {out_path.name}")
# ──────────────────────────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(
description="s3_analyzer — Instantel MiniMate Plus live protocol analyzer"
)
ap.add_argument("--s3", type=Path, required=True, help="Path to raw_s3.bin (S3→BW raw capture)")
ap.add_argument("--bw", type=Path, required=True, help="Path to raw_bw.bin (BW→S3 raw capture)")
ap.add_argument("--live", action="store_true", help="Live mode: tail files as they grow")
ap.add_argument("--export", action="store_true", help="Also write a claude_export_<ts>.md file for Claude analysis")
ap.add_argument("--outdir", type=Path, default=None, help="Output directory for .report files (default: same as input)")
ap.add_argument("--poll", type=float, default=0.05, help="Live mode poll interval in seconds (default: 0.05)")
args = ap.parse_args()
outdir = args.outdir
if outdir is None:
outdir = args.s3.parent
if args.live:
live_loop(args.s3, args.bw, outdir, poll_interval=args.poll)
else:
if not args.s3.exists():
print(f"ERROR: S3 file not found: {args.s3}", file=sys.stderr)
sys.exit(1)
if not args.bw.exists():
print(f"ERROR: BW file not found: {args.bw}", file=sys.stderr)
sys.exit(1)
run_postprocess(args.s3, args.bw, outdir, export=args.export)
if __name__ == "__main__":
main()