Files
seismo-relay/parsers/s3_parser.py

211 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""
s3_parser.py — parse Instantel/Series3-like DLE-framed serial captures from a raw .bin logger.
Assumptions (based on your HxD patterns):
- Frames are delimited by DLE STX (0x10 0x02) ... DLE ETX (0x10 0x03)
- Inside payload, a literal 0x10 is escaped as 0x10 0x10
- After ETX, there may be a trailer (often CRC16, maybe + seq/flags)
"""
from __future__ import annotations
import argparse
import json
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
# Control bytes of the DLE framing described in the module docstring.
DLE = 0x10  # escape byte: prefixes STX/ETX, and is doubled to encode a literal 0x10
STX = 0x02  # DLE STX (0x10 0x02) opens a frame
ETX = 0x03  # DLE ETX (0x10 0x03) closes a frame
EOT = 0x04  # not referenced by the code in this file; kept for possible DLE EOT detection
@dataclass
class Frame:
    """One DLE STX ... DLE ETX frame extracted from a capture.

    Offsets are byte positions in the capture blob the frame was parsed from.
    """

    index: int  # 0-based position of this frame among the frames found
    start_offset: int  # offset of the DLE byte of the opening DLE STX
    end_offset: int  # offset just past the closing DLE ETX (trailer excluded)
    payload_raw: bytes  # as captured between STX..ETX, still escaped
    payload: bytes  # unescaped
    trailer: bytes  # bytes immediately after ETX (length chosen by user)
    # Best-guess CRC type if verified, else None. Defaults to None because the
    # CRC check is an optional later pass over already-parsed frames.
    crc_match: Optional[str] = None
def unescape_dle(payload_escaped: bytes) -> bytes:
    """Undo DLE byte-stuffing: each ``0x10 0x10`` pair becomes one ``0x10``.

    Pairs are matched left to right without overlapping, so a run of three
    DLE bytes decodes to two. A single DLE that is not followed by another
    DLE is kept as-is (conservative): it could be real data or a malformed
    capture.

    Args:
        payload_escaped: Frame payload exactly as captured between
            DLE STX and DLE ETX.

    Returns:
        The payload with the stuffing removed, always as ``bytes``.
    """
    dle = bytes((DLE,))
    # bytes.replace performs the same left-to-right, non-overlapping pair
    # collapse as a manual scan; bytes() normalises bytearray/memoryview input.
    return bytes(payload_escaped).replace(dle + dle, dle)
# ---- CRC helpers (we don't know which one yet, so we try a few) ----
def crc16_ibm(data: bytes) -> int:
    """Return the CRC-16/IBM (a.k.a. ARC) checksum of *data*.

    Bit-reflected algorithm: polynomial 0x8005 applied in its reflected form
    0xA001, initial register 0x0000, no final XOR.
    """
    reg = 0x0000
    for byte in data:
        reg ^= byte
        for _bit in range(8):
            lsb_set = reg & 1
            reg >>= 1
            if lsb_set:
                reg ^= 0xA001
    return reg & 0xFFFF
def crc16_ccitt_false(data: bytes) -> int:
    """Return the CRC-16/CCITT-FALSE checksum of *data*.

    MSB-first (non-reflected) algorithm: polynomial 0x1021, initial register
    0xFFFF, no final XOR.
    """
    reg = 0xFFFF
    for byte in data:
        # Fold the next byte into the high half of the 16-bit register.
        reg ^= byte << 8
        for _bit in range(8):
            msb_set = reg & 0x8000
            reg = (reg << 1) & 0xFFFF
            if msb_set:
                reg ^= 0x1021
    return reg
def crc16_x25(data: bytes) -> int:
    """Return the CRC-16/X-25 checksum of *data*.

    Bit-reflected algorithm: polynomial 0x1021 applied in its reflected form
    0x8408, initial register 0xFFFF, result XORed with 0xFFFF.
    """
    reg = 0xFFFF
    for byte in data:
        reg ^= byte
        for _bit in range(8):
            shifted = reg >> 1
            reg = shifted ^ 0x8408 if reg & 1 else shifted
    # Final XOR (xorout) inverts all 16 bits.
    return (reg ^ 0xFFFF) & 0xFFFF
# Candidate CRC-16 algorithms, keyed by display name. best_crc_match() tries
# every entry in this (insertion) order and reports the names that agree with
# the trailer, so the order also fixes the order of names in a multi-match.
CRC_FUNCS = {
    "CRC-16/IBM": crc16_ibm,
    "CRC-16/CCITT-FALSE": crc16_ccitt_false,
    "CRC-16/X-25": crc16_x25,
}
def _find_frame_end(blob: bytes, pos: int) -> int:
    """Return the offset of the DLE ETX that closes the payload starting at *pos*, or -1."""
    last = len(blob) - 1
    while True:
        pos = blob.find(DLE, pos)
        if pos < 0 or pos >= last:
            return -1  # no DLE left, or a DLE with no byte after it
        follower = blob[pos + 1]
        if follower == ETX:
            return pos
        # DLE DLE is a stuffed literal 0x10: step over BOTH bytes so that the
        # second one can never pair with a following 0x03 and be mistaken for
        # DLE ETX. Any other lone DLE is treated as ordinary payload data.
        pos += 2 if follower == DLE else 1


def parse_frames(blob: bytes, trailer_len: int) -> List[Frame]:
    """Extract every complete DLE STX ... DLE ETX frame from *blob*.

    Bytes outside frames are skipped. Inside a frame the scan honours DLE
    stuffing, so the sequence ``10 10 03`` (a literal 0x10 followed by the
    data byte 0x03) does not end the frame. Parsing stops at the first frame
    that has no closing DLE ETX (truncated capture).

    Args:
        blob: Raw capture contents.
        trailer_len: Number of bytes to take after each DLE ETX as the
            frame's trailer; those bytes are not searched for the next frame
            start. Values <= 0 mean "no trailer". The trailer of the last
            frame may be shorter if the capture ends early.

    Returns:
        Frames in capture order, with ``crc_match`` left as ``None``.
    """
    # A negative length would move the cursor backwards and could re-parse the
    # same frame forever; it is treated as 0.
    trailer_len = max(trailer_len, 0)
    start_marker = bytes((DLE, STX))

    frames: List[Frame] = []
    pos = 0
    while True:
        start = blob.find(start_marker, pos)
        if start < 0:
            break
        payload_start = start + 2  # move past DLE STX
        payload_end = _find_frame_end(blob, payload_start)
        if payload_end < 0:
            break  # truncated
        end = payload_end + 2  # just past DLE ETX
        payload_raw = blob[payload_start:payload_end]
        frames.append(Frame(
            index=len(frames),
            start_offset=start,
            end_offset=end,
            payload_raw=payload_raw,
            payload=unescape_dle(payload_raw),
            trailer=blob[end:end + trailer_len],
            crc_match=None,
        ))
        # optional: you can also detect DLE EOT boundaries if useful later
        pos = end + trailer_len
    return frames
def best_crc_match(payload: bytes, trailer: bytes, little_endian: bool) -> Optional[str]:
    """Name the CRC-16 algorithm(s) whose checksum of *payload* equals the trailer.

    The first two bytes of *trailer* are read as an unsigned 16-bit value in
    the requested byte order and compared with every function in CRC_FUNCS.

    Returns:
        The matching algorithm name, several names joined by " / " when more
        than one agrees, or None when nothing matches or the trailer is
        shorter than two bytes.
    """
    if len(trailer) < 2:
        return None
    order = "little" if little_endian else "big"
    expected = int.from_bytes(trailer[:2], byteorder=order, signed=False)
    hits = [label for label, compute in CRC_FUNCS.items() if compute(payload) == expected]
    # No hits joins to "", which falls through to None; one hit joins to itself.
    return " / ".join(hits) or None
def main() -> None:
    """Command-line entry point: parse a capture file and report its frames.

    Reads the .bin file named on the command line, splits it into frames with
    parse_frames(), optionally labels each frame with best_crc_match(), prints
    a summary, and emits one JSON object per frame — either to the --out file
    (JSONL) or, for the first ten frames only, to stdout.
    """
    ap = argparse.ArgumentParser(description="Parse DLE-framed serial capture .bin into frames (and guess CRC).")
    ap.add_argument("binfile", type=Path, help="Path to capture .bin file")
    ap.add_argument("--trailer-len", type=int, default=2, help="Bytes to capture after DLE ETX (default: 2)")
    ap.add_argument("--crc", action="store_true", help="Attempt CRC match using first 2 trailer bytes")
    ap.add_argument("--crc-endian", choices=["little", "big"], default="little", help="CRC endian when reading trailer")
    ap.add_argument("--out", type=Path, default=None, help="Write JSONL output to this file")
    args = ap.parse_args()

    # A negative trailer length has no meaning; reject it up front instead of
    # letting it reach the parser.
    if args.trailer_len < 0:
        ap.error("--trailer-len must be >= 0")

    blob = args.binfile.read_bytes()
    frames = parse_frames(blob, trailer_len=args.trailer_len)

    if args.crc:
        little = args.crc_endian == "little"
        for f in frames:
            f.crc_match = best_crc_match(f.payload, f.trailer, little_endian=little)

    # Summary
    total = len(frames)
    print(f"Frames found: {total}")
    if args.crc:
        crc_hits = sum(1 for f in frames if f.crc_match)
        if total:
            print(f"CRC matches: {crc_hits} ({(crc_hits/total*100.0):.1f}%)")
        else:
            print("CRC matches: 0")

    # Emit JSONL
    lines = [
        json.dumps({
            "index": f.index,
            "start_offset": f.start_offset,
            "end_offset": f.end_offset,
            "payload_len": len(f.payload),
            "payload_hex": f.payload.hex(),
            "trailer_hex": f.trailer.hex(),
            "crc_match": f.crc_match,
        })
        for f in frames
    ]

    if args.out:
        # Terminate every record with its own newline so that an empty frame
        # list produces an empty file rather than a single blank line, which
        # is not a valid JSONL record.
        args.out.write_text("".join(line + "\n" for line in lines), encoding="utf-8")
        print(f"Wrote: {args.out}")
    else:
        # Print first few only (avoid spewing your terminal)
        for line in lines[:10]:
            print(line)
        if len(lines) > 10:
            print(f"... ({len(lines) - 10} more)")
if __name__ == "__main__":
main()