#!/usr/bin/env python3 """ s3_analyzer.py — Live protocol analysis tool for Instantel MiniMate Plus RS-232. Reads raw_s3.bin and raw_bw.bin (produced by s3_bridge.py), parses DLE frames, groups into sessions, auto-diffs consecutive sessions, and annotates known fields. Usage: python s3_analyzer.py --s3 raw_s3.bin --bw raw_bw.bin [--live] [--outdir DIR] """ from __future__ import annotations import argparse import struct import sys import time from dataclasses import dataclass from pathlib import Path from typing import Optional # Allow running from any working directory sys.path.insert(0, str(Path(__file__).parent)) from s3_parser import Frame, parse_bw, parse_s3 # noqa: E402 __version__ = "0.1.0" # ────────────────────────────────────────────────────────────────────────────── # Protocol constants # ────────────────────────────────────────────────────────────────────────────── # SUB_TABLE: sub_byte → (name, direction, notes) # direction: "BW→S3", "S3→BW", or "both" SUB_TABLE: dict[int, tuple[str, str, str]] = { # BW→S3 read requests 0x5B: ("POLL", "BW→S3", "Keepalive / device discovery"), 0x01: ("FULL_CONFIG_READ", "BW→S3", "~0x98 bytes; firmware, model, serial, channel config"), 0x06: ("CHANNEL_CONFIG_READ", "BW→S3", "0x24 bytes; channel configuration block"), 0x08: ("EVENT_INDEX_READ", "BW→S3", "0x58 bytes; event count and record pointers"), 0x0A: ("WAVEFORM_HEADER_READ", "BW→S3", "0x30 bytes/page; waveform header keyed by timestamp"), 0x0C: ("FULL_WAVEFORM_READ", "BW→S3", "0xD2 bytes/page × 2; project strings, PPV floats"), 0x1C: ("TRIGGER_CONFIG_READ", "BW→S3", "0x2C bytes; trigger settings block"), 0x09: ("UNKNOWN_READ_A", "BW→S3", "0xCA bytes response (F6); purpose unknown"), 0x1A: ("COMPLIANCE_CONFIG_READ", "BW→S3", "Large block (E5); trigger/alarm floats, unit strings"), 0x2E: ("UNKNOWN_READ_B", "BW→S3", "0x1A bytes response (D1); purpose unknown"), # BW→S3 write commands 0x68: ("EVENT_INDEX_WRITE", "BW→S3", "Mirrors SUB 08 read; event count and timestamps"), 0x69: ("WAVEFORM_DATA_WRITE", "BW→S3", "0xCA bytes; mirrors SUB 09"), 0x71: ("COMPLIANCE_STRINGS_WRITE", "BW→S3", "Compliance config + all project string fields"), 0x72: ("WRITE_CONFIRM_A", "BW→S3", "Short frame; commit step after 0x71"), 0x73: ("WRITE_CONFIRM_B", "BW→S3", "Short frame"), 0x74: ("WRITE_CONFIRM_C", "BW→S3", "Short frame; final session-close confirm"), 0x82: ("TRIGGER_CONFIG_WRITE", "BW→S3", "0x1C bytes; trigger config block; mirrors SUB 1C"), 0x83: ("TRIGGER_WRITE_CONFIRM", "BW→S3", "Short frame; commit step after 0x82"), # S3→BW responses 0xA4: ("POLL_RESPONSE", "S3→BW", "Response to SUB 5B poll"), 0xFE: ("FULL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 01"), 0xF9: ("CHANNEL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 06"), 0xF7: ("EVENT_INDEX_RESPONSE", "S3→BW", "Response to SUB 08; contains backlight/power-save"), 0xF5: ("WAVEFORM_HEADER_RESPONSE", "S3→BW", "Response to SUB 0A"), 0xF3: ("FULL_WAVEFORM_RESPONSE", "S3→BW", "Response to SUB 0C; project strings, PPV floats"), 0xE3: ("TRIGGER_CONFIG_RESPONSE", "S3→BW", "Response to SUB 1C; contains timestamps"), 0xF6: ("UNKNOWN_RESPONSE_A", "S3→BW", "Response to SUB 09; 0xCA bytes"), 0xE5: ("COMPLIANCE_CONFIG_RESPONSE","S3→BW", "Response to SUB 1A; record time in page 2"), 0xD1: ("UNKNOWN_RESPONSE_B", "S3→BW", "Response to SUB 2E; 0x1A bytes"), 0xEA: ("SERIAL_NUMBER_RESPONSE", "S3→BW", "0x0A bytes; serial number + firmware minor version"), # Short ack responses to writes (0xFF - write_sub) 0x8E: ("WRITE_CONFIRM_RESPONSE_71", "S3→BW", "Ack for SUB 71 COMPLIANCE_STRINGS_WRITE"), 0x8D: ("WRITE_CONFIRM_RESPONSE_72", "S3→BW", "Ack for SUB 72 WRITE_CONFIRM_A"), 0x8C: ("WRITE_CONFIRM_RESPONSE_73", "S3→BW", "Ack for SUB 73 WRITE_CONFIRM_B"), 0x8B: ("WRITE_CONFIRM_RESPONSE_74", "S3→BW", "Ack for SUB 74 WRITE_CONFIRM_C"), 0x97: ("WRITE_CONFIRM_RESPONSE_68", "S3→BW", "Ack for SUB 68 EVENT_INDEX_WRITE"), 0x96: ("WRITE_CONFIRM_RESPONSE_69", "S3→BW", "Ack for SUB 69 WAVEFORM_DATA_WRITE"), 0x7D: ("WRITE_CONFIRM_RESPONSE_82", "S3→BW", "Ack for SUB 82 TRIGGER_CONFIG_WRITE"), 0x7C: ("WRITE_CONFIRM_RESPONSE_83", "S3→BW", "Ack for SUB 83 TRIGGER_WRITE_CONFIRM"), } # SUBs whose data-section bytes 0–5 are known timestamps (suppress in diffs) NOISY_SUBS: set[int] = {0xE3, 0xF7, 0xF5} # E5 page 2 key: the OFFSET_HI:OFFSET_LO that identifies the data page # E5 page 1 (length probe) has offset 0x0000; page 2 has offset 0x082A E5_PAGE2_KEY = 0x082A # FieldEntry: (sub, page_key_or_none, payload_offset, field_name, type_hint, notes) # payload_offset = offset from start of Frame.payload (not data section, not wire) # Exception: for SUB 0x82, offset [22] is from full de-stuffed payload[0] per protocol ref. @dataclass(frozen=True) class FieldEntry: sub: int page_key: Optional[int] # None = any / all pages payload_offset: int # offset from frame.payload[0] name: str type_hint: str notes: str FIELD_MAP: list[FieldEntry] = [ # F7 (EVENT_INDEX_RESPONSE) — data section starts at payload[5] # Protocol ref: backlight at data+0x4B = payload[5+0x4B] = payload[80] FieldEntry(0xF7, None, 5 + 0x4B, "backlight_on_time", "uint8", "seconds; 0=off"), FieldEntry(0xF7, None, 5 + 0x53, "power_save_timeout", "uint8", "minutes; 0=disabled"), FieldEntry(0xF7, None, 5 + 0x54, "monitoring_lcd_cycle", "uint16 BE","65500=disabled"), # E5 page 2 (COMPLIANCE_CONFIG_RESPONSE) — record time at data+0x28 FieldEntry(0xE5, E5_PAGE2_KEY, 5 + 0x28, "record_time", "float32 BE", "seconds; 7s=40E00000, 13s=41500000"), # SUB 0x82 (TRIGGER_CONFIG_WRITE) — BW→S3 write # Protocol ref offset [22] is from the de-stuffed payload[0], confirmed from raw_bw.bin FieldEntry(0x82, None, 22, "trigger_sample_width", "uint8", "samples; mode-gated, BW-side write only"), ] # ────────────────────────────────────────────────────────────────────────────── # Data structures # ────────────────────────────────────────────────────────────────────────────── @dataclass class FrameHeader: cmd: int sub: int offset_hi: int offset_lo: int flags: int @property def page_key(self) -> int: return (self.offset_hi << 8) | self.offset_lo @dataclass class AnnotatedFrame: frame: Frame source: str # "BW" or "S3" header: Optional[FrameHeader] # None if payload < 7 bytes (malformed/short) sub_name: str session_idx: int = -1 @dataclass class Session: index: int bw_frames: list[AnnotatedFrame] s3_frames: list[AnnotatedFrame] @property def all_frames(self) -> list[AnnotatedFrame]: """Interleave BW/S3 in synchronous protocol order: BW[0], S3[0], BW[1], S3[1]...""" result: list[AnnotatedFrame] = [] for i in range(max(len(self.bw_frames), len(self.s3_frames))): if i < len(self.bw_frames): result.append(self.bw_frames[i]) if i < len(self.s3_frames): result.append(self.s3_frames[i]) return result @dataclass class ByteDiff: payload_offset: int before: int after: int field_name: Optional[str] @dataclass class FrameDiff: sub: int page_key: int sub_name: str diffs: list[ByteDiff] # ────────────────────────────────────────────────────────────────────────────── # Parsing helpers # ────────────────────────────────────────────────────────────────────────────── def extract_header(payload: bytes) -> Optional[FrameHeader]: """ Extract protocol header from de-stuffed payload. After de-stuffing, the actual observed layout is 5 bytes: [0] CMD -- 0x10 for BW requests, 0x00 for S3 responses [1] ? -- 0x00 for BW, 0x10 for S3 (DLE/ADDR byte that survives de-stuffing) [2] SUB -- the actual command/response identifier [3] OFFSET_HI [4] OFFSET_LO Data section begins at payload[5]. Note: The protocol reference describes a 7-byte header with CMD/DLE/ADDR/FLAGS/SUB/..., but DLE+ADDR (both 0x10 on wire) are de-stuffed into single bytes by parse_bw/parse_s3, collapsing the observable header to 5 bytes. """ if len(payload) < 5: return None return FrameHeader( cmd=payload[0], sub=payload[2], offset_hi=payload[3], offset_lo=payload[4], flags=payload[1], ) def annotate_frame(frame: Frame, source: str) -> AnnotatedFrame: header = extract_header(frame.payload) if header is not None: entry = SUB_TABLE.get(header.sub) sub_name = entry[0] if entry else f"UNKNOWN_{header.sub:02X}" else: sub_name = "MALFORMED" return AnnotatedFrame(frame=frame, source=source, header=header, sub_name=sub_name) def annotate_frames(frames: list[Frame], source: str) -> list[AnnotatedFrame]: return [annotate_frame(f, source) for f in frames] def load_and_annotate(s3_path: Path, bw_path: Path) -> tuple[list[AnnotatedFrame], list[AnnotatedFrame]]: """Parse both raw files and return annotated frame lists.""" s3_blob = s3_path.read_bytes() if s3_path.exists() else b"" bw_blob = bw_path.read_bytes() if bw_path.exists() else b"" s3_frames = parse_s3(s3_blob, trailer_len=0) bw_frames = parse_bw(bw_blob, trailer_len=0, validate_checksum=True) return annotate_frames(s3_frames, "S3"), annotate_frames(bw_frames, "BW") # ────────────────────────────────────────────────────────────────────────────── # Session detection # ────────────────────────────────────────────────────────────────────────────── # BW SUB that marks the end of a compliance write session SESSION_CLOSE_SUB = 0x74 def split_into_sessions( bw_annotated: list[AnnotatedFrame], s3_annotated: list[AnnotatedFrame], ) -> list[Session]: """ Split frames into sessions. A session ends on BW SUB 0x74 (WRITE_CONFIRM_C). New session starts at the stream beginning and after each 0x74. The protocol is synchronous: BW[i] request → S3[i] response. S3 frame i belongs to the same session as BW frame i. """ if not bw_annotated and not s3_annotated: return [] sessions: list[Session] = [] session_idx = 0 bw_start = 0 # Track where we are in S3 frames — they mirror BW frame count per session s3_cursor = 0 i = 0 while i < len(bw_annotated): frame = bw_annotated[i] i += 1 is_close = ( frame.header is not None and frame.header.sub == SESSION_CLOSE_SUB ) if is_close: bw_slice = bw_annotated[bw_start:i] # S3 frames in this session match BW frame count (synchronous protocol) n_s3 = len(bw_slice) s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3] s3_cursor += n_s3 sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice) for f in sess.all_frames: f.session_idx = session_idx sessions.append(sess) session_idx += 1 bw_start = i # Remaining frames (in-progress / no closing 0x74 yet) if bw_start < len(bw_annotated) or s3_cursor < len(s3_annotated): bw_slice = bw_annotated[bw_start:] n_s3 = len(bw_slice) s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3] # also grab any extra S3 frames beyond expected pairing if s3_cursor + n_s3 < len(s3_annotated): s3_slice = s3_annotated[s3_cursor:] if bw_slice or s3_slice: sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice) for f in sess.all_frames: f.session_idx = session_idx sessions.append(sess) return sessions # ────────────────────────────────────────────────────────────────────────────── # Mark-based session splitting (using structured .bin log) # ────────────────────────────────────────────────────────────────────────────── # Structured .bin record types (from s3_bridge.py) _REC_BW = 0x01 _REC_S3 = 0x02 _REC_MARK = 0x03 _REC_INFO = 0x04 @dataclass class MarkSplit: """A session boundary derived from a MARK record in the structured .bin log.""" label: str bw_byte_offset: int # byte position in the flat raw_bw stream at mark time s3_byte_offset: int # byte position in the flat raw_s3 stream at mark time def parse_structured_bin(bin_blob: bytes) -> list[MarkSplit]: """ Read a structured s3_session_*.bin file and return one MarkSplit per MARK record, containing the cumulative BW and S3 byte counts at that point. Record format: [type:1][ts_us:8 LE][len:4 LE][payload:len] """ marks: list[MarkSplit] = [] bw_bytes = 0 s3_bytes = 0 pos = 0 while pos + 13 <= len(bin_blob): rec_type = bin_blob[pos] # ts_us: 8 bytes LE (we don't need it, just skip) length = struct.unpack_from(" len(bin_blob): break # truncated record payload = bin_blob[payload_start:payload_end] if rec_type == _REC_BW: bw_bytes += length elif rec_type == _REC_S3: s3_bytes += length elif rec_type == _REC_MARK: label = payload.decode("utf-8", errors="replace") marks.append(MarkSplit(label=label, bw_byte_offset=bw_bytes, s3_byte_offset=s3_bytes)) pos = payload_end return marks def split_sessions_at_marks( bw_blob: bytes, s3_blob: bytes, marks: list[MarkSplit], ) -> list[Session]: """ Split raw byte streams into sessions using mark byte offsets, then apply the standard 0x74-based sub-splitting within each mark segment. Each mark creates a new session boundary: session 0 = bytes before mark 0, session 1 = bytes between mark 0 and mark 1, etc. """ if not marks: # No marks — fall back to standard session detection bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0, validate_checksum=True), "BW") s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3") return split_into_sessions(bw_frames, s3_frames) # Build slice boundaries: [0 .. mark0.bw, mark0.bw .. mark1.bw, ...] bw_cuts = [m.bw_byte_offset for m in marks] + [len(bw_blob)] s3_cuts = [m.s3_byte_offset for m in marks] + [len(s3_blob)] all_sessions: list[Session] = [] session_offset = 0 bw_prev = s3_prev = 0 for seg_i, (bw_end, s3_end) in enumerate(zip(bw_cuts, s3_cuts)): bw_chunk = bw_blob[bw_prev:bw_end] s3_chunk = s3_blob[s3_prev:s3_end] bw_frames = annotate_frames(parse_bw(bw_chunk, trailer_len=0, validate_checksum=True), "BW") s3_frames = annotate_frames(parse_s3(s3_chunk, trailer_len=0), "S3") seg_sessions = split_into_sessions(bw_frames, s3_frames) # Re-index sessions so they are globally unique for sess in seg_sessions: sess.index = session_offset for f in sess.all_frames: f.session_idx = session_offset session_offset += 1 all_sessions.append(sess) bw_prev = bw_end s3_prev = s3_end return all_sessions # ────────────────────────────────────────────────────────────────────────────── # Diff engine # ────────────────────────────────────────────────────────────────────────────── def _mask_noisy(sub: int, data: bytes) -> bytearray: """ Zero out known-noisy byte ranges before diffing. For NOISY_SUBS: mask bytes 0–5 of the data section (timestamps). """ buf = bytearray(data) if sub in NOISY_SUBS and len(buf) >= 6: for k in range(6): buf[k] = 0x00 return buf HEADER_LEN = 5 # Observed de-stuffed header size: CMD + ? + SUB + OFFSET_HI + OFFSET_LO def _get_data_section(af: AnnotatedFrame) -> bytes: """ Return the data section of the frame (after the 5-byte protocol header). For S3 frames, payload still contains a trailing SUM8 byte — exclude it. For BW frames, parse_bw with validate_checksum=True already stripped it. """ payload = af.frame.payload if len(payload) < HEADER_LEN: return b"" data = payload[HEADER_LEN:] if af.source == "S3" and len(data) >= 1: # SUM8 is still present at end of S3 frame payload data = data[:-1] return data def lookup_field_name(sub: int, page_key: int, payload_offset: int) -> Optional[str]: """Return field name if the given payload offset matches a known field, else None.""" for entry in FIELD_MAP: if entry.sub != sub: continue if entry.page_key is not None and entry.page_key != page_key: continue if entry.payload_offset == payload_offset: return entry.name return None def diff_sessions(sess_a: Session, sess_b: Session) -> list[FrameDiff]: """ Compare two sessions frame-by-frame, matched by (sub, page_key). Returns a list of FrameDiff for SUBs where bytes changed. """ # Build lookup: (sub, page_key) → AnnotatedFrame for each session def index_session(sess: Session) -> dict[tuple[int, int], AnnotatedFrame]: idx: dict[tuple[int, int], AnnotatedFrame] = {} for af in sess.all_frames: if af.header is None: continue key = (af.header.sub, af.header.page_key) # Keep first occurrence per key (or we could keep all — for now, first) if key not in idx: idx[key] = af return idx idx_a = index_session(sess_a) idx_b = index_session(sess_b) results: list[FrameDiff] = [] # Only compare SUBs present in both sessions common_keys = set(idx_a.keys()) & set(idx_b.keys()) for key in sorted(common_keys): sub, page_key = key af_a = idx_a[key] af_b = idx_b[key] data_a = _mask_noisy(sub, _get_data_section(af_a)) data_b = _mask_noisy(sub, _get_data_section(af_b)) if data_a == data_b: continue # Compare byte by byte up to the shorter length diffs: list[ByteDiff] = [] max_len = max(len(data_a), len(data_b)) for offset in range(max_len): byte_a = data_a[offset] if offset < len(data_a) else None byte_b = data_b[offset] if offset < len(data_b) else None if byte_a != byte_b: # payload_offset = data_section_offset + HEADER_LEN payload_off = offset + HEADER_LEN field = lookup_field_name(sub, page_key, payload_off) diffs.append(ByteDiff( payload_offset=payload_off, before=byte_a if byte_a is not None else -1, after=byte_b if byte_b is not None else -1, field_name=field, )) if diffs: entry = SUB_TABLE.get(sub) sub_name = entry[0] if entry else f"UNKNOWN_{sub:02X}" results.append(FrameDiff(sub=sub, page_key=page_key, sub_name=sub_name, diffs=diffs)) return results # ────────────────────────────────────────────────────────────────────────────── # Report rendering # ────────────────────────────────────────────────────────────────────────────── def format_hex_dump(data: bytes, indent: str = " ") -> list[str]: """Compact 16-bytes-per-line hex dump. Returns list of lines.""" lines = [] for row_start in range(0, len(data), 16): chunk = data[row_start:row_start + 16] hex_part = " ".join(f"{b:02x}" for b in chunk) lines.append(f"{indent}{row_start:04x}: {hex_part}") return lines def render_session_report( session: Session, diffs: Optional[list[FrameDiff]], prev_session_index: Optional[int], ) -> str: lines: list[str] = [] n_bw = len(session.bw_frames) n_s3 = len(session.s3_frames) total = n_bw + n_s3 is_complete = any( af.header is not None and af.header.sub == SESSION_CLOSE_SUB for af in session.bw_frames ) status = "" if is_complete else " [IN PROGRESS]" lines.append(f"{'='*72}") lines.append(f"SESSION {session.index}{status}") lines.append(f"{'='*72}") lines.append(f"Frames: {total} (BW: {n_bw}, S3: {n_s3})") if n_bw != n_s3: lines.append(f" WARNING: BW/S3 frame count mismatch — protocol sync issue?") lines.append("") # ── Frame inventory ────────────────────────────────────────────────────── lines.append("FRAME INVENTORY") for seq_i, af in enumerate(session.all_frames): if af.header is not None: sub_hex = f"{af.header.sub:02X}" page_str = f" (page {af.header.page_key:04X})" if af.header.page_key != 0 else "" else: sub_hex = "??" page_str = "" chk = "" if af.frame.checksum_valid is False: chk = " [BAD CHECKSUM]" elif af.frame.checksum_valid is True: chk = f" [{af.frame.checksum_type}]" lines.append( f" [{af.source}] #{seq_i:<3} SUB={sub_hex} {af.sub_name:<30}{page_str}" f" len={len(af.frame.payload)}{chk}" ) lines.append("") # ── Hex dumps ──────────────────────────────────────────────────────────── lines.append("HEX DUMPS") for seq_i, af in enumerate(session.all_frames): sub_hex = f"{af.header.sub:02X}" if af.header else "??" lines.append(f" [{af.source}] #{seq_i} SUB={sub_hex} {af.sub_name}") dump_lines = format_hex_dump(af.frame.payload, indent=" ") if dump_lines: lines.extend(dump_lines) else: lines.append(" (empty payload)") lines.append("") # ── Diff section ───────────────────────────────────────────────────────── if diffs is not None: if prev_session_index is not None: lines.append(f"DIFF vs SESSION {prev_session_index}") else: lines.append("DIFF") if not diffs: lines.append(" (no changes)") else: for fd in diffs: page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else "" lines.append(f" SUB {fd.sub:02X} ({fd.sub_name}){page_str}:") for bd in fd.diffs: field_str = f" [{bd.field_name}]" if bd.field_name else "" before_str = f"{bd.before:02x}" if bd.before >= 0 else "--" after_str = f"{bd.after:02x}" if bd.after >= 0 else "--" lines.append( f" offset [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: " f"{before_str} -> {after_str}{field_str}" ) lines.append("") return "\n".join(lines) + "\n" def write_report(session: Session, report_text: str, outdir: Path) -> Path: outdir.mkdir(parents=True, exist_ok=True) out_path = outdir / f"session_{session.index:03d}.report" out_path.write_text(report_text, encoding="utf-8") return out_path # ────────────────────────────────────────────────────────────────────────────── # Claude export # ────────────────────────────────────────────────────────────────────────────── def _hex_block(data: bytes, bytes_per_row: int = 16) -> list[str]: """Hex dump with offset + hex + ASCII columns.""" lines = [] for row in range(0, len(data), bytes_per_row): chunk = data[row:row + bytes_per_row] hex_col = " ".join(f"{b:02x}" for b in chunk) hex_col = f"{hex_col:<{bytes_per_row * 3 - 1}}" asc_col = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) lines.append(f" {row:04x} {hex_col} |{asc_col}|") return lines def render_claude_export( sessions: list[Session], diffs: list[Optional[list[FrameDiff]]], s3_path: Optional[Path] = None, bw_path: Optional[Path] = None, ) -> str: """ Produce a single self-contained Markdown file suitable for pasting into a Claude conversation for protocol reverse-engineering assistance. Structure: 1. Context block — what this is, protocol background, field map 2. Capture summary — session count, frame counts, what changed 3. Per-diff section — one section per session pair that had changes: a. Diff table (before/after bytes, known field labels) b. Full hex dumps of ONLY the frames that changed 4. Full hex dumps of all frames in sessions with no prior comparison (session 0 baseline) """ import datetime lines: list[str] = [] now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") s3_name = s3_path.name if s3_path else "raw_s3.bin" bw_name = bw_path.name if bw_path else "raw_bw.bin" # ── 1. Context block ────────────────────────────────────────────────── lines += [ f"# Instantel MiniMate Plus — Protocol Capture Analysis", f"Generated: {now} | Source: `{s3_name}` + `{bw_name}`", "", "## Protocol Background", "", "This file contains parsed RS-232 captures from an Instantel MiniMate Plus", "seismograph communicating with Blastware PC software at 38400 baud 8N1.", "", "**Frame structure (de-stuffed payload):**", "```", " [0] CMD 0x10 = BW request, 0x00 = S3 response", " [1] ? 0x00 (BW) or 0x10 (S3)", " [2] SUB Command/response identifier (key field)", " [3] OFFSET_HI Page offset high byte", " [4] OFFSET_LO Page offset low byte", " [5+] DATA Payload data section", "```", "", "**Response SUB rule:** response_SUB = 0xFF - request_SUB (confirmed, no exceptions observed)", "", "**Known field map** (offsets from payload[0]):", "```", " SUB F7 (EVENT_INDEX_RESPONSE):", " [80] 0x52 backlight_on_time uint8 seconds", " [88] 0x58 power_save_timeout uint8 minutes", " [89] 0x59 monitoring_lcd_cycle uint16BE 65500=disabled", " SUB E5 page 0x082A (COMPLIANCE_CONFIG_RESPONSE):", " [45] 0x2D record_time float32BE seconds (7s=40E00000, 13s=41500000)", " SUB 82 (TRIGGER_CONFIG_WRITE, BW-side only):", " [22] trigger_sample_width uint8 samples", "```", "", "**Session boundary:** a compliance session ends when BW sends SUB 0x74 (WRITE_CONFIRM_C).", "Sessions are numbered from 0. The diff compares consecutive complete sessions.", "", ] # ── 2. Capture summary ──────────────────────────────────────────────── lines += ["## Capture Summary", ""] lines.append(f"Sessions found: {len(sessions)}") for sess in sessions: is_complete = any( af.header is not None and af.header.sub == SESSION_CLOSE_SUB for af in sess.bw_frames ) status = "complete" if is_complete else "partial/in-progress" n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames) changed = len(diffs[sess.index] or []) if sess.index < len(diffs) else 0 changed_str = f" ({changed} SUBs changed vs prev)" if sess.index > 0 else " (baseline)" lines.append(f" Session {sess.index} [{status}]: BW={n_bw} S3={n_s3} frames{changed_str}") lines.append("") # ── 3. Per-diff sections ────────────────────────────────────────────── any_diffs = False for sess in sessions: sess_diffs = diffs[sess.index] if sess.index < len(diffs) else None if sess_diffs is None or sess.index == 0: continue any_diffs = True prev_idx = sess.index - 1 lines += [ f"---", f"## Diff: Session {prev_idx} -> Session {sess.index}", "", ] if not sess_diffs: lines.append("_No byte changes detected between these sessions._") lines.append("") continue # Build index of changed frames for this session (and prev) prev_sess = sessions[prev_idx] if prev_idx < len(sessions) else None for fd in sess_diffs: page_str = f" page 0x{fd.page_key:04X}" if fd.page_key != 0 else "" lines += [ f"### SUB {fd.sub:02X} — {fd.sub_name}{page_str}", "", ] # Diff table known_count = sum(1 for bd in fd.diffs if bd.field_name) unknown_count = sum(1 for bd in fd.diffs if not bd.field_name) lines.append( f"Changed bytes: **{len(fd.diffs)}** total " f"({known_count} known fields, {unknown_count} unknown)" ) lines.append("") lines.append("| Offset | Hex | Dec | Session {0} | Session {1} | Field |".format(prev_idx, sess.index)) lines.append("|--------|-----|-----|" + "-" * 12 + "|" + "-" * 12 + "|-------|") for bd in fd.diffs: before_s = f"`{bd.before:02x}`" if bd.before >= 0 else "`--`" after_s = f"`{bd.after:02x}`" if bd.after >= 0 else "`--`" before_d = str(bd.before) if bd.before >= 0 else "--" after_d = str(bd.after) if bd.after >= 0 else "--" field = f"`{bd.field_name}`" if bd.field_name else "**UNKNOWN**" lines.append( f"| [{bd.payload_offset}] 0x{bd.payload_offset:04X} " f"| {before_s}->{after_s} | {before_d}->{after_d} " f"| {before_s} | {after_s} | {field} |" ) lines.append("") # Hex dumps of the changed frame in both sessions def _find_af(target_sess: Session, sub: int, page_key: int) -> Optional[AnnotatedFrame]: for af in target_sess.all_frames: if af.header and af.header.sub == sub and af.header.page_key == page_key: return af return None af_prev = _find_af(sessions[prev_idx], fd.sub, fd.page_key) if prev_sess else None af_curr = _find_af(sess, fd.sub, fd.page_key) lines.append("**Hex dumps (full de-stuffed payload):**") lines.append("") for label, af in [(f"Session {prev_idx} (before)", af_prev), (f"Session {sess.index} (after)", af_curr)]: if af is None: lines.append(f"_{label}: frame not found_") lines.append("") continue lines.append(f"_{label}_ — {len(af.frame.payload)} bytes:") lines.append("```") lines += _hex_block(af.frame.payload) lines.append("```") lines.append("") if not any_diffs: lines += [ "---", "## Diffs", "", "_Only one session found — no diff available. " "Run a second capture with changed settings to see what moves._", "", ] # ── 4. Baseline hex dumps (session 0, all frames) ───────────────────── if sessions: baseline = sessions[0] lines += [ "---", f"## Baseline — Session 0 (all frames)", "", "Full hex dump of every frame in the first session.", "Use this to map field positions from known values.", "", ] for seq_i, af in enumerate(baseline.all_frames): sub_hex = f"{af.header.sub:02X}" if af.header else "??" page_str = f" page 0x{af.header.page_key:04X}" if af.header and af.header.page_key != 0 else "" chk_str = f" [{af.frame.checksum_type}]" if af.frame.checksum_valid else "" lines.append( f"### [{af.source}] #{seq_i} SUB {sub_hex} — {af.sub_name}{page_str}{chk_str}" ) lines.append(f"_{len(af.frame.payload)} bytes_") lines.append("```") lines += _hex_block(af.frame.payload) lines.append("```") lines.append("") lines += [ "---", "_End of analysis. To map an unknown field: change exactly one setting in Blastware,_", "_capture again, run the analyzer, and look for the offset that moved._", ] return "\n".join(lines) + "\n" def write_claude_export( sessions: list[Session], diffs: list[Optional[list[FrameDiff]]], outdir: Path, s3_path: Optional[Path] = None, bw_path: Optional[Path] = None, ) -> Path: import datetime outdir.mkdir(parents=True, exist_ok=True) stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") out_path = outdir / f"claude_export_{stamp}.md" out_path.write_text( render_claude_export(sessions, diffs, s3_path, bw_path), encoding="utf-8" ) return out_path # ────────────────────────────────────────────────────────────────────────────── # Post-processing mode # ────────────────────────────────────────────────────────────────────────────── def run_postprocess(s3_path: Path, bw_path: Path, outdir: Path, export: bool = False) -> None: print(f"s3_analyzer v{__version__}") print(f" S3 file : {s3_path}") print(f" BW file : {bw_path}") print(f" Out dir : {outdir}") print() s3_frames, bw_frames = load_and_annotate(s3_path, bw_path) print(f"Parsed: {len(s3_frames)} S3 frames, {len(bw_frames)} BW frames") sessions = split_into_sessions(bw_frames, s3_frames) print(f"Sessions: {len(sessions)}") print() all_diffs: list[Optional[list[FrameDiff]]] = [None] prev_session: Optional[Session] = None for sess in sessions: sess_diffs: Optional[list[FrameDiff]] = None prev_idx: Optional[int] = None if prev_session is not None: sess_diffs = diff_sessions(prev_session, sess) prev_idx = prev_session.index all_diffs.append(sess_diffs) report = render_session_report(sess, sess_diffs, prev_idx) out_path = write_report(sess, report, outdir) n_diffs = len(sess_diffs) if sess_diffs else 0 print(f" Session {sess.index}: {len(sess.all_frames)} frames, {n_diffs} changed SUBs -> {out_path.name}") prev_session = sess if export: export_path = write_claude_export(sessions, all_diffs, outdir, s3_path, bw_path) print(f"\n Claude export -> {export_path.name}") print() print(f"Reports written to: {outdir}") # ────────────────────────────────────────────────────────────────────────────── # Live mode # ────────────────────────────────────────────────────────────────────────────── def live_loop( s3_path: Path, bw_path: Path, outdir: Path, poll_interval: float = 0.05, ) -> None: """ Tail both raw files continuously, re-parsing on new bytes. Emits a session report as soon as BW SUB 0x74 is detected. """ print(f"s3_analyzer v{__version__} — LIVE MODE") print(f" S3 file : {s3_path}") print(f" BW file : {bw_path}") print(f" Out dir : {outdir}") print(f" Poll : {poll_interval*1000:.0f}ms") print("Waiting for frames... (Ctrl+C to stop)") print() s3_buf = bytearray() bw_buf = bytearray() s3_pos = 0 bw_pos = 0 last_s3_count = 0 last_bw_count = 0 sessions: list[Session] = [] prev_complete_session: Optional[Session] = None try: while True: # Read new bytes from both files changed = False if s3_path.exists(): with s3_path.open("rb") as fh: fh.seek(s3_pos) new_bytes = fh.read() if new_bytes: s3_buf.extend(new_bytes) s3_pos += len(new_bytes) changed = True if bw_path.exists(): with bw_path.open("rb") as fh: fh.seek(bw_pos) new_bytes = fh.read() if new_bytes: bw_buf.extend(new_bytes) bw_pos += len(new_bytes) changed = True if changed: s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0) bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True) s3_annotated = annotate_frames(s3_frames_raw, "S3") bw_annotated = annotate_frames(bw_frames_raw, "BW") new_s3 = len(s3_annotated) - last_s3_count new_bw = len(bw_annotated) - last_bw_count if new_s3 > 0 or new_bw > 0: last_s3_count = len(s3_annotated) last_bw_count = len(bw_annotated) print(f"[+] S3:{len(s3_annotated)} BW:{len(bw_annotated)} frames", end="") # Annotate newest BW frame if bw_annotated: latest_bw = bw_annotated[-1] sub_str = f"SUB={latest_bw.header.sub:02X}" if latest_bw.header else "SUB=??" print(f" latest BW {sub_str} {latest_bw.sub_name}", end="") print() # Check for session close all_sessions = split_into_sessions(bw_annotated, s3_annotated) # A complete session has the closing 0x74 complete_sessions = [ s for s in all_sessions if any( af.header is not None and af.header.sub == SESSION_CLOSE_SUB for af in s.bw_frames ) ] # Emit reports for newly completed sessions for sess in complete_sessions[len(sessions):]: diffs: Optional[list[FrameDiff]] = None prev_idx: Optional[int] = None if prev_complete_session is not None: diffs = diff_sessions(prev_complete_session, sess) prev_idx = prev_complete_session.index report = render_session_report(sess, diffs, prev_idx) out_path = write_report(sess, report, outdir) n_diffs = len(diffs) if diffs else 0 print(f"\n [+] Session {sess.index} complete: {len(sess.all_frames)} frames, " f"{n_diffs} changed SUBs -> {out_path.name}\n") prev_complete_session = sess sessions = complete_sessions time.sleep(poll_interval) except KeyboardInterrupt: print("\nStopped.") # Emit any in-progress (incomplete) session as a partial report if s3_buf or bw_buf: s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0) bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True) s3_annotated = annotate_frames(s3_frames_raw, "S3") bw_annotated = annotate_frames(bw_frames_raw, "BW") all_sessions = split_into_sessions(bw_annotated, s3_annotated) incomplete = [ s for s in all_sessions if not any( af.header is not None and af.header.sub == SESSION_CLOSE_SUB for af in s.bw_frames ) ] for sess in incomplete: report = render_session_report(sess, diffs=None, prev_session_index=None) out_path = write_report(sess, report, outdir) print(f" Partial session {sess.index} written -> {out_path.name}") # ────────────────────────────────────────────────────────────────────────────── # CLI # ────────────────────────────────────────────────────────────────────────────── def main() -> None: ap = argparse.ArgumentParser( description="s3_analyzer — Instantel MiniMate Plus live protocol analyzer" ) ap.add_argument("--s3", type=Path, required=True, help="Path to raw_s3.bin (S3→BW raw capture)") ap.add_argument("--bw", type=Path, required=True, help="Path to raw_bw.bin (BW→S3 raw capture)") ap.add_argument("--live", action="store_true", help="Live mode: tail files as they grow") ap.add_argument("--export", action="store_true", help="Also write a claude_export_.md file for Claude analysis") ap.add_argument("--outdir", type=Path, default=None, help="Output directory for .report files (default: same as input)") ap.add_argument("--poll", type=float, default=0.05, help="Live mode poll interval in seconds (default: 0.05)") args = ap.parse_args() outdir = args.outdir if outdir is None: outdir = args.s3.parent if args.live: live_loop(args.s3, args.bw, outdir, poll_interval=args.poll) else: if not args.s3.exists(): print(f"ERROR: S3 file not found: {args.s3}", file=sys.stderr) sys.exit(1) if not args.bw.exists(): print(f"ERROR: BW file not found: {args.bw}", file=sys.stderr) sys.exit(1) run_postprocess(args.s3, args.bw, outdir, export=args.export) if __name__ == "__main__": main()