feat: added raw capture pipeline. added simple windows gui.
This commit is contained in:
@@ -20,6 +20,14 @@ STX = 0x02
|
||||
ETX = 0x03
|
||||
EOT = 0x04
|
||||
|
||||
# How the capture was produced:
|
||||
# - Raw serial captures include DLE+ETX (`0x10 0x03`).
|
||||
# - The s3_bridge `.bin` logger strips the DLE byte from ETX, so frames end with a
|
||||
# bare `0x03`. See docs/instantel_protocol_reference.md §Appendix A.
|
||||
ETX_MODE_AUTO = "auto"
|
||||
ETX_MODE_RAW = "raw" # expect DLE+ETX
|
||||
ETX_MODE_STRIPPED = "stripped" # expect bare ETX
|
||||
|
||||
@dataclass
|
||||
class Frame:
|
||||
index: int
|
||||
@@ -92,7 +100,7 @@ CRC_FUNCS = {
|
||||
"CRC-16/X-25": crc16_x25,
|
||||
}
|
||||
|
||||
def parse_frames(blob: bytes, trailer_len: int) -> List[Frame]:
|
||||
def parse_frames(blob: bytes, trailer_len: int, etx_mode: str = ETX_MODE_AUTO) -> List[Frame]:
|
||||
frames: List[Frame] = []
|
||||
i = 0
|
||||
idx = 0
|
||||
@@ -101,21 +109,40 @@ def parse_frames(blob: bytes, trailer_len: int) -> List[Frame]:
|
||||
def is_dle_seq(pos: int, second: int) -> bool:
|
||||
return pos + 1 < n and blob[pos] == DLE and blob[pos + 1] == second
|
||||
|
||||
# Auto-detect whether ETX is bare (logger-stripped) or DLE+ETX (raw wire).
|
||||
if etx_mode == ETX_MODE_AUTO:
|
||||
raw_etx = sum(1 for p in range(n - 1) if is_dle_seq(p, ETX))
|
||||
stx_count = sum(1 for p in range(n - 1) if is_dle_seq(p, STX))
|
||||
# Heuristic: the logger-stripped .bin files have plenty of STX but
|
||||
# almost no DLE+ETX. If ETX count is far below STX count, assume stripped.
|
||||
if raw_etx and raw_etx >= max(1, int(0.8 * stx_count)):
|
||||
etx_mode = ETX_MODE_RAW
|
||||
else:
|
||||
etx_mode = ETX_MODE_STRIPPED
|
||||
|
||||
while i < n - 1:
|
||||
if is_dle_seq(i, STX):
|
||||
start = i
|
||||
i += 2 # move past DLE STX
|
||||
payload_start = i
|
||||
|
||||
# find DLE ETX
|
||||
while i < n - 1 and not is_dle_seq(i, ETX):
|
||||
# find end-of-frame marker
|
||||
while i < n:
|
||||
if etx_mode == ETX_MODE_RAW and is_dle_seq(i, ETX):
|
||||
payload_end = i # up to (but not including) DLE ETX
|
||||
i += 2 # skip DLE ETX
|
||||
end = i
|
||||
break
|
||||
if etx_mode == ETX_MODE_STRIPPED and blob[i] == ETX:
|
||||
payload_end = i # up to (but not including) bare ETX
|
||||
i += 1 # skip ETX
|
||||
end = i
|
||||
break
|
||||
i += 1
|
||||
|
||||
if i >= n - 1:
|
||||
break # truncated
|
||||
payload_end = i # bytes up to (but not including) DLE ETX
|
||||
i += 2 # skip DLE ETX
|
||||
end = i
|
||||
else:
|
||||
# Ran off the end without finding ETX
|
||||
break
|
||||
|
||||
payload_raw = blob[payload_start:payload_end]
|
||||
payload = unescape_dle(payload_raw)
|
||||
@@ -162,11 +189,18 @@ def main() -> None:
|
||||
ap.add_argument("--trailer-len", type=int, default=2, help="Bytes to capture after DLE ETX (default: 2)")
|
||||
ap.add_argument("--crc", action="store_true", help="Attempt CRC match using first 2 trailer bytes")
|
||||
ap.add_argument("--crc-endian", choices=["little", "big"], default="little", help="CRC endian when reading trailer")
|
||||
ap.add_argument(
|
||||
"--etx-mode",
|
||||
choices=[ETX_MODE_AUTO, ETX_MODE_RAW, ETX_MODE_STRIPPED],
|
||||
default=ETX_MODE_AUTO,
|
||||
help="How to detect end-of-frame: 'raw' expects DLE+ETX, "
|
||||
"'stripped' expects bare ETX (s3_bridge .bin), 'auto' picks based on presence of DLE+ETX."
|
||||
)
|
||||
ap.add_argument("--out", type=Path, default=None, help="Write JSONL output to this file")
|
||||
args = ap.parse_args()
|
||||
|
||||
blob = args.binfile.read_bytes()
|
||||
frames = parse_frames(blob, trailer_len=args.trailer_len)
|
||||
frames = parse_frames(blob, trailer_len=args.trailer_len, etx_mode=args.etx_mode)
|
||||
|
||||
little = (args.crc_endian == "little")
|
||||
if args.crc:
|
||||
@@ -208,4 +242,4 @@ def main() -> None:
|
||||
print(f"... ({len(lines) - 10} more)")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user