fix: parser no v0.2.2, uses proper frame handling, checksum for large frames still unknown.

This commit is contained in:
serversdwn
2026-03-03 17:54:33 -05:00
parent 22d4023ea0
commit a684d3e642
6 changed files with 362 additions and 152 deletions

View File

@@ -1,232 +1,364 @@
#!/usr/bin/env python3
"""
s3_parse.py — parse Instantel/Series3-like DLE-framed serial captures from a raw .bin logger.
s3_parser.py — Unified Instantel frame parser (S3 + BW).
Assumptions (based on your HxD patterns):
- Frames are delimited by DLE STX (0x10 0x02) ... DLE ETX (0x10 0x03)
- Inside payload, a literal 0x10 is escaped as 0x10 0x10
- After ETX, there may be a trailer (often CRC16, maybe + seq/flags)
Modes:
- s3: DLE STX (10 02) ... DLE ETX (10 03)
- bw: ACK+STX (41 02) ... ETX (03)
Stuffing:
- Literal 0x10 in payload is stuffed as 10 10 in both directions.
Checksums:
- BW frames appear to use more than one checksum style depending on message type.
Small frames often validate with 1-byte SUM8.
Large config/write frames appear to use a 2-byte CRC16 variant.
In BW mode we therefore validate candidate ETX positions using AUTO checksum matching:
- SUM8 (1 byte)
- CRC16 variants (2 bytes), both little/big endian
If any match, we accept the ETX as a real frame terminator.
"""
from __future__ import annotations
import argparse
import json
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
from typing import Callable, Dict, List, Optional, Tuple
DLE = 0x10
STX = 0x02
ETX = 0x03
EOT = 0x04
ACK = 0x41
__version__ = "0.2.2"
# How the capture was produced:
# - Raw serial captures include DLE+ETX (`0x10 0x03`).
# - The s3_bridge `.bin` logger strips the DLE byte from ETX, so frames end with a
# bare `0x03`. See docs/instantel_protocol_reference.md §Appendix A.
ETX_MODE_AUTO = "auto"
ETX_MODE_RAW = "raw" # expect DLE+ETX
ETX_MODE_STRIPPED = "stripped" # expect bare ETX
@dataclass
class Frame:
index: int
start_offset: int
end_offset: int
payload_raw: bytes # as captured between STX..ETX, still escaped
payload: bytes # unescaped
trailer: bytes # bytes immediately after ETX (length chosen by user)
crc_match: Optional[str] # best-guess CRC type if verified, else None
payload_raw: bytes # de-stuffed bytes between STX..ETX (includes checksum bytes at end)
payload: bytes # payload without checksum bytes
trailer: bytes
checksum_valid: Optional[bool]
checksum_type: Optional[str]
checksum_hex: Optional[str]
def unescape_dle(payload_escaped: bytes) -> bytes:
"""Convert DLE-stuffing: 0x10 0x10 => 0x10 (literal DLE)."""
out = bytearray()
i = 0
n = len(payload_escaped)
while i < n:
b = payload_escaped[i]
if b == DLE:
if i + 1 < n and payload_escaped[i + 1] == DLE:
out.append(DLE)
i += 2
continue
# If we see a single DLE not followed by DLE inside payload,
# keep it as-is (conservative) — could be real data or malformed capture.
out.append(b)
i += 1
return bytes(out)
# ---- CRC helpers (we don't know which one yet, so we try a few) ----
# ------------------------
# Checksum / CRC helpers
# ------------------------
def checksum8_sum(data: bytes) -> int:
"""SUM8: sum(payload) & 0xFF"""
return sum(data) & 0xFF
def crc16_ibm(data: bytes) -> int:
# CRC-16/IBM (aka ARC) poly=0xA001 (reflected 0x8005), init=0x0000
# CRC-16/IBM (aka ARC) poly=0xA001, init=0x0000, refin/refout true
crc = 0x0000
for b in data:
crc ^= b
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
crc = (crc >> 1) ^ 0xA001 if (crc & 1) else (crc >> 1)
return crc & 0xFFFF
def crc16_ccitt_false(data: bytes) -> int:
# CRC-16/CCITT-FALSE poly=0x1021, init=0xFFFF, no reflection
# CRC-16/CCITT-FALSE poly=0x1021, init=0xFFFF, refin/refout false
crc = 0xFFFF
for b in data:
crc ^= (b << 8)
for _ in range(8):
if crc & 0x8000:
crc = ((crc << 1) ^ 0x1021) & 0xFFFF
else:
crc = (crc << 1) & 0xFFFF
crc = ((crc << 1) ^ 0x1021) & 0xFFFF if (crc & 0x8000) else (crc << 1) & 0xFFFF
return crc
def crc16_x25(data: bytes) -> int:
# CRC-16/X-25 poly=0x1021, init=0xFFFF, refin/refout true, xorout=0xFFFF
# CRC-16/X-25 poly=0x8408 (reflected), init=0xFFFF, xorout=0xFFFF
crc = 0xFFFF
for b in data:
crc ^= b
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0x8408
else:
crc >>= 1
crc = (crc >> 1) ^ 0x8408 if (crc & 1) else (crc >> 1)
return (crc ^ 0xFFFF) & 0xFFFF
CRC_FUNCS = {
"CRC-16/IBM": crc16_ibm,
"CRC-16/CCITT-FALSE": crc16_ccitt_false,
"CRC-16/X-25": crc16_x25,
CRC16_FUNCS: Dict[str, Callable[[bytes], int]] = {
"CRC16_IBM": crc16_ibm,
"CRC16_CCITT_FALSE": crc16_ccitt_false,
"CRC16_X25": crc16_x25,
}
def parse_frames(blob: bytes, trailer_len: int) -> List[Frame]:
def _try_validate_sum8(body: bytes) -> Optional[Tuple[bytes, bytes, str]]:
"""
body = payload + chk8
Returns (payload, chk_bytes, type) if valid, else None
"""
if len(body) < 1:
return None
payload = body[:-1]
chk = body[-1]
if checksum8_sum(payload) == chk:
return payload, bytes([chk]), "SUM8"
return None
def _try_validate_crc16(body: bytes) -> Optional[Tuple[bytes, bytes, str]]:
"""
body = payload + crc16(2 bytes)
Try multiple CRC16 types and both endian interpretations.
Returns (payload, chk_bytes, type) if valid, else None
"""
if len(body) < 2:
return None
payload = body[:-2]
chk_bytes = body[-2:]
given_le = int.from_bytes(chk_bytes, "little", signed=False)
given_be = int.from_bytes(chk_bytes, "big", signed=False)
for name, fn in CRC16_FUNCS.items():
calc = fn(payload)
if calc == given_le:
return payload, chk_bytes, f"{name}_LE"
if calc == given_be:
return payload, chk_bytes, f"{name}_BE"
return None
def validate_bw_body_auto(body: bytes) -> Optional[Tuple[bytes, bytes, str]]:
"""
Try to interpret the tail of body as a checksum in several ways.
Return (payload, checksum_bytes, checksum_type) if any match; else None.
"""
# Prefer SUM8 first (it fits small frames and is cheap)
hit = _try_validate_sum8(body)
if hit:
return hit
# Then CRC16 variants
hit = _try_validate_crc16(body)
if hit:
return hit
return None
# ------------------------
# S3 MODE (DLE framed)
# ------------------------
def parse_s3(blob: bytes, trailer_len: int) -> List[Frame]:
frames: List[Frame] = []
STATE_IDLE = 0
STATE_IN_FRAME = 1
STATE_AFTER_DLE = 2
IDLE = 0
IN_FRAME = 1
AFTER_DLE = 2
state = STATE_IDLE
payload_raw = bytearray()
state = IDLE
body = bytearray()
start_offset = 0
idx = 0
i = 0
n = len(blob)
print(">>> CLEAN RAW STATE MACHINE ACTIVE <<<")
while i < n:
b = blob[i]
if state == STATE_IDLE:
# look for DLE STX
if state == IDLE:
if b == DLE and i + 1 < n and blob[i + 1] == STX:
print("FRAME START at", i)
start_offset = i
payload_raw = bytearray()
state = STATE_IN_FRAME
body.clear()
state = IN_FRAME
i += 2
continue
elif state == STATE_IN_FRAME:
elif state == IN_FRAME:
if b == DLE:
state = STATE_AFTER_DLE
state = AFTER_DLE
i += 1
continue
else:
payload_raw.append(b)
body.append(b)
elif state == STATE_AFTER_DLE:
else: # AFTER_DLE
if b == DLE:
# escaped literal DLE
payload_raw.append(DLE)
state = STATE_IN_FRAME
body.append(DLE)
state = IN_FRAME
i += 1
continue
elif b == ETX:
print("FRAME END at", i)
# end of frame
if b == ETX:
end_offset = i + 1
# capture trailer
trailer_start = i + 1
trailer_end = trailer_start + trailer_len
trailer = blob[trailer_start:trailer_end]
# For S3 mode we don't assume checksum type here yet.
frames.append(Frame(
index=idx,
start_offset=start_offset,
end_offset=end_offset,
payload_raw=bytes(payload_raw),
payload=bytes(payload_raw),
payload_raw=bytes(body),
payload=bytes(body),
trailer=trailer,
crc_match=None
checksum_valid=None,
checksum_type=None,
checksum_hex=None
))
idx += 1
state = STATE_IDLE
state = IDLE
i = trailer_end
continue
else:
# unexpected sequence: DLE followed by non-DLE/non-ETX
# treat both bytes as data (robust recovery)
payload_raw.append(DLE)
payload_raw.append(b)
state = STATE_IN_FRAME
i += 1
continue
# Unexpected DLE + byte → treat as literal data
body.append(DLE)
body.append(b)
state = IN_FRAME
i += 1
continue
i += 1
print("Frames parsed:", len(frames))
return frames
def best_crc_match(payload: bytes, trailer: bytes, little_endian: bool) -> Optional[str]:
"""Try to interpret first 2 trailer bytes as CRC16 and see which algorithm matches."""
if len(trailer) < 2:
return None
given = int.from_bytes(trailer[:2], byteorder="little" if little_endian else "big", signed=False)
matches = []
for name, fn in CRC_FUNCS.items():
calc = fn(payload)
if calc == given:
matches.append(name)
if len(matches) == 1:
return matches[0]
if len(matches) > 1:
return " / ".join(matches)
return None
# ------------------------
# BW MODE (ACK+STX framed, bare ETX)
# ------------------------
def parse_bw(blob: bytes, trailer_len: int, validate_checksum: bool) -> List[Frame]:
frames: List[Frame] = []
IDLE = 0
IN_FRAME = 1
AFTER_DLE = 2
state = IDLE
body = bytearray()
start_offset = 0
idx = 0
i = 0
n = len(blob)
while i < n:
b = blob[i]
if state == IDLE:
# Frame start signature: ACK + STX
if b == ACK and i + 1 < n and blob[i + 1] == STX:
start_offset = i
body.clear()
state = IN_FRAME
i += 2
continue
i += 1
continue
if state == IN_FRAME:
if b == DLE:
state = AFTER_DLE
i += 1
continue
if b == ETX:
# Candidate end-of-frame.
# Accept ETX if the next bytes look like a real next-frame start (ACK+STX),
# or we're at EOF. This prevents chopping on in-payload 0x03.
next_is_start = (i + 2 < n and blob[i + 1] == ACK and blob[i + 2] == STX)
at_eof = (i == n - 1)
if not (next_is_start or at_eof):
# Not a real boundary -> payload byte
body.append(ETX)
i += 1
continue
trailer_start = i + 1
trailer_end = trailer_start + trailer_len
trailer = blob[trailer_start:trailer_end]
chk_valid = None
chk_type = None
chk_hex = None
payload = bytes(body)
if validate_checksum:
hit = validate_bw_body_auto(payload)
if hit:
payload, chk_bytes, chk_type = hit
chk_valid = True
chk_hex = chk_bytes.hex()
else:
chk_valid = False
frames.append(Frame(
index=idx,
start_offset=start_offset,
end_offset=i + 1,
payload_raw=bytes(body),
payload=payload,
trailer=trailer,
checksum_valid=chk_valid,
checksum_type=chk_type,
checksum_hex=chk_hex
))
idx += 1
state = IDLE
i = trailer_end
continue
# Normal byte
body.append(b)
i += 1
continue
# AFTER_DLE
if b == DLE:
body.append(DLE) # 10 10 => literal 10
else:
# Robust recovery: treat as literal DLE + byte
body.append(DLE)
body.append(b)
state = IN_FRAME
i += 1
return frames
# ------------------------
# CLI
# ------------------------
def main() -> None:
ap = argparse.ArgumentParser(description="Parse DLE-framed serial capture .bin into frames (and guess CRC).")
ap.add_argument("binfile", type=Path, help="Path to capture .bin file")
ap.add_argument("--trailer-len", type=int, default=2, help="Bytes to capture after DLE ETX (default: 2)")
ap.add_argument("--crc", action="store_true", help="Attempt CRC match using first 2 trailer bytes")
ap.add_argument("--crc-endian", choices=["little", "big"], default="little", help="CRC endian when reading trailer")
ap = argparse.ArgumentParser(description="Parse Instantel S3/BW binary captures.")
ap.add_argument("binfile", type=Path)
ap.add_argument("--mode", choices=["s3", "bw"], default="s3")
ap.add_argument("--trailer-len", type=int, default=0)
ap.add_argument("--no-checksum", action="store_true")
ap.add_argument("--out", type=Path, default=None)
ap.add_argument("--out", type=Path, default=None, help="Write JSONL output to this file")
args = ap.parse_args()
print(f"s3_parser v{__version__}")
blob = args.binfile.read_bytes()
frames = parse_frames(blob, trailer_len=args.trailer_len)
little = (args.crc_endian == "little")
if args.crc:
for f in frames:
f.crc_match = best_crc_match(f.payload, f.trailer, little_endian=little)
if args.mode == "s3":
frames = parse_s3(blob, args.trailer_len)
else:
frames = parse_bw(blob, args.trailer_len, validate_checksum=not args.no_checksum)
# Summary
total = len(frames)
crc_hits = sum(1 for f in frames if f.crc_match) if args.crc else 0
print(f"Frames found: {total}")
if args.crc:
print(f"CRC matches: {crc_hits} ({(crc_hits/total*100.0):.1f}%)" if total else "CRC matches: 0")
print("Frames found:", len(frames))
# Emit JSONL
def to_hex(b: bytes) -> str:
return b.hex()
@@ -239,7 +371,9 @@ def main() -> None:
"payload_len": len(f.payload),
"payload_hex": to_hex(f.payload),
"trailer_hex": to_hex(f.trailer),
"crc_match": f.crc_match,
"checksum_valid": f.checksum_valid,
"checksum_type": f.checksum_type,
"checksum_hex": f.checksum_hex,
}
lines.append(json.dumps(obj))
@@ -247,11 +381,11 @@ def main() -> None:
args.out.write_text("\n".join(lines) + "\n", encoding="utf-8")
print(f"Wrote: {args.out}")
else:
# Print first few only (avoid spewing your terminal)
for line in lines[:10]:
print(line)
if len(lines) > 10:
print(f"... ({len(lines) - 10} more)")
if __name__ == "__main__":
main()
main()