291 lines
9.3 KiB
Python
291 lines
9.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
s3_parse.py — parse Instantel/Series3-like DLE-framed serial captures from a raw .bin logger.
|
|
|
|
Assumptions (based on your HxD patterns):
|
|
- Frames are delimited by DLE STX (0x10 0x02) ... DLE ETX (0x10 0x03); s3_bridge .bin captures end each frame with a bare ETX (0x03) instead
|
|
- Inside payload, a literal 0x10 is escaped as 0x10 0x10
|
|
- After ETX, there may be a trailer (often CRC16, maybe + seq/flags)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import argparse
|
|
import json
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import List, Optional, Tuple
|
|
|
|
# ASCII control bytes used by the framing layer.
DLE = 0x10  # Data Link Escape: prefixes STX/ETX and is doubled when it occurs as data
STX = 0x02  # Start of Text: DLE STX opens a frame
ETX = 0x03  # End of Text: (DLE) ETX closes a frame
EOT = 0x04  # End of Transmission
|
|
|
|
# End-of-frame detection depends on how the capture was produced:
#   * A raw serial capture keeps the full DLE+ETX terminator (`0x10 0x03`).
#   * The s3_bridge `.bin` logger drops the DLE in front of ETX, so each frame
#     ends with a bare `0x03`. See docs/instantel_protocol_reference.md §Appendix A.
ETX_MODE_AUTO = "auto"          # decide between the two below by inspecting the capture
ETX_MODE_RAW = "raw"            # expect DLE+ETX
ETX_MODE_STRIPPED = "stripped"  # expect bare ETX
|
|
|
|
@dataclass
class Frame:
    """One frame extracted from a capture, plus the bytes that followed it.

    Instances are deliberately mutable: ``crc_match`` starts out as ``None``
    and is filled in afterwards when CRC guessing is requested.
    """

    # Zero-based position of this frame among the frames found.
    index: int
    # Offset of the opening DLE in the capture.
    start_offset: int
    # Offset just past the closing ETX (the trailer is not included).
    end_offset: int
    # Bytes as captured between STX..ETX, still escaped.
    payload_raw: bytes
    # Payload after DLE un-stuffing.
    payload: bytes
    # Bytes immediately after ETX (length chosen by the user).
    trailer: bytes
    # Best-guess CRC type if verified, else None.
    crc_match: Optional[str]
|
|
|
|
def unescape_dle(payload_escaped: bytes) -> bytes:
    """Undo DLE stuffing: every ``0x10 0x10`` pair becomes one literal ``0x10``.

    Pairs are matched left to right. A DLE that is not followed by a second
    DLE is copied through unchanged (conservative: it may be real data or a
    sign of a malformed capture).
    """
    size = len(payload_escaped)
    decoded = bytearray()
    drop_next = False
    for pos, byte in enumerate(payload_escaped):
        if drop_next:
            # Second half of a stuffed pair; its partner was already emitted.
            drop_next = False
            continue
        decoded.append(byte)
        if byte == DLE and pos + 1 < size and payload_escaped[pos + 1] == DLE:
            drop_next = True
    return bytes(decoded)
|
|
|
|
# ---- CRC helpers (we don't know which one yet, so we try a few) ----
|
|
|
|
def crc16_ibm(data: bytes) -> int:
    """Return CRC-16/IBM (aka ARC) of *data*.

    Bit-reflected algorithm: polynomial 0xA001 (reflected 0x8005), initial
    value 0x0000, no final XOR.
    """
    poly = 0xA001
    reg = 0x0000
    for byte in data:
        reg ^= byte
        for _ in range(8):
            carry = reg & 1
            reg >>= 1
            if carry:
                reg ^= poly
    return reg & 0xFFFF
|
|
|
|
def crc16_ccitt_false(data: bytes) -> int:
    """Return CRC-16/CCITT-FALSE of *data*.

    MSB-first algorithm: polynomial 0x1021, initial value 0xFFFF, no
    reflection, no final XOR.
    """
    poly = 0x1021
    reg = 0xFFFF
    for byte in data:
        reg ^= byte << 8
        for _ in range(8):
            top_bit = reg & 0x8000
            reg = (reg << 1) & 0xFFFF
            if top_bit:
                reg ^= poly
    return reg
