-_extract_a4_inner_frames(payload) — splits the A4 container payload into inner sub-frames using the ACK DLE STX delimiter pattern, returning (sub, page_key, data) tuples -_diff_a4_payloads(payload_a, payload_b) — matches inner frames by (sub, page_key), diffs data byte-by-byte (with existing noise masking), and reports added/removed inner frames as synthetic entries
1207 lines · 51 KiB · Python
#!/usr/bin/env python3
|
||
"""
|
||
s3_analyzer.py — Live protocol analysis tool for Instantel MiniMate Plus RS-232.
|
||
|
||
Reads raw_s3.bin and raw_bw.bin (produced by s3_bridge.py), parses DLE frames,
|
||
groups into sessions, auto-diffs consecutive sessions, and annotates known fields.
|
||
|
||
Usage:
|
||
python s3_analyzer.py --s3 raw_s3.bin --bw raw_bw.bin [--live] [--outdir DIR]
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import struct
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
# Allow running from any working directory
|
||
sys.path.insert(0, str(Path(__file__).parent))
|
||
from s3_parser import Frame, parse_bw, parse_s3 # noqa: E402
|
||
|
||
__version__ = "0.1.0"
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
# Protocol constants
# ──────────────────────────────────────────────────────────────────────────────

# SUB_TABLE: sub_byte → (name, direction, notes)
# direction: "BW→S3", "S3→BW", or "both"
# Responses follow the rule response_SUB = 0xFF - request_SUB (see notes below).
SUB_TABLE: dict[int, tuple[str, str, str]] = {
    # BW→S3 read requests
    0x5B: ("POLL", "BW→S3", "Keepalive / device discovery"),
    0x01: ("FULL_CONFIG_READ", "BW→S3", "~0x98 bytes; firmware, model, serial, channel config"),
    0x06: ("CHANNEL_CONFIG_READ", "BW→S3", "0x24 bytes; channel configuration block"),
    0x08: ("EVENT_INDEX_READ", "BW→S3", "0x58 bytes; event count and record pointers"),
    0x0A: ("WAVEFORM_HEADER_READ", "BW→S3", "0x30 bytes/page; waveform header keyed by timestamp"),
    0x0C: ("FULL_WAVEFORM_READ", "BW→S3", "0xD2 bytes/page × 2; project strings, PPV floats"),
    0x1C: ("TRIGGER_CONFIG_READ", "BW→S3", "0x2C bytes; trigger settings block"),
    0x09: ("UNKNOWN_READ_A", "BW→S3", "0xCA bytes response (F6); purpose unknown"),
    0x1A: ("COMPLIANCE_CONFIG_READ", "BW→S3", "Large block (E5); trigger/alarm floats, unit strings"),
    0x2E: ("UNKNOWN_READ_B", "BW→S3", "0x1A bytes response (D1); purpose unknown"),
    # BW→S3 write commands
    0x68: ("EVENT_INDEX_WRITE", "BW→S3", "Mirrors SUB 08 read; event count and timestamps"),
    0x69: ("WAVEFORM_DATA_WRITE", "BW→S3", "0xCA bytes; mirrors SUB 09"),
    0x71: ("COMPLIANCE_STRINGS_WRITE", "BW→S3", "Compliance config + all project string fields"),
    0x72: ("WRITE_CONFIRM_A", "BW→S3", "Short frame; commit step after 0x71"),
    0x73: ("WRITE_CONFIRM_B", "BW→S3", "Short frame"),
    0x74: ("WRITE_CONFIRM_C", "BW→S3", "Short frame; final session-close confirm"),
    0x82: ("TRIGGER_CONFIG_WRITE", "BW→S3", "0x1C bytes; trigger config block; mirrors SUB 1C"),
    0x83: ("TRIGGER_WRITE_CONFIRM", "BW→S3", "Short frame; commit step after 0x82"),
    # S3→BW responses
    0xA4: ("POLL_RESPONSE", "S3→BW", "Response to SUB 5B poll"),
    0xFE: ("FULL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 01"),
    0xF9: ("CHANNEL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 06"),
    0xF7: ("EVENT_INDEX_RESPONSE", "S3→BW", "Response to SUB 08; contains backlight/power-save"),
    0xF5: ("WAVEFORM_HEADER_RESPONSE", "S3→BW", "Response to SUB 0A"),
    0xF3: ("FULL_WAVEFORM_RESPONSE", "S3→BW", "Response to SUB 0C; project strings, PPV floats"),
    0xE3: ("TRIGGER_CONFIG_RESPONSE", "S3→BW", "Response to SUB 1C; contains timestamps"),
    0xF6: ("UNKNOWN_RESPONSE_A", "S3→BW", "Response to SUB 09; 0xCA bytes"),
    0xE5: ("COMPLIANCE_CONFIG_RESPONSE","S3→BW", "Response to SUB 1A; record time in page 2"),
    0xD1: ("UNKNOWN_RESPONSE_B", "S3→BW", "Response to SUB 2E; 0x1A bytes"),
    0xEA: ("SERIAL_NUMBER_RESPONSE", "S3→BW", "0x0A bytes; serial number + firmware minor version"),
    # Short ack responses to writes (0xFF - write_sub)
    0x8E: ("WRITE_CONFIRM_RESPONSE_71", "S3→BW", "Ack for SUB 71 COMPLIANCE_STRINGS_WRITE"),
    0x8D: ("WRITE_CONFIRM_RESPONSE_72", "S3→BW", "Ack for SUB 72 WRITE_CONFIRM_A"),
    0x8C: ("WRITE_CONFIRM_RESPONSE_73", "S3→BW", "Ack for SUB 73 WRITE_CONFIRM_B"),
    0x8B: ("WRITE_CONFIRM_RESPONSE_74", "S3→BW", "Ack for SUB 74 WRITE_CONFIRM_C"),
    0x97: ("WRITE_CONFIRM_RESPONSE_68", "S3→BW", "Ack for SUB 68 EVENT_INDEX_WRITE"),
    0x96: ("WRITE_CONFIRM_RESPONSE_69", "S3→BW", "Ack for SUB 69 WAVEFORM_DATA_WRITE"),
    0x7D: ("WRITE_CONFIRM_RESPONSE_82", "S3→BW", "Ack for SUB 82 TRIGGER_CONFIG_WRITE"),
    0x7C: ("WRITE_CONFIRM_RESPONSE_83", "S3→BW", "Ack for SUB 83 TRIGGER_WRITE_CONFIRM"),
}

# SUBs whose data-section bytes 0–5 are known timestamps (suppress in diffs);
# masked by _mask_noisy() before byte comparison.
NOISY_SUBS: set[int] = {0xE3, 0xF7, 0xF5}

# E5 page 2 key: the OFFSET_HI:OFFSET_LO that identifies the data page
# E5 page 1 (length probe) has offset 0x0000; page 2 has offset 0x082A
E5_PAGE2_KEY = 0x082A
|
||
|
||
# FieldEntry: (sub, page_key_or_none, payload_offset, field_name, type_hint, notes)
# payload_offset = offset from start of Frame.payload (not data section, not wire)
# Exception: for SUB 0x82, offset [22] is from full de-stuffed payload[0] per protocol ref.
@dataclass(frozen=True)
class FieldEntry:
    """One known field location in a frame payload, used to label diff hits."""
    sub: int                  # SUB byte this entry applies to
    page_key: Optional[int]   # None = any / all pages
    payload_offset: int       # offset from frame.payload[0]
    name: str                 # human-readable field name
    type_hint: str            # e.g. "uint8", "uint16 BE", "float32 BE"
    notes: str                # free-form interpretation notes
|
||
|
||
|
||
# Known field locations; consulted by lookup_field_name() when rendering diffs.
FIELD_MAP: list[FieldEntry] = [
    # F7 (EVENT_INDEX_RESPONSE) — data section starts at payload[5]
    # Protocol ref: backlight at data+0x4B = payload[5+0x4B] = payload[80]
    FieldEntry(0xF7, None, 5 + 0x4B, "backlight_on_time", "uint8", "seconds; 0=off"),
    FieldEntry(0xF7, None, 5 + 0x53, "power_save_timeout", "uint8", "minutes; 0=disabled"),
    FieldEntry(0xF7, None, 5 + 0x54, "monitoring_lcd_cycle", "uint16 BE","65500=disabled"),
    # E5 page 2 (COMPLIANCE_CONFIG_RESPONSE) — record time at data+0x28
    FieldEntry(0xE5, E5_PAGE2_KEY, 5 + 0x28, "record_time", "float32 BE", "seconds; 7s=40E00000, 13s=41500000"),
    # SUB 0x82 (TRIGGER_CONFIG_WRITE) — BW→S3 write
    # Protocol ref offset [22] is from the de-stuffed payload[0], confirmed from raw_bw.bin
    FieldEntry(0x82, None, 22, "trigger_sample_width", "uint8", "samples; mode-gated, BW-side write only"),
]
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Data structures
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
@dataclass
class FrameHeader:
    """The 5-byte de-stuffed protocol header at the front of every payload."""
    cmd: int        # 0x10 = BW request, 0x00 = S3 response
    sub: int        # command/response identifier
    offset_hi: int  # page offset, high byte
    offset_lo: int  # page offset, low byte
    flags: int      # payload byte [1] (0x00 on BW, 0x10 on S3 frames)

    @property
    def page_key(self) -> int:
        """Fold the two offset bytes into one 16-bit page identifier."""
        return self.offset_hi * 0x100 + self.offset_lo
|
||
|
||
|
||
@dataclass
class AnnotatedFrame:
    """A parsed Frame enriched with its decoded header and SUB metadata."""
    frame: Frame
    source: str                    # "BW" or "S3"
    header: Optional[FrameHeader]  # None if payload < 5 bytes (malformed/short)
    sub_name: str                  # name from SUB_TABLE, or "UNKNOWN_xx"/"MALFORMED"
    session_idx: int = -1          # assigned during session splitting; -1 = not yet assigned
|
||
|
||
|
||
@dataclass
class Session:
    """One protocol session: the BW request frames and S3 response frames."""
    index: int
    bw_frames: list[AnnotatedFrame]
    s3_frames: list[AnnotatedFrame]

    @property
    def all_frames(self) -> list[AnnotatedFrame]:
        """Interleave BW/S3 in synchronous protocol order: BW[0], S3[0], BW[1], S3[1]..."""
        merged: list[AnnotatedFrame] = []
        longest = max(len(self.bw_frames), len(self.s3_frames))
        for pos in range(longest):
            # Whichever stream still has a frame at this position contributes it,
            # BW first — mirroring the request/response pairing on the wire.
            for stream in (self.bw_frames, self.s3_frames):
                if pos < len(stream):
                    merged.append(stream[pos])
        return merged
|
||
|
||
|
||
@dataclass
class ByteDiff:
    """A single changed byte (or synthetic marker) between two matched frames."""
    payload_offset: int        # offset from frame.payload[0]; for A4 inner diffs, (sub<<16)|low_16_bits
    before: int                # old byte value; -1 = byte absent, -2 = synthetic added/removed marker
    after: int                 # new byte value; same sentinel convention as `before`
    field_name: Optional[str]  # known field label (FIELD_MAP) or descriptive text, if any
|
||
|
||
|
||
@dataclass
class FrameDiff:
    """All byte-level changes for one (sub, page_key) frame pair between sessions."""
    sub: int               # SUB byte of the compared frames
    page_key: int          # page identifier (OFFSET_HI:OFFSET_LO)
    sub_name: str          # human-readable SUB name
    diffs: list[ByteDiff]  # individual byte changes
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Parsing helpers
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
def extract_header(payload: bytes) -> Optional[FrameHeader]:
    """
    Extract the protocol header from a de-stuffed payload, or None if too short.

    Observed de-stuffed layout (5 bytes):
      [0] CMD -- 0x10 for BW requests, 0x00 for S3 responses
      [1] ?   -- 0x00 for BW, 0x10 for S3 (DLE/ADDR byte that survives de-stuffing)
      [2] SUB -- the actual command/response identifier
      [3] OFFSET_HI
      [4] OFFSET_LO
    The data section begins at payload[5].

    Note: The protocol reference describes a 7-byte header with CMD/DLE/ADDR/FLAGS/SUB/...,
    but DLE+ADDR (both 0x10 on wire) are de-stuffed into single bytes by parse_bw/parse_s3,
    collapsing the observable header to 5 bytes.
    """
    if len(payload) >= 5:
        cmd, mystery, sub, hi, lo = payload[:5]
        return FrameHeader(cmd=cmd, sub=sub, offset_hi=hi, offset_lo=lo, flags=mystery)
    return None
|
||
|
||
|
||
def annotate_frame(frame: Frame, source: str) -> AnnotatedFrame:
    """Wrap a parsed Frame with its decoded header and a human-readable SUB name."""
    header = extract_header(frame.payload)
    if header is None:
        # Payload too short to carry a header — flag it instead of guessing.
        return AnnotatedFrame(frame=frame, source=source, header=None, sub_name="MALFORMED")
    known = SUB_TABLE.get(header.sub)
    label = known[0] if known else f"UNKNOWN_{header.sub:02X}"
    return AnnotatedFrame(frame=frame, source=source, header=header, sub_name=label)
|
||
|
||
|
||
def annotate_frames(frames: list[Frame], source: str) -> list[AnnotatedFrame]:
    """Annotate every frame in *frames*, tagging each with *source* ("BW"/"S3")."""
    annotated: list[AnnotatedFrame] = []
    for fr in frames:
        annotated.append(annotate_frame(fr, source))
    return annotated
|
||
|
||
|
||
def load_and_annotate(s3_path: Path, bw_path: Path) -> tuple[list[AnnotatedFrame], list[AnnotatedFrame]]:
    """Parse both raw capture files and return (s3_annotated, bw_annotated)."""

    def read_or_empty(path: Path) -> bytes:
        # A missing capture file is treated as an empty stream, not an error.
        return path.read_bytes() if path.exists() else b""

    s3_parsed = parse_s3(read_or_empty(s3_path), trailer_len=0)
    bw_parsed = parse_bw(read_or_empty(bw_path), trailer_len=0, validate_checksum=True)
    return annotate_frames(s3_parsed, "S3"), annotate_frames(bw_parsed, "BW")
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Session detection
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
# BW SUB that marks the end of a compliance write session
|
||
SESSION_CLOSE_SUB = 0x74
|
||
|
||
def split_into_sessions(
    bw_annotated: list[AnnotatedFrame],
    s3_annotated: list[AnnotatedFrame],
) -> list[Session]:
    """
    Split frames into sessions. A session ends on BW SUB 0x74 (WRITE_CONFIRM_C).
    New session starts at the stream beginning and after each 0x74.

    The protocol is synchronous: BW[i] request → S3[i] response. S3 frame i
    belongs to the same session as BW frame i, so S3 frames are sliced by
    counting BW frames rather than by inspecting their own contents.

    Side effect: sets .session_idx on every frame placed into a session.
    """
    if not bw_annotated and not s3_annotated:
        return []

    sessions: list[Session] = []
    session_idx = 0
    bw_start = 0  # index of the first BW frame of the current (open) session

    # Track where we are in S3 frames — they mirror BW frame count per session
    s3_cursor = 0

    i = 0
    while i < len(bw_annotated):
        frame = bw_annotated[i]
        i += 1  # advance first so bw_annotated[bw_start:i] includes the closer

        is_close = (
            frame.header is not None and frame.header.sub == SESSION_CLOSE_SUB
        )

        if is_close:
            bw_slice = bw_annotated[bw_start:i]
            # S3 frames in this session match BW frame count (synchronous protocol)
            n_s3 = len(bw_slice)
            s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3]
            s3_cursor += n_s3

            sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice)
            for f in sess.all_frames:
                f.session_idx = session_idx
            sessions.append(sess)

            session_idx += 1
            bw_start = i

    # Remaining frames (in-progress / no closing 0x74 yet)
    if bw_start < len(bw_annotated) or s3_cursor < len(s3_annotated):
        bw_slice = bw_annotated[bw_start:]
        n_s3 = len(bw_slice)
        s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3]
        # also grab any extra S3 frames beyond expected pairing
        if s3_cursor + n_s3 < len(s3_annotated):
            s3_slice = s3_annotated[s3_cursor:]

        if bw_slice or s3_slice:
            sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice)
            for f in sess.all_frames:
                f.session_idx = session_idx
            sessions.append(sess)

    return sessions
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Mark-based session splitting (using structured .bin log)
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
# Structured .bin record types — first byte of each record (from s3_bridge.py)
_REC_BW = 0x01    # raw bytes captured on the BW (PC) side
_REC_S3 = 0x02    # raw bytes captured on the S3 (device) side
_REC_MARK = 0x03  # marker record carrying a UTF-8 label payload
_REC_INFO = 0x04  # informational record; not handled by parse_structured_bin
|
||
|
||
|
||
@dataclass
class MarkSplit:
    """A session boundary derived from a MARK record in the structured .bin log."""
    label: str           # the MARK record's UTF-8 label
    bw_byte_offset: int  # byte position in the flat raw_bw stream at mark time
    s3_byte_offset: int  # byte position in the flat raw_s3 stream at mark time
|
||
|
||
|
||
def parse_structured_bin(bin_blob: bytes) -> list[MarkSplit]:
    """
    Read a structured s3_session_*.bin file and return one MarkSplit per user
    MARK record, containing the cumulative BW and S3 byte counts at that point.

    Record format: [type:1][ts_us:8 LE][len:4 LE][payload:len]

    Auto-generated bridge lifecycle marks ("SESSION START…" / "SESSION END…")
    are skipped; a truncated trailing record stops parsing cleanly.
    """
    marks: list[MarkSplit] = []
    bw_bytes = 0
    s3_bytes = 0
    pos = 0

    while pos + 13 <= len(bin_blob):
        rec_type = bin_blob[pos]
        # ts_us: 8 bytes LE at pos+1 (we don't need it, just skip over it)
        length = struct.unpack_from("<I", bin_blob, pos + 9)[0]
        payload_start = pos + 13
        payload_end = payload_start + length

        if payload_end > len(bin_blob):
            break  # truncated record

        if rec_type == _REC_BW:
            bw_bytes += length
        elif rec_type == _REC_S3:
            s3_bytes += length
        elif rec_type == _REC_MARK:
            # Only MARK payloads are decoded; BW/S3 records are counted by
            # length alone, avoiding a bytes copy for every large data record.
            label = bin_blob[payload_start:payload_end].decode("utf-8", errors="replace")
            # Skip auto-generated bridge lifecycle marks — only keep user marks
            if not label.startswith(("SESSION START", "SESSION END")):
                marks.append(MarkSplit(label=label,
                                       bw_byte_offset=bw_bytes,
                                       s3_byte_offset=s3_bytes))

        pos = payload_end

    return marks
|
||
|
||
|
||
def split_sessions_at_marks(
    bw_blob: bytes,
    s3_blob: bytes,
    marks: list[MarkSplit],
) -> list[Session]:
    """
    Split raw byte streams into sessions using mark byte offsets, then apply
    the standard 0x74-based sub-splitting within each mark segment.

    Each mark creates a new session boundary: session 0 = bytes before mark 0,
    session 1 = bytes between mark 0 and mark 1, etc. Session indices are
    re-numbered so they are globally unique across all segments.
    """
    if not marks:
        # No marks — fall back to standard session detection
        bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0,
                                             validate_checksum=True), "BW")
        s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3")
        return split_into_sessions(bw_frames, s3_frames)

    # Build slice boundaries: [0 .. mark0.bw, mark0.bw .. mark1.bw, ...]
    bw_cuts = [m.bw_byte_offset for m in marks] + [len(bw_blob)]
    s3_cuts = [m.s3_byte_offset for m in marks] + [len(s3_blob)]

    all_sessions: list[Session] = []
    session_offset = 0       # next globally-unique session index
    bw_prev = s3_prev = 0    # start offsets of the current segment

    for seg_i, (bw_end, s3_end) in enumerate(zip(bw_cuts, s3_cuts)):
        bw_chunk = bw_blob[bw_prev:bw_end]
        s3_chunk = s3_blob[s3_prev:s3_end]

        # Each segment is parsed independently — a frame spanning a mark
        # boundary would be cut; marks are assumed to fall between frames.
        bw_frames = annotate_frames(parse_bw(bw_chunk, trailer_len=0,
                                             validate_checksum=True), "BW")
        s3_frames = annotate_frames(parse_s3(s3_chunk, trailer_len=0), "S3")

        seg_sessions = split_into_sessions(bw_frames, s3_frames)

        # Re-index sessions so they are globally unique
        for sess in seg_sessions:
            sess.index = session_offset
            for f in sess.all_frames:
                f.session_idx = session_offset
            session_offset += 1
            all_sessions.append(sess)

        bw_prev = bw_end
        s3_prev = s3_end

    return all_sessions
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Diff engine
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
def _mask_noisy(sub: int, data: bytes) -> bytearray:
    """
    Return a mutable copy of *data* with known-noisy bytes zeroed.

    For SUBs in NOISY_SUBS the first 6 bytes of the data section are
    timestamps, so they are cleared to keep them out of the diffs.
    """
    masked = bytearray(data)
    if sub in NOISY_SUBS and len(masked) >= 6:
        masked[:6] = bytes(6)
    return masked
|
||
|
||
|
||
HEADER_LEN = 5 # Observed de-stuffed header size: CMD + ? + SUB + OFFSET_HI + OFFSET_LO
|
||
|
||
|
||
def _get_data_section(af: AnnotatedFrame) -> bytes:
    """
    Return the data section of the frame (after the 5-byte protocol header).

    S3 payloads still carry a trailing SUM8 checksum byte, which is dropped
    here; BW payloads were already stripped by parse_bw(validate_checksum=True).
    """
    payload = af.frame.payload
    if len(payload) < HEADER_LEN:
        return b""
    end = len(payload)
    if af.source == "S3" and end > HEADER_LEN:
        end -= 1  # exclude the SUM8 byte still present on S3 frames
    return payload[HEADER_LEN:end]
|
||
|
||
|
||
def lookup_field_name(sub: int, page_key: int, payload_offset: int) -> Optional[str]:
    """Return the known field name at *payload_offset* for (sub, page_key), or None."""
    candidates = (
        entry.name
        for entry in FIELD_MAP
        if entry.sub == sub
        and (entry.page_key is None or entry.page_key == page_key)
        and entry.payload_offset == payload_offset
    )
    # First matching entry wins, mirroring FIELD_MAP declaration order.
    return next(candidates, None)
|
||
|
||
|
||
def _extract_a4_inner_frames(payload: bytes) -> list[tuple[int, int, bytes]]:
|
||
"""
|
||
Parse the inner sub-frame stream packed inside an A4 (POLL_RESPONSE) payload.
|
||
|
||
The payload is a sequence of inner frames, each starting with DLE STX (10 02)
|
||
and delimited by ACK (41) before the next DLE STX. The inner frame body
|
||
(after the 10 02 preamble) has the same 5-byte header layout as outer frames:
|
||
[0] 00
|
||
[1] 10
|
||
[2] SUB
|
||
[3] OFFSET_HI (page_key high byte)
|
||
[4] OFFSET_LO (page_key low byte)
|
||
[5+] data
|
||
|
||
Returns a list of (sub, page_key, data_bytes) — one entry per inner frame,
|
||
keeping ALL occurrences (not deduped), so the caller can decide how to match.
|
||
"""
|
||
DLE, STX, ACK = 0x10, 0x02, 0x41
|
||
results: list[tuple[int, int, bytes]] = []
|
||
|
||
# Collect start positions of each inner frame (offset of the DLE STX)
|
||
starts: list[int] = []
|
||
i = 0
|
||
# First frame may begin at offset 0 with DLE STX directly
|
||
if len(payload) >= 2 and payload[0] == DLE and payload[1] == STX:
|
||
starts.append(0)
|
||
i = 2
|
||
while i < len(payload) - 2:
|
||
if payload[i] == ACK and payload[i + 1] == DLE and payload[i + 2] == STX:
|
||
starts.append(i + 1) # point at the DLE
|
||
i += 3
|
||
else:
|
||
i += 1
|
||
|
||
for k, s in enumerate(starts):
|
||
# Body starts after DLE STX (2 bytes)
|
||
body_start = s + 2
|
||
body_end = starts[k + 1] - 1 if k + 1 < len(starts) else len(payload)
|
||
body = payload[body_start:body_end]
|
||
if len(body) < 5:
|
||
continue
|
||
# body[0]=0x00, body[1]=0x10, body[2]=SUB, body[3]=OFFSET_HI, body[4]=OFFSET_LO
|
||
sub = body[2]
|
||
page_key = (body[3] << 8) | body[4]
|
||
data = body[5:]
|
||
results.append((sub, page_key, data))
|
||
|
||
return results
|
||
|
||
|
||
def _diff_a4_payloads(payload_a: bytes, payload_b: bytes) -> list[ByteDiff]:
    """
    Diff two A4 container payloads at the inner sub-frame level.

    Inner frames are matched by (sub, page_key). For each pair of matching
    inner frames whose data differs, the changed bytes are reported with
    payload_offset encoded as: (inner_frame_index << 16) | byte_offset_in_data.

    Inner frames present in one payload but not the other are reported as a
    single synthetic ByteDiff entry with before/after = -1 / -2 respectively,
    and field_name describing the missing inner SUB.

    The high-16 / low-16 split in payload_offset lets the GUI render these
    differently if desired, but they degrade gracefully in the existing renderer.
    """
    frames_a = _extract_a4_inner_frames(payload_a)
    frames_b = _extract_a4_inner_frames(payload_b)

    # Build multimap: (sub, page_key) → list of data blobs, preserving order
    def index(frames):
        idx: dict[tuple[int, int], list[bytes]] = {}
        for sub, pk, data in frames:
            idx.setdefault((sub, pk), []).append(data)
        return idx

    idx_a = index(frames_a)
    idx_b = index(frames_b)

    # Union of keys so frames unique to either side are still reported.
    all_keys = sorted(set(idx_a) | set(idx_b))
    diffs: list[ByteDiff] = []

    for sub, pk in all_keys:
        list_a = idx_a.get((sub, pk), [])
        list_b = idx_b.get((sub, pk), [])

        # Pair up by position; extras are treated as added/removed
        n = max(len(list_a), len(list_b))
        for pos in range(n):
            da = list_a[pos] if pos < len(list_a) else None
            db = list_b[pos] if pos < len(list_b) else None

            if da is None:
                # Inner frame added in B — emit one synthetic marker entry
                entry = SUB_TABLE.get(sub)
                name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
                diffs.append(ByteDiff(
                    payload_offset=(sub << 16) | (pk & 0xFFFF),
                    before=-1,
                    after=-2,
                    field_name=f"[A4 inner] SUB {sub:02X} ({name}) pk={pk:04X} added",
                ))
                continue
            if db is None:
                # Inner frame removed in B — symmetric synthetic marker
                entry = SUB_TABLE.get(sub)
                name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
                diffs.append(ByteDiff(
                    payload_offset=(sub << 16) | (pk & 0xFFFF),
                    before=-2,
                    after=-1,
                    field_name=f"[A4 inner] SUB {sub:02X} ({name}) pk={pk:04X} removed",
                ))
                continue

            # Both present — byte diff the data sections (timestamps masked)
            da_m = _mask_noisy(sub, da)
            db_m = _mask_noisy(sub, db)
            if da_m == db_m:
                continue
            max_len = max(len(da_m), len(db_m))
            for off in range(max_len):
                ba = da_m[off] if off < len(da_m) else None
                bb = db_m[off] if off < len(db_m) else None
                if ba != bb:
                    # Offsets in FIELD_MAP are payload-relative, so shift by the header size.
                    field = lookup_field_name(sub, pk, off + HEADER_LEN)
                    diffs.append(ByteDiff(
                        payload_offset=(sub << 16) | (off & 0xFFFF),
                        before=ba if ba is not None else -1,
                        after=bb if bb is not None else -1,
                        field_name=field or f"[A4:{sub:02X} pk={pk:04X}] off={off}",
                    ))

    return diffs
|
||
|
||
|
||
def diff_sessions(sess_a: Session, sess_b: Session) -> list[FrameDiff]:
    """
    Compare two sessions frame-by-frame, matched by (sub, page_key).
    Returns a list of FrameDiff for SUBs where bytes changed.
    """
    # Build lookup: (sub, page_key) → AnnotatedFrame for each session
    def index_session(sess: Session) -> dict[tuple[int, int], AnnotatedFrame]:
        idx: dict[tuple[int, int], AnnotatedFrame] = {}
        for af in sess.all_frames:
            if af.header is None:
                continue
            key = (af.header.sub, af.header.page_key)
            # Keep first occurrence per key (or we could keep all — for now, first)
            if key not in idx:
                idx[key] = af
        return idx

    idx_a = index_session(sess_a)
    idx_b = index_session(sess_b)

    results: list[FrameDiff] = []

    # Only compare SUBs present in both sessions
    common_keys = set(idx_a.keys()) & set(idx_b.keys())
    for key in sorted(common_keys):
        sub, page_key = key
        af_a = idx_a[key]
        af_b = idx_b[key]

        # A4 is a container frame — diff at the inner sub-frame level to avoid
        # phase-shift noise when the number of embedded records differs.
        if sub == 0xA4:
            diffs = _diff_a4_payloads(af_a.frame.payload, af_b.frame.payload)
            if diffs:
                entry = SUB_TABLE.get(sub)
                sub_name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
                results.append(FrameDiff(sub=sub, page_key=page_key, sub_name=sub_name, diffs=diffs))
            continue

        # Strip headers/checksums and mask known-noisy timestamp bytes first.
        data_a = _mask_noisy(sub, _get_data_section(af_a))
        data_b = _mask_noisy(sub, _get_data_section(af_b))

        if data_a == data_b:
            continue

        # Compare byte by byte up to the LONGER length; bytes missing on one
        # side are reported with the -1 sentinel.
        diffs: list[ByteDiff] = []
        max_len = max(len(data_a), len(data_b))
        for offset in range(max_len):
            byte_a = data_a[offset] if offset < len(data_a) else None
            byte_b = data_b[offset] if offset < len(data_b) else None
            if byte_a != byte_b:
                # payload_offset = data_section_offset + HEADER_LEN
                payload_off = offset + HEADER_LEN
                field = lookup_field_name(sub, page_key, payload_off)
                diffs.append(ByteDiff(
                    payload_offset=payload_off,
                    before=byte_a if byte_a is not None else -1,
                    after=byte_b if byte_b is not None else -1,
                    field_name=field,
                ))

        if diffs:
            entry = SUB_TABLE.get(sub)
            sub_name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
            results.append(FrameDiff(sub=sub, page_key=page_key, sub_name=sub_name, diffs=diffs))

    return results
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Report rendering
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
def format_hex_dump(data: bytes, indent: str = "    ") -> list[str]:
    """Compact 16-bytes-per-line hex dump. Returns list of lines."""
    out: list[str] = []
    for base in range(0, len(data), 16):
        row = " ".join(f"{b:02x}" for b in data[base:base + 16])
        out.append(f"{indent}{base:04x}: {row}")
    return out
|
||
|
||
|
||
def render_session_report(
    session: Session,
    diffs: Optional[list[FrameDiff]],
    prev_session_index: Optional[int],
) -> str:
    """
    Render a plain-text report for one session: summary, frame inventory,
    hex dumps, and (when *diffs* is not None) a diff section against the
    session identified by *prev_session_index*.
    """
    lines: list[str] = []

    n_bw = len(session.bw_frames)
    n_s3 = len(session.s3_frames)
    total = n_bw + n_s3
    # A session is "complete" once BW has sent the 0x74 close confirm.
    is_complete = any(
        af.header is not None and af.header.sub == SESSION_CLOSE_SUB
        for af in session.bw_frames
    )
    status = "" if is_complete else " [IN PROGRESS]"

    lines.append(f"{'='*72}")
    lines.append(f"SESSION {session.index}{status}")
    lines.append(f"{'='*72}")
    lines.append(f"Frames: {total} (BW: {n_bw}, S3: {n_s3})")
    if n_bw != n_s3:
        # Synchronous protocol — counts should match one-to-one.
        lines.append(f"  WARNING: BW/S3 frame count mismatch — protocol sync issue?")
    lines.append("")

    # ── Frame inventory ──────────────────────────────────────────────────────
    lines.append("FRAME INVENTORY")
    for seq_i, af in enumerate(session.all_frames):
        if af.header is not None:
            sub_hex = f"{af.header.sub:02X}"
            page_str = f" (page {af.header.page_key:04X})" if af.header.page_key != 0 else ""
        else:
            sub_hex = "??"
            page_str = ""
        chk = ""
        # checksum_valid is tri-state: None = not checked, so test identity explicitly.
        if af.frame.checksum_valid is False:
            chk = " [BAD CHECKSUM]"
        elif af.frame.checksum_valid is True:
            chk = f" [{af.frame.checksum_type}]"
        lines.append(
            f"  [{af.source}] #{seq_i:<3} SUB={sub_hex} {af.sub_name:<30}{page_str}"
            f" len={len(af.frame.payload)}{chk}"
        )
    lines.append("")

    # ── Hex dumps ────────────────────────────────────────────────────────────
    lines.append("HEX DUMPS")
    for seq_i, af in enumerate(session.all_frames):
        sub_hex = f"{af.header.sub:02X}" if af.header else "??"
        lines.append(f"  [{af.source}] #{seq_i} SUB={sub_hex} {af.sub_name}")
        dump_lines = format_hex_dump(af.frame.payload, indent="    ")
        if dump_lines:
            lines.extend(dump_lines)
        else:
            lines.append("    (empty payload)")
    lines.append("")

    # ── Diff section ─────────────────────────────────────────────────────────
    if diffs is not None:
        if prev_session_index is not None:
            lines.append(f"DIFF vs SESSION {prev_session_index}")
        else:
            lines.append("DIFF")

        if not diffs:
            lines.append("  (no changes)")
        else:
            for fd in diffs:
                page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else ""
                lines.append(f"  SUB {fd.sub:02X} ({fd.sub_name}){page_str}:")
                for bd in fd.diffs:
                    field_str = f"  [{bd.field_name}]" if bd.field_name else ""
                    # Negative before/after are sentinels (missing byte / synthetic entry).
                    before_str = f"{bd.before:02x}" if bd.before >= 0 else "--"
                    after_str = f"{bd.after:02x}" if bd.after >= 0 else "--"
                    lines.append(
                        f"    offset [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: "
                        f"{before_str} -> {after_str}{field_str}"
                    )
        lines.append("")

    return "\n".join(lines) + "\n"
|
||
|
||
|
||
def write_report(session: Session, report_text: str, outdir: Path) -> Path:
    """Write *report_text* to outdir/session_NNN.report and return the path."""
    outdir.mkdir(parents=True, exist_ok=True)
    target = outdir / f"session_{session.index:03d}.report"
    target.write_text(report_text, encoding="utf-8")
    return target
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Claude export
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
def _hex_block(data: bytes, bytes_per_row: int = 16) -> list[str]:
|
||
"""Hex dump with offset + hex + ASCII columns."""
|
||
lines = []
|
||
for row in range(0, len(data), bytes_per_row):
|
||
chunk = data[row:row + bytes_per_row]
|
||
hex_col = " ".join(f"{b:02x}" for b in chunk)
|
||
hex_col = f"{hex_col:<{bytes_per_row * 3 - 1}}"
|
||
asc_col = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
|
||
lines.append(f" {row:04x} {hex_col} |{asc_col}|")
|
||
return lines
|
||
|
||
|
||
def render_claude_export(
    sessions: list[Session],
    diffs: list[Optional[list[FrameDiff]]],
    s3_path: Optional[Path] = None,
    bw_path: Optional[Path] = None,
) -> str:
    """
    Produce a single self-contained Markdown file suitable for pasting into
    a Claude conversation for protocol reverse-engineering assistance.

    Structure:
      1. Context block — what this is, protocol background, field map
      2. Capture summary — session count, frame counts, what changed
      3. Per-diff section — one section per session pair that had changes:
         a. Diff table (before/after bytes, known field labels)
         b. Full hex dumps of ONLY the frames that changed
      4. Full hex dumps of all frames in sessions with no prior comparison
         (session 0 baseline)

    Args:
        sessions: all parsed sessions, in capture order.
        diffs: per-session diff lists; ``diffs[i]`` is expected to hold the
            FrameDiff list for session i vs session i-1 (None for session 0).
        s3_path / bw_path: source capture paths, used only for display.

    Returns:
        The complete Markdown document as a single string (trailing newline).
    """
    # Local import keeps module import time free of the dependency; only this
    # export path needs it.
    import datetime
    lines: list[str] = []

    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    # Fall back to canonical capture names when explicit paths weren't given.
    s3_name = s3_path.name if s3_path else "raw_s3.bin"
    bw_name = bw_path.name if bw_path else "raw_bw.bin"

    # ── 1. Context block ──────────────────────────────────────────────────
    # Static protocol background so the exported file is self-contained:
    # the reader needs no access to this source tree to interpret the dumps.
    lines += [
        f"# Instantel MiniMate Plus — Protocol Capture Analysis",
        f"Generated: {now} | Source: `{s3_name}` + `{bw_name}`",
        "",
        "## Protocol Background",
        "",
        "This file contains parsed RS-232 captures from an Instantel MiniMate Plus",
        "seismograph communicating with Blastware PC software at 38400 baud 8N1.",
        "",
        "**Frame structure (de-stuffed payload):**",
        "```",
        " [0] CMD 0x10 = BW request, 0x00 = S3 response",
        " [1] ? 0x00 (BW) or 0x10 (S3)",
        " [2] SUB Command/response identifier (key field)",
        " [3] OFFSET_HI Page offset high byte",
        " [4] OFFSET_LO Page offset low byte",
        " [5+] DATA Payload data section",
        "```",
        "",
        "**Response SUB rule:** response_SUB = 0xFF - request_SUB (confirmed, no exceptions observed)",
        "",
        "**Known field map** (offsets from payload[0]):",
        "```",
        " SUB F7 (EVENT_INDEX_RESPONSE):",
        " [80] 0x52 backlight_on_time uint8 seconds",
        " [88] 0x58 power_save_timeout uint8 minutes",
        " [89] 0x59 monitoring_lcd_cycle uint16BE 65500=disabled",
        " SUB E5 page 0x082A (COMPLIANCE_CONFIG_RESPONSE):",
        " [45] 0x2D record_time float32BE seconds (7s=40E00000, 13s=41500000)",
        " SUB 82 (TRIGGER_CONFIG_WRITE, BW-side only):",
        " [22] trigger_sample_width uint8 samples",
        "```",
        "",
        "**Session boundary:** a compliance session ends when BW sends SUB 0x74 (WRITE_CONFIRM_C).",
        "Sessions are numbered from 0. The diff compares consecutive complete sessions.",
        "",
    ]

    # ── 2. Capture summary ────────────────────────────────────────────────
    lines += ["## Capture Summary", ""]
    lines.append(f"Sessions found: {len(sessions)}")
    for sess in sessions:
        # A session is "complete" iff BW sent the closing SUB (SESSION_CLOSE_SUB).
        is_complete = any(
            af.header is not None and af.header.sub == SESSION_CLOSE_SUB
            for af in sess.bw_frames
        )
        status = "complete" if is_complete else "partial/in-progress"
        n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames)
        # NOTE(review): assumes diffs is indexed by session index (diffs[i] for
        # session i); verify against the caller that builds the diffs list.
        changed = len(diffs[sess.index] or []) if sess.index < len(diffs) else 0
        changed_str = f" ({changed} SUBs changed vs prev)" if sess.index > 0 else " (baseline)"
        lines.append(f" Session {sess.index} [{status}]: BW={n_bw} S3={n_s3} frames{changed_str}")
    lines.append("")

    # ── 3. Per-diff sections ──────────────────────────────────────────────
    any_diffs = False
    for sess in sessions:
        sess_diffs = diffs[sess.index] if sess.index < len(diffs) else None
        # Session 0 has no predecessor; None means "no comparison available".
        if sess_diffs is None or sess.index == 0:
            continue

        any_diffs = True
        prev_idx = sess.index - 1
        lines += [
            f"---",
            f"## Diff: Session {prev_idx} -> Session {sess.index}",
            "",
        ]

        # Empty (but non-None) diff list: sessions were compared and matched.
        if not sess_diffs:
            lines.append("_No byte changes detected between these sessions._")
            lines.append("")
            continue

        # Build index of changed frames for this session (and prev)
        prev_sess = sessions[prev_idx] if prev_idx < len(sessions) else None

        for fd in sess_diffs:
            # page_key 0 is treated as "no page" and suppressed from headings.
            page_str = f" page 0x{fd.page_key:04X}" if fd.page_key != 0 else ""
            lines += [
                f"### SUB {fd.sub:02X} — {fd.sub_name}{page_str}",
                "",
            ]

            # Diff table: split changed bytes into mapped vs still-unknown fields.
            known_count = sum(1 for bd in fd.diffs if bd.field_name)
            unknown_count = sum(1 for bd in fd.diffs if not bd.field_name)
            lines.append(
                f"Changed bytes: **{len(fd.diffs)}** total "
                f"({known_count} known fields, {unknown_count} unknown)"
            )
            lines.append("")
            lines.append("| Offset | Hex | Dec | Session {0} | Session {1} | Field |".format(prev_idx, sess.index))
            lines.append("|--------|-----|-----|" + "-" * 12 + "|" + "-" * 12 + "|-------|")
            for bd in fd.diffs:
                # Negative before/after is the sentinel for "byte absent on this
                # side" (frame lengths differed); rendered as "--".
                before_s = f"`{bd.before:02x}`" if bd.before >= 0 else "`--`"
                after_s = f"`{bd.after:02x}`" if bd.after >= 0 else "`--`"
                before_d = str(bd.before) if bd.before >= 0 else "--"
                after_d = str(bd.after) if bd.after >= 0 else "--"
                field = f"`{bd.field_name}`" if bd.field_name else "**UNKNOWN**"
                lines.append(
                    f"| [{bd.payload_offset}] 0x{bd.payload_offset:04X} "
                    f"| {before_s}->{after_s} | {before_d}->{after_d} "
                    f"| {before_s} | {after_s} | {field} |"
                )
            lines.append("")

            # Hex dumps of the changed frame in both sessions
            def _find_af(target_sess: Session, sub: int, page_key: int) -> Optional[AnnotatedFrame]:
                # First frame matching (sub, page_key) in the session, else None.
                for af in target_sess.all_frames:
                    if af.header and af.header.sub == sub and af.header.page_key == page_key:
                        return af
                return None

            af_prev = _find_af(sessions[prev_idx], fd.sub, fd.page_key) if prev_sess else None
            af_curr = _find_af(sess, fd.sub, fd.page_key)

            lines.append("**Hex dumps (full de-stuffed payload):**")
            lines.append("")

            for label, af in [(f"Session {prev_idx} (before)", af_prev),
                              (f"Session {sess.index} (after)", af_curr)]:
                if af is None:
                    lines.append(f"_{label}: frame not found_")
                    lines.append("")
                    continue
                lines.append(f"_{label}_ — {len(af.frame.payload)} bytes:")
                lines.append("```")
                lines += _hex_block(af.frame.payload)
                lines.append("```")
                lines.append("")

    if not any_diffs:
        lines += [
            "---",
            "## Diffs",
            "",
            "_Only one session found — no diff available. "
            "Run a second capture with changed settings to see what moves._",
            "",
        ]

    # ── 4. Baseline hex dumps (session 0, all frames) ─────────────────────
    if sessions:
        baseline = sessions[0]
        lines += [
            "---",
            f"## Baseline — Session 0 (all frames)",
            "",
            "Full hex dump of every frame in the first session.",
            "Use this to map field positions from known values.",
            "",
        ]
        for seq_i, af in enumerate(baseline.all_frames):
            sub_hex = f"{af.header.sub:02X}" if af.header else "??"
            page_str = f" page 0x{af.header.page_key:04X}" if af.header and af.header.page_key != 0 else ""
            # Checksum type shown only when the frame's checksum validated.
            chk_str = f" [{af.frame.checksum_type}]" if af.frame.checksum_valid else ""
            lines.append(
                f"### [{af.source}] #{seq_i} SUB {sub_hex} — {af.sub_name}{page_str}{chk_str}"
            )
            lines.append(f"_{len(af.frame.payload)} bytes_")
            lines.append("```")
            lines += _hex_block(af.frame.payload)
            lines.append("```")
            lines.append("")

    lines += [
        "---",
        "_End of analysis. To map an unknown field: change exactly one setting in Blastware,_",
        "_capture again, run the analyzer, and look for the offset that moved._",
    ]

    return "\n".join(lines) + "\n"
|
||
|
||
|
||
def write_claude_export(
    sessions: list[Session],
    diffs: list[Optional[list[FrameDiff]]],
    outdir: Path,
    s3_path: Optional[Path] = None,
    bw_path: Optional[Path] = None,
) -> Path:
    """Render the Claude-oriented Markdown export and write it under *outdir*.

    The filename carries a timestamp so repeated exports never clobber each
    other. Returns the path of the file written.
    """
    import datetime

    outdir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    export_path = outdir / f"claude_export_{stamp}.md"
    markdown = render_claude_export(sessions, diffs, s3_path, bw_path)
    export_path.write_text(markdown, encoding="utf-8")
    return export_path
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Post-processing mode
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
def run_postprocess(s3_path: Path, bw_path: Path, outdir: Path, export: bool = False) -> None:
    """Parse both capture files, diff consecutive sessions, and write reports.

    Args:
        s3_path: raw S3→BW capture file (raw_s3.bin).
        bw_path: raw BW→S3 capture file (raw_bw.bin).
        outdir: directory receiving session_NNN.report files (created by
            write_report if needed).
        export: when True, also write a claude_export_<ts>.md summary.
    """
    print(f"s3_analyzer v{__version__}")
    print(f" S3 file : {s3_path}")
    print(f" BW file : {bw_path}")
    print(f" Out dir : {outdir}")
    print()

    s3_frames, bw_frames = load_and_annotate(s3_path, bw_path)
    print(f"Parsed: {len(s3_frames)} S3 frames, {len(bw_frames)} BW frames")

    sessions = split_into_sessions(bw_frames, s3_frames)
    print(f"Sessions: {len(sessions)}")
    print()

    # BUG FIX: this list previously started as [None], but the loop below
    # appends one entry for EVERY session (including session 0, whose entry is
    # None because it has no predecessor). That pre-seeded None shifted every
    # diff list one slot to the right, so diffs[sess.index] in the Claude
    # export read the previous session's diffs (and always None for session 1).
    # Starting empty keeps all_diffs[i] == diffs for session i.
    all_diffs: list[Optional[list[FrameDiff]]] = []
    prev_session: Optional[Session] = None
    for sess in sessions:
        sess_diffs: Optional[list[FrameDiff]] = None
        prev_idx: Optional[int] = None
        if prev_session is not None:
            sess_diffs = diff_sessions(prev_session, sess)
            prev_idx = prev_session.index
        all_diffs.append(sess_diffs)

        report = render_session_report(sess, sess_diffs, prev_idx)
        out_path = write_report(sess, report, outdir)
        n_diffs = len(sess_diffs) if sess_diffs else 0
        print(f" Session {sess.index}: {len(sess.all_frames)} frames, {n_diffs} changed SUBs -> {out_path.name}")

        prev_session = sess

    if export:
        export_path = write_claude_export(sessions, all_diffs, outdir, s3_path, bw_path)
        print(f"\n Claude export -> {export_path.name}")

    print()
    print(f"Reports written to: {outdir}")
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# Live mode
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
def live_loop(
    s3_path: Path,
    bw_path: Path,
    outdir: Path,
    poll_interval: float = 0.05,
) -> None:
    """
    Tail both raw files continuously, re-parsing on new bytes.
    Emits a session report as soon as BW SUB 0x74 is detected.

    The full buffers are re-parsed from scratch on every change rather than
    parsed incrementally; this keeps frame boundaries correct when a frame
    arrives split across polls. Stops on Ctrl+C, then writes any incomplete
    session(s) as partial reports.

    Args:
        s3_path: growing raw S3→BW capture file (may not exist yet).
        bw_path: growing raw BW→S3 capture file (may not exist yet).
        outdir: directory for session_NNN.report files.
        poll_interval: sleep between file polls, in seconds.
    """
    print(f"s3_analyzer v{__version__} — LIVE MODE")
    print(f" S3 file : {s3_path}")
    print(f" BW file : {bw_path}")
    print(f" Out dir : {outdir}")
    print(f" Poll : {poll_interval*1000:.0f}ms")
    print("Waiting for frames... (Ctrl+C to stop)")
    print()

    # Accumulated raw bytes and the file offsets we've consumed up to.
    s3_buf = bytearray()
    bw_buf = bytearray()
    s3_pos = 0
    bw_pos = 0

    # Frame counts from the previous parse, used to detect new frames.
    last_s3_count = 0
    last_bw_count = 0
    # Complete sessions already reported (live reports are emitted once each).
    sessions: list[Session] = []
    prev_complete_session: Optional[Session] = None

    try:
        while True:
            # Read new bytes from both files
            changed = False

            if s3_path.exists():
                with s3_path.open("rb") as fh:
                    # Seek past what we've already consumed; read the rest.
                    fh.seek(s3_pos)
                    new_bytes = fh.read()
                    if new_bytes:
                        s3_buf.extend(new_bytes)
                        s3_pos += len(new_bytes)
                        changed = True

            if bw_path.exists():
                with bw_path.open("rb") as fh:
                    fh.seek(bw_pos)
                    new_bytes = fh.read()
                    if new_bytes:
                        bw_buf.extend(new_bytes)
                        bw_pos += len(new_bytes)
                        changed = True

            if changed:
                # Re-parse the entire buffers; trailer_len=0 because a live
                # stream has no fixed trailer to strip yet.
                s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0)
                bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True)

                s3_annotated = annotate_frames(s3_frames_raw, "S3")
                bw_annotated = annotate_frames(bw_frames_raw, "BW")

                new_s3 = len(s3_annotated) - last_s3_count
                new_bw = len(bw_annotated) - last_bw_count

                if new_s3 > 0 or new_bw > 0:
                    last_s3_count = len(s3_annotated)
                    last_bw_count = len(bw_annotated)
                    print(f"[+] S3:{len(s3_annotated)} BW:{len(bw_annotated)} frames", end="")

                    # Annotate newest BW frame
                    if bw_annotated:
                        latest_bw = bw_annotated[-1]
                        sub_str = f"SUB={latest_bw.header.sub:02X}" if latest_bw.header else "SUB=??"
                        print(f" latest BW {sub_str} {latest_bw.sub_name}", end="")
                    print()

                # Check for session close
                all_sessions = split_into_sessions(bw_annotated, s3_annotated)
                # A complete session has the closing 0x74
                complete_sessions = [
                    s for s in all_sessions
                    if any(
                        af.header is not None and af.header.sub == SESSION_CLOSE_SUB
                        for af in s.bw_frames
                    )
                ]

                # Emit reports for newly completed sessions
                # (slice skips sessions already reported on a previous poll).
                for sess in complete_sessions[len(sessions):]:
                    diffs: Optional[list[FrameDiff]] = None
                    prev_idx: Optional[int] = None
                    if prev_complete_session is not None:
                        diffs = diff_sessions(prev_complete_session, sess)
                        prev_idx = prev_complete_session.index

                    report = render_session_report(sess, diffs, prev_idx)
                    out_path = write_report(sess, report, outdir)
                    n_diffs = len(diffs) if diffs else 0
                    print(f"\n [+] Session {sess.index} complete: {len(sess.all_frames)} frames, "
                          f"{n_diffs} changed SUBs -> {out_path.name}\n")
                    prev_complete_session = sess

                sessions = complete_sessions

            time.sleep(poll_interval)

    except KeyboardInterrupt:
        print("\nStopped.")

        # Emit any in-progress (incomplete) session as a partial report
        if s3_buf or bw_buf:
            s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0)
            bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True)
            s3_annotated = annotate_frames(s3_frames_raw, "S3")
            bw_annotated = annotate_frames(bw_frames_raw, "BW")
            all_sessions = split_into_sessions(bw_annotated, s3_annotated)
            # Sessions WITHOUT the closing SUB: these were never reported above.
            incomplete = [
                s for s in all_sessions
                if not any(
                    af.header is not None and af.header.sub == SESSION_CLOSE_SUB
                    for af in s.bw_frames
                )
            ]
            for sess in incomplete:
                report = render_session_report(sess, diffs=None, prev_session_index=None)
                out_path = write_report(sess, report, outdir)
                print(f" Partial session {sess.index} written -> {out_path.name}")
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
# CLI
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
def main() -> None:
    """Command-line entry point: parse arguments, then dispatch to live or batch mode."""
    ap = argparse.ArgumentParser(
        description="s3_analyzer — Instantel MiniMate Plus live protocol analyzer"
    )
    ap.add_argument("--s3", type=Path, required=True, help="Path to raw_s3.bin (S3→BW raw capture)")
    ap.add_argument("--bw", type=Path, required=True, help="Path to raw_bw.bin (BW→S3 raw capture)")
    ap.add_argument("--live", action="store_true", help="Live mode: tail files as they grow")
    ap.add_argument("--export", action="store_true", help="Also write a claude_export_<ts>.md file for Claude analysis")
    ap.add_argument("--outdir", type=Path, default=None, help="Output directory for .report files (default: same as input)")
    ap.add_argument("--poll", type=float, default=0.05, help="Live mode poll interval in seconds (default: 0.05)")
    args = ap.parse_args()

    # Default the output directory to wherever the S3 capture lives.
    outdir = args.outdir if args.outdir is not None else args.s3.parent

    if args.live:
        # Live mode tolerates missing files — it waits for them to appear.
        live_loop(args.s3, args.bw, outdir, poll_interval=args.poll)
        return

    # Batch mode requires both captures up front.
    for label, path in (("S3", args.s3), ("BW", args.bw)):
        if not path.exists():
            print(f"ERROR: {label} file not found: {path}", file=sys.stderr)
            sys.exit(1)
    run_postprocess(args.s3, args.bw, outdir, export=args.export)
|
||
|
||
|
||
# Script entry point — guard keeps the module importable without side effects.
if __name__ == "__main__":
    main()
|