From 967a5b2dad97384c44f265cce44b180dd8eba3a7 Mon Sep 17 00:00:00 2001 From: serversdwn Date: Mon, 2 Mar 2026 18:21:30 -0500 Subject: [PATCH] fix: actually put the code in the parser file this time... --- parsers/s3_parser.py | 211 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) diff --git a/parsers/s3_parser.py b/parsers/s3_parser.py index e69de29..4716b08 100644 --- a/parsers/s3_parser.py +++ b/parsers/s3_parser.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +""" +s3_parse.py — parse Instantel/Series3-like DLE-framed serial captures from a raw .bin logger. + +Assumptions (based on your HxD patterns): +- Frames are delimited by DLE STX (0x10 0x02) ... DLE ETX (0x10 0x03) +- Inside payload, a literal 0x10 is escaped as 0x10 0x10 +- After ETX, there may be a trailer (often CRC16, maybe + seq/flags) +""" + +from __future__ import annotations +import argparse +import json +from dataclasses import dataclass +from pathlib import Path +from typing import List, Optional, Tuple + +DLE = 0x10 +STX = 0x02 +ETX = 0x03 +EOT = 0x04 + +@dataclass +class Frame: + index: int + start_offset: int + end_offset: int + payload_raw: bytes # as captured between STX..ETX, still escaped + payload: bytes # unescaped + trailer: bytes # bytes immediately after ETX (length chosen by user) + crc_match: Optional[str] # best-guess CRC type if verified, else None + +def unescape_dle(payload_escaped: bytes) -> bytes: + """Convert DLE-stuffing: 0x10 0x10 => 0x10 (literal DLE).""" + out = bytearray() + i = 0 + n = len(payload_escaped) + while i < n: + b = payload_escaped[i] + if b == DLE: + if i + 1 < n and payload_escaped[i + 1] == DLE: + out.append(DLE) + i += 2 + continue + # If we see a single DLE not followed by DLE inside payload, + # keep it as-is (conservative) — could be real data or malformed capture. + out.append(b) + i += 1 + return bytes(out) + +# ---- CRC helpers (we don't know which one yet, so we try a few) ---- + +def crc16_ibm(data: bytes) -> int: + # CRC-16/IBM (aka ARC) poly=0xA001 (reflected 0x8005), init=0x0000 + crc = 0x0000 + for b in data: + crc ^= b + for _ in range(8): + if crc & 1: + crc = (crc >> 1) ^ 0xA001 + else: + crc >>= 1 + return crc & 0xFFFF + +def crc16_ccitt_false(data: bytes) -> int: + # CRC-16/CCITT-FALSE poly=0x1021, init=0xFFFF, no reflection + crc = 0xFFFF + for b in data: + crc ^= (b << 8) + for _ in range(8): + if crc & 0x8000: + crc = ((crc << 1) ^ 0x1021) & 0xFFFF + else: + crc = (crc << 1) & 0xFFFF + return crc + +def crc16_x25(data: bytes) -> int: + # CRC-16/X-25 poly=0x1021, init=0xFFFF, refin/refout true, xorout=0xFFFF + crc = 0xFFFF + for b in data: + crc ^= b + for _ in range(8): + if crc & 1: + crc = (crc >> 1) ^ 0x8408 + else: + crc >>= 1 + return (crc ^ 0xFFFF) & 0xFFFF + +CRC_FUNCS = { + "CRC-16/IBM": crc16_ibm, + "CRC-16/CCITT-FALSE": crc16_ccitt_false, + "CRC-16/X-25": crc16_x25, +} + +def parse_frames(blob: bytes, trailer_len: int) -> List[Frame]: + frames: List[Frame] = [] + i = 0 + idx = 0 + n = len(blob) + + def is_dle_seq(pos: int, second: int) -> bool: + return pos + 1 < n and blob[pos] == DLE and blob[pos + 1] == second + + while i < n - 1: + if is_dle_seq(i, STX): + start = i + i += 2 # move past DLE STX + payload_start = i + + # find DLE ETX + while i < n - 1 and not is_dle_seq(i, ETX): + i += 1 + + if i >= n - 1: + break # truncated + payload_end = i # bytes up to (but not including) DLE ETX + i += 2 # skip DLE ETX + end = i + + payload_raw = blob[payload_start:payload_end] + payload = unescape_dle(payload_raw) + + trailer = blob[i:i + trailer_len] if trailer_len > 0 else b"" + i += trailer_len + + frames.append(Frame( + index=idx, + start_offset=start, + end_offset=end, + payload_raw=payload_raw, + payload=payload, + trailer=trailer, + crc_match=None + )) + idx += 1 + continue + + # optional: you can also detect DLE EOT boundaries if useful later + i += 1 + + return frames + +def best_crc_match(payload: bytes, trailer: bytes, little_endian: bool) -> Optional[str]: + """Try to interpret first 2 trailer bytes as CRC16 and see which algorithm matches.""" + if len(trailer) < 2: + return None + given = int.from_bytes(trailer[:2], byteorder="little" if little_endian else "big", signed=False) + matches = [] + for name, fn in CRC_FUNCS.items(): + calc = fn(payload) + if calc == given: + matches.append(name) + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + return " / ".join(matches) + return None + +def main() -> None: + ap = argparse.ArgumentParser(description="Parse DLE-framed serial capture .bin into frames (and guess CRC).") + ap.add_argument("binfile", type=Path, help="Path to capture .bin file") + ap.add_argument("--trailer-len", type=int, default=2, help="Bytes to capture after DLE ETX (default: 2)") + ap.add_argument("--crc", action="store_true", help="Attempt CRC match using first 2 trailer bytes") + ap.add_argument("--crc-endian", choices=["little", "big"], default="little", help="CRC endian when reading trailer") + ap.add_argument("--out", type=Path, default=None, help="Write JSONL output to this file") + args = ap.parse_args() + + blob = args.binfile.read_bytes() + frames = parse_frames(blob, trailer_len=args.trailer_len) + + little = (args.crc_endian == "little") + if args.crc: + for f in frames: + f.crc_match = best_crc_match(f.payload, f.trailer, little_endian=little) + + # Summary + total = len(frames) + crc_hits = sum(1 for f in frames if f.crc_match) if args.crc else 0 + print(f"Frames found: {total}") + if args.crc: + print(f"CRC matches: {crc_hits} ({(crc_hits/total*100.0):.1f}%)" if total else "CRC matches: 0") + + # Emit JSONL + def to_hex(b: bytes) -> str: + return b.hex() + + lines = [] + for f in frames: + obj = { + "index": f.index, + "start_offset": f.start_offset, + "end_offset": f.end_offset, + "payload_len": len(f.payload), + "payload_hex": to_hex(f.payload), + "trailer_hex": to_hex(f.trailer), + "crc_match": f.crc_match, + } + lines.append(json.dumps(obj)) + + if args.out: + args.out.write_text("\n".join(lines) + "\n", encoding="utf-8") + print(f"Wrote: {args.out}") + else: + # Print first few only (avoid spewing your terminal) + for line in lines[:10]: + print(line) + if len(lines) > 10: + print(f"... ({len(lines) - 10} more)") + +if __name__ == "__main__": + main() \ No newline at end of file