|
|
|
|
def crc16_x25(data: bytes) -> int:
    """Return CRC-16/X-25 of *data*.

    Bit-reflected algorithm: polynomial 0x1021 (processed as its reflection
    0x8408), initial value 0xFFFF, final XOR 0xFFFF.
    """
    reg = 0xFFFF
    for byte in data:
        reg ^= byte
        for _ in range(8):
            reg = (reg >> 1) ^ 0x8408 if reg & 1 else reg >> 1
    # Complementing the low 16 bits is the 0xFFFF final XOR.
    return ~reg & 0xFFFF
|
|
|
|
# Candidate checksum algorithms, keyed by the label reported to the user.
# Insertion order is the order in which the candidates are tried and in which
# simultaneous matches are listed.
CRC_FUNCS = {
    "CRC-16/IBM": crc16_ibm,
    "CRC-16/CCITT-FALSE": crc16_ccitt_false,
    "CRC-16/X-25": crc16_x25,
}
|
|
|
|
def parse_frames(blob: bytes, trailer_len: int, etx_mode: str = ETX_MODE_AUTO) -> List[Frame]:
    """Split a raw capture into frames.

    A frame opens with DLE STX. How it closes depends on ``etx_mode``:

    * ``ETX_MODE_RAW``: closes on DLE ETX; a DLE followed by any other byte
      (including a second DLE) is payload.
    * ``ETX_MODE_STRIPPED``: closes on the first bare ETX. A literal 0x03 in
      the payload cannot be told apart from the terminator in this mode.
    * ``ETX_MODE_AUTO``: picks RAW when the capture contains at least one
      DLE ETX pair and at least 80% as many DLE ETX pairs as DLE STX pairs,
      otherwise STRIPPED.

    Any other ``etx_mode`` value matches no terminator, so no frames are
    returned.

    Args:
        blob: The capture contents.
        trailer_len: Number of bytes after ETX to attach to each frame as its
            trailer. Those bytes are consumed and are not searched for the
            next frame start. Negative values are treated as 0.
        etx_mode: One of the ``ETX_MODE_*`` constants.

    Returns:
        The frames in capture order. ``payload_raw`` holds the bytes between
        STX and the terminator exactly as captured (still DLE-stuffed),
        ``payload`` the un-stuffed bytes, and ``end_offset`` the offset just
        past ETX. ``crc_match`` is always ``None`` here. A frame still open
        when the capture ends is discarded.
    """
    total = len(blob)
    # A negative trailer length would move the cursor backwards after a frame
    # and could rescan the same frame indefinitely.
    trailer_len = max(0, trailer_len)

    if etx_mode == ETX_MODE_AUTO:
        # Neither two-byte marker can overlap itself, so bytes.count() gives
        # the same totals as checking every position.
        raw_etx = blob.count(bytes((DLE, ETX)))
        stx_count = blob.count(bytes((DLE, STX)))
        if raw_etx and raw_etx >= max(1, int(0.8 * stx_count)):
            etx_mode = ETX_MODE_RAW
        else:
            etx_mode = ETX_MODE_STRIPPED

    raw_mode = etx_mode == ETX_MODE_RAW
    stripped_mode = etx_mode == ETX_MODE_STRIPPED

    frames: List[Frame] = []

    def emit(start: int, after_etx: int, escaped: bytearray) -> int:
        """Record a completed frame and return the offset just past its trailer."""
        captured = bytes(escaped)
        frames.append(Frame(
            index=len(frames),
            start_offset=start,
            end_offset=after_etx,
            payload_raw=captured,
            payload=unescape_dle(captured),
            trailer=blob[after_etx:after_etx + trailer_len],
            crc_match=None,
        ))
        return after_etx + trailer_len

    in_frame = False
    pending_dle = False  # RAW mode only: previous payload byte was a DLE
    start_offset = 0
    payload_raw = bytearray()
    i = 0

    while i < total:
        b = blob[i]

        if not in_frame:
            # Idle: hunt for the next DLE STX.
            if b == DLE and i + 1 < total and blob[i + 1] == STX:
                start_offset = i
                payload_raw = bytearray()
                in_frame = True
                i += 2
            else:
                i += 1
            continue

        if pending_dle:
            pending_dle = False
            if b == ETX:
                # Proper DLE ETX terminator.
                i = emit(start_offset, i + 1, payload_raw)
                in_frame = False
                continue
            # Stuffed pair (10 10) or a stray DLE before ordinary data. Keep
            # both bytes so payload_raw stays escaped and unescape_dle()
            # collapses each pair exactly once.
            payload_raw.append(DLE)
            payload_raw.append(b)
            i += 1
            continue

        if raw_mode and b == DLE:
            # Cannot classify this DLE until the next byte is seen.
            pending_dle = True
            i += 1
            continue

        if stripped_mode and b == ETX:
            # Bare ETX terminator.
            i = emit(start_offset, i + 1, payload_raw)
            in_frame = False
            continue

        payload_raw.append(b)
        i += 1

    return frames
