fix: parser frame logic now tracks DLE state.

This commit is contained in:
serversdwn
2026-03-03 00:30:03 -05:00
parent 8ca40d52a4
commit 50be6410fe

View File

@@ -102,68 +102,113 @@ CRC_FUNCS = {
def parse_frames(blob: bytes, trailer_len: int, etx_mode: str = ETX_MODE_AUTO) -> List[Frame]: def parse_frames(blob: bytes, trailer_len: int, etx_mode: str = ETX_MODE_AUTO) -> List[Frame]:
frames: List[Frame] = [] frames: List[Frame] = []
i = 0
idx = 0
n = len(blob) n = len(blob)
def is_dle_seq(pos: int, second: int) -> bool: STATE_IDLE = 0
return pos + 1 < n and blob[pos] == DLE and blob[pos + 1] == second STATE_IN_FRAME = 1
STATE_AFTER_DLE = 2
# Auto-detect whether ETX is bare (logger-stripped) or DLE+ETX (raw wire). state = STATE_IDLE
payload_raw = bytearray()
start_offset = 0
idx = 0
i = 0
# Auto-detect ETX style once
if etx_mode == ETX_MODE_AUTO: if etx_mode == ETX_MODE_AUTO:
raw_etx = sum(1 for p in range(n - 1) if is_dle_seq(p, ETX)) raw_etx = sum(1 for p in range(n - 1) if blob[p] == DLE and blob[p + 1] == ETX)
stx_count = sum(1 for p in range(n - 1) if is_dle_seq(p, STX)) stx_count = sum(1 for p in range(n - 1) if blob[p] == DLE and blob[p + 1] == STX)
# Heuristic: the logger-stripped .bin files have plenty of STX but
# almost no DLE+ETX. If ETX count is far below STX count, assume stripped.
if raw_etx and raw_etx >= max(1, int(0.8 * stx_count)): if raw_etx and raw_etx >= max(1, int(0.8 * stx_count)):
etx_mode = ETX_MODE_RAW etx_mode = ETX_MODE_RAW
else: else:
etx_mode = ETX_MODE_STRIPPED etx_mode = ETX_MODE_STRIPPED
while i < n - 1:
if is_dle_seq(i, STX):
start = i
i += 2 # move past DLE STX
payload_start = i
# find end-of-frame marker
while i < n: while i < n:
if etx_mode == ETX_MODE_RAW and is_dle_seq(i, ETX): b = blob[i]
payload_end = i # up to (but not including) DLE ETX
i += 2 # skip DLE ETX if state == STATE_IDLE:
end = i if b == DLE and i + 1 < n and blob[i + 1] == STX:
break start_offset = i
if etx_mode == ETX_MODE_STRIPPED and blob[i] == ETX: payload_raw = bytearray()
payload_end = i # up to (but not including) bare ETX i += 2
i += 1 # skip ETX state = STATE_IN_FRAME
end = i continue
break
i += 1 i += 1
continue
else: elif state == STATE_IN_FRAME:
# Ran off the end without finding ETX
break
payload_raw = blob[payload_start:payload_end] # RAW mode: look for DLE+ETX
payload = unescape_dle(payload_raw) if etx_mode == ETX_MODE_RAW and b == DLE:
state = STATE_AFTER_DLE
i += 1
continue
# STRIPPED mode: bare ETX ends frame
if etx_mode == ETX_MODE_STRIPPED and b == ETX:
payload_end = i
i += 1
end_offset = i
payload = unescape_dle(bytes(payload_raw))
trailer = blob[i:i + trailer_len] if trailer_len > 0 else b"" trailer = blob[i:i + trailer_len] if trailer_len > 0 else b""
i += trailer_len i += trailer_len
frames.append(Frame( frames.append(Frame(
index=idx, index=idx,
start_offset=start, start_offset=start_offset,
end_offset=end, end_offset=end_offset,
payload_raw=payload_raw, payload_raw=bytes(payload_raw),
payload=payload, payload=payload,
trailer=trailer, trailer=trailer,
crc_match=None crc_match=None
)) ))
idx += 1 idx += 1
state = STATE_IDLE
continue continue
# optional: you can also detect DLE EOT boundaries if useful later payload_raw.append(b)
i += 1 i += 1
continue
elif state == STATE_AFTER_DLE:
if etx_mode == ETX_MODE_RAW and b == ETX:
# Proper DLE ETX terminator
end_offset = i + 1
i += 1
payload = unescape_dle(bytes(payload_raw))
trailer = blob[i:i + trailer_len] if trailer_len > 0 else b""
i += trailer_len
frames.append(Frame(
index=idx,
start_offset=start_offset,
end_offset=end_offset,
payload_raw=bytes(payload_raw),
payload=payload,
trailer=trailer,
crc_match=None
))
idx += 1
state = STATE_IDLE
continue
elif b == DLE:
# Escaped DLE (10 10)
payload_raw.append(DLE)
state = STATE_IN_FRAME
i += 1
continue
else:
# False alarm — previous 10 was data
payload_raw.append(DLE)
payload_raw.append(b)
state = STATE_IN_FRAME
i += 1
continue
return frames return frames