From 927aad6c1fb76dd78a82aaa8431fa662abb88ab7 Mon Sep 17 00:00:00 2001 From: serversdwn Date: Tue, 3 Mar 2026 02:35:26 -0500 Subject: [PATCH] FIX: s3_parser.py framing/protocol now working. --- parsers/s3_parser.py | 123 ++++++++++++++++--------------------------- 1 file changed, 45 insertions(+), 78 deletions(-) diff --git a/parsers/s3_parser.py b/parsers/s3_parser.py index fd1a6f3..b211681 100644 --- a/parsers/s3_parser.py +++ b/parsers/s3_parser.py @@ -100,9 +100,8 @@ CRC_FUNCS = { "CRC-16/X-25": crc16_x25, } -def parse_frames(blob: bytes, trailer_len: int, etx_mode: str = ETX_MODE_AUTO) -> List[Frame]: +def parse_frames(blob: bytes, trailer_len: int) -> List[Frame]: frames: List[Frame] = [] - n = len(blob) STATE_IDLE = 0 STATE_IN_FRAME = 1 @@ -112,104 +111,78 @@ def parse_frames(blob: bytes, trailer_len: int, etx_mode: str = ETX_MODE_AUTO) - payload_raw = bytearray() start_offset = 0 idx = 0 + i = 0 - print(">>> NEW STATE MACHINE ACTIVE <<<") - # Auto-detect ETX style once - if etx_mode == ETX_MODE_AUTO: - raw_etx = sum(1 for p in range(n - 1) if blob[p] == DLE and blob[p + 1] == ETX) - stx_count = sum(1 for p in range(n - 1) if blob[p] == DLE and blob[p + 1] == STX) - if raw_etx and raw_etx >= max(1, int(0.8 * stx_count)): - etx_mode = ETX_MODE_RAW - else: - etx_mode = ETX_MODE_STRIPPED + n = len(blob) + + print(">>> CLEAN RAW STATE MACHINE ACTIVE <<<") while i < n: b = blob[i] if state == STATE_IDLE: + # look for DLE STX if b == DLE and i + 1 < n and blob[i + 1] == STX: + print("FRAME START at", i) start_offset = i payload_raw = bytearray() - i += 2 state = STATE_IN_FRAME + i += 2 continue - i += 1 - continue elif state == STATE_IN_FRAME: - - # RAW mode: look for DLE+ETX - if etx_mode == ETX_MODE_RAW and b == DLE: + if b == DLE: state = STATE_AFTER_DLE i += 1 continue - - # STRIPPED mode: bare ETX ends frame - if etx_mode == ETX_MODE_STRIPPED and b == ETX: - payload_end = i - i += 1 - end_offset = i - - payload = unescape_dle(bytes(payload_raw)) - trailer = blob[i:i + trailer_len] if trailer_len > 0 else b"" - i += trailer_len - - frames.append(Frame( - index=idx, - start_offset=start_offset, - end_offset=end_offset, - payload_raw=bytes(payload_raw), - payload=payload, - trailer=trailer, - crc_match=None - )) - idx += 1 - state = STATE_IDLE - continue - - payload_raw.append(b) - i += 1 - continue + else: + payload_raw.append(b) elif state == STATE_AFTER_DLE: - - if etx_mode == ETX_MODE_RAW and b == ETX: - # Proper DLE ETX terminator - end_offset = i + 1 - i += 1 - - payload = unescape_dle(bytes(payload_raw)) - trailer = blob[i:i + trailer_len] if trailer_len > 0 else b"" - i += trailer_len - - frames.append(Frame( - index=idx, - start_offset=start_offset, - end_offset=end_offset, - payload_raw=bytes(payload_raw), - payload=payload, - trailer=trailer, - crc_match=None - )) - idx += 1 - state = STATE_IDLE - continue - - elif b == DLE: - # Escaped DLE (10 10) + if b == DLE: + # escaped literal DLE payload_raw.append(DLE) state = STATE_IN_FRAME i += 1 continue + elif b == ETX: + print("FRAME END at", i) + # end of frame + end_offset = i + 1 + + # capture trailer + trailer_start = i + 1 + trailer_end = trailer_start + trailer_len + trailer = blob[trailer_start:trailer_end] + + frames.append(Frame( + index=idx, + start_offset=start_offset, + end_offset=end_offset, + payload_raw=bytes(payload_raw), + payload=bytes(payload_raw), + trailer=trailer, + crc_match=None + )) + + idx += 1 + state = STATE_IDLE + i = trailer_end + continue + else: - # False alarm — previous 10 was data + # unexpected sequence: DLE followed by non-DLE/non-ETX + # treat both bytes as data (robust recovery) payload_raw.append(DLE) payload_raw.append(b) state = STATE_IN_FRAME i += 1 continue + i += 1 + + print("Frames parsed:", len(frames)) return frames def best_crc_match(payload: bytes, trailer: bytes, little_endian: bool) -> Optional[str]: @@ -234,18 +207,12 @@ def main() -> None: ap.add_argument("--trailer-len", type=int, default=2, help="Bytes to capture after DLE ETX (default: 2)") ap.add_argument("--crc", action="store_true", help="Attempt CRC match using first 2 trailer bytes") ap.add_argument("--crc-endian", choices=["little", "big"], default="little", help="CRC endian when reading trailer") - ap.add_argument( - "--etx-mode", - choices=[ETX_MODE_AUTO, ETX_MODE_RAW, ETX_MODE_STRIPPED], - default=ETX_MODE_AUTO, - help="How to detect end-of-frame: 'raw' expects DLE+ETX, " - "'stripped' expects bare ETX (s3_bridge .bin), 'auto' picks based on presence of DLE+ETX." - ) + ap.add_argument("--out", type=Path, default=None, help="Write JSONL output to this file") args = ap.parse_args() blob = args.binfile.read_bytes() - frames = parse_frames(blob, trailer_len=args.trailer_len, etx_mode=args.etx_mode) + frames = parse_frames(blob, trailer_len=args.trailer_len) little = (args.crc_endian == "little") if args.crc: