386 lines
10 KiB
Python
386 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
s3_parser.py — Unified Instantel frame parser (S3 + BW).
|
|
|
|
Modes:
|
|
- s3: DLE STX (10 02) ... DLE ETX (10 03)
|
|
- bw: ACK+STX (41 02) ... ETX (03)
|
|
|
|
Stuffing:
|
|
- Literal 0x10 in payload is stuffed as 10 10 in both directions.
|
|
|
|
Checksums:
|
|
- BW frames appear to use more than one checksum style depending on message type.
|
|
Small frames often validate with 1-byte SUM8.
|
|
Large config/write frames appear to use a 2-byte CRC16 variant.
|
|
|
|
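In BW mode a bare ETX is accepted as a frame terminator only when the next
frame start (ACK+STX) follows it or the capture ends. The de-stuffed frame body
is then checked using AUTO checksum matching:
  - SUM8 (1 byte)
  - CRC16 variants (2 bytes), both little/big endian
The first match is reported as the frame's checksum type; if nothing matches,
the frame is still emitted with its checksum flagged as invalid.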
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Callable, Dict, List, Optional, Tuple
|
|
|
|
# Control bytes used by the two framing modes.
DLE = 0x10  # escape byte; a literal 0x10 in a payload is sent as 10 10
STX = 0x02  # second byte of a start marker (DLE STX / ACK STX)
ETX = 0x03  # frame terminator (DLE ETX in S3 mode, bare ETX in BW mode)
ACK = 0x41  # first byte of the BW start marker (ACK STX)

__version__ = "0.2.2"
|
|
|
|
|
|
@dataclass
class Frame:
    """One frame extracted from a capture by ``parse_s3`` or ``parse_bw``.

    All offsets index into the capture blob that was parsed.
    """

    # Zero-based position of this frame among the frames found.
    index: int
    # Offset of the first byte of the start marker.
    start_offset: int
    # Offset one past the terminating ETX byte.
    end_offset: int
    # De-stuffed bytes between STX..ETX (includes checksum bytes at end).
    payload_raw: bytes
    # Payload with the recognised checksum bytes removed; identical to
    # payload_raw when no checksum was identified.
    payload: bytes
    # Bytes sliced from the capture directly after ETX (may be empty).
    trailer: bytes
    # True/False when a checksum check ran, None when it did not.
    checksum_valid: Optional[bool]
    # Name of the checksum style that matched (e.g. "SUM8"), else None.
    checksum_type: Optional[str]
    # Hex string of the matched checksum bytes, else None.
    checksum_hex: Optional[str]
|
|
|
|
|
|
# ------------------------
|
|
# Checksum / CRC helpers
|
|
# ------------------------
|
|
|
|
def checksum8_sum(data: bytes) -> int:
    """Return the SUM8 checksum: the byte sum of *data* reduced modulo 256."""
    total = sum(data)
    return total % 0x100
|
|
|
|
|
|
def crc16_ibm(data: bytes) -> int:
    """Return CRC-16/IBM (aka ARC) of *data*.

    Reflected algorithm: poly=0xA001, init=0x0000, no final XOR.
    """
    reg = 0x0000
    for octet in data:
        reg ^= octet
        for _ in range(8):
            # Shift right one bit; fold in the polynomial if a 1 fell out.
            carry = reg & 1
            reg >>= 1
            if carry:
                reg ^= 0xA001
    return reg & 0xFFFF
|
|
|
|
|
|
def crc16_ccitt_false(data: bytes) -> int:
    """Return CRC-16/CCITT-FALSE of *data*.

    Non-reflected algorithm: poly=0x1021, init=0xFFFF, no final XOR.
    """
    reg = 0xFFFF
    for octet in data:
        # Feed the next byte into the high half of the 16-bit register.
        reg ^= octet << 8
        for _ in range(8):
            top_bit = reg & 0x8000
            reg = (reg << 1) & 0xFFFF
            if top_bit:
                reg ^= 0x1021
    return reg
|
|
|
|
|
|
def crc16_x25(data: bytes) -> int:
    """Return CRC-16/X-25 of *data*.

    Reflected algorithm: poly=0x8408, init=0xFFFF, final XOR 0xFFFF.
    """
    reg = 0xFFFF
    for octet in data:
        reg ^= octet
        for _ in range(8):
            # Shift right one bit; fold in the polynomial if a 1 fell out.
            carry = reg & 1
            reg >>= 1
            if carry:
                reg ^= 0x8408
    inverted = reg ^ 0xFFFF
    return inverted & 0xFFFF
|
|
|
|
|
|
# CRC16 candidates tried during auto-detection, keyed by the label reported
# in a frame's checksum type. Iteration order is the match priority.
CRC16_FUNCS: Dict[str, Callable[[bytes], int]] = {
    "CRC16_IBM": crc16_ibm,
    "CRC16_CCITT_FALSE": crc16_ccitt_false,
    "CRC16_X25": crc16_x25,
}
|
|
|
|
|
|
def _try_validate_sum8(body: bytes) -> Optional[Tuple[bytes, bytes, str]]:
    """Check whether *body* ends in a valid 1-byte SUM8 checksum.

    *body* is interpreted as ``payload + chk8``.

    Returns:
        ``(payload, chk_bytes, "SUM8")`` when the last byte equals the SUM8
        of everything before it, otherwise ``None``.
    """
    if not body:
        return None

    data = body[:-1]
    expected = body[-1]
    if checksum8_sum(data) != expected:
        return None
    return data, bytes((expected,)), "SUM8"
|
|
|
|
|
|
def _try_validate_crc16(body: bytes) -> Optional[Tuple[bytes, bytes, str]]:
    """Check whether *body* ends in a valid 2-byte CRC16.

    *body* is interpreted as ``payload + crc16``. Every algorithm in
    ``CRC16_FUNCS`` is tried in order, reading the trailing two bytes first
    as little-endian and then as big-endian.

    Returns:
        ``(payload, chk_bytes, "<NAME>_LE" | "<NAME>_BE")`` for the first
        match, otherwise ``None``.
    """
    if len(body) < 2:
        return None

    data, tail = body[:-2], body[-2:]
    as_little = int.from_bytes(tail, "little")
    as_big = int.from_bytes(tail, "big")

    for label, crc_fn in CRC16_FUNCS.items():
        computed = crc_fn(data)
        if computed == as_little:
            return data, tail, f"{label}_LE"
        if computed == as_big:
            return data, tail, f"{label}_BE"
    return None
|
|
|
|
|
|
def validate_bw_body_auto(body: bytes) -> Optional[Tuple[bytes, bytes, str]]:
    """Auto-detect which checksum style terminates *body*.

    Returns:
        ``(payload, checksum_bytes, checksum_type)`` from the first validator
        that matches, or ``None`` when none does.
    """
    # SUM8 goes first (it fits small frames and is cheap), then the CRC16
    # variants.
    # NOTE(review): a 1-byte sum matches arbitrary data about 1 time in 256,
    # so a CRC16-protected frame can occasionally be reported as SUM8.
    for validator in (_try_validate_sum8, _try_validate_crc16):
        match = validator(body)
        if match is not None:
            return match
    return None
|
|
|
|
|
|
# ------------------------
|
|
# S3 MODE (DLE framed)
|
|
# ------------------------
|
|
|
|
def parse_s3(blob: bytes, trailer_len: int) -> List[Frame]:
    """Extract DLE-framed S3 frames from a raw capture.

    A frame starts at ``DLE STX`` and ends at ``DLE ETX``. Inside a frame
    ``DLE DLE`` is un-stuffed to one literal DLE; any other ``DLE xx`` pair is
    kept verbatim as two payload bytes. Bytes outside a frame are skipped, and
    a frame that is still open when the blob ends is not emitted.

    No checksum is evaluated in this mode: ``payload`` equals ``payload_raw``
    and the ``checksum_*`` fields are ``None``.

    Args:
        blob: Raw capture bytes.
        trailer_len: Number of bytes after each ``DLE ETX`` to attach to the
            frame as ``trailer`` (fewer if the blob ends first); scanning
            resumes after them.

    Returns:
        The frames in the order they appear in ``blob``.

    Raises:
        ValueError: If ``trailer_len`` is negative.
    """
    if trailer_len < 0:
        # A negative length would move the cursor backwards after every frame
        # and can make the loop re-parse the same frame forever.
        raise ValueError(f"trailer_len must be >= 0, got {trailer_len}")

    frames: List[Frame] = []

    IDLE = 0
    IN_FRAME = 1
    AFTER_DLE = 2

    state = IDLE
    body = bytearray()
    start_offset = 0
    idx = 0

    i = 0
    n = len(blob)

    while i < n:
        b = blob[i]

        if state == IDLE:
            # Hunt for the DLE STX start marker; anything else is skipped.
            if b == DLE and i + 1 < n and blob[i + 1] == STX:
                start_offset = i
                body.clear()
                state = IN_FRAME
                i += 2
                continue

        elif state == IN_FRAME:
            if b == DLE:
                # Meaning depends on the next byte; decide in AFTER_DLE.
                state = AFTER_DLE
                i += 1
                continue
            body.append(b)

        else:  # AFTER_DLE
            if b == DLE:
                # Stuffed DLE DLE -> one literal DLE.
                body.append(DLE)
                state = IN_FRAME
                i += 1
                continue

            if b == ETX:
                end_offset = i + 1
                trailer_start = i + 1
                trailer_end = trailer_start + trailer_len
                trailer = blob[trailer_start:trailer_end]

                # For S3 mode we don't assume checksum type here yet.
                frames.append(Frame(
                    index=idx,
                    start_offset=start_offset,
                    end_offset=end_offset,
                    payload_raw=bytes(body),
                    payload=bytes(body),
                    trailer=trailer,
                    checksum_valid=None,
                    checksum_type=None,
                    checksum_hex=None,
                ))

                idx += 1
                state = IDLE
                # Resume scanning after the trailer bytes.
                i = trailer_end
                continue

            # Unexpected DLE + byte -> treat as literal data
            body.append(DLE)
            body.append(b)
            state = IN_FRAME
            i += 1
            continue

        i += 1

    return frames
|
|
|
|
|
|
# ------------------------
|
|
# BW MODE (ACK+STX framed, bare ETX)
|
|
# ------------------------
|
|
|
|
def parse_bw(blob: bytes, trailer_len: int, validate_checksum: bool) -> List[Frame]:
    """Extract BW frames (``ACK STX`` ... bare ``ETX``) from a raw capture.

    Inside a frame ``DLE xx`` yields the literal byte ``xx`` for any ``xx``.
    Because the terminator is a bare ETX, a 0x03 byte is accepted as the end
    of the frame only when, after skipping ``trailer_len`` bytes, either the
    next ``ACK STX`` start marker follows or the blob is exhausted; otherwise
    it is kept as payload. Bytes outside a frame are skipped, and a frame that
    is still open when the blob ends is not emitted.

    Args:
        blob: Raw capture bytes.
        trailer_len: Number of bytes after each terminating ETX to attach to
            the frame as ``trailer`` (fewer if the blob ends first); scanning
            resumes after them.
        validate_checksum: When true, run ``validate_bw_body_auto`` on the
            de-stuffed body. On a match the checksum bytes are stripped from
            ``payload`` and ``checksum_valid`` is ``True``; with no match
            ``checksum_valid`` is ``False``. When false the ``checksum_*``
            fields stay ``None``.

    Returns:
        The frames in the order they appear in ``blob``.

    Raises:
        ValueError: If ``trailer_len`` is negative.
    """
    if trailer_len < 0:
        # A negative length would move the cursor backwards after every frame
        # and can make the loop re-parse the same frame forever.
        raise ValueError(f"trailer_len must be >= 0, got {trailer_len}")

    frames: List[Frame] = []

    IDLE = 0
    IN_FRAME = 1
    AFTER_DLE = 2

    state = IDLE
    body = bytearray()
    start_offset = 0
    idx = 0

    i = 0
    n = len(blob)

    while i < n:
        b = blob[i]

        if state == IDLE:
            # Frame start signature: ACK + STX
            if b == ACK and i + 1 < n and blob[i + 1] == STX:
                start_offset = i
                body.clear()
                state = IN_FRAME
                i += 2
                continue
            i += 1
            continue

        if state == IN_FRAME:
            if b == DLE:
                state = AFTER_DLE
                i += 1
                continue

            if b == ETX:
                # Candidate end-of-frame.
                # The trailer (if any) sits between ETX and the next frame, so
                # the lookahead for ACK+STX must start after it. With
                # trailer_len == 0 this is the byte right after ETX.
                trailer_start = i + 1
                trailer_end = trailer_start + trailer_len

                # Accept ETX if what follows the trailer looks like a real
                # next-frame start (ACK+STX), or there is no room for another
                # frame. This prevents chopping on in-payload 0x03.
                next_is_start = (
                    trailer_end + 1 < n
                    and blob[trailer_end] == ACK
                    and blob[trailer_end + 1] == STX
                )
                at_eof = trailer_end >= n

                if not (next_is_start or at_eof):
                    # Not a real boundary -> payload byte
                    body.append(ETX)
                    i += 1
                    continue

                trailer = blob[trailer_start:trailer_end]

                chk_valid = None
                chk_type = None
                chk_hex = None
                payload = bytes(body)

                if validate_checksum:
                    hit = validate_bw_body_auto(payload)
                    if hit:
                        payload, chk_bytes, chk_type = hit
                        chk_valid = True
                        chk_hex = chk_bytes.hex()
                    else:
                        chk_valid = False

                frames.append(Frame(
                    index=idx,
                    start_offset=start_offset,
                    end_offset=i + 1,
                    payload_raw=bytes(body),
                    payload=payload,
                    trailer=trailer,
                    checksum_valid=chk_valid,
                    checksum_type=chk_type,
                    checksum_hex=chk_hex,
                ))
                idx += 1
                state = IDLE
                # Resume scanning after the trailer bytes.
                i = trailer_end
                continue

            # Normal byte
            body.append(b)
            i += 1
            continue

        # AFTER_DLE: DLE XX => literal XX for any XX (full DLE stuffing)
        body.append(b)
        state = IN_FRAME
        i += 1

    return frames
|
|
|
|
|
|
# ------------------------
|
|
# CLI
|
|
# ------------------------
|
|
|
|
def main() -> None:
    """Command-line entry point.

    Reads the capture file, parses it in the selected mode, prints the frame
    count, and emits one JSON object per frame: to ``--out`` when given,
    otherwise the first ten objects to stdout.
    """
    parser = argparse.ArgumentParser(description="Parse Instantel S3/BW binary captures.")
    parser.add_argument("binfile", type=Path)
    parser.add_argument("--mode", choices=["s3", "bw"], default="s3")
    parser.add_argument("--trailer-len", type=int, default=0)
    parser.add_argument("--no-checksum", action="store_true")
    parser.add_argument("--out", type=Path, default=None)
    opts = parser.parse_args()

    print(f"s3_parser v{__version__}")

    capture = opts.binfile.read_bytes()
    frames = (
        parse_s3(capture, opts.trailer_len)
        if opts.mode == "s3"
        else parse_bw(capture, opts.trailer_len, validate_checksum=not opts.no_checksum)
    )

    print("Frames found:", len(frames))

    # One JSON document per frame; binary fields are rendered as hex strings.
    records = [
        json.dumps({
            "index": frame.index,
            "start_offset": frame.start_offset,
            "end_offset": frame.end_offset,
            "payload_len": len(frame.payload),
            "payload_hex": frame.payload.hex(),
            "trailer_hex": frame.trailer.hex(),
            "checksum_valid": frame.checksum_valid,
            "checksum_type": frame.checksum_type,
            "checksum_hex": frame.checksum_hex,
        })
        for frame in frames
    ]

    if opts.out:
        opts.out.write_text("\n".join(records) + "\n", encoding="utf-8")
        print(f"Wrote: {opts.out}")
        return

    # No output file: show a short preview on stdout.
    preview_limit = 10
    for record in records[:preview_limit]:
        print(record)
    if len(records) > preview_limit:
        print(f"... ({len(records) - preview_limit} more)")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()