|
|
|
|
def best_crc_match(payload: bytes, trailer: bytes, little_endian: bool) -> Optional[str]:
    """Name the CRC-16 variant(s) whose value over *payload* equals the trailer.

    The first two trailer bytes are read as an unsigned 16-bit integer in the
    requested byte order and compared with every algorithm in ``CRC_FUNCS``.

    Returns:
        The matching algorithm's label, several labels joined by ``" / "``
        when more than one matches, or ``None`` when nothing matches or the
        trailer is shorter than two bytes.
    """
    if len(trailer) < 2:
        return None

    order = "little" if little_endian else "big"
    expected = int.from_bytes(trailer[:2], byteorder=order, signed=False)
    hits = [label for label, algo in CRC_FUNCS.items() if algo(payload) == expected]
    return " / ".join(hits) if hits else None
|
|
|
|
def main() -> None:
    """Command-line entry point: parse a capture file and report its frames.

    Reads the capture named on the command line, splits it into frames with
    ``parse_frames``, optionally labels each frame with the CRC-16 variant
    matching its trailer (``--crc``), prints a summary, and emits one JSON
    object per frame: all of them to ``--out`` when given, otherwise only the
    first ten to stdout.

    Exits through ``argparse`` with a usage error when ``--trailer-len`` is
    negative.
    """
    ap = argparse.ArgumentParser(description="Parse DLE-framed serial capture .bin into frames (and guess CRC).")
    ap.add_argument("binfile", type=Path, help="Path to capture .bin file")
    ap.add_argument("--trailer-len", type=int, default=2, help="Bytes to capture after DLE ETX (default: 2)")
    ap.add_argument("--crc", action="store_true", help="Attempt CRC match using first 2 trailer bytes")
    ap.add_argument("--crc-endian", choices=["little", "big"], default="little", help="CRC endian when reading trailer")
    ap.add_argument(
        "--etx-mode",
        choices=[ETX_MODE_AUTO, ETX_MODE_RAW, ETX_MODE_STRIPPED],
        default=ETX_MODE_AUTO,
        help="How to detect end-of-frame: 'raw' expects DLE+ETX, "
        "'stripped' expects bare ETX (s3_bridge .bin), 'auto' picks based on presence of DLE+ETX."
    )
    ap.add_argument("--out", type=Path, default=None, help="Write JSONL output to this file")
    args = ap.parse_args()

    # A negative trailer length has no meaning; reject it up front instead of
    # letting it skew the parser's position in the capture.
    if args.trailer_len < 0:
        ap.error("--trailer-len must be >= 0")

    blob = args.binfile.read_bytes()
    frames = parse_frames(blob, trailer_len=args.trailer_len, etx_mode=args.etx_mode)

    if args.crc:
        little = (args.crc_endian == "little")
        for f in frames:
            f.crc_match = best_crc_match(f.payload, f.trailer, little_endian=little)

    # Summary
    total = len(frames)
    print(f"Frames found: {total}")
    if args.crc:
        crc_hits = sum(1 for f in frames if f.crc_match)
        print(f"CRC matches: {crc_hits} ({(crc_hits/total*100.0):.1f}%)" if total else "CRC matches: 0")

    # Emit JSONL
    lines = [
        json.dumps({
            "index": f.index,
            "start_offset": f.start_offset,
            "end_offset": f.end_offset,
            "payload_len": len(f.payload),
            "payload_hex": f.payload.hex(),
            "trailer_hex": f.trailer.hex(),
            "crc_match": f.crc_match,
        })
        for f in frames
    ]

    if args.out:
        # Terminate every record with a newline. With no frames this writes an
        # empty file rather than a lone blank line, which is not valid JSONL.
        args.out.write_text("".join(f"{line}\n" for line in lines), encoding="utf-8")
        print(f"Wrote: {args.out}")
    else:
        # Print first few only (avoid flooding the terminal)
        for line in lines[:10]:
            print(line)
        if len(lines) > 10:
            print(f"... ({len(lines) - 10} more)")


if __name__ == "__main__":
    main()
|