Files
seismo-relay/parsers/s3_analyzer.py

1205 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
s3_analyzer.py — Live protocol analysis tool for Instantel MiniMate Plus RS-232.
Reads raw_s3.bin and raw_bw.bin (produced by s3_bridge.py), parses DLE frames,
groups into sessions, auto-diffs consecutive sessions, and annotates known fields.
Usage:
python s3_analyzer.py --s3 raw_s3.bin --bw raw_bw.bin [--live] [--outdir DIR]
"""
from __future__ import annotations
import argparse
import struct
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
# Allow running from any working directory
sys.path.insert(0, str(Path(__file__).parent))
from s3_parser import Frame, parse_bw, parse_s3 # noqa: E402
__version__ = "0.1.0"
# ──────────────────────────────────────────────────────────────────────────────
# Protocol constants
# ──────────────────────────────────────────────────────────────────────────────
# SUB_TABLE: sub_byte → (name, direction, notes)
# direction: "BW→S3", "S3→BW", or "both"
SUB_TABLE: dict[int, tuple[str, str, str]] = {
    # BW→S3 read requests
    0x5B: ("POLL", "BW→S3", "Keepalive / device discovery"),
    0x01: ("FULL_CONFIG_READ", "BW→S3", "~0x98 bytes; firmware, model, serial, channel config"),
    0x06: ("CHANNEL_CONFIG_READ", "BW→S3", "0x24 bytes; channel configuration block"),
    0x08: ("EVENT_INDEX_READ", "BW→S3", "0x58 bytes; event count and record pointers"),
    0x0A: ("WAVEFORM_HEADER_READ", "BW→S3", "0x30 bytes/page; waveform header keyed by timestamp"),
    0x0C: ("FULL_WAVEFORM_READ", "BW→S3", "0xD2 bytes/page × 2; project strings, PPV floats"),
    0x1C: ("TRIGGER_CONFIG_READ", "BW→S3", "0x2C bytes; trigger settings block"),
    0x09: ("UNKNOWN_READ_A", "BW→S3", "0xCA bytes response (F6); purpose unknown"),
    0x1A: ("COMPLIANCE_CONFIG_READ", "BW→S3", "Large block (E5); trigger/alarm floats, unit strings"),
    0x2E: ("UNKNOWN_READ_B", "BW→S3", "0x1A bytes response (D1); purpose unknown"),
    # BW→S3 write commands
    0x68: ("EVENT_INDEX_WRITE", "BW→S3", "Mirrors SUB 08 read; event count and timestamps"),
    0x69: ("WAVEFORM_DATA_WRITE", "BW→S3", "0xCA bytes; mirrors SUB 09"),
    0x71: ("COMPLIANCE_STRINGS_WRITE", "BW→S3", "Compliance config + all project string fields"),
    0x72: ("WRITE_CONFIRM_A", "BW→S3", "Short frame; commit step after 0x71"),
    0x73: ("WRITE_CONFIRM_B", "BW→S3", "Short frame"),
    0x74: ("WRITE_CONFIRM_C", "BW→S3", "Short frame; final session-close confirm"),
    0x82: ("TRIGGER_CONFIG_WRITE", "BW→S3", "0x1C bytes; trigger config block; mirrors SUB 1C"),
    0x83: ("TRIGGER_WRITE_CONFIRM", "BW→S3", "Short frame; commit step after 0x82"),
    # S3→BW responses
    0xA4: ("POLL_RESPONSE", "S3→BW", "Response to SUB 5B poll"),
    0xFE: ("FULL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 01"),
    0xF9: ("CHANNEL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 06"),
    0xF7: ("EVENT_INDEX_RESPONSE", "S3→BW", "Response to SUB 08; contains backlight/power-save"),
    0xF5: ("WAVEFORM_HEADER_RESPONSE", "S3→BW", "Response to SUB 0A"),
    0xF3: ("FULL_WAVEFORM_RESPONSE", "S3→BW", "Response to SUB 0C; project strings, PPV floats"),
    0xE3: ("TRIGGER_CONFIG_RESPONSE", "S3→BW", "Response to SUB 1C; contains timestamps"),
    0xF6: ("UNKNOWN_RESPONSE_A", "S3→BW", "Response to SUB 09; 0xCA bytes"),
    0xE5: ("COMPLIANCE_CONFIG_RESPONSE", "S3→BW", "Response to SUB 1A; record time in page 2"),
    0xD1: ("UNKNOWN_RESPONSE_B", "S3→BW", "Response to SUB 2E; 0x1A bytes"),
    0xEA: ("SERIAL_NUMBER_RESPONSE", "S3→BW", "0x0A bytes; serial number + firmware minor version"),
    # Short ack responses to writes (0xFF - write_sub)
    0x8E: ("WRITE_CONFIRM_RESPONSE_71", "S3→BW", "Ack for SUB 71 COMPLIANCE_STRINGS_WRITE"),
    0x8D: ("WRITE_CONFIRM_RESPONSE_72", "S3→BW", "Ack for SUB 72 WRITE_CONFIRM_A"),
    0x8C: ("WRITE_CONFIRM_RESPONSE_73", "S3→BW", "Ack for SUB 73 WRITE_CONFIRM_B"),
    0x8B: ("WRITE_CONFIRM_RESPONSE_74", "S3→BW", "Ack for SUB 74 WRITE_CONFIRM_C"),
    0x97: ("WRITE_CONFIRM_RESPONSE_68", "S3→BW", "Ack for SUB 68 EVENT_INDEX_WRITE"),
    0x96: ("WRITE_CONFIRM_RESPONSE_69", "S3→BW", "Ack for SUB 69 WAVEFORM_DATA_WRITE"),
    0x7D: ("WRITE_CONFIRM_RESPONSE_82", "S3→BW", "Ack for SUB 82 TRIGGER_CONFIG_WRITE"),
    0x7C: ("WRITE_CONFIRM_RESPONSE_83", "S3→BW", "Ack for SUB 83 TRIGGER_WRITE_CONFIRM"),
}
# SUBs whose data-section bytes 0–5 are known timestamps (suppress in diffs;
# see _mask_noisy, which zeroes the first six data bytes for these SUBs)
NOISY_SUBS: set[int] = {0xE3, 0xF7, 0xF5}
# E5 page 2 key: the OFFSET_HI:OFFSET_LO that identifies the data page
# E5 page 1 (length probe) has offset 0x0000; page 2 has offset 0x082A
E5_PAGE2_KEY = 0x082A
# FieldEntry: (sub, page_key_or_none, payload_offset, field_name, type_hint, notes)
# payload_offset = offset from start of Frame.payload (not data section, not wire)
# Exception: for SUB 0x82, offset [22] is from full de-stuffed payload[0] per protocol ref.
@dataclass(frozen=True)
class FieldEntry:
    """One known protocol field: maps (SUB, page, payload offset) to a name.

    Used by lookup_field_name() to annotate byte diffs with field names.
    """
    sub: int                  # SUB byte this field belongs to
    page_key: Optional[int]   # None = any / all pages
    payload_offset: int       # offset from frame.payload[0]
    name: str                 # human-readable field name
    type_hint: str            # e.g. "uint8", "uint16 BE", "float32 BE"
    notes: str                # free-form interpretation notes
# Known decoded fields, keyed by SUB (and optionally page). Offsets are from
# frame.payload[0]; the data section starts at payload[5] (HEADER_LEN).
FIELD_MAP: list[FieldEntry] = [
    # F7 (EVENT_INDEX_RESPONSE) — data section starts at payload[5]
    # Protocol ref: backlight at data+0x4B = payload[5+0x4B] = payload[80]
    FieldEntry(0xF7, None, 5 + 0x4B, "backlight_on_time", "uint8", "seconds; 0=off"),
    FieldEntry(0xF7, None, 5 + 0x53, "power_save_timeout", "uint8", "minutes; 0=disabled"),
    FieldEntry(0xF7, None, 5 + 0x54, "monitoring_lcd_cycle", "uint16 BE", "65500=disabled"),
    # E5 page 2 (COMPLIANCE_CONFIG_RESPONSE) — record time at data+0x28
    FieldEntry(0xE5, E5_PAGE2_KEY, 5 + 0x28, "record_time", "float32 BE", "seconds; 7s=40E00000, 13s=41500000"),
    # SUB 0x82 (TRIGGER_CONFIG_WRITE) — BW→S3 write
    # Protocol ref offset [22] is from the de-stuffed payload[0], confirmed from raw_bw.bin
    FieldEntry(0x82, None, 22, "trigger_sample_width", "uint8", "samples; mode-gated, BW-side write only"),
]
# ──────────────────────────────────────────────────────────────────────────────
# Data structures
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class FrameHeader:
    """Parsed 5-byte de-stuffed protocol header (see extract_header)."""
    cmd: int        # payload[0]: 0x10 = BW request, 0x00 = S3 response
    sub: int        # payload[2]: command/response identifier
    offset_hi: int  # payload[3]: page offset high byte
    offset_lo: int  # payload[4]: page offset low byte
    flags: int      # payload[1]: 0x00 (BW) / 0x10 (S3) — surviving DLE/ADDR byte

    @property
    def page_key(self) -> int:
        """16-bit page identifier composed from OFFSET_HI:OFFSET_LO."""
        return (self.offset_hi << 8) | self.offset_lo
@dataclass
class AnnotatedFrame:
    """A parsed Frame plus protocol-level annotations for reporting/diffing."""
    frame: Frame
    source: str                    # "BW" or "S3"
    # None if payload < 5 bytes (extract_header needs the full 5-byte header)
    header: Optional[FrameHeader]
    sub_name: str                  # SUB_TABLE name, "UNKNOWN_xx", or "MALFORMED"
    session_idx: int = -1          # assigned by the session splitter; -1 = unassigned
@dataclass
class Session:
    """One BW↔S3 exchange group, bounded by SUB 0x74 session closes."""
    index: int
    bw_frames: list[AnnotatedFrame]
    s3_frames: list[AnnotatedFrame]
    # None = infer from SUB 0x74 presence; True/False = explicitly set by splitter
    complete: Optional[bool] = None

    def is_complete(self) -> bool:
        """A session is complete if explicitly marked, or if it contains SUB 0x74."""
        if self.complete is None:
            # Fall back to scanning the BW side for the session-close SUB.
            return any(
                af.header is not None and af.header.sub == SESSION_CLOSE_SUB
                for af in self.bw_frames
            )
        return self.complete

    @property
    def all_frames(self) -> list[AnnotatedFrame]:
        """Interleave BW/S3 in synchronous protocol order: BW[0], S3[0], BW[1], S3[1]..."""
        interleaved: list[AnnotatedFrame] = []
        for bw_af, s3_af in zip(self.bw_frames, self.s3_frames):
            interleaved.append(bw_af)
            interleaved.append(s3_af)
        # Whichever side is longer contributes its tail in order.
        paired = min(len(self.bw_frames), len(self.s3_frames))
        interleaved.extend(self.bw_frames[paired:])
        interleaved.extend(self.s3_frames[paired:])
        return interleaved
@dataclass
class ByteDiff:
    """A single changed byte between two compared frames."""
    # Offset from frame.payload[0] (data offset + HEADER_LEN). For A4 inner
    # frames this is an encoded value — see _diff_a4_payloads.
    payload_offset: int
    before: int               # byte value, or a negative sentinel (absent side)
    after: int                # byte value, or a negative sentinel (absent side)
    field_name: Optional[str] # known FIELD_MAP name or synthetic description


@dataclass
class FrameDiff:
    """All byte-level changes for one (SUB, page_key) frame pair."""
    sub: int
    page_key: int
    sub_name: str
    diffs: list[ByteDiff]
# ──────────────────────────────────────────────────────────────────────────────
# Parsing helpers
# ──────────────────────────────────────────────────────────────────────────────
def extract_header(payload: bytes) -> Optional[FrameHeader]:
    """
    Extract the protocol header from a de-stuffed payload.

    Observed layout after de-stuffing is 5 bytes:
        [0] CMD -- 0x10 for BW requests, 0x00 for S3 responses
        [1] ?   -- 0x00 for BW, 0x10 for S3 (DLE/ADDR byte that survives de-stuffing)
        [2] SUB -- the actual command/response identifier
        [3] OFFSET_HI
        [4] OFFSET_LO
    The data section begins at payload[5].

    The protocol reference describes a 7-byte header (CMD/DLE/ADDR/FLAGS/SUB/...),
    but DLE+ADDR (both 0x10 on the wire) are de-stuffed into single bytes by
    parse_bw/parse_s3, collapsing the observable header to 5 bytes.

    Returns None for payloads shorter than 5 bytes.
    """
    if len(payload) < 5:
        return None
    cmd, addr_byte, sub, off_hi, off_lo = payload[:5]
    return FrameHeader(
        cmd=cmd,
        sub=sub,
        offset_hi=off_hi,
        offset_lo=off_lo,
        flags=addr_byte,
    )
def annotate_frame(frame: Frame, source: str) -> AnnotatedFrame:
    """Wrap a raw Frame with its parsed header and a human-readable SUB name."""
    hdr = extract_header(frame.payload)
    if hdr is None:
        # Payload too short to carry the 5-byte header.
        return AnnotatedFrame(frame=frame, source=source, header=None, sub_name="MALFORMED")
    known = SUB_TABLE.get(hdr.sub)
    label = known[0] if known else f"UNKNOWN_{hdr.sub:02X}"
    return AnnotatedFrame(frame=frame, source=source, header=hdr, sub_name=label)
def annotate_frames(frames: list[Frame], source: str) -> list[AnnotatedFrame]:
    """Annotate every frame with the given source tag ("BW" or "S3")."""
    return [annotate_frame(f, source) for f in frames]
def load_and_annotate(s3_path: Path, bw_path: Path) -> tuple[list[AnnotatedFrame], list[AnnotatedFrame]]:
    """Parse both raw files and return annotated frame lists."""
    # Missing capture files are treated as empty streams, not errors.
    s3_blob = b""
    bw_blob = b""
    if s3_path.exists():
        s3_blob = s3_path.read_bytes()
    if bw_path.exists():
        bw_blob = bw_path.read_bytes()
    annotated_s3 = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3")
    annotated_bw = annotate_frames(parse_bw(bw_blob, trailer_len=0, validate_checksum=True), "BW")
    return annotated_s3, annotated_bw
# ──────────────────────────────────────────────────────────────────────────────
# Session detection
# ──────────────────────────────────────────────────────────────────────────────
# BW SUB that marks the end of a compliance write session
# BW SUB that marks the end of a compliance write session (WRITE_CONFIRM_C)
SESSION_CLOSE_SUB = 0x74


def split_into_sessions(
    bw_annotated: list[AnnotatedFrame],
    s3_annotated: list[AnnotatedFrame],
) -> list[Session]:
    """
    Split frames into sessions. A session ends on BW SUB 0x74 (WRITE_CONFIRM_C).
    New session starts at the stream beginning and after each 0x74.
    The protocol is synchronous: BW[i] request → S3[i] response. S3 frame i
    belongs to the same session as BW frame i.
    """
    if not bw_annotated and not s3_annotated:
        return []
    sessions: list[Session] = []
    session_idx = 0
    bw_start = 0
    # Track where we are in S3 frames — they mirror BW frame count per session
    s3_cursor = 0
    i = 0
    while i < len(bw_annotated):
        frame = bw_annotated[i]
        i += 1
        is_close = (
            frame.header is not None and frame.header.sub == SESSION_CLOSE_SUB
        )
        if is_close:
            # Close out the session: the 0x74 frame itself is included.
            bw_slice = bw_annotated[bw_start:i]
            # S3 frames in this session match BW frame count (synchronous protocol)
            n_s3 = len(bw_slice)
            s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3]
            s3_cursor += n_s3
            sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice)
            for f in sess.all_frames:
                f.session_idx = session_idx
            sessions.append(sess)
            session_idx += 1
            bw_start = i
    # Remaining frames (in-progress / no closing 0x74 yet)
    if bw_start < len(bw_annotated) or s3_cursor < len(s3_annotated):
        bw_slice = bw_annotated[bw_start:]
        n_s3 = len(bw_slice)
        s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3]
        # also grab any extra S3 frames beyond expected pairing
        if s3_cursor + n_s3 < len(s3_annotated):
            s3_slice = s3_annotated[s3_cursor:]
        if bw_slice or s3_slice:
            sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice)
            for f in sess.all_frames:
                f.session_idx = session_idx
            sessions.append(sess)
    return sessions
# ──────────────────────────────────────────────────────────────────────────────
# Mark-based session splitting (using structured .bin log)
# ──────────────────────────────────────────────────────────────────────────────
# Structured .bin record types (from s3_bridge.py)
# Structured .bin record types (from s3_bridge.py)
_REC_BW = 0x01
_REC_S3 = 0x02
_REC_MARK = 0x03
_REC_INFO = 0x04


@dataclass
class MarkSplit:
    """A session boundary derived from a MARK record in the structured .bin log."""
    label: str
    bw_byte_offset: int  # byte position in the flat raw_bw stream at mark time
    s3_byte_offset: int  # byte position in the flat raw_s3 stream at mark time


def parse_structured_bin(bin_blob: bytes) -> list[MarkSplit]:
    """
    Read a structured s3_session_*.bin file and return one MarkSplit per MARK
    record, containing the cumulative BW and S3 byte counts at that point.

    Record format: [type:1][ts_us:8 LE][len:4 LE][payload:len]
    Truncated trailing records are ignored. Bridge lifecycle marks
    ("SESSION START"/"SESSION END" prefixes) are skipped — only user marks
    are returned.
    """
    result: list[MarkSplit] = []
    bw_total = 0
    s3_total = 0
    cursor = 0
    blob_len = len(bin_blob)
    # 13 = fixed record header size (type + timestamp + length)
    while cursor + 13 <= blob_len:
        rec_type = bin_blob[cursor]
        # ts_us (8 bytes LE at cursor+1) is not needed here; skip straight
        # to the payload length.
        (length,) = struct.unpack_from("<I", bin_blob, cursor + 9)
        body_start = cursor + 13
        body_end = body_start + length
        if body_end > blob_len:
            break  # truncated record
        if rec_type == _REC_BW:
            bw_total += length
        elif rec_type == _REC_S3:
            s3_total += length
        elif rec_type == _REC_MARK:
            label = bin_blob[body_start:body_end].decode("utf-8", errors="replace")
            # Keep only user marks; drop auto-generated bridge lifecycle marks.
            if not label.startswith(("SESSION START", "SESSION END")):
                result.append(MarkSplit(label=label,
                                        bw_byte_offset=bw_total,
                                        s3_byte_offset=s3_total))
        cursor = body_end
    return result
def split_sessions_at_marks(
    bw_blob: bytes,
    s3_blob: bytes,
    marks: list[MarkSplit],
) -> list[Session]:
    """
    Split raw byte streams into sessions using mark byte offsets, then apply
    the standard 0x74-based sub-splitting within each mark segment.
    Each mark creates a new session boundary: session 0 = bytes before mark 0,
    session 1 = bytes between mark 0 and mark 1, etc.
    """
    if not marks:
        # No marks — fall back to standard session detection
        bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0,
                                             validate_checksum=True), "BW")
        s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3")
        return split_into_sessions(bw_frames, s3_frames)
    # Build slice boundaries: [0 .. mark0.bw, mark0.bw .. mark1.bw, ...]
    bw_cuts = [m.bw_byte_offset for m in marks] + [len(bw_blob)]
    s3_cuts = [m.s3_byte_offset for m in marks] + [len(s3_blob)]
    all_sessions: list[Session] = []
    session_offset = 0  # globally-unique session index across all segments
    bw_prev = s3_prev = 0
    n_segments = len(bw_cuts)
    for seg_i, (bw_end, s3_end) in enumerate(zip(bw_cuts, s3_cuts)):
        # Parse each segment independently — frames never straddle a mark.
        bw_chunk = bw_blob[bw_prev:bw_end]
        s3_chunk = s3_blob[s3_prev:s3_end]
        bw_frames = annotate_frames(parse_bw(bw_chunk, trailer_len=0,
                                             validate_checksum=True), "BW")
        s3_frames = annotate_frames(parse_s3(s3_chunk, trailer_len=0), "S3")
        seg_sessions = split_into_sessions(bw_frames, s3_frames)
        # A mark-bounded segment is complete by definition — the user placed the
        # mark after the read finished. Only the last segment (trailing, unbounded)
        # may be genuinely in-progress.
        is_last_segment = (seg_i == n_segments - 1)
        # Re-index sessions so they are globally unique
        for sess in seg_sessions:
            sess.index = session_offset
            for f in sess.all_frames:
                f.session_idx = session_offset
            # Explicitly mark completeness: mark-bounded segments are complete;
            # the trailing segment falls back to 0x74 inference.
            if not is_last_segment:
                sess.complete = True
            session_offset += 1
            all_sessions.append(sess)
        bw_prev = bw_end
        s3_prev = s3_end
    return all_sessions
# ──────────────────────────────────────────────────────────────────────────────
# Diff engine
# ──────────────────────────────────────────────────────────────────────────────
def _mask_noisy(sub: int, data: bytes) -> bytearray:
    """
    Return a mutable copy of *data* with known-noisy bytes zeroed out
    before diffing. For SUBs in NOISY_SUBS the first six data-section
    bytes (timestamps) are cleared; all other SUBs pass through unchanged.
    """
    masked = bytearray(data)
    if sub in NOISY_SUBS and len(masked) >= 6:
        masked[:6] = bytes(6)  # zero the timestamp region in one slice write
    return masked
HEADER_LEN = 5 # Observed de-stuffed header size: CMD + ? + SUB + OFFSET_HI + OFFSET_LO
def _get_data_section(af: AnnotatedFrame) -> bytes:
"""
Return the data section of the frame (after the 5-byte protocol header).
For S3 frames, payload still contains a trailing SUM8 byte — exclude it.
For BW frames, parse_bw with validate_checksum=True already stripped it.
"""
payload = af.frame.payload
if len(payload) < HEADER_LEN:
return b""
data = payload[HEADER_LEN:]
if af.source == "S3" and len(data) >= 1:
# SUM8 is still present at end of S3 frame payload
data = data[:-1]
return data
def lookup_field_name(sub: int, page_key: int, payload_offset: int) -> Optional[str]:
    """Return field name if the given payload offset matches a known field, else None."""
    # First FIELD_MAP entry wins; page_key=None entries match any page.
    matches = (
        entry.name
        for entry in FIELD_MAP
        if entry.sub == sub
        and (entry.page_key is None or entry.page_key == page_key)
        and entry.payload_offset == payload_offset
    )
    return next(matches, None)
def _extract_a4_inner_frames(payload: bytes) -> list[tuple[int, int, bytes]]:
"""
Parse the inner sub-frame stream packed inside an A4 (POLL_RESPONSE) payload.
The payload is a sequence of inner frames, each starting with DLE STX (10 02)
and delimited by ACK (41) before the next DLE STX. The inner frame body
(after the 10 02 preamble) has the same 5-byte header layout as outer frames:
[0] 00
[1] 10
[2] SUB
[3] OFFSET_HI (page_key high byte)
[4] OFFSET_LO (page_key low byte)
[5+] data
Returns a list of (sub, page_key, data_bytes) — one entry per inner frame,
keeping ALL occurrences (not deduped), so the caller can decide how to match.
"""
DLE, STX, ACK = 0x10, 0x02, 0x41
results: list[tuple[int, int, bytes]] = []
# Collect start positions of each inner frame (offset of the DLE STX)
starts: list[int] = []
i = 0
# First frame may begin at offset 0 with DLE STX directly
if len(payload) >= 2 and payload[0] == DLE and payload[1] == STX:
starts.append(0)
i = 2
while i < len(payload) - 2:
if payload[i] == ACK and payload[i + 1] == DLE and payload[i + 2] == STX:
starts.append(i + 1) # point at the DLE
i += 3
else:
i += 1
for k, s in enumerate(starts):
# Body starts after DLE STX (2 bytes)
body_start = s + 2
body_end = starts[k + 1] - 1 if k + 1 < len(starts) else len(payload)
body = payload[body_start:body_end]
if len(body) < 5:
continue
# body[0]=0x00, body[1]=0x10, body[2]=SUB, body[3]=OFFSET_HI, body[4]=OFFSET_LO
sub = body[2]
page_key = (body[3] << 8) | body[4]
data = body[5:]
results.append((sub, page_key, data))
return results
def _diff_a4_payloads(payload_a: bytes, payload_b: bytes) -> list[ByteDiff]:
    """
    Diff two A4 container payloads at the inner sub-frame level.

    Inner frames are matched by (sub, page_key). For each pair of matching
    inner frames whose data differs, the changed bytes are reported with
    payload_offset encoded as: (sub << 16) | byte_offset_in_data.
    (NOTE(review): an earlier comment described this as inner_frame_index << 16,
    but the code below puts the SUB in the high 16 bits.)

    Inner frames present in one payload but not the other are reported as a
    single synthetic ByteDiff entry with before/after = -1 / -2 respectively,
    and field_name describing the missing inner SUB; for these entries the
    encoded payload_offset is (sub << 16) | page_key.

    The high-16 / low-16 split in payload_offset lets the GUI render these
    differently if desired, but they degrade gracefully in the existing renderer.
    """
    frames_a = _extract_a4_inner_frames(payload_a)
    frames_b = _extract_a4_inner_frames(payload_b)

    # Build multimap: (sub, page_key) → list of data blobs, preserving order
    def index(frames):
        idx: dict[tuple[int, int], list[bytes]] = {}
        for sub, pk, data in frames:
            idx.setdefault((sub, pk), []).append(data)
        return idx

    idx_a = index(frames_a)
    idx_b = index(frames_b)
    all_keys = sorted(set(idx_a) | set(idx_b))
    diffs: list[ByteDiff] = []
    for sub, pk in all_keys:
        list_a = idx_a.get((sub, pk), [])
        list_b = idx_b.get((sub, pk), [])
        # Pair up by position; extras are treated as added/removed
        n = max(len(list_a), len(list_b))
        for pos in range(n):
            da = list_a[pos] if pos < len(list_a) else None
            db = list_b[pos] if pos < len(list_b) else None
            if da is None:
                # Inner frame added in B — emit one synthetic marker entry
                entry = SUB_TABLE.get(sub)
                name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
                diffs.append(ByteDiff(
                    payload_offset=(sub << 16) | (pk & 0xFFFF),
                    before=-1,
                    after=-2,
                    field_name=f"[A4 inner] SUB {sub:02X} ({name}) pk={pk:04X} added",
                ))
                continue
            if db is None:
                # Inner frame removed in B — mirror of the "added" case
                entry = SUB_TABLE.get(sub)
                name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
                diffs.append(ByteDiff(
                    payload_offset=(sub << 16) | (pk & 0xFFFF),
                    before=-2,
                    after=-1,
                    field_name=f"[A4 inner] SUB {sub:02X} ({name}) pk={pk:04X} removed",
                ))
                continue
            # Both present — byte diff the data sections (timestamps masked)
            da_m = _mask_noisy(sub, da)
            db_m = _mask_noisy(sub, db)
            if da_m == db_m:
                continue
            max_len = max(len(da_m), len(db_m))
            for off in range(max_len):
                ba = da_m[off] if off < len(da_m) else None
                bb = db_m[off] if off < len(db_m) else None
                if ba != bb:
                    # FIELD_MAP offsets are payload-relative → shift by HEADER_LEN
                    field = lookup_field_name(sub, pk, off + HEADER_LEN)
                    diffs.append(ByteDiff(
                        payload_offset=(sub << 16) | (off & 0xFFFF),
                        before=ba if ba is not None else -1,
                        after=bb if bb is not None else -1,
                        field_name=field or f"[A4:{sub:02X} pk={pk:04X}] off={off}",
                    ))
    return diffs
def diff_sessions(sess_a: Session, sess_b: Session) -> list[FrameDiff]:
    """
    Compare two sessions frame-by-frame, matched by (sub, page_key).
    Returns a list of FrameDiff for SUBs where bytes changed.
    Only (sub, page_key) pairs present in BOTH sessions are compared.
    """
    # Build lookup: (sub, page_key) → AnnotatedFrame for each session
    def index_session(sess: Session) -> dict[tuple[int, int], AnnotatedFrame]:
        idx: dict[tuple[int, int], AnnotatedFrame] = {}
        for af in sess.all_frames:
            if af.header is None:
                continue
            key = (af.header.sub, af.header.page_key)
            # Keep first occurrence per key (or we could keep all — for now, first)
            if key not in idx:
                idx[key] = af
        return idx

    idx_a = index_session(sess_a)
    idx_b = index_session(sess_b)
    results: list[FrameDiff] = []
    # Only compare SUBs present in both sessions
    common_keys = set(idx_a.keys()) & set(idx_b.keys())
    for key in sorted(common_keys):
        sub, page_key = key
        af_a = idx_a[key]
        af_b = idx_b[key]
        # A4 is a container frame — diff at the inner sub-frame level to avoid
        # phase-shift noise when the number of embedded records differs.
        if sub == 0xA4:
            diffs = _diff_a4_payloads(af_a.frame.payload, af_b.frame.payload)
            if diffs:
                entry = SUB_TABLE.get(sub)
                sub_name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
                results.append(FrameDiff(sub=sub, page_key=page_key, sub_name=sub_name, diffs=diffs))
            continue
        data_a = _mask_noisy(sub, _get_data_section(af_a))
        data_b = _mask_noisy(sub, _get_data_section(af_b))
        if data_a == data_b:
            continue
        # Compare byte by byte across the longer of the two lengths;
        # bytes missing on one side are reported with the -1 sentinel.
        diffs: list[ByteDiff] = []
        max_len = max(len(data_a), len(data_b))
        for offset in range(max_len):
            byte_a = data_a[offset] if offset < len(data_a) else None
            byte_b = data_b[offset] if offset < len(data_b) else None
            if byte_a != byte_b:
                # payload_offset = data_section_offset + HEADER_LEN
                payload_off = offset + HEADER_LEN
                field = lookup_field_name(sub, page_key, payload_off)
                diffs.append(ByteDiff(
                    payload_offset=payload_off,
                    before=byte_a if byte_a is not None else -1,
                    after=byte_b if byte_b is not None else -1,
                    field_name=field,
                ))
        if diffs:
            entry = SUB_TABLE.get(sub)
            sub_name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
            results.append(FrameDiff(sub=sub, page_key=page_key, sub_name=sub_name, diffs=diffs))
    return results
# ──────────────────────────────────────────────────────────────────────────────
# Report rendering
# ──────────────────────────────────────────────────────────────────────────────
def format_hex_dump(data: bytes, indent: str = " ") -> list[str]:
    """Compact 16-bytes-per-line hex dump. Returns list of lines."""
    dump: list[str] = []
    offset = 0
    while offset < len(data):
        row = data[offset:offset + 16]
        rendered = " ".join(f"{b:02x}" for b in row)
        dump.append(f"{indent}{offset:04x}: " + rendered)
        offset += 16
    return dump
def render_session_report(
    session: Session,
    diffs: Optional[list[FrameDiff]],
    prev_session_index: Optional[int],
) -> str:
    """
    Render a plain-text report for one session: header, frame inventory,
    per-frame hex dumps, and — when diffs is not None — a byte-level diff
    section against the session named by prev_session_index.
    Returns the full report text, newline-terminated.
    """
    lines: list[str] = []
    n_bw = len(session.bw_frames)
    n_s3 = len(session.s3_frames)
    total = n_bw + n_s3
    status = "" if session.is_complete() else " [IN PROGRESS]"
    lines.append(f"{'='*72}")
    lines.append(f"SESSION {session.index}{status}")
    lines.append(f"{'='*72}")
    lines.append(f"Frames: {total} (BW: {n_bw}, S3: {n_s3})")
    if n_bw != n_s3:
        # In this synchronous protocol, unequal counts mean pairing drifted.
        lines.append(f" WARNING: BW/S3 frame count mismatch — protocol sync issue?")
    lines.append("")
    # ── Frame inventory ──────────────────────────────────────────────────────
    lines.append("FRAME INVENTORY")
    for seq_i, af in enumerate(session.all_frames):
        if af.header is not None:
            sub_hex = f"{af.header.sub:02X}"
            page_str = f" (page {af.header.page_key:04X})" if af.header.page_key != 0 else ""
        else:
            sub_hex = "??"
            page_str = ""
        # Checksum tri-state: None (not checked) prints nothing.
        chk = ""
        if af.frame.checksum_valid is False:
            chk = " [BAD CHECKSUM]"
        elif af.frame.checksum_valid is True:
            chk = f" [{af.frame.checksum_type}]"
        lines.append(
            f" [{af.source}] #{seq_i:<3} SUB={sub_hex} {af.sub_name:<30}{page_str}"
            f" len={len(af.frame.payload)}{chk}"
        )
    lines.append("")
    # ── Hex dumps ────────────────────────────────────────────────────────────
    lines.append("HEX DUMPS")
    for seq_i, af in enumerate(session.all_frames):
        sub_hex = f"{af.header.sub:02X}" if af.header else "??"
        lines.append(f" [{af.source}] #{seq_i} SUB={sub_hex} {af.sub_name}")
        dump_lines = format_hex_dump(af.frame.payload, indent=" ")
        if dump_lines:
            lines.extend(dump_lines)
        else:
            lines.append(" (empty payload)")
        lines.append("")
    # ── Diff section ─────────────────────────────────────────────────────────
    if diffs is not None:
        if prev_session_index is not None:
            lines.append(f"DIFF vs SESSION {prev_session_index}")
        else:
            lines.append("DIFF")
        if not diffs:
            lines.append(" (no changes)")
        else:
            for fd in diffs:
                page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else ""
                lines.append(f" SUB {fd.sub:02X} ({fd.sub_name}){page_str}:")
                for bd in fd.diffs:
                    field_str = f" [{bd.field_name}]" if bd.field_name else ""
                    # Negative before/after are sentinels (absent byte) → "--"
                    before_str = f"{bd.before:02x}" if bd.before >= 0 else "--"
                    after_str = f"{bd.after:02x}" if bd.after >= 0 else "--"
                    lines.append(
                        f" offset [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: "
                        f"{before_str} -> {after_str}{field_str}"
                    )
        lines.append("")
    return "\n".join(lines) + "\n"
def write_report(session: Session, report_text: str, outdir: Path) -> Path:
    """Write the session report to outdir/session_NNN.report and return its path."""
    outdir.mkdir(parents=True, exist_ok=True)
    target = outdir / f"session_{session.index:03d}.report"
    target.write_text(report_text, encoding="utf-8")
    return target
# ──────────────────────────────────────────────────────────────────────────────
# Claude export
# ──────────────────────────────────────────────────────────────────────────────
def _hex_block(data: bytes, bytes_per_row: int = 16) -> list[str]:
"""Hex dump with offset + hex + ASCII columns."""
lines = []
for row in range(0, len(data), bytes_per_row):
chunk = data[row:row + bytes_per_row]
hex_col = " ".join(f"{b:02x}" for b in chunk)
hex_col = f"{hex_col:<{bytes_per_row * 3 - 1}}"
asc_col = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
lines.append(f" {row:04x} {hex_col} |{asc_col}|")
return lines
def render_claude_export(
    sessions: list[Session],
    diffs: list[Optional[list[FrameDiff]]],
    s3_path: Optional[Path] = None,
    bw_path: Optional[Path] = None,
) -> str:
    """
    Produce a single self-contained Markdown file suitable for pasting into
    a Claude conversation for protocol reverse-engineering assistance.

    `diffs` is indexed by session index: diffs[i] holds session i's FrameDiffs
    vs session i-1 (None or absent for sessions without a comparison).

    Structure:
      1. Context block — what this is, protocol background, field map
      2. Capture summary — session count, frame counts, what changed
      3. Per-diff section — one section per session pair that had changes:
         a. Diff table (before/after bytes, known field labels)
         b. Full hex dumps of ONLY the frames that changed
      4. Full hex dumps of all frames in sessions with no prior comparison
         (session 0 baseline)
    """
    import datetime
    lines: list[str] = []
    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    s3_name = s3_path.name if s3_path else "raw_s3.bin"
    bw_name = bw_path.name if bw_path else "raw_bw.bin"
    # ── 1. Context block ──────────────────────────────────────────────────
    lines += [
        f"# Instantel MiniMate Plus — Protocol Capture Analysis",
        f"Generated: {now} | Source: `{s3_name}` + `{bw_name}`",
        "",
        "## Protocol Background",
        "",
        "This file contains parsed RS-232 captures from an Instantel MiniMate Plus",
        "seismograph communicating with Blastware PC software at 38400 baud 8N1.",
        "",
        "**Frame structure (de-stuffed payload):**",
        "```",
        " [0] CMD 0x10 = BW request, 0x00 = S3 response",
        " [1] ? 0x00 (BW) or 0x10 (S3)",
        " [2] SUB Command/response identifier (key field)",
        " [3] OFFSET_HI Page offset high byte",
        " [4] OFFSET_LO Page offset low byte",
        " [5+] DATA Payload data section",
        "```",
        "",
        "**Response SUB rule:** response_SUB = 0xFF - request_SUB (confirmed, no exceptions observed)",
        "",
        "**Known field map** (offsets from payload[0]):",
        "```",
        " SUB F7 (EVENT_INDEX_RESPONSE):",
        " [80] 0x52 backlight_on_time uint8 seconds",
        " [88] 0x58 power_save_timeout uint8 minutes",
        " [89] 0x59 monitoring_lcd_cycle uint16BE 65500=disabled",
        " SUB E5 page 0x082A (COMPLIANCE_CONFIG_RESPONSE):",
        " [45] 0x2D record_time float32BE seconds (7s=40E00000, 13s=41500000)",
        " SUB 82 (TRIGGER_CONFIG_WRITE, BW-side only):",
        " [22] trigger_sample_width uint8 samples",
        "```",
        "",
        "**Session boundary:** a compliance session ends when BW sends SUB 0x74 (WRITE_CONFIRM_C).",
        "Sessions are numbered from 0. The diff compares consecutive complete sessions.",
        "",
    ]
    # ── 2. Capture summary ────────────────────────────────────────────────
    lines += ["## Capture Summary", ""]
    lines.append(f"Sessions found: {len(sessions)}")
    for sess in sessions:
        status = "complete" if sess.is_complete() else "partial/in-progress"
        n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames)
        changed = len(diffs[sess.index] or []) if sess.index < len(diffs) else 0
        changed_str = f" ({changed} SUBs changed vs prev)" if sess.index > 0 else " (baseline)"
        lines.append(f" Session {sess.index} [{status}]: BW={n_bw} S3={n_s3} frames{changed_str}")
    lines.append("")
    # ── 3. Per-diff sections ──────────────────────────────────────────────
    any_diffs = False
    for sess in sessions:
        sess_diffs = diffs[sess.index] if sess.index < len(diffs) else None
        # Session 0 has no predecessor; None means no comparison available.
        if sess_diffs is None or sess.index == 0:
            continue
        any_diffs = True
        prev_idx = sess.index - 1
        lines += [
            f"---",
            f"## Diff: Session {prev_idx} -> Session {sess.index}",
            "",
        ]
        if not sess_diffs:
            lines.append("_No byte changes detected between these sessions._")
            lines.append("")
            continue
        # Build index of changed frames for this session (and prev)
        prev_sess = sessions[prev_idx] if prev_idx < len(sessions) else None
        for fd in sess_diffs:
            page_str = f" page 0x{fd.page_key:04X}" if fd.page_key != 0 else ""
            # NOTE(review): no space between the SUB hex and the name here
            # (cf. the inventory line which has one) — confirm intentional.
            lines += [
                f"### SUB {fd.sub:02X}{fd.sub_name}{page_str}",
                "",
            ]
            # Diff table
            known_count = sum(1 for bd in fd.diffs if bd.field_name)
            unknown_count = sum(1 for bd in fd.diffs if not bd.field_name)
            lines.append(
                f"Changed bytes: **{len(fd.diffs)}** total "
                f"({known_count} known fields, {unknown_count} unknown)"
            )
            lines.append("")
            lines.append("| Offset | Hex | Dec | Session {0} | Session {1} | Field |".format(prev_idx, sess.index))
            lines.append("|--------|-----|-----|" + "-" * 12 + "|" + "-" * 12 + "|-------|")
            for bd in fd.diffs:
                # Negative before/after values are "absent byte" sentinels.
                before_s = f"`{bd.before:02x}`" if bd.before >= 0 else "`--`"
                after_s = f"`{bd.after:02x}`" if bd.after >= 0 else "`--`"
                before_d = str(bd.before) if bd.before >= 0 else "--"
                after_d = str(bd.after) if bd.after >= 0 else "--"
                field = f"`{bd.field_name}`" if bd.field_name else "**UNKNOWN**"
                lines.append(
                    f"| [{bd.payload_offset}] 0x{bd.payload_offset:04X} "
                    f"| {before_s}->{after_s} | {before_d}->{after_d} "
                    f"| {before_s} | {after_s} | {field} |"
                )
            lines.append("")
            # Hex dumps of the changed frame in both sessions
            def _find_af(target_sess: Session, sub: int, page_key: int) -> Optional[AnnotatedFrame]:
                # First frame matching (sub, page_key), or None.
                for af in target_sess.all_frames:
                    if af.header and af.header.sub == sub and af.header.page_key == page_key:
                        return af
                return None
            # NOTE(review): uses sessions[prev_idx] directly; prev_sess (same
            # object when in range) is only consulted for the None guard.
            af_prev = _find_af(sessions[prev_idx], fd.sub, fd.page_key) if prev_sess else None
            af_curr = _find_af(sess, fd.sub, fd.page_key)
            lines.append("**Hex dumps (full de-stuffed payload):**")
            lines.append("")
            for label, af in [(f"Session {prev_idx} (before)", af_prev),
                              (f"Session {sess.index} (after)", af_curr)]:
                if af is None:
                    lines.append(f"_{label}: frame not found_")
                    lines.append("")
                    continue
                lines.append(f"_{label}_ — {len(af.frame.payload)} bytes:")
                lines.append("```")
                lines += _hex_block(af.frame.payload)
                lines.append("```")
                lines.append("")
    if not any_diffs:
        lines += [
            "---",
            "## Diffs",
            "",
            "_Only one session found — no diff available. "
            "Run a second capture with changed settings to see what moves._",
            "",
        ]
    # ── 4. Baseline hex dumps (session 0, all frames) ─────────────────────
    if sessions:
        baseline = sessions[0]
        lines += [
            "---",
            f"## Baseline — Session 0 (all frames)",
            "",
            "Full hex dump of every frame in the first session.",
            "Use this to map field positions from known values.",
            "",
        ]
        for seq_i, af in enumerate(baseline.all_frames):
            sub_hex = f"{af.header.sub:02X}" if af.header else "??"
            page_str = f" page 0x{af.header.page_key:04X}" if af.header and af.header.page_key != 0 else ""
            chk_str = f" [{af.frame.checksum_type}]" if af.frame.checksum_valid else ""
            lines.append(
                f"### [{af.source}] #{seq_i} SUB {sub_hex}{af.sub_name}{page_str}{chk_str}"
            )
            lines.append(f"_{len(af.frame.payload)} bytes_")
            lines.append("```")
            lines += _hex_block(af.frame.payload)
            lines.append("```")
            lines.append("")
    lines += [
        "---",
        "_End of analysis. To map an unknown field: change exactly one setting in Blastware,_",
        "_capture again, run the analyzer, and look for the offset that moved._",
    ]
    return "\n".join(lines) + "\n"
def write_claude_export(
    sessions: list[Session],
    diffs: list[Optional[list[FrameDiff]]],
    outdir: Path,
    s3_path: Optional[Path] = None,
    bw_path: Optional[Path] = None,
) -> Path:
    """Render the combined Claude analysis export and write it to disk.

    The file is named ``claude_export_<YYYYmmdd_HHMMSS>.md`` inside
    *outdir* (which is created if missing).

    Returns:
        Path of the markdown file that was written.
    """
    from datetime import datetime

    outdir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = outdir / f"claude_export_{timestamp}.md"
    content = render_claude_export(sessions, diffs, s3_path, bw_path)
    target.write_text(content, encoding="utf-8")
    return target
# ──────────────────────────────────────────────────────────────────────────────
# Post-processing mode
# ──────────────────────────────────────────────────────────────────────────────
def run_postprocess(s3_path: Path, bw_path: Path, outdir: Path, export: bool = False) -> None:
    """Post-process two completed captures into per-session diff reports.

    Parses both raw capture files, annotates and sessionizes the frames,
    diffs each session against its predecessor, and writes one report per
    session into *outdir*.

    Args:
        s3_path: Raw S3->BW capture file.
        bw_path: Raw BW->S3 capture file.
        outdir: Destination directory for the generated report files.
        export: When True, also write a combined claude_export_<ts>.md
            covering all sessions.
    """
    print(f"s3_analyzer v{__version__}")
    print(f" S3 file : {s3_path}")
    print(f" BW file : {bw_path}")
    print(f" Out dir : {outdir}")
    print()
    s3_frames, bw_frames = load_and_annotate(s3_path, bw_path)
    print(f"Parsed: {len(s3_frames)} S3 frames, {len(bw_frames)} BW frames")
    sessions = split_into_sessions(bw_frames, s3_frames)
    print(f"Sessions: {len(sessions)}")
    print()
    # all_diffs[i] must hold the diff list for sessions[i] — the export
    # renderer indexes it by session index. The loop below already appends
    # one entry per session (None for the baseline session), so the list
    # must start EMPTY. Seeding it with [None] (the previous behavior)
    # shifted every entry by one position in the Claude export.
    all_diffs: list[Optional[list[FrameDiff]]] = []
    prev_session: Optional[Session] = None
    for sess in sessions:
        sess_diffs: Optional[list[FrameDiff]] = None
        prev_idx: Optional[int] = None
        if prev_session is not None:
            sess_diffs = diff_sessions(prev_session, sess)
            prev_idx = prev_session.index
        all_diffs.append(sess_diffs)
        report = render_session_report(sess, sess_diffs, prev_idx)
        out_path = write_report(sess, report, outdir)
        n_diffs = len(sess_diffs) if sess_diffs else 0
        print(f" Session {sess.index}: {len(sess.all_frames)} frames, {n_diffs} changed SUBs -> {out_path.name}")
        prev_session = sess
    if export:
        export_path = write_claude_export(sessions, all_diffs, outdir, s3_path, bw_path)
        print(f"\n Claude export -> {export_path.name}")
    print()
    print(f"Reports written to: {outdir}")
# ──────────────────────────────────────────────────────────────────────────────
# Live mode
# ──────────────────────────────────────────────────────────────────────────────
def live_loop(
    s3_path: Path,
    bw_path: Path,
    outdir: Path,
    poll_interval: float = 0.05,
) -> None:
    """
    Tail both raw files continuously, re-parsing on new bytes.
    Emits a session report as soon as BW SUB 0x74 is detected.

    Args:
        s3_path: Raw S3->BW capture file; may not exist yet (polled until it does).
        bw_path: Raw BW->S3 capture file; same tolerance for a missing file.
        outdir: Destination directory for per-session report files.
        poll_interval: Sleep between file polls, in seconds.

    Runs until interrupted with Ctrl+C; on interrupt, any still-incomplete
    session is written out as a partial report.
    """
    print(f"s3_analyzer v{__version__} — LIVE MODE")
    print(f" S3 file : {s3_path}")
    print(f" BW file : {bw_path}")
    print(f" Out dir : {outdir}")
    print(f" Poll : {poll_interval*1000:.0f}ms")
    print("Waiting for frames... (Ctrl+C to stop)")
    print()
    # Accumulated raw capture bytes and the file offsets already consumed.
    s3_buf = bytearray()
    bw_buf = bytearray()
    s3_pos = 0
    bw_pos = 0
    # Frame counts from the previous parse pass, used to detect new frames.
    last_s3_count = 0
    last_bw_count = 0
    # Completed sessions that have already had a report emitted.
    sessions: list[Session] = []
    prev_complete_session: Optional[Session] = None
    try:
        while True:
            # Read new bytes from both files
            changed = False
            if s3_path.exists():
                with s3_path.open("rb") as fh:
                    fh.seek(s3_pos)
                    new_bytes = fh.read()
                    if new_bytes:
                        s3_buf.extend(new_bytes)
                        s3_pos += len(new_bytes)
                        changed = True
            if bw_path.exists():
                with bw_path.open("rb") as fh:
                    fh.seek(bw_pos)
                    new_bytes = fh.read()
                    if new_bytes:
                        bw_buf.extend(new_bytes)
                        bw_pos += len(new_bytes)
                        changed = True
            if changed:
                # Re-parse the full buffers from scratch each time new bytes
                # arrive (simpler than incremental parsing; frames may span
                # read boundaries).
                s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0)
                bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True)
                s3_annotated = annotate_frames(s3_frames_raw, "S3")
                bw_annotated = annotate_frames(bw_frames_raw, "BW")
                new_s3 = len(s3_annotated) - last_s3_count
                new_bw = len(bw_annotated) - last_bw_count
                if new_s3 > 0 or new_bw > 0:
                    last_s3_count = len(s3_annotated)
                    last_bw_count = len(bw_annotated)
                    print(f"[+] S3:{len(s3_annotated)} BW:{len(bw_annotated)} frames", end="")
                    # Annotate newest BW frame
                    if bw_annotated:
                        latest_bw = bw_annotated[-1]
                        sub_str = f"SUB={latest_bw.header.sub:02X}" if latest_bw.header else "SUB=??"
                        print(f" latest BW {sub_str} {latest_bw.sub_name}", end="")
                    print()
                    # Check for session close
                    all_sessions = split_into_sessions(bw_annotated, s3_annotated)
                    complete_sessions = [s for s in all_sessions if s.is_complete()]
                    # Emit reports for newly completed sessions
                    # (`sessions` holds those already reported, so slice past them).
                    for sess in complete_sessions[len(sessions):]:
                        diffs: Optional[list[FrameDiff]] = None
                        prev_idx: Optional[int] = None
                        if prev_complete_session is not None:
                            diffs = diff_sessions(prev_complete_session, sess)
                            prev_idx = prev_complete_session.index
                        report = render_session_report(sess, diffs, prev_idx)
                        out_path = write_report(sess, report, outdir)
                        n_diffs = len(diffs) if diffs else 0
                        print(f"\n [+] Session {sess.index} complete: {len(sess.all_frames)} frames, "
                              f"{n_diffs} changed SUBs -> {out_path.name}\n")
                        prev_complete_session = sess
                    sessions = complete_sessions
            time.sleep(poll_interval)
    except KeyboardInterrupt:
        print("\nStopped.")
        # Emit any in-progress (incomplete) session as a partial report
        if s3_buf or bw_buf:
            s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0)
            bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True)
            s3_annotated = annotate_frames(s3_frames_raw, "S3")
            bw_annotated = annotate_frames(bw_frames_raw, "BW")
            all_sessions = split_into_sessions(bw_annotated, s3_annotated)
            incomplete = [s for s in all_sessions if not s.is_complete()]
            for sess in incomplete:
                # No diff for a partial session — there is no guaranteed
                # complete predecessor to compare against.
                report = render_session_report(sess, diffs=None, prev_session_index=None)
                out_path = write_report(sess, report, outdir)
                print(f" Partial session {sess.index} written -> {out_path.name}")
# ──────────────────────────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: parse arguments, then dispatch to live or post-process mode."""
    parser = argparse.ArgumentParser(
        description="s3_analyzer — Instantel MiniMate Plus live protocol analyzer"
    )
    parser.add_argument("--s3", type=Path, required=True, help="Path to raw_s3.bin (S3→BW raw capture)")
    parser.add_argument("--bw", type=Path, required=True, help="Path to raw_bw.bin (BW→S3 raw capture)")
    parser.add_argument("--live", action="store_true", help="Live mode: tail files as they grow")
    parser.add_argument("--export", action="store_true", help="Also write a claude_export_<ts>.md file for Claude analysis")
    parser.add_argument("--outdir", type=Path, default=None, help="Output directory for .report files (default: same as input)")
    parser.add_argument("--poll", type=float, default=0.05, help="Live mode poll interval in seconds (default: 0.05)")
    opts = parser.parse_args()

    # Default the output directory to wherever the S3 capture lives.
    outdir = opts.outdir if opts.outdir is not None else opts.s3.parent

    if opts.live:
        # Live mode tolerates missing input files — they may appear later.
        live_loop(opts.s3, opts.bw, outdir, poll_interval=opts.poll)
        return

    # Post-process mode needs both captures to exist up front.
    for label, path in (("S3", opts.s3), ("BW", opts.bw)):
        if not path.exists():
            print(f"ERROR: {label} file not found: {path}", file=sys.stderr)
            sys.exit(1)
    run_postprocess(opts.s3, opts.bw, outdir, export=opts.export)
# Script entry guard: invoke the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()