FIX: s3_parser.py framing/protocol now working.
This commit is contained in:
@@ -100,9 +100,8 @@ CRC_FUNCS = {
|
|||||||
"CRC-16/X-25": crc16_x25,
|
"CRC-16/X-25": crc16_x25,
|
||||||
}
|
}
|
||||||
|
|
||||||
def parse_frames(blob: bytes, trailer_len: int, etx_mode: str = ETX_MODE_AUTO) -> List[Frame]:
|
def parse_frames(blob: bytes, trailer_len: int) -> List[Frame]:
|
||||||
frames: List[Frame] = []
|
frames: List[Frame] = []
|
||||||
n = len(blob)
|
|
||||||
|
|
||||||
STATE_IDLE = 0
|
STATE_IDLE = 0
|
||||||
STATE_IN_FRAME = 1
|
STATE_IN_FRAME = 1
|
||||||
@@ -112,104 +111,78 @@ def parse_frames(blob: bytes, trailer_len: int, etx_mode: str = ETX_MODE_AUTO) -
|
|||||||
payload_raw = bytearray()
|
payload_raw = bytearray()
|
||||||
start_offset = 0
|
start_offset = 0
|
||||||
idx = 0
|
idx = 0
|
||||||
|
|
||||||
i = 0
|
i = 0
|
||||||
print(">>> NEW STATE MACHINE ACTIVE <<<")
|
n = len(blob)
|
||||||
# Auto-detect ETX style once
|
|
||||||
if etx_mode == ETX_MODE_AUTO:
|
print(">>> CLEAN RAW STATE MACHINE ACTIVE <<<")
|
||||||
raw_etx = sum(1 for p in range(n - 1) if blob[p] == DLE and blob[p + 1] == ETX)
|
|
||||||
stx_count = sum(1 for p in range(n - 1) if blob[p] == DLE and blob[p + 1] == STX)
|
|
||||||
if raw_etx and raw_etx >= max(1, int(0.8 * stx_count)):
|
|
||||||
etx_mode = ETX_MODE_RAW
|
|
||||||
else:
|
|
||||||
etx_mode = ETX_MODE_STRIPPED
|
|
||||||
|
|
||||||
while i < n:
|
while i < n:
|
||||||
b = blob[i]
|
b = blob[i]
|
||||||
|
|
||||||
if state == STATE_IDLE:
|
if state == STATE_IDLE:
|
||||||
|
# look for DLE STX
|
||||||
if b == DLE and i + 1 < n and blob[i + 1] == STX:
|
if b == DLE and i + 1 < n and blob[i + 1] == STX:
|
||||||
|
print("FRAME START at", i)
|
||||||
start_offset = i
|
start_offset = i
|
||||||
payload_raw = bytearray()
|
payload_raw = bytearray()
|
||||||
i += 2
|
|
||||||
state = STATE_IN_FRAME
|
state = STATE_IN_FRAME
|
||||||
continue
|
i += 2
|
||||||
i += 1
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
elif state == STATE_IN_FRAME:
|
elif state == STATE_IN_FRAME:
|
||||||
|
if b == DLE:
|
||||||
# RAW mode: look for DLE+ETX
|
|
||||||
if etx_mode == ETX_MODE_RAW and b == DLE:
|
|
||||||
state = STATE_AFTER_DLE
|
state = STATE_AFTER_DLE
|
||||||
i += 1
|
i += 1
|
||||||
continue
|
continue
|
||||||
|
else:
|
||||||
# STRIPPED mode: bare ETX ends frame
|
|
||||||
if etx_mode == ETX_MODE_STRIPPED and b == ETX:
|
|
||||||
payload_end = i
|
|
||||||
i += 1
|
|
||||||
end_offset = i
|
|
||||||
|
|
||||||
payload = unescape_dle(bytes(payload_raw))
|
|
||||||
trailer = blob[i:i + trailer_len] if trailer_len > 0 else b""
|
|
||||||
i += trailer_len
|
|
||||||
|
|
||||||
frames.append(Frame(
|
|
||||||
index=idx,
|
|
||||||
start_offset=start_offset,
|
|
||||||
end_offset=end_offset,
|
|
||||||
payload_raw=bytes(payload_raw),
|
|
||||||
payload=payload,
|
|
||||||
trailer=trailer,
|
|
||||||
crc_match=None
|
|
||||||
))
|
|
||||||
idx += 1
|
|
||||||
state = STATE_IDLE
|
|
||||||
continue
|
|
||||||
|
|
||||||
payload_raw.append(b)
|
payload_raw.append(b)
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif state == STATE_AFTER_DLE:
|
elif state == STATE_AFTER_DLE:
|
||||||
|
if b == DLE:
|
||||||
if etx_mode == ETX_MODE_RAW and b == ETX:
|
# escaped literal DLE
|
||||||
# Proper DLE ETX terminator
|
|
||||||
end_offset = i + 1
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
payload = unescape_dle(bytes(payload_raw))
|
|
||||||
trailer = blob[i:i + trailer_len] if trailer_len > 0 else b""
|
|
||||||
i += trailer_len
|
|
||||||
|
|
||||||
frames.append(Frame(
|
|
||||||
index=idx,
|
|
||||||
start_offset=start_offset,
|
|
||||||
end_offset=end_offset,
|
|
||||||
payload_raw=bytes(payload_raw),
|
|
||||||
payload=payload,
|
|
||||||
trailer=trailer,
|
|
||||||
crc_match=None
|
|
||||||
))
|
|
||||||
idx += 1
|
|
||||||
state = STATE_IDLE
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif b == DLE:
|
|
||||||
# Escaped DLE (10 10)
|
|
||||||
payload_raw.append(DLE)
|
payload_raw.append(DLE)
|
||||||
state = STATE_IN_FRAME
|
state = STATE_IN_FRAME
|
||||||
i += 1
|
i += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
elif b == ETX:
|
||||||
|
print("FRAME END at", i)
|
||||||
|
# end of frame
|
||||||
|
end_offset = i + 1
|
||||||
|
|
||||||
|
# capture trailer
|
||||||
|
trailer_start = i + 1
|
||||||
|
trailer_end = trailer_start + trailer_len
|
||||||
|
trailer = blob[trailer_start:trailer_end]
|
||||||
|
|
||||||
|
frames.append(Frame(
|
||||||
|
index=idx,
|
||||||
|
start_offset=start_offset,
|
||||||
|
end_offset=end_offset,
|
||||||
|
payload_raw=bytes(payload_raw),
|
||||||
|
payload=bytes(payload_raw),
|
||||||
|
trailer=trailer,
|
||||||
|
crc_match=None
|
||||||
|
))
|
||||||
|
|
||||||
|
idx += 1
|
||||||
|
state = STATE_IDLE
|
||||||
|
i = trailer_end
|
||||||
|
continue
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# False alarm — previous 10 was data
|
# unexpected sequence: DLE followed by non-DLE/non-ETX
|
||||||
|
# treat both bytes as data (robust recovery)
|
||||||
payload_raw.append(DLE)
|
payload_raw.append(DLE)
|
||||||
payload_raw.append(b)
|
payload_raw.append(b)
|
||||||
state = STATE_IN_FRAME
|
state = STATE_IN_FRAME
|
||||||
i += 1
|
i += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
print("Frames parsed:", len(frames))
|
||||||
return frames
|
return frames
|
||||||
|
|
||||||
def best_crc_match(payload: bytes, trailer: bytes, little_endian: bool) -> Optional[str]:
|
def best_crc_match(payload: bytes, trailer: bytes, little_endian: bool) -> Optional[str]:
|
||||||
@@ -234,18 +207,12 @@ def main() -> None:
|
|||||||
ap.add_argument("--trailer-len", type=int, default=2, help="Bytes to capture after DLE ETX (default: 2)")
|
ap.add_argument("--trailer-len", type=int, default=2, help="Bytes to capture after DLE ETX (default: 2)")
|
||||||
ap.add_argument("--crc", action="store_true", help="Attempt CRC match using first 2 trailer bytes")
|
ap.add_argument("--crc", action="store_true", help="Attempt CRC match using first 2 trailer bytes")
|
||||||
ap.add_argument("--crc-endian", choices=["little", "big"], default="little", help="CRC endian when reading trailer")
|
ap.add_argument("--crc-endian", choices=["little", "big"], default="little", help="CRC endian when reading trailer")
|
||||||
ap.add_argument(
|
|
||||||
"--etx-mode",
|
|
||||||
choices=[ETX_MODE_AUTO, ETX_MODE_RAW, ETX_MODE_STRIPPED],
|
|
||||||
default=ETX_MODE_AUTO,
|
|
||||||
help="How to detect end-of-frame: 'raw' expects DLE+ETX, "
|
|
||||||
"'stripped' expects bare ETX (s3_bridge .bin), 'auto' picks based on presence of DLE+ETX."
|
|
||||||
)
|
|
||||||
ap.add_argument("--out", type=Path, default=None, help="Write JSONL output to this file")
|
ap.add_argument("--out", type=Path, default=None, help="Write JSONL output to this file")
|
||||||
args = ap.parse_args()
|
args = ap.parse_args()
|
||||||
|
|
||||||
blob = args.binfile.read_bytes()
|
blob = args.binfile.read_bytes()
|
||||||
frames = parse_frames(blob, trailer_len=args.trailer_len, etx_mode=args.etx_mode)
|
frames = parse_frames(blob, trailer_len=args.trailer_len)
|
||||||
|
|
||||||
little = (args.crc_endian == "little")
|
little = (args.crc_endian == "little")
|
||||||
if args.crc:
|
if args.crc:
|
||||||
|
|||||||
Reference in New Issue
Block